diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index cf12bf5107..d8b9cf88d0 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -1,4 +1,3 @@ - /* * remote_internal.c: driver to provide access to libvirtd running * on a remote machine @@ -111,7 +110,8 @@ enum { struct remote_thread_call { int mode; - /* 4 byte length, followed by RPC message header+body */ + /* Buffer for outgoing data packet + * 4 byte length, followed by RPC message header+body */ char buffer[4 + REMOTE_MESSAGE_MAX]; unsigned int bufferLength; unsigned int bufferOffset; @@ -121,6 +121,7 @@ struct remote_thread_call { virCond cond; + int want_reply; xdrproc_t ret_filter; char *ret; @@ -129,6 +130,26 @@ struct remote_thread_call { struct remote_thread_call *next; }; +struct private_stream_data { + unsigned int has_error : 1; + remote_error err; + + unsigned int serial; + unsigned int proc_nr; + + /* XXX this is potentially unbounded if the client + * app has domain events registered, since packets + * may be read off wire, while app isn't ready to + * recv them. Figure out how to address this some + * time.... + */ + char *incoming; + unsigned int incomingOffset; + unsigned int incomingLength; + + struct private_stream_data *next; +}; + struct private_data { virMutex lock; @@ -155,7 +176,8 @@ struct private_data { unsigned int saslEncodedOffset; #endif - /* 4 byte length, followed by RPC message header+body */ + /* Buffer for incoming data packets + * 4 byte length, followed by RPC message header+body */ char buffer[4 + REMOTE_MESSAGE_MAX]; unsigned int bufferLength; unsigned int bufferOffset; @@ -176,6 +198,8 @@ struct private_data { /* List of threads currently waiting for dispatch */ struct remote_thread_call *waitDispatch; + + struct private_stream_data *streams; }; enum { @@ -194,6 +218,10 @@ static void remoteDriverUnlock(struct private_data *driver) virMutexUnlock(&driver->lock); } +static int remoteIO(virConnectPtr conn, + struct private_data *priv, + int flags, + struct remote_thread_call *thiscall); static int call (virConnectPtr conn, struct private_data *priv, int flags, int proc_nr, xdrproc_t args_filter, char *args, @@ -6669,6 +6697,361 @@ done: return rv; } + +#if 0 +static struct private_stream_data * +remoteStreamOpen(virStreamPtr st, + int output ATTRIBUTE_UNUSED, + unsigned int proc_nr, + unsigned int serial) +{ + struct private_data *priv = st->conn->privateData; + struct private_stream_data *stpriv; + + if (VIR_ALLOC(stpriv) < 0) + return NULL; + + /* Initialize call object used to receive replies */ + stpriv->proc_nr = proc_nr; + stpriv->serial = serial; + + stpriv->next = priv->streams; + priv->streams = stpriv; + + return stpriv; +} + + +static int +remoteStreamPacket(virStreamPtr st, + int status, + const char *data, + size_t nbytes) +{ + DEBUG("st=%p status=%d data=%p nbytes=%d", st, status, data, nbytes); + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + XDR xdr; + struct remote_thread_call *thiscall; + remote_message_header hdr; + + memset(&hdr, 0, sizeof hdr); + + if (VIR_ALLOC(thiscall) < 0) { + virReportOOMError(st->conn); + return -1; + } + + thiscall->mode = REMOTE_MODE_WAIT_TX; + thiscall->serial = privst->serial; + thiscall->proc_nr = privst->proc_nr; + if (status == REMOTE_OK || + status == REMOTE_ERROR) + thiscall->want_reply = 1; + + if (virCondInit(&thiscall->cond) < 0) { + VIR_FREE(thiscall); + error (st->conn, VIR_ERR_INTERNAL_ERROR, + _("cannot initialize mutex")); + return -1; + } + + /* Don't fill in any other fields in 'thiscall' since + * we're not expecting a reply for this */ + + hdr.prog = REMOTE_PROGRAM; + hdr.vers = REMOTE_PROTOCOL_VERSION; + hdr.proc = privst->proc_nr; + hdr.type = REMOTE_STREAM; + hdr.serial = privst->serial; + hdr.status = status; + + + /* Length must include the length word itself (always encoded in + * 4 bytes as per RFC 4506), so offset start length. We write this + * later. + */ + thiscall->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; + + /* Serialise header followed by args. */ + xdrmem_create (&xdr, thiscall->buffer + thiscall->bufferLength, + REMOTE_MESSAGE_MAX, XDR_ENCODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (st->conn, + VIR_ERR_RPC, _("xdr_remote_message_header failed")); + goto error; + } + + thiscall->bufferLength += xdr_getpos (&xdr); + xdr_destroy (&xdr); + + if (status == REMOTE_CONTINUE) { + if (((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength) < nbytes) { + errorf(st->conn, + VIR_ERR_RPC, _("data size %d too large for payload %d"), + nbytes, ((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength)); + goto error; + } + + memcpy(thiscall->buffer + thiscall->bufferLength, data, nbytes); + thiscall->bufferLength += nbytes; + } + + /* Go back to packet start and encode the length word. */ + xdrmem_create (&xdr, thiscall->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE); + if (!xdr_u_int (&xdr, &thiscall->bufferLength)) { + error(st->conn, VIR_ERR_RPC, + _("xdr_u_int (length word)")); + goto error; + } + xdr_destroy (&xdr); + + /* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */ + if (remoteIO(st->conn, priv, 0, thiscall) < 0) + return -1; + + return nbytes; + +error: + xdr_destroy (&xdr); + VIR_FREE(thiscall); + return -1; +} + +static int +remoteStreamHasError(virStreamPtr st) { + struct private_stream_data *privst = st->privateData; + if (!privst->has_error) { + return 0; + } + + VIR_WARN0("Raising async error"); + virRaiseErrorFull(st->conn, + __FILE__, __FUNCTION__, __LINE__, + privst->err.domain, + privst->err.code, + privst->err.level, + privst->err.str1 ? *privst->err.str1 : NULL, + privst->err.str2 ? *privst->err.str2 : NULL, + privst->err.str3 ? *privst->err.str3 : NULL, + privst->err.int1, + privst->err.int2, + "%s", privst->err.message ? *privst->err.message : NULL); + + return 1; +} + +static void +remoteStreamRelease(virStreamPtr st) +{ + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + + if (priv->streams == privst) + priv->streams = privst->next; + else { + struct private_stream_data *tmp = priv->streams; + while (tmp && tmp->next) { + if (tmp->next == privst) { + tmp->next = privst->next; + break; + } + } + } + + if (privst->has_error) + xdr_free((xdrproc_t)xdr_remote_error, (char *)&privst->err); + + VIR_FREE(privst); + + st->driver = NULL; + st->privateData = NULL; +} + + +static int +remoteStreamSend(virStreamPtr st, + const char *data, + size_t nbytes) +{ + DEBUG("st=%p data=%p nbytes=%d", st, data, nbytes); + struct private_data *priv = st->conn->privateData; + int rv = -1; + + remoteDriverLock(priv); + + if (remoteStreamHasError(st)) + goto cleanup; + + rv = remoteStreamPacket(st, + REMOTE_CONTINUE, + data, + nbytes); + +cleanup: + if (rv == -1) + remoteStreamRelease(st); + + remoteDriverUnlock(priv); + + return rv; +} + + +static int +remoteStreamRecv(virStreamPtr st, + char *data, + size_t nbytes) +{ + DEBUG("st=%p data=%p nbytes=%d", st, data, nbytes); + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + int rv = -1; + + remoteDriverLock(priv); + + if (remoteStreamHasError(st)) + goto cleanup; + + if (!privst->incomingOffset) { + struct remote_thread_call *thiscall; + + if (VIR_ALLOC(thiscall) < 0) { + virReportOOMError(st->conn); + goto cleanup; + } + + /* We're not really doing an RPC calls, so we're + * skipping straight to RX part */ + thiscall->mode = REMOTE_MODE_WAIT_RX; + thiscall->serial = privst->serial; + thiscall->proc_nr = privst->proc_nr; + thiscall->want_reply = 1; + + if (virCondInit(&thiscall->cond) < 0) { + VIR_FREE(thiscall); + error (st->conn, VIR_ERR_INTERNAL_ERROR, + _("cannot initialize mutex")); + goto cleanup; + } + + /* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */ + if (remoteIO(st->conn, priv, 0, thiscall) < 0) + goto cleanup; + } + + DEBUG("After IO %d", privst->incomingOffset); + if (privst->incomingOffset) { + int want = privst->incomingOffset; + if (want > nbytes) + want = nbytes; + memcpy(data, privst->incoming, want); + if (want < privst->incomingOffset) { + memmove(privst->incoming, privst->incoming + want, privst->incomingOffset - want); + privst->incomingOffset -= want; + } else { + VIR_FREE(privst->incoming); + privst->incomingOffset = privst->incomingLength = 0; + } + rv = want; + } else { + rv = 0; + } + + DEBUG("Done %d", rv); + +cleanup: + if (rv == -1) + remoteStreamRelease(st); + remoteDriverUnlock(priv); + + return rv; +} + +static int +remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED, + int events ATTRIBUTE_UNUSED, + virStreamEventCallback cb ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED, + virFreeCallback ff ATTRIBUTE_UNUSED) +{ + return -1; +} + +static int +remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED, + int events ATTRIBUTE_UNUSED) +{ + return -1; +} + + +static int +remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED) +{ + return -1; +} + +static int +remoteStreamFinish(virStreamPtr st) +{ + struct private_data *priv = st->conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + + if (remoteStreamHasError(st)) + goto cleanup; + + ret = remoteStreamPacket(st, + REMOTE_OK, + NULL, + 0); + +cleanup: + remoteStreamRelease(st); + + remoteDriverUnlock(priv); + return ret; +} + +static int +remoteStreamAbort(virStreamPtr st) +{ + struct private_data *priv = st->conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + + if (remoteStreamHasError(st)) + goto cleanup; + + ret = remoteStreamPacket(st, + REMOTE_ERROR, + NULL, + 0); + +cleanup: + remoteStreamRelease(st); + + remoteDriverUnlock(priv); + return ret; +} + + + +static virStreamDriver remoteStreamDrv = { + .streamRecv = remoteStreamRecv, + .streamSend = remoteStreamSend, + .streamFinish = remoteStreamFinish, + .streamAbort = remoteStreamAbort, + .streamAddCallback = remoteStreamEventAddCallback, + .streamUpdateCallback = remoteStreamEventUpdateCallback, + .streamRemoveCallback = remoteStreamEventRemoveCallback, +}; +#endif + + /*----------------------------------------------------------------------*/ @@ -6700,6 +7083,7 @@ prepareCall(virConnectPtr conn, rv->proc_nr = proc_nr; rv->ret_filter = ret_filter; rv->ret = ret; + rv->want_reply = 1; hdr.prog = REMOTE_PROGRAM; hdr.vers = REMOTE_PROTOCOL_VERSION; @@ -6885,7 +7269,10 @@ remoteIOWriteMessage(virConnectPtr conn, if (priv->saslEncodedOffset == priv->saslEncodedLength) { priv->saslEncoded = NULL; priv->saslEncodedOffset = priv->saslEncodedLength = 0; - thecall->mode = REMOTE_MODE_WAIT_RX; + if (thecall->want_reply) + thecall->mode = REMOTE_MODE_WAIT_RX; + else + thecall->mode = REMOTE_MODE_COMPLETE; } } else { #endif @@ -6899,7 +7286,10 @@ remoteIOWriteMessage(virConnectPtr conn, if (thecall->bufferOffset == thecall->bufferLength) { thecall->bufferOffset = thecall->bufferLength = 0; - thecall->mode = REMOTE_MODE_WAIT_RX; + if (thecall->want_reply) + thecall->mode = REMOTE_MODE_WAIT_RX; + else + thecall->mode = REMOTE_MODE_COMPLETE; } #if HAVE_SASL } @@ -7052,6 +7442,12 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv, remote_message_header *hdr, XDR *xdr); +static int +processCallDispatchStream(virConnectPtr conn, struct private_data *priv, + int in_open, + remote_message_header *hdr, + XDR *xdr); + static int processCallDispatch(virConnectPtr conn, struct private_data *priv, @@ -7061,14 +7457,19 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv, int len = priv->bufferLength - 4; int rv = -1; + /* Length word has already been read */ + priv->bufferOffset = 4; + /* Deserialise reply header. */ - xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE); + xdrmem_create (&xdr, priv->buffer + priv->bufferOffset, len, XDR_DECODE); if (!xdr_remote_message_header (&xdr, &hdr)) { error (in_open ? NULL : conn, VIR_ERR_RPC, _("invalid header in reply")); return -1; } + priv->bufferOffset += xdr_getpos(&xdr); + /* Check program, version, etc. are what we expect. */ if (hdr.prog != REMOTE_PROGRAM) { virRaiseError (in_open ? NULL : conn, @@ -7087,6 +7488,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv, return -1; } + switch (hdr.type) { case REMOTE_REPLY: /* Normal RPC replies */ rv = processCallDispatchReply(conn, priv, in_open, @@ -7098,6 +7500,11 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv, &hdr, &xdr); break; + case REMOTE_STREAM: /* Stream protocol */ + rv = processCallDispatchStream(conn, priv, in_open, + &hdr, &xdr); + break; + default: virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, @@ -7160,6 +7567,7 @@ processCallDispatchReply(virConnectPtr conn, struct private_data *priv, return 0; case REMOTE_ERROR: + VIR_WARN0("Method call error"); memset (&thecall->err, 0, sizeof thecall->err); if (!xdr_remote_error (xdr, &thecall->err)) { error (in_open ? NULL : conn, @@ -7203,6 +7611,113 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv, return 0; } +static int +processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED, + struct private_data *priv, + int in_open ATTRIBUTE_UNUSED, + remote_message_header *hdr, + XDR *xdr) { + struct private_stream_data *privst; + struct remote_thread_call *thecall; + + /* Try and find a matching stream */ + privst = priv->streams; + while (privst && + privst->serial != hdr->serial && + privst->proc_nr != hdr->proc) + privst = privst->next; + + if (!privst) { + VIR_WARN("No registered stream matching serial=%d, proc=%d", + hdr->serial, hdr->proc); + return -1; + } + + /* See if there's also a (optional) call waiting for this reply */ + thecall = priv->waitDispatch; + while (thecall && + thecall->serial != hdr->serial) + thecall = thecall->next; + + + /* Status is either REMOTE_OK (meaning that what follows is a ret + * structure), or REMOTE_ERROR (and what follows is a remote_error + * structure). + */ + switch (hdr->status) { + case REMOTE_CONTINUE: { + int avail = privst->incomingLength - privst->incomingOffset; + int need = priv->bufferLength - priv->bufferOffset; + VIR_WARN0("Got a stream data packet"); + + /* XXX flag stream as complete somwhere if need==0 */ + + if (need > avail) { + int extra = need - avail; + if (VIR_REALLOC_N(privst->incoming, + privst->incomingLength + extra) < 0) { + VIR_WARN0("Out of memory"); + return -1; + } + privst->incomingLength += extra; + } + + memcpy(privst->incoming + privst->incomingOffset, + priv->buffer + priv->bufferOffset, + priv->bufferLength - priv->bufferOffset); + privst->incomingOffset += (priv->bufferLength - priv->bufferOffset); + + if (thecall && thecall->want_reply) { + VIR_WARN("Got sync data packet offset=%d", privst->incomingOffset); + thecall->mode = REMOTE_MODE_COMPLETE; + } else { + VIR_WARN("Got aysnc data packet offset=%d", privst->incomingOffset); + } + return 0; + } + + case REMOTE_OK: + VIR_WARN0("Got a synchronous confirm"); + if (!thecall) { + VIR_WARN0("Got unexpected stream finish confirmation"); + return -1; + } + thecall->mode = REMOTE_MODE_COMPLETE; + return 0; + + case REMOTE_ERROR: + if (thecall && thecall->want_reply) { + VIR_WARN0("Got a synchronous error"); + /* Give the error straight to this call */ + memset (&thecall->err, 0, sizeof thecall->err); + if (!xdr_remote_error (xdr, &thecall->err)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("unmarshalling remote_error")); + return -1; + } + thecall->mode = REMOTE_MODE_ERROR; + } else { + VIR_WARN0("Got a asynchronous error"); + /* No call, so queue the error against the stream */ + if (privst->has_error) { + VIR_WARN0("Got unexpected duplicate stream error"); + return -1; + } + privst->has_error = 1; + memset (&privst->err, 0, sizeof privst->err); + if (!xdr_remote_error (xdr, &privst->err)) { + VIR_WARN0("Failed to unmarshall error"); + return -1; + } + } + return 0; + + default: + VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d", + hdr->serial, hdr->proc, hdr->status); + return -1; + } +} static int remoteIOHandleInput(virConnectPtr conn, struct private_data *priv, @@ -7283,6 +7798,9 @@ remoteIOEventLoop(virConnectPtr conn, tmp = tmp->next; } + if (priv->streams) + fds[0].events |= POLLIN; + /* Release lock while poll'ing so other threads * can stuff themselves on the queue */ remoteDriverUnlock(priv);