virfdstream: Use messages instead of pipe

One big downside of using the pipe to transfer the data is that
we can really transfer just bare data. No metadata can be carried
through unless some formatted messages are introduced. That would
be quite painful to achieve so let's use a message queue. It's
fairly easy to exchange info between threads now that iohelper is
no longer used.

The reason why we cannot use the FD for plain files directly is
that despite us setting noblock flag on the FD, any
read()/write() blocks regardless (which is a show stopper since
those parts of the code are run from the event loop) and poll()
reports such FD as always readable/writable - even though the
subsequent operation might block.

The pipe is still not gone though. It is used to signal the event
loop that an event occurred (e.g. data is available for reading
in the queue, or vice versa).

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
This commit is contained in:
Michal Privoznik 2017-04-13 11:49:30 +02:00
parent ca21d75d25
commit 07c2399c01

View File

@ -49,6 +49,27 @@
VIR_LOG_INIT("fdstream"); VIR_LOG_INIT("fdstream");
typedef enum {
VIR_FDSTREAM_MSG_TYPE_DATA,
} virFDStreamMsgType;
typedef struct _virFDStreamMsg virFDStreamMsg;
typedef virFDStreamMsg *virFDStreamMsgPtr;
struct _virFDStreamMsg {
virFDStreamMsgPtr next;
virFDStreamMsgType type;
union {
struct {
char *buf;
size_t len;
size_t offset;
} data;
} stream;
};
/* Tunnelled migration stream support */ /* Tunnelled migration stream support */
typedef struct virFDStreamData virFDStreamData; typedef struct virFDStreamData virFDStreamData;
typedef virFDStreamData *virFDStreamDataPtr; typedef virFDStreamData *virFDStreamDataPtr;
@ -80,18 +101,25 @@ struct virFDStreamData {
/* Thread data */ /* Thread data */
virThreadPtr thread; virThreadPtr thread;
virCond threadCond;
int threadErr; int threadErr;
bool threadQuit; bool threadQuit;
bool threadAbort;
bool threadDoRead;
virFDStreamMsgPtr msg;
}; };
static virClassPtr virFDStreamDataClass; static virClassPtr virFDStreamDataClass;
static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
static void static void
virFDStreamDataDispose(void *obj) virFDStreamDataDispose(void *obj)
{ {
virFDStreamDataPtr fdst = obj; virFDStreamDataPtr fdst = obj;
VIR_DEBUG("obj=%p", fdst); VIR_DEBUG("obj=%p", fdst);
virFDStreamMsgQueueFree(&fdst->msg);
} }
static int virFDStreamDataOnceInit(void) static int virFDStreamDataOnceInit(void)
@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void)
VIR_ONCE_GLOBAL_INIT(virFDStreamData) VIR_ONCE_GLOBAL_INIT(virFDStreamData)
static int
virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
virFDStreamMsgPtr msg,
int fd,
const char *fdname)
{
virFDStreamMsgPtr *tmp = &fdst->msg;
char c = '1';
while (*tmp)
tmp = &(*tmp)->next;
*tmp = msg;
virCondSignal(&fdst->threadCond);
if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
virReportSystemError(errno,
_("Unable to write to %s"),
fdname);
return -1;
}
return 0;
}
static virFDStreamMsgPtr
virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
int fd,
const char *fdname)
{
virFDStreamMsgPtr tmp = fdst->msg;
char c;
if (tmp) {
fdst->msg = tmp->next;
tmp->next = NULL;
}
virCondSignal(&fdst->threadCond);
if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
virReportSystemError(errno,
_("Unable to read from %s"),
fdname);
return NULL;
}
return tmp;
}
static void
virFDStreamMsgFree(virFDStreamMsgPtr msg)
{
if (!msg)
return;
switch (msg->type) {
case VIR_FDSTREAM_MSG_TYPE_DATA:
VIR_FREE(msg->stream.data.buf);
break;
}
VIR_FREE(msg);
}
static void
virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
{
virFDStreamMsgPtr tmp = *queue;
while (tmp) {
virFDStreamMsgPtr next = tmp->next;
virFDStreamMsgFree(tmp);
tmp = next;
}
*queue = NULL;
}
static int virFDStreamRemoveCallback(virStreamPtr stream) static int virFDStreamRemoveCallback(virStreamPtr stream)
{ {
virFDStreamDataPtr fdst = stream->privateData; virFDStreamDataPtr fdst = stream->privateData;
@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
struct _virFDStreamThreadData { struct _virFDStreamThreadData {
virStreamPtr st; virStreamPtr st;
size_t length; size_t length;
bool doRead;
int fdin; int fdin;
char *fdinname; char *fdinname;
int fdout; int fdout;
@ -293,6 +405,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
} }
static ssize_t
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
const int fdin,
const int fdout,
const char *fdinname,
const char *fdoutname,
size_t buflen)
{
virFDStreamMsgPtr msg = NULL;
char *buf = NULL;
ssize_t got;
if (VIR_ALLOC(msg) < 0)
goto error;
if (VIR_ALLOC_N(buf, buflen) < 0)
goto error;
if ((got = saferead(fdin, buf, buflen)) < 0) {
virReportSystemError(errno,
_("Unable to read %s"),
fdinname);
goto error;
}
msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
msg->stream.data.buf = buf;
msg->stream.data.len = got;
buf = NULL;
virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
msg = NULL;
return got;
error:
VIR_FREE(buf);
virFDStreamMsgFree(msg);
return -1;
}
static ssize_t
virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
const int fdin,
const int fdout,
const char *fdinname,
const char *fdoutname)
{
ssize_t got = 0;
virFDStreamMsgPtr msg = fdst->msg;
bool pop = false;
switch (msg->type) {
case VIR_FDSTREAM_MSG_TYPE_DATA:
got = safewrite(fdout,
msg->stream.data.buf + msg->stream.data.offset,
msg->stream.data.len - msg->stream.data.offset);
if (got < 0) {
virReportSystemError(errno,
_("Unable to write %s"),
fdoutname);
return -1;
}
msg->stream.data.offset += got;
pop = msg->stream.data.offset == msg->stream.data.len;
break;
}
if (pop) {
virFDStreamMsgQueuePop(fdst, fdin, fdinname);
virFDStreamMsgFree(msg);
}
return got;
}
static void static void
virFDStreamThread(void *opaque) virFDStreamThread(void *opaque)
{ {
@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
int fdout = data->fdout; int fdout = data->fdout;
char *fdoutname = data->fdoutname; char *fdoutname = data->fdoutname;
virFDStreamDataPtr fdst = st->privateData; virFDStreamDataPtr fdst = st->privateData;
char *buf = NULL; bool doRead = fdst->threadDoRead;
size_t buflen = 256 * 1024; size_t buflen = 256 * 1024;
size_t total = 0; size_t total = 0;
virObjectRef(fdst); virObjectRef(fdst);
virObjectLock(fdst);
if (VIR_ALLOC_N(buf, buflen) < 0)
goto error;
while (1) { while (1) {
ssize_t got; ssize_t got;
@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
if (buflen == 0) if (buflen == 0)
break; /* End of requested data from client */ break; /* End of requested data from client */
if ((got = saferead(fdin, buf, buflen)) < 0) { while (doRead == (fdst->msg != NULL) &&
virReportSystemError(errno, !fdst->threadQuit) {
_("Unable to read %s"), if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
fdinname); virReportSystemError(errno, "%s",
goto error; _("failed to wait on condition"));
goto error;
}
} }
if (fdst->threadQuit) {
/* If stream abort was requested, quit early. */
if (fdst->threadAbort)
goto cleanup;
/* Otherwise flush buffers and quit gracefully. */
if (doRead == (fdst->msg != NULL))
break;
}
if (doRead)
got = virFDStreamThreadDoRead(fdst,
fdin, fdout,
fdinname, fdoutname,
buflen);
else
got = virFDStreamThreadDoWrite(fdst,
fdin, fdout,
fdinname, fdoutname);
if (got < 0)
goto error;
if (got == 0) if (got == 0)
break; break;
total += got; total += got;
if (safewrite(fdout, buf, got) < 0) {
virReportSystemError(errno,
_("Unable to write %s"),
fdoutname);
goto error;
}
} }
cleanup: cleanup:
fdst->threadQuit = true;
virObjectUnlock(fdst);
if (!virObjectUnref(fdst)) if (!virObjectUnref(fdst))
st->privateData = NULL; st->privateData = NULL;
VIR_FORCE_CLOSE(fdin); VIR_FORCE_CLOSE(fdin);
VIR_FORCE_CLOSE(fdout); VIR_FORCE_CLOSE(fdout);
virFDStreamThreadDataFree(data); virFDStreamThreadDataFree(data);
VIR_FREE(buf);
return; return;
error: error:
virObjectLock(fdst);
fdst->threadErr = errno; fdst->threadErr = errno;
virObjectUnlock(fdst);
goto cleanup; goto cleanup;
} }
@ -368,6 +575,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst,
if (!fdst->thread) if (!fdst->thread)
return 0; return 0;
fdst->threadAbort = streamAbort;
fdst->threadQuit = true;
virCondSignal(&fdst->threadCond);
/* Give the thread a chance to lock the FD stream object. */ /* Give the thread a chance to lock the FD stream object. */
virObjectUnlock(fdst); virObjectUnlock(fdst);
virThreadJoin(fdst->thread); virThreadJoin(fdst->thread);
@ -381,6 +592,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst,
ret = 0; ret = 0;
cleanup: cleanup:
VIR_FREE(fdst->thread); VIR_FREE(fdst->thread);
virCondDestroy(&fdst->threadCond);
return ret; return ret;
} }
@ -427,11 +639,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
fdst->abortCallbackDispatching = false; fdst->abortCallbackDispatching = false;
} }
/* mutex locked */
ret = VIR_CLOSE(fdst->fd);
if (virFDStreamJoinWorker(fdst, streamAbort) < 0) if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
ret = -1; ret = -1;
/* mutex locked */
if ((ret = VIR_CLOSE(fdst->fd)) < 0)
virReportSystemError(errno, "%s",
_("Unable to close"));
st->privateData = NULL; st->privateData = NULL;
/* call the internal stream closing callback */ /* call the internal stream closing callback */
@ -468,7 +683,8 @@ virFDStreamAbort(virStreamPtr st)
static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
{ {
virFDStreamDataPtr fdst = st->privateData; virFDStreamDataPtr fdst = st->privateData;
int ret; virFDStreamMsgPtr msg = NULL;
int ret = -1;
if (nbytes > INT_MAX) { if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s", virReportSystemError(ERANGE, "%s",
@ -496,25 +712,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
nbytes = fdst->length - fdst->offset; nbytes = fdst->length - fdst->offset;
} }
retry: if (fdst->thread) {
ret = write(fdst->fd, bytes, nbytes); char *buf;
if (ret < 0) {
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR if (fdst->threadQuit) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { virReportSystemError(EBADF, "%s",
VIR_WARNINGS_RESET
ret = -2;
} else if (errno == EINTR) {
goto retry;
} else {
ret = -1;
virReportSystemError(errno, "%s",
_("cannot write to stream")); _("cannot write to stream"));
return -1;
}
if (VIR_ALLOC(msg) < 0 ||
VIR_ALLOC_N(buf, nbytes) < 0)
goto cleanup;
memcpy(buf, bytes, nbytes);
msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
msg->stream.data.buf = buf;
msg->stream.data.len = nbytes;
virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
msg = NULL;
ret = nbytes;
} else {
retry:
ret = write(fdst->fd, bytes, nbytes);
if (ret < 0) {
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
if (errno == EAGAIN || errno == EWOULDBLOCK) {
VIR_WARNINGS_RESET
ret = -2;
} else if (errno == EINTR) {
goto retry;
} else {
ret = -1;
virReportSystemError(errno, "%s",
_("cannot write to stream"));
}
} }
} else if (fdst->length) {
fdst->offset += ret;
} }
if (fdst->length)
fdst->offset += ret;
cleanup:
virObjectUnlock(fdst); virObjectUnlock(fdst);
virFDStreamMsgFree(msg);
return ret; return ret;
} }
@ -522,7 +764,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
{ {
virFDStreamDataPtr fdst = st->privateData; virFDStreamDataPtr fdst = st->privateData;
int ret; int ret = -1;
if (nbytes > INT_MAX) { if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s", virReportSystemError(ERANGE, "%s",
@ -548,24 +790,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
nbytes = fdst->length - fdst->offset; nbytes = fdst->length - fdst->offset;
} }
retry: if (fdst->thread) {
ret = read(fdst->fd, bytes, nbytes); virFDStreamMsgPtr msg = NULL;
if (ret < 0) {
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR while (!(msg = fdst->msg)) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (fdst->threadQuit) {
VIR_WARNINGS_RESET if (nbytes) {
ret = -2; virReportSystemError(EBADF, "%s",
} else if (errno == EINTR) { _("stream is not open"));
goto retry; } else {
} else { ret = 0;
ret = -1; }
virReportSystemError(errno, "%s", goto cleanup;
_("cannot read from stream")); } else {
virObjectUnlock(fdst);
virCondSignal(&fdst->threadCond);
virObjectLock(fdst);
}
}
if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
/* Nope, nope, I'm outta here */
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("unexpected message type"));
goto cleanup;
}
if (nbytes > msg->stream.data.len - msg->stream.data.offset)
nbytes = msg->stream.data.len - msg->stream.data.offset;
memcpy(bytes,
msg->stream.data.buf + msg->stream.data.offset,
nbytes);
msg->stream.data.offset += nbytes;
if (msg->stream.data.offset == msg->stream.data.len) {
virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
virFDStreamMsgFree(msg);
}
ret = nbytes;
} else {
retry:
ret = read(fdst->fd, bytes, nbytes);
if (ret < 0) {
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
if (errno == EAGAIN || errno == EWOULDBLOCK) {
VIR_WARNINGS_RESET
ret = -2;
} else if (errno == EINTR) {
goto retry;
} else {
ret = -1;
virReportSystemError(errno, "%s",
_("cannot read from stream"));
}
goto cleanup;
} }
} else if (fdst->length) {
fdst->offset += ret;
} }
if (fdst->length)
fdst->offset += ret;
cleanup:
virObjectUnlock(fdst); virObjectUnlock(fdst);
return ret; return ret;
} }
@ -610,11 +898,19 @@ static int virFDStreamOpenInternal(virStreamPtr st,
st->privateData = fdst; st->privateData = fdst;
if (threadData) { if (threadData) {
fdst->threadDoRead = threadData->doRead;
/* Create the thread after fdst and st were initialized. /* Create the thread after fdst and st were initialized.
* The thread worker expects them to be that way. */ * The thread worker expects them to be that way. */
if (VIR_ALLOC(fdst->thread) < 0) if (VIR_ALLOC(fdst->thread) < 0)
goto error; goto error;
if (virCondInit(&fdst->threadCond) < 0) {
virReportSystemError(errno, "%s",
_("cannot initialize condition variable"));
goto error;
}
if (virThreadCreate(fdst->thread, if (virThreadCreate(fdst->thread,
true, true,
virFDStreamThread, virFDStreamThread,
@ -783,6 +1079,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
VIR_STRDUP(threadData->fdoutname, "pipe") < 0) VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
goto error; goto error;
tmpfd = pipefds[0]; tmpfd = pipefds[0];
threadData->doRead = true;
} else { } else {
threadData->fdin = pipefds[0]; threadData->fdin = pipefds[0];
threadData->fdout = fd; threadData->fdout = fd;
@ -790,6 +1087,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
VIR_STRDUP(threadData->fdoutname, path) < 0) VIR_STRDUP(threadData->fdoutname, path) < 0)
goto error; goto error;
tmpfd = pipefds[1]; tmpfd = pipefds[1];
threadData->doRead = false;
} }
} }