diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c index 035ac74947..7984adad71 100644 --- a/src/rpc/virkeepalive.c +++ b/src/rpc/virkeepalive.c @@ -48,9 +48,6 @@ struct _virKeepAlive { time_t intervalStart; int timer; - virNetMessagePtr response; - int responseTimer; - virKeepAliveSendFunc sendCB; virKeepAliveDeadFunc deadCB; virKeepAliveFreeFunc freeCB; @@ -72,12 +69,25 @@ virKeepAliveUnlock(virKeepAlivePtr ka) static virNetMessagePtr -virKeepAliveMessage(int proc) +virKeepAliveMessage(virKeepAlivePtr ka, int proc) { virNetMessagePtr msg; + const char *procstr = NULL; + + switch (proc) { + case KEEPALIVE_PROC_PING: + procstr = "request"; + break; + case KEEPALIVE_PROC_PONG: + procstr = "response"; + break; + default: + VIR_WARN("Refusing to send unknown keepalive message: %d", proc); + return NULL; + } if (!(msg = virNetMessageNew(false))) - return NULL; + goto error; msg->header.prog = KEEPALIVE_PROGRAM; msg->header.vers = KEEPALIVE_PROTOCOL_VERSION; @@ -87,69 +97,20 @@ virKeepAliveMessage(int proc) if (virNetMessageEncodeHeader(msg) < 0 || virNetMessageEncodePayloadEmpty(msg) < 0) { virNetMessageFree(msg); - return NULL; + goto error; } - return msg; -} - - -static void -virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg) -{ - const char *proc = NULL; - void *client = ka->client; - virKeepAliveSendFunc sendCB = ka->sendCB; - - switch (msg->header.proc) { - case KEEPALIVE_PROC_PING: - proc = "request"; - break; - case KEEPALIVE_PROC_PONG: - proc = "response"; - break; - } - - if (!proc) { - VIR_WARN("Refusing to send unknown keepalive message: %d", - msg->header.proc); - virNetMessageFree(msg); - return; - } - - VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client); + VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client); PROBE(RPC_KEEPALIVE_SEND, "ka=%p client=%p prog=%d vers=%d proc=%d", ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc); - ka->refs++; - virKeepAliveUnlock(ka); + return msg; - if (sendCB(client, msg) < 0) { - VIR_WARN("Failed to send keepalive %s to client %p", proc, client); - virNetMessageFree(msg); - } - - virKeepAliveLock(ka); - ka->refs--; -} - - -static void -virKeepAliveScheduleResponse(virKeepAlivePtr ka) -{ - if (ka->responseTimer == -1) - return; - - VIR_DEBUG("Scheduling keepalive response to client %p", ka->client); - - if (!ka->response && - !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) { - VIR_WARN("Failed to generate keepalive response"); - return; - } - - virEventUpdateTimeout(ka->responseTimer, 0); +error: + VIR_WARN("Failed to generate keepalive %s", procstr); + VIR_FREE(msg); + return NULL; } @@ -184,7 +145,7 @@ virKeepAliveTimerInternal(virKeepAlivePtr ka, } else { ka->countToDeath--; ka->intervalStart = now; - *msg = virKeepAliveMessage(KEEPALIVE_PROC_PING); + *msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING); virEventUpdateTimeout(ka->timer, ka->interval * 1000); return false; } @@ -197,47 +158,30 @@ virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virKeepAlivePtr ka = opaque; virNetMessagePtr msg = NULL; bool dead; + void *client; virKeepAliveLock(ka); + client = ka->client; dead = virKeepAliveTimerInternal(ka, &msg); - if (dead) { - virKeepAliveDeadFunc deadCB = ka->deadCB; - void *client = ka->client; - - ka->refs++; - virKeepAliveUnlock(ka); - deadCB(client); - virKeepAliveLock(ka); - ka->refs--; - } else if (msg) { - virKeepAliveSend(ka, msg); - } + if (!dead && !msg) + goto cleanup; + ka->refs++; virKeepAliveUnlock(ka); -} - -static void -virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque) -{ - virKeepAlivePtr ka = opaque; - virNetMessagePtr msg; + if (dead) { + ka->deadCB(client); + } else if (ka->sendCB(client, msg) < 0) { + VIR_WARN("Failed to send keepalive request to client %p", client); + virNetMessageFree(msg); + } virKeepAliveLock(ka); + ka->refs--; - VIR_DEBUG("ka=%p, client=%p, response=%p", - ka, ka->client, ka->response); - - if (ka->response) { - msg = ka->response; - ka->response = NULL; - virKeepAliveSend(ka, msg); - } - - virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1); - +cleanup: virKeepAliveUnlock(ka); } @@ -281,15 +225,6 @@ virKeepAliveNew(int interval, ka->deadCB = deadCB; ka->freeCB = freeCB; - ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer, - ka, virKeepAliveTimerFree); - if (ka->responseTimer < 0) { - virKeepAliveFree(ka); - return NULL; - } - /* the timer now has a reference to ka */ - ka->refs++; - PROBE(RPC_KEEPALIVE_NEW, "ka=%p client=%p refs=%d", ka, ka->client, ka->refs); @@ -394,7 +329,7 @@ cleanup: static void -virKeepAliveStopInternal(virKeepAlivePtr ka, bool all) +virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED) { virKeepAliveLock(ka); @@ -407,16 +342,6 @@ virKeepAliveStopInternal(virKeepAlivePtr ka, bool all) ka->timer = -1; } - if (all) { - if (ka->responseTimer > 0) { - virEventRemoveTimeout(ka->responseTimer); - ka->responseTimer = -1; - } - - virNetMessageFree(ka->response); - ka->response = NULL; - } - virKeepAliveUnlock(ka); } @@ -482,13 +407,15 @@ virKeepAliveTrigger(virKeepAlivePtr ka, bool virKeepAliveCheckMessage(virKeepAlivePtr ka, - virNetMessagePtr msg) + virNetMessagePtr msg, + virNetMessagePtr *response) { bool ret = false; VIR_DEBUG("ka=%p, client=%p, msg=%p", ka, ka ? ka->client : "(null)", msg); + *response = NULL; if (!ka) return false; @@ -508,7 +435,7 @@ virKeepAliveCheckMessage(virKeepAlivePtr ka, switch (msg->header.proc) { case KEEPALIVE_PROC_PING: VIR_DEBUG("Got keepalive request from client %p", ka->client); - virKeepAliveScheduleResponse(ka); + *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG); break; case KEEPALIVE_PROC_PONG: diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h index 09264a5071..62227d0684 100644 --- a/src/rpc/virkeepalive.h +++ b/src/rpc/virkeepalive.h @@ -55,6 +55,7 @@ int virKeepAliveTimeout(virKeepAlivePtr ka); bool virKeepAliveTrigger(virKeepAlivePtr ka, virNetMessagePtr *msg); bool virKeepAliveCheckMessage(virKeepAlivePtr ka, - virNetMessagePtr msg); + virNetMessagePtr msg, + virNetMessagePtr *response); #endif /* __VIR_KEEPALIVE_H__ */ diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 8a3c43052a..69ad52e34a 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -109,6 +109,8 @@ struct _virNetClient { static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall); +static int virNetClientQueueNonBlocking(virNetClientPtr client, + virNetMessagePtr msg); static void virNetClientLock(virNetClientPtr client) @@ -932,14 +934,22 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) static int virNetClientCallDispatch(virNetClientPtr client) { + virNetMessagePtr response = NULL; + PROBE(RPC_CLIENT_MSG_RX, "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u", client, client->msg.bufferLength, client->msg.header.prog, client->msg.header.vers, client->msg.header.proc, client->msg.header.type, client->msg.header.status, client->msg.header.serial); - if (virKeepAliveCheckMessage(client->keepalive, &client->msg)) + if (virKeepAliveCheckMessage(client->keepalive, &client->msg, &response)) { + if (response && + virNetClientQueueNonBlocking(client, response) < 0) { + VIR_WARN("Could not queue keepalive response"); + virNetMessageFree(response); + } return 0; + } switch (client->msg.header.type) { case VIR_NET_REPLY: /* Normal RPC replies */ @@ -1625,6 +1635,8 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, NULL); + virNetClientIOUpdateCallback(client, true); + done: virNetClientUnlock(client); } @@ -1684,6 +1696,27 @@ error: } +static int +virNetClientQueueNonBlocking(virNetClientPtr client, + virNetMessagePtr msg) +{ + virNetClientCallPtr call; + + PROBE(RPC_CLIENT_MSG_TX_QUEUE, + "client=%p len=%zu prog=%u vers=%u proc=%u" + " type=%u status=%u serial=%u", + client, msg->bufferLength, + msg->header.prog, msg->header.vers, msg->header.proc, + msg->header.type, msg->header.status, msg->header.serial); + + if (!(call = virNetClientCallNew(msg, false, true))) + return -1; + + virNetClientCallQueue(&client->waitDispatch, call); + return 0; +} + + /* * Returns 1 if the call was queued and will be completed later (only * for nonBlock==true), 0 if the call was completed and -1 on error. diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index 67600fd00a..8b534940ba 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -103,13 +103,14 @@ struct _virNetServerClient virNetServerClientCloseFunc privateDataCloseFunc; virKeepAlivePtr keepalive; - int keepaliveFilter; }; static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque); static void virNetServerClientUpdateEvent(virNetServerClientPtr client); static void virNetServerClientDispatchRead(virNetServerClientPtr client); +static int virNetServerClientSendMessageLocked(virNetServerClientPtr client, + virNetMessagePtr msg); static void virNetServerClientLock(virNetServerClientPtr client) { @@ -359,7 +360,6 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, client->readonly = readonly; client->tlsCtxt = tls; client->nrequests_max = nrequests_max; - client->keepaliveFilter = -1; client->sockTimer = virEventAddTimeout(-1, virNetServerClientSockTimerFunc, client, NULL); @@ -635,9 +635,6 @@ void virNetServerClientClose(virNetServerClientPtr client) return; } - if (client->keepaliveFilter >= 0) - virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter); - if (client->keepalive) { virKeepAliveStop(client->keepalive); ka = client->keepalive; @@ -835,6 +832,7 @@ readmore: } else { /* Grab the completed message */ virNetMessagePtr msg = client->rx; + virNetMessagePtr response = NULL; virNetServerClientFilterPtr filter; size_t i; @@ -885,23 +883,35 @@ readmore: msg->header.prog, msg->header.vers, msg->header.proc, msg->header.type, msg->header.status, msg->header.serial); - /* Maybe send off for queue against a filter */ - filter = client->filters; - while (filter) { - int ret = filter->func(client, msg, filter->opaque); - if (ret < 0) { - virNetMessageFree(msg); - msg = NULL; - if (ret < 0) - client->wantClose = true; - break; - } - if (ret > 0) { - msg = NULL; - break; - } + if (virKeepAliveCheckMessage(client->keepalive, msg, &response)) { + virNetMessageFree(msg); + client->nrequests--; + msg = NULL; - filter = filter->next; + if (response && + virNetServerClientSendMessageLocked(client, response) < 0) + virNetMessageFree(response); + } + + /* Maybe send off for queue against a filter */ + if (msg) { + filter = client->filters; + while (filter) { + int ret = filter->func(client, msg, filter->opaque); + if (ret < 0) { + virNetMessageFree(msg); + msg = NULL; + if (ret < 0) + client->wantClose = true; + break; + } + if (ret > 0) { + msg = NULL; + break; + } + + filter = filter->next; + } } /* Send off to for normal dispatch to workers */ @@ -1097,16 +1107,15 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque) } -int virNetServerClientSendMessage(virNetServerClientPtr client, - virNetMessagePtr msg) +static int +virNetServerClientSendMessageLocked(virNetServerClientPtr client, + virNetMessagePtr msg) { int ret = -1; VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu", msg, msg->header.proc, msg->bufferLength, msg->bufferOffset); - virNetServerClientLock(client); - msg->donefds = 0; if (client->sock && !client->wantClose) { PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE, @@ -1120,6 +1129,16 @@ int virNetServerClientSendMessage(virNetServerClientPtr client, ret = 0; } + return ret; +} + +int virNetServerClientSendMessage(virNetServerClientPtr client, + virNetMessagePtr msg) +{ + int ret; + + virNetServerClientLock(client); + ret = virNetServerClientSendMessageLocked(client, msg); virNetServerClientUnlock(client); return ret; @@ -1156,20 +1175,6 @@ virNetServerClientFreeCB(void *opaque) virNetServerClientFree(opaque); } -static int -virNetServerClientKeepAliveFilter(virNetServerClientPtr client, - virNetMessagePtr msg, - void *opaque ATTRIBUTE_UNUSED) -{ - if (virKeepAliveCheckMessage(client->keepalive, msg)) { - virNetMessageFree(msg); - client->nrequests--; - return 1; - } - - return 0; -} - int virNetServerClientInitKeepAlive(virNetServerClientPtr client, int interval, @@ -1188,13 +1193,6 @@ virNetServerClientInitKeepAlive(virNetServerClientPtr client, /* keepalive object has a reference to client */ client->refs++; - client->keepaliveFilter = - virNetServerClientAddFilterLocked(client, - virNetServerClientKeepAliveFilter, - NULL); - if (client->keepaliveFilter < 0) - goto cleanup; - client->keepalive = ka; ka = NULL;