mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-03-07 17:28:15 +00:00
rpc: invoke the message dispatch callback with client unlocked
Currently if the virNetServer instance is created with max_workers==0 to request a non-threaded dispatch process, we deadlock during dispatch #0 0x00007fb845f6f42d in __lll_lock_wait () from /lib64/libpthread.so.0 #1 0x00007fb845f681d3 in pthread_mutex_lock () from /lib64/libpthread.so.0 #2 0x000055a6628bb305 in virMutexLock (m=<optimized out>) at util/virthread.c:89 #3 0x000055a6628a984b in virObjectLock (anyobj=<optimized out>) at util/virobject.c:435 #4 0x000055a66286fcde in virNetServerClientIsAuthenticated (client=client@entry=0x55a663a7b960) at rpc/virnetserverclient.c:1565 #5 0x000055a66286cc17 in virNetServerProgramDispatchCall (msg=0x55a663a7bc50, client=0x55a663a7b960, server=0x55a663a77550, prog=0x55a663a78020) at rpc/virnetserverprogram.c:407 #6 virNetServerProgramDispatch (prog=prog@entry=0x55a663a78020, server=server@entry=0x55a663a77550, client=client@entry=0x55a663a7b960, msg=msg@entry=0x55a663a7bc50) at rpc/virnetserverprogram.c:307 #7 0x000055a662871d56 in virNetServerProcessMsg (msg=0x55a663a7bc50, prog=0x55a663a78020, client=0x55a663a7b960, srv=0x55a663a77550) at rpc/virnetserver.c:148 #8 virNetServerDispatchNewMessage (client=0x55a663a7b960, msg=0x55a663a7bc50, opaque=0x55a663a77550) at rpc/virnetserver.c:227 #9 0x000055a66286e4c0 in virNetServerClientDispatchRead (client=client@entry=0x55a663a7b960) at rpc/virnetserverclient.c:1322 #10 0x000055a66286e813 in virNetServerClientDispatchEvent (sock=<optimized out>, events=1, opaque=0x55a663a7b960) at rpc/virnetserverclient.c:1507 #11 0x000055a662899be0 in virEventPollDispatchHandles (fds=0x55a663a7bdc0, nfds=<optimized out>) at util/vireventpoll.c:508 #12 virEventPollRunOnce () at util/vireventpoll.c:657 #13 0x000055a6628982f1 in virEventRunDefaultImpl () at util/virevent.c:327 #14 0x000055a6628716d5 in virNetDaemonRun (dmn=0x55a663a771b0) at rpc/virnetdaemon.c:858 #15 0x000055a662864c1d in main (argc=<optimized out>, #argv=0x7ffd105b4838) at logging/log_daemon.c:1235 Reviewed-by: John Ferlan <jferlan@redhat.com> Reviewed-by: Jim Fehlig <jfehlig@suse.com> Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
This commit is contained in:
parent
c6f1d5190b
commit
06e7ebb608
@ -143,7 +143,7 @@ VIR_ONCE_GLOBAL_INIT(virNetServerClient)
|
|||||||
|
|
||||||
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
|
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
|
||||||
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
|
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
|
||||||
static void virNetServerClientDispatchRead(virNetServerClientPtr client);
|
static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client);
|
||||||
static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
|
static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
|
||||||
virNetMessagePtr msg);
|
virNetMessagePtr msg);
|
||||||
|
|
||||||
@ -340,18 +340,40 @@ virNetServerClientCheckAccess(virNetServerClientPtr client)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
static void virNetServerClientDispatchMessage(virNetServerClientPtr client,
|
||||||
|
virNetMessagePtr msg)
|
||||||
|
{
|
||||||
|
virObjectLock(client);
|
||||||
|
if (!client->dispatchFunc) {
|
||||||
|
virNetMessageFree(msg);
|
||||||
|
client->wantClose = true;
|
||||||
|
virObjectUnlock(client);
|
||||||
|
} else {
|
||||||
|
virObjectUnlock(client);
|
||||||
|
/* Accessing 'client' is safe, because virNetServerClientSetDispatcher
|
||||||
|
* only permits setting 'dispatchFunc' once, so if non-NULL, it will
|
||||||
|
* never change again
|
||||||
|
*/
|
||||||
|
client->dispatchFunc(client, msg, client->dispatchOpaque);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void virNetServerClientSockTimerFunc(int timer,
|
static void virNetServerClientSockTimerFunc(int timer,
|
||||||
void *opaque)
|
void *opaque)
|
||||||
{
|
{
|
||||||
virNetServerClientPtr client = opaque;
|
virNetServerClientPtr client = opaque;
|
||||||
|
virNetMessagePtr msg = NULL;
|
||||||
virObjectLock(client);
|
virObjectLock(client);
|
||||||
virEventUpdateTimeout(timer, -1);
|
virEventUpdateTimeout(timer, -1);
|
||||||
/* Although client->rx != NULL when this timer is enabled, it might have
|
/* Although client->rx != NULL when this timer is enabled, it might have
|
||||||
* changed since the client was unlocked in the meantime. */
|
* changed since the client was unlocked in the meantime. */
|
||||||
if (client->rx)
|
if (client->rx)
|
||||||
virNetServerClientDispatchRead(client);
|
msg = virNetServerClientDispatchRead(client);
|
||||||
virObjectUnlock(client);
|
virObjectUnlock(client);
|
||||||
|
|
||||||
|
if (msg)
|
||||||
|
virNetServerClientDispatchMessage(client, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -950,8 +972,13 @@ void virNetServerClientSetDispatcher(virNetServerClientPtr client,
|
|||||||
void *opaque)
|
void *opaque)
|
||||||
{
|
{
|
||||||
virObjectLock(client);
|
virObjectLock(client);
|
||||||
|
/* Only set dispatcher if not already set, to avoid race
|
||||||
|
* with dispatch code that runs without locks held
|
||||||
|
*/
|
||||||
|
if (!client->dispatchFunc) {
|
||||||
client->dispatchFunc = func;
|
client->dispatchFunc = func;
|
||||||
client->dispatchOpaque = opaque;
|
client->dispatchOpaque = opaque;
|
||||||
|
}
|
||||||
virObjectUnlock(client);
|
virObjectUnlock(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1196,26 +1223,32 @@ static ssize_t virNetServerClientRead(virNetServerClientPtr client)
|
|||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Read data until we get a complete message to process
|
* Read data until we get a complete message to process.
|
||||||
|
* If a complete message is available, it will be returned
|
||||||
|
* from this method, for dispatch by the caller.
|
||||||
|
*
|
||||||
|
* Returns a complete message for dispatch, or NULL if none is
|
||||||
|
* yet available, or an error occurred. On error, the wantClose
|
||||||
|
* flag will be set.
|
||||||
*/
|
*/
|
||||||
static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client)
|
||||||
{
|
{
|
||||||
readmore:
|
readmore:
|
||||||
if (client->rx->nfds == 0) {
|
if (client->rx->nfds == 0) {
|
||||||
if (virNetServerClientRead(client) < 0) {
|
if (virNetServerClientRead(client) < 0) {
|
||||||
client->wantClose = true;
|
client->wantClose = true;
|
||||||
return; /* Error */
|
return NULL; /* Error */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (client->rx->bufferOffset < client->rx->bufferLength)
|
if (client->rx->bufferOffset < client->rx->bufferLength)
|
||||||
return; /* Still not read enough */
|
return NULL; /* Still not read enough */
|
||||||
|
|
||||||
/* Either done with length word header */
|
/* Either done with length word header */
|
||||||
if (client->rx->bufferLength == VIR_NET_MESSAGE_LEN_MAX) {
|
if (client->rx->bufferLength == VIR_NET_MESSAGE_LEN_MAX) {
|
||||||
if (virNetMessageDecodeLength(client->rx) < 0) {
|
if (virNetMessageDecodeLength(client->rx) < 0) {
|
||||||
client->wantClose = true;
|
client->wantClose = true;
|
||||||
return;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
virNetServerClientUpdateEvent(client);
|
virNetServerClientUpdateEvent(client);
|
||||||
@ -1236,7 +1269,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
|||||||
virNetMessageQueueServe(&client->rx);
|
virNetMessageQueueServe(&client->rx);
|
||||||
virNetMessageFree(msg);
|
virNetMessageFree(msg);
|
||||||
client->wantClose = true;
|
client->wantClose = true;
|
||||||
return;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Now figure out if we need to read more data to get some
|
/* Now figure out if we need to read more data to get some
|
||||||
@ -1246,7 +1279,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
|||||||
virNetMessageQueueServe(&client->rx);
|
virNetMessageQueueServe(&client->rx);
|
||||||
virNetMessageFree(msg);
|
virNetMessageFree(msg);
|
||||||
client->wantClose = true;
|
client->wantClose = true;
|
||||||
return; /* Error */
|
return NULL; /* Error */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try getting the file descriptors (may fail if blocking) */
|
/* Try getting the file descriptors (may fail if blocking) */
|
||||||
@ -1256,7 +1289,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
|||||||
virNetMessageQueueServe(&client->rx);
|
virNetMessageQueueServe(&client->rx);
|
||||||
virNetMessageFree(msg);
|
virNetMessageFree(msg);
|
||||||
client->wantClose = true;
|
client->wantClose = true;
|
||||||
return;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (rv == 0) /* Blocking */
|
if (rv == 0) /* Blocking */
|
||||||
break;
|
break;
|
||||||
@ -1270,7 +1303,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
|||||||
* again next time we run this method
|
* again next time we run this method
|
||||||
*/
|
*/
|
||||||
client->rx->bufferOffset = client->rx->bufferLength;
|
client->rx->bufferOffset = client->rx->bufferLength;
|
||||||
return;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1313,16 +1346,6 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send off to for normal dispatch to workers */
|
|
||||||
if (msg) {
|
|
||||||
if (!client->dispatchFunc) {
|
|
||||||
virNetMessageFree(msg);
|
|
||||||
client->wantClose = true;
|
|
||||||
} else {
|
|
||||||
client->dispatchFunc(client, msg, client->dispatchOpaque);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Possibly need to create another receive buffer */
|
/* Possibly need to create another receive buffer */
|
||||||
if (client->nrequests < client->nrequests_max) {
|
if (client->nrequests < client->nrequests_max) {
|
||||||
if (!(client->rx = virNetMessageNew(true))) {
|
if (!(client->rx = virNetMessageNew(true))) {
|
||||||
@ -1338,6 +1361,8 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
virNetServerClientUpdateEvent(client);
|
virNetServerClientUpdateEvent(client);
|
||||||
|
|
||||||
|
return msg;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1482,6 +1507,7 @@ static void
|
|||||||
virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
|
virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
|
||||||
{
|
{
|
||||||
virNetServerClientPtr client = opaque;
|
virNetServerClientPtr client = opaque;
|
||||||
|
virNetMessagePtr msg = NULL;
|
||||||
|
|
||||||
virObjectLock(client);
|
virObjectLock(client);
|
||||||
|
|
||||||
@ -1504,7 +1530,7 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
|
|||||||
virNetServerClientDispatchWrite(client);
|
virNetServerClientDispatchWrite(client);
|
||||||
if (events & VIR_EVENT_HANDLE_READABLE &&
|
if (events & VIR_EVENT_HANDLE_READABLE &&
|
||||||
client->rx)
|
client->rx)
|
||||||
virNetServerClientDispatchRead(client);
|
msg = virNetServerClientDispatchRead(client);
|
||||||
#if WITH_GNUTLS
|
#if WITH_GNUTLS
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -1517,6 +1543,9 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
|
|||||||
client->wantClose = true;
|
client->wantClose = true;
|
||||||
|
|
||||||
virObjectUnlock(client);
|
virObjectUnlock(client);
|
||||||
|
|
||||||
|
if (msg)
|
||||||
|
virNetServerClientDispatchMessage(client, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user