Fully asynchronous monitor I/O processing

Change the QEMU monitor file handle watch to poll for both
read & write events, as well as EOF. All I/O to/from the
QEMU monitor FD is now done in the event callback thread.

When the QEMU driver needs to send a command, it puts the
data to be sent into a qemuMonitorMessagePtr object instance,
queues it for dispatch, and then goes to sleep on a condition
variable. The event thread sends all the data, and then waits
for the reply to arrive, putting the response / error data
back into the qemuMonitorMessagePtr and notifying the condition
variable.

There is a temporary hack in the disk passphrase callback to
avoid acquiring the domain lock.  This avoids a deadlock in
the command processing, since the domain lock is still held
when running monitor commands. The next commit will remove
the locking when running commands & thus allow re-introduction
of locking the disk passphrase callback

* src/qemu/qemu_driver.c: Temporarily don't acquire lock in
  disk passphrase callback. To be reverted in next commit
* src/qemu/qemu_monitor.c, src/qemu/qemu_monitor.h: Remove
  raw I/O functions, and a generic qemuMonitorSend() for
  invoking a command
* src/qemu/qemu_monitor_text.c, src/qemu/qemu_monitor_text.h:
  Remove all low level I/O, and use the new qemuMonitorSend()
  API. Provide a qemuMonitorTextIOProcess() method for detecting
  command/reply/prompt boundaries in the monitor data stream
This commit is contained in:
Daniel P. Berrange 2009-10-14 18:40:51 +01:00
parent 6c70802374
commit 1dc10a7b28
7 changed files with 707 additions and 479 deletions

View File

@ -1,5 +1,6 @@
^src/libvirt\.c$
^src/qemu/qemu_driver\.c$
^src/qemu/qemu_monitor\.c$
^src/util/util\.c$
^src/xen/xend_internal\.c$
^daemon/libvirtd.c$

View File

@ -5076,7 +5076,8 @@ static virDomainObjPtr virDomainLoadStatus(virConnectPtr conn,
return obj;
error:
virDomainObjUnref(obj);
if (obj)
virDomainObjUnref(obj);
VIR_FREE(statusFile);
return NULL;
}

View File

