Refactor incoming message handling to prepare for data stream support

* src/remote_internal.c: Rename processCallRecvMsg to
  processCallDispatch, and move code specific to method replies
  into processCallDispatchReply, and rename processCallAsyncEvent
  to processCallDispatchMessage
This commit is contained in:
Daniel P. Berrange 2009-07-10 16:03:22 +01:00
parent 27944fac9c
commit 7a61c13834

View File

@ -6569,29 +6569,6 @@ processCallRecvSome(virConnectPtr conn, struct private_data *priv,
} }
static void
processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
int in_open,
remote_message_header *hdr,
XDR *xdr) {
/* An async message has come in while we were waiting for the
* response. Process it to pull it off the wire, and try again
*/
DEBUG0("Encountered an event while waiting for a response");
if (in_open) {
DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
return;
}
if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
remoteDomainQueueEvent(conn, xdr);
virEventUpdateTimeout(priv->eventFlushTimer, 0);
} else {
DEBUG("Unexpected event proc %d", hdr->proc);
}
}
static int static int
processCallRecvLen(virConnectPtr conn, struct private_data *priv, processCallRecvLen(virConnectPtr conn, struct private_data *priv,
int in_open) { int in_open) {
@ -6630,12 +6607,25 @@ processCallRecvLen(virConnectPtr conn, struct private_data *priv,
static int static int
processCallRecvMsg(virConnectPtr conn, struct private_data *priv, processCallDispatchReply(virConnectPtr conn, struct private_data *priv,
int in_open) { int in_open,
remote_message_header *hdr,
XDR *xdr);
static int
processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
int in_open,
remote_message_header *hdr,
XDR *xdr);
static int
processCallDispatch(virConnectPtr conn, struct private_data *priv,
int in_open) {
XDR xdr; XDR xdr;
struct remote_message_header hdr; struct remote_message_header hdr;
int len = priv->bufferLength - 4; int len = priv->bufferLength - 4;
struct remote_thread_call *thecall; int rv = -1;
/* Deserialise reply header. */ /* Deserialise reply header. */
xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE); xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
@ -6663,30 +6653,44 @@ processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
return -1; return -1;
} }
/* Async events from server need special handling */ switch (hdr.type) {
if (hdr.type == REMOTE_MESSAGE) { case REMOTE_REPLY: /* Normal RPC replies */
processCallAsyncEvent(conn, priv, in_open, rv = processCallDispatchReply(conn, priv, in_open,
&hdr, &xdr); &hdr, &xdr);
xdr_destroy(&xdr); break;
return 0;
case REMOTE_MESSAGE: /* Async notifications */
rv = processCallDispatchMessage(conn, priv, in_open,
&hdr, &xdr);
break;
default:
virRaiseError (in_open ? NULL : conn,
NULL, NULL, VIR_FROM_REMOTE,
VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
_("got unexpected RPC call %d from server"),
hdr.proc);
rv = -1;
break;
} }
if (hdr.type != REMOTE_REPLY) { xdr_destroy(&xdr);
virRaiseError (in_open ? NULL : conn, return rv;
NULL, NULL, VIR_FROM_REMOTE, }
VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
_("got unexpected RPC call %d from server"),
hdr.proc); static int
xdr_destroy(&xdr); processCallDispatchReply(virConnectPtr conn, struct private_data *priv,
return -1; int in_open,
} remote_message_header *hdr,
XDR *xdr) {
struct remote_thread_call *thecall;
/* Ok, definitely got an RPC reply now find /* Ok, definitely got an RPC reply now find
out who's been waiting for it */ out who's been waiting for it */
thecall = priv->waitDispatch; thecall = priv->waitDispatch;
while (thecall && while (thecall &&
thecall->serial != hdr.serial) thecall->serial != hdr->serial)
thecall = thecall->next; thecall = thecall->next;
if (!thecall) { if (!thecall) {
@ -6694,18 +6698,16 @@ processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
NULL, NULL, VIR_FROM_REMOTE, NULL, NULL, VIR_FROM_REMOTE,
VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
_("no call waiting for reply with serial %d"), _("no call waiting for reply with serial %d"),
hdr.serial); hdr->serial);
xdr_destroy(&xdr);
return -1; return -1;
} }
if (hdr.proc != thecall->proc_nr) { if (hdr->proc != thecall->proc_nr) {
virRaiseError (in_open ? NULL : conn, virRaiseError (in_open ? NULL : conn,
NULL, NULL, VIR_FROM_REMOTE, NULL, NULL, VIR_FROM_REMOTE,
VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
_("unknown procedure (received %x, expected %x)"), _("unknown procedure (received %x, expected %x)"),
hdr.proc, thecall->proc_nr); hdr->proc, thecall->proc_nr);
xdr_destroy (&xdr);
return -1; return -1;
} }
@ -6713,25 +6715,23 @@ processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
* structure), or REMOTE_ERROR (and what follows is a remote_error * structure), or REMOTE_ERROR (and what follows is a remote_error
* structure). * structure).
*/ */
switch (hdr.status) { switch (hdr->status) {
case REMOTE_OK: case REMOTE_OK:
if (!(*thecall->ret_filter) (&xdr, thecall->ret)) { if (!(*thecall->ret_filter) (xdr, thecall->ret)) {
error (in_open ? NULL : conn, VIR_ERR_RPC, error (in_open ? NULL : conn, VIR_ERR_RPC,
_("unmarshalling ret")); _("unmarshalling ret"));
return -1; return -1;
} }
thecall->mode = REMOTE_MODE_COMPLETE; thecall->mode = REMOTE_MODE_COMPLETE;
xdr_destroy (&xdr);
return 0; return 0;
case REMOTE_ERROR: case REMOTE_ERROR:
memset (&thecall->err, 0, sizeof thecall->err); memset (&thecall->err, 0, sizeof thecall->err);
if (!xdr_remote_error (&xdr, &thecall->err)) { if (!xdr_remote_error (xdr, &thecall->err)) {
error (in_open ? NULL : conn, error (in_open ? NULL : conn,
VIR_ERR_RPC, _("unmarshalling remote_error")); VIR_ERR_RPC, _("unmarshalling remote_error"));
return -1; return -1;
} }
xdr_destroy (&xdr);
thecall->mode = REMOTE_MODE_ERROR; thecall->mode = REMOTE_MODE_ERROR;
return 0; return 0;
@ -6739,12 +6739,36 @@ processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
_("unknown status (received %x)"), _("unknown status (received %x)"),
hdr.status); hdr->status);
xdr_destroy (&xdr);
return -1; return -1;
} }
} }
static int
processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
int in_open,
remote_message_header *hdr,
XDR *xdr) {
/* An async message has come in while we were waiting for the
* response. Process it to pull it off the wire, and try again
*/
DEBUG0("Encountered an event while waiting for a response");
if (in_open) {
DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
return -1;
}
if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
remoteDomainQueueEvent(conn, xdr);
virEventUpdateTimeout(priv->eventFlushTimer, 0);
} else {
return -1;
DEBUG("Unexpected event proc %d", hdr->proc);
}
return 0;
}
static int static int
processCallRecv(virConnectPtr conn, struct private_data *priv, processCallRecv(virConnectPtr conn, struct private_data *priv,
@ -6775,7 +6799,7 @@ processCallRecv(virConnectPtr conn, struct private_data *priv,
* next iteration. * next iteration.
*/ */
} else { } else {
ret = processCallRecvMsg(conn, priv, in_open); ret = processCallDispatch(conn, priv, in_open);
priv->bufferOffset = priv->bufferLength = 0; priv->bufferOffset = priv->bufferLength = 0;
/* /*
* We've completed one call, so return even * We've completed one call, so return even