diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 469c6a5a4a..ffe067c76d 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1703,8 +1703,6 @@ static int virNetClientSendInternal(virNetClientPtr client, return -1; } - virNetClientLock(client); - if (!client->sock || client->wantClose) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("client socket is closed")); @@ -1739,7 +1737,6 @@ static int virNetClientSendInternal(virNetClientPtr client, cleanup: if (ret != 1) VIR_FREE(call); - virNetClientUnlock(client); return ret; } @@ -1757,7 +1754,10 @@ cleanup: int virNetClientSendWithReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, true, false); + int ret; + virNetClientLock(client); + ret = virNetClientSendInternal(client, msg, true, false); + virNetClientUnlock(client); if (ret < 0) return -1; return 0; @@ -1777,7 +1777,10 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, false, false); + int ret; + virNetClientLock(client); + ret = virNetClientSendInternal(client, msg, false, false); + virNetClientUnlock(client); if (ret < 0) return -1; return 0; @@ -1796,5 +1799,41 @@ int virNetClientSendNoReply(virNetClientPtr client, int virNetClientSendNonBlock(virNetClientPtr client, virNetMessagePtr msg) { - return virNetClientSendInternal(client, msg, false, true); + int ret; + virNetClientLock(client); + ret = virNetClientSendInternal(client, msg, false, true); + virNetClientUnlock(client); + return ret; +} + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, and wait for the reply synchronously + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSendWithReplyStream(virNetClientPtr client, + virNetMessagePtr msg, + virNetClientStreamPtr st) +{ + int ret; + virNetClientLock(client); + /* Other thread might have already received + * stream EOF so we don't want sent anything. + * Server won't respond anyway. + */ + if (virNetClientStreamEOF(st)) { + virNetClientUnlock(client); + return 0; + } + + ret = virNetClientSendInternal(client, msg, true, false); + virNetClientUnlock(client); + if (ret < 0) + return -1; + return 0; } diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index 61d51e16d6..7c30d2bac3 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -76,6 +76,9 @@ int virNetClientSendNoReply(virNetClientPtr client, int virNetClientSendNonBlock(virNetClientPtr client, virNetMessagePtr msg); +int virNetClientSendWithReplyStream(virNetClientPtr client, + virNetMessagePtr msg, + virNetClientStreamPtr st); # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index a4292e77a4..be06c66b48 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -408,7 +408,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virMutexUnlock(&st->lock); - ret = virNetClientSendWithReply(client, msg); + ret = virNetClientSendWithReplyStream(client, msg, st); virMutexLock(&st->lock); virNetMessageFree(msg); @@ -530,3 +530,8 @@ cleanup: virMutexUnlock(&st->lock); return ret; } + +bool virNetClientStreamEOF(virNetClientStreamPtr st) +{ + return st->incomingEOF; +} diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 6c8d538099..fd7a2ee1f7 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -72,5 +72,7 @@ int virNetClientStreamEventUpdateCallback(virNetClientStreamPtr st, int events); int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st); +bool virNetClientStreamEOF(virNetClientStreamPtr st) + ATTRIBUTE_NONNULL(1); #endif /* __VIR_NET_CLIENT_STREAM_H__ */