Handle outgoing data streams in libvirtd

* daemon/dispatch.c: Set streamTX flag on outgoing data packets
* daemon/qemud.h: Add streamTX flag to track outgoing data
* daemon/qemud.c: Re-enable further TX when outgoing data packet
  has been fully sent.
* daemon/stream.h, daemon/stream.c: Add method for enabling TX.
  Support reading from streams and transmitting data out to client
This commit is contained in:
Daniel P. Berrange 2009-08-24 20:57:16 +01:00
parent d790a66d6b
commit 4f17809a36
5 changed files with 106 additions and 1 deletions

View File

@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client,
DEBUG("Total %d", msg->bufferOffset);
}
if (data)
msg->streamTX = 1;
/* Reset ready for I/O */
msg->bufferLength = msg->bufferOffset;

View File

@ -1898,7 +1898,9 @@ void
qemudClientMessageRelease(struct qemud_client *client,
struct qemud_client_message *msg)
{
if (!msg->async)
if (msg->streamTX) {
remoteStreamMessageFinished(client, msg);
} else if (!msg->async)
client->nrequests--;
/* See if the recv queue is currently throttled */

View File

@ -130,6 +130,7 @@ struct qemud_client_message {
unsigned int bufferOffset;
unsigned int async : 1;
unsigned int streamTX : 1;
remote_message_header hdr;

View File

@ -32,6 +32,9 @@ static int
remoteStreamHandleWrite(struct qemud_client *client,
struct qemud_client_stream *stream);
static int
remoteStreamHandleRead(struct qemud_client *client,
struct qemud_client_stream *stream);
static int
remoteStreamHandleFinish(struct qemud_client *client,
struct qemud_client_stream *stream,
struct qemud_client_message *msg);
@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream)
int newEvents = 0;
if (stream->rx)
newEvents |= VIR_STREAM_EVENT_WRITABLE;
if (stream->tx && !stream->recvEOF)
newEvents |= VIR_STREAM_EVENT_READABLE;
virStreamEventUpdateCallback(stream->st, newEvents);
}
@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque)
}
}
if (!stream->recvEOF &&
(events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) {
events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP);
if (remoteStreamHandleRead(client, stream) < 0) {
remoteRemoveClientStream(client, stream);
qemudDispatchClientFailure(client);
goto cleanup;
}
}
if (!stream->closed &&
(events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
int ret;
@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client,
return 0;
}
/*
* Invoked when a stream is signalled as having data
* available to read. This reads upto one message
* worth of data, and then queues that for transmission
* to the client.
*
* Returns 0 if data was queued for TX, or a error RPC
* was sent, or -1 on fatal error, indicating client should
* be killed
*/
static int
remoteStreamHandleRead(struct qemud_client *client,
struct qemud_client_stream *stream)
{
char *buffer;
size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX;
int ret;
DEBUG("stream=%p", stream);
/* Shouldn't ever be called unless we're marked able to
* transmit, but doesn't hurt to check */
if (!stream->tx)
return 0;
if (VIR_ALLOC_N(buffer, bufferLen) < 0)
return -1;
ret = virStreamRecv(stream->st, buffer, bufferLen);
if (ret == -2) {
/* Should never get this, since we're only called when we know
* we're readable, but hey things change... */
ret = 0;
} else if (ret < 0) {
remote_error rerr;
memset(&rerr, 0, sizeof rerr);
remoteDispatchConnError(&rerr, NULL);
ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
} else {
stream->tx = 0;
if (ret == 0)
stream->recvEOF = 1;
ret = remoteSendStreamData(client, stream, buffer, ret);
}
VIR_FREE(buffer);
return ret;
}
/*
* Invoked when an outgoing data packet message has been fully sent.
* This simply re-enables TX of further data.
*
* The idea is to stop the daemon growing without bound due to
* fast stream, but slow client
*/
void
remoteStreamMessageFinished(struct qemud_client *client,
struct qemud_client_message *msg)
{
struct qemud_client_stream *stream = client->streams;
while (stream) {
if (msg->hdr.proc == stream->procedure &&
msg->hdr.serial == stream->serial)
break;
stream = stream->next;
}
DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial);
if (stream) {
stream->tx = 1;
remoteStreamUpdateEvents(stream);
}
}

View File

@ -46,4 +46,8 @@ int
remoteRemoveClientStream(struct qemud_client *client,
struct qemud_client_stream *stream);
void
remoteStreamMessageFinished(struct qemud_client *client,
struct qemud_client_message *msg);
#endif /* __LIBVIRTD_STREAM_H__ */