Allow non-blocking message sending on virNetClient

Add a new virNetClientSendNonBlock which returns 2 on
full send, 1 on partial send, 0 on no send, -1 on error

If a partial send occurs, then a subsequent call to any
of the virNetClientSend* APIs will finish any outstanding
I/O.

TODO: the virNetClientEvent event handler could be used
to speed up completion of partial sends if an event loop
is present.

* src/rpc/virnetsocket.h, src/rpc/virnetsocket.c: Add new
  virNetSocketHasPendingData() API to test for cached
  data pending send.
* src/rpc/virnetclient.c, src/rpc/virnetclient.h: Add new
  virNetClientSendNonBlock() API to send non-blocking API
This commit is contained in:
Daniel P. Berrange 2011-11-08 09:13:27 +00:00
parent b196220337
commit ff465ad203
4 changed files with 201 additions and 18 deletions

View File

@ -55,6 +55,9 @@ struct _virNetClientCall {
virNetMessagePtr msg; virNetMessagePtr msg;
bool expectReply; bool expectReply;
bool nonBlock;
bool haveThread;
bool sentSomeData;
virCond cond; virCond cond;
@ -86,7 +89,12 @@ struct _virNetClient {
int wakeupSendFD; int wakeupSendFD;
int wakeupReadFD; int wakeupReadFD;
/* List of threads currently waiting for dispatch */ /*
* List of calls currently waiting for dispatch
* The calls should all have threads waiting for
* them, except possibly the first call in the list
* which might be a partially sent non-blocking call.
*/
virNetClientCallPtr waitDispatch; virNetClientCallPtr waitDispatch;
/* True if a thread holds the buck */ /* True if a thread holds the buck */
bool haveTheBuck; bool haveTheBuck;
@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client)
virNetClientCallPtr thecall; virNetClientCallPtr thecall;
/* Ok, definitely got an RPC reply now find /* Ok, definitely got an RPC reply now find
out who's been waiting for it */ out which waiting call is associated with it */
thecall = client->waitDispatch; thecall = client->waitDispatch;
while (thecall && while (thecall &&
!(thecall->msg->header.prog == client->msg.header.prog && !(thecall->msg->header.prog == client->msg.header.prog &&
@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client,
ret = virNetSocketWrite(client->sock, ret = virNetSocketWrite(client->sock,
thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->buffer + thecall->msg->bufferOffset,
thecall->msg->bufferLength - thecall->msg->bufferOffset); thecall->msg->bufferLength - thecall->msg->bufferOffset);
if (ret > 0 || virNetSocketHasPendingData(client->sock))
thecall->sentSomeData = true;
if (ret <= 0) if (ret <= 0)
return ret; return ret;
@ -1015,17 +1025,71 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
return false; return false;
/* /*
* ...they won't actually wakeup until * ...if the call being removed from the list
* still has a thread, then wake that thread up,
* otherwise free the call. The latter should
* only happen for calls without replies.
*
* ...the threads won't actually wakeup until
* we release our mutex a short while * we release our mutex a short while
* later... * later...
*/ */
VIR_DEBUG("Waking up sleeping call %p", call); if (call->haveThread) {
virCondSignal(&call->cond); VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
} else {
if (call->expectReply)
VIR_WARN("Got a call expecting a reply but without a waiting thread");
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call->msg);
VIR_FREE(call);
}
return true; return true;
} }
static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
void *opaque)
{
virNetClientCallPtr thiscall = opaque;
if (call == thiscall)
return false;
if (!call->nonBlock)
return false;
if (call->sentSomeData) {
/*
* If some data has been sent we must keep it in the list,
* but still wakeup any thread
*/
if (call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
}
return false;
} else {
/*
* If no data has been sent, we can remove it from the list.
* Wakup any thread, otherwise free the caller ourselves
*/
if (call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
} else {
if (call->expectReply)
VIR_WARN("Got a call expecting a reply but without a waiting thread");
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call->msg);
VIR_FREE(call);
}
return true;
}
}
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
{ {
VIR_DEBUG("Giving up the buck %p", thiscall); VIR_DEBUG("Giving up the buck %p", thiscall);
@ -1033,19 +1097,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
/* See if someone else is still waiting /* See if someone else is still waiting
* and if so, then pass the buck ! */ * and if so, then pass the buck ! */
while (tmp) { while (tmp) {
if (tmp != thiscall) { if (tmp != thiscall && tmp->haveThread) {
VIR_DEBUG("Passing the buck to %p", tmp); VIR_DEBUG("Passing the buck to %p", tmp);
virCondSignal(&tmp->cond); virCondSignal(&tmp->cond);
break; break;
} }
tmp = tmp->next; tmp = tmp->next;
} }
VIR_DEBUG("No thread to pass the buck to");
}
static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
{
return call->nonBlock;
} }
/* /*
* Process all calls pending dispatch/receive until we * Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck * get a reply to our own call. Then quit and pass the buck
* to someone else. * to someone else.
*
* Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error
*/ */
static int virNetClientIOEventLoop(virNetClientPtr client, static int virNetClientIOEventLoop(virNetClientPtr client,
virNetClientCallPtr thiscall) virNetClientCallPtr thiscall)
@ -1068,6 +1142,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
if (virNetSocketHasCachedData(client->sock)) if (virNetSocketHasCachedData(client->sock))
timeout = 0; timeout = 0;
/* If there are any non-blocking calls in the queue,
* then we don't want to sleep in poll()
*/
if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock,
NULL))
timeout = 0;
fds[0].events = fds[0].revents = 0; fds[0].events = fds[0].revents = 0;
fds[1].events = fds[1].revents = 0; fds[1].events = fds[1].revents = 0;
@ -1116,8 +1198,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
/* If we have existing SASL decoded data, pretend /* If we have existing SASL decoded data, pretend
* the socket became readable so we consume it * the socket became readable so we consume it
*/ */
if (virNetSocketHasCachedData(client->sock)) if (virNetSocketHasCachedData(client->sock)) {
fds[0].revents |= POLLIN; fds[0].revents |= POLLIN;
}
if (fds[1].revents) { if (fds[1].revents) {
VIR_DEBUG("Woken up from poll by other thread"); VIR_DEBUG("Woken up from poll by other thread");
@ -1129,6 +1212,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
if (ret < 0) { if (ret < 0) {
/* XXX what's this dubious errno check doing ? */
if (errno == EWOULDBLOCK) if (errno == EWOULDBLOCK)
continue; continue;
virReportSystemError(errno, virReportSystemError(errno,
@ -1146,20 +1230,32 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
goto error; goto error;
} }
/* Iterate through waiting threads and if /* Iterate through waiting calls and if any are
* any are complete then tell 'em to wakeup * complete, remove them from the dispatch list..
*/ */
virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone, virNetClientIOEventLoopRemoveDone,
thiscall); thiscall);
/* Iterate through waiting calls and if any are
* non-blocking, remove them from the dispatch list...
*/
virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveNonBlocking,
thiscall);
/* Now see if *we* are done */ /* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetClientIOEventLoopPassTheBuck(client, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall);
return 0; return 2;
} }
/* We're not done, but we're non-blocking */
if (thiscall->nonBlock) {
virNetClientIOEventLoopPassTheBuck(client, thiscall);
return thiscall->sentSomeData ? 1 : 0;
}
if (fds[0].revents & (POLLHUP | POLLERR)) { if (fds[0].revents & (POLLHUP | POLLERR)) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s", virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
@ -1218,7 +1314,31 @@ static void virNetClientIOUpdateCallback(virNetClientPtr client,
* a strategy in power politics when the actions of one country/ * a strategy in power politics when the actions of one country/
* nation are blamed on another, providing an opportunity for war." * nation are blamed on another, providing an opportunity for war."
* *
* NB(5) Don't Panic! * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller
* must *NOT* free it, if this returns '1' (ie partial send).
*
* NB(6) The following input states are valid if *no* threads
* are currently executing this method
*
* - waitDispatch == NULL,
* - waitDispatch != NULL, waitDispatch.nonBlock == true
*
* The following input states are valid, if n threads are currently
* executing
*
* - waitDispatch != NULL
* - 0 or 1 waitDispatch.nonBlock == false, without any threads
* - 0 or more waitDispatch.nonBlock == false, with threads
*
* The following output states are valid when all threads are done
*
* - waitDispatch == NULL,
* - waitDispatch != NULL, waitDispatch.nonBlock == true
*
* NB(7) Don't Panic!
*
* Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error
*/ */
static int virNetClientIO(virNetClientPtr client, static int virNetClientIO(virNetClientPtr client,
virNetClientCallPtr thiscall) virNetClientCallPtr thiscall)
@ -1259,14 +1379,15 @@ static int virNetClientIO(virNetClientPtr client,
} }
VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall);
/* Two reasons we can be woken up /* Three reasons we can be woken up
* 1. Other thread has got our reply ready for us * 1. Other thread has got our reply ready for us
* 2. Other thread is all done, and it is our turn to * 2. Other thread is all done, and it is our turn to
* be the dispatcher to finish waiting for * be the dispatcher to finish waiting for
* our reply * our reply
* 3. I/O was expected to block
*/ */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
rv = 0; rv = 2;
/* /*
* We avoided catching the buck and our reply is ready ! * We avoided catching the buck and our reply is ready !
* We've already had 'thiscall' removed from the list * We've already had 'thiscall' removed from the list
@ -1275,6 +1396,15 @@ static int virNetClientIO(virNetClientPtr client,
goto cleanup; goto cleanup;
} }
/* If we're non-blocking, get outta here */
if (thiscall->nonBlock) {
if (thiscall->sentSomeData)
rv = 1; /* In progress */
else
rv = 0; /* none at all */
goto cleanup;
}
/* Grr, someone passed the buck onto us ... */ /* Grr, someone passed the buck onto us ... */
} }
@ -1348,9 +1478,14 @@ done:
} }
/*
* Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientSendInternal(virNetClientPtr client, static int virNetClientSendInternal(virNetClientPtr client,
virNetMessagePtr msg, virNetMessagePtr msg,
bool expectReply) bool expectReply,
bool nonBlock)
{ {
virNetClientCallPtr call; virNetClientCallPtr call;
int ret = -1; int ret = -1;
@ -1369,6 +1504,12 @@ static int virNetClientSendInternal(virNetClientPtr client,
return -1; return -1;
} }
if (expectReply && nonBlock) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Attempt to send an non-blocking message with a synchronous reply"));
return -1;
}
if (VIR_ALLOC(call) < 0) { if (VIR_ALLOC(call) < 0) {
virReportOOMError(); virReportOOMError();
return -1; return -1;
@ -1389,16 +1530,24 @@ static int virNetClientSendInternal(virNetClientPtr client,
call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
call->msg = msg; call->msg = msg;
call->expectReply = expectReply; call->expectReply = expectReply;
call->nonBlock = nonBlock;
call->haveThread = true;
ret = virNetClientIO(client, call); ret = virNetClientIO(client, call);
cleanup: cleanup:
ignore_value(virCondDestroy(&call->cond)); /* If partially sent, then the call is still on the dispatch queue */
VIR_FREE(call); if (ret == 1) {
call->haveThread = false;
} else {
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call);
}
virNetClientUnlock(client); virNetClientUnlock(client);
return ret; return ret;
} }
/* /*
* @msg: a message allocated on heap or stack * @msg: a message allocated on heap or stack
* *
@ -1412,7 +1561,7 @@ cleanup:
int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendWithReply(virNetClientPtr client,
virNetMessagePtr msg) virNetMessagePtr msg)
{ {
int ret = virNetClientSendInternal(client, msg, true); int ret = virNetClientSendInternal(client, msg, true, false);
if (ret < 0) if (ret < 0)
return -1; return -1;
return 0; return 0;
@ -1432,8 +1581,24 @@ int virNetClientSendWithReply(virNetClientPtr client,
int virNetClientSendNoReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client,
virNetMessagePtr msg) virNetMessagePtr msg)
{ {
int ret = virNetClientSendInternal(client, msg, false); int ret = virNetClientSendInternal(client, msg, false, false);
if (ret < 0) if (ret < 0)
return -1; return -1;
return 0; return 0;
} }
/*
* @msg: a message allocated on the heap.
*
* Send a message asynchronously, without any reply
*
* The caller is responsible for free'ing @msg, *except* if
* this method returns -1.
*
* Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
*/
int virNetClientSendNonBlock(virNetClientPtr client,
virNetMessagePtr msg)
{
return virNetClientSendInternal(client, msg, false, true);
}

