Add support for async close of client RPC socket

This commit is contained in:
Jiri Denemark 2011-09-22 15:47:29 +02:00
parent f4324e3292
commit 673adba594

View File

@ -101,9 +101,13 @@ struct _virNetClient {
size_t nstreams;
virNetClientStreamPtr *streams;
bool wantClose;
};
void virNetClientRequestClose(virNetClientPtr client);
static void virNetClientLock(virNetClientPtr client)
{
virMutexLock(&client->lock);
@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client)
}
void virNetClientClose(virNetClientPtr client)
static void
virNetClientCloseLocked(virNetClientPtr client)
{
if (!client)
VIR_DEBUG("client=%p, sock=%p", client, client->sock);
if (!client->sock)
return;
virNetClientLock(client);
virNetSocketRemoveIOCallback(client->sock);
virNetSocketFree(client->sock);
client->sock = NULL;
@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client)
virNetSASLSessionFree(client->sasl);
client->sasl = NULL;
#endif
client->wantClose = false;
}
void virNetClientClose(virNetClientPtr client)
{
if (!client)
return;
virNetClientLock(client);
virNetClientCloseLocked(client);
virNetClientUnlock(client);
}
void
virNetClientRequestClose(virNetClientPtr client)
{
VIR_DEBUG("client=%p", client);
virNetClientLock(client);
/* If there is a thread polling for data on the socket, set wantClose flag
* and wake the thread up or just immediately close the socket when no-one
* is polling on it.
*/
if (client->waitDispatch) {
char ignore = 1;
size_t len = sizeof(ignore);
client->wantClose = true;
if (safewrite(client->wakeupSendFD, &ignore, len) != len)
VIR_ERROR(_("failed to wake up polling thread"));
} else {
virNetClientCloseLocked(client);
}
virNetClientUnlock(client);
}
@ -1096,6 +1137,26 @@ static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
}
static void
virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
virNetClientCallPtr thiscall)
{
if (!client->waitDispatch)
return;
if (client->waitDispatch == thiscall) {
/* just pretend nothing was sent and the caller will free the call */
thiscall->sentSomeData = false;
} else {
virNetClientCallPtr call = client->waitDispatch;
virNetClientCallRemove(&client->waitDispatch, call);
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call->msg);
VIR_FREE(call);
}
}
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
{
VIR_DEBUG("Giving up the buck %p", thiscall);
@ -1110,7 +1171,12 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
}
tmp = tmp->next;
}
VIR_DEBUG("No thread to pass the buck to");
if (client->wantClose) {
virNetClientCloseLocked(client);
virNetClientIOEventLoopRemoveAll(client, thiscall);
}
}
@ -1141,11 +1207,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
sigset_t oldmask, blockedsigs;
int timeout = -1;
/* If we have existing SASL decoded data we
* don't want to sleep in the poll(), just
* check if any other FDs are also ready
/* If we have existing SASL decoded data we don't want to sleep in
* the poll(), just check if any other FDs are also ready.
* If the connection is going to be closed, we don't want to sleep in
* poll() either.
*/
if (virNetSocketHasCachedData(client->sock))
if (virNetSocketHasCachedData(client->sock) || client->wantClose)
timeout = 0;
/* If there are any non-blocking calls in the queue,
@ -1208,6 +1275,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[0].revents |= POLLIN;
}
/* If wantClose flag is set, pretend there was an error on the socket
*/
if (client->wantClose)
fds[0].revents = POLLERR;
if (fds[1].revents) {
VIR_DEBUG("Woken up from poll by other thread");
if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
@ -1441,6 +1513,7 @@ static int virNetClientIO(virNetClientPtr client,
virResetLastError();
rv = virNetClientIOEventLoop(client, thiscall);
if (client->sock)
virNetClientIOUpdateCallback(client, true);
if (rv == 0 &&
@ -1467,7 +1540,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
goto done;
/* This should be impossible, but it doesn't hurt to check */
if (client->haveTheBuck)
if (client->haveTheBuck || client->wantClose)
goto done;
VIR_DEBUG("Event fired %p %d", sock, events);
@ -1528,6 +1601,12 @@ static int virNetClientSendInternal(virNetClientPtr client,
virNetClientLock(client);
if (!client->sock || client->wantClose) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("client socket is closed"));
goto unlock;
}
if (virCondInit(&call->cond) < 0) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot initialize condition variable"));
@ -1554,6 +1633,8 @@ cleanup:
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call);
}
unlock:
virNetClientUnlock(client);
return ret;
}