mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-01-07 21:45:22 +00:00
qemu: Avoid bogus error at the end of tunnelled migration
Once qemu monitor reports migration has completed, we just closed our end of the pipe and let migration tunnel die. This generated bogus error in case we did so before the thread saw EOF on the pipe and migration was aborted even though it was in fact successful. With this patch we first wake up the tunnel thread and once it has read all data from the pipe and finished the stream we close the filedescriptor. A small additional bonus of this patch is that real errors reported inside qemuMigrationIOFunc are not overwritten by virStreamAbort any more.
This commit is contained in:
parent
e173e81ed9
commit
b109b1140c
@ -25,6 +25,7 @@
|
||||
#include <gnutls/gnutls.h>
|
||||
#include <gnutls/x509.h>
|
||||
#include <fcntl.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include "qemu_migration.h"
|
||||
#include "qemu_monitor.h"
|
||||
@ -1584,49 +1585,113 @@ struct _qemuMigrationIOThread {
|
||||
virStreamPtr st;
|
||||
int sock;
|
||||
virError err;
|
||||
int wakeupRecvFD;
|
||||
int wakeupSendFD;
|
||||
};
|
||||
|
||||
static void qemuMigrationIOFunc(void *arg)
|
||||
{
|
||||
qemuMigrationIOThreadPtr data = arg;
|
||||
char *buffer;
|
||||
int nbytes = TUNNEL_SEND_BUF_SIZE;
|
||||
char *buffer = NULL;
|
||||
struct pollfd fds[2];
|
||||
int timeout = -1;
|
||||
virErrorPtr err = NULL;
|
||||
|
||||
VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d",
|
||||
data->st, data->sock);
|
||||
|
||||
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) {
|
||||
virReportOOMError();
|
||||
virStreamAbort(data->st);
|
||||
goto error;
|
||||
goto abrt;
|
||||
}
|
||||
|
||||
fds[0].fd = data->sock;
|
||||
fds[1].fd = data->wakeupRecvFD;
|
||||
|
||||
for (;;) {
|
||||
nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
|
||||
if (nbytes < 0) {
|
||||
virReportSystemError(errno, "%s",
|
||||
_("tunnelled migration failed to read from qemu"));
|
||||
virStreamAbort(data->st);
|
||||
VIR_FREE(buffer);
|
||||
goto error;
|
||||
}
|
||||
else if (nbytes == 0)
|
||||
/* EOF; get out of here */
|
||||
break;
|
||||
int ret;
|
||||
|
||||
if (virStreamSend(data->st, buffer, nbytes) < 0) {
|
||||
VIR_FREE(buffer);
|
||||
goto error;
|
||||
fds[0].events = fds[1].events = POLLIN;
|
||||
fds[0].revents = fds[1].revents = 0;
|
||||
|
||||
ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
|
||||
|
||||
if (ret < 0) {
|
||||
if (errno == EAGAIN || errno == EINTR)
|
||||
continue;
|
||||
virReportSystemError(errno, "%s",
|
||||
_("poll failed in migration tunnel"));
|
||||
goto abrt;
|
||||
}
|
||||
|
||||
if (ret == 0) {
|
||||
/* We were asked to gracefully stop but reading would block. This
|
||||
* can only happen if qemu told us migration finished but didn't
|
||||
* close the migration fd. We handle this in the same way as EOF.
|
||||
*/
|
||||
VIR_DEBUG("QEMU forgot to close migration fd");
|
||||
break;
|
||||
}
|
||||
|
||||
if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
|
||||
char stop = 0;
|
||||
|
||||
if (saferead(data->wakeupRecvFD, &stop, 1) != 1) {
|
||||
virReportSystemError(errno, "%s",
|
||||
_("failed to read from wakeup fd"));
|
||||
goto abrt;
|
||||
}
|
||||
|
||||
VIR_DEBUG("Migration tunnel was asked to %s",
|
||||
stop ? "abort" : "finish");
|
||||
if (stop) {
|
||||
goto abrt;
|
||||
} else {
|
||||
timeout = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
|
||||
int nbytes;
|
||||
|
||||
nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
|
||||
if (nbytes > 0) {
|
||||
if (virStreamSend(data->st, buffer, nbytes) < 0)
|
||||
goto error;
|
||||
} else if (nbytes < 0) {
|
||||
virReportSystemError(errno, "%s",
|
||||
_("tunnelled migration failed to read from qemu"));
|
||||
goto abrt;
|
||||
} else {
|
||||
/* EOF; get out of here */
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VIR_FREE(buffer);
|
||||
|
||||
if (virStreamFinish(data->st) < 0)
|
||||
goto error;
|
||||
|
||||
VIR_FREE(buffer);
|
||||
|
||||
return;
|
||||
|
||||
abrt:
|
||||
err = virSaveLastError();
|
||||
if (err && err->code == VIR_ERR_OK) {
|
||||
virFreeError(err);
|
||||
err = NULL;
|
||||
}
|
||||
virStreamAbort(data->st);
|
||||
if (err) {
|
||||
virSetError(err);
|
||||
virFreeError(err);
|
||||
}
|
||||
|
||||
error:
|
||||
virCopyLastError(&data->err);
|
||||
virResetLastError();
|
||||
VIR_FREE(buffer);
|
||||
}
|
||||
|
||||
|
||||
@ -1634,37 +1699,63 @@ static qemuMigrationIOThreadPtr
|
||||
qemuMigrationStartTunnel(virStreamPtr st,
|
||||
int sock)
|
||||
{
|
||||
qemuMigrationIOThreadPtr io;
|
||||
qemuMigrationIOThreadPtr io = NULL;
|
||||
int wakeupFD[2] = { -1, -1 };
|
||||
|
||||
if (VIR_ALLOC(io) < 0) {
|
||||
virReportOOMError();
|
||||
return NULL;
|
||||
if (pipe2(wakeupFD, O_CLOEXEC) < 0) {
|
||||
virReportSystemError(errno, "%s",
|
||||
_("Unable to make pipe"));
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (VIR_ALLOC(io) < 0)
|
||||
goto no_memory;
|
||||
|
||||
io->st = st;
|
||||
io->sock = sock;
|
||||
io->wakeupRecvFD = wakeupFD[0];
|
||||
io->wakeupSendFD = wakeupFD[1];
|
||||
|
||||
if (virThreadCreate(&io->thread, true,
|
||||
qemuMigrationIOFunc,
|
||||
io) < 0) {
|
||||
virReportSystemError(errno, "%s",
|
||||
_("Unable to create migration thread"));
|
||||
VIR_FREE(io);
|
||||
return NULL;
|
||||
goto error;
|
||||
}
|
||||
|
||||
return io;
|
||||
|
||||
no_memory:
|
||||
virReportOOMError();
|
||||
error:
|
||||
VIR_FORCE_CLOSE(wakeupFD[0]);
|
||||
VIR_FORCE_CLOSE(wakeupFD[1]);
|
||||
VIR_FREE(io);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int
|
||||
qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
|
||||
qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
|
||||
{
|
||||
int rv = -1;
|
||||
char stop = error ? 1 : 0;
|
||||
|
||||
/* make sure the thread finishes its job and is joinable */
|
||||
if (safewrite(io->wakeupSendFD, &stop, 1) != 1) {
|
||||
virReportSystemError(errno, "%s",
|
||||
_("failed to wakeup migration tunnel"));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
virThreadJoin(&io->thread);
|
||||
|
||||
/* Forward error from the IO thread, to this thread */
|
||||
if (io->err.code != VIR_ERR_OK) {
|
||||
virSetError(&io->err);
|
||||
if (error)
|
||||
rv = 0;
|
||||
else
|
||||
virSetError(&io->err);
|
||||
virResetError(&io->err);
|
||||
goto cleanup;
|
||||
}
|
||||
@ -1672,6 +1763,8 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
|
||||
rv = 0;
|
||||
|
||||
cleanup:
|
||||
VIR_FORCE_CLOSE(io->wakeupSendFD);
|
||||
VIR_FORCE_CLOSE(io->wakeupRecvFD);
|
||||
VIR_FREE(io);
|
||||
return rv;
|
||||
}
|
||||
@ -1879,10 +1972,9 @@ cleanup:
|
||||
orig_err = virSaveLastError();
|
||||
|
||||
if (spec->fwdType != MIGRATION_FWD_DIRECT) {
|
||||
/* Close now to ensure the IO thread quits & is joinable */
|
||||
VIR_FORCE_CLOSE(fd);
|
||||
if (iothread && qemuMigrationStopTunnel(iothread) < 0)
|
||||
if (iothread && qemuMigrationStopTunnel(iothread, ret < 0) < 0)
|
||||
ret = -1;
|
||||
VIR_FORCE_CLOSE(fd);
|
||||
}
|
||||
|
||||
if (ret == 0 &&
|
||||
|
Loading…
Reference in New Issue
Block a user