@ -347,9 +347,8 @@ qemuHandleMonitorEOF(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
struct qemud_driver *driver = qemu_driver;
virDomainEventPtr event = NULL;
qemuDriverLock(driver);
VIR_DEBUG("Received EOF on %p '%s'", vm, vm->def->name);
virDomainObjLock(vm);
qemuDriverUnlock(driver);
event = virDomainEventNewFromObj(vm,
VIR_DOMAIN_EVENT_STOPPED,
@ -413,11 +412,24 @@ findVolumeQcowPassphrase(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
char *passphrase;
unsigned char *data;
size_t size;
int ret = -1;
/* XXX
* We ought to be taking the lock here, but that would
* require that it be released when monitor commands are
* run. Currently we deadlock if we try to take it again
*
* Until this is resolved, don't take the lock and rely
* on fact that the thread invoking this callback is
* running lock-step with the thread holding the lock
*
* virDomainObjLock(vm);
*/
if (!conn) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_NO_SUPPORT,
"%s", _("cannot find secrets without a connection"));
return -1;
goto cleanup;
}
if (conn->secretDriver == NULL ||
@ -425,7 +437,7 @@ findVolumeQcowPassphrase(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
conn->secretDriver->getValue == NULL) {
qemudReportError(conn, NULL, NULL, VIR_ERR_NO_SUPPORT, "%s",
_("secret storage not supported"));
return -1;
goto cleanup;
}
enc = findDomainDiskEncryption(conn, vm, path);
@ -438,18 +450,18 @@ findVolumeQcowPassphrase(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
VIR_STORAGE_ENCRYPTION_SECRET_TYPE_PASSPHRASE) {
qemudReportError(conn, NULL, NULL, VIR_ERR_INVALID_DOMAIN,
_("invalid <encryption> for volume %s"), path);
return -1;
goto cleanup;
}
secret = conn->secretDriver->lookupByUUID(conn,
enc->secrets[0]->uuid);
if (secret == NULL)
return -1;
goto cleanup;
data = conn->secretDriver->getValue(secret, &size,
VIR_SECRET_GET_VALUE_INTERNAL_CALL);
virUnrefSecret(secret);
if (data == NULL)
return -1;
goto cleanup;
if (memchr(data, '\0', size) != NULL) {
memset(data, 0, size);
@ -457,14 +469,14 @@ findVolumeQcowPassphrase(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
qemudReportError(conn, NULL, NULL, VIR_ERR_INVALID_SECRET,
_("format='qcow' passphrase for %s must not contain a "
"'\\0'"), path);
return -1;
goto cleanup;
}
if (VIR_ALLOC_N(passphrase, size + 1) < 0) {
memset(data, 0, size);
VIR_FREE(data);
virReportOOMError(conn);
return -1;
goto cleanup;
}
memcpy(passphrase, data, size);
passphrase[size] = '\0';
@ -475,15 +487,24 @@ findVolumeQcowPassphrase(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
*secretRet = passphrase;
*secretLen = size;
return 0;
ret = 0;
cleanup:
/*
* XXX
* See earlier comment about lock
*
* virDomainObjUnlock(vm);
*/
return ret;
}
static int
qemuConnectMonitor(virDomainObjPtr vm, int reconnect)
qemuConnectMonitor(virDomainObjPtr vm)
{
qemuDomainObjPrivatePtr priv = vm->privateData;
if ((priv->mon = qemuMonitorOpen(vm, reconnect, qemuHandleMonitorEOF)) == NULL) {
if ((priv->mon = qemuMonitorOpen(vm, qemuHandleMonitorEOF)) == NULL) {
VIR_ERROR(_("Failed to connect monitor for %s\n"), vm->def->name);
return -1;
}
@ -506,7 +527,10 @@ qemuReconnectDomain(void *payload, const char *name ATTRIBUTE_UNUSED, void *opaq
virDomainObjLock(obj);
if (qemuConnectMonitor(obj, 1) < 0)
VIR_DEBUG("Reconnect monitor to %p '%s'", obj, obj->def->name);
/* XXX check PID liveliness & EXE path */
if (qemuConnectMonitor(obj) < 0)
goto error;
if (qemuUpdateActivePciHostdevs(driver, obj->def) < 0) {
@ -530,7 +554,10 @@ error:
* to remove danger of it ending up running twice if
* user tries to start it again later */
qemudShutdownVMDaemon(NULL, driver, obj);
virDomainObjUnlock(obj);
if (!obj->persistent)
virDomainRemoveInactive(&driver->domains, obj);
else
virDomainObjUnlock(obj);
}
/**
@ -1128,7 +1155,8 @@ qemudWaitForMonitor(virConnectPtr conn,
return -1;
}
if (qemuConnectMonitor(vm, 0) < 0)
VIR_DEBUG("Connect monitor to %p '%s'", vm, vm->def->name);
if (qemuConnectMonitor(vm) < 0)
return -1;
return 0;
@ -2178,7 +2206,7 @@ static void qemudShutdownVMDaemon(virConnectPtr conn,
if (!virDomainObjIsActive(vm))
return;
VIR_DEBUG(_("Shutting down VM '%s'\n"), vm->def->name);
VIR_DEBUG("Shutting down VM '%s'", vm->def->name);
if (driver->macFilter) {
int i;
@ -6121,6 +6149,10 @@ qemudDomainMigratePrepareTunnel(virConnectPtr dconn,
qemust = qemuStreamMigOpen(st, unixfile);
if (qemust == NULL) {
qemudShutdownVMDaemon(NULL, driver, vm);
if (!vm->persistent) {
virDomainRemoveInactive(&driver->domains, vm);
vm = NULL;
}
virReportSystemError(dconn, errno,
_("cannot open unix socket '%s' for tunnelled migration"),
unixfile);

View File

@ -40,6 +40,9 @@
struct _qemuMonitor {
virMutex lock;
virCond notify;
virDomainObjPtr dom;
int fd;
int watch;
@ -49,6 +52,25 @@ struct _qemuMonitor {
qemuMonitorEOFNotify eofCB;
qemuMonitorDiskSecretLookup secretCB;
/* If there's a command being processed this will be
* non-NULL */
qemuMonitorMessagePtr msg;
/* Buffer incoming data ready for Text/QMP monitor
* code to process & find message boundaries */
size_t bufferOffset;
size_t bufferLength;
char *buffer;
/* If anything went wrong, this will be fed back
* the next monitor msg */
int lastErrno;
/* If the monitor callback is currently active */
unsigned eofcb: 1;
/* If the monitor callback should free the closed monitor */
unsigned closed: 1;
};
void qemuMonitorLock(qemuMonitorPtr mon)
@ -61,134 +83,25 @@ void qemuMonitorUnlock(qemuMonitorPtr mon)
virMutexUnlock(&mon->lock);
}
/* Return -1 for error, 1 to continue reading and 0 for success */
typedef int qemuMonitorHandleOutput(virDomainObjPtr vm,
const char *output);
/*
* Returns -1 for error, 0 on end-of-file, 1 for success
*/
static int
qemuMonitorReadOutput(virDomainObjPtr vm,
int fd,
char *buf,
size_t buflen,
qemuMonitorHandleOutput func,
const char *what,
int timeout)
static void qemuMonitorFree(qemuMonitorPtr mon, int lockDomain)
{
size_t got = 0;
buf[0] = '\0';
timeout *= 1000; /* poll wants milli seconds */
/* Consume & discard the initial greeting */
while (got < (buflen-1)) {
ssize_t ret;
ret = read(fd, buf+got, buflen-got-1);
if (ret < 0) {
struct pollfd pfd = { .fd = fd, .events = POLLIN };
if (errno == EINTR)
continue;
if (errno != EAGAIN) {
virReportSystemError(NULL, errno,
_("Failure while reading %s startup output"),
what);
return -1;
}
ret = poll(&pfd, 1, timeout);
if (ret == 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
_("Timed out while reading %s startup output"), what);
return -1;
} else if (ret == -1) {
if (errno != EINTR) {
virReportSystemError(NULL, errno,
_("Failure while reading %s startup output"),
what);
return -1;
}
} else {
/* Make sure we continue loop & read any further data
available before dealing with EOF */
if (pfd.revents & (POLLIN | POLLHUP))
continue;
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
_("Failure while reading %s startup output"), what);
return -1;
}
} else if (ret == 0) {
return 0;
} else {
got += ret;
buf[got] = '\0';
ret = func(vm, buf);
if (ret == -1)
return -1;
if (ret == 1)
continue;
return 1;
}
VIR_DEBUG("mon=%p, lockDomain=%d", mon, lockDomain);
if (mon->vm) {
if (lockDomain)
virDomainObjLock(mon->vm);
if (!virDomainObjUnref(mon->vm) && lockDomain)
virDomainObjUnlock(mon->vm);
}
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
_("Out of space while reading %s startup output"), what);
return -1;
if (virCondDestroy(&mon->notify) < 0)
{}
virMutexDestroy(&mon->lock);
VIR_FREE(mon);
}
static int
qemuMonitorCheckPrompt(virDomainObjPtr vm ATTRIBUTE_UNUSED,
const char *output)
{
if (strstr(output, "(qemu) ") == NULL)
return 1; /* keep reading */
return 0;
}
static int
qemuMonitorOpenCommon(virDomainObjPtr vm,
int monfd,
int reconnect)
{
char buf[1024];
int ret;
if (virSetCloseExec(monfd) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
"%s", _("Unable to set monitor close-on-exec flag"));
return -1;
}
if (virSetNonBlock(monfd) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
"%s", _("Unable to put monitor into non-blocking mode"));
return -1;
}
if (!reconnect) {
if (qemuMonitorReadOutput(vm, monfd,
buf, sizeof(buf),
qemuMonitorCheckPrompt,
"monitor", 10) <= 0)
ret = -1;
else
ret = 0;
} else {
ret = 0;
}
return ret;
}
static int
qemuMonitorOpenUnix(virDomainObjPtr vm,
const char *monitor,
int reconnect)
qemuMonitorOpenUnix(const char *monitor)
{
struct sockaddr_un addr;
int monfd;
@ -233,9 +146,6 @@ qemuMonitorOpenUnix(virDomainObjPtr vm,
goto error;
}
if (qemuMonitorOpenCommon(vm, monfd, reconnect) < 0)
goto error;
return monfd;
error:
@ -244,9 +154,7 @@ error:
}
static int
qemuMonitorOpenPty(virDomainObjPtr vm,
const char *monitor,
int reconnect)
qemuMonitorOpenPty(const char *monitor)
{
int monfd;
@ -256,138 +164,54 @@ qemuMonitorOpenPty(virDomainObjPtr vm,
return -1;
}
if (qemuMonitorOpenCommon(vm, monfd, reconnect) < 0)
goto error;
return monfd;
error:
close(monfd);
return -1;
}
static void
qemuMonitorIO(int watch, int fd, int events, void *opaque) {
qemuMonitorPtr mon = opaque;
int quit = 0, failed = 0;
static int
qemuMonitorIOProcess(qemuMonitorPtr mon)
{
int len;
qemuMonitorMessagePtr msg = NULL;
if (mon->fd != fd || mon->watch != watch) {
VIR_ERROR0(_("event from unexpected fd/watch"));
failed = 1;
/* See if there's a message & whether its ready for its reply
* ie whether its completed writing all its data */
if (mon->msg && mon->msg->txOffset == mon->msg->txLength)
msg = mon->msg;
VIR_DEBUG("Process %d", mon->bufferOffset);
len = qemuMonitorTextIOProcess(mon,
mon->buffer, mon->bufferOffset,
msg);
if (len < 0) {
mon->lastErrno = errno;
return -1;
}
if (len < mon->bufferOffset) {
memmove(mon->buffer, mon->buffer + len, mon->bufferOffset - len);
mon->bufferOffset -= len;
} else {
if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
quit = 1;
else {
VIR_ERROR(_("unhandled fd event %d for monitor fd %d"),
events, mon->fd);
failed = 1;
}
VIR_FREE(mon->buffer);
mon->bufferOffset = mon->bufferLength = 0;
}
mon->eofCB(mon, mon->vm, failed);
VIR_DEBUG("Process done %d used %d", mon->bufferOffset, len);
if (msg && msg->finished)
virCondBroadcast(&mon->notify);
return len;
}
qemuMonitorPtr
qemuMonitorOpen(virDomainObjPtr vm,
int reconnect,
qemuMonitorEOFNotify eofCB)
{
qemuMonitorPtr mon;
if (VIR_ALLOC(mon) < 0) {
virReportOOMError(NULL);
return NULL;
}
if (virMutexInit(&mon->lock) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot initialize monitor mutex"));
VIR_FREE(mon);
return NULL;
}
mon->fd = -1;
mon->vm = vm;
mon->eofCB = eofCB;
switch (vm->monitor_chr->type) {
case VIR_DOMAIN_CHR_TYPE_UNIX:
mon->hasSendFD = 1;
mon->fd = qemuMonitorOpenUnix(vm, vm->monitor_chr->data.nix.path,
reconnect);
break;
case VIR_DOMAIN_CHR_TYPE_PTY:
mon->fd = qemuMonitorOpenPty(vm, vm->monitor_chr->data.file.path,
reconnect);
break;
default:
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
_("unable to handle monitor type: %s"),
virDomainChrTypeToString(vm->monitor_chr->type));
goto cleanup;
}
if ((mon->watch = virEventAddHandle(mon->fd,
VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR,
qemuMonitorIO,
mon, NULL)) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR, "%s",
_("unable to register monitor events"));
goto cleanup;
}
return mon;
cleanup:
qemuMonitorClose(mon);
return NULL;
}
void qemuMonitorClose(qemuMonitorPtr mon)
{
if (!mon)
return;
if (mon->watch)
virEventRemoveHandle(mon->watch);
if (mon->fd != -1)
close(mon->fd);
virMutexDestroy(&mon->lock);
VIR_FREE(mon);
}
void qemuMonitorRegisterDiskSecretLookup(qemuMonitorPtr mon,
qemuMonitorDiskSecretLookup secretCB)
{
mon->secretCB = secretCB;
}
int qemuMonitorWrite(qemuMonitorPtr mon,
const char *data,
size_t len)
{
return safewrite(mon->fd, data, len);
}
int qemuMonitorWriteWithFD(qemuMonitorPtr mon,
const char *data,
size_t len,
int fd)
static int
qemuMonitorIOWriteWithFD(qemuMonitorPtr mon,
const char *data,
size_t len,
int fd)
{
struct msghdr msg;
struct iovec iov[1];
ssize_t ret;
int ret;
char control[CMSG_SPACE(sizeof(int))];
struct cmsghdr *cmsg;
@ -417,27 +241,336 @@ int qemuMonitorWriteWithFD(qemuMonitorPtr mon,
ret = sendmsg(mon->fd, &msg, 0);
} while (ret < 0 && errno == EINTR);
return ret == len ? 0 : -1;
return ret;
}
int qemuMonitorRead(qemuMonitorPtr mon,
char *data,
size_t len)
/* Called when the monitor is able to write data */
static int
qemuMonitorIOWrite(qemuMonitorPtr mon)
{
return read(mon->fd, data, len);
}
int done;
int qemuMonitorWaitForInput(qemuMonitorPtr mon)
{
struct pollfd fd = { mon->fd, POLLIN | POLLERR | POLLHUP, 0 };
/* If no active message, or fully transmitted, the no-op */
if (!mon->msg || mon->msg->txOffset == mon->msg->txLength)
return 0;
retry:
if (poll(&fd, 1, -1) < 0) {
if (errno == EINTR)
goto retry;
if (mon->msg->txFD == -1)
done = write(mon->fd,
mon->msg->txBuffer + mon->msg->txOffset,
mon->msg->txLength - mon->msg->txOffset);
else
done = qemuMonitorIOWriteWithFD(mon,
mon->msg->txBuffer + mon->msg->txOffset,
mon->msg->txLength - mon->msg->txOffset,
mon->msg->txFD);
if (done < 0) {
if (errno == EAGAIN)
return 0;
mon->lastErrno = errno;
return -1;
}
return 0;
mon->msg->txOffset += done;
return done;
}
/*
* Called when the monitor has incoming data to read
*
* Returns -1 on error, or number of bytes read
*/
static int
qemuMonitorIORead(qemuMonitorPtr mon)
{
size_t avail = mon->bufferLength - mon->bufferOffset;
int ret = 0;
if (avail < 1024) {
if (VIR_REALLOC_N(mon->buffer,
mon->bufferLength + 1024) < 0) {
errno = ENOMEM;
return -1;
}
mon->bufferLength += 1024;
avail += 1024;
}
/* Read as much as we can get into our buffer,
until we block on EAGAIN, or hit EOF */
while (avail > 1) {
int got;
got = read(mon->fd,
mon->buffer + mon->bufferOffset,
avail - 1);
if (got < 0) {
if (errno == EAGAIN)
break;
mon->lastErrno = errno;
ret = -1;
break;
}
if (got == 0)
break;
ret += got;
avail -= got;
mon->bufferOffset += got;
mon->buffer[mon->bufferOffset] = '\0';
}
VIR_DEBUG("Now read %d bytes of data", mon->bufferOffset);
return ret;
}
static void qemuMonitorUpdateWatch(qemuMonitorPtr mon)
{
int events =
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR;
if (!mon->lastErrno) {
events |= VIR_EVENT_HANDLE_READABLE;
if (mon->msg && mon->msg->txOffset < mon->msg->txLength)
events |= VIR_EVENT_HANDLE_WRITABLE;
}
virEventUpdateHandle(mon->watch, events);
}
static void
qemuMonitorIO(int watch, int fd, int events, void *opaque) {
qemuMonitorPtr mon = opaque;
int quit = 0, failed = 0;
qemuMonitorLock(mon);
VIR_DEBUG("Monitor %p I/O on watch %d fd %d events %d", mon, watch, fd, events);
if (mon->fd != fd || mon->watch != watch) {
VIR_ERROR("event from unexpected fd %d!=%d / watch %d!=%d", mon->fd, fd, mon->watch, watch);
failed = 1;
} else {
if (!mon->lastErrno &&
events & VIR_EVENT_HANDLE_WRITABLE) {
int done = qemuMonitorIOWrite(mon);
if (done < 0)
failed = 1;
events &= ~VIR_EVENT_HANDLE_WRITABLE;
}
if (!mon->lastErrno &&
events & VIR_EVENT_HANDLE_READABLE) {
int got = qemuMonitorIORead(mon);
if (got < 0)
failed = 1;
/* Ignore hangup/error events if we read some data, to
* give time for that data to be consumed */
if (got > 0) {
events = 0;
if (qemuMonitorIOProcess(mon) < 0)
failed = 1;
} else
events &= ~VIR_EVENT_HANDLE_READABLE;
}
/* If IO process resulted in an error & we have a message,
* then wakeup that waiter */
if (mon->lastErrno && mon->msg && !mon->msg->finished) {
mon->msg->lastErrno = mon->lastErrno;
mon->msg->finished = 1;
virCondSignal(&mon->notify);
}
qemuMonitorUpdateWatch(mon);
if (events & VIR_EVENT_HANDLE_HANGUP) {
/* If IO process resulted in EOF & we have a message,
* then wakeup that waiter */
if (mon->msg && !mon->msg->finished) {
mon->msg->finished = 1;
mon->msg->lastErrno = EIO;
virCondSignal(&mon->notify);
}
quit = 1;
} else if (events) {
VIR_ERROR(_("unhandled fd event %d for monitor fd %d"),
events, mon->fd);
failed = 1;
}
}
/* We have to unlock to avoid deadlock against command thread,
* but is this safe ? I think it is, because the callback
* will try to acquire the virDomainObjPtr mutex next */
if (failed || quit) {
/* Make sure anyone waiting wakes up now */
virCondSignal(&mon->notify);
mon->eofcb = 1;
qemuMonitorUnlock(mon);
VIR_DEBUG("Triggering EOF callback error? %d", failed);
mon->eofCB(mon, mon->vm, failed);
qemuMonitorLock(mon);
if (mon->closed) {
qemuMonitorUnlock(mon);
VIR_DEBUG("Delayed free of monitor %p", mon);
qemuMonitorFree(mon, 1);
} else {
qemuMonitorUnlock(mon);
}
} else {
qemuMonitorUnlock(mon);
}
}
qemuMonitorPtr
qemuMonitorOpen(virDomainObjPtr vm,
qemuMonitorEOFNotify eofCB)
{
qemuMonitorPtr mon;
if (VIR_ALLOC(mon) < 0) {
virReportOOMError(NULL);
return NULL;
}
if (virMutexInit(&mon->lock) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot initialize monitor mutex"));
VIR_FREE(mon);
return NULL;
}
if (virCondInit(&mon->notify) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot initialize monitor condition"));
virMutexDestroy(&mon->lock);
VIR_FREE(mon);
return NULL;
}
mon->fd = -1;
mon->vm = vm;
mon->eofCB = eofCB;
qemuMonitorLock(mon);
switch (vm->monitor_chr->type) {
case VIR_DOMAIN_CHR_TYPE_UNIX:
mon->hasSendFD = 1;
mon->fd = qemuMonitorOpenUnix(vm->monitor_chr->data.nix.path);
break;
case VIR_DOMAIN_CHR_TYPE_PTY:
mon->fd = qemuMonitorOpenPty(vm->monitor_chr->data.file.path);
break;
default:
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
_("unable to handle monitor type: %s"),
virDomainChrTypeToString(vm->monitor_chr->type));
goto cleanup;
}
if (virSetCloseExec(mon->fd) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
"%s", _("Unable to set monitor close-on-exec flag"));
goto cleanup;
}
if (virSetNonBlock(mon->fd) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
"%s", _("Unable to put monitor into non-blocking mode"));
goto cleanup;
}
if ((mon->watch = virEventAddHandle(mon->fd,
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR |
VIR_EVENT_HANDLE_READABLE,
qemuMonitorIO,
mon, NULL)) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR, "%s",
_("unable to register monitor events"));
goto cleanup;
}
virDomainObjRef(vm);
VIR_DEBUG("New mon %p fd =%d watch=%d", mon, mon->fd, mon->watch);
qemuMonitorUnlock(mon);
return mon;
cleanup:
qemuMonitorUnlock(mon);
qemuMonitorClose(mon);
return NULL;
}
void qemuMonitorClose(qemuMonitorPtr mon)
{
if (!mon)
return;
qemuMonitorLock(mon);
if (!mon->closed) {
if (mon->watch)
virEventRemoveHandle(mon->watch);
if (mon->fd != -1)
close(mon->fd);
/* NB: don't reset fd / watch fields, since active
* callback may still want them */
mon->closed = 1;
}
if (mon->eofcb) {
VIR_DEBUG("Mark monitor to be deleted %p", mon);
qemuMonitorUnlock(mon);
} else {
VIR_DEBUG("Delete monitor now %p", mon);
qemuMonitorFree(mon, 0);
}
}
void qemuMonitorRegisterDiskSecretLookup(qemuMonitorPtr mon,
qemuMonitorDiskSecretLookup secretCB)
{
mon->secretCB = secretCB;
}
int qemuMonitorSend(qemuMonitorPtr mon,
qemuMonitorMessagePtr msg)
{
int ret = -1;
if (mon->eofcb) {
msg->lastErrno = EIO;
qemuMonitorUnlock(mon);
return -1;
}
mon->msg = msg;
qemuMonitorUpdateWatch(mon);
while (!mon->msg->finished) {
if (virCondWait(&mon->notify, &mon->lock) < 0)
goto cleanup;
}
if (mon->lastErrno == 0)
ret = 0;
cleanup:
mon->msg = NULL;
qemuMonitorUpdateWatch(mon);
return ret;
}

