qemu: convert agent to use the per-VM event loop

This converts the QEMU agent APIs to use the per-VM
event loop, which involves switching from virEvent APIs
to GMainContext / GSource APIs.

A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.

Reviewed-by: Michal Privoznik <mprivozn@redhat.com>
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrangé 2020-02-12 14:54:19 +00:00
parent 0d62faf62a
commit a18f2c52ac
4 changed files with 84 additions and 65 deletions

View File

@ -25,6 +25,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <gio/gio.h>
#include "qemu_agent.h"
#include "qemu_domain.h"
@ -101,7 +102,10 @@ struct _qemuAgent {
virCond notify;
int fd;
int watch;
GMainContext *context;
GSocket *socket;
GSource *watch;
bool running;
@ -172,6 +176,7 @@ static void qemuAgentDispose(void *obj)
(agent->cb->destroy)(agent, agent->vm);
virCondDestroy(&agent->notify);
VIR_FREE(agent->buffer);
g_main_context_unref(agent->context);
virResetError(&agent->lastError);
}
@ -188,13 +193,6 @@ qemuAgentOpenUnix(const char *socketpath)
return -1;
}
if (virSetNonBlock(agentfd) < 0) {
virReportSystemError(errno, "%s",
_("Unable to put monitor "
"into non-blocking mode"));
goto error;
}
if (virSetCloseExec(agentfd) < 0) {
virReportSystemError(errno, "%s",
_("Unable to set agent "
@ -498,28 +496,62 @@ qemuAgentIORead(qemuAgentPtr agent)
}
static void qemuAgentUpdateWatch(qemuAgentPtr agent)
{
int events =
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR;
static gboolean
qemuAgentIO(GSocket *socket,
GIOCondition cond,
gpointer opaque);
if (!agent->watch)
return;
static void
qemuAgentRegister(qemuAgentPtr agent)
{
GIOCondition cond = 0;
if (agent->lastError.code == VIR_ERR_OK) {
events |= VIR_EVENT_HANDLE_READABLE;
cond |= G_IO_IN;
if (agent->msg && agent->msg->txOffset < agent->msg->txLength)
events |= VIR_EVENT_HANDLE_WRITABLE;
cond |= G_IO_OUT;
}
virEventUpdateHandle(agent->watch, events);
agent->watch = g_socket_create_source(agent->socket,
cond,
NULL);
virObjectRef(agent);
g_source_set_callback(agent->watch,
(GSourceFunc)qemuAgentIO,
agent,
(GDestroyNotify)virObjectUnref);
g_source_attach(agent->watch,
agent->context);
}
static void
qemuAgentIO(int watch, int fd, int events, void *opaque)
qemuAgentUnregister(qemuAgentPtr agent)
{
if (agent->watch) {
g_source_destroy(agent->watch);
g_source_unref(agent->watch);
agent->watch = NULL;
}
}
static void qemuAgentUpdateWatch(qemuAgentPtr agent)
{
qemuAgentUnregister(agent);
if (agent->socket)
qemuAgentRegister(agent);
}
static gboolean
qemuAgentIO(GSocket *socket G_GNUC_UNUSED,
GIOCondition cond,
gpointer opaque)
{
qemuAgentPtr agent = opaque;
bool error = false;
@ -529,45 +561,36 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
/* lock access to the agent and protect fd */
virObjectLock(agent);
#if DEBUG_IO
VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events);
VIR_DEBUG("Agent %p I/O on watch %d socket %p cond %d", agent, agent->socket, cond);
#endif
if (agent->fd == -1 || agent->watch == 0) {
if (agent->fd == -1 || !agent->watch) {
virObjectUnlock(agent);
virObjectUnref(agent);
return;
return G_SOURCE_REMOVE;
}
if (agent->fd != fd || agent->watch != watch) {
if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
eof = true;
virReportError(VIR_ERR_INTERNAL_ERROR,
_("event from unexpected fd %d!=%d / watch %d!=%d"),
agent->fd, fd, agent->watch, watch);
error = true;
} else if (agent->lastError.code != VIR_ERR_OK) {
if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
if (agent->lastError.code != VIR_ERR_OK) {
if (cond & (G_IO_HUP | G_IO_ERR))
eof = true;
error = true;
} else {
if (events & VIR_EVENT_HANDLE_WRITABLE) {
if (cond & G_IO_OUT) {
if (qemuAgentIOWrite(agent) < 0)
error = true;
events &= ~VIR_EVENT_HANDLE_WRITABLE;
}
if (!error &&
events & VIR_EVENT_HANDLE_READABLE) {
cond & G_IO_IN) {
int got = qemuAgentIORead(agent);
events &= ~VIR_EVENT_HANDLE_READABLE;
if (got < 0) {
error = true;
} else if (got == 0) {
eof = true;
} else {
/* Ignore hangup/error events if we read some data, to
/* Ignore hangup/error cond if we read some data, to
* give time for that data to be consumed */
events = 0;
cond = 0;
if (qemuAgentIOProcess(agent) < 0)
error = true;
@ -575,25 +598,17 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
}
if (!error &&
events & VIR_EVENT_HANDLE_HANGUP) {
cond & G_IO_HUP) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("End of file from agent socket"));
eof = true;
events &= ~VIR_EVENT_HANDLE_HANGUP;
}
if (!error && !eof &&
events & VIR_EVENT_HANDLE_ERROR) {
cond & G_IO_ERR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Invalid file descriptor while waiting for agent"));
eof = true;
events &= ~VIR_EVENT_HANDLE_ERROR;
}
if (!error && events) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unhandled event %d for agent fd %d"),
events, agent->fd);
error = true;
}
}
@ -649,15 +664,19 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
virObjectUnlock(agent);
virObjectUnref(agent);
}
return G_SOURCE_REMOVE;
}
qemuAgentPtr
qemuAgentOpen(virDomainObjPtr vm,
const virDomainChrSourceDef *config,
GMainContext *context,
qemuAgentCallbacksPtr cb)
{
qemuAgentPtr agent;
g_autoptr(GError) gerr = NULL;
if (!cb || !cb->eofNotify) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@ -693,22 +712,20 @@ qemuAgentOpen(virDomainObjPtr vm,
if (agent->fd == -1)
goto cleanup;
virObjectRef(agent);
if ((agent->watch = virEventAddHandle(agent->fd,
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR |
VIR_EVENT_HANDLE_READABLE,
qemuAgentIO,
agent,
virObjectFreeCallback)) < 0) {
virObjectUnref(agent);
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("unable to register agent events"));
agent->context = g_main_context_ref(context);
agent->socket = g_socket_new_from_fd(agent->fd, &gerr);
if (!agent->socket) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unable to create socket object: %s"),
gerr->message);
goto cleanup;
}
qemuAgentRegister(agent);
agent->running = true;
VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch);
VIR_DEBUG("New agent %p fd=%d", agent, agent->fd);
return agent;
@ -763,12 +780,11 @@ void qemuAgentClose(qemuAgentPtr agent)
virObjectLock(agent);
if (agent->fd >= 0) {
if (agent->watch) {
virEventRemoveHandle(agent->watch);
agent->watch = 0;
}
VIR_FORCE_CLOSE(agent->fd);
if (agent->socket) {
qemuAgentUnregister(agent);
g_object_unref(agent->socket);
agent->socket = NULL;
agent->fd = -1;
}
qemuAgentNotifyCloseLocked(agent);

View File

@ -41,6 +41,7 @@ struct _qemuAgentCallbacks {
qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm,
const virDomainChrSourceDef *config,
GMainContext *context,
qemuAgentCallbacksPtr cb);
void qemuAgentClose(qemuAgentPtr mon);

View File

@ -237,6 +237,7 @@ qemuConnectAgent(virQEMUDriverPtr driver, virDomainObjPtr vm)
agent = qemuAgentOpen(vm,
config->source,
virEventThreadGetContext(priv->eventThread),
&agentCallbacks);
virObjectLock(vm);

View File

@ -1406,6 +1406,7 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt)
if (!(test->agent = qemuAgentOpen(test->vm,
&src,
virEventThreadGetContext(test->eventThread),
&qemuMonitorTestAgentCallbacks)))
goto error;