Remote driver & daemon impl of new event API

This wires up the remote driver to handle the new events APIs.
The public API allows an application to request a callback filters
events to a specific domain object, and register multiple callbacks
for the same event type. On the wire there are two strategies for
this

 - Register multiple callbacks with the remote daemon, each
   with filtering as needed
 - Register only one callback per event type, with no filtering

Both approaches have potential inefficiency. In the first scheme,
the same event gets sent over the wire many times if multiple
callbacks are registered. With the second scheme, unneccessary
events get sent over the wire if a per-domain filter is set on
the client. The second scheme is far easier to implement though,
so this patch takes that approach.

* daemon/dispatch.h: Don't export remoteRelayDomainEvent since it
  is no longer needed for unregistering callbacks, instead the
  unique callback ID is used
* daemon/libvirtd.c, daemon/libvirtd.h: Track and unregister
  callbacks based on callback ID, instead of function pointer
* daemon/remote.c: Switch over to using virConnectDomainEventRegisterAny
  instead of legacy virConnectDomainEventRegister function. Refactor
  remoteDispatchDomainEventSend() to cope with arbitrary event types
* src/driver.h, src/driver.c: Move verify() call into source file
  instead of header, to avoid polluting the global namespace with
  the verify function name
* src/remote/remote_driver.c: Implement new APIs for event
  registration. Refactor processCallDispatchMessage() to cope
  with arbitrary incoming event types. Merge remoteDomainQueueEvent()
  into processCallDispatchMessage() to avoid duplication of code.
  Rename remoteDomainReadEvent() to remoteDomainReadEventLifecycle()
* src/remote/remote_protocol.x: Define wire format for the new
  virConnectDomainEventRegisterAny and virConnectDomainEventDeregisterAny
  functions
This commit is contained in:
Daniel P. Berrange 2010-03-18 14:56:56 +00:00
parent cef0967e02
commit 097e07a63a
13 changed files with 385 additions and 130 deletions

View File

