mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-03-07 17:28:15 +00:00
virfdstream: Drop iohelper in favour of a thread
Currently we use iohelper for virFDStream implementation. This is because UNIX I/O can lie sometimes: even though a FD for a file/block device is set as unblocking, actual read()/write() can block. To avoid this, a pipe is created and one end is kept for read/write while the other is handed over to iohelper to write/read the data for us. Thus it's iohelper which gets blocked and not our event loop. This approach has two problems: 1) we are spawning a new process. 2) any exchange of information between daemon and iohelper can be done only through the pipe. Therefore, iohelper is replaced with an implementation in thread which is created just for the stream lifetime. The data are still transferred through pipe (for now), but both problems described above are solved. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> Reviewed-by: John Ferlan <jferlan@redhat.com>
This commit is contained in:
parent
585eb46920
commit
d1a60f4c3b
@ -56,8 +56,6 @@ struct virFDStreamData {
|
|||||||
virObjectLockable parent;
|
virObjectLockable parent;
|
||||||
|
|
||||||
int fd;
|
int fd;
|
||||||
int errfd;
|
|
||||||
virCommandPtr cmd;
|
|
||||||
unsigned long long offset;
|
unsigned long long offset;
|
||||||
unsigned long long length;
|
unsigned long long length;
|
||||||
|
|
||||||
@ -79,6 +77,11 @@ struct virFDStreamData {
|
|||||||
virFDStreamInternalCloseCb icbCb;
|
virFDStreamInternalCloseCb icbCb;
|
||||||
virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
|
virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
|
||||||
void *icbOpaque;
|
void *icbOpaque;
|
||||||
|
|
||||||
|
/* Thread data */
|
||||||
|
virThreadPtr thread;
|
||||||
|
int threadErr;
|
||||||
|
bool threadQuit;
|
||||||
};
|
};
|
||||||
|
|
||||||
static virClassPtr virFDStreamDataClass;
|
static virClassPtr virFDStreamDataClass;
|
||||||
@ -264,57 +267,124 @@ virFDStreamAddCallback(virStreamPtr st,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
|
||||||
virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
|
|
||||||
{
|
|
||||||
char buf[1024];
|
|
||||||
ssize_t len;
|
|
||||||
int status;
|
|
||||||
int ret = -1;
|
|
||||||
|
|
||||||
if (!fdst->cmd)
|
typedef struct _virFDStreamThreadData virFDStreamThreadData;
|
||||||
|
typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
|
||||||
|
struct _virFDStreamThreadData {
|
||||||
|
virStreamPtr st;
|
||||||
|
size_t length;
|
||||||
|
int fdin;
|
||||||
|
char *fdinname;
|
||||||
|
int fdout;
|
||||||
|
char *fdoutname;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
|
||||||
|
{
|
||||||
|
if (!data)
|
||||||
|
return;
|
||||||
|
|
||||||
|
virObjectUnref(data->st);
|
||||||
|
VIR_FREE(data->fdinname);
|
||||||
|
VIR_FREE(data->fdoutname);
|
||||||
|
VIR_FREE(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
virFDStreamThread(void *opaque)
|
||||||
|
{
|
||||||
|
virFDStreamThreadDataPtr data = opaque;
|
||||||
|
virStreamPtr st = data->st;
|
||||||
|
size_t length = data->length;
|
||||||
|
int fdin = data->fdin;
|
||||||
|
char *fdinname = data->fdinname;
|
||||||
|
int fdout = data->fdout;
|
||||||
|
char *fdoutname = data->fdoutname;
|
||||||
|
virFDStreamDataPtr fdst = st->privateData;
|
||||||
|
char *buf = NULL;
|
||||||
|
size_t buflen = 256 * 1024;
|
||||||
|
size_t total = 0;
|
||||||
|
|
||||||
|
virObjectRef(fdst);
|
||||||
|
|
||||||
|
if (VIR_ALLOC_N(buf, buflen) < 0)
|
||||||
|
goto error;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
ssize_t got;
|
||||||
|
|
||||||
|
if (length &&
|
||||||
|
(length - total) < buflen)
|
||||||
|
buflen = length - total;
|
||||||
|
|
||||||
|
if (buflen == 0)
|
||||||
|
break; /* End of requested data from client */
|
||||||
|
|
||||||
|
if ((got = saferead(fdin, buf, buflen)) < 0) {
|
||||||
|
virReportSystemError(errno,
|
||||||
|
_("Unable to read %s"),
|
||||||
|
fdinname);
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (got == 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
total += got;
|
||||||
|
|
||||||
|
if (safewrite(fdout, buf, got) < 0) {
|
||||||
|
virReportSystemError(errno,
|
||||||
|
_("Unable to write %s"),
|
||||||
|
fdoutname);
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
if (!virObjectUnref(fdst))
|
||||||
|
st->privateData = NULL;
|
||||||
|
VIR_FORCE_CLOSE(fdin);
|
||||||
|
VIR_FORCE_CLOSE(fdout);
|
||||||
|
virFDStreamThreadDataFree(data);
|
||||||
|
VIR_FREE(buf);
|
||||||
|
return;
|
||||||
|
|
||||||
|
error:
|
||||||
|
virObjectLock(fdst);
|
||||||
|
fdst->threadErr = errno;
|
||||||
|
virObjectUnlock(fdst);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
virFDStreamJoinWorker(virFDStreamDataPtr fdst,
|
||||||
|
bool streamAbort)
|
||||||
|
{
|
||||||
|
int ret = -1;
|
||||||
|
if (!fdst->thread)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
|
/* Give the thread a chance to lock the FD stream object. */
|
||||||
buf[0] = '\0';
|
virObjectUnlock(fdst);
|
||||||
else
|
virThreadJoin(fdst->thread);
|
||||||
buf[len] = '\0';
|
virObjectLock(fdst);
|
||||||
|
|
||||||
virCommandRawStatus(fdst->cmd);
|
if (fdst->threadErr && !streamAbort) {
|
||||||
if (virCommandWait(fdst->cmd, &status) < 0)
|
/* errors are expected on streamAbort */
|
||||||
goto cleanup;
|
|
||||||
|
|
||||||
if (status != 0) {
|
|
||||||
if (buf[0] != '\0') {
|
|
||||||
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf);
|
|
||||||
} else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) {
|
|
||||||
if (streamAbort) {
|
|
||||||
/* Explicit abort request means the caller doesn't care
|
|
||||||
if there's data left over, so skip the error */
|
|
||||||
goto out;
|
|
||||||
}
|
|
||||||
|
|
||||||
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
|
|
||||||
_("I/O helper exited "
|
|
||||||
"before all data was processed"));
|
|
||||||
} else {
|
|
||||||
char *str = virProcessTranslateStatus(status);
|
|
||||||
virReportError(VIR_ERR_INTERNAL_ERROR,
|
|
||||||
_("I/O helper exited with %s"),
|
|
||||||
NULLSTR(str));
|
|
||||||
VIR_FREE(str);
|
|
||||||
}
|
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
out:
|
|
||||||
ret = 0;
|
ret = 0;
|
||||||
cleanup:
|
cleanup:
|
||||||
virCommandFree(fdst->cmd);
|
VIR_FREE(fdst->thread);
|
||||||
fdst->cmd = NULL;
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
|
virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
|
||||||
{
|
{
|
||||||
@ -359,12 +429,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
|
|||||||
|
|
||||||
/* mutex locked */
|
/* mutex locked */
|
||||||
ret = VIR_CLOSE(fdst->fd);
|
ret = VIR_CLOSE(fdst->fd);
|
||||||
if (virFDStreamCloseCommand(fdst, streamAbort) < 0)
|
if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
|
||||||
ret = -1;
|
ret = -1;
|
||||||
|
|
||||||
if (VIR_CLOSE(fdst->errfd) < 0)
|
|
||||||
VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd);
|
|
||||||
|
|
||||||
st->privateData = NULL;
|
st->privateData = NULL;
|
||||||
|
|
||||||
/* call the internal stream closing callback */
|
/* call the internal stream closing callback */
|
||||||
@ -516,14 +583,13 @@ static virStreamDriver virFDStreamDrv = {
|
|||||||
|
|
||||||
static int virFDStreamOpenInternal(virStreamPtr st,
|
static int virFDStreamOpenInternal(virStreamPtr st,
|
||||||
int fd,
|
int fd,
|
||||||
virCommandPtr cmd,
|
virFDStreamThreadDataPtr threadData,
|
||||||
int errfd,
|
|
||||||
unsigned long long length)
|
unsigned long long length)
|
||||||
{
|
{
|
||||||
virFDStreamDataPtr fdst;
|
virFDStreamDataPtr fdst;
|
||||||
|
|
||||||
VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
|
VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
|
||||||
st, fd, cmd, errfd, length);
|
st, fd, threadData, length);
|
||||||
|
|
||||||
if (virFDStreamDataInitialize() < 0)
|
if (virFDStreamDataInitialize() < 0)
|
||||||
return -1;
|
return -1;
|
||||||
@ -538,21 +604,39 @@ static int virFDStreamOpenInternal(virStreamPtr st,
|
|||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
fdst->fd = fd;
|
fdst->fd = fd;
|
||||||
fdst->cmd = cmd;
|
|
||||||
fdst->errfd = errfd;
|
|
||||||
fdst->length = length;
|
fdst->length = length;
|
||||||
|
|
||||||
st->driver = &virFDStreamDrv;
|
st->driver = &virFDStreamDrv;
|
||||||
st->privateData = fdst;
|
st->privateData = fdst;
|
||||||
|
|
||||||
|
if (threadData) {
|
||||||
|
/* Create the thread after fdst and st were initialized.
|
||||||
|
* The thread worker expects them to be that way. */
|
||||||
|
if (VIR_ALLOC(fdst->thread) < 0)
|
||||||
|
goto error;
|
||||||
|
|
||||||
|
if (virThreadCreate(fdst->thread,
|
||||||
|
true,
|
||||||
|
virFDStreamThread,
|
||||||
|
threadData) < 0)
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
error:
|
||||||
|
VIR_FREE(fdst->thread);
|
||||||
|
st->driver = NULL;
|
||||||
|
st->privateData = NULL;
|
||||||
|
virObjectUnref(fdst);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int virFDStreamOpen(virStreamPtr st,
|
int virFDStreamOpen(virStreamPtr st,
|
||||||
int fd)
|
int fd)
|
||||||
{
|
{
|
||||||
return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
|
return virFDStreamOpenInternal(st, fd, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -598,7 +682,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
|
if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
|
||||||
goto error;
|
goto error;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
@ -627,11 +711,10 @@ virFDStreamOpenFileInternal(virStreamPtr st,
|
|||||||
bool forceIOHelper)
|
bool forceIOHelper)
|
||||||
{
|
{
|
||||||
int fd = -1;
|
int fd = -1;
|
||||||
int childfd = -1;
|
int pipefds[2] = { -1, -1 };
|
||||||
|
int tmpfd = -1;
|
||||||
struct stat sb;
|
struct stat sb;
|
||||||
virCommandPtr cmd = NULL;
|
virFDStreamThreadDataPtr threadData = NULL;
|
||||||
int errfd = -1;
|
|
||||||
char *iohelper_path = NULL;
|
|
||||||
|
|
||||||
VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
|
VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
|
||||||
st, path, oflags, offset, length, mode);
|
st, path, oflags, offset, length, mode);
|
||||||
@ -648,6 +731,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
|
|||||||
path);
|
path);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
tmpfd = fd;
|
||||||
|
|
||||||
if (fstat(fd, &sb) < 0) {
|
if (fstat(fd, &sb) < 0) {
|
||||||
virReportSystemError(errno,
|
virReportSystemError(errno,
|
||||||
@ -666,13 +750,12 @@ virFDStreamOpenFileInternal(virStreamPtr st,
|
|||||||
|
|
||||||
/* Thanks to the POSIX i/o model, we can't reliably get
|
/* Thanks to the POSIX i/o model, we can't reliably get
|
||||||
* non-blocking I/O on block devs/regular files. To
|
* non-blocking I/O on block devs/regular files. To
|
||||||
* support those we need to fork a helper process to do
|
* support those we need to create a helper thread to do
|
||||||
* the I/O so we just have a fifo. Or use AIO :-(
|
* the I/O so we just have a fifo. Or use AIO :-(
|
||||||
*/
|
*/
|
||||||
if ((st->flags & VIR_STREAM_NONBLOCK) &&
|
if ((st->flags & VIR_STREAM_NONBLOCK) &&
|
||||||
((!S_ISCHR(sb.st_mode) &&
|
((!S_ISCHR(sb.st_mode) &&
|
||||||
!S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
|
!S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
|
||||||
int fds[2] = { -1, -1 };
|
|
||||||
|
|
||||||
if ((oflags & O_ACCMODE) == O_RDWR) {
|
if ((oflags & O_ACCMODE) == O_RDWR) {
|
||||||
virReportError(VIR_ERR_INTERNAL_ERROR,
|
virReportError(VIR_ERR_INTERNAL_ERROR,
|
||||||
@ -681,58 +764,47 @@ virFDStreamOpenFileInternal(virStreamPtr st,
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pipe(fds) < 0) {
|
if (pipe(pipefds) < 0) {
|
||||||
virReportSystemError(errno, "%s",
|
virReportSystemError(errno, "%s",
|
||||||
_("Unable to create pipe"));
|
_("Unable to create pipe"));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(iohelper_path = virFileFindResource("libvirt_iohelper",
|
if (VIR_ALLOC(threadData) < 0)
|
||||||
abs_topbuilddir "/src",
|
|
||||||
LIBEXECDIR)))
|
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
cmd = virCommandNewArgList(iohelper_path,
|
threadData->st = virObjectRef(st);
|
||||||
path,
|
threadData->length = length;
|
||||||
NULL);
|
|
||||||
|
|
||||||
VIR_FREE(iohelper_path);
|
|
||||||
|
|
||||||
virCommandAddArgFormat(cmd, "%llu", length);
|
|
||||||
virCommandPassFD(cmd, fd,
|
|
||||||
VIR_COMMAND_PASS_FD_CLOSE_PARENT);
|
|
||||||
virCommandAddArgFormat(cmd, "%d", fd);
|
|
||||||
|
|
||||||
if ((oflags & O_ACCMODE) == O_RDONLY) {
|
if ((oflags & O_ACCMODE) == O_RDONLY) {
|
||||||
childfd = fds[1];
|
threadData->fdin = fd;
|
||||||
fd = fds[0];
|
threadData->fdout = pipefds[1];
|
||||||
virCommandSetOutputFD(cmd, &childfd);
|
if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
|
||||||
} else {
|
VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
|
||||||
childfd = fds[0];
|
|
||||||
fd = fds[1];
|
|
||||||
virCommandSetInputFD(cmd, childfd);
|
|
||||||
}
|
|
||||||
virCommandSetErrorFD(cmd, &errfd);
|
|
||||||
|
|
||||||
if (virCommandRunAsync(cmd, NULL) < 0)
|
|
||||||
goto error;
|
goto error;
|
||||||
|
tmpfd = pipefds[0];
|
||||||
VIR_FORCE_CLOSE(childfd);
|
} else {
|
||||||
|
threadData->fdin = pipefds[0];
|
||||||
|
threadData->fdout = fd;
|
||||||
|
if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
|
||||||
|
VIR_STRDUP(threadData->fdoutname, path) < 0)
|
||||||
|
goto error;
|
||||||
|
tmpfd = pipefds[1];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
|
if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
virCommandFree(cmd);
|
|
||||||
VIR_FORCE_CLOSE(fd);
|
VIR_FORCE_CLOSE(fd);
|
||||||
VIR_FORCE_CLOSE(childfd);
|
VIR_FORCE_CLOSE(pipefds[0]);
|
||||||
VIR_FORCE_CLOSE(errfd);
|
VIR_FORCE_CLOSE(pipefds[1]);
|
||||||
VIR_FREE(iohelper_path);
|
|
||||||
if (oflags & O_CREAT)
|
if (oflags & O_CREAT)
|
||||||
unlink(path);
|
unlink(path);
|
||||||
|
virFDStreamThreadDataFree(threadData);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
# define __VIR_FDSTREAM_H_
|
# define __VIR_FDSTREAM_H_
|
||||||
|
|
||||||
# include "internal.h"
|
# include "internal.h"
|
||||||
# include "vircommand.h"
|
|
||||||
|
|
||||||
/* internal callback, the generic one is used up by daemon stream driver */
|
/* internal callback, the generic one is used up by daemon stream driver */
|
||||||
/* the close callback is called with fdstream private data locked */
|
/* the close callback is called with fdstream private data locked */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user