2009-07-10 12:06:36 +00:00
|
|
|
/*
|
2018-02-20 13:16:28 +00:00
|
|
|
* remote_daemon_stream.c: APIs for managing client streams
|
2009-07-10 12:06:36 +00:00
|
|
|
*
|
2018-02-20 13:16:28 +00:00
|
|
|
* Copyright (C) 2009-2018 Red Hat, Inc.
|
2009-07-10 12:06:36 +00:00
|
|
|
*
|
|
|
|
* This library is free software; you can redistribute it and/or
|
|
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
|
|
* License as published by the Free Software Foundation; either
|
|
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
|
|
*
|
|
|
|
* This library is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
|
|
* Lesser General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Lesser General Public
|
2012-09-20 22:30:55 +00:00
|
|
|
* License along with this library. If not, see
|
2012-07-21 10:06:23 +00:00
|
|
|
* <http://www.gnu.org/licenses/>.
|
2009-07-10 12:06:36 +00:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
#include <config.h>
|
|
|
|
|
2018-02-20 13:16:28 +00:00
|
|
|
#include "remote_daemon_stream.h"
|
|
|
|
#include "remote_daemon_dispatch.h"
|
2012-12-12 18:06:53 +00:00
|
|
|
#include "viralloc.h"
|
2012-12-12 17:59:27 +00:00
|
|
|
#include "virlog.h"
|
2011-05-16 17:13:11 +00:00
|
|
|
#include "virnetserverclient.h"
|
2012-12-13 18:21:53 +00:00
|
|
|
#include "virerror.h"
|
2016-04-11 09:58:19 +00:00
|
|
|
#include "libvirt_internal.h"
|
2011-05-14 15:46:00 +00:00
|
|
|
|
|
|
|
#define VIR_FROM_THIS VIR_FROM_STREAMS
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2014-02-28 12:16:17 +00:00
|
|
|
VIR_LOG_INIT("daemon.stream");
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
struct daemonClientStream {
|
|
|
|
daemonClientPrivatePtr priv;
|
2011-08-31 16:01:01 +00:00
|
|
|
int refs;
|
2011-05-16 17:13:11 +00:00
|
|
|
|
|
|
|
virNetServerProgramPtr prog;
|
|
|
|
|
|
|
|
virStreamPtr st;
|
|
|
|
int procedure;
|
2016-03-29 15:33:14 +00:00
|
|
|
unsigned int serial;
|
2011-05-16 17:13:11 +00:00
|
|
|
|
2016-04-04 10:46:19 +00:00
|
|
|
bool recvEOF;
|
|
|
|
bool closed;
|
2011-05-16 17:13:11 +00:00
|
|
|
|
|
|
|
int filterID;
|
|
|
|
|
|
|
|
virNetMessagePtr rx;
|
2016-04-04 10:49:48 +00:00
|
|
|
bool tx;
|
2011-05-16 17:13:11 +00:00
|
|
|
|
2016-04-11 14:50:04 +00:00
|
|
|
bool allowSkip;
|
2016-04-11 09:58:19 +00:00
|
|
|
size_t dataLen; /* How much data is there remaining until we see a hole */
|
2016-04-11 14:50:04 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonClientStreamPtr next;
|
|
|
|
};
|
|
|
|
|
2009-08-24 19:53:48 +00:00
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleWrite(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream);
|
2009-08-24 19:53:48 +00:00
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleRead(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream);
|
2009-08-24 19:57:16 +00:00
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleFinish(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
virNetMessagePtr msg);
|
2009-08-24 19:53:48 +00:00
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleAbort(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
virNetMessagePtr msg);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamUpdateEvents(daemonClientStream *stream)
|
2009-08-24 19:53:48 +00:00
|
|
|
{
|
|
|
|
int newEvents = 0;
|
2016-04-25 17:18:26 +00:00
|
|
|
if (stream->closed)
|
|
|
|
return;
|
2009-08-24 19:53:48 +00:00
|
|
|
if (stream->rx)
|
|
|
|
newEvents |= VIR_STREAM_EVENT_WRITABLE;
|
2009-08-24 19:57:16 +00:00
|
|
|
if (stream->tx && !stream->recvEOF)
|
|
|
|
newEvents |= VIR_STREAM_EVENT_READABLE;
|
2009-08-24 19:53:48 +00:00
|
|
|
|
|
|
|
virStreamEventUpdateCallback(stream->st, newEvents);
|
|
|
|
}
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
/*
|
|
|
|
* Invoked when an outgoing data packet message has been fully sent.
|
|
|
|
* This simply re-enables TX of further data.
|
|
|
|
*
|
|
|
|
* The idea is to stop the daemon growing without bound due to
|
|
|
|
* fast stream, but slow client
|
|
|
|
*/
|
|
|
|
static void
|
2016-03-29 15:33:14 +00:00
|
|
|
daemonStreamMessageFinished(virNetMessagePtr msg,
|
2011-05-16 17:13:11 +00:00
|
|
|
void *opaque)
|
|
|
|
{
|
|
|
|
daemonClientStream *stream = opaque;
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("stream=%p proc=%d serial=%u",
|
2011-05-16 17:13:11 +00:00
|
|
|
stream, msg->header.proc, msg->header.serial);
|
|
|
|
|
2016-04-04 10:49:48 +00:00
|
|
|
stream->tx = true;
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamUpdateEvents(stream);
|
2011-08-31 16:01:01 +00:00
|
|
|
|
|
|
|
daemonFreeClientStream(NULL, stream);
|
2011-05-16 17:13:11 +00:00
|
|
|
}
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-07-08 11:33:52 +00:00
|
|
|
|
2009-08-24 19:53:48 +00:00
|
|
|
/*
|
|
|
|
* Callback that gets invoked when a stream becomes writable/readable
|
|
|
|
*/
|
|
|
|
static void
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamEvent(virStreamPtr st, int events, void *opaque)
|
2009-08-24 19:53:48 +00:00
|
|
|
{
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetServerClientPtr client = opaque;
|
|
|
|
daemonClientStream *stream;
|
|
|
|
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
virMutexLock(&priv->lock);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
stream = priv->streams;
|
|
|
|
while (stream) {
|
|
|
|
if (stream->st == st)
|
|
|
|
break;
|
|
|
|
stream = stream->next;
|
|
|
|
}
|
2009-08-24 19:53:48 +00:00
|
|
|
|
|
|
|
if (!stream) {
|
|
|
|
VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st);
|
|
|
|
virStreamEventRemoveCallback(st);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
VIR_DEBUG("st=%p events=%d EOF=%d closed=%d", st, events, stream->recvEOF, stream->closed);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-10-07 16:38:59 +00:00
|
|
|
if (!stream->closed &&
|
|
|
|
(events & VIR_STREAM_EVENT_WRITABLE)) {
|
2011-05-16 17:13:11 +00:00
|
|
|
if (daemonStreamHandleWrite(client, stream) < 0) {
|
|
|
|
daemonRemoveClientStream(client, stream);
|
|
|
|
virNetServerClientClose(client);
|
2009-08-24 19:53:48 +00:00
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2011-10-07 16:38:59 +00:00
|
|
|
if (!stream->closed && !stream->recvEOF &&
|
|
|
|
(events & (VIR_STREAM_EVENT_READABLE))) {
|
|
|
|
events = events & ~(VIR_STREAM_EVENT_READABLE);
|
2011-05-16 17:13:11 +00:00
|
|
|
if (daemonStreamHandleRead(client, stream) < 0) {
|
|
|
|
daemonRemoveClientStream(client, stream);
|
|
|
|
virNetServerClientClose(client);
|
2009-08-24 19:57:16 +00:00
|
|
|
goto cleanup;
|
|
|
|
}
|
2013-04-09 12:24:02 +00:00
|
|
|
/* If we detected EOF during read processing,
|
|
|
|
* then clear hangup/error conditions, since
|
|
|
|
* we want the client to see the EOF message
|
|
|
|
* we just sent them
|
|
|
|
*/
|
|
|
|
if (stream->recvEOF)
|
|
|
|
events = events & ~(VIR_STREAM_EVENT_HANGUP |
|
|
|
|
VIR_STREAM_EVENT_ERROR);
|
2009-08-24 19:57:16 +00:00
|
|
|
}
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
/* If we have a completion/abort message, always process it */
|
|
|
|
if (stream->rx) {
|
|
|
|
virNetMessagePtr msg = stream->rx;
|
|
|
|
switch (msg->header.status) {
|
|
|
|
case VIR_NET_CONTINUE:
|
|
|
|
/* nada */
|
|
|
|
break;
|
|
|
|
case VIR_NET_OK:
|
|
|
|
virNetMessageQueueServe(&stream->rx);
|
|
|
|
if (daemonStreamHandleFinish(client, stream, msg) < 0) {
|
|
|
|
virNetMessageFree(msg);
|
|
|
|
daemonRemoveClientStream(client, stream);
|
|
|
|
virNetServerClientClose(client);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case VIR_NET_ERROR:
|
|
|
|
default:
|
|
|
|
virNetMessageQueueServe(&stream->rx);
|
|
|
|
if (daemonStreamHandleAbort(client, stream, msg) < 0) {
|
|
|
|
virNetMessageFree(msg);
|
|
|
|
daemonRemoveClientStream(client, stream);
|
|
|
|
virNetServerClientClose(client);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2011-10-07 16:38:59 +00:00
|
|
|
|
|
|
|
/* If we got HANGUP, we need to only send an empty
|
|
|
|
* packet so the client sees an EOF and cleans up
|
|
|
|
*/
|
|
|
|
if (!stream->closed && !stream->recvEOF &&
|
|
|
|
(events & VIR_STREAM_EVENT_HANGUP)) {
|
|
|
|
virNetMessagePtr msg;
|
|
|
|
events &= ~(VIR_STREAM_EVENT_HANGUP);
|
2016-04-04 10:49:48 +00:00
|
|
|
stream->tx = false;
|
2016-04-04 10:46:19 +00:00
|
|
|
stream->recvEOF = true;
|
2011-10-07 16:38:59 +00:00
|
|
|
if (!(msg = virNetMessageNew(false))) {
|
|
|
|
daemonRemoveClientStream(client, stream);
|
|
|
|
virNetServerClientClose(client);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
msg->cb = daemonStreamMessageFinished;
|
|
|
|
msg->opaque = stream;
|
|
|
|
stream->refs++;
|
2018-03-08 12:20:38 +00:00
|
|
|
if (virNetServerProgramSendStreamData(stream->prog,
|
2011-10-07 16:38:59 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial,
|
|
|
|
"", 0) < 0) {
|
|
|
|
virNetMessageFree(msg);
|
|
|
|
daemonRemoveClientStream(client, stream);
|
|
|
|
virNetServerClientClose(client);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-08-24 19:53:48 +00:00
|
|
|
if (!stream->closed &&
|
|
|
|
(events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
|
|
|
|
int ret;
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessagePtr msg;
|
|
|
|
virNetMessageError rerr;
|
2018-12-06 17:33:20 +00:00
|
|
|
virErrorPtr origErr;
|
|
|
|
|
|
|
|
virErrorPreserveLast(&origErr);
|
2011-05-16 17:13:11 +00:00
|
|
|
|
|
|
|
memset(&rerr, 0, sizeof(rerr));
|
2016-04-04 10:46:19 +00:00
|
|
|
stream->closed = true;
|
2010-08-17 15:30:51 +00:00
|
|
|
virStreamEventRemoveCallback(stream->st);
|
2009-08-24 19:53:48 +00:00
|
|
|
virStreamAbort(stream->st);
|
2017-05-29 14:29:36 +00:00
|
|
|
if (origErr && origErr->code != VIR_ERR_OK) {
|
2018-12-06 17:33:20 +00:00
|
|
|
virErrorRestore(&origErr);
|
2017-05-29 14:29:36 +00:00
|
|
|
} else {
|
2018-12-06 17:33:20 +00:00
|
|
|
virFreeError(origErr);
|
2017-05-29 14:29:36 +00:00
|
|
|
if (events & VIR_STREAM_EVENT_HANGUP)
|
|
|
|
virReportError(VIR_ERR_RPC,
|
|
|
|
"%s", _("stream had unexpected termination"));
|
|
|
|
else
|
|
|
|
virReportError(VIR_ERR_RPC,
|
|
|
|
"%s", _("stream had I/O failure"));
|
|
|
|
}
|
2011-05-16 17:13:11 +00:00
|
|
|
|
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 16:42:58 +00:00
|
|
|
msg = virNetMessageNew(false);
|
2011-05-16 17:13:11 +00:00
|
|
|
if (!msg) {
|
|
|
|
ret = -1;
|
|
|
|
} else {
|
2018-03-08 12:20:38 +00:00
|
|
|
ret = virNetServerProgramSendStreamError(stream->prog,
|
2011-05-16 17:13:11 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial);
|
|
|
|
}
|
|
|
|
daemonRemoveClientStream(client, stream);
|
2009-08-24 19:53:48 +00:00
|
|
|
if (ret < 0)
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetServerClientClose(client);
|
2009-08-24 19:53:48 +00:00
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (stream->closed) {
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonRemoveClientStream(client, stream);
|
2009-08-24 19:53:48 +00:00
|
|
|
} else {
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamUpdateEvents(stream);
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
2014-03-25 06:45:38 +00:00
|
|
|
cleanup:
|
2011-05-16 17:13:11 +00:00
|
|
|
virMutexUnlock(&priv->lock);
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
2009-07-10 12:06:36 +00:00
|
|
|
|
|
|
|
/*
|
|
|
|
* @client: a locked client object
|
|
|
|
*
|
|
|
|
* Invoked by the main loop when filtering incoming messages.
|
|
|
|
*
|
|
|
|
* Returns 1 if the message was processed, 0 if skipped,
|
|
|
|
* -1 on fatal client error
|
|
|
|
*/
|
|
|
|
static int
|
2019-11-19 11:39:50 +00:00
|
|
|
daemonStreamFilter(virNetServerClientPtr client,
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessagePtr msg,
|
|
|
|
void *opaque)
|
2009-07-10 12:06:36 +00:00
|
|
|
{
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonClientStream *stream = opaque;
|
|
|
|
int ret = 0;
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2019-11-25 14:13:17 +00:00
|
|
|
/* We must honour lock ordering here. Client private data lock must
|
|
|
|
* be acquired before client lock. Bu we are already called with
|
|
|
|
* client locked. To avoid stream disappearing while we unlock
|
|
|
|
* everything, let's increase its refcounter. This has some
|
|
|
|
* implications though. */
|
|
|
|
stream->refs++;
|
2019-11-19 11:39:50 +00:00
|
|
|
virObjectUnlock(client);
|
2011-05-16 17:13:11 +00:00
|
|
|
virMutexLock(&stream->priv->lock);
|
2019-11-19 11:39:50 +00:00
|
|
|
virObjectLock(client);
|
2011-05-16 17:13:11 +00:00
|
|
|
|
2019-11-25 14:13:17 +00:00
|
|
|
if (stream->refs == 1) {
|
|
|
|
/* So we are the only ones holding the reference to the stream.
|
|
|
|
* Return 1 to signal to the caller that we've processed the
|
|
|
|
* message. And to "process" means free. */
|
|
|
|
virNetMessageFree(msg);
|
|
|
|
ret = 1;
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
2016-04-04 12:54:46 +00:00
|
|
|
if (msg->header.type != VIR_NET_STREAM &&
|
|
|
|
msg->header.type != VIR_NET_STREAM_HOLE)
|
2011-05-16 17:13:11 +00:00
|
|
|
goto cleanup;
|
|
|
|
|
|
|
|
if (!virNetServerProgramMatches(stream->prog, msg))
|
|
|
|
goto cleanup;
|
|
|
|
|
|
|
|
if (msg->header.proc != stream->procedure ||
|
|
|
|
msg->header.serial != stream->serial)
|
|
|
|
goto cleanup;
|
|
|
|
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("Incoming client=%p, rx=%p, serial=%u, proc=%d, status=%d",
|
2011-05-16 17:13:11 +00:00
|
|
|
client, stream->rx, msg->header.proc,
|
|
|
|
msg->header.serial, msg->header.status);
|
|
|
|
|
|
|
|
virNetMessageQueuePush(&stream->rx, msg);
|
|
|
|
daemonStreamUpdateEvents(stream);
|
|
|
|
ret = 1;
|
|
|
|
|
2014-03-25 06:45:38 +00:00
|
|
|
cleanup:
|
2011-05-16 17:13:11 +00:00
|
|
|
virMutexUnlock(&stream->priv->lock);
|
2019-11-25 14:13:17 +00:00
|
|
|
/* Don't pass client here, because client is locked here and this
|
|
|
|
* function might try to lock it again which would result in a
|
|
|
|
* deadlock. */
|
|
|
|
daemonFreeClientStream(NULL, stream);
|
2011-05-16 17:13:11 +00:00
|
|
|
return ret;
|
2009-07-10 12:06:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* @conn: a connection object to associate the stream with
|
2011-05-16 17:13:11 +00:00
|
|
|
* @header: the method call to associate with the stream
|
2009-07-10 12:06:36 +00:00
|
|
|
*
|
|
|
|
* Creates a new stream for this conn
|
|
|
|
*
|
|
|
|
* Returns a new stream object, or NULL upon OOM
|
|
|
|
*/
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonClientStream *
|
|
|
|
daemonCreateClientStream(virNetServerClientPtr client,
|
|
|
|
virStreamPtr st,
|
|
|
|
virNetServerProgramPtr prog,
|
2016-04-11 14:50:04 +00:00
|
|
|
virNetMessageHeaderPtr header,
|
|
|
|
bool allowSkip)
|
2009-07-10 12:06:36 +00:00
|
|
|
{
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonClientStream *stream;
|
|
|
|
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p",
|
2011-05-16 17:13:11 +00:00
|
|
|
client, header->proc, header->serial, st);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2013-07-04 09:58:18 +00:00
|
|
|
if (VIR_ALLOC(stream) < 0)
|
2009-07-10 12:06:36 +00:00
|
|
|
return NULL;
|
|
|
|
|
2011-08-31 16:01:01 +00:00
|
|
|
stream->refs = 1;
|
2011-05-16 17:13:11 +00:00
|
|
|
stream->priv = priv;
|
2012-07-11 13:35:52 +00:00
|
|
|
stream->prog = virObjectRef(prog);
|
2011-05-16 17:13:11 +00:00
|
|
|
stream->procedure = header->proc;
|
|
|
|
stream->serial = header->serial;
|
|
|
|
stream->filterID = -1;
|
|
|
|
stream->st = st;
|
2016-04-11 14:50:04 +00:00
|
|
|
stream->allowSkip = allowSkip;
|
2009-07-10 12:06:36 +00:00
|
|
|
|
|
|
|
return stream;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* @stream: an unused client stream
|
|
|
|
*
|
|
|
|
* Frees the memory associated with this inactive client
|
|
|
|
* stream
|
|
|
|
*/
|
2011-05-16 17:13:11 +00:00
|
|
|
int daemonFreeClientStream(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream)
|
2009-07-10 12:06:36 +00:00
|
|
|
{
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessagePtr msg;
|
|
|
|
int ret = 0;
|
2009-07-10 12:06:36 +00:00
|
|
|
|
|
|
|
if (!stream)
|
2011-05-16 17:13:11 +00:00
|
|
|
return 0;
|
|
|
|
|
2011-08-31 16:01:01 +00:00
|
|
|
stream->refs--;
|
|
|
|
if (stream->refs)
|
|
|
|
return 0;
|
|
|
|
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("client=%p, proc=%d, serial=%u",
|
2011-05-16 17:13:11 +00:00
|
|
|
client, stream->procedure, stream->serial);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2012-07-11 13:35:52 +00:00
|
|
|
virObjectUnref(stream->prog);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
|
|
|
msg = stream->rx;
|
|
|
|
while (msg) {
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessagePtr tmp = msg->next;
|
2011-08-14 22:44:45 +00:00
|
|
|
if (client) {
|
|
|
|
/* Send a dummy reply to free up 'msg' & unblock client rx */
|
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 16:42:58 +00:00
|
|
|
virNetMessageClear(msg);
|
2011-08-14 22:44:45 +00:00
|
|
|
msg->header.type = VIR_NET_REPLY;
|
|
|
|
if (virNetServerClientSendMessage(client, msg) < 0) {
|
|
|
|
virNetServerClientImmediateClose(client);
|
|
|
|
virNetMessageFree(msg);
|
|
|
|
ret = -1;
|
|
|
|
}
|
|
|
|
} else {
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessageFree(msg);
|
|
|
|
}
|
2009-07-10 12:06:36 +00:00
|
|
|
msg = tmp;
|
|
|
|
}
|
|
|
|
|
2014-11-30 15:19:38 +00:00
|
|
|
virObjectUnref(stream->st);
|
2009-07-10 12:06:36 +00:00
|
|
|
VIR_FREE(stream);
|
2011-05-16 17:13:11 +00:00
|
|
|
|
|
|
|
return ret;
|
2009-07-10 12:06:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* @client: a locked client to add the stream to
|
|
|
|
* @stream: a stream to add
|
|
|
|
*/
|
2011-05-16 17:13:11 +00:00
|
|
|
int daemonAddClientStream(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
bool transmit)
|
2009-07-10 12:06:36 +00:00
|
|
|
{
|
2020-08-03 15:28:06 +00:00
|
|
|
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
|
|
|
|
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p, transmit=%d",
|
2011-05-16 17:13:11 +00:00
|
|
|
client, stream->procedure, stream->serial, stream->st, transmit);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
if (stream->filterID != -1) {
|
|
|
|
VIR_WARN("Filter already added to client %p", client);
|
|
|
|
return -1;
|
|
|
|
}
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2009-08-24 19:53:48 +00:00
|
|
|
if (virStreamEventAddCallback(stream->st, 0,
|
2011-07-08 11:33:52 +00:00
|
|
|
daemonStreamEvent, client,
|
2012-07-11 13:35:52 +00:00
|
|
|
virObjectFreeCallback) < 0)
|
2009-08-24 19:53:48 +00:00
|
|
|
return -1;
|
|
|
|
|
2012-07-11 13:35:52 +00:00
|
|
|
virObjectRef(client);
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
if ((stream->filterID = virNetServerClientAddFilter(client,
|
|
|
|
daemonStreamFilter,
|
|
|
|
stream)) < 0) {
|
|
|
|
virStreamEventRemoveCallback(stream->st);
|
|
|
|
return -1;
|
2009-07-10 12:06:36 +00:00
|
|
|
}
|
|
|
|
|
2009-09-30 10:47:43 +00:00
|
|
|
if (transmit)
|
2016-04-04 10:49:48 +00:00
|
|
|
stream->tx = true;
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
virMutexLock(&priv->lock);
|
|
|
|
stream->next = priv->streams;
|
|
|
|
priv->streams = stream;
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamUpdateEvents(stream);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
virMutexUnlock(&priv->lock);
|
2009-07-10 12:06:36 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
return 0;
|
2009-07-10 12:06:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* @client: a locked client object
|
|
|
|
* @stream: an inactive, closed stream object
|
|
|
|
*
|
|
|
|
* Removes a stream from the list of active streams for the client
|
|
|
|
*
|
2016-09-21 08:36:09 +00:00
|
|
|
* Returns 0 if the stream was removed, -1 if it doesn't exist
|
2009-07-10 12:06:36 +00:00
|
|
|
*/
|
|
|
|
int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonRemoveClientStream(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream)
|
2009-07-10 12:06:36 +00:00
|
|
|
{
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
|
|
|
|
daemonClientStream *curr = priv->streams;
|
|
|
|
daemonClientStream *prev = NULL;
|
|
|
|
|
2020-08-03 15:28:06 +00:00
|
|
|
VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p",
|
|
|
|
client, stream->procedure, stream->serial, stream->st);
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
if (stream->filterID != -1) {
|
|
|
|
virNetServerClientRemoveFilter(client,
|
|
|
|
stream->filterID);
|
|
|
|
stream->filterID = -1;
|
2009-07-10 12:06:36 +00:00
|
|
|
}
|
|
|
|
|
2010-08-17 15:30:51 +00:00
|
|
|
if (!stream->closed) {
|
2016-04-25 18:16:07 +00:00
|
|
|
stream->closed = true;
|
2010-08-17 15:30:51 +00:00
|
|
|
virStreamEventRemoveCallback(stream->st);
|
2009-07-10 12:06:36 +00:00
|
|
|
virStreamAbort(stream->st);
|
2010-08-17 15:30:51 +00:00
|
|
|
}
|
2009-07-10 12:06:36 +00:00
|
|
|
|
|
|
|
while (curr) {
|
|
|
|
if (curr == stream) {
|
|
|
|
if (prev)
|
|
|
|
prev->next = curr->next;
|
|
|
|
else
|
2011-05-16 17:13:11 +00:00
|
|
|
priv->streams = curr->next;
|
|
|
|
return daemonFreeClientStream(client, stream);
|
2009-07-10 12:06:36 +00:00
|
|
|
}
|
|
|
|
prev = curr;
|
|
|
|
curr = curr->next;
|
|
|
|
}
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-24 19:53:48 +00:00
|
|
|
|
|
|
|
|
2011-08-14 22:44:45 +00:00
|
|
|
void
|
|
|
|
daemonRemoveAllClientStreams(daemonClientStream *stream)
|
|
|
|
{
|
|
|
|
daemonClientStream *tmp;
|
|
|
|
|
|
|
|
VIR_DEBUG("stream=%p", stream);
|
|
|
|
|
|
|
|
while (stream) {
|
|
|
|
tmp = stream->next;
|
|
|
|
|
|
|
|
if (!stream->closed) {
|
2016-04-25 18:16:07 +00:00
|
|
|
stream->closed = true;
|
2011-08-14 22:44:45 +00:00
|
|
|
virStreamEventRemoveCallback(stream->st);
|
|
|
|
virStreamAbort(stream->st);
|
|
|
|
}
|
|
|
|
|
|
|
|
daemonFreeClientStream(NULL, stream);
|
|
|
|
|
|
|
|
VIR_DEBUG("next stream=%p", tmp);
|
|
|
|
stream = tmp;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-08-24 19:53:48 +00:00
|
|
|
/*
|
|
|
|
* Returns:
|
|
|
|
* -1 if fatal error occurred
|
|
|
|
* 0 if message was fully processed
|
|
|
|
* 1 if message is still being processed
|
|
|
|
*/
|
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleWriteData(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
virNetMessagePtr msg)
|
2009-08-24 19:53:48 +00:00
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u, len=%zu, offset=%zu",
|
2011-05-16 17:13:11 +00:00
|
|
|
client, stream, msg->header.proc, msg->header.serial,
|
|
|
|
msg->bufferLength, msg->bufferOffset);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
|
|
|
ret = virStreamSend(stream->st,
|
|
|
|
msg->buffer + msg->bufferOffset,
|
|
|
|
msg->bufferLength - msg->bufferOffset);
|
|
|
|
|
|
|
|
if (ret > 0) {
|
|
|
|
msg->bufferOffset += ret;
|
|
|
|
|
|
|
|
/* Partial write, so indicate we have more todo later */
|
|
|
|
if (msg->bufferOffset < msg->bufferLength)
|
|
|
|
return 1;
|
|
|
|
} else if (ret == -2) {
|
|
|
|
/* Blocking, so indicate we have more todo later */
|
|
|
|
return 1;
|
2018-03-12 09:18:05 +00:00
|
|
|
} else if (ret < 0) {
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessageError rerr;
|
2018-12-06 17:33:20 +00:00
|
|
|
virErrorPtr err;
|
|
|
|
|
|
|
|
virErrorPreserveLast(&err);
|
2011-05-16 17:13:11 +00:00
|
|
|
|
|
|
|
memset(&rerr, 0, sizeof(rerr));
|
|
|
|
|
2011-05-09 09:24:09 +00:00
|
|
|
VIR_INFO("Stream send failed");
|
2016-04-04 10:46:19 +00:00
|
|
|
stream->closed = true;
|
2016-04-25 17:15:48 +00:00
|
|
|
virStreamEventRemoveCallback(stream->st);
|
|
|
|
virStreamAbort(stream->st);
|
|
|
|
|
2018-12-06 17:33:20 +00:00
|
|
|
virErrorRestore(&err);
|
2018-03-12 09:18:05 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
return virNetServerProgramSendReplyError(stream->prog,
|
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
&msg->header);
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
2011-12-04 00:06:07 +00:00
|
|
|
* Process a finish handshake from the client.
|
2009-08-24 19:53:48 +00:00
|
|
|
*
|
2011-05-16 17:13:11 +00:00
|
|
|
* Returns a VIR_NET_OK confirmation if successful, or a VIR_NET_ERROR
|
2009-08-24 19:53:48 +00:00
|
|
|
* if there was a stream error
|
|
|
|
*
|
|
|
|
* Returns 0 if successfully sent RPC reply, -1 upon fatal error
|
|
|
|
*/
|
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleFinish(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
virNetMessagePtr msg)
|
2009-08-24 19:53:48 +00:00
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
2016-03-29 15:33:14 +00:00
|
|
|
VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u",
|
2011-05-16 17:13:11 +00:00
|
|
|
client, stream, msg->header.proc, msg->header.serial);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2016-04-04 10:46:19 +00:00
|
|
|
stream->closed = true;
|
2010-08-17 15:30:51 +00:00
|
|
|
virStreamEventRemoveCallback(stream->st);
|
2009-08-24 19:53:48 +00:00
|
|
|
ret = virStreamFinish(stream->st);
|
|
|
|
|
|
|
|
if (ret < 0) {
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessageError rerr;
|
|
|
|
memset(&rerr, 0, sizeof(rerr));
|
|
|
|
return virNetServerProgramSendReplyError(stream->prog,
|
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
&msg->header);
|
2009-08-24 19:53:48 +00:00
|
|
|
} else {
|
|
|
|
/* Send zero-length confirm */
|
2011-05-16 17:13:11 +00:00
|
|
|
return virNetServerProgramSendStreamData(stream->prog,
|
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial,
|
|
|
|
NULL, 0);
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Process an abort request from the client.
|
|
|
|
*
|
|
|
|
* Returns 0 if successfully aborted, -1 upon error
|
|
|
|
*/
|
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleAbort(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
virNetMessagePtr msg)
|
2009-08-24 19:53:48 +00:00
|
|
|
{
|
2016-04-24 20:28:18 +00:00
|
|
|
int ret;
|
|
|
|
bool raise_error = false;
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2020-08-03 15:28:06 +00:00
|
|
|
VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u",
|
|
|
|
client, stream, msg->header.proc, msg->header.serial);
|
|
|
|
|
2016-04-04 10:46:19 +00:00
|
|
|
stream->closed = true;
|
2010-08-17 15:30:51 +00:00
|
|
|
virStreamEventRemoveCallback(stream->st);
|
2016-04-24 20:28:18 +00:00
|
|
|
ret = virStreamAbort(stream->st);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2014-09-03 19:39:21 +00:00
|
|
|
if (msg->header.status == VIR_NET_ERROR) {
|
2016-04-24 20:28:18 +00:00
|
|
|
VIR_INFO("stream aborted at client request");
|
|
|
|
raise_error = (ret < 0);
|
2014-09-03 19:39:21 +00:00
|
|
|
} else {
|
2012-07-18 18:30:53 +00:00
|
|
|
virReportError(VIR_ERR_RPC,
|
|
|
|
_("stream aborted with unexpected status %d"),
|
|
|
|
msg->header.status);
|
2016-04-24 20:28:18 +00:00
|
|
|
raise_error = true;
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
2016-04-24 20:28:18 +00:00
|
|
|
if (raise_error) {
|
|
|
|
virNetMessageError rerr;
|
|
|
|
memset(&rerr, 0, sizeof(rerr));
|
2018-03-08 12:20:38 +00:00
|
|
|
return virNetServerProgramSendReplyError(stream->prog,
|
2016-04-24 20:28:18 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
&msg->header);
|
|
|
|
} else {
|
|
|
|
/* Send zero-length confirm */
|
|
|
|
return virNetServerProgramSendStreamData(stream->prog,
|
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial,
|
|
|
|
NULL, 0);
|
|
|
|
}
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-05-12 09:26:33 +00:00
|
|
|
static int
|
|
|
|
daemonStreamHandleHole(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream,
|
|
|
|
virNetMessagePtr msg)
|
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
virNetStreamHole data;
|
|
|
|
|
|
|
|
VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u",
|
|
|
|
client, stream, msg->header.proc, msg->header.serial);
|
|
|
|
|
|
|
|
/* Let's check if client plays nicely and advertised usage of
|
|
|
|
* sparse stream upfront. */
|
|
|
|
if (!stream->allowSkip) {
|
|
|
|
virReportError(VIR_ERR_RPC, "%s",
|
|
|
|
_("Unexpected stream hole"));
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (virNetMessageDecodePayload(msg,
|
|
|
|
(xdrproc_t) xdr_virNetStreamHole,
|
|
|
|
&data) < 0)
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
ret = virStreamSendHole(stream->st, data.length, data.flags);
|
|
|
|
|
|
|
|
if (ret < 0) {
|
|
|
|
virNetMessageError rerr;
|
|
|
|
|
|
|
|
memset(&rerr, 0, sizeof(rerr));
|
|
|
|
|
|
|
|
VIR_INFO("Stream send hole failed");
|
|
|
|
stream->closed = true;
|
|
|
|
virStreamEventRemoveCallback(stream->st);
|
|
|
|
virStreamAbort(stream->st);
|
|
|
|
|
|
|
|
return virNetServerProgramSendReplyError(stream->prog,
|
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
&msg->header);
|
|
|
|
}
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2009-08-24 19:53:48 +00:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Called when the stream is signalled has being able to accept
|
|
|
|
* data writes. Will process all pending incoming messages
|
|
|
|
* until they're all gone, or I/O blocks
|
|
|
|
*
|
|
|
|
* Returns 0 on success, or -1 upon fatal error
|
|
|
|
*/
|
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleWrite(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream)
|
2009-08-24 19:53:48 +00:00
|
|
|
{
|
2011-05-16 17:13:11 +00:00
|
|
|
VIR_DEBUG("client=%p, stream=%p", client, stream);
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
while (stream->rx && !stream->closed) {
|
|
|
|
virNetMessagePtr msg = stream->rx;
|
2009-08-24 19:53:48 +00:00
|
|
|
int ret;
|
2011-05-16 17:13:11 +00:00
|
|
|
|
2017-05-12 09:26:33 +00:00
|
|
|
if (msg->header.type == VIR_NET_STREAM_HOLE) {
|
|
|
|
/* Handle special case when the client sent us a hole.
|
|
|
|
* Otherwise just carry on with processing stream
|
|
|
|
* data. */
|
|
|
|
ret = daemonStreamHandleHole(client, stream, msg);
|
|
|
|
} else if (msg->header.type == VIR_NET_STREAM) {
|
|
|
|
switch (msg->header.status) {
|
|
|
|
case VIR_NET_OK:
|
|
|
|
ret = daemonStreamHandleFinish(client, stream, msg);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case VIR_NET_CONTINUE:
|
|
|
|
ret = daemonStreamHandleWriteData(client, stream, msg);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case VIR_NET_ERROR:
|
|
|
|
default:
|
|
|
|
ret = daemonStreamHandleAbort(client, stream, msg);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
virReportError(VIR_ERR_RPC,
|
|
|
|
_("Unexpected message type: %d"),
|
|
|
|
msg->header.type);
|
|
|
|
ret = -1;
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
if (ret > 0)
|
|
|
|
break; /* still processing data from msg */
|
2009-08-24 19:53:48 +00:00
|
|
|
|
2011-05-16 17:13:11 +00:00
|
|
|
virNetMessageQueueServe(&stream->rx);
|
|
|
|
if (ret < 0) {
|
|
|
|
virNetMessageFree(msg);
|
2011-07-08 11:54:29 +00:00
|
|
|
virNetServerClientImmediateClose(client);
|
2011-05-16 17:13:11 +00:00
|
|
|
return -1;
|
|
|
|
}
|
2011-06-30 11:28:10 +00:00
|
|
|
|
|
|
|
/* 'CONTINUE' messages don't send a reply (unless error
|
|
|
|
* occurred), so to release the 'msg' object we need to
|
|
|
|
* send a fake zero-length reply. Nothing actually gets
|
|
|
|
* onto the wire, but this causes the client to reset
|
|
|
|
* its active request count / throttling
|
|
|
|
*/
|
|
|
|
if (msg->header.status == VIR_NET_CONTINUE) {
|
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 16:42:58 +00:00
|
|
|
virNetMessageClear(msg);
|
2011-06-30 11:28:10 +00:00
|
|
|
msg->header.type = VIR_NET_REPLY;
|
|
|
|
if (virNetServerClientSendMessage(client, msg) < 0) {
|
|
|
|
virNetMessageFree(msg);
|
2011-07-08 11:54:29 +00:00
|
|
|
virNetServerClientImmediateClose(client);
|
2011-06-30 11:28:10 +00:00
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
}
|
2009-08-24 19:53:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
2009-08-24 19:57:16 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Invoked when a stream is signalled as having data
|
2012-10-11 16:31:20 +00:00
|
|
|
* available to read. This reads up to one message
|
2009-08-24 19:57:16 +00:00
|
|
|
* worth of data, and then queues that for transmission
|
|
|
|
* to the client.
|
|
|
|
*
|
2016-12-01 20:50:08 +00:00
|
|
|
* Returns 0 if data was queued for TX, or an error RPC
|
2009-08-24 19:57:16 +00:00
|
|
|
* was sent, or -1 on fatal error, indicating client should
|
|
|
|
* be killed
|
|
|
|
*/
|
|
|
|
static int
|
2011-05-16 17:13:11 +00:00
|
|
|
daemonStreamHandleRead(virNetServerClientPtr client,
|
|
|
|
daemonClientStream *stream)
|
2009-08-24 19:57:16 +00:00
|
|
|
{
|
2016-04-08 14:56:28 +00:00
|
|
|
virNetMessagePtr msg = NULL;
|
|
|
|
virNetMessageError rerr;
|
2009-08-24 19:57:16 +00:00
|
|
|
char *buffer;
|
2013-09-30 16:27:51 +00:00
|
|
|
size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
|
2016-04-08 14:56:28 +00:00
|
|
|
int ret = -1;
|
|
|
|
int rv;
|
2016-04-11 09:58:19 +00:00
|
|
|
int inData = 0;
|
|
|
|
long long length = 0;
|
2009-08-24 19:57:16 +00:00
|
|
|
|
2011-08-16 23:20:58 +00:00
|
|
|
VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d",
|
|
|
|
client, stream, stream->tx, stream->closed);
|
|
|
|
|
|
|
|
/* We might have had an event pending before we shut
|
|
|
|
* down the stream, so if we're marked as closed,
|
|
|
|
* then do nothing
|
|
|
|
*/
|
|
|
|
if (stream->closed)
|
|
|
|
return 0;
|
2009-08-24 19:57:16 +00:00
|
|
|
|
|
|
|
/* Shouldn't ever be called unless we're marked able to
|
|
|
|
* transmit, but doesn't hurt to check */
|
|
|
|
if (!stream->tx)
|
|
|
|
return 0;
|
|
|
|
|
2016-04-08 14:56:28 +00:00
|
|
|
memset(&rerr, 0, sizeof(rerr));
|
|
|
|
|
2009-08-24 19:57:16 +00:00
|
|
|
if (VIR_ALLOC_N(buffer, bufferLen) < 0)
|
|
|
|
return -1;
|
|
|
|
|
2016-04-08 14:56:28 +00:00
|
|
|
if (!(msg = virNetMessageNew(false)))
|
|
|
|
goto cleanup;
|
|
|
|
|
2016-04-11 09:58:19 +00:00
|
|
|
if (stream->allowSkip && stream->dataLen == 0) {
|
|
|
|
/* Handle skip. We want to send some data to the client. But we might
|
|
|
|
* be in a hole. Seek to next data. But if we are in data already, just
|
|
|
|
* carry on. */
|
|
|
|
|
|
|
|
rv = virStreamInData(stream->st, &inData, &length);
|
|
|
|
VIR_DEBUG("rv=%d inData=%d length=%lld", rv, inData, length);
|
|
|
|
|
|
|
|
if (rv < 0) {
|
2018-03-08 12:20:38 +00:00
|
|
|
if (virNetServerProgramSendStreamError(stream->prog,
|
2016-04-11 09:58:19 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial) < 0)
|
|
|
|
goto cleanup;
|
|
|
|
msg = NULL;
|
|
|
|
|
|
|
|
/* We're done with this call */
|
|
|
|
goto done;
|
|
|
|
} else {
|
|
|
|
if (!inData && length) {
|
|
|
|
stream->tx = false;
|
|
|
|
msg->cb = daemonStreamMessageFinished;
|
|
|
|
msg->opaque = stream;
|
|
|
|
stream->refs++;
|
2018-03-08 12:20:38 +00:00
|
|
|
if (virNetServerProgramSendStreamHole(stream->prog,
|
2016-04-11 09:58:19 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial,
|
|
|
|
length,
|
|
|
|
0) < 0)
|
|
|
|
goto cleanup;
|
|
|
|
|
|
|
|
msg = NULL;
|
|
|
|
|
|
|
|
/* We have successfully sent stream skip to the other side.
|
|
|
|
* To keep streams in sync seek locally too. */
|
|
|
|
virStreamSendHole(stream->st, length, 0);
|
|
|
|
/* We're done with this call */
|
|
|
|
goto done;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
stream->dataLen = length;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (stream->allowSkip &&
|
|
|
|
bufferLen > stream->dataLen)
|
|
|
|
bufferLen = stream->dataLen;
|
|
|
|
|
2016-04-08 14:56:28 +00:00
|
|
|
rv = virStreamRecv(stream->st, buffer, bufferLen);
|
|
|
|
if (rv == -2) {
|
2009-08-24 19:57:16 +00:00
|
|
|
/* Should never get this, since we're only called when we know
|
|
|
|
* we're readable, but hey things change... */
|
2016-04-08 14:56:28 +00:00
|
|
|
} else if (rv < 0) {
|
2018-03-08 12:20:38 +00:00
|
|
|
if (virNetServerProgramSendStreamError(stream->prog,
|
2016-04-08 14:56:28 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
&rerr,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial) < 0)
|
|
|
|
goto cleanup;
|
|
|
|
msg = NULL;
|
2009-08-24 19:57:16 +00:00
|
|
|
} else {
|
2016-04-11 09:58:19 +00:00
|
|
|
if (stream->allowSkip)
|
|
|
|
stream->dataLen -= rv;
|
|
|
|
|
2016-04-04 10:49:48 +00:00
|
|
|
stream->tx = false;
|
2016-04-08 14:56:28 +00:00
|
|
|
if (rv == 0)
|
2016-04-04 10:46:19 +00:00
|
|
|
stream->recvEOF = true;
|
2011-05-16 17:13:11 +00:00
|
|
|
|
2016-04-08 14:56:28 +00:00
|
|
|
msg->cb = daemonStreamMessageFinished;
|
|
|
|
msg->opaque = stream;
|
|
|
|
stream->refs++;
|
2018-03-08 12:20:38 +00:00
|
|
|
if (virNetServerProgramSendStreamData(stream->prog,
|
2016-04-08 14:56:28 +00:00
|
|
|
client,
|
|
|
|
msg,
|
|
|
|
stream->procedure,
|
|
|
|
stream->serial,
|
|
|
|
buffer, rv) < 0)
|
|
|
|
goto cleanup;
|
|
|
|
msg = NULL;
|
2009-08-24 19:57:16 +00:00
|
|
|
}
|
|
|
|
|
2016-04-11 09:58:19 +00:00
|
|
|
done:
|
2016-04-08 14:56:28 +00:00
|
|
|
ret = 0;
|
|
|
|
cleanup:
|
2009-08-24 19:57:16 +00:00
|
|
|
VIR_FREE(buffer);
|
2016-04-08 14:56:28 +00:00
|
|
|
virNetMessageFree(msg);
|
2009-08-24 19:57:16 +00:00
|
|
|
return ret;
|
|
|
|
}
|