@ -60,16 +60,6 @@ remoteSerializeStreamError(struct qemud_client *client,
int proc, int proc,
int serial); int serial);
/* Having this here is dubious. It should be in remote.h
* but qemud.c shouldn't depend on that header directly.
* Refactor this later to deal with this properly.
*/
int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
virDomainPtr dom,
int event,
int detail,
void *opaque);
int int
remoteSendStreamData(struct qemud_client *client, remoteSendStreamData(struct qemud_client *client,

View File

@ -1275,6 +1275,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
socklen_t addrlen = (socklen_t) (sizeof addr); socklen_t addrlen = (socklen_t) (sizeof addr);
struct qemud_client *client; struct qemud_client *client;
int no_slow_start = 1; int no_slow_start = 1;
int i;
if ((fd = accept(sock->fd, (struct sockaddr *)&addr, &addrlen)) < 0) { if ((fd = accept(sock->fd, (struct sockaddr *)&addr, &addrlen)) < 0) {
char ebuf[1024]; char ebuf[1024];
@ -1346,6 +1347,10 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
memcpy (&client->addr, &addr, sizeof addr); memcpy (&client->addr, &addr, sizeof addr);
client->addrlen = addrlen; client->addrlen = addrlen;
for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
client->domainEventCallbackID[i] = -1;
}
/* Prepare one for packet receive */ /* Prepare one for packet receive */
if (VIR_ALLOC(client->rx) < 0) if (VIR_ALLOC(client->rx) < 0)
goto cleanup; goto cleanup;
@ -1415,7 +1420,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
if (server->nclients > server->nactiveworkers && if (server->nclients > server->nactiveworkers &&
server->nactiveworkers < server->nworkers) { server->nactiveworkers < server->nworkers) {
int i;
for (i = 0 ; i < server->nworkers ; i++) { for (i = 0 ; i < server->nworkers ; i++) {
if (!server->workers[i].hasThread) { if (!server->workers[i].hasThread) {
if (qemudStartWorker(server, &server->workers[i]) < 0) if (qemudStartWorker(server, &server->workers[i]) < 0)
@ -1454,9 +1458,17 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
} }
/* Deregister event delivery callback */ /* Deregister event delivery callback */
if (client->conn && client->domain_events_registered) { if (client->conn) {
DEBUG0("Deregistering to relay remote events"); int i;
virConnectDomainEventDeregister(client->conn, remoteRelayDomainEvent);
for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
if (client->domainEventCallbackID[i] != -1) {
DEBUG("Deregistering to relay remote events %d", i);
virConnectDomainEventDeregisterAny(client->conn,
client->domainEventCallbackID[i]);
}
client->domainEventCallbackID[i] = -1;
}
} }
#if HAVE_SASL #if HAVE_SASL

View File

@ -177,7 +177,7 @@ struct qemud_client {
int watch; int watch;
unsigned int readonly :1; unsigned int readonly :1;
unsigned int closing :1; unsigned int closing :1;
unsigned int domain_events_registered :1; int domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LAST];
struct sockaddr_storage addr; struct sockaddr_storage addr;
socklen_t addrlen; socklen_t addrlen;

View File

@ -94,20 +94,24 @@ const dispatch_data const *remoteGetDispatchData(int proc)
/* Prototypes */ /* Prototypes */
static void static void
remoteDispatchDomainEventSend (struct qemud_client *client, remoteDispatchDomainEventSend (struct qemud_client *client,
remote_domain_event_msg *data); int procnr,
xdrproc_t proc,
void *data);
int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, static int remoteRelayDomainEventLifecycle(virConnectPtr conn ATTRIBUTE_UNUSED,
virDomainPtr dom, virDomainPtr dom,
int event, int event,
int detail, int detail,
void *opaque) void *opaque)
{ {
struct qemud_client *client = opaque; struct qemud_client *client = opaque;
REMOTE_DEBUG("Relaying domain event %d %d", event, detail);
if (client) {
remote_domain_event_msg data; remote_domain_event_msg data;
if (!client)
return -1;
REMOTE_DEBUG("Relaying domain lifecycle event %d %d", event, detail);
virMutexLock(&client->lock); virMutexLock(&client->lock);
/* build return data */ /* build return data */
@ -116,14 +120,22 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
data.event = event; data.event = event;
data.detail = detail; data.detail = detail;
remoteDispatchDomainEventSend (client, &data); remoteDispatchDomainEventSend (client,
REMOTE_PROC_DOMAIN_EVENT,
(xdrproc_t)xdr_remote_domain_event_msg, &data);
virMutexUnlock(&client->lock); virMutexUnlock(&client->lock);
}
return 0; return 0;
} }
static virConnectDomainEventGenericCallback domainEventCallbacks[VIR_DOMAIN_EVENT_ID_LAST] = {
VIR_DOMAIN_EVENT_CALLBACK(remoteRelayDomainEventLifecycle),
};
verify(ARRAY_CARDINALITY(domainEventCallbacks) == VIR_DOMAIN_EVENT_ID_LAST);
/*----- Functions. -----*/ /*----- Functions. -----*/
static int static int
@ -4850,18 +4862,24 @@ remoteDispatchDomainEventsRegister (struct qemud_server *server ATTRIBUTE_UNUSED
remote_domain_events_register_ret *ret ATTRIBUTE_UNUSED) remote_domain_events_register_ret *ret ATTRIBUTE_UNUSED)
{ {
CHECK_CONN(client); CHECK_CONN(client);
int callbackID;
if (virConnectDomainEventRegister(conn, if (client->domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LIFECYCLE] != -1) {
remoteRelayDomainEvent, remoteDispatchFormatError(rerr, _("domain event %d already registered"), VIR_DOMAIN_EVENT_ID_LIFECYCLE);
client, NULL) < 0) { return -1;
}
if ((callbackID = virConnectDomainEventRegisterAny(conn,
NULL,
VIR_DOMAIN_EVENT_ID_LIFECYCLE,
VIR_DOMAIN_EVENT_CALLBACK(remoteRelayDomainEventLifecycle),
client, NULL)) < 0) {
remoteDispatchConnError(rerr, conn); remoteDispatchConnError(rerr, conn);
return -1; return -1;
} }
if (ret) client->domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LIFECYCLE] = callbackID;
ret->cb_registered = 1;
client->domain_events_registered = 1;
return 0; return 0;
} }
@ -4876,21 +4894,26 @@ remoteDispatchDomainEventsDeregister (struct qemud_server *server ATTRIBUTE_UNUS
{ {
CHECK_CONN(client); CHECK_CONN(client);
if (virConnectDomainEventDeregister(conn, remoteRelayDomainEvent) < 0) { if (client->domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LIFECYCLE] == -1) {
remoteDispatchFormatError(rerr, _("domain event %d not registered"), VIR_DOMAIN_EVENT_ID_LIFECYCLE);
return -1;
}
if (virConnectDomainEventDeregisterAny(conn,
client->domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LIFECYCLE]) < 0) {
remoteDispatchConnError(rerr, conn); remoteDispatchConnError(rerr, conn);
return -1; return -1;
} }
if (ret) client->domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LIFECYCLE] = -1;
ret->cb_registered = 0;
client->domain_events_registered = 0;
return 0; return 0;
} }
static void static void
remoteDispatchDomainEventSend (struct qemud_client *client, remoteDispatchDomainEventSend (struct qemud_client *client,
remote_domain_event_msg *data) int procnr,
xdrproc_t proc,
void *data)
{ {
struct qemud_client_message *msg = NULL; struct qemud_client_message *msg = NULL;
XDR xdr; XDR xdr;
@ -4901,7 +4924,7 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
msg->hdr.prog = REMOTE_PROGRAM; msg->hdr.prog = REMOTE_PROGRAM;
msg->hdr.vers = REMOTE_PROTOCOL_VERSION; msg->hdr.vers = REMOTE_PROTOCOL_VERSION;
msg->hdr.proc = REMOTE_PROC_DOMAIN_EVENT; msg->hdr.proc = procnr;
msg->hdr.type = REMOTE_MESSAGE; msg->hdr.type = REMOTE_MESSAGE;
msg->hdr.serial = 1; msg->hdr.serial = 1;
msg->hdr.status = REMOTE_OK; msg->hdr.status = REMOTE_OK;
@ -4919,8 +4942,10 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
if (xdr_setpos (&xdr, msg->bufferOffset) == 0) if (xdr_setpos (&xdr, msg->bufferOffset) == 0)
goto xdr_error; goto xdr_error;
if (!xdr_remote_domain_event_msg(&xdr, data)) if (!(proc)(&xdr, data)) {
VIR_WARN("Failed to serialize domain event %d", procnr);
goto xdr_error; goto xdr_error;
}
/* Update length word to include payload*/ /* Update length word to include payload*/
len = msg->bufferOffset = xdr_getpos (&xdr); len = msg->bufferOffset = xdr_getpos (&xdr);
@ -5524,6 +5549,78 @@ remoteDispatchDomainMigrateSetMaxDowntime(struct qemud_server *server ATTRIBUTE_
} }
static int
remoteDispatchDomainEventsRegisterAny (struct qemud_server *server ATTRIBUTE_UNUSED,
struct qemud_client *client ATTRIBUTE_UNUSED,
virConnectPtr conn,
remote_message_header *hdr ATTRIBUTE_UNUSED,
remote_error *rerr ATTRIBUTE_UNUSED,
remote_domain_events_register_any_args *args,
void *ret ATTRIBUTE_UNUSED)
{
CHECK_CONN(client);
int callbackID;
if (args->eventID >= VIR_DOMAIN_EVENT_ID_LAST ||
args->eventID < 0) {
remoteDispatchFormatError(rerr, _("unsupported event ID %d"), args->eventID);
return -1;
}
if (client->domainEventCallbackID[args->eventID] != -1) {
remoteDispatchFormatError(rerr, _("domain event %d already registered"), args->eventID);
return -1;
}
if ((callbackID = virConnectDomainEventRegisterAny(conn,
NULL,
args->eventID,
domainEventCallbacks[args->eventID],
client, NULL)) < 0) {
remoteDispatchConnError(rerr, conn);
return -1;
}
client->domainEventCallbackID[args->eventID] = callbackID;
return 0;
}
static int
remoteDispatchDomainEventsDeregisterAny (struct qemud_server *server ATTRIBUTE_UNUSED,
struct qemud_client *client ATTRIBUTE_UNUSED,
virConnectPtr conn,
remote_message_header *hdr ATTRIBUTE_UNUSED,
remote_error *rerr ATTRIBUTE_UNUSED,
remote_domain_events_deregister_any_args *args,
void *ret ATTRIBUTE_UNUSED)
{
CHECK_CONN(client);
int callbackID = -1;
if (args->eventID >= VIR_DOMAIN_EVENT_ID_LAST ||
args->eventID < 0) {
remoteDispatchFormatError(rerr, _("unsupported event ID %d"), args->eventID);
return -1;
}
callbackID = client->domainEventCallbackID[args->eventID];
if (callbackID < 0) {
remoteDispatchFormatError(rerr, _("domain event %d not registered"), args->eventID);
return -1;
}
if (virConnectDomainEventDeregisterAny(conn, callbackID) < 0) {
remoteDispatchConnError(rerr, conn);
return -1;
}
client->domainEventCallbackID[args->eventID] = -1;
return 0;
}
/*----- Helpers. -----*/ /*----- Helpers. -----*/
/* get_nonnull_domain and get_nonnull_network turn an on-wire /* get_nonnull_domain and get_nonnull_network turn an on-wire

View File

@ -142,3 +142,5 @@
remote_domain_abort_job_args val_remote_domain_abort_job_args; remote_domain_abort_job_args val_remote_domain_abort_job_args;
remote_storage_vol_wipe_args val_remote_storage_vol_wipe_args; remote_storage_vol_wipe_args val_remote_storage_vol_wipe_args;
remote_domain_migrate_set_max_downtime_args val_remote_domain_migrate_set_max_downtime_args; remote_domain_migrate_set_max_downtime_args val_remote_domain_migrate_set_max_downtime_args;
remote_domain_events_register_any_args val_remote_domain_events_register_any_args;
remote_domain_events_deregister_any_args val_remote_domain_events_deregister_any_args;

View File

@ -178,6 +178,14 @@ static int remoteDispatchDomainEventsDeregister(
remote_error *err, remote_error *err,
void *args, void *args,
remote_domain_events_deregister_ret *ret); remote_domain_events_deregister_ret *ret);
static int remoteDispatchDomainEventsDeregisterAny(
struct qemud_server *server,
struct qemud_client *client,
virConnectPtr conn,
remote_message_header *hdr,
remote_error *err,
remote_domain_events_deregister_any_args *args,
void *ret);
static int remoteDispatchDomainEventsRegister( static int remoteDispatchDomainEventsRegister(
struct qemud_server *server, struct qemud_server *server,
struct qemud_client *client, struct qemud_client *client,
@ -186,6 +194,14 @@ static int remoteDispatchDomainEventsRegister(
remote_error *err, remote_error *err,
void *args, void *args,
remote_domain_events_register_ret *ret); remote_domain_events_register_ret *ret);
static int remoteDispatchDomainEventsRegisterAny(
struct qemud_server *server,
struct qemud_client *client,
virConnectPtr conn,
remote_message_header *hdr,
remote_error *err,
remote_domain_events_register_any_args *args,
void *ret);
static int remoteDispatchDomainGetAutostart( static int remoteDispatchDomainGetAutostart(
struct qemud_server *server, struct qemud_server *server,
struct qemud_client *client, struct qemud_client *client,

View File

@ -837,3 +837,13 @@
.args_filter = (xdrproc_t) xdr_remote_domain_migrate_set_max_downtime_args, .args_filter = (xdrproc_t) xdr_remote_domain_migrate_set_max_downtime_args,
.ret_filter = (xdrproc_t) xdr_void, .ret_filter = (xdrproc_t) xdr_void,
}, },
{ /* DomainEventsRegisterAny => 167 */
.fn = (dispatch_fn) remoteDispatchDomainEventsRegisterAny,
.args_filter = (xdrproc_t) xdr_remote_domain_events_register_any_args,
.ret_filter = (xdrproc_t) xdr_void,
},
{ /* DomainEventsDeregisterAny => 168 */
.fn = (dispatch_fn) remoteDispatchDomainEventsDeregisterAny,
.args_filter = (xdrproc_t) xdr_remote_domain_events_deregister_any_args,
.ret_filter = (xdrproc_t) xdr_void,
},