View File

@ -32,6 +32,33 @@
typedef struct _qemuMonitor qemuMonitor;
typedef qemuMonitor *qemuMonitorPtr;
typedef struct _qemuMonitorMessage qemuMonitorMessage;
typedef qemuMonitorMessage *qemuMonitorMessagePtr;
typedef int (*qemuMonitorPasswordHandler)(qemuMonitorPtr mon,
qemuMonitorMessagePtr msg,
const char *data,
size_t len,
void *opaque);
struct _qemuMonitorMessage {
int txFD;
char *txBuffer;
int txOffset;
int txLength;
char *rxBuffer;
int rxLength;
int finished;
int lastErrno;
qemuMonitorPasswordHandler passwordHandler;
void *passwordOpaque;
};
typedef void (*qemuMonitorEOFNotify)(qemuMonitorPtr mon,
virDomainObjPtr vm,
int withError);
@ -49,7 +76,6 @@ typedef int (*qemuMonitorDiskSecretLookup)(qemuMonitorPtr mon,
size_t *secretLen);
qemuMonitorPtr qemuMonitorOpen(virDomainObjPtr vm,
int reconnect,
qemuMonitorEOFNotify eofCB);
void qemuMonitorClose(qemuMonitorPtr mon);
@ -60,21 +86,11 @@ void qemuMonitorUnlock(qemuMonitorPtr mon);
void qemuMonitorRegisterDiskSecretLookup(qemuMonitorPtr mon,
qemuMonitorDiskSecretLookup secretCB);
int qemuMonitorWrite(qemuMonitorPtr mon,
const char *data,
size_t len);
int qemuMonitorWriteWithFD(qemuMonitorPtr mon,
const char *data,
size_t len,
int fd);
int qemuMonitorRead(qemuMonitorPtr mon,
char *data,
size_t len);
int qemuMonitorWaitForInput(qemuMonitorPtr mon);
/* This API is for use by the internal Text/JSON monitor impl code only */
int qemuMonitorSend(qemuMonitorPtr mon,
qemuMonitorMessagePtr msg);
/* XXX same comment about virConnectPtr as above */
int qemuMonitorGetDiskSecret(qemuMonitorPtr mon,
virConnectPtr conn,
const char *path,

View File

@ -133,183 +133,163 @@ static char *qemuMonitorEscapeShell(const char *in)
return qemuMonitorEscape(in, 1);
}
/* Throw away any data available on the monitor
* This is done before executing a command, in order
* to allow re-synchronization if something went badly
* wrong in the past. it also deals with problem of
* QEMU *sometimes* re-printing its initial greeting
* when we reconnect to the monitor after restarts.
/* When connecting to a monitor, QEMU will print a greeting like
*
* QEMU 0.11.0 monitor - type 'help' for more information
*
* Don't expect the version number bit to be stable :-)
*/
static void
qemuMonitorDiscardPendingData(qemuMonitorPtr mon) {
char buf[1024];
int ret = 0;
#define GREETING_PREFIX "QEMU "
#define GREETING_POSTFIX "type 'help' for more information\r\n(qemu) "
#define BASIC_PROMPT "(qemu) "
#define PASSWORD_PROMPT "Password:"
#define DISK_ENCRYPTION_PREFIX "("
#define DISK_ENCRYPTION_POSTFIX ") is encrypted."
#define LINE_ENDING "\r\n"
/* Monitor is non-blocking, so just loop till we
* get -1 or 0. Don't bother with detecting
* errors, since we'll deal with that better later */
do {
ret = qemuMonitorRead(mon, buf, sizeof (buf)-1);
} while (ret > 0);
}
static int
qemuMonitorSend(qemuMonitorPtr mon,
const char *cmd,
int scm_fd)
int qemuMonitorTextIOProcess(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
const char *data,
size_t len,
qemuMonitorMessagePtr msg)
{
char *full;
size_t len;
int ret = -1;
int used = 0;
if (virAsprintf(&full, "%s\r", cmd) < 0) {
virReportOOMError(NULL);
return -1;
/* Check for & discard greeting */
if (STRPREFIX(data, GREETING_PREFIX)) {
const char *offset = strstr(data, GREETING_POSTFIX);
/* We see the greeting prefix, but not postfix, so pretend we've
not consumed anything. We'll restart when more data arrives. */
if (!offset) {
VIR_DEBUG0("Partial greeting seen, getting out & waiting for more");
return 0;
}
used = offset - data + strlen(GREETING_POSTFIX);
VIR_DEBUG0("Discarded monitor greeting");
}
len = strlen(full);
/* Don't print raw data in debug because its full of control chars */
/*VIR_DEBUG("Process data %d byts of data [%s]", len - used, data + used);*/
VIR_DEBUG("Process data %d byts of data", len - used);
if (scm_fd == -1)
ret = qemuMonitorWrite(mon, full, len);
else
ret = qemuMonitorWriteWithFD(mon, full, len, scm_fd);
/* Look for a non-zero reply followed by prompt */
if (msg && !msg->finished) {
const char *end;
VIR_FREE(full);
return ret;
/* We might get a prompt for a password */
end = strstr(data + used, PASSWORD_PROMPT);
if (end) {
VIR_DEBUG("Woooo passwowrd [%s]", data + used);
if (msg->passwordHandler) {
size_t consumed;
/* Try and handle the prompt */
if (msg->passwordHandler(mon, msg,
data + used,
len - used,
msg->passwordOpaque) < 0)
return -1;
/* Skip over prompt now */
consumed = (end + strlen(PASSWORD_PROMPT))
- (data + used);
used += consumed;
} else {
errno = EACCES;
return -1;
}
}
/* We use the arrival of BASIC_PROMPT to detect when we've got a
* complete reply available from a command */
end = strstr(data + used, BASIC_PROMPT);
if (end) {
/* QEMU echos the command back to us, full of control
* character junk that we don't want. Fortunately this
* is all terminated by LINE_ENDING, so we can easily
* skip over the control character junk */
const char *start = strstr(data + used, LINE_ENDING);
if (!start)
start = data + used;
else
start += strlen(LINE_ENDING);
int want = end - start;
/* Annoyingly some commands may not have any reply data
* at all upon success, but since we've detected the
* BASIC_PROMPT we can reasonably reliably cope */
if (want) {
if (VIR_REALLOC_N(msg->rxBuffer,
msg->rxLength + want + 1) < 0)
return -1;
memcpy(msg->rxBuffer + msg->rxLength, start, want);
msg->rxLength += want;
msg->rxBuffer[msg->rxLength] = '\0';
VIR_DEBUG("Finished %d byte reply [%s]", want, msg->rxBuffer);
} else {
VIR_DEBUG0("Finished 0 byte reply");
}
msg->finished = 1;
used += end - (data + used);
used += strlen(BASIC_PROMPT);
}
}
VIR_DEBUG("Total used %d", used);
return used;
}
static int
qemuMonitorCommandWithHandler(qemuMonitorPtr mon,
const char *cmd,
const char *extraPrompt,
qemuMonitorExtraPromptHandler extraHandler,
void *handlerData,
qemuMonitorPasswordHandler passwordHandler,
void *passwordOpaque,
int scm_fd,
char **reply) {
int size = 0;
char *buf = NULL;
qemuMonitorDiscardPendingData(mon);
VIR_DEBUG("cmd='%s' extraPrompt='%s'", cmd, NULLSTR(extraPrompt));
if (qemuMonitorSend(mon, cmd, scm_fd) < 0)
return -1;
int ret;
qemuMonitorMessage msg;
*reply = NULL;
for (;;) {
/* Read all the data QEMU has sent thus far */
for (;;) {
char data[1024];
int got = qemuMonitorRead(mon, data, sizeof(data));
memset(&msg, 0, sizeof msg);
if (got == 0)
goto error;
if (got < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN)
break;
goto error;
}
if (VIR_REALLOC_N(buf, size+got+1) < 0)
goto error;
memmove(buf+size, data, got);
buf[size+got] = '\0';
size += got;
}
/* Look for QEMU prompt to indicate completion */
if (buf) {
char *foundPrompt;
char *tmp;
if (extraPrompt &&
(foundPrompt = strstr(buf, extraPrompt)) != NULL) {
char *promptEnd;
DEBUG("prompt='%s' handler=%p", extraPrompt, extraHandler);
if (extraHandler(mon, buf, foundPrompt, handlerData) < 0)
return -1;
/* Discard output so far, necessary to detect whether
extraPrompt appears again. We don't need the output between
original command and this prompt anyway. */
promptEnd = foundPrompt + strlen(extraPrompt);
memmove(buf, promptEnd, strlen(promptEnd)+1);
size -= promptEnd - buf;
} else if ((tmp = strstr(buf, QEMU_CMD_PROMPT)) != NULL) {
char *commptr = NULL, *nlptr = NULL;
/* Preserve the newline */
tmp[1] = '\0';
/* The monitor doesn't dump clean output after we have written to
* it. Every character we write dumps a bunch of useless stuff,
* so the result looks like "cXcoXcomXcommXcommaXcommanXcommand"
* Try to throw away everything before the first full command
* occurence, and inbetween the command and the newline starting
* the response
*/
if ((commptr = strstr(buf, cmd))) {
memmove(buf, commptr, strlen(commptr)+1);
if ((nlptr = strchr(buf, '\n')))
memmove(buf+strlen(cmd), nlptr, strlen(nlptr)+1);
}
break;
}
}
/* Need to wait for more data */
if (qemuMonitorWaitForInput(mon) < 0)
goto error;
}
*reply = buf;
DEBUG("reply='%s'", buf);
return 0;
error:
VIR_FREE(buf);
return -1;
}
struct extraHandlerData
{
const char *reply;
bool first;
};
static int
qemuMonitorCommandSimpleExtraHandler(qemuMonitorPtr mon,
const char *buf ATTRIBUTE_UNUSED,
const char *prompt ATTRIBUTE_UNUSED,
void *data_)
{
struct extraHandlerData *data = data_;
if (!data->first)
return 0;
if (qemuMonitorSend(mon, data->reply, -1) < 0)
if (virAsprintf(&msg.txBuffer, "%s\r", cmd) < 0) {
virReportOOMError(NULL);
return -1;
data->first = false;
return 0;
}
}
msg.txLength = strlen(msg.txBuffer);
msg.txFD = scm_fd;
msg.passwordHandler = passwordHandler;
msg.passwordOpaque = passwordOpaque;
static int
qemuMonitorCommandExtra(qemuMonitorPtr mon,
const char *cmd,
const char *extra,
const char *extraPrompt,
int scm_fd,
char **reply) {
struct extraHandlerData data;
VIR_DEBUG("Send command '%s' for write with FD %d", cmd, scm_fd);
data.reply = extra;
data.first = true;
return qemuMonitorCommandWithHandler(mon, cmd, extraPrompt,
qemuMonitorCommandSimpleExtraHandler,
&data, scm_fd, reply);
ret = qemuMonitorSend(mon, &msg);
VIR_DEBUG("Receive command reply ret=%d errno=%d %d bytes '%s'",
ret, msg.lastErrno, msg.rxLength, msg.rxBuffer);
/* Just in case buffer had some passwords in */
memset(msg.txBuffer, 0, msg.txLength);
VIR_FREE(msg.txBuffer);
/* To make life safer for callers, already ensure there's at least an empty string */
if (msg.rxBuffer) {
*reply = msg.rxBuffer;
} else {
*reply = strdup("");
if (!*reply) {
virReportOOMError(NULL);
return -1;
}
}
if (ret < 0)
virReportSystemError(NULL, msg.lastErrno,
_("cannot send monitor command '%s'"), cmd);
return ret;
}
static int
@ -317,7 +297,7 @@ qemuMonitorCommandWithFd(qemuMonitorPtr mon,
const char *cmd,
int scm_fd,
char **reply) {
return qemuMonitorCommandExtra(mon, cmd, NULL, NULL, scm_fd, reply);
return qemuMonitorCommandWithHandler(mon, cmd, NULL, NULL, scm_fd, reply);
}
static int
@ -329,44 +309,74 @@ qemuMonitorCommand(qemuMonitorPtr mon,
static int
qemuMonitorSendVolumePassphrase(qemuMonitorPtr mon,
const char *buf,
const char *prompt,
void *data)
qemuMonitorSendDiskPassphrase(qemuMonitorPtr mon,
qemuMonitorMessagePtr msg,
const char *data,
size_t len ATTRIBUTE_UNUSED,
void *opaque)
{
virConnectPtr conn = data;
char *passphrase = NULL, *path;
const char *prompt_path;
size_t path_len, passphrase_len = 0;
virConnectPtr conn = opaque;
char *path;
char *passphrase = NULL;
size_t passphrase_len = 0;
int res;
const char *pathStart;
const char *pathEnd;
/* The complete prompt looks like this:
ide0-hd0 (/path/to/volume) is encrypted.
Password:
"prompt" starts with ") is encrypted". Extract /path/to/volume. */
for (prompt_path = prompt; prompt_path > buf && prompt_path[-1] != '(';
prompt_path--)
;
if (prompt_path == buf)
/*
* For disk passwords:
*
* ide0-hd0 (/path/to/volume) is encrypted.
* Password:
*
*/
pathStart = strstr(data, DISK_ENCRYPTION_PREFIX);
pathEnd = strstr(data, DISK_ENCRYPTION_POSTFIX);
if (!pathStart || !pathEnd || pathStart >= pathEnd) {
errno = -EINVAL;
return -1;
path_len = prompt - prompt_path;
if (VIR_ALLOC_N(path, path_len + 1) < 0)
return -1;
memcpy(path, prompt_path, path_len);
path[path_len] = '\0';
}
res = qemuMonitorGetDiskSecret(mon, conn, path,
&passphrase, &passphrase_len);
/* Extra the path */
pathStart += strlen(DISK_ENCRYPTION_PREFIX);
path = strndup(pathStart, pathEnd - pathStart);
if (!path) {
errno = ENOMEM;
return -1;
}
/* Fetch the disk password if possible */
res = qemuMonitorGetDiskSecret(mon,
conn,
path,
&passphrase,
&passphrase_len);
VIR_FREE(path);
if (res < 0)
return -1;
res = qemuMonitorSend(mon, passphrase, -1);
/* Enlarge transmit buffer to allow for the extra data
* to be sent back */
if (VIR_REALLOC_N(msg->txBuffer,
msg->txLength + passphrase_len + 1 + 1) < 0) {
memset(passphrase, 0, passphrase_len);
VIR_FREE(passphrase);
errno = ENOMEM;
return -1;
}
/* Queue the password for sending */
memcpy(msg->txBuffer + msg->txLength,
passphrase, passphrase_len);
msg->txLength += passphrase_len;
msg->txBuffer[msg->txLength] = '\r';
msg->txLength++;
msg->txBuffer[msg->txLength] = '\0';
memset(passphrase, 0, passphrase_len);
VIR_FREE(passphrase);
return res;
return 0;
}
int
@ -374,8 +384,9 @@ qemuMonitorTextStartCPUs(qemuMonitorPtr mon,
virConnectPtr conn) {
char *reply;
if (qemuMonitorCommandWithHandler(mon, "cont", ") is encrypted.",
qemuMonitorSendVolumePassphrase, conn,
if (qemuMonitorCommandWithHandler(mon, "cont",
qemuMonitorSendDiskPassphrase,
conn,
-1, &reply) < 0)
return -1;
@ -639,15 +650,44 @@ int qemuMonitorTextGetBlockStatsInfo(qemuMonitorPtr mon,
}
static int
qemuMonitorSendVNCPassphrase(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
qemuMonitorMessagePtr msg,
const char *data ATTRIBUTE_UNUSED,
size_t len ATTRIBUTE_UNUSED,
void *opaque)
{
char *passphrase = opaque;
size_t passphrase_len = strlen(passphrase);
/* Enlarge transmit buffer to allow for the extra data
* to be sent back */
if (VIR_REALLOC_N(msg->txBuffer,
msg->txLength + passphrase_len + 1 + 1) < 0) {
errno = ENOMEM;
return -1;
}
/* Queue the password for sending */
memcpy(msg->txBuffer + msg->txLength,
passphrase, passphrase_len);
msg->txLength += passphrase_len;
msg->txBuffer[msg->txLength] = '\r';
msg->txLength++;
msg->txBuffer[msg->txLength] = '\0';
return 0;
}
int qemuMonitorTextSetVNCPassword(qemuMonitorPtr mon,
const char *password)
{
char *info = NULL;
if (qemuMonitorCommandExtra(mon, "change vnc password",
password,
QEMU_PASSWD_PROMPT,
-1, &info) < 0) {
if (qemuMonitorCommandWithHandler(mon, "change vnc password",
qemuMonitorSendVNCPassphrase,
(char *)password,
-1, &info) < 0) {
qemudReportError(NULL, NULL, NULL, VIR_ERR_INTERNAL_ERROR,
"%s", _("setting VNC password failed"));
return -1;

View File

@ -29,6 +29,11 @@
#include "qemu_monitor.h"
int qemuMonitorTextIOProcess(qemuMonitorPtr mon,
const char *data,
size_t len,
qemuMonitorMessagePtr msg);
int qemuMonitorTextStartCPUs(qemuMonitorPtr mon,
virConnectPtr conn);
int qemuMonitorTextStopCPUs(qemuMonitorPtr mon);