mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-01-21 12:05:17 +00:00
rpc: Do not use timer for sending keepalive responses
When a libvirt API is called from the main event loop (which seems to be common in event-based glib apps), the client IO loop would properly handle keepalive requests sent by a server but will not actually send them because the main event loop is blocked with the API. This patch gets rid of response timer and the thread which is processing keepalive requests is also responsible for queueing responses for delivery. (cherry picked from commit bb85f2298e63b55b0465cb9e1f790019e99611dd)
This commit is contained in:
parent
400a5a9290
commit
4d695acd86
@ -48,9 +48,6 @@ struct _virKeepAlive {
|
|||||||
time_t intervalStart;
|
time_t intervalStart;
|
||||||
int timer;
|
int timer;
|
||||||
|
|
||||||
virNetMessagePtr response;
|
|
||||||
int responseTimer;
|
|
||||||
|
|
||||||
virKeepAliveSendFunc sendCB;
|
virKeepAliveSendFunc sendCB;
|
||||||
virKeepAliveDeadFunc deadCB;
|
virKeepAliveDeadFunc deadCB;
|
||||||
virKeepAliveFreeFunc freeCB;
|
virKeepAliveFreeFunc freeCB;
|
||||||
@ -72,12 +69,25 @@ virKeepAliveUnlock(virKeepAlivePtr ka)
|
|||||||
|
|
||||||
|
|
||||||
static virNetMessagePtr
|
static virNetMessagePtr
|
||||||
virKeepAliveMessage(int proc)
|
virKeepAliveMessage(virKeepAlivePtr ka, int proc)
|
||||||
{
|
{
|
||||||
virNetMessagePtr msg;
|
virNetMessagePtr msg;
|
||||||
|
const char *procstr = NULL;
|
||||||
|
|
||||||
|
switch (proc) {
|
||||||
|
case KEEPALIVE_PROC_PING:
|
||||||
|
procstr = "request";
|
||||||
|
break;
|
||||||
|
case KEEPALIVE_PROC_PONG:
|
||||||
|
procstr = "response";
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
VIR_WARN("Refusing to send unknown keepalive message: %d", proc);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (!(msg = virNetMessageNew(false)))
|
if (!(msg = virNetMessageNew(false)))
|
||||||
return NULL;
|
goto error;
|
||||||
|
|
||||||
msg->header.prog = KEEPALIVE_PROGRAM;
|
msg->header.prog = KEEPALIVE_PROGRAM;
|
||||||
msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
|
msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
|
||||||
@ -87,69 +97,20 @@ virKeepAliveMessage(int proc)
|
|||||||
if (virNetMessageEncodeHeader(msg) < 0 ||
|
if (virNetMessageEncodeHeader(msg) < 0 ||
|
||||||
virNetMessageEncodePayloadEmpty(msg) < 0) {
|
virNetMessageEncodePayloadEmpty(msg) < 0) {
|
||||||
virNetMessageFree(msg);
|
virNetMessageFree(msg);
|
||||||
return NULL;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
return msg;
|
VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client);
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
|
|
||||||
{
|
|
||||||
const char *proc = NULL;
|
|
||||||
void *client = ka->client;
|
|
||||||
virKeepAliveSendFunc sendCB = ka->sendCB;
|
|
||||||
|
|
||||||
switch (msg->header.proc) {
|
|
||||||
case KEEPALIVE_PROC_PING:
|
|
||||||
proc = "request";
|
|
||||||
break;
|
|
||||||
case KEEPALIVE_PROC_PONG:
|
|
||||||
proc = "response";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!proc) {
|
|
||||||
VIR_WARN("Refusing to send unknown keepalive message: %d",
|
|
||||||
msg->header.proc);
|
|
||||||
virNetMessageFree(msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
|
|
||||||
PROBE(RPC_KEEPALIVE_SEND,
|
PROBE(RPC_KEEPALIVE_SEND,
|
||||||
"ka=%p client=%p prog=%d vers=%d proc=%d",
|
"ka=%p client=%p prog=%d vers=%d proc=%d",
|
||||||
ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc);
|
ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc);
|
||||||
|
|
||||||
ka->refs++;
|
return msg;
|
||||||
virKeepAliveUnlock(ka);
|
|
||||||
|
|
||||||
if (sendCB(client, msg) < 0) {
|
error:
|
||||||
VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
|
VIR_WARN("Failed to generate keepalive %s", procstr);
|
||||||
virNetMessageFree(msg);
|
VIR_FREE(msg);
|
||||||
}
|
return NULL;
|
||||||
|
|
||||||
virKeepAliveLock(ka);
|
|
||||||
ka->refs--;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
virKeepAliveScheduleResponse(virKeepAlivePtr ka)
|
|
||||||
{
|
|
||||||
if (ka->responseTimer == -1)
|
|
||||||
return;
|
|
||||||
|
|
||||||
VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
|
|
||||||
|
|
||||||
if (!ka->response &&
|
|
||||||
!(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
|
|
||||||
VIR_WARN("Failed to generate keepalive response");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
virEventUpdateTimeout(ka->responseTimer, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -184,7 +145,7 @@ virKeepAliveTimerInternal(virKeepAlivePtr ka,
|
|||||||
} else {
|
} else {
|
||||||
ka->countToDeath--;
|
ka->countToDeath--;
|
||||||
ka->intervalStart = now;
|
ka->intervalStart = now;
|
||||||
*msg = virKeepAliveMessage(KEEPALIVE_PROC_PING);
|
*msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING);
|
||||||
virEventUpdateTimeout(ka->timer, ka->interval * 1000);
|
virEventUpdateTimeout(ka->timer, ka->interval * 1000);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -197,47 +158,30 @@ virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
|
|||||||
virKeepAlivePtr ka = opaque;
|
virKeepAlivePtr ka = opaque;
|
||||||
virNetMessagePtr msg = NULL;
|
virNetMessagePtr msg = NULL;
|
||||||
bool dead;
|
bool dead;
|
||||||
|
void *client;
|
||||||
|
|
||||||
virKeepAliveLock(ka);
|
virKeepAliveLock(ka);
|
||||||
|
|
||||||
|
client = ka->client;
|
||||||
dead = virKeepAliveTimerInternal(ka, &msg);
|
dead = virKeepAliveTimerInternal(ka, &msg);
|
||||||
|
|
||||||
if (dead) {
|
if (!dead && !msg)
|
||||||
virKeepAliveDeadFunc deadCB = ka->deadCB;
|
goto cleanup;
|
||||||
void *client = ka->client;
|
|
||||||
|
|
||||||
ka->refs++;
|
|
||||||
virKeepAliveUnlock(ka);
|
|
||||||
deadCB(client);
|
|
||||||
virKeepAliveLock(ka);
|
|
||||||
ka->refs--;
|
|
||||||
} else if (msg) {
|
|
||||||
virKeepAliveSend(ka, msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
ka->refs++;
|
||||||
virKeepAliveUnlock(ka);
|
virKeepAliveUnlock(ka);
|
||||||
}
|
|
||||||
|
|
||||||
|
if (dead) {
|
||||||
static void
|
ka->deadCB(client);
|
||||||
virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
|
} else if (ka->sendCB(client, msg) < 0) {
|
||||||
{
|
VIR_WARN("Failed to send keepalive request to client %p", client);
|
||||||
virKeepAlivePtr ka = opaque;
|
virNetMessageFree(msg);
|
||||||
virNetMessagePtr msg;
|
}
|
||||||
|
|
||||||
virKeepAliveLock(ka);
|
virKeepAliveLock(ka);
|
||||||
|
ka->refs--;
|
||||||
|
|
||||||
VIR_DEBUG("ka=%p, client=%p, response=%p",
|
cleanup:
|
||||||
ka, ka->client, ka->response);
|
|
||||||
|
|
||||||
if (ka->response) {
|
|
||||||
msg = ka->response;
|
|
||||||
ka->response = NULL;
|
|
||||||
virKeepAliveSend(ka, msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
|
|
||||||
|
|
||||||
virKeepAliveUnlock(ka);
|
virKeepAliveUnlock(ka);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -281,15 +225,6 @@ virKeepAliveNew(int interval,
|
|||||||
ka->deadCB = deadCB;
|
ka->deadCB = deadCB;
|
||||||
ka->freeCB = freeCB;
|
ka->freeCB = freeCB;
|
||||||
|
|
||||||
ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
|
|
||||||
ka, virKeepAliveTimerFree);
|
|
||||||
if (ka->responseTimer < 0) {
|
|
||||||
virKeepAliveFree(ka);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
/* the timer now has a reference to ka */
|
|
||||||
ka->refs++;
|
|
||||||
|
|
||||||
PROBE(RPC_KEEPALIVE_NEW,
|
PROBE(RPC_KEEPALIVE_NEW,
|
||||||
"ka=%p client=%p refs=%d",
|
"ka=%p client=%p refs=%d",
|
||||||
ka, ka->client, ka->refs);
|
ka, ka->client, ka->refs);
|
||||||
@ -394,7 +329,7 @@ cleanup:
|
|||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
virKeepAliveStopInternal(virKeepAlivePtr ka, bool all)
|
virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED)
|
||||||
{
|
{
|
||||||
virKeepAliveLock(ka);
|
virKeepAliveLock(ka);
|
||||||
|
|
||||||
@ -407,16 +342,6 @@ virKeepAliveStopInternal(virKeepAlivePtr ka, bool all)
|
|||||||
ka->timer = -1;
|
ka->timer = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (all) {
|
|
||||||
if (ka->responseTimer > 0) {
|
|
||||||
virEventRemoveTimeout(ka->responseTimer);
|
|
||||||
ka->responseTimer = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
virNetMessageFree(ka->response);
|
|
||||||
ka->response = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
virKeepAliveUnlock(ka);
|
virKeepAliveUnlock(ka);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -482,13 +407,15 @@ virKeepAliveTrigger(virKeepAlivePtr ka,
|
|||||||
|
|
||||||
bool
|
bool
|
||||||
virKeepAliveCheckMessage(virKeepAlivePtr ka,
|
virKeepAliveCheckMessage(virKeepAlivePtr ka,
|
||||||
virNetMessagePtr msg)
|
virNetMessagePtr msg,
|
||||||
|
virNetMessagePtr *response)
|
||||||
{
|
{
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
|
|
||||||
VIR_DEBUG("ka=%p, client=%p, msg=%p",
|
VIR_DEBUG("ka=%p, client=%p, msg=%p",
|
||||||
ka, ka ? ka->client : "(null)", msg);
|
ka, ka ? ka->client : "(null)", msg);
|
||||||
|
|
||||||
|
*response = NULL;
|
||||||
if (!ka)
|
if (!ka)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@ -508,7 +435,7 @@ virKeepAliveCheckMessage(virKeepAlivePtr ka,
|
|||||||
switch (msg->header.proc) {
|
switch (msg->header.proc) {
|
||||||
case KEEPALIVE_PROC_PING:
|
case KEEPALIVE_PROC_PING:
|
||||||
VIR_DEBUG("Got keepalive request from client %p", ka->client);
|
VIR_DEBUG("Got keepalive request from client %p", ka->client);
|
||||||
virKeepAliveScheduleResponse(ka);
|
*response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case KEEPALIVE_PROC_PONG:
|
case KEEPALIVE_PROC_PONG:
|
||||||
|
@ -55,6 +55,7 @@ int virKeepAliveTimeout(virKeepAlivePtr ka);
|
|||||||
bool virKeepAliveTrigger(virKeepAlivePtr ka,
|
bool virKeepAliveTrigger(virKeepAlivePtr ka,
|
||||||
virNetMessagePtr *msg);
|
virNetMessagePtr *msg);
|
||||||
bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
|
bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
|
||||||
virNetMessagePtr msg);
|
virNetMessagePtr msg,
|
||||||
|
virNetMessagePtr *response);
|
||||||
|
|
||||||
#endif /* __VIR_KEEPALIVE_H__ */
|
#endif /* __VIR_KEEPALIVE_H__ */
|
||||||
|
@ -109,6 +109,8 @@ struct _virNetClient {
|
|||||||
|
|
||||||
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
|
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
|
||||||
virNetClientCallPtr thiscall);
|
virNetClientCallPtr thiscall);
|
||||||
|
static int virNetClientQueueNonBlocking(virNetClientPtr client,
|
||||||
|
virNetMessagePtr msg);
|
||||||
|
|
||||||
|
|
||||||
static void virNetClientLock(virNetClientPtr client)
|
static void virNetClientLock(virNetClientPtr client)
|
||||||
@ -932,14 +934,22 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
|
|||||||
static int
|
static int
|
||||||
virNetClientCallDispatch(virNetClientPtr client)
|
virNetClientCallDispatch(virNetClientPtr client)
|
||||||
{
|
{
|
||||||
|
virNetMessagePtr response = NULL;
|
||||||
|
|
||||||
PROBE(RPC_CLIENT_MSG_RX,
|
PROBE(RPC_CLIENT_MSG_RX,
|
||||||
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
|
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
|
||||||
client, client->msg.bufferLength,
|
client, client->msg.bufferLength,
|
||||||
client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
|
client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
|
||||||
client->msg.header.type, client->msg.header.status, client->msg.header.serial);
|
client->msg.header.type, client->msg.header.status, client->msg.header.serial);
|
||||||
|
|
||||||
if (virKeepAliveCheckMessage(client->keepalive, &client->msg))
|
if (virKeepAliveCheckMessage(client->keepalive, &client->msg, &response)) {
|
||||||
|
if (response &&
|
||||||
|
virNetClientQueueNonBlocking(client, response) < 0) {
|
||||||
|
VIR_WARN("Could not queue keepalive response");
|
||||||
|
virNetMessageFree(response);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
switch (client->msg.header.type) {
|
switch (client->msg.header.type) {
|
||||||
case VIR_NET_REPLY: /* Normal RPC replies */
|
case VIR_NET_REPLY: /* Normal RPC replies */
|
||||||
@ -1625,6 +1635,8 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
|
|||||||
virNetClientCallRemovePredicate(&client->waitDispatch,
|
virNetClientCallRemovePredicate(&client->waitDispatch,
|
||||||
virNetClientIOEventLoopRemoveDone,
|
virNetClientIOEventLoopRemoveDone,
|
||||||
NULL);
|
NULL);
|
||||||
|
virNetClientIOUpdateCallback(client, true);
|
||||||
|
|
||||||
done:
|
done:
|
||||||
virNetClientUnlock(client);
|
virNetClientUnlock(client);
|
||||||
}
|
}
|
||||||
@ -1684,6 +1696,27 @@ error:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
virNetClientQueueNonBlocking(virNetClientPtr client,
|
||||||
|
virNetMessagePtr msg)
|
||||||
|
{
|
||||||
|
virNetClientCallPtr call;
|
||||||
|
|
||||||
|
PROBE(RPC_CLIENT_MSG_TX_QUEUE,
|
||||||
|
"client=%p len=%zu prog=%u vers=%u proc=%u"
|
||||||
|
" type=%u status=%u serial=%u",
|
||||||
|
client, msg->bufferLength,
|
||||||
|
msg->header.prog, msg->header.vers, msg->header.proc,
|
||||||
|
msg->header.type, msg->header.status, msg->header.serial);
|
||||||
|
|
||||||
|
if (!(call = virNetClientCallNew(msg, false, true)))
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
virNetClientCallQueue(&client->waitDispatch, call);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Returns 1 if the call was queued and will be completed later (only
|
* Returns 1 if the call was queued and will be completed later (only
|
||||||
* for nonBlock==true), 0 if the call was completed and -1 on error.
|
* for nonBlock==true), 0 if the call was completed and -1 on error.
|
||||||
|
@ -103,13 +103,14 @@ struct _virNetServerClient
|
|||||||
virNetServerClientCloseFunc privateDataCloseFunc;
|
virNetServerClientCloseFunc privateDataCloseFunc;
|
||||||
|
|
||||||
virKeepAlivePtr keepalive;
|
virKeepAlivePtr keepalive;
|
||||||
int keepaliveFilter;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
|
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
|
||||||
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
|
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
|
||||||
static void virNetServerClientDispatchRead(virNetServerClientPtr client);
|
static void virNetServerClientDispatchRead(virNetServerClientPtr client);
|
||||||
|
static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
|
||||||
|
virNetMessagePtr msg);
|
||||||
|
|
||||||
static void virNetServerClientLock(virNetServerClientPtr client)
|
static void virNetServerClientLock(virNetServerClientPtr client)
|
||||||
{
|
{
|
||||||
@ -359,7 +360,6 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock,
|
|||||||
client->readonly = readonly;
|
client->readonly = readonly;
|
||||||
client->tlsCtxt = tls;
|
client->tlsCtxt = tls;
|
||||||
client->nrequests_max = nrequests_max;
|
client->nrequests_max = nrequests_max;
|
||||||
client->keepaliveFilter = -1;
|
|
||||||
|
|
||||||
client->sockTimer = virEventAddTimeout(-1, virNetServerClientSockTimerFunc,
|
client->sockTimer = virEventAddTimeout(-1, virNetServerClientSockTimerFunc,
|
||||||
client, NULL);
|
client, NULL);
|
||||||
@ -635,9 +635,6 @@ void virNetServerClientClose(virNetServerClientPtr client)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (client->keepaliveFilter >= 0)
|
|
||||||
virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter);
|
|
||||||
|
|
||||||
if (client->keepalive) {
|
if (client->keepalive) {
|
||||||
virKeepAliveStop(client->keepalive);
|
virKeepAliveStop(client->keepalive);
|
||||||
ka = client->keepalive;
|
ka = client->keepalive;
|
||||||
@ -835,6 +832,7 @@ readmore:
|
|||||||
} else {
|
} else {
|
||||||
/* Grab the completed message */
|
/* Grab the completed message */
|
||||||
virNetMessagePtr msg = client->rx;
|
virNetMessagePtr msg = client->rx;
|
||||||
|
virNetMessagePtr response = NULL;
|
||||||
virNetServerClientFilterPtr filter;
|
virNetServerClientFilterPtr filter;
|
||||||
size_t i;
|
size_t i;
|
||||||
|
|
||||||
@ -885,23 +883,35 @@ readmore:
|
|||||||
msg->header.prog, msg->header.vers, msg->header.proc,
|
msg->header.prog, msg->header.vers, msg->header.proc,
|
||||||
msg->header.type, msg->header.status, msg->header.serial);
|
msg->header.type, msg->header.status, msg->header.serial);
|
||||||
|
|
||||||
/* Maybe send off for queue against a filter */
|
if (virKeepAliveCheckMessage(client->keepalive, msg, &response)) {
|
||||||
filter = client->filters;
|
virNetMessageFree(msg);
|
||||||
while (filter) {
|
client->nrequests--;
|
||||||
int ret = filter->func(client, msg, filter->opaque);
|
msg = NULL;
|
||||||
if (ret < 0) {
|
|
||||||
virNetMessageFree(msg);
|
|
||||||
msg = NULL;
|
|
||||||
if (ret < 0)
|
|
||||||
client->wantClose = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (ret > 0) {
|
|
||||||
msg = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
filter = filter->next;
|
if (response &&
|
||||||
|
virNetServerClientSendMessageLocked(client, response) < 0)
|
||||||
|
virNetMessageFree(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Maybe send off for queue against a filter */
|
||||||
|
if (msg) {
|
||||||
|
filter = client->filters;
|
||||||
|
while (filter) {
|
||||||
|
int ret = filter->func(client, msg, filter->opaque);
|
||||||
|
if (ret < 0) {
|
||||||
|
virNetMessageFree(msg);
|
||||||
|
msg = NULL;
|
||||||
|
if (ret < 0)
|
||||||
|
client->wantClose = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (ret > 0) {
|
||||||
|
msg = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
filter = filter->next;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send off to for normal dispatch to workers */
|
/* Send off to for normal dispatch to workers */
|
||||||
@ -1097,16 +1107,15 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int virNetServerClientSendMessage(virNetServerClientPtr client,
|
static int
|
||||||
virNetMessagePtr msg)
|
virNetServerClientSendMessageLocked(virNetServerClientPtr client,
|
||||||
|
virNetMessagePtr msg)
|
||||||
{
|
{
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
|
VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
|
||||||
msg, msg->header.proc,
|
msg, msg->header.proc,
|
||||||
msg->bufferLength, msg->bufferOffset);
|
msg->bufferLength, msg->bufferOffset);
|
||||||
|
|
||||||
virNetServerClientLock(client);
|
|
||||||
|
|
||||||
msg->donefds = 0;
|
msg->donefds = 0;
|
||||||
if (client->sock && !client->wantClose) {
|
if (client->sock && !client->wantClose) {
|
||||||
PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE,
|
PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE,
|
||||||
@ -1120,6 +1129,16 @@ int virNetServerClientSendMessage(virNetServerClientPtr client,
|
|||||||
ret = 0;
|
ret = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int virNetServerClientSendMessage(virNetServerClientPtr client,
|
||||||
|
virNetMessagePtr msg)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
virNetServerClientLock(client);
|
||||||
|
ret = virNetServerClientSendMessageLocked(client, msg);
|
||||||
virNetServerClientUnlock(client);
|
virNetServerClientUnlock(client);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -1156,20 +1175,6 @@ virNetServerClientFreeCB(void *opaque)
|
|||||||
virNetServerClientFree(opaque);
|
virNetServerClientFree(opaque);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
|
||||||
virNetServerClientKeepAliveFilter(virNetServerClientPtr client,
|
|
||||||
virNetMessagePtr msg,
|
|
||||||
void *opaque ATTRIBUTE_UNUSED)
|
|
||||||
{
|
|
||||||
if (virKeepAliveCheckMessage(client->keepalive, msg)) {
|
|
||||||
virNetMessageFree(msg);
|
|
||||||
client->nrequests--;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int
|
int
|
||||||
virNetServerClientInitKeepAlive(virNetServerClientPtr client,
|
virNetServerClientInitKeepAlive(virNetServerClientPtr client,
|
||||||
int interval,
|
int interval,
|
||||||
@ -1188,13 +1193,6 @@ virNetServerClientInitKeepAlive(virNetServerClientPtr client,
|
|||||||
/* keepalive object has a reference to client */
|
/* keepalive object has a reference to client */
|
||||||
client->refs++;
|
client->refs++;
|
||||||
|
|
||||||
client->keepaliveFilter =
|
|
||||||
virNetServerClientAddFilterLocked(client,
|
|
||||||
virNetServerClientKeepAliveFilter,
|
|
||||||
NULL);
|
|
||||||
if (client->keepaliveFilter < 0)
|
|
||||||
goto cleanup;
|
|
||||||
|
|
||||||
client->keepalive = ka;
|
client->keepalive = ka;
|
||||||
ka = NULL;
|
ka = NULL;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user