Handle data streams in remote client

* src/remote_internal.c: Add helper APIs for processing data streams
This commit is contained in:
Daniel P. Berrange 2009-08-07 15:13:46 +01:00
parent 4f17809a36
commit 401c404811

View File

@ -1,4 +1,3 @@
/* /*
* remote_internal.c: driver to provide access to libvirtd running * remote_internal.c: driver to provide access to libvirtd running
* on a remote machine * on a remote machine
@ -111,7 +110,8 @@ enum {
struct remote_thread_call { struct remote_thread_call {
int mode; int mode;
/* 4 byte length, followed by RPC message header+body */ /* Buffer for outgoing data packet
* 4 byte length, followed by RPC message header+body */
char buffer[4 + REMOTE_MESSAGE_MAX]; char buffer[4 + REMOTE_MESSAGE_MAX];
unsigned int bufferLength; unsigned int bufferLength;
unsigned int bufferOffset; unsigned int bufferOffset;
@ -121,6 +121,7 @@ struct remote_thread_call {
virCond cond; virCond cond;
int want_reply;
xdrproc_t ret_filter; xdrproc_t ret_filter;
char *ret; char *ret;
@ -129,6 +130,26 @@ struct remote_thread_call {
struct remote_thread_call *next; struct remote_thread_call *next;
}; };
struct private_stream_data {
unsigned int has_error : 1;
remote_error err;
unsigned int serial;
unsigned int proc_nr;
/* XXX this is potentially unbounded if the client
* app has domain events registered, since packets
* may be read off wire, while app isn't ready to
* recv them. Figure out how to address this some
* time....
*/
char *incoming;
unsigned int incomingOffset;
unsigned int incomingLength;
struct private_stream_data *next;
};
struct private_data { struct private_data {
virMutex lock; virMutex lock;
@ -155,7 +176,8 @@ struct private_data {
unsigned int saslEncodedOffset; unsigned int saslEncodedOffset;
#endif #endif
/* 4 byte length, followed by RPC message header+body */ /* Buffer for incoming data packets
* 4 byte length, followed by RPC message header+body */
char buffer[4 + REMOTE_MESSAGE_MAX]; char buffer[4 + REMOTE_MESSAGE_MAX];
unsigned int bufferLength; unsigned int bufferLength;
unsigned int bufferOffset; unsigned int bufferOffset;
@ -176,6 +198,8 @@ struct private_data {
/* List of threads currently waiting for dispatch */ /* List of threads currently waiting for dispatch */
struct remote_thread_call *waitDispatch; struct remote_thread_call *waitDispatch;
struct private_stream_data *streams;
}; };
enum { enum {
@ -194,6 +218,10 @@ static void remoteDriverUnlock(struct private_data *driver)
virMutexUnlock(&driver->lock); virMutexUnlock(&driver->lock);
} }
static int remoteIO(virConnectPtr conn,
struct private_data *priv,
int flags,
struct remote_thread_call *thiscall);
static int call (virConnectPtr conn, struct private_data *priv, static int call (virConnectPtr conn, struct private_data *priv,
int flags, int proc_nr, int flags, int proc_nr,
xdrproc_t args_filter, char *args, xdrproc_t args_filter, char *args,
@ -6669,6 +6697,361 @@ done:
return rv; return rv;
} }
#if 0
static struct private_stream_data *
remoteStreamOpen(virStreamPtr st,
int output ATTRIBUTE_UNUSED,
unsigned int proc_nr,
unsigned int serial)
{
struct private_data *priv = st->conn->privateData;
struct private_stream_data *stpriv;
if (VIR_ALLOC(stpriv) < 0)
return NULL;
/* Initialize call object used to receive replies */
stpriv->proc_nr = proc_nr;
stpriv->serial = serial;
stpriv->next = priv->streams;
priv->streams = stpriv;
return stpriv;
}
static int
remoteStreamPacket(virStreamPtr st,
int status,
const char *data,
size_t nbytes)
{
DEBUG("st=%p status=%d data=%p nbytes=%d", st, status, data, nbytes);
struct private_data *priv = st->conn->privateData;
struct private_stream_data *privst = st->privateData;
XDR xdr;
struct remote_thread_call *thiscall;
remote_message_header hdr;
memset(&hdr, 0, sizeof hdr);
if (VIR_ALLOC(thiscall) < 0) {
virReportOOMError(st->conn);
return -1;
}
thiscall->mode = REMOTE_MODE_WAIT_TX;
thiscall->serial = privst->serial;
thiscall->proc_nr = privst->proc_nr;
if (status == REMOTE_OK ||
status == REMOTE_ERROR)
thiscall->want_reply = 1;
if (virCondInit(&thiscall->cond) < 0) {
VIR_FREE(thiscall);
error (st->conn, VIR_ERR_INTERNAL_ERROR,
_("cannot initialize mutex"));
return -1;
}
/* Don't fill in any other fields in 'thiscall' since
* we're not expecting a reply for this */
hdr.prog = REMOTE_PROGRAM;
hdr.vers = REMOTE_PROTOCOL_VERSION;
hdr.proc = privst->proc_nr;
hdr.type = REMOTE_STREAM;
hdr.serial = privst->serial;
hdr.status = status;
/* Length must include the length word itself (always encoded in
* 4 bytes as per RFC 4506), so offset start length. We write this
* later.
*/
thiscall->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
/* Serialise header followed by args. */
xdrmem_create (&xdr, thiscall->buffer + thiscall->bufferLength,
REMOTE_MESSAGE_MAX, XDR_ENCODE);
if (!xdr_remote_message_header (&xdr, &hdr)) {
error (st->conn,
VIR_ERR_RPC, _("xdr_remote_message_header failed"));
goto error;
}
thiscall->bufferLength += xdr_getpos (&xdr);
xdr_destroy (&xdr);
if (status == REMOTE_CONTINUE) {
if (((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength) < nbytes) {
errorf(st->conn,
VIR_ERR_RPC, _("data size %d too large for payload %d"),
nbytes, ((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength));
goto error;
}
memcpy(thiscall->buffer + thiscall->bufferLength, data, nbytes);
thiscall->bufferLength += nbytes;
}
/* Go back to packet start and encode the length word. */
xdrmem_create (&xdr, thiscall->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE);
if (!xdr_u_int (&xdr, &thiscall->bufferLength)) {
error(st->conn, VIR_ERR_RPC,
_("xdr_u_int (length word)"));
goto error;
}
xdr_destroy (&xdr);
/* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */
if (remoteIO(st->conn, priv, 0, thiscall) < 0)
return -1;
return nbytes;
error:
xdr_destroy (&xdr);
VIR_FREE(thiscall);
return -1;
}
static int
remoteStreamHasError(virStreamPtr st) {
struct private_stream_data *privst = st->privateData;
if (!privst->has_error) {
return 0;
}
VIR_WARN0("Raising async error");
virRaiseErrorFull(st->conn,
__FILE__, __FUNCTION__, __LINE__,
privst->err.domain,
privst->err.code,
privst->err.level,
privst->err.str1 ? *privst->err.str1 : NULL,
privst->err.str2 ? *privst->err.str2 : NULL,
privst->err.str3 ? *privst->err.str3 : NULL,
privst->err.int1,
privst->err.int2,
"%s", privst->err.message ? *privst->err.message : NULL);
return 1;
}
static void
remoteStreamRelease(virStreamPtr st)
{
struct private_data *priv = st->conn->privateData;
struct private_stream_data *privst = st->privateData;
if (priv->streams == privst)
priv->streams = privst->next;
else {
struct private_stream_data *tmp = priv->streams;
while (tmp && tmp->next) {
if (tmp->next == privst) {
tmp->next = privst->next;
break;
}
}
}
if (privst->has_error)
xdr_free((xdrproc_t)xdr_remote_error, (char *)&privst->err);
VIR_FREE(privst);
st->driver = NULL;
st->privateData = NULL;
}
static int
remoteStreamSend(virStreamPtr st,
const char *data,
size_t nbytes)
{
DEBUG("st=%p data=%p nbytes=%d", st, data, nbytes);
struct private_data *priv = st->conn->privateData;
int rv = -1;
remoteDriverLock(priv);
if (remoteStreamHasError(st))
goto cleanup;
rv = remoteStreamPacket(st,
REMOTE_CONTINUE,
data,
nbytes);
cleanup:
if (rv == -1)
remoteStreamRelease(st);
remoteDriverUnlock(priv);
return rv;
}
static int
remoteStreamRecv(virStreamPtr st,
char *data,
size_t nbytes)
{
DEBUG("st=%p data=%p nbytes=%d", st, data, nbytes);
struct private_data *priv = st->conn->privateData;
struct private_stream_data *privst = st->privateData;
int rv = -1;
remoteDriverLock(priv);
if (remoteStreamHasError(st))
goto cleanup;
if (!privst->incomingOffset) {
struct remote_thread_call *thiscall;
if (VIR_ALLOC(thiscall) < 0) {
virReportOOMError(st->conn);
goto cleanup;
}
/* We're not really doing an RPC calls, so we're
* skipping straight to RX part */
thiscall->mode = REMOTE_MODE_WAIT_RX;
thiscall->serial = privst->serial;
thiscall->proc_nr = privst->proc_nr;
thiscall->want_reply = 1;
if (virCondInit(&thiscall->cond) < 0) {
VIR_FREE(thiscall);
error (st->conn, VIR_ERR_INTERNAL_ERROR,
_("cannot initialize mutex"));
goto cleanup;
}
/* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */
if (remoteIO(st->conn, priv, 0, thiscall) < 0)
goto cleanup;
}
DEBUG("After IO %d", privst->incomingOffset);
if (privst->incomingOffset) {
int want = privst->incomingOffset;
if (want > nbytes)
want = nbytes;
memcpy(data, privst->incoming, want);
if (want < privst->incomingOffset) {
memmove(privst->incoming, privst->incoming + want, privst->incomingOffset - want);
privst->incomingOffset -= want;
} else {
VIR_FREE(privst->incoming);
privst->incomingOffset = privst->incomingLength = 0;
}
rv = want;
} else {
rv = 0;
}
DEBUG("Done %d", rv);
cleanup:
if (rv == -1)
remoteStreamRelease(st);
remoteDriverUnlock(priv);
return rv;
}
static int
remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
int events ATTRIBUTE_UNUSED,
virStreamEventCallback cb ATTRIBUTE_UNUSED,
void *opaque ATTRIBUTE_UNUSED,
virFreeCallback ff ATTRIBUTE_UNUSED)
{
return -1;
}
static int
remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
int events ATTRIBUTE_UNUSED)
{
return -1;
}
static int
remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED)
{
return -1;
}
static int
remoteStreamFinish(virStreamPtr st)
{
struct private_data *priv = st->conn->privateData;
int ret = -1;
remoteDriverLock(priv);
if (remoteStreamHasError(st))
goto cleanup;
ret = remoteStreamPacket(st,
REMOTE_OK,
NULL,
0);
cleanup:
remoteStreamRelease(st);
remoteDriverUnlock(priv);
return ret;
}
static int
remoteStreamAbort(virStreamPtr st)
{
struct private_data *priv = st->conn->privateData;
int ret = -1;
remoteDriverLock(priv);
if (remoteStreamHasError(st))
goto cleanup;
ret = remoteStreamPacket(st,
REMOTE_ERROR,
NULL,
0);
cleanup:
remoteStreamRelease(st);
remoteDriverUnlock(priv);
return ret;
}
static virStreamDriver remoteStreamDrv = {
.streamRecv = remoteStreamRecv,
.streamSend = remoteStreamSend,
.streamFinish = remoteStreamFinish,
.streamAbort = remoteStreamAbort,
.streamAddCallback = remoteStreamEventAddCallback,
.streamUpdateCallback = remoteStreamEventUpdateCallback,
.streamRemoveCallback = remoteStreamEventRemoveCallback,
};
#endif
/*----------------------------------------------------------------------*/ /*----------------------------------------------------------------------*/
@ -6700,6 +7083,7 @@ prepareCall(virConnectPtr conn,
rv->proc_nr = proc_nr; rv->proc_nr = proc_nr;
rv->ret_filter = ret_filter; rv->ret_filter = ret_filter;
rv->ret = ret; rv->ret = ret;
rv->want_reply = 1;
hdr.prog = REMOTE_PROGRAM; hdr.prog = REMOTE_PROGRAM;
hdr.vers = REMOTE_PROTOCOL_VERSION; hdr.vers = REMOTE_PROTOCOL_VERSION;
@ -6885,7 +7269,10 @@ remoteIOWriteMessage(virConnectPtr conn,
if (priv->saslEncodedOffset == priv->saslEncodedLength) { if (priv->saslEncodedOffset == priv->saslEncodedLength) {
priv->saslEncoded = NULL; priv->saslEncoded = NULL;
priv->saslEncodedOffset = priv->saslEncodedLength = 0; priv->saslEncodedOffset = priv->saslEncodedLength = 0;
thecall->mode = REMOTE_MODE_WAIT_RX; if (thecall->want_reply)
thecall->mode = REMOTE_MODE_WAIT_RX;
else
thecall->mode = REMOTE_MODE_COMPLETE;
} }
} else { } else {
#endif #endif
@ -6899,7 +7286,10 @@ remoteIOWriteMessage(virConnectPtr conn,
if (thecall->bufferOffset == thecall->bufferLength) { if (thecall->bufferOffset == thecall->bufferLength) {
thecall->bufferOffset = thecall->bufferLength = 0; thecall->bufferOffset = thecall->bufferLength = 0;
thecall->mode = REMOTE_MODE_WAIT_RX; if (thecall->want_reply)
thecall->mode = REMOTE_MODE_WAIT_RX;
else
thecall->mode = REMOTE_MODE_COMPLETE;
} }
#if HAVE_SASL #if HAVE_SASL
} }
@ -7052,6 +7442,12 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
remote_message_header *hdr, remote_message_header *hdr,
XDR *xdr); XDR *xdr);
static int
processCallDispatchStream(virConnectPtr conn, struct private_data *priv,
int in_open,
remote_message_header *hdr,
XDR *xdr);
static int static int
processCallDispatch(virConnectPtr conn, struct private_data *priv, processCallDispatch(virConnectPtr conn, struct private_data *priv,
@ -7061,14 +7457,19 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
int len = priv->bufferLength - 4; int len = priv->bufferLength - 4;
int rv = -1; int rv = -1;
/* Length word has already been read */
priv->bufferOffset = 4;
/* Deserialise reply header. */ /* Deserialise reply header. */
xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE); xdrmem_create (&xdr, priv->buffer + priv->bufferOffset, len, XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &hdr)) { if (!xdr_remote_message_header (&xdr, &hdr)) {
error (in_open ? NULL : conn, error (in_open ? NULL : conn,
VIR_ERR_RPC, _("invalid header in reply")); VIR_ERR_RPC, _("invalid header in reply"));
return -1; return -1;
} }
priv->bufferOffset += xdr_getpos(&xdr);
/* Check program, version, etc. are what we expect. */ /* Check program, version, etc. are what we expect. */
if (hdr.prog != REMOTE_PROGRAM) { if (hdr.prog != REMOTE_PROGRAM) {
virRaiseError (in_open ? NULL : conn, virRaiseError (in_open ? NULL : conn,
@ -7087,6 +7488,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
return -1; return -1;
} }
switch (hdr.type) { switch (hdr.type) {
case REMOTE_REPLY: /* Normal RPC replies */ case REMOTE_REPLY: /* Normal RPC replies */
rv = processCallDispatchReply(conn, priv, in_open, rv = processCallDispatchReply(conn, priv, in_open,
@ -7098,6 +7500,11 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
&hdr, &xdr); &hdr, &xdr);
break; break;
case REMOTE_STREAM: /* Stream protocol */
rv = processCallDispatchStream(conn, priv, in_open,
&hdr, &xdr);
break;
default: default:
virRaiseError (in_open ? NULL : conn, virRaiseError (in_open ? NULL : conn,
NULL, NULL, VIR_FROM_REMOTE, NULL, NULL, VIR_FROM_REMOTE,
@ -7160,6 +7567,7 @@ processCallDispatchReply(virConnectPtr conn, struct private_data *priv,
return 0; return 0;
case REMOTE_ERROR: case REMOTE_ERROR:
VIR_WARN0("Method call error");
memset (&thecall->err, 0, sizeof thecall->err); memset (&thecall->err, 0, sizeof thecall->err);
if (!xdr_remote_error (xdr, &thecall->err)) { if (!xdr_remote_error (xdr, &thecall->err)) {
error (in_open ? NULL : conn, error (in_open ? NULL : conn,
@ -7203,6 +7611,113 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
return 0; return 0;
} }
static int
processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED,
struct private_data *priv,
int in_open ATTRIBUTE_UNUSED,
remote_message_header *hdr,
XDR *xdr) {
struct private_stream_data *privst;
struct remote_thread_call *thecall;
/* Try and find a matching stream */
privst = priv->streams;
while (privst &&
privst->serial != hdr->serial &&
privst->proc_nr != hdr->proc)
privst = privst->next;
if (!privst) {
VIR_WARN("No registered stream matching serial=%d, proc=%d",
hdr->serial, hdr->proc);
return -1;
}
/* See if there's also a (optional) call waiting for this reply */
thecall = priv->waitDispatch;
while (thecall &&
thecall->serial != hdr->serial)
thecall = thecall->next;
/* Status is either REMOTE_OK (meaning that what follows is a ret
* structure), or REMOTE_ERROR (and what follows is a remote_error
* structure).
*/
switch (hdr->status) {
case REMOTE_CONTINUE: {
int avail = privst->incomingLength - privst->incomingOffset;
int need = priv->bufferLength - priv->bufferOffset;
VIR_WARN0("Got a stream data packet");
/* XXX flag stream as complete somwhere if need==0 */
if (need > avail) {
int extra = need - avail;
if (VIR_REALLOC_N(privst->incoming,
privst->incomingLength + extra) < 0) {
VIR_WARN0("Out of memory");
return -1;
}
privst->incomingLength += extra;
}
memcpy(privst->incoming + privst->incomingOffset,
priv->buffer + priv->bufferOffset,
priv->bufferLength - priv->bufferOffset);
privst->incomingOffset += (priv->bufferLength - priv->bufferOffset);
if (thecall && thecall->want_reply) {
VIR_WARN("Got sync data packet offset=%d", privst->incomingOffset);
thecall->mode = REMOTE_MODE_COMPLETE;
} else {
VIR_WARN("Got aysnc data packet offset=%d", privst->incomingOffset);
}
return 0;
}
case REMOTE_OK:
VIR_WARN0("Got a synchronous confirm");
if (!thecall) {
VIR_WARN0("Got unexpected stream finish confirmation");
return -1;
}
thecall->mode = REMOTE_MODE_COMPLETE;
return 0;
case REMOTE_ERROR:
if (thecall && thecall->want_reply) {
VIR_WARN0("Got a synchronous error");
/* Give the error straight to this call */
memset (&thecall->err, 0, sizeof thecall->err);
if (!xdr_remote_error (xdr, &thecall->err)) {
error (in_open ? NULL : conn,
VIR_ERR_RPC, _("unmarshalling remote_error"));
return -1;
}
thecall->mode = REMOTE_MODE_ERROR;
} else {
VIR_WARN0("Got a asynchronous error");
/* No call, so queue the error against the stream */
if (privst->has_error) {
VIR_WARN0("Got unexpected duplicate stream error");
return -1;
}
privst->has_error = 1;
memset (&privst->err, 0, sizeof privst->err);
if (!xdr_remote_error (xdr, &privst->err)) {
VIR_WARN0("Failed to unmarshall error");
return -1;
}
}
return 0;
default:
VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d",
hdr->serial, hdr->proc, hdr->status);
return -1;
}
}
static int static int
remoteIOHandleInput(virConnectPtr conn, struct private_data *priv, remoteIOHandleInput(virConnectPtr conn, struct private_data *priv,
@ -7283,6 +7798,9 @@ remoteIOEventLoop(virConnectPtr conn,
tmp = tmp->next; tmp = tmp->next;
} }
if (priv->streams)
fds[0].events |= POLLIN;
/* Release lock while poll'ing so other threads /* Release lock while poll'ing so other threads
* can stuff themselves on the queue */ * can stuff themselves on the queue */
remoteDriverUnlock(priv); remoteDriverUnlock(priv);