View File

@ -73,6 +73,10 @@ int virNetClientSendWithReply(virNetClientPtr client,
int virNetClientSendNoReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client,
virNetMessagePtr msg); virNetMessagePtr msg);
int virNetClientSendNonBlock(virNetClientPtr client,
virNetMessagePtr msg);
# ifdef HAVE_SASL # ifdef HAVE_SASL
void virNetClientSetSASLSession(virNetClientPtr client, void virNetClientSetSASLSession(virNetClientPtr client,
virNetSASLSessionPtr sasl); virNetSASLSessionPtr sasl);

View File

@ -932,6 +932,19 @@ bool virNetSocketHasCachedData(virNetSocketPtr sock ATTRIBUTE_UNUSED)
} }
bool virNetSocketHasPendingData(virNetSocketPtr sock ATTRIBUTE_UNUSED)
{
bool hasPending = false;
virMutexLock(&sock->lock);
#if HAVE_SASL
if (sock->saslEncoded)
hasPending = true;
#endif
virMutexUnlock(&sock->lock);
return hasPending;
}
static ssize_t virNetSocketReadWire(virNetSocketPtr sock, char *buf, size_t len) static ssize_t virNetSocketReadWire(virNetSocketPtr sock, char *buf, size_t len)
{ {
char *errout = NULL; char *errout = NULL;

View File

@ -106,6 +106,7 @@ void virNetSocketSetSASLSession(virNetSocketPtr sock,
virNetSASLSessionPtr sess); virNetSASLSessionPtr sess);
# endif # endif
bool virNetSocketHasCachedData(virNetSocketPtr sock); bool virNetSocketHasCachedData(virNetSocketPtr sock);
bool virNetSocketHasPendingData(virNetSocketPtr sock);
void virNetSocketRef(virNetSocketPtr sock); void virNetSocketRef(virNetSocketPtr sock);
void virNetSocketFree(virNetSocketPtr sock); void virNetSocketFree(virNetSocketPtr sock);