diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index ecf81585e6..8b7f7f4513 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "qemu_migration.h" #include "qemu_monitor.h" @@ -1584,49 +1585,113 @@ struct _qemuMigrationIOThread { virStreamPtr st; int sock; virError err; + int wakeupRecvFD; + int wakeupSendFD; }; static void qemuMigrationIOFunc(void *arg) { qemuMigrationIOThreadPtr data = arg; - char *buffer; - int nbytes = TUNNEL_SEND_BUF_SIZE; + char *buffer = NULL; + struct pollfd fds[2]; + int timeout = -1; + virErrorPtr err = NULL; + + VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d", + data->st, data->sock); if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) { virReportOOMError(); - virStreamAbort(data->st); - goto error; + goto abrt; } + fds[0].fd = data->sock; + fds[1].fd = data->wakeupRecvFD; + for (;;) { - nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); - if (nbytes < 0) { - virReportSystemError(errno, "%s", - _("tunnelled migration failed to read from qemu")); - virStreamAbort(data->st); - VIR_FREE(buffer); - goto error; - } - else if (nbytes == 0) - /* EOF; get out of here */ - break; + int ret; - if (virStreamSend(data->st, buffer, nbytes) < 0) { - VIR_FREE(buffer); - goto error; + fds[0].events = fds[1].events = POLLIN; + fds[0].revents = fds[1].revents = 0; + + ret = poll(fds, ARRAY_CARDINALITY(fds), timeout); + + if (ret < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + virReportSystemError(errno, "%s", + _("poll failed in migration tunnel")); + goto abrt; + } + + if (ret == 0) { + /* We were asked to gracefully stop but reading would block. This + * can only happen if qemu told us migration finished but didn't + * close the migration fd. We handle this in the same way as EOF. + */ + VIR_DEBUG("QEMU forgot to close migration fd"); + break; + } + + if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { + char stop = 0; + + if (saferead(data->wakeupRecvFD, &stop, 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to read from wakeup fd")); + goto abrt; + } + + VIR_DEBUG("Migration tunnel was asked to %s", + stop ? "abort" : "finish"); + if (stop) { + goto abrt; + } else { + timeout = 0; + } + } + + if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { + int nbytes; + + nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); + if (nbytes > 0) { + if (virStreamSend(data->st, buffer, nbytes) < 0) + goto error; + } else if (nbytes < 0) { + virReportSystemError(errno, "%s", + _("tunnelled migration failed to read from qemu")); + goto abrt; + } else { + /* EOF; get out of here */ + break; + } } } - VIR_FREE(buffer); - if (virStreamFinish(data->st) < 0) goto error; + VIR_FREE(buffer); + return; +abrt: + err = virSaveLastError(); + if (err && err->code == VIR_ERR_OK) { + virFreeError(err); + err = NULL; + } + virStreamAbort(data->st); + if (err) { + virSetError(err); + virFreeError(err); + } + error: virCopyLastError(&data->err); virResetLastError(); + VIR_FREE(buffer); } @@ -1634,37 +1699,63 @@ static qemuMigrationIOThreadPtr qemuMigrationStartTunnel(virStreamPtr st, int sock) { - qemuMigrationIOThreadPtr io; + qemuMigrationIOThreadPtr io = NULL; + int wakeupFD[2] = { -1, -1 }; - if (VIR_ALLOC(io) < 0) { - virReportOOMError(); - return NULL; + if (pipe2(wakeupFD, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("Unable to make pipe")); + goto error; } + if (VIR_ALLOC(io) < 0) + goto no_memory; + io->st = st; io->sock = sock; + io->wakeupRecvFD = wakeupFD[0]; + io->wakeupSendFD = wakeupFD[1]; if (virThreadCreate(&io->thread, true, qemuMigrationIOFunc, io) < 0) { virReportSystemError(errno, "%s", _("Unable to create migration thread")); - VIR_FREE(io); - return NULL; + goto error; } return io; + +no_memory: + virReportOOMError(); +error: + VIR_FORCE_CLOSE(wakeupFD[0]); + VIR_FORCE_CLOSE(wakeupFD[1]); + VIR_FREE(io); + return NULL; } static int -qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io) +qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) { int rv = -1; + char stop = error ? 1 : 0; + + /* make sure the thread finishes its job and is joinable */ + if (safewrite(io->wakeupSendFD, &stop, 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to wakeup migration tunnel")); + goto cleanup; + } + virThreadJoin(&io->thread); /* Forward error from the IO thread, to this thread */ if (io->err.code != VIR_ERR_OK) { - virSetError(&io->err); + if (error) + rv = 0; + else + virSetError(&io->err); virResetError(&io->err); goto cleanup; } @@ -1672,6 +1763,8 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io) rv = 0; cleanup: + VIR_FORCE_CLOSE(io->wakeupSendFD); + VIR_FORCE_CLOSE(io->wakeupRecvFD); VIR_FREE(io); return rv; } @@ -1879,10 +1972,9 @@ cleanup: orig_err = virSaveLastError(); if (spec->fwdType != MIGRATION_FWD_DIRECT) { - /* Close now to ensure the IO thread quits & is joinable */ - VIR_FORCE_CLOSE(fd); - if (iothread && qemuMigrationStopTunnel(iothread) < 0) + if (iothread && qemuMigrationStopTunnel(iothread, ret < 0) < 0) ret = -1; + VIR_FORCE_CLOSE(fd); } if (ret == 0 &&