View File

@ -31,6 +31,10 @@
#define DEFAULT_DRIVER_DIR LIBDIR "/libvirt/drivers" #define DEFAULT_DRIVER_DIR LIBDIR "/libvirt/drivers"
/* Make sure ... INTERNAL_CALL can not be set by the caller */
verify((VIR_SECRET_GET_VALUE_INTERNAL_CALL &
VIR_SECRET_GET_VALUE_FLAGS_MASK) == 0);
#ifdef WITH_DRIVER_MODULES #ifdef WITH_DRIVER_MODULES
/* XXX re-implment this for other OS, or use libtools helper lib ? */ /* XXX re-implment this for other OS, or use libtools helper lib ? */

View File

@ -908,10 +908,6 @@ enum {
VIR_SECRET_GET_VALUE_INTERNAL_CALL = 1 << 16 VIR_SECRET_GET_VALUE_INTERNAL_CALL = 1 << 16
}; };
/* Make sure ... INTERNAL_CALL can not be set by the caller */
verify((VIR_SECRET_GET_VALUE_INTERNAL_CALL &
VIR_SECRET_GET_VALUE_FLAGS_MASK) == 0);
typedef virSecretPtr typedef virSecretPtr
(*virDrvSecretLookupByUUID) (virConnectPtr conn, (*virDrvSecretLookupByUUID) (virConnectPtr conn,
const unsigned char *uuid); const unsigned char *uuid);

