Run tunnelled migration IO in separate thread

By running the doTunnelSendAll code in a separate thread, the
main thread can do qemuMigrationWaitForCompletion as with
normal migration. This in turn ensures that job signals work
correctly and that progress monitoring can be done

* src/qemu/qemu_migration.c: Run tunnelled migration in
  separate thread
This commit is contained in:
Daniel P. Berrange 2011-05-09 16:52:42 +01:00
parent 5a6ca96a01
commit 1d916a60a7

View File

@ -1287,44 +1287,103 @@ cleanup:
#define TUNNEL_SEND_BUF_SIZE 65536
static int doTunnelSendAll(virStreamPtr st,
int sock)
typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr;
struct _qemuMigrationIOThread {
virThread thread;
virStreamPtr st;
int sock;
virError err;
};
static void qemuMigrationIOFunc(void *arg)
{
qemuMigrationIOThreadPtr data = arg;
char *buffer;
int nbytes = TUNNEL_SEND_BUF_SIZE;
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) {
virReportOOMError();
virStreamAbort(st);
return -1;
virStreamAbort(data->st);
goto error;
}
for (;;) {
nbytes = saferead(sock, buffer, TUNNEL_SEND_BUF_SIZE);
nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
if (nbytes < 0) {
virReportSystemError(errno, "%s",
_("tunnelled migration failed to read from qemu"));
virStreamAbort(st);
virStreamAbort(data->st);
VIR_FREE(buffer);
return -1;
goto error;
}
else if (nbytes == 0)
/* EOF; get out of here */
break;
if (virStreamSend(st, buffer, nbytes) < 0) {
if (virStreamSend(data->st, buffer, nbytes) < 0) {
VIR_FREE(buffer);
return -1;
goto error;
}
}
VIR_FREE(buffer);
if (virStreamFinish(st) < 0)
/* virStreamFinish set the error for us */
return -1;
if (virStreamFinish(data->st) < 0)
goto error;
return 0;
return;
error:
virCopyLastError(&data->err);
virResetLastError();
}
static qemuMigrationIOThreadPtr
qemuMigrationStartTunnel(virStreamPtr st,
int sock)
{
qemuMigrationIOThreadPtr io;
if (VIR_ALLOC(io) < 0) {
virReportOOMError();
return NULL;
}
io->st = st;
io->sock = sock;
if (virThreadCreate(&io->thread, true,
qemuMigrationIOFunc,
io) < 0) {
virReportSystemError(errno, "%s",
_("Unable to create migration thread"));
VIR_FREE(io);
return NULL;
}
return io;
}
static int
qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
{
int rv = -1;
virThreadJoin(&io->thread);
/* Forward error from the IO thread, to this thread */
if (io->err.code != VIR_ERR_OK) {
virSetError(&io->err);
virResetError(&io->err);
goto cleanup;
}
rv = 0;
cleanup:
VIR_FREE(io);
return rv;
}
@ -1349,6 +1408,7 @@ static int doTunnelMigrate(struct qemud_driver *driver,
unsigned int background_flags = QEMU_MONITOR_MIGRATE_BACKGROUND;
int ret = -1;
qemuMigrationCookiePtr mig = NULL;
qemuMigrationIOThreadPtr iothread = NULL;
if (!qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_UNIX) &&
!qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_EXEC)) {
@ -1484,7 +1544,16 @@ static int doTunnelMigrate(struct qemud_driver *driver,
goto cancel;
}
ret = doTunnelSendAll(st, client_sock);
if (!(iothread = qemuMigrationStartTunnel(st, client_sock)))
goto cancel;
ret = qemuMigrationWaitForCompletion(driver, vm);
/* Close now to ensure the IO thread quits & is joinable in next method */
VIR_FORCE_CLOSE(client_sock);
if (qemuMigrationStopTunnel(iothread) < 0)
ret = -1;
if (ret == 0 &&
qemuMigrationBakeCookie(mig, driver, vm, cookieout, cookieoutlen, 0) < 0)