diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index c589236947..99e20c09c6 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -143,6 +143,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a7d5..db7dbca9bc 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@ /* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), }; struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr; + size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1; cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1; virCommandAddArgSet(cmd, args); @@ -1394,8 +1401,9 @@ virCommandSetWorkingDirectory(virCommandPtr cmd, const char *pwd) * @cmd: the command to modify * @inbuf: string to feed to stdin * - * Feed the child's stdin from a string buffer. This requires the use - * of virCommandRun(). + * Feed the child's stdin from a string buffer. This requires the + * use of virCommandRun() or combination of virCommandDoAsyncIO and + * virCommandRunAsync. The buffer is forgotten after each @cmd run. */ void virCommandSetInputBuffer(virCommandPtr cmd, const char *inbuf) @@ -1423,8 +1431,10 @@ virCommandSetInputBuffer(virCommandPtr cmd, const char *inbuf) * Capture the child's stdout to a string buffer. *outbuf is * guaranteed to be allocated after successful virCommandRun or * virCommandWait, and is best-effort allocated after failed - * virCommandRun; caller is responsible for freeing *outbuf. - * This requires the use of virCommandRun. + * virCommandRun or virCommandRunAsync; caller is responsible for + * freeing *outbuf. This requires the use of virCommandRun() or + * combination of virCommandDoAsyncIO and virCommandRunAsync. The + * buffer is forgotten after each @cmd run. */ void virCommandSetOutputBuffer(virCommandPtr cmd, char **outbuf) @@ -1452,11 +1462,13 @@ virCommandSetOutputBuffer(virCommandPtr cmd, char **outbuf) * Capture the child's stderr to a string buffer. *errbuf is * guaranteed to be allocated after successful virCommandRun or * virCommandWait, and is best-effort allocated after failed - * virCommandRun; caller is responsible for freeing *errbuf. - * This requires the use of virCommandRun. It is possible to - * pass the same pointer as for virCommandSetOutputBuffer(), in - * which case the child process will interleave all output into - * a single string. + * virCommandRun or virCommandRunAsync; caller is responsible for + * freeing *errbuf. It is possible to pass the same pointer as + * for virCommandSetOutputBuffer(), in which case the child + * process will interleave all output into a single string. This + * requires the use of virCommandRun() or combination of + * virCommandDoAsyncIO and virCommandRunAsync.The buffer is + * forgotten after each @cmd run. */ void virCommandSetErrorBuffer(virCommandPtr cmd, char **errbuf) @@ -2122,6 +2134,181 @@ virCommandHook(void *data) } +static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char ***bufptr = NULL; + char buf[1024]; + ssize_t nread, nwritten; + size_t len = 0; + int *watchPtr = NULL; + bool eof = false; + int *fdptr = NULL, **fdptrptr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + errno = 0; + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + fdptr = &cmd->infd; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf); + + while (true) { + nwritten = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset); + if (nwritten < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + eof = true; + } + break; + } + + if (nwritten == 0) { + eof = true; + break; + } + + cmd->inbufOffset += nwritten; + if (cmd->inbufOffset == len) { + VIR_FORCE_CLOSE(cmd->infd); + eof = true; + break; + } + } + + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = &cmd->outbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } else { + watchPtr = &cmd->errWatch; + bufptr = &cmd->errbuf; + fdptr = &cmd->errfd; + fdptrptr = &cmd->errfdptr; + } + + if (events & VIR_EVENT_HANDLE_READABLE) { + if (**bufptr) + len = strlen(**bufptr); + + while (true) { + nread = read(fd, buf, sizeof(buf)); + if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); + eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(**bufptr + len, buf, nread); + (**bufptr)[len + nread] = '\0'; + } + + } + } + + if (eof || (events & VIR_EVENT_HANDLE_HANGUP) || + (events & VIR_EVENT_HANDLE_ERROR)) { + virEventRemoveHandle(watch); + + *watchPtr = -1; + VIR_FORCE_CLOSE(*fdptr); + if (bufptr) + *bufptr = NULL; + if (fdptrptr) + *fdptrptr = NULL; + } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf && + (cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd && + (cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + goto cleanup; + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd && + (cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + cmd->outWatch = -1; + } + goto cleanup; + } + + ret = 0; + +cleanup: + return ret; +} + + /** * virCommandRunAsync: * @cmd: command to start @@ -2149,6 +2336,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) char *str; int i; bool synchronous = false; + int infd[2] = {-1, -1}; if (!cmd || cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2163,10 +2351,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC; - /* Buffer management can only be requested via virCommandRun. */ - if ((cmd->inbuf && cmd->infd == -1) || - (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || - (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { + /* Buffer management can only be requested via virCommandRun, unless help + * from the event loop has been requested via virCommandDoAsyncIO. */ + if (cmd->flags & VIR_EXEC_ASYNC_IO) { + /* If we have an input buffer, we need + * a pipe to feed the data to the child */ + if (cmd->inbuf && cmd->infd == -1) { + if (pipe2(infd, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("unable to open pipe")); + cmd->has_error = -1; + return -1; + } + cmd->infd = infd[0]; + } + } else if ((cmd->inbuf && cmd->infd == -1) || + (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || + (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot mix string I/O with asynchronous command")); return -1; @@ -2228,6 +2429,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) else cmd->reap = true; + if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { + cmd->flags &= ~VIR_EXEC_ASYNC_IO; + if (cmd->inbuf && cmd->infd != -1) { + /* close the read end of infd and replace it with the write end */ + VIR_FORCE_CLOSE(cmd->infd); + cmd->infd = infd[1]; + } + ret = virCommandRegisterEventLoop(cmd); + } + return ret; } @@ -2248,6 +2459,7 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) { int ret; int status = 0; + const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP; if (!cmd ||cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2272,6 +2484,24 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) * guarantee that virProcessWait only fails due to failure to wait, * and repeat the exitstatus check code ourselves. */ ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd); + cmd->outWatch = -1; + } + + if (cmd->errWatch != -1) { + virEventRemoveHandle(cmd->errWatch); + virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd); + cmd->errWatch = -1; + } + if (ret == 0) { cmd->pid = -1; cmd->reap = false; @@ -2521,3 +2751,50 @@ virCommandFree(virCommandPtr cmd) VIR_FREE(cmd); } + +/** + * virCommandDoAsyncIO: + * @cmd: command to do async IO on + * + * This requests asynchronous string IO on @cmd. It is useful in + * combination with virCommandRunAsync(): + * + * virCommandPtr cmd = virCommandNew*(...); + * char *buf = NULL; + * + * ... + * + * virCommandSetOutputBuffer(cmd, &buf); + * virCommandDoAsyncIO(cmd); + * + * if (virCommandRunAsync(cmd, NULL) < 0) + * goto cleanup; + * + * ... + * + * if (virCommandWait(cmd, NULL) < 0) + * goto cleanup; + * + * // @buf now contains @cmd's stdout + * VIR_DEBUG("STDOUT: %s", NULLSTR(buf)); + * + * ... + * + * cleanup: + * VIR_FREE(buf); + * virCommandFree(cmd); + * + * The libvirt's event loop is used for handling stdios of @cmd. + * Since current implementation uses strlen to determine length + * of data to be written to @cmd's stdin, don't pass any binary + * data. If you want to re-run command, you need to call this and + * buffer setting functions (virCommandSet.*Buffer) prior each run. + */ +void +virCommandDoAsyncIO(virCommandPtr cmd) +{ + if (!cmd || cmd->has_error) + return; + + cmd->flags |= VIR_EXEC_ASYNC_IO | VIR_EXEC_NONBLOCK; +} diff --git a/src/util/vircommand.h b/src/util/vircommand.h index 9b7117d998..c1a2e246ff 100644 --- a/src/util/vircommand.h +++ b/src/util/vircommand.h @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd); void virCommandFree(virCommandPtr cmd); +void virCommandDoAsyncIO(virCommandPtr cmd); #endif /* __VIR_COMMAND_H__ */