Enhance the streams helper to support plain file I/O

The O_NONBLOCK flag doesn't work as desired on plain files
or block devices. Introduce an I/O helper program that does
the blocking I/O operations, communicating over a pipe that
can support O_NONBLOCK

* src/fdstream.c, src/fdstream.h: Add non-blocking I/O
  on plain files/block devices
* src/Makefile.am, src/util/iohelper.c: I/O helper program
* src/qemu/qemu_driver.c, src/lxc/lxc_driver.c,
  src/uml/uml_driver.c, src/xen/xen_driver.c: Update for
  streams API change
This commit is contained in:
Daniel P. Berrange 2011-02-22 12:05:20 +00:00
parent 0c97e70b74
commit e886237af5
9 changed files with 411 additions and 66 deletions

View File

@ -92,6 +92,7 @@ src/util/event_poll.c
src/util/hash.c
src/util/hooks.c
src/util/hostusb.c
src/util/iohelper.c
src/util/interface.c
src/util/iptables.c
src/util/json.c

View File

@ -376,6 +376,9 @@ STORAGE_DRIVER_DISK_SOURCES = \
STORAGE_HELPER_DISK_SOURCES = \
storage/parthelper.c
UTIL_IO_HELPER_SOURCES = \
util/iohelper.c
# Network filters
NWFILTER_DRIVER_SOURCES = \
nwfilter/nwfilter_driver.h nwfilter/nwfilter_driver.c \
@ -1178,6 +1181,15 @@ EXTRA_DIST += $(LIBVIRT_QEMU_SYMBOL_FILE)
libexec_PROGRAMS =
libexec_PROGRAMS += libvirt_iohelper
libvirt_iohelper_SOURCES = $(UTIL_IO_HELPER_SOURCES)
libvirt_iohelper_LDFLAGS = $(WARN_LDFLAGS) $(AM_LDFLAGS)
libvirt_iohelper_LDADD = \
libvirt_util.la \
../gnulib/lib/libgnu.la
libvirt_iohelper_CFLAGS = $(AM_CFLAGS)
if WITH_STORAGE_DISK
if WITH_LIBVIRTD
libexec_PROGRAMS += libvirt_parthelper

View File

