ch: events: Read and parse cloud-hypervisor events

Implement `chReadProcessEvents` and `chProcessEvents` to read events from
event monitor FIFO file and parse them accordingly.

Signed-off-by: Purna Pavan Chandra Aekkaladevi <paekkaladevi@linux.microsoft.com>
Co-authored-by: Vineeth Pillai <viremana@linux.microsoft.com>
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
Reviewed-by: Michal Privoznik <mprivozn@redhat.com>
This commit is contained in:
Purna Pavan Chandra Aekkaladevi 2025-01-17 17:11:07 +00:00 committed by Michal Privoznik
parent 3015c28c1e
commit 104b0036ad
4 changed files with 149 additions and 1 deletions

View File

@ -22,6 +22,7 @@ src/bhyve/bhyve_process.c
src/ch/ch_conf.c
src/ch/ch_domain.c
src/ch/ch_driver.c
src/ch/ch_events.c
src/ch/ch_hostdev.c
src/ch/ch_interface.c
src/ch/ch_monitor.c

View File

@ -29,6 +29,139 @@
VIR_LOG_INIT("ch.ch_events");
/**
* virCHProcessEvents:
* @mon: the CH monitor object
*
* Parse the events from the event buffer and process them
* Example event:
* {
* "timestamp": {
* "secs": 0,
* "nanos": 29228206
* },
* "source": "vm",
* "event": "booted",
* "properties": null
* }
*
* Returns: 0 on success, -1 on failure
*/
static int
virCHProcessEvents(virCHMonitor *mon)
{
virDomainObj *vm = mon->vm;
char *buf = mon->event_buffer.buffer;
ssize_t sz = mon->event_buffer.buf_fill_sz;
virJSONValue *obj = NULL;
int blocks = 0;
size_t i = 0;
char *json_start;
ssize_t start_index = -1;
ssize_t end_index = -1;
char tmp;
while (i < sz) {
if (buf[i] == '{') {
blocks++;
if (blocks == 1)
start_index = i;
} else if (buf[i] == '}' && blocks > 0) {
blocks--;
if (blocks == 0) {
/* valid json document */
end_index = i;
/* temporarily null terminate the JSON doc */
tmp = buf[end_index + 1];
buf[end_index + 1] = '\0';
json_start = buf + start_index;
if ((obj = virJSONValueFromString(json_start))) {
/* Process the event string (obj) here */
virJSONValueFree(obj);
} else {
VIR_ERROR(_("%1$s: Invalid JSON event doc: %2$s"),
vm->def->name, json_start);
return -1;
}
/* replace the original character */
buf[end_index + 1] = tmp;
start_index = -1;
}
}
i++;
}
if (start_index == -1) {
/* We have processed all the JSON docs in the buffer */
mon->event_buffer.buf_fill_sz = 0;
} else if (start_index > 0) {
/* We have an incomplete JSON doc at the end of the buffer
* Move it to the start of the buffer
*/
mon->event_buffer.buf_fill_sz = sz - start_index;
memmove(buf, buf+start_index, mon->event_buffer.buf_fill_sz);
}
return 0;
}
static int
virCHReadProcessEvents(virCHMonitor *mon)
{
/* Event json string must always terminate with null char.
* So, reserve one byte for '\0' at the end.
*/
size_t max_sz = CH_EVENT_BUFFER_SZ - 1;
char *buf = mon->event_buffer.buffer;
virDomainObj *vm = mon->vm;
bool incomplete = false;
size_t sz = 0;
int event_monitor_fd = mon->eventmonitorfd;
memset(buf, 0, max_sz);
do {
ssize_t ret;
ret = read(event_monitor_fd, buf + sz, max_sz - sz);
if (ret == 0 || (ret < 0 && errno == EINTR)) {
g_usleep(G_USEC_PER_SEC);
continue;
} else if (ret < 0) {
/* We should never reach here. read(2) says possible errors
* are EINTR, EAGAIN, EBADF, EFAULT, EINVAL, EIO, EISDIR
* We handle EINTR gracefully. There is some serious issue
* if we encounter any of the other errors(either in our code
* or in the system).
*/
VIR_ERROR(_("%1$s: Failed to read ch events!: %2$s"),
vm->def->name, g_strerror(errno));
return -1;
}
sz += ret;
mon->event_buffer.buf_fill_sz = sz;
if (virCHProcessEvents(mon) < 0) {
VIR_ERROR(_("%1$s: Failed to parse and process events"),
vm->def->name);
return -1;
}
if (mon->event_buffer.buf_fill_sz != 0)
incomplete = true;
else
incomplete = false;
sz = mon->event_buffer.buf_fill_sz;
} while (virDomainObjIsActive(vm) && (sz < max_sz) && incomplete);
return 0;
}
static void
virCHEventHandlerLoop(void *data)
{
@ -40,11 +173,17 @@ virCHEventHandlerLoop(void *data)
VIR_DEBUG("%s: Event handler loop thread starting", vm->def->name);
mon->event_buffer.buffer = g_new0(char, CH_EVENT_BUFFER_SZ);
mon->event_buffer.buf_fill_sz = 0;
while (g_atomic_int_get(&mon->event_handler_stop) == 0) {
VIR_DEBUG("%s: Reading events from event monitor file", vm->def->name);
/* Read and process events here */
if (virCHReadProcessEvents(mon) < 0) {
virCHStopEventHandler(mon);
}
}
g_clear_pointer(&mon->event_buffer.buffer, g_free);
virObjectUnref(vm);
VIR_DEBUG("%s: Event handler loop thread exiting", vm->def->name);
return;

View File

@ -22,5 +22,7 @@
#include "ch_monitor.h"
#define CH_EVENT_BUFFER_SZ PIPE_BUF
int virCHStartEventHandler(virCHMonitor *mon);
void virCHStopEventHandler(virCHMonitor *mon);

View File

@ -101,6 +101,12 @@ struct _virCHMonitor {
virThread event_handler_thread;
int event_handler_stop;
struct {
/* Buffer to hold the data read from pipe */
char *buffer;
/* Size of the data read from pipe into buffer */
size_t buf_fill_sz;
} event_buffer;
pid_t pid;