diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 7f6437f2b6..73f45fc434 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -58,7 +58,6 @@ struct _virNetClientCall { bool expectReply; bool nonBlock; bool haveThread; - bool sentSomeData; virCond cond; @@ -108,6 +107,10 @@ struct _virNetClient { }; +static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, + virNetClientCallPtr thiscall); + + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client) virNetClientLock(client); - /* If there is a thread polling for data on the socket, set wantClose flag - * and wake the thread up or just immediately close the socket when no-one - * is polling on it. + client->wantClose = true; + + /* If there is a thread polling for data on the socket, wake the thread up + * otherwise try to pass the buck to a possibly waiting thread. If no + * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the + * queue and close the client because we set client->wantClose. */ - if (client->waitDispatch) { + if (client->haveTheBuck) { char ignore = 1; size_t len = sizeof(ignore); - client->wantClose = true; if (safewrite(client->wakeupSendFD, &ignore, len) != len) VIR_ERROR(_("failed to wake up polling thread")); } else { - virNetClientCloseLocked(client); + virNetClientIOEventLoopPassTheBuck(client, NULL); } virNetClientUnlock(client); @@ -967,8 +972,6 @@ virNetClientIOWriteMessage(virNetClientPtr client, ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); - if (ret > 0 || virNetSocketHasPendingData(client->sock)) - thecall->sentSomeData = true; if (ret <= 0) return ret; @@ -1185,71 +1188,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, } -static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, - void *opaque) +static bool +virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call != thiscall && call->nonBlock && call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + call->haveThread = false; + virCondSignal(&call->cond); + return true; + } + + return false; +} + + +static bool +virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call, + void *opaque) { virNetClientCallPtr thiscall = opaque; if (call == thiscall) return false; - if (!call->nonBlock) - return false; - - if (call->sentSomeData) { - /* - * If some data has been sent we must keep it in the list, - * but still wakeup any thread - */ - if (call->haveThread) { - VIR_DEBUG("Waking up sleep %p", call); - virCondSignal(&call->cond); - } else { - VIR_DEBUG("Keeping unfinished call %p in the list", call); - } - return false; - } else { - /* - * If no data has been sent, we can remove it from the list. - * Wakup any thread, otherwise free the caller ourselves - */ - if (call->haveThread) { - VIR_DEBUG("Waking up sleep %p", call); - virCondSignal(&call->cond); - } else { - VIR_DEBUG("Removing call %p", call); - if (call->expectReply) - VIR_WARN("Got a call expecting a reply but without a waiting thread"); - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call->msg); - VIR_FREE(call); - } - return true; - } + VIR_DEBUG("Removing call %p", call); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call->msg); + VIR_FREE(call); + return true; } static void -virNetClientIOEventLoopRemoveAll(virNetClientPtr client, - virNetClientCallPtr thiscall) -{ - if (!client->waitDispatch) - return; - - if (client->waitDispatch == thiscall) { - /* just pretend nothing was sent and the caller will free the call */ - thiscall->sentSomeData = false; - } else { - virNetClientCallPtr call = client->waitDispatch; - virNetClientCallRemove(&client->waitDispatch, call); - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call->msg); - VIR_FREE(call); - } -} - - -static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) +virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, + virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); virNetClientCallPtr tmp = client->waitDispatch; @@ -1268,14 +1243,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli VIR_DEBUG("No thread to pass the buck to"); if (client->wantClose) { virNetClientCloseLocked(client); - virNetClientIOEventLoopRemoveAll(client, thiscall); + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveAll, + thiscall); } } -static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) +static bool +virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, + void *opaque ATTRIBUTE_UNUSED) { - return call->nonBlock; + return call->nonBlock && call->haveThread; } /* @@ -1308,8 +1287,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; - /* If there are any non-blocking calls in the queue, - * then we don't want to sleep in poll() + /* If there are any non-blocking calls with an associated thread + * in the queue, then we don't want to sleep in poll() */ if (virNetClientCallMatchPredicate(client->waitDispatch, virNetClientIOEventLoopWantNonBlock, @@ -1382,12 +1361,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } /* If we were woken up because a new non-blocking call was queued, - * we need to re-poll to check if we can send it. + * we need to re-poll to check if we can send it. To be precise, we + * will re-poll even if a blocking call arrived when unhandled + * non-blocking calls are still in the queue. But this can't hurt. */ if (virNetClientCallMatchPredicate(client->waitDispatch, virNetClientIOEventLoopWantNonBlock, NULL)) { - VIR_DEBUG("New non-blocking call arrived; repolling"); + VIR_DEBUG("The queue contains new non-blocking call(s);" + " repolling"); continue; } } @@ -1412,18 +1394,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } /* Iterate through waiting calls and if any are - * complete, remove them from the dispatch list.. + * complete, remove them from the dispatch list. */ virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, thiscall); - /* Iterate through waiting calls and if any are - * non-blocking, remove them from the dispatch list... + /* Iterate through waiting calls and wake up and detach threads + * attached to non-blocking calls. */ - virNetClientCallRemovePredicate(&client->waitDispatch, - virNetClientIOEventLoopRemoveNonBlocking, - thiscall); + virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopDetachNonBlocking, + thiscall); /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { @@ -1432,15 +1414,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, return 2; } - /* We're not done, but we're non-blocking */ + /* We're not done, but we're non-blocking; keep the call queued */ if (thiscall->nonBlock) { + thiscall->haveThread = false; virNetClientIOEventLoopPassTheBuck(client, thiscall); - if (thiscall->sentSomeData) { - return 1; - } else { - virNetClientCallRemove(&client->waitDispatch, thiscall); - return 0; - } + return 1; } if (fds[0].revents & (POLLHUP | POLLERR)) { @@ -1450,7 +1428,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } } - error: virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall); @@ -1602,9 +1579,11 @@ static int virNetClientIO(virNetClientPtr client, goto cleanup; } - /* If we're non-blocking, get outta here */ + /* If we're non-blocking, we were either queued (and detached) or the + * call was not sent because of an error. + */ if (thiscall->nonBlock) { - if (thiscall->sentSomeData) + if (!thiscall->haveThread) rv = 1; /* In progress */ else rv = 0; /* none at all */ @@ -1696,7 +1675,7 @@ done: /* - * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * Returns 2 if fully sent, 1 if queued (only for nonBlock==true), * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientSendInternal(virNetClientPtr client, @@ -1756,16 +1735,15 @@ static int virNetClientSendInternal(virNetClientPtr client, ret = virNetClientIO(client, call); - /* If partially sent, then the call is still on the dispatch queue */ - if (ret == 1) { - call->haveThread = false; - } else { - ignore_value(virCondDestroy(&call->cond)); - } + /* If queued, the call will be finished and freed later by another thread; + * we're done. */ + if (ret == 1) + return 1; + + ignore_value(virCondDestroy(&call->cond)); cleanup: - if (ret != 1) - VIR_FREE(call); + VIR_FREE(call); return ret; }