/*
* virfdstream.c: generic streams impl for file descriptors
*
* Copyright (C) 2009-2012, 2014 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library. If not, see
* .
*
*/
#include
#include
#include
#include
#include
#ifndef WIN32
# include
#endif
#include "virfdstream.h"
#include "virerror.h"
#include "datatypes.h"
#include "virlog.h"
#include "viralloc.h"
#include "virutil.h"
#include "virfile.h"
#include "configmake.h"
#include "virstring.h"
#include "virtime.h"
#include "virprocess.h"
#include "virsocket.h"
#define VIR_FROM_THIS VIR_FROM_STREAMS
VIR_LOG_INIT("fdstream");
#ifndef WIN32
typedef enum {
VIR_FDSTREAM_MSG_TYPE_DATA,
VIR_FDSTREAM_MSG_TYPE_HOLE,
} virFDStreamMsgType;
typedef struct _virFDStreamMsg virFDStreamMsg;
typedef virFDStreamMsg *virFDStreamMsgPtr;
struct _virFDStreamMsg {
virFDStreamMsgPtr next;
virFDStreamMsgType type;
union {
struct {
char *buf;
size_t len;
size_t offset;
} data;
struct {
long long len;
} hole;
} stream;
};
/* Tunnelled migration stream support */
typedef struct virFDStreamData virFDStreamData;
typedef virFDStreamData *virFDStreamDataPtr;
struct virFDStreamData {
virObjectLockable parent;
int fd;
unsigned long long offset;
unsigned long long length;
int watch;
int events; /* events the stream callback is subscribed for */
bool cbRemoved;
bool dispatching;
bool closed;
virStreamEventCallback cb;
void *opaque;
virFreeCallback ff;
/* don't call the abort callback more than once */
bool abortCallbackCalled;
bool abortCallbackDispatching;
/* internal callback, as the regular one (from generic streams) gets
* eaten up by the server stream driver */
virFDStreamInternalCloseCb icbCb;
virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
void *icbOpaque;
/* Thread data */
virThreadPtr thread;
virCond threadCond;
virErrorPtr threadErr;
bool threadQuit;
bool threadAbort;
bool threadDoRead;
virFDStreamMsgPtr msg;
};
static virClassPtr virFDStreamDataClass;
static __thread bool virFDStreamDataDisposed;
static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
static void
virFDStreamDataDispose(void *obj)
{
virFDStreamDataPtr fdst = obj;
VIR_DEBUG("obj=%p", fdst);
virFDStreamDataDisposed = true;
virFreeError(fdst->threadErr);
virFDStreamMsgQueueFree(&fdst->msg);
}
static int virFDStreamDataOnceInit(void)
{
if (!VIR_CLASS_NEW(virFDStreamData, virClassForObjectLockable()))
return -1;
return 0;
}
VIR_ONCE_GLOBAL_INIT(virFDStreamData);
static int
virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
virFDStreamMsgPtr *msg,
int fd,
const char *fdname)
{
virFDStreamMsgPtr *tmp = &fdst->msg;
char c = '1';
while (*tmp)
tmp = &(*tmp)->next;
*tmp = g_steal_pointer(msg);
virCondSignal(&fdst->threadCond);
if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
virReportSystemError(errno,
_("Unable to write to %s"),
fdname);
return -1;
}
return 0;
}
static virFDStreamMsgPtr
virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
int fd,
const char *fdname)
{
virFDStreamMsgPtr tmp = fdst->msg;
char c;
if (tmp) {
fdst->msg = tmp->next;
tmp->next = NULL;
}
virCondSignal(&fdst->threadCond);
if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
virReportSystemError(errno,
_("Unable to read from %s"),
fdname);
return NULL;
}
return tmp;
}
static void
virFDStreamMsgFree(virFDStreamMsgPtr msg)
{
if (!msg)
return;
switch (msg->type) {
case VIR_FDSTREAM_MSG_TYPE_DATA:
VIR_FREE(msg->stream.data.buf);
break;
case VIR_FDSTREAM_MSG_TYPE_HOLE:
/* nada */
break;
}
VIR_FREE(msg);
}
G_DEFINE_AUTOPTR_CLEANUP_FUNC(virFDStreamMsg, virFDStreamMsgFree);
static void
virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
{
virFDStreamMsgPtr tmp = *queue;
while (tmp) {
virFDStreamMsgPtr next = tmp->next;
virFDStreamMsgFree(tmp);
tmp = next;
}
*queue = NULL;
}
static int virFDStreamRemoveCallback(virStreamPtr stream)
{
virFDStreamDataPtr fdst = stream->privateData;
int ret = -1;
if (!fdst) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream is not open"));
return -1;
}
virObjectLock(fdst);
if (fdst->watch == 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream does not have a callback registered"));
goto cleanup;
}
virEventRemoveHandle(fdst->watch);
if (fdst->dispatching)
fdst->cbRemoved = true;
else if (fdst->ff)
(fdst->ff)(fdst->opaque);
fdst->watch = 0;
fdst->ff = NULL;
fdst->cb = NULL;
fdst->events = 0;
fdst->opaque = NULL;
ret = 0;
cleanup:
virObjectUnlock(fdst);
return ret;
}
static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
{
virFDStreamDataPtr fdst = stream->privateData;
int ret = -1;
if (!fdst) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream is not open"));
return -1;
}
virObjectLock(fdst);
if (fdst->watch == 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream does not have a callback registered"));
goto cleanup;
}
virEventUpdateHandle(fdst->watch, events);
fdst->events = events;
ret = 0;
cleanup:
virObjectUnlock(fdst);
return ret;
}
static void virFDStreamEvent(int watch G_GNUC_UNUSED,
int fd G_GNUC_UNUSED,
int events,
void *opaque)
{
virStreamPtr stream = opaque;
virFDStreamDataPtr fdst = stream->privateData;
virStreamEventCallback cb;
void *cbopaque;
virFreeCallback ff;
bool closed;
if (!fdst)
return;
virObjectLock(fdst);
if (!fdst->cb) {
virObjectUnlock(fdst);
return;
}
if (fdst->threadErr) {
events |= VIR_STREAM_EVENT_ERROR;
virSetError(fdst->threadErr);
}
cb = fdst->cb;
cbopaque = fdst->opaque;
ff = fdst->ff;
fdst->dispatching = true;
virObjectUnlock(fdst);
cb(stream, events, cbopaque);
virObjectLock(fdst);
fdst->dispatching = false;
if (fdst->cbRemoved && ff)
(ff)(cbopaque);
closed = fdst->closed;
virObjectUnlock(fdst);
if (closed)
virObjectUnref(fdst);
}
static void virFDStreamCallbackFree(void *opaque)
{
virObjectUnref(opaque);
}
static int
virFDStreamAddCallback(virStreamPtr st,
int events,
virStreamEventCallback cb,
void *opaque,
virFreeCallback ff)
{
virFDStreamDataPtr fdst = st->privateData;
int ret = -1;
if (!fdst) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream is not open"));
return -1;
}
virObjectLock(fdst);
if (fdst->watch != 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream already has a callback registered"));
goto cleanup;
}
if ((fdst->watch = virEventAddHandle(fdst->fd,
events,
virFDStreamEvent,
st,
virFDStreamCallbackFree)) < 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("cannot register file watch on stream"));
goto cleanup;
}
fdst->cbRemoved = false;
fdst->cb = cb;
fdst->opaque = opaque;
fdst->ff = ff;
fdst->events = events;
fdst->abortCallbackCalled = false;
virStreamRef(st);
ret = 0;
cleanup:
virObjectUnlock(fdst);
return ret;
}
typedef struct _virFDStreamThreadData virFDStreamThreadData;
typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
struct _virFDStreamThreadData {
virStreamPtr st;
size_t length;
bool doRead;
bool sparse;
bool isBlock;
int fdin;
char *fdinname;
int fdout;
char *fdoutname;
};
static void
virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
{
if (!data)
return;
virObjectUnref(data->st);
VIR_FREE(data->fdinname);
VIR_FREE(data->fdoutname);
VIR_FREE(data);
}
static ssize_t
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
bool sparse,
bool isBlock,
const int fdin,
const int fdout,
const char *fdinname,
const char *fdoutname,
size_t length,
size_t total,
size_t *dataLen,
size_t buflen)
{
g_autoptr(virFDStreamMsg) msg = NULL;
int inData = 0;
long long sectionLen = 0;
g_autofree char *buf = NULL;
ssize_t got;
if (sparse && *dataLen == 0) {
if (isBlock) {
/* Block devices are always in data section by definition. The
* @sectionLen is slightly more tricky. While we could try and get
* how much bytes is there left until EOF, we can pretend there is
* always X bytes left and let the saferead() below hit EOF (which
* is then handled gracefully anyway). Worst case scenario, this
* branch is called more than once.
* X was chosen to be 1MiB but it has ho special meaning. */
inData = 1;
sectionLen = 1 * 1024 * 1024;
} else {
if (virFileInData(fdin, &inData, §ionLen) < 0)
return -1;
}
if (length &&
sectionLen > length - total)
sectionLen = length - total;
if (inData)
*dataLen = sectionLen;
}
if (length &&
buflen > length - total)
buflen = length - total;
msg = g_new0(virFDStreamMsg, 1);
if (sparse && *dataLen == 0) {
msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
msg->stream.hole.len = sectionLen;
got = sectionLen;
/* HACK: The message queue is one directional. So caller
* cannot make us skip the hole. Do that for them instead. */
if (sectionLen &&
lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
virReportSystemError(errno,
_("unable to seek in %s"),
fdinname);
return -1;
}
} else {
if (sparse &&
buflen > *dataLen)
buflen = *dataLen;
buf = g_new0(char, buflen);
if ((got = saferead(fdin, buf, buflen)) < 0) {
virReportSystemError(errno,
_("Unable to read %s"),
fdinname);
return -1;
}
msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
msg->stream.data.buf = g_steal_pointer(&buf);
msg->stream.data.len = got;
if (sparse)
*dataLen -= got;
}
virFDStreamMsgQueuePush(fdst, &msg, fdout, fdoutname);
return got;
}
static ssize_t
virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
bool sparse,
bool isBlock,
const int fdin,
const int fdout,
const char *fdinname,
const char *fdoutname)
{
ssize_t got = 0;
virFDStreamMsgPtr msg = fdst->msg;
bool pop = false;
switch (msg->type) {
case VIR_FDSTREAM_MSG_TYPE_DATA:
got = safewrite(fdout,
msg->stream.data.buf + msg->stream.data.offset,
msg->stream.data.len - msg->stream.data.offset);
if (got < 0) {
virReportSystemError(errno,
_("Unable to write %s"),
fdoutname);
return -1;
}
msg->stream.data.offset += got;
pop = msg->stream.data.offset == msg->stream.data.len;
break;
case VIR_FDSTREAM_MSG_TYPE_HOLE:
if (!sparse) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("unexpected stream hole"));
return -1;
}
got = msg->stream.hole.len;
if (isBlock) {
g_autofree char * buf = NULL;
const size_t buflen = 1 * 1024 * 1024; /* 1MiB */
size_t toWrite = got;
/* While for files it's enough to lseek() and ftruncate() to create
* a hole which would emulate zeroes on read(), for block devices
* we have to write zeroes to read() zeroes. And we have to write
* @got bytes of zeroes. Do that in smaller chunks though.*/
buf = g_new0(char, buflen);
while (toWrite) {
size_t count = MIN(toWrite, buflen);
ssize_t r;
if ((r = safewrite(fdout, buf, count)) < 0) {
virReportSystemError(errno,
_("Unable to write %s"),
fdoutname);
return -1;
}
toWrite -= r;
}
} else {
off_t off;
off = lseek(fdout, got, SEEK_CUR);
if (off == (off_t) -1) {
virReportSystemError(errno,
_("unable to seek in %s"),
fdoutname);
return -1;
}
if (ftruncate(fdout, off) < 0) {
virReportSystemError(errno,
_("unable to truncate %s"),
fdoutname);
return -1;
}
}
pop = true;
break;
}
if (pop) {
virFDStreamMsgQueuePop(fdst, fdin, fdinname);
virFDStreamMsgFree(msg);
}
return got;
}
static void
virFDStreamThread(void *opaque)
{
virFDStreamThreadDataPtr data = opaque;
virStreamPtr st = data->st;
size_t length = data->length;
bool sparse = data->sparse;
bool isBlock = data->isBlock;
VIR_AUTOCLOSE fdin = data->fdin;
char *fdinname = data->fdinname;
VIR_AUTOCLOSE fdout = data->fdout;
char *fdoutname = data->fdoutname;
virFDStreamDataPtr fdst = st->privateData;
bool doRead = fdst->threadDoRead;
size_t buflen = 256 * 1024;
size_t total = 0;
size_t dataLen = 0;
virObjectRef(fdst);
virObjectLock(fdst);
while (1) {
ssize_t got;
while (doRead == (fdst->msg != NULL) &&
!fdst->threadQuit) {
if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
virReportSystemError(errno, "%s",
_("failed to wait on condition"));
goto error;
}
}
if (fdst->threadQuit) {
/* If stream abort was requested, quit early. */
if (fdst->threadAbort)
goto cleanup;
/* Otherwise flush buffers and quit gracefully. */
if (doRead == (fdst->msg != NULL))
break;
}
if (doRead)
got = virFDStreamThreadDoRead(fdst, sparse, isBlock,
fdin, fdout,
fdinname, fdoutname,
length, total,
&dataLen, buflen);
else
got = virFDStreamThreadDoWrite(fdst, sparse, isBlock,
fdin, fdout,
fdinname, fdoutname);
if (got < 0)
goto error;
if (got == 0)
break;
total += got;
}
cleanup:
fdst->threadQuit = true;
virObjectUnlock(fdst);
virFDStreamDataDisposed = false;
virObjectUnref(fdst);
if (virFDStreamDataDisposed)
st->privateData = NULL;
virFDStreamThreadDataFree(data);
return;
error:
fdst->threadErr = virSaveLastError();
goto cleanup;
}
static int
virFDStreamJoinWorker(virFDStreamDataPtr fdst,
bool streamAbort)
{
int ret = -1;
if (!fdst->thread)
return 0;
fdst->threadAbort = streamAbort;
fdst->threadQuit = true;
virCondSignal(&fdst->threadCond);
/* Give the thread a chance to lock the FD stream object. */
virObjectUnlock(fdst);
virThreadJoin(fdst->thread);
virObjectLock(fdst);
if (fdst->threadErr && !streamAbort) {
/* errors are expected on streamAbort */
goto cleanup;
}
ret = 0;
cleanup:
VIR_FREE(fdst->thread);
virCondDestroy(&fdst->threadCond);
return ret;
}
static int
virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
{
virFDStreamDataPtr fdst;
virStreamEventCallback cb;
void *opaque;
int ret;
VIR_DEBUG("st=%p", st);
if (!st || !(fdst = st->privateData) || fdst->abortCallbackDispatching)
return 0;
virObjectLock(fdst);
/* aborting the stream, ensure the callback is called if it's
* registered for stream error event */
if (streamAbort &&
fdst->cb &&
(fdst->events & (VIR_STREAM_EVENT_READABLE |
VIR_STREAM_EVENT_WRITABLE))) {
/* don't enter this function accidentally from the callback again */
if (fdst->abortCallbackCalled) {
virObjectUnlock(fdst);
return 0;
}
fdst->abortCallbackCalled = true;
fdst->abortCallbackDispatching = true;
/* cache the pointers */
cb = fdst->cb;
opaque = fdst->opaque;
virObjectUnlock(fdst);
/* call failure callback, poll reports nothing on closed fd */
(cb)(st, VIR_STREAM_EVENT_ERROR, opaque);
virObjectLock(fdst);
fdst->abortCallbackDispatching = false;
}
if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
ret = -1;
/* mutex locked */
if ((ret = VIR_CLOSE(fdst->fd)) < 0)
virReportSystemError(errno, "%s",
_("Unable to close"));
st->privateData = NULL;
/* call the internal stream closing callback */
if (fdst->icbCb) {
/* the mutex is not accessible anymore, as private data is null */
(fdst->icbCb)(st, fdst->icbOpaque);
if (fdst->icbFreeOpaque)
(fdst->icbFreeOpaque)(fdst->icbOpaque);
}
if (fdst->dispatching) {
fdst->closed = true;
virObjectUnlock(fdst);
} else {
virObjectUnlock(fdst);
virObjectUnref(fdst);
}
return ret;
}
static int
virFDStreamClose(virStreamPtr st)
{
return virFDStreamCloseInt(st, false);
}
static int
virFDStreamAbort(virStreamPtr st)
{
return virFDStreamCloseInt(st, true);
}
static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
{
virFDStreamDataPtr fdst = st->privateData;
g_autoptr(virFDStreamMsg) msg = NULL;
int ret = -1;
if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s",
_("Too many bytes to write to stream"));
return -1;
}
if (!fdst) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream is not open"));
return -1;
}
virObjectLock(fdst);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virReportSystemError(ENOSPC, "%s",
_("cannot write to stream"));
virObjectUnlock(fdst);
return -1;
}
if ((fdst->length - fdst->offset) < nbytes)
nbytes = fdst->length - fdst->offset;
}
if (fdst->thread) {
char *buf;
if (fdst->threadQuit || fdst->threadErr) {
/* virStreamSend will virResetLastError possibly set
* by virFDStreamEvent */
if (fdst->threadErr && !virGetLastError())
virSetError(fdst->threadErr);
else
virReportSystemError(EBADF, "%s", _("cannot write to stream"));
goto cleanup;
}
msg = g_new0(virFDStreamMsg, 1);
buf = g_new0(char, nbytes);
memcpy(buf, bytes, nbytes);
msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
msg->stream.data.buf = buf;
msg->stream.data.len = nbytes;
virFDStreamMsgQueuePush(fdst, &msg, fdst->fd, "pipe");
ret = nbytes;
} else {
retry:
ret = write(fdst->fd, bytes, nbytes);
if (ret < 0) {
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
if (errno == EAGAIN || errno == EWOULDBLOCK) {
VIR_WARNINGS_RESET
ret = -2;
} else if (errno == EINTR) {
goto retry;
} else {
ret = -1;
virReportSystemError(errno, "%s",
_("cannot write to stream"));
}
}
}
if (fdst->length)
fdst->offset += ret;
cleanup:
virObjectUnlock(fdst);
return ret;
}
static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
{
virFDStreamDataPtr fdst = st->privateData;
int ret = -1;
if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s",
_("Too many bytes to read from stream"));
return -1;
}
if (!fdst) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream is not open"));
return -1;
}
virObjectLock(fdst);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virObjectUnlock(fdst);
return 0;
}
if ((fdst->length - fdst->offset) < nbytes)
nbytes = fdst->length - fdst->offset;
}
if (fdst->thread) {
virFDStreamMsgPtr msg = NULL;
while (!(msg = fdst->msg)) {
if (fdst->threadQuit || fdst->threadErr) {
if (nbytes) {
/* virStreamRecv will virResetLastError possibly set
* by virFDStreamEvent */
if (fdst->threadErr && !virGetLastError())
virSetError(fdst->threadErr);
else
virReportSystemError(EBADF, "%s",
_("stream is not open"));
} else {
ret = 0;
}
goto cleanup;
} else {
virObjectUnlock(fdst);
virCondSignal(&fdst->threadCond);
virObjectLock(fdst);
}
}
/* Shortcut, if the stream is in the trailing hole,
* return 0 immediately. */
if (msg->type == VIR_FDSTREAM_MSG_TYPE_HOLE &&
msg->stream.hole.len == 0) {
ret = 0;
goto cleanup;
}
if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
/* Nope, nope, I'm outta here */
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("unexpected message type"));
goto cleanup;
}
if (nbytes > msg->stream.data.len - msg->stream.data.offset)
nbytes = msg->stream.data.len - msg->stream.data.offset;
memcpy(bytes,
msg->stream.data.buf + msg->stream.data.offset,
nbytes);
msg->stream.data.offset += nbytes;
if (msg->stream.data.offset == msg->stream.data.len) {
virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
virFDStreamMsgFree(msg);
}
ret = nbytes;
} else {
retry:
ret = read(fdst->fd, bytes, nbytes);
if (ret < 0) {
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
if (errno == EAGAIN || errno == EWOULDBLOCK) {
VIR_WARNINGS_RESET
ret = -2;
} else if (errno == EINTR) {
goto retry;
} else {
ret = -1;
virReportSystemError(errno, "%s",
_("cannot read from stream"));
}
goto cleanup;
}
}
if (fdst->length)
fdst->offset += ret;
cleanup:
virObjectUnlock(fdst);
return ret;
}
static int
virFDStreamSendHole(virStreamPtr st,
long long length,
unsigned int flags)
{
virFDStreamDataPtr fdst = st->privateData;
g_autoptr(virFDStreamMsg) msg = NULL;
off_t off;
int ret = -1;
virCheckFlags(0, -1);
virCheckPositiveArgReturn(length, -1);
virObjectLock(fdst);
if (fdst->length) {
if (length > fdst->length - fdst->offset)
length = fdst->length - fdst->offset;
fdst->offset += length;
}
if (fdst->thread) {
/* Things are a bit complicated here. If FDStream is in a
* read mode, then if the message at the queue head is
* HOLE, just pop it. The thread has lseek()-ed anyway.
* However, if the FDStream is in write mode, then tell
* the thread to do the lseek() for us. Under no
* circumstances we can do the lseek() ourselves here. We
* might mess up file position for the thread. */
if (fdst->threadQuit || fdst->threadErr) {
/* virStreamSendHole will virResetLastError possibly set
* by virFDStreamEvent */
if (fdst->threadErr && !virGetLastError())
virSetError(fdst->threadErr);
else
virReportSystemError(EBADF, "%s", _("stream is not open"));
goto cleanup;
}
if (fdst->threadDoRead) {
msg = fdst->msg;
if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Invalid stream hole"));
goto cleanup;
}
virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
} else {
msg = g_new0(virFDStreamMsg, 1);
msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
msg->stream.hole.len = length;
virFDStreamMsgQueuePush(fdst, &msg, fdst->fd, "pipe");
}
} else {
off = lseek(fdst->fd, length, SEEK_CUR);
if (off == (off_t) -1) {
virReportSystemError(errno, "%s",
_("unable to seek"));
goto cleanup;
}
if (ftruncate(fdst->fd, off) < 0) {
virReportSystemError(errno, "%s",
_("unable to truncate"));
goto cleanup;
}
}
ret = 0;
cleanup:
virObjectUnlock(fdst);
return ret;
}
static int
virFDStreamInData(virStreamPtr st,
int *inData,
long long *length)
{
virFDStreamDataPtr fdst = st->privateData;
int ret = -1;
virObjectLock(fdst);
if (fdst->thread) {
virFDStreamMsgPtr msg;
if (fdst->threadErr)
goto cleanup;
while (!(msg = fdst->msg)) {
if (fdst->threadQuit) {
*inData = *length = 0;
ret = 0;
goto cleanup;
} else {
virObjectUnlock(fdst);
virCondSignal(&fdst->threadCond);
virObjectLock(fdst);
}
}
if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) {
*inData = 1;
*length = msg->stream.data.len - msg->stream.data.offset;
} else {
*inData = 0;
*length = msg->stream.hole.len;
}
ret = 0;
} else {
ret = virFileInData(fdst->fd, inData, length);
}
cleanup:
virObjectUnlock(fdst);
return ret;
}
static virStreamDriver virFDStreamDrv = {
.streamSend = virFDStreamWrite,
.streamRecv = virFDStreamRead,
.streamFinish = virFDStreamClose,
.streamAbort = virFDStreamAbort,
.streamSendHole = virFDStreamSendHole,
.streamInData = virFDStreamInData,
.streamEventAddCallback = virFDStreamAddCallback,
.streamEventUpdateCallback = virFDStreamUpdateCallback,
.streamEventRemoveCallback = virFDStreamRemoveCallback
};
static int virFDStreamOpenInternal(virStreamPtr st,
int fd,
virFDStreamThreadDataPtr threadData,
unsigned long long length)
{
virFDStreamDataPtr fdst;
VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
st, fd, threadData, length);
if (virFDStreamDataInitialize() < 0)
return -1;
if ((st->flags & VIR_STREAM_NONBLOCK) &&
virSetNonBlock(fd) < 0) {
virReportSystemError(errno, "%s", _("Unable to set non-blocking mode"));
return -1;
}
if (!(fdst = virObjectLockableNew(virFDStreamDataClass)))
return -1;
fdst->fd = fd;
fdst->length = length;
st->driver = &virFDStreamDrv;
st->privateData = fdst;
if (threadData) {
fdst->threadDoRead = threadData->doRead;
/* Create the thread after fdst and st were initialized.
* The thread worker expects them to be that way. */
fdst->thread = g_new0(virThread, 1);
if (virCondInit(&fdst->threadCond) < 0) {
virReportSystemError(errno, "%s",
_("cannot initialize condition variable"));
goto error;
}
if (virThreadCreateFull(fdst->thread,
true,
virFDStreamThread,
"fd-stream",
false,
threadData) < 0)
goto error;
}
return 0;
error:
VIR_FREE(fdst->thread);
st->driver = NULL;
st->privateData = NULL;
virObjectUnref(fdst);
return -1;
}
int virFDStreamOpen(virStreamPtr st,
int fd)
{
return virFDStreamOpenInternal(st, fd, NULL, 0);
}
int virFDStreamConnectUNIX(virStreamPtr st,
const char *path,
bool abstract)
{
struct sockaddr_un sa;
virTimeBackOffVar timeout;
VIR_AUTOCLOSE fd = -1;
int ret;
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
virReportSystemError(errno, "%s", _("Unable to open UNIX socket"));
return -1;
}
memset(&sa, 0, sizeof(sa));
sa.sun_family = AF_UNIX;
if (abstract) {
if (virStrcpy(sa.sun_path+1, path, sizeof(sa.sun_path)-1) < 0)
return -1;
sa.sun_path[0] = '\0';
} else {
if (virStrcpyStatic(sa.sun_path, path) < 0)
return -1;
}
if (virTimeBackOffStart(&timeout, 1, 3*1000 /* ms */) < 0)
return -1;
while (virTimeBackOffWait(&timeout)) {
ret = connect(fd, (struct sockaddr *)&sa, sizeof(sa));
if (ret == 0)
break;
if (errno == ENOENT || errno == ECONNREFUSED) {
/* ENOENT : Socket may not have shown up yet
* ECONNREFUSED : Leftover socket hasn't been removed yet */
continue;
}
return -1;
}
if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
return -1;
fd = -1;
return 0;
}
static int
virFDStreamOpenFileInternal(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int oflags,
int mode,
bool forceIOHelper,
bool sparse)
{
int fd = -1;
int pipefds[2] = { -1, -1 };
int tmpfd = -1;
struct stat sb;
virFDStreamThreadDataPtr threadData = NULL;
VIR_DEBUG("st=%p path=%s oflags=0x%x offset=%llu length=%llu mode=0%o",
st, path, oflags, offset, length, mode);
oflags |= O_NOCTTY;
if (oflags & O_CREAT)
fd = open(path, oflags, mode);
else
fd = open(path, oflags);
if (fd < 0) {
virReportSystemError(errno,
_("Unable to open stream for '%s'"),
path);
return -1;
}
tmpfd = fd;
if (fstat(fd, &sb) < 0) {
virReportSystemError(errno,
_("Unable to access stream for '%s'"),
path);
goto error;
}
if (offset &&
lseek(fd, offset, SEEK_SET) < 0) {
virReportSystemError(errno,
_("Unable to seek %s to %llu"),
path, offset);
goto error;
}
/* Thanks to the POSIX i/o model, we can't reliably get
* non-blocking I/O on block devs/regular files. To
* support those we need to create a helper thread to do
* the I/O so we just have a fifo. Or use AIO :-(
*/
if ((st->flags & VIR_STREAM_NONBLOCK) &&
((!S_ISCHR(sb.st_mode) &&
!S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
if ((oflags & O_ACCMODE) == O_RDWR) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("%s: Cannot request read and write flags together"),
path);
goto error;
}
if (virPipe(pipefds) < 0)
goto error;
threadData = g_new0(virFDStreamThreadData, 1);
threadData->st = virObjectRef(st);
threadData->length = length;
threadData->sparse = sparse;
threadData->isBlock = !!S_ISBLK(sb.st_mode);
if ((oflags & O_ACCMODE) == O_RDONLY) {
threadData->fdin = fd;
threadData->fdout = pipefds[1];
threadData->fdinname = g_strdup(path);
threadData->fdoutname = g_strdup("pipe");
tmpfd = pipefds[0];
threadData->doRead = true;
} else {
threadData->fdin = pipefds[0];
threadData->fdout = fd;
threadData->fdinname = g_strdup("pipe");
threadData->fdoutname = g_strdup(path);
tmpfd = pipefds[1];
threadData->doRead = false;
}
}
if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0)
goto error;
return 0;
error:
VIR_FORCE_CLOSE(fd);
VIR_FORCE_CLOSE(pipefds[0]);
VIR_FORCE_CLOSE(pipefds[1]);
if (oflags & O_CREAT)
unlink(path);
virFDStreamThreadDataFree(threadData);
return -1;
}
int virFDStreamOpenFile(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int oflags)
{
if (oflags & O_CREAT) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Attempt to create %s without specifying mode"),
path);
return -1;
}
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags, 0, false, false);
}
int virFDStreamCreateFile(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int oflags,
mode_t mode)
{
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, mode,
false, false);
}
int virFDStreamOpenPTY(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int oflags)
{
virFDStreamDataPtr fdst = NULL;
struct termios rawattr;
if (virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, 0,
false, false) < 0)
return -1;
fdst = st->privateData;
if (tcgetattr(fdst->fd, &rawattr) < 0) {
virReportSystemError(errno,
_("unable to get tty attributes: %s"),
path);
goto cleanup;
}
cfmakeraw(&rawattr);
if (tcsetattr(fdst->fd, TCSANOW, &rawattr) < 0) {
virReportSystemError(errno,
_("unable to set tty attributes: %s"),
path);
goto cleanup;
}
return 0;
cleanup:
virFDStreamClose(st);
return -1;
}
int virFDStreamOpenBlockDevice(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
bool sparse,
int oflags)
{
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags, 0, true, sparse);
}
int virFDStreamSetInternalCloseCb(virStreamPtr st,
virFDStreamInternalCloseCb cb,
void *opaque,
virFDStreamInternalCloseCbFreeOpaque fcb)
{
virFDStreamDataPtr fdst = st->privateData;
virObjectLock(fdst);
if (fdst->icbFreeOpaque)
(fdst->icbFreeOpaque)(fdst->icbOpaque);
fdst->icbCb = cb;
fdst->icbOpaque = opaque;
fdst->icbFreeOpaque = fcb;
virObjectUnlock(fdst);
return 0;
}
#else /* WIN32 */
int
virFDStreamOpen(virStreamPtr st G_GNUC_UNUSED,
int fd G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
int
virFDStreamConnectUNIX(virStreamPtr st G_GNUC_UNUSED,
const char *path G_GNUC_UNUSED,
bool abstract G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
int
virFDStreamOpenFile(virStreamPtr st G_GNUC_UNUSED,
const char *path G_GNUC_UNUSED,
unsigned long long offset G_GNUC_UNUSED,
unsigned long long length G_GNUC_UNUSED,
int oflags G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
int
virFDStreamCreateFile(virStreamPtr st G_GNUC_UNUSED,
const char *path G_GNUC_UNUSED,
unsigned long long offset G_GNUC_UNUSED,
unsigned long long length G_GNUC_UNUSED,
int oflags G_GNUC_UNUSED,
mode_t mode G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
int
virFDStreamOpenPTY(virStreamPtr st G_GNUC_UNUSED,
const char *path G_GNUC_UNUSED,
unsigned long long offset G_GNUC_UNUSED,
unsigned long long length G_GNUC_UNUSED,
int oflags G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
int
virFDStreamOpenBlockDevice(virStreamPtr st G_GNUC_UNUSED,
const char *path G_GNUC_UNUSED,
unsigned long long offset G_GNUC_UNUSED,
unsigned long long length G_GNUC_UNUSED,
bool sparse G_GNUC_UNUSED,
int oflags G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
int
virFDStreamSetInternalCloseCb(virStreamPtr st G_GNUC_UNUSED,
virFDStreamInternalCloseCb cb G_GNUC_UNUSED,
void *opaque G_GNUC_UNUSED,
virFDStreamInternalCloseCbFreeOpaque fcb G_GNUC_UNUSED)
{
virReportSystemError(ENOSYS, "%s",
_("File streams are not supported on this platform"));
return -1;
}
#endif /* WIN32 */