View File

@ -260,7 +260,6 @@ static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, vi
static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
static void make_nonnull_secret (remote_nonnull_secret *secret_dst, virSecretPtr secret_src); static void make_nonnull_secret (remote_nonnull_secret *secret_dst, virSecretPtr secret_src);
void remoteDomainEventFired(int watch, int fd, int event, void *data); void remoteDomainEventFired(int watch, int fd, int event, void *data);
static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
void remoteDomainEventQueueFlush(int timer, void *opaque); void remoteDomainEventQueueFlush(int timer, void *opaque);
/*----------------------------------------------------------------------*/ /*----------------------------------------------------------------------*/
@ -6830,7 +6829,7 @@ static int remoteDomainEventRegister (virConnectPtr conn,
goto done; goto done;
} }
if ( priv->callbackList->count == 1 ) { if (virDomainEventCallbackListCountID(conn, priv->callbackList, VIR_DOMAIN_EVENT_ID_LIFECYCLE) == 1) {
/* Tell the server when we are the first callback deregistering */ /* Tell the server when we are the first callback deregistering */
if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER, if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER,
(xdrproc_t) xdr_void, (char *) NULL, (xdrproc_t) xdr_void, (char *) NULL,
@ -6865,15 +6864,15 @@ static int remoteDomainEventDeregister (virConnectPtr conn,
error (conn, VIR_ERR_RPC, _("removing cb from list")); error (conn, VIR_ERR_RPC, _("removing cb from list"));
goto done; goto done;
} }
}
if ( priv->callbackList->count == 0 ) { if (virDomainEventCallbackListCountID(conn, priv->callbackList, VIR_DOMAIN_EVENT_ID_LIFECYCLE) == 0) {
/* Tell the server when we are the last callback deregistering */ /* Tell the server when we are the last callback deregistering */
if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER, if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER,
(xdrproc_t) xdr_void, (char *) NULL, (xdrproc_t) xdr_void, (char *) NULL,
(xdrproc_t) xdr_void, (char *) NULL) == -1) (xdrproc_t) xdr_void, (char *) NULL) == -1)
goto done; goto done;
} }
}
rv = 0; rv = 0;
@ -6882,6 +6881,38 @@ done:
return rv; return rv;
} }
/**
* remoteDomainReadEventLifecycle
*
* Read the domain lifecycle event data off the wire
*/
static virDomainEventPtr
remoteDomainReadEventLifecycle(virConnectPtr conn, XDR *xdr)
{
remote_domain_event_msg msg;
virDomainPtr dom;
virDomainEventPtr event = NULL;
memset (&msg, 0, sizeof msg);
/* unmarshall parameters, and process it*/
if (! xdr_remote_domain_event_msg(xdr, &msg) ) {
error (conn, VIR_ERR_RPC,
_("unable to demarshall lifecycle event"));
return NULL;
}
dom = get_nonnull_domain(conn,msg.dom);
if (!dom)
return NULL;
event = virDomainEventNewFromDom(dom, msg.event, msg.detail);
xdr_free ((xdrproc_t) &xdr_remote_domain_event_msg, (char *) &msg);
virDomainFree(dom);
return event;
}
static virDrvOpenStatus ATTRIBUTE_NONNULL (1) static virDrvOpenStatus ATTRIBUTE_NONNULL (1)
remoteSecretOpen (virConnectPtr conn, remoteSecretOpen (virConnectPtr conn,
virConnectAuthPtr auth, virConnectAuthPtr auth,
@ -7763,6 +7794,101 @@ done:
} }
static int remoteDomainEventRegisterAny(virConnectPtr conn,
virDomainPtr dom,
int eventID,
virConnectDomainEventGenericCallback callback,
void *opaque,
virFreeCallback freecb)
{
int rv = -1;
struct private_data *priv = conn->privateData;
remote_domain_events_register_any_args args;
int callbackID;
remoteDriverLock(priv);
if (priv->eventFlushTimer < 0) {
error (conn, VIR_ERR_NO_SUPPORT, _("no event support"));
goto done;
}
if ((callbackID = virDomainEventCallbackListAddID(conn, priv->callbackList,
dom, eventID,
callback, opaque, freecb)) < 0) {
error (conn, VIR_ERR_RPC, _("adding cb to list"));
goto done;
}
/* If this is the first callback for this eventID, we need to enable
* events on the server */
if (virDomainEventCallbackListCountID(conn, priv->callbackList, eventID) == 1) {
args.eventID = eventID;
if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER_ANY,
(xdrproc_t) xdr_remote_domain_events_register_any_args, (char *) &args,
(xdrproc_t) xdr_void, (char *)NULL) == -1) {
virDomainEventCallbackListRemoveID(conn, priv->callbackList, callbackID);
goto done;
}
}
rv = callbackID;
done:
remoteDriverUnlock(priv);
return rv;
}
static int remoteDomainEventDeregisterAny(virConnectPtr conn,
int callbackID)
{
struct private_data *priv = conn->privateData;
int rv = -1;
remote_domain_events_deregister_any_args args;
int eventID;
remoteDriverLock(priv);
if ((eventID = virDomainEventCallbackListEventID(conn, priv->callbackList, callbackID)) < 0) {
errorf (conn, VIR_ERR_RPC, _("unable to find callback ID %d"), callbackID);
goto done;
}
if (priv->domainEventDispatching) {
if (virDomainEventCallbackListMarkDeleteID(conn, priv->callbackList,
callbackID) < 0) {
error (conn, VIR_ERR_RPC, _("marking cb for deletion"));
goto done;
}
} else {
if (virDomainEventCallbackListRemoveID(conn, priv->callbackList,
callbackID) < 0) {
error (conn, VIR_ERR_RPC, _("removing cb from list"));
goto done;
}
}
/* If that was the last callback for this eventID, we need to disable
* events on the server */
if (virDomainEventCallbackListCountID(conn, priv->callbackList, eventID) == 0) {
args.eventID = eventID;
if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER_ANY,
(xdrproc_t) xdr_remote_domain_events_deregister_any_args, (char *) &args,
(xdrproc_t) xdr_void, (char *) NULL) == -1)
goto done;
}
rv = 0;
done:
remoteDriverUnlock(priv);
return rv;
}
/*----------------------------------------------------------------------*/ /*----------------------------------------------------------------------*/
@ -8314,6 +8440,7 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
int in_open, int in_open,
remote_message_header *hdr, remote_message_header *hdr,
XDR *xdr) { XDR *xdr) {
virDomainEventPtr event = NULL;
/* An async message has come in while we were waiting for the /* An async message has come in while we were waiting for the
* response. Process it to pull it off the wire, and try again * response. Process it to pull it off the wire, and try again
*/ */
@ -8324,13 +8451,26 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
return -1; return -1;
} }
if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) { switch (hdr->proc) {
remoteDomainQueueEvent(conn, xdr); case REMOTE_PROC_DOMAIN_EVENT:
virEventUpdateTimeout(priv->eventFlushTimer, 0); event = remoteDomainReadEventLifecycle(conn, xdr);
} else { break;
return -1;
default:
DEBUG("Unexpected event proc %d", hdr->proc); DEBUG("Unexpected event proc %d", hdr->proc);
break;
} }
if (!event)
return -1;
if (virDomainEventQueuePush(priv->domainEvents,
event) < 0) {
DEBUG0("Error adding event to queue");
virDomainEventFree(event);
}
virEventUpdateTimeout(priv->eventFlushTimer, 0);
return 0; return 0;
} }
@ -8860,54 +9000,6 @@ call (virConnectPtr conn, struct private_data *priv,
} }
/**
* remoteDomainReadEvent
*
* Read the event data off the wire
*/
static virDomainEventPtr
remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
{
remote_domain_event_msg msg;
virDomainPtr dom;
virDomainEventPtr event = NULL;
memset (&msg, 0, sizeof msg);
/* unmarshall parameters, and process it*/
if (! xdr_remote_domain_event_msg(xdr, &msg) ) {
error (conn, VIR_ERR_RPC,
_("remoteDomainProcessEvent: unmarshalling msg"));
return NULL;
}
dom = get_nonnull_domain(conn,msg.dom);
if (!dom)
return NULL;
event = virDomainEventNewFromDom(dom, msg.event, msg.detail);
virDomainFree(dom);
return event;
}
static void
remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
{
struct private_data *priv = conn->privateData;
virDomainEventPtr event;
event = remoteDomainReadEvent(conn, xdr);
if (!event)
return;
if (virDomainEventQueuePush(priv->domainEvents,
event) < 0) {
DEBUG0("Error adding event to queue");
virDomainEventFree(event);
}
}
/** remoteDomainEventFired: /** remoteDomainEventFired:
* *
* The callback for monitoring the remote socket * The callback for monitoring the remote socket
@ -8989,14 +9081,6 @@ remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
/* Purge any deleted callbacks */ /* Purge any deleted callbacks */
virDomainEventCallbackListPurgeMarked(priv->callbackList); virDomainEventCallbackListPurgeMarked(priv->callbackList);
if ( priv->callbackList->count == 0 ) {
/* Tell the server when we are the last callback deregistering */
if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER,
(xdrproc_t) xdr_void, (char *) NULL,
(xdrproc_t) xdr_void, (char *) NULL) == -1)
VIR_WARN0("Failed to de-register events");
}
priv->domainEventDispatching = 0; priv->domainEventDispatching = 0;
remoteDriverUnlock(priv); remoteDriverUnlock(priv);
@ -9189,8 +9273,8 @@ static virDriver remote_driver = {
remoteDomainGetJobInfo, /* domainGetJobInfo */ remoteDomainGetJobInfo, /* domainGetJobInfo */
remoteDomainAbortJob, /* domainFinishJob */ remoteDomainAbortJob, /* domainFinishJob */
remoteDomainMigrateSetMaxDowntime, /* domainMigrateSetMaxDowntime */ remoteDomainMigrateSetMaxDowntime, /* domainMigrateSetMaxDowntime */
NULL, /* domainEventRegisterAny */ remoteDomainEventRegisterAny, /* domainEventRegisterAny */
NULL, /* domainEventDeregisterAny */ remoteDomainEventDeregisterAny, /* domainEventDeregisterAny */
}; };
static virNetworkDriver network_driver = { static virNetworkDriver network_driver = {

View File

@ -3032,6 +3032,24 @@ xdr_remote_domain_migrate_set_max_downtime_args (XDR *xdrs, remote_domain_migrat
return TRUE; return TRUE;
} }
bool_t
xdr_remote_domain_events_register_any_args (XDR *xdrs, remote_domain_events_register_any_args *objp)
{
if (!xdr_int (xdrs, &objp->eventID))
return FALSE;
return TRUE;
}
bool_t
xdr_remote_domain_events_deregister_any_args (XDR *xdrs, remote_domain_events_deregister_any_args *objp)
{
if (!xdr_int (xdrs, &objp->eventID))
return FALSE;
return TRUE;
}
bool_t bool_t
xdr_remote_procedure (XDR *xdrs, remote_procedure *objp) xdr_remote_procedure (XDR *xdrs, remote_procedure *objp)
{ {

View File

@ -1717,6 +1717,16 @@ struct remote_domain_migrate_set_max_downtime_args {
u_int flags; u_int flags;
}; };
typedef struct remote_domain_migrate_set_max_downtime_args remote_domain_migrate_set_max_downtime_args; typedef struct remote_domain_migrate_set_max_downtime_args remote_domain_migrate_set_max_downtime_args;
struct remote_domain_events_register_any_args {
int eventID;
};
typedef struct remote_domain_events_register_any_args remote_domain_events_register_any_args;
struct remote_domain_events_deregister_any_args {
int eventID;
};
typedef struct remote_domain_events_deregister_any_args remote_domain_events_deregister_any_args;
#define REMOTE_PROGRAM 0x20008086 #define REMOTE_PROGRAM 0x20008086
#define REMOTE_PROTOCOL_VERSION 1 #define REMOTE_PROTOCOL_VERSION 1
@ -1887,6 +1897,8 @@ enum remote_procedure {
REMOTE_PROC_DOMAIN_ABORT_JOB = 164, REMOTE_PROC_DOMAIN_ABORT_JOB = 164,
REMOTE_PROC_STORAGE_VOL_WIPE = 165, REMOTE_PROC_STORAGE_VOL_WIPE = 165,
REMOTE_PROC_DOMAIN_MIGRATE_SET_MAX_DOWNTIME = 166, REMOTE_PROC_DOMAIN_MIGRATE_SET_MAX_DOWNTIME = 166,
REMOTE_PROC_DOMAIN_EVENTS_REGISTER_ANY = 167,
REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER_ANY = 168,
}; };
typedef enum remote_procedure remote_procedure; typedef enum remote_procedure remote_procedure;
@ -2198,6 +2210,8 @@ extern bool_t xdr_remote_domain_get_job_info_args (XDR *, remote_domain_get_job
extern bool_t xdr_remote_domain_get_job_info_ret (XDR *, remote_domain_get_job_info_ret*); extern bool_t xdr_remote_domain_get_job_info_ret (XDR *, remote_domain_get_job_info_ret*);
extern bool_t xdr_remote_domain_abort_job_args (XDR *, remote_domain_abort_job_args*); extern bool_t xdr_remote_domain_abort_job_args (XDR *, remote_domain_abort_job_args*);
extern bool_t xdr_remote_domain_migrate_set_max_downtime_args (XDR *, remote_domain_migrate_set_max_downtime_args*); extern bool_t xdr_remote_domain_migrate_set_max_downtime_args (XDR *, remote_domain_migrate_set_max_downtime_args*);
extern bool_t xdr_remote_domain_events_register_any_args (XDR *, remote_domain_events_register_any_args*);
extern bool_t xdr_remote_domain_events_deregister_any_args (XDR *, remote_domain_events_deregister_any_args*);
extern bool_t xdr_remote_procedure (XDR *, remote_procedure*); extern bool_t xdr_remote_procedure (XDR *, remote_procedure*);
extern bool_t xdr_remote_message_type (XDR *, remote_message_type*); extern bool_t xdr_remote_message_type (XDR *, remote_message_type*);
extern bool_t xdr_remote_message_status (XDR *, remote_message_status*); extern bool_t xdr_remote_message_status (XDR *, remote_message_status*);
@ -2483,6 +2497,8 @@ extern bool_t xdr_remote_domain_get_job_info_args ();
extern bool_t xdr_remote_domain_get_job_info_ret (); extern bool_t xdr_remote_domain_get_job_info_ret ();
extern bool_t xdr_remote_domain_abort_job_args (); extern bool_t xdr_remote_domain_abort_job_args ();
extern bool_t xdr_remote_domain_migrate_set_max_downtime_args (); extern bool_t xdr_remote_domain_migrate_set_max_downtime_args ();
extern bool_t xdr_remote_domain_events_register_any_args ();
extern bool_t xdr_remote_domain_events_deregister_any_args ();
extern bool_t xdr_remote_procedure (); extern bool_t xdr_remote_procedure ();
extern bool_t xdr_remote_message_type (); extern bool_t xdr_remote_message_type ();
extern bool_t xdr_remote_message_status (); extern bool_t xdr_remote_message_status ();

View File

@ -1528,6 +1528,14 @@ struct remote_domain_migrate_set_max_downtime_args {
unsigned flags; unsigned flags;
}; };
struct remote_domain_events_register_any_args {
int eventID;
};
struct remote_domain_events_deregister_any_args {
int eventID;
};
/*----- Protocol. -----*/ /*----- Protocol. -----*/
@ -1717,7 +1725,9 @@ enum remote_procedure {
REMOTE_PROC_DOMAIN_GET_JOB_INFO = 163, REMOTE_PROC_DOMAIN_GET_JOB_INFO = 163,
REMOTE_PROC_DOMAIN_ABORT_JOB = 164, REMOTE_PROC_DOMAIN_ABORT_JOB = 164,
REMOTE_PROC_STORAGE_VOL_WIPE = 165, REMOTE_PROC_STORAGE_VOL_WIPE = 165,
REMOTE_PROC_DOMAIN_MIGRATE_SET_MAX_DOWNTIME = 166 REMOTE_PROC_DOMAIN_MIGRATE_SET_MAX_DOWNTIME = 166,
REMOTE_PROC_DOMAIN_EVENTS_REGISTER_ANY = 167,
REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER_ANY = 168
/* /*
* Notice how the entries are grouped in sets of 10 ? * Notice how the entries are grouped in sets of 10 ?