client rpc: Don't drop non-blocking calls

So far, we were dropping non-blocking calls whenever sending them would
block. In case a client is sending lots of stream calls (which are not
supposed to generate any reply), the assumption that having other calls
in a queue is sufficient to get a reply from the server doesn't work. I
tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but
failed and reverted that commit.

With this patch, non-blocking calls are never dropped (unless the
connection is being closed) and will always be sent.
(cherry picked from commit 78602c4e83bbf51f89c24a0be14074e71d01bf60)
This commit is contained in:
Jiri Denemark 2012-06-08 14:21:00 +02:00 committed by Cole Robinson
parent 8cb0d0893f
commit 5badf8c44b

View File

@ -58,7 +58,6 @@ struct _virNetClientCall {
bool expectReply; bool expectReply;
bool nonBlock; bool nonBlock;
bool haveThread; bool haveThread;
bool sentSomeData;
virCond cond; virCond cond;
@ -108,6 +107,10 @@ struct _virNetClient {
}; };
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall);
static void virNetClientLock(virNetClientPtr client) static void virNetClientLock(virNetClientPtr client)
{ {
virMutexLock(&client->lock); virMutexLock(&client->lock);
@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client)
virNetClientLock(client); virNetClientLock(client);
/* If there is a thread polling for data on the socket, set wantClose flag client->wantClose = true;
* and wake the thread up or just immediately close the socket when no-one
* is polling on it. /* If there is a thread polling for data on the socket, wake the thread up
* otherwise try to pass the buck to a possibly waiting thread. If no
* thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
* queue and close the client because we set client->wantClose.
*/ */
if (client->waitDispatch) { if (client->haveTheBuck) {
char ignore = 1; char ignore = 1;
size_t len = sizeof(ignore); size_t len = sizeof(ignore);
client->wantClose = true;
if (safewrite(client->wakeupSendFD, &ignore, len) != len) if (safewrite(client->wakeupSendFD, &ignore, len) != len)
VIR_ERROR(_("failed to wake up polling thread")); VIR_ERROR(_("failed to wake up polling thread"));
} else { } else {
virNetClientCloseLocked(client); virNetClientIOEventLoopPassTheBuck(client, NULL);
} }
virNetClientUnlock(client); virNetClientUnlock(client);
@ -967,8 +972,6 @@ virNetClientIOWriteMessage(virNetClientPtr client,
ret = virNetSocketWrite(client->sock, ret = virNetSocketWrite(client->sock,
thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->buffer + thecall->msg->bufferOffset,
thecall->msg->bufferLength - thecall->msg->bufferOffset); thecall->msg->bufferLength - thecall->msg->bufferOffset);
if (ret > 0 || virNetSocketHasPendingData(client->sock))
thecall->sentSomeData = true;
if (ret <= 0) if (ret <= 0)
return ret; return ret;
@ -1185,7 +1188,25 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
} }
static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, static bool
virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
void *opaque)
{
virNetClientCallPtr thiscall = opaque;
if (call != thiscall && call->nonBlock && call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
call->haveThread = false;
virCondSignal(&call->cond);
return true;
}
return false;
}
static bool
virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call,
void *opaque) void *opaque)
{ {
virNetClientCallPtr thiscall = opaque; virNetClientCallPtr thiscall = opaque;
@ -1193,63 +1214,17 @@ static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
if (call == thiscall) if (call == thiscall)
return false; return false;
if (!call->nonBlock)
return false;
if (call->sentSomeData) {
/*
* If some data has been sent we must keep it in the list,
* but still wakeup any thread
*/
if (call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
} else {
VIR_DEBUG("Keeping unfinished call %p in the list", call);
}
return false;
} else {
/*
* If no data has been sent, we can remove it from the list.
* Wakup any thread, otherwise free the caller ourselves
*/
if (call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
} else {
VIR_DEBUG("Removing call %p", call); VIR_DEBUG("Removing call %p", call);
if (call->expectReply)
VIR_WARN("Got a call expecting a reply but without a waiting thread");
ignore_value(virCondDestroy(&call->cond)); ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call->msg); VIR_FREE(call->msg);
VIR_FREE(call); VIR_FREE(call);
}
return true; return true;
} }
}
static void static void
virNetClientIOEventLoopRemoveAll(virNetClientPtr client, virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall) virNetClientCallPtr thiscall)
{
if (!client->waitDispatch)
return;
if (client->waitDispatch == thiscall) {
/* just pretend nothing was sent and the caller will free the call */
thiscall->sentSomeData = false;
} else {
virNetClientCallPtr call = client->waitDispatch;
virNetClientCallRemove(&client->waitDispatch, call);
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call->msg);
VIR_FREE(call);
}
}
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
{ {
VIR_DEBUG("Giving up the buck %p", thiscall); VIR_DEBUG("Giving up the buck %p", thiscall);
virNetClientCallPtr tmp = client->waitDispatch; virNetClientCallPtr tmp = client->waitDispatch;
@ -1268,14 +1243,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
VIR_DEBUG("No thread to pass the buck to"); VIR_DEBUG("No thread to pass the buck to");
if (client->wantClose) { if (client->wantClose) {
virNetClientCloseLocked(client); virNetClientCloseLocked(client);
virNetClientIOEventLoopRemoveAll(client, thiscall); virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveAll,
thiscall);
} }
} }
static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) static bool
virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
void *opaque ATTRIBUTE_UNUSED)
{ {
return call->nonBlock; return call->nonBlock && call->haveThread;
} }
/* /*
@ -1308,8 +1287,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
if (virNetSocketHasCachedData(client->sock) || client->wantClose) if (virNetSocketHasCachedData(client->sock) || client->wantClose)
timeout = 0; timeout = 0;
/* If there are any non-blocking calls in the queue, /* If there are any non-blocking calls with an associated thread
* then we don't want to sleep in poll() * in the queue, then we don't want to sleep in poll()
*/ */
if (virNetClientCallMatchPredicate(client->waitDispatch, if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock, virNetClientIOEventLoopWantNonBlock,
@ -1382,12 +1361,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
/* If we were woken up because a new non-blocking call was queued, /* If we were woken up because a new non-blocking call was queued,
* we need to re-poll to check if we can send it. * we need to re-poll to check if we can send it. To be precise, we
* will re-poll even if a blocking call arrived when unhandled
* non-blocking calls are still in the queue. But this can't hurt.
*/ */
if (virNetClientCallMatchPredicate(client->waitDispatch, if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock, virNetClientIOEventLoopWantNonBlock,
NULL)) { NULL)) {
VIR_DEBUG("New non-blocking call arrived; repolling"); VIR_DEBUG("The queue contains new non-blocking call(s);"
" repolling");
continue; continue;
} }
} }
@ -1412,17 +1394,17 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
/* Iterate through waiting calls and if any are /* Iterate through waiting calls and if any are
* complete, remove them from the dispatch list.. * complete, remove them from the dispatch list.
*/ */
virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone, virNetClientIOEventLoopRemoveDone,
thiscall); thiscall);
/* Iterate through waiting calls and if any are /* Iterate through waiting calls and wake up and detach threads
* non-blocking, remove them from the dispatch list... * attached to non-blocking calls.
*/ */
virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopRemoveNonBlocking, virNetClientIOEventLoopDetachNonBlocking,
thiscall); thiscall);
/* Now see if *we* are done */ /* Now see if *we* are done */
@ -1432,15 +1414,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
return 2; return 2;
} }
/* We're not done, but we're non-blocking */ /* We're not done, but we're non-blocking; keep the call queued */
if (thiscall->nonBlock) { if (thiscall->nonBlock) {
thiscall->haveThread = false;
virNetClientIOEventLoopPassTheBuck(client, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall);
if (thiscall->sentSomeData) {
return 1; return 1;
} else {
virNetClientCallRemove(&client->waitDispatch, thiscall);
return 0;
}
} }
if (fds[0].revents & (POLLHUP | POLLERR)) { if (fds[0].revents & (POLLHUP | POLLERR)) {
@ -1450,7 +1428,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
} }
error: error:
virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetClientIOEventLoopPassTheBuck(client, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall);
@ -1602,9 +1579,11 @@ static int virNetClientIO(virNetClientPtr client,
goto cleanup; goto cleanup;
} }
/* If we're non-blocking, get outta here */ /* If we're non-blocking, we were either queued (and detached) or the
* call was not sent because of an error.
*/
if (thiscall->nonBlock) { if (thiscall->nonBlock) {
if (thiscall->sentSomeData) if (!thiscall->haveThread)
rv = 1; /* In progress */ rv = 1; /* In progress */
else else
rv = 0; /* none at all */ rv = 0; /* none at all */
@ -1696,7 +1675,7 @@ done:
/* /*
* Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error * 0 if nothing sent (only for nonBlock==true) and -1 on error
*/ */
static int virNetClientSendInternal(virNetClientPtr client, static int virNetClientSendInternal(virNetClientPtr client,
@ -1756,15 +1735,14 @@ static int virNetClientSendInternal(virNetClientPtr client,
ret = virNetClientIO(client, call); ret = virNetClientIO(client, call);
/* If partially sent, then the call is still on the dispatch queue */ /* If queued, the call will be finished and freed later by another thread;
if (ret == 1) { * we're done. */
call->haveThread = false; if (ret == 1)
} else { return 1;
ignore_value(virCondDestroy(&call->cond)); ignore_value(virCondDestroy(&call->cond));
}
cleanup: cleanup:
if (ret != 1)
VIR_FREE(call); VIR_FREE(call);
return ret; return ret;
} }