@ -26,18 +26,22 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/wait.h>
#if HAVE_SYS_UN_H
# include <sys/un.h>
#endif
#include <netinet/in.h>
#include <signal.h>
#include "fdstream.h"
#include "virterror_internal.h"
#include "datatypes.h"
#include "logging.h"
#include "memory.h"
#include "event.h"
#include "util.h"
#include "files.h"
#include "configmake.h"
#define VIR_FROM_THIS VIR_FROM_STREAMS
#define streamsReportError(code, ...) \
@ -47,6 +51,10 @@
/* Tunnelled migration stream support */
struct virFDStreamData {
int fd;
int errfd;
virCommandPtr cmd;
unsigned long long offset;
unsigned long long length;
int watch;
unsigned int cbRemoved;
@ -206,6 +214,35 @@ static int virFDStreamFree(struct virFDStreamData *fdst)
{
int ret;
ret = VIR_CLOSE(fdst->fd);
if (fdst->cmd) {
char buf[1024];
ssize_t len;
int status;
if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
buf[0] = '\0';
else
buf[len] = '\0';
if (virCommandWait(fdst->cmd, &status) < 0) {
ret = -1;
} else if (status != 0) {
if (buf[0] == '\0') {
if (WIFEXITED(status)) {
streamsReportError(VIR_ERR_INTERNAL_ERROR,
_("I/O helper exited with status %d"),
WEXITSTATUS(status));
} else {
streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("I/O helper exited abnormally"));
}
} else {
streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
buf);
}
ret = -1;
}
virCommandFree(fdst->cmd);
}
VIR_FREE(fdst);
return ret;
}
@ -217,6 +254,8 @@ virFDStreamClose(virStreamPtr st)
struct virFDStreamData *fdst = st->privateData;
int ret;
VIR_DEBUG("st=%p", st);
if (!fdst)
return 0;
@ -250,6 +289,18 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
virMutexLock(&fdst->lock);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virReportSystemError(ENOSPC, "%s",
_("cannot write to stream"));
virMutexUnlock(&fdst->lock);
return -1;
}
if ((fdst->length - fdst->offset) < nbytes)
nbytes = fdst->length - fdst->offset;
}
retry:
ret = write(fdst->fd, bytes, nbytes);
if (ret < 0) {
@ -262,6 +313,8 @@ retry:
virReportSystemError(errno, "%s",
_("cannot write to stream"));
}
} else if (fdst->length) {
fdst->offset += ret;
}
virMutexUnlock(&fdst->lock);
@ -288,6 +341,16 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
virMutexLock(&fdst->lock);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virMutexUnlock(&fdst->lock);
return 0;
}
if ((fdst->length - fdst->offset) < nbytes)
nbytes = fdst->length - fdst->offset;
}
retry:
ret = read(fdst->fd, bytes, nbytes);
if (ret < 0) {
@ -300,6 +363,8 @@ retry:
virReportSystemError(errno, "%s",
_("cannot read from stream"));
}
} else if (fdst->length) {
fdst->offset += ret;
}
virMutexUnlock(&fdst->lock);
@ -317,11 +382,17 @@ static virStreamDriver virFDStreamDrv = {
.streamRemoveCallback = virFDStreamRemoveCallback
};
int virFDStreamOpen(virStreamPtr st,
int fd)
static int virFDStreamOpenInternal(virStreamPtr st,
int fd,
virCommandPtr cmd,
int errfd,
unsigned long long length)
{
struct virFDStreamData *fdst;
VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
st, fd, cmd, errfd, length);
if ((st->flags & VIR_STREAM_NONBLOCK) &&
virSetNonBlock(fd) < 0)
return -1;
@ -332,6 +403,9 @@ int virFDStreamOpen(virStreamPtr st,
}
fdst->fd = fd;
fdst->cmd = cmd;
fdst->errfd = errfd;
fdst->length = length;
if (virMutexInit(&fdst->lock) < 0) {
VIR_FREE(fdst);
streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@ -346,6 +420,13 @@ int virFDStreamOpen(virStreamPtr st,
}
int virFDStreamOpen(virStreamPtr st,
int fd)
{
return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
}
#if HAVE_SYS_UN_H
int virFDStreamConnectUNIX(virStreamPtr st,
const char *path,
@ -387,7 +468,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
goto error;
} while ((++i <= timeout*5) && (usleep(.2 * 1000000) <= 0));
if (virFDStreamOpen(st, fd) < 0)
if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
goto error;
return 0;
@ -406,64 +487,28 @@ int virFDStreamConnectUNIX(virStreamPtr st ATTRIBUTE_UNUSED,
}
#endif
int virFDStreamOpenFile(virStreamPtr st,
const char *path,
int flags)
static int
virFDStreamOpenFileInternal(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int flags,
int mode)
{
int fd;
int fd = -1;
int fds[2] = { -1, -1 };
struct stat sb;
virCommandPtr cmd = NULL;
int errfd = -1;
pid_t pid = 0;
if (flags & O_CREAT) {
streamsReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unexpected O_CREAT flag when opening existing file"));
}
if ((fd = open(path, flags)) < 0) {
virReportSystemError(errno,
_("Unable to open stream for '%s'"),
path);
return -1;
}
if (fstat(fd, &sb) < 0) {
virReportSystemError(errno,
_("Unable to access stream for '%s'"),
path);
goto error;
}
/* Thanks to the POSIX i/o model, we can't reliably get
* non-blocking I/O on block devs/regular files. To
* support those we need to fork a helper process todo
* the I/O so we just have a fifo. Or use AIO :-(
*/
if ((st->flags & VIR_STREAM_NONBLOCK) &&
(!S_ISCHR(sb.st_mode) &&
!S_ISFIFO(sb.st_mode))) {
streamsReportError(VIR_ERR_INTERNAL_ERROR,
_("Non-blocking I/O is not supported on %s"),
path);
goto error;
}
if (virFDStreamOpen(st, fd) < 0)
goto error;
return 0;
error:
VIR_FORCE_CLOSE(fd);
return -1;
}
int virFDStreamCreateFile(virStreamPtr st,
const char *path,
int flags,
mode_t mode)
{
int fd = open(path, flags, mode);
struct stat sb;
VIR_DEBUG("st=%p path=%s flags=%d offset=%llu length=%llu mode=%d",
st, path, flags, offset, length, mode);
if (flags & O_CREAT)
fd = open(path, flags, mode);
else
fd = open(path, flags);
if (fd < 0) {
virReportSystemError(errno,
_("Unable to open stream for '%s'"),
@ -486,18 +531,97 @@ int virFDStreamCreateFile(virStreamPtr st,
if ((st->flags & VIR_STREAM_NONBLOCK) &&
(!S_ISCHR(sb.st_mode) &&
!S_ISFIFO(sb.st_mode))) {
streamsReportError(VIR_ERR_INTERNAL_ERROR,
_("Non-blocking I/O is not supported on %s"),
path);
goto error;
int childfd;
if ((flags & O_RDWR) == O_RDWR) {
streamsReportError(VIR_ERR_INTERNAL_ERROR,
_("%s: Cannot request read and write flags together"),
path);
goto error;
}
VIR_FORCE_CLOSE(fd);
if (pipe(fds) < 0) {
virReportSystemError(errno, "%s",
_("Unable to create pipe"));
goto error;
}
cmd = virCommandNewArgList(LIBEXECDIR "/libvirt_iohelper",
path,
NULL);
virCommandAddArgFormat(cmd, "%d", flags);
virCommandAddArgFormat(cmd, "%d", mode);
virCommandAddArgFormat(cmd, "%llu", offset);
virCommandAddArgFormat(cmd, "%llu", length);
if (flags == O_RDONLY) {
childfd = fds[1];
fd = fds[0];
virCommandSetOutputFD(cmd, &childfd);
} else {
childfd = fds[0];
fd = fds[1];
virCommandSetInputFD(cmd, childfd);
}
virCommandSetErrorFD(cmd, &errfd);
if (virCommandRunAsync(cmd, &pid) < 0)
goto error;
VIR_FORCE_CLOSE(childfd);
} else {
if (offset &&
lseek(fd, offset, SEEK_SET) != offset) {
virReportSystemError(errno,
_("Unable to seek %s to %llu"),
path, offset);
goto error;
}
}
if (virFDStreamOpen(st, fd) < 0)
if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
goto error;
return 0;
error:
#ifndef WIN32
if (pid)
kill(SIGTERM, pid);
#endif
virCommandFree(cmd);
VIR_FORCE_CLOSE(fds[0]);
VIR_FORCE_CLOSE(fds[1]);
VIR_FORCE_CLOSE(fd);
return -1;
}
int virFDStreamOpenFile(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int flags)
{
if (flags & O_CREAT) {
streamsReportError(VIR_ERR_INTERNAL_ERROR,
_("Attempt to create %s without specifying mode"),
path);
return -1;
}
return virFDStreamOpenFileInternal(st, path,
offset, length,
flags, 0);
}
int virFDStreamCreateFile(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int flags,
mode_t mode)
{
return virFDStreamOpenFileInternal(st, path,
offset, length,
flags | O_CREAT, mode);
}

View File

@ -24,6 +24,7 @@
# define __VIR_FDSTREAM_H_
# include "internal.h"
# include "command.h"
int virFDStreamOpen(virStreamPtr st,
int fd);
@ -34,9 +35,13 @@ int virFDStreamConnectUNIX(virStreamPtr st,
int virFDStreamOpenFile(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int flags);
int virFDStreamCreateFile(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
int flags,
mode_t mode);

View File

@ -2780,7 +2780,7 @@ lxcDomainOpenConsole(virDomainPtr dom,
goto cleanup;
}
if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
goto cleanup;
ret = 0;

View File

@ -6788,7 +6788,7 @@ qemuDomainOpenConsole(virDomainPtr dom,
goto cleanup;
}
if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
goto cleanup;
ret = 0;

View File

@ -2126,7 +2126,7 @@ umlDomainOpenConsole(virDomainPtr dom,
goto cleanup;
}
if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
goto cleanup;
ret = 0;

203
src/util/iohelper.c Normal file
View File

@ -0,0 +1,203 @@
/*
* iohelper.c: Helper program to perform I/O operations on files
*
* Copyright (C) 2011 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Daniel P. Berrange <berrange@redhat.com>
*
* Current support
* - Read existing file
* - Write existing file
* - Create & write new file
*/
#include <config.h>
#include <locale.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include "util.h"
#include "threads.h"
#include "files.h"
#include "memory.h"
#include "virterror_internal.h"
#include "configmake.h"
#define VIR_FROM_THIS VIR_FROM_STORAGE
static int runIO(const char *path,
int flags,
int mode,
unsigned long long offset,
unsigned long long length)
{
char *buf = NULL;
size_t buflen = 1024*1024;
int fd;
int ret = -1;
int fdin, fdout;
const char *fdinname, *fdoutname;
unsigned long long total = 0;
if (flags & O_CREAT) {
fd = open(path, flags, mode);
} else {
fd = open(path, flags);
}
if (fd < 0) {
virReportSystemError(errno, _("Unable to open %s"), path);
goto cleanup;
}
if (offset) {
if (lseek(fd, offset, SEEK_SET) < 0) {
virReportSystemError(errno, _("Unable to seek %s to %llu"),
path, offset);
goto cleanup;
}
}
if (VIR_ALLOC_N(buf, buflen) < 0) {
virReportOOMError();
goto cleanup;
}
switch (flags & O_ACCMODE) {
case O_RDONLY:
fdin = fd;
fdinname = path;
fdout = STDOUT_FILENO;
fdoutname = "stdout";
break;
case O_WRONLY:
fdin = STDIN_FILENO;
fdinname = "stdin";
fdout = fd;
fdoutname = path;
break;
case O_RDWR:
default:
virReportSystemError(EINVAL,
_("Unable to process file with flags %d"),
(flags & O_ACCMODE));
goto cleanup;
}
while (1) {
ssize_t got;
if (length &&
(length - total) < buflen)
buflen = length - total;
if (buflen == 0)
break; /* End of requested data from client */
if ((got = saferead(fdin, buf, buflen)) < 0) {
virReportSystemError(errno, _("Unable to read %s"), fdinname);
goto cleanup;
}
if (got == 0)
break; /* End of file before end of requested data */
total += got;
if (safewrite(fdout, buf, got) < 0) {
virReportSystemError(errno, _("Unable to write %s"), fdoutname);
goto cleanup;
}
}
ret = 0;
cleanup:
if (VIR_CLOSE(fd) < 0 &&
ret == 0) {
virReportSystemError(errno, _("Unable to close %s"), path);
ret = -1;
}
VIR_FREE(buf);
return ret;
}
int main(int argc, char **argv)
{
const char *path;
virErrorPtr err;
unsigned long long offset;
unsigned long long length;
int flags;
int mode;
if (setlocale(LC_ALL, "") == NULL ||
bindtextdomain(PACKAGE, LOCALEDIR) == NULL ||
textdomain(PACKAGE) == NULL) {
fprintf(stderr, _("%s: initialization failed\n"), argv[0]);
exit(EXIT_FAILURE);
}
if (virThreadInitialize() < 0 ||
virErrorInitialize() < 0 ||
virRandomInitialize(time(NULL) ^ getpid())) {
fprintf(stderr, _("%s: initialization failed\n"), argv[0]);
exit(EXIT_FAILURE);
}
if (argc != 6) {
fprintf(stderr, _("%s: syntax FILENAME FLAGS MODE OFFSET LENGTH\n"), argv[0]);
exit(EXIT_FAILURE);
}
path = argv[1];
if (virStrToLong_i(argv[2], NULL, 10, &flags) < 0) {
fprintf(stderr, _("%s: malformed file flags %s"), argv[0], argv[2]);
exit(EXIT_FAILURE);
}
if (virStrToLong_i(argv[3], NULL, 10, &mode) < 0) {
fprintf(stderr, _("%s: malformed file mode %s"), argv[0], argv[3]);
exit(EXIT_FAILURE);
}
if (virStrToLong_ull(argv[4], NULL, 10, &offset) < 0) {
fprintf(stderr, _("%s: malformed file offset %s"), argv[0], argv[4]);
exit(EXIT_FAILURE);
}
if (virStrToLong_ull(argv[5], NULL, 10, &length) < 0) {
fprintf(stderr, _("%s: malformed file length %s"), argv[0], argv[5]);
exit(EXIT_FAILURE);
}
if (runIO(path, flags, mode, offset, length) < 0)
goto error;
return 0;
error:
err = virGetLastError();
if (err) {
fprintf(stderr, "%s: %s\n", argv[0], err->message);
} else {
fprintf(stderr, _("%s: unknown failure with %s\n"), argv[0], path);
}
exit(EXIT_FAILURE);
}

View File

@ -2019,7 +2019,7 @@ xenUnifiedDomainOpenConsole(virDomainPtr dom,
goto cleanup;
}
if (virFDStreamOpenFile(st, chr->source.data.file.path, O_RDWR) < 0)
if (virFDStreamOpenFile(st, chr->source.data.file.path, 0, 0, O_RDWR) < 0)
goto cleanup;
ret = 0;