mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-01-11 23:37:42 +00:00
Add support for async close of client RPC socket
This commit is contained in:
parent
f4324e3292
commit
673adba594
@ -101,9 +101,13 @@ struct _virNetClient {
|
||||
|
||||
size_t nstreams;
|
||||
virNetClientStreamPtr *streams;
|
||||
|
||||
bool wantClose;
|
||||
};
|
||||
|
||||
|
||||
void virNetClientRequestClose(virNetClientPtr client);
|
||||
|
||||
static void virNetClientLock(virNetClientPtr client)
|
||||
{
|
||||
virMutexLock(&client->lock);
|
||||
@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client)
|
||||
}
|
||||
|
||||
|
||||
void virNetClientClose(virNetClientPtr client)
|
||||
static void
|
||||
virNetClientCloseLocked(virNetClientPtr client)
|
||||
{
|
||||
if (!client)
|
||||
VIR_DEBUG("client=%p, sock=%p", client, client->sock);
|
||||
|
||||
if (!client->sock)
|
||||
return;
|
||||
|
||||
virNetClientLock(client);
|
||||
virNetSocketRemoveIOCallback(client->sock);
|
||||
virNetSocketFree(client->sock);
|
||||
client->sock = NULL;
|
||||
@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client)
|
||||
virNetSASLSessionFree(client->sasl);
|
||||
client->sasl = NULL;
|
||||
#endif
|
||||
client->wantClose = false;
|
||||
}
|
||||
|
||||
void virNetClientClose(virNetClientPtr client)
|
||||
{
|
||||
if (!client)
|
||||
return;
|
||||
|
||||
virNetClientLock(client);
|
||||
virNetClientCloseLocked(client);
|
||||
virNetClientUnlock(client);
|
||||
}
|
||||
|
||||
void
|
||||
virNetClientRequestClose(virNetClientPtr client)
|
||||
{
|
||||
VIR_DEBUG("client=%p", client);
|
||||
|
||||
virNetClientLock(client);
|
||||
|
||||
/* If there is a thread polling for data on the socket, set wantClose flag
|
||||
* and wake the thread up or just immediately close the socket when no-one
|
||||
* is polling on it.
|
||||
*/
|
||||
if (client->waitDispatch) {
|
||||
char ignore = 1;
|
||||
size_t len = sizeof(ignore);
|
||||
|
||||
client->wantClose = true;
|
||||
if (safewrite(client->wakeupSendFD, &ignore, len) != len)
|
||||
VIR_ERROR(_("failed to wake up polling thread"));
|
||||
} else {
|
||||
virNetClientCloseLocked(client);
|
||||
}
|
||||
|
||||
virNetClientUnlock(client);
|
||||
}
|
||||
|
||||
@ -1096,6 +1137,26 @@ static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
|
||||
virNetClientCallPtr thiscall)
|
||||
{
|
||||
if (!client->waitDispatch)
|
||||
return;
|
||||
|
||||
if (client->waitDispatch == thiscall) {
|
||||
/* just pretend nothing was sent and the caller will free the call */
|
||||
thiscall->sentSomeData = false;
|
||||
} else {
|
||||
virNetClientCallPtr call = client->waitDispatch;
|
||||
virNetClientCallRemove(&client->waitDispatch, call);
|
||||
ignore_value(virCondDestroy(&call->cond));
|
||||
VIR_FREE(call->msg);
|
||||
VIR_FREE(call);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
|
||||
{
|
||||
VIR_DEBUG("Giving up the buck %p", thiscall);
|
||||
@ -1110,7 +1171,12 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
|
||||
}
|
||||
tmp = tmp->next;
|
||||
}
|
||||
|
||||
VIR_DEBUG("No thread to pass the buck to");
|
||||
if (client->wantClose) {
|
||||
virNetClientCloseLocked(client);
|
||||
virNetClientIOEventLoopRemoveAll(client, thiscall);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1141,11 +1207,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
|
||||
sigset_t oldmask, blockedsigs;
|
||||
int timeout = -1;
|
||||
|
||||
/* If we have existing SASL decoded data we
|
||||
* don't want to sleep in the poll(), just
|
||||
* check if any other FDs are also ready
|
||||
/* If we have existing SASL decoded data we don't want to sleep in
|
||||
* the poll(), just check if any other FDs are also ready.
|
||||
* If the connection is going to be closed, we don't want to sleep in
|
||||
* poll() either.
|
||||
*/
|
||||
if (virNetSocketHasCachedData(client->sock))
|
||||
if (virNetSocketHasCachedData(client->sock) || client->wantClose)
|
||||
timeout = 0;
|
||||
|
||||
/* If there are any non-blocking calls in the queue,
|
||||
@ -1208,6 +1275,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
|
||||
fds[0].revents |= POLLIN;
|
||||
}
|
||||
|
||||
/* If wantClose flag is set, pretend there was an error on the socket
|
||||
*/
|
||||
if (client->wantClose)
|
||||
fds[0].revents = POLLERR;
|
||||
|
||||
if (fds[1].revents) {
|
||||
VIR_DEBUG("Woken up from poll by other thread");
|
||||
if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
|
||||
@ -1441,6 +1513,7 @@ static int virNetClientIO(virNetClientPtr client,
|
||||
virResetLastError();
|
||||
rv = virNetClientIOEventLoop(client, thiscall);
|
||||
|
||||
if (client->sock)
|
||||
virNetClientIOUpdateCallback(client, true);
|
||||
|
||||
if (rv == 0 &&
|
||||
@ -1467,7 +1540,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
|
||||
goto done;
|
||||
|
||||
/* This should be impossible, but it doesn't hurt to check */
|
||||
if (client->haveTheBuck)
|
||||
if (client->haveTheBuck || client->wantClose)
|
||||
goto done;
|
||||
|
||||
VIR_DEBUG("Event fired %p %d", sock, events);
|
||||
@ -1528,6 +1601,12 @@ static int virNetClientSendInternal(virNetClientPtr client,
|
||||
|
||||
virNetClientLock(client);
|
||||
|
||||
if (!client->sock || client->wantClose) {
|
||||
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
|
||||
_("client socket is closed"));
|
||||
goto unlock;
|
||||
}
|
||||
|
||||
if (virCondInit(&call->cond) < 0) {
|
||||
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
|
||||
_("cannot initialize condition variable"));
|
||||
@ -1554,6 +1633,8 @@ cleanup:
|
||||
ignore_value(virCondDestroy(&call->cond));
|
||||
VIR_FREE(call);
|
||||
}
|
||||
|
||||
unlock:
|
||||
virNetClientUnlock(client);
|
||||
return ret;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user