qemu: convert monitor to use the per-VM event loop

This converts the QEMU monitor APIs to use the per-VM
event loop, which involves switching from virEvent APIs
to GMainContext / GSource APIs.

A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.

Reviewed-by: Michal Privoznik <mprivozn@redhat.com>
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrangé 2020-02-12 14:54:19 +00:00
parent ba906ab1c0
commit 436a56e37d
4 changed files with 71 additions and 84 deletions

View File

@ -24,6 +24,7 @@
#include <poll.h> #include <poll.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <gio/gio.h>
#include "qemu_monitor.h" #include "qemu_monitor.h"
#include "qemu_monitor_text.h" #include "qemu_monitor_text.h"
@ -72,12 +73,9 @@ struct _qemuMonitor {
int fd; int fd;
/* Represents the watch number to be used for updating and GMainContext *context;
* unregistering the monitor @fd for events in the event loop: GSocket *socket;
* > 0: valid watch number GSource *watch;
* = 0: not registered
* < 0: an error occurred during the registration of @fd */
int watch;
virDomainObjPtr vm; virDomainObjPtr vm;
@ -227,6 +225,7 @@ qemuMonitorDispose(void *obj)
(mon->cb->destroy)(mon, mon->vm, mon->callbackOpaque); (mon->cb->destroy)(mon, mon->vm, mon->callbackOpaque);
virObjectUnref(mon->vm); virObjectUnref(mon->vm);
g_main_context_unref(mon->context);
virResetError(&mon->lastError); virResetError(&mon->lastError);
virCondDestroy(&mon->notify); virCondDestroy(&mon->notify);
VIR_FREE(mon->buffer); VIR_FREE(mon->buffer);
@ -510,27 +509,16 @@ qemuMonitorIORead(qemuMonitorPtr mon)
static void static void
qemuMonitorUpdateWatch(qemuMonitorPtr mon) qemuMonitorUpdateWatch(qemuMonitorPtr mon)
{ {
int events = qemuMonitorUnregister(mon);
VIR_EVENT_HANDLE_HANGUP | if (mon->socket)
VIR_EVENT_HANDLE_ERROR; qemuMonitorRegister(mon);
if (!mon->watch)
return;
if (mon->lastError.code == VIR_ERR_OK) {
events |= VIR_EVENT_HANDLE_READABLE;
if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) &&
!mon->waitGreeting)
events |= VIR_EVENT_HANDLE_WRITABLE;
}
virEventUpdateHandle(mon->watch, events);
} }
static void static gboolean
qemuMonitorIO(int watch, int fd, int events, void *opaque) qemuMonitorIO(GSocket *socket G_GNUC_UNUSED,
GIOCondition cond,
gpointer opaque)
{ {
qemuMonitorPtr mon = opaque; qemuMonitorPtr mon = opaque;
bool error = false; bool error = false;
@ -542,39 +530,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
/* lock access to the monitor and protect fd */ /* lock access to the monitor and protect fd */
virObjectLock(mon); virObjectLock(mon);
#if DEBUG_IO #if DEBUG_IO
VIR_DEBUG("Monitor %p I/O on watch %d fd %d events %d", mon, watch, fd, events); VIR_DEBUG("Monitor %p I/O on socket %p cond %d", mon, socket, cond);
#endif #endif
if (mon->fd == -1 || mon->watch == 0) { if (mon->fd == -1 || !mon->watch) {
virObjectUnlock(mon); virObjectUnlock(mon);
virObjectUnref(mon); virObjectUnref(mon);
return; return G_SOURCE_REMOVE;
} }
if (mon->fd != fd || mon->watch != watch) { if (mon->lastError.code != VIR_ERR_OK) {
if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) if (cond & (G_IO_HUP | G_IO_ERR))
eof = true;
virReportError(VIR_ERR_INTERNAL_ERROR,
_("event from unexpected fd %d!=%d / watch %d!=%d"),
mon->fd, fd, mon->watch, watch);
error = true;
} else if (mon->lastError.code != VIR_ERR_OK) {
if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
eof = true; eof = true;
error = true; error = true;
} else { } else {
if (events & VIR_EVENT_HANDLE_WRITABLE) { if (cond & G_IO_OUT) {
if (qemuMonitorIOWrite(mon) < 0) { if (qemuMonitorIOWrite(mon) < 0) {
error = true; error = true;
if (errno == ECONNRESET) if (errno == ECONNRESET)
hangup = true; hangup = true;
} }
events &= ~VIR_EVENT_HANDLE_WRITABLE;
} }
if (!error && if (!error && cond & G_IO_IN) {
events & VIR_EVENT_HANDLE_READABLE) {
int got = qemuMonitorIORead(mon); int got = qemuMonitorIORead(mon);
events &= ~VIR_EVENT_HANDLE_READABLE;
if (got < 0) { if (got < 0) {
error = true; error = true;
if (errno == ECONNRESET) if (errno == ECONNRESET)
@ -582,37 +560,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
} else if (got == 0) { } else if (got == 0) {
eof = true; eof = true;
} else { } else {
/* Ignore hangup/error events if we read some data, to /* Ignore hangup/error cond if we read some data, to
* give time for that data to be consumed */ * give time for that data to be consumed */
events = 0; cond = 0;
if (qemuMonitorIOProcess(mon) < 0) if (qemuMonitorIOProcess(mon) < 0)
error = true; error = true;
} }
} }
if (events & VIR_EVENT_HANDLE_HANGUP) { if (cond & G_IO_HUP) {
hangup = true; hangup = true;
if (!error) { if (!error) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("End of file from qemu monitor")); _("End of file from qemu monitor"));
eof = true; eof = true;
events &= ~VIR_EVENT_HANDLE_HANGUP;
} }
} }
if (!error && !eof && if (!error && !eof &&
events & VIR_EVENT_HANDLE_ERROR) { cond & G_IO_ERR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Invalid file descriptor while waiting for monitor")); _("Invalid file descriptor while waiting for monitor"));
eof = true; eof = true;
events &= ~VIR_EVENT_HANDLE_ERROR;
}
if (!error && events) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unhandled event %d for monitor fd %d"),
events, mon->fd);
error = true;
} }
} }
@ -680,16 +650,20 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
virObjectUnlock(mon); virObjectUnlock(mon);
virObjectUnref(mon); virObjectUnref(mon);
} }
return G_SOURCE_REMOVE;
} }
static qemuMonitorPtr static qemuMonitorPtr
qemuMonitorOpenInternal(virDomainObjPtr vm, qemuMonitorOpenInternal(virDomainObjPtr vm,
int fd, int fd,
GMainContext *context,
qemuMonitorCallbacksPtr cb, qemuMonitorCallbacksPtr cb,
void *opaque) void *opaque)
{ {
qemuMonitorPtr mon; qemuMonitorPtr mon;
g_autoptr(GError) gerr = NULL;
if (!cb->eofNotify) { if (!cb->eofNotify) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@ -714,6 +688,7 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
goto cleanup; goto cleanup;
} }
mon->fd = fd; mon->fd = fd;
mon->context = g_main_context_ref(context);
mon->vm = virObjectRef(vm); mon->vm = virObjectRef(vm);
mon->waitGreeting = true; mon->waitGreeting = true;
mon->cb = cb; mon->cb = cb;
@ -724,20 +699,17 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
"%s", _("Unable to set monitor close-on-exec flag")); "%s", _("Unable to set monitor close-on-exec flag"));
goto cleanup; goto cleanup;
} }
if (virSetNonBlock(mon->fd) < 0) {
mon->socket = g_socket_new_from_fd(fd, &gerr);
if (!mon->socket) {
virReportError(VIR_ERR_INTERNAL_ERROR, virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("Unable to put monitor into non-blocking mode")); _("Unable to create socket object: %s"),
gerr->message);
goto cleanup; goto cleanup;
} }
virObjectLock(mon); virObjectLock(mon);
if (!qemuMonitorRegister(mon)) { qemuMonitorRegister(mon);
virObjectUnlock(mon);
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("unable to register monitor events"));
goto cleanup;
}
PROBE(QEMU_MONITOR_NEW, PROBE(QEMU_MONITOR_NEW,
"mon=%p refs=%d fd=%d", "mon=%p refs=%d fd=%d",
@ -783,6 +755,7 @@ qemuMonitorOpen(virDomainObjPtr vm,
virDomainChrSourceDefPtr config, virDomainChrSourceDefPtr config,
bool retry, bool retry,
unsigned long long timeout, unsigned long long timeout,
GMainContext *context,
qemuMonitorCallbacksPtr cb, qemuMonitorCallbacksPtr cb,
void *opaque) void *opaque)
{ {
@ -816,7 +789,7 @@ qemuMonitorOpen(virDomainObjPtr vm,
goto cleanup; goto cleanup;
} }
ret = qemuMonitorOpenInternal(vm, fd, cb, opaque); ret = qemuMonitorOpenInternal(vm, fd, context, cb, opaque);
cleanup: cleanup:
if (!ret) if (!ret)
VIR_FORCE_CLOSE(fd); VIR_FORCE_CLOSE(fd);
@ -831,25 +804,32 @@ qemuMonitorOpen(virDomainObjPtr vm,
* *
* Registers the monitor in the event loop. The caller has to hold the * Registers the monitor in the event loop. The caller has to hold the
* lock for @mon. * lock for @mon.
*
* Returns true in case of success, false otherwise
*/ */
bool void
qemuMonitorRegister(qemuMonitorPtr mon) qemuMonitorRegister(qemuMonitorPtr mon)
{ {
virObjectRef(mon); GIOCondition cond = 0;
if ((mon->watch = virEventAddHandle(mon->fd,
VIR_EVENT_HANDLE_HANGUP | if (mon->lastError.code == VIR_ERR_OK) {
VIR_EVENT_HANDLE_ERROR | cond |= G_IO_IN;
VIR_EVENT_HANDLE_READABLE,
qemuMonitorIO, if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) &&
mon, !mon->waitGreeting)
virObjectFreeCallback)) < 0) { cond |= G_IO_OUT;
virObjectUnref(mon);
return false;
} }
return true; mon->watch = g_socket_create_source(mon->socket,
cond,
NULL);
virObjectRef(mon);
g_source_set_callback(mon->watch,
(GSourceFunc)qemuMonitorIO,
mon,
(GDestroyNotify)virObjectUnref);
g_source_attach(mon->watch,
mon->context);
} }
@ -857,8 +837,9 @@ void
qemuMonitorUnregister(qemuMonitorPtr mon) qemuMonitorUnregister(qemuMonitorPtr mon)
{ {
if (mon->watch) { if (mon->watch) {
virEventRemoveHandle(mon->watch); g_source_destroy(mon->watch);
mon->watch = 0; g_source_unref(mon->watch);
mon->watch = NULL;
} }
} }
@ -874,9 +855,11 @@ qemuMonitorClose(qemuMonitorPtr mon)
qemuMonitorSetDomainLogLocked(mon, NULL, NULL, NULL); qemuMonitorSetDomainLogLocked(mon, NULL, NULL, NULL);
if (mon->fd >= 0) { if (mon->socket) {
qemuMonitorUnregister(mon); qemuMonitorUnregister(mon);
VIR_FORCE_CLOSE(mon->fd); g_object_unref(mon->socket);
mon->socket = NULL;
mon->fd = -1;
} }
/* In case another thread is waiting for its monitor command to be /* In case another thread is waiting for its monitor command to be

View File

@ -382,11 +382,12 @@ qemuMonitorPtr qemuMonitorOpen(virDomainObjPtr vm,
virDomainChrSourceDefPtr config, virDomainChrSourceDefPtr config,
bool retry, bool retry,
unsigned long long timeout, unsigned long long timeout,
GMainContext *context,
qemuMonitorCallbacksPtr cb, qemuMonitorCallbacksPtr cb,
void *opaque) void *opaque)
ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5); ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5);
bool qemuMonitorRegister(qemuMonitorPtr mon) void qemuMonitorRegister(qemuMonitorPtr mon)
ATTRIBUTE_NONNULL(1); ATTRIBUTE_NONNULL(1);
void qemuMonitorUnregister(qemuMonitorPtr mon) void qemuMonitorUnregister(qemuMonitorPtr mon)
ATTRIBUTE_NONNULL(1); ATTRIBUTE_NONNULL(1);

View File

@ -1977,6 +1977,7 @@ qemuConnectMonitor(virQEMUDriverPtr driver, virDomainObjPtr vm, int asyncJob,
priv->monConfig, priv->monConfig,
retry, retry,
timeout, timeout,
virEventThreadGetContext(priv->eventThread),
&monitorCallbacks, &monitorCallbacks,
driver); driver);
@ -8597,8 +8598,9 @@ qemuProcessQMPConnectMonitor(qemuProcessQMPPtr proc)
proc->vm->pid = proc->pid; proc->vm->pid = proc->pid;
if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, 0,
0, &callbacks, NULL))) virEventThreadGetContext(proc->eventThread),
&callbacks, NULL)))
goto cleanup; goto cleanup;
virObjectLock(proc->mon); virObjectLock(proc->mon);

View File

@ -1167,6 +1167,7 @@ qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt,
&src, &src,
true, true,
0, 0,
virEventThreadGetContext(test->eventThread),
&qemuMonitorTestCallbacks, &qemuMonitorTestCallbacks,
driver))) driver)))
goto error; goto error;