Move libvirtd event loop into background thread

The virStateInitialize() call for starting up stateful drivers
may require that the event loop is already running. Thus it is
necessary to start the event loop before this call. At the
same time, network clients must not be processed until after
virStateInitialize() has completed.

The qemudListenUnix() and remoteListenTCP() methods must
therefore not register file handle watches, merely open the
network sockets & listen() on them. This means clients can
connect and are queued, pending completion of initialization.
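
As a rough illustration of why listening without a registered watch is
safe, the sketch below shows a client connecting to a socket that has
only had listen() called on it, and being picked up later by accept().
It is not libvirtd code: the function name demo_deferred_accept() and
the temporary socket path are assumptions made for this example, and
writing to a not-yet-accepted UNIX socket is relied on as Linux
behaviour.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical demo, not libvirt code: returns 0 if a client that
 * connected before any accept() call is still delivered once accept()
 * finally runs. */
int demo_deferred_accept(void)
{
    char dir[] = "/tmp/listen-demo-XXXXXX";
    struct sockaddr_un addr;
    int lfd = -1, cfd = -1, afd = -1, ret = -1, n;
    char byte = 0;

    if (!mkdtemp(dir))
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    n = snprintf(addr.sun_path, sizeof(addr.sun_path), "%s/sock", dir);
    assert(n > 0 && (size_t)n < sizeof(addr.sun_path));

    /* Step 1: open the socket and listen() only; nothing watches it yet. */
    if ((lfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0 ||
        bind(lfd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        listen(lfd, 30) < 0)
        goto cleanup;

    /* Step 2: a client connects while "initialization" is still running.
     * The kernel queues the connection in the listen backlog. */
    if ((cfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0 ||
        connect(cfd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        write(cfd, "x", 1) != 1)
        goto cleanup;

    /* Step 3: initialization is done, so start servicing the socket. */
    if ((afd = accept(lfd, NULL, NULL)) < 0 ||
        read(afd, &byte, 1) != 1 || byte != 'x')
        goto cleanup;

    ret = 0;

cleanup:
    if (afd >= 0) close(afd);
    if (cfd >= 0) close(cfd);
    if (lfd >= 0) close(lfd);
    unlink(addr.sun_path);
    rmdir(dir);
    return ret;
}
```

Calling demo_deferred_accept() from a small main() should return 0 on
a Linux host; the point is only that step 2 succeeds with no accept()
or event watch in place.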

The qemudRunLoop() method is moved into a background thread
that is started early to allow access to the event loop during
driver initialization. The main process thread leader pretty
much does nothing once the daemon is running, merely waiting
for the event loop thread to quit.
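
The start-early, join-at-the-end pattern described above can be
sketched in isolation as follows. This is a simplified stand-in, not
the daemon's code: struct demo_server, demo_run_loop() and
demo_start_and_join() are hypothetical names, a 1 ms poll() with no
file descriptors stands in for the real event loop body, and plain
pthread mutexes replace virMutex.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <pthread.h>

/* Hypothetical stand-in for struct qemud_server; only the fields this
 * sketch needs. */
struct demo_server {
    pthread_mutex_t lock;
    pthread_t eventThread;
    int hasEventThread;
    int quitEventThread;
    unsigned long iterations;   /* loop passes completed so far */
};

/* Background thread: spins until quitEventThread is set under the lock. */
static void *demo_run_loop(void *opaque)
{
    struct demo_server *srv = opaque;

    assert(srv != NULL);
    pthread_mutex_lock(&srv->lock);
    while (!srv->quitEventThread) {
        srv->iterations++;
        /* Release the lock while sleeping so other threads can take it. */
        pthread_mutex_unlock(&srv->lock);
        poll(NULL, 0, 1);
        pthread_mutex_lock(&srv->lock);
    }
    pthread_mutex_unlock(&srv->lock);
    return NULL;
}

/* Returns 0 if the loop was running while "initialization" took place. */
int demo_start_and_join(void)
{
    struct demo_server srv = { .iterations = 0 };
    unsigned long seen;

    if (pthread_mutex_init(&srv.lock, NULL) != 0)
        return -1;
    if (pthread_create(&srv.eventThread, NULL, demo_run_loop, &srv) != 0) {
        pthread_mutex_destroy(&srv.lock);
        return -1;
    }
    srv.hasEventThread = 1;

    /* Stand-in for driver initialization: it depends on the loop
     * making progress, so wait until at least one pass is observed. */
    do {
        poll(NULL, 0, 1);
        pthread_mutex_lock(&srv.lock);
        seen = srv.iterations;
        pthread_mutex_unlock(&srv.lock);
    } while (seen == 0);

    /* Ask the loop to quit, then do nothing but wait for it. */
    pthread_mutex_lock(&srv.lock);
    srv.quitEventThread = 1;
    pthread_mutex_unlock(&srv.lock);
    pthread_join(srv.eventThread, NULL);

    pthread_mutex_destroy(&srv.lock);
    return seen > 0 ? 0 : -1;
}
```

The thread is left joinable on purpose, mirroring the comment in
qemudStartEventLoop() about wanting to join the event loop.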

* daemon/libvirtd.c, daemon/libvirtd.h: Move event loop into
  a background thread
* daemon/THREADING.txt: Rewrite docs to better reflect reality
Daniel P. Berrange 2009-10-16 16:34:37 +01:00
parent 9ae8fa5839
commit 075bb5f1aa
3 changed files with 118 additions and 83 deletions

daemon/THREADING.txt

@@ -1,9 +1,29 @@
-Threading: the RULES.
-====================
+Threading in the libvirtd daemon
+================================
-If you don't understand this, don't touch the code. Ask for
-further advice / explanation on the mailing list first.
+To allow efficient processing of RPC requests, the libvirtd daemon
+makes use of threads.
+ - The process leader. This is the initial thread of control
+   when the daemon starts running. It is responsible for
+   initializing all the state, and starting the event loop.
+   Once that's all done, this thread does nothing except
+   wait for the event loop to quit, thus indicating an orderly
+   shutdown is required.
+ - The event loop. This thread runs the event loop, sitting
+   in poll() on all monitored file handles, and calculating
+   and dispatching any timers that may be registered. When
+   this thread quits, the entire daemon will shutdown.
+ - The workers. These 'n' threads all sit around waiting to
+   process incoming RPC requests. Since RPC requests may take
+   a long time to complete, with long idle periods, there will
+   be quite a few workers running.
+The use of threads obviously requires locking to ensure safety when
+accessing/changing data structures.
  - the top level lock is on 'struct qemud_server'. This must be
    held before acquiring any other lock
@@ -16,47 +36,17 @@ further advice / explanation on the mailing list first.
    this as a caller of virEvent APIs.
-The server lock is only needed / used once the daemon has entered
-its main loop, which is the qemudRunLoop() . The initial thread
-acquires the lock upon entering this method.
+The server lock is used in conjunction with a condition variable
+to pass jobs from the event loop thread to the workers. The main
+event loop thread handles I/O from the client socket, and once a
+complete RPC message has been read off the wire (and optionally
+decrypted), it will be placed onto the 'dx' job queue for the
+associated client object. The job condition will be signalled and
+a worker will wakup and process it.
-It immediatelty spawns 'n' worker threads, whose main loop is
-the qemudWorker() method. The workers will immediately try to
-acquire the server lock, and thus block since its held by the
-initial thread.
+The worker thread must quickly drop its locks on the server and
+client to allow the main event loop thread to continue running
+with its other work. Critically important, is that now libvirt
+API call will ever be made with the server or client locks held.
-When the initial thread enters the poll() call, it drops the
-server lock. The worker locks now each wakeup, acquire the
-server lock and go into a condition wait on the 'job' condition
-variable. The workers are now all 'primed' for incoming RPC
-calls.
-A file descriptor event now occurrs, causing the initial thread
-to exit poll(). It invokes the registered callback associated
-with the file descriptors on which the event occurrs. The callbacks
-are required to immediately acquire the server lock.
-If the callback is dealing with a client event, it will then
-acquire the client lock, and drop the server lock.
-The callback will now handle the I/O event, reading or writing
-a RPC message. Once a complete RPC message has been read the
-client is marked as being in state QEMUD_MODE_WAIT_DISPATCH,
-and the 'job' condition variable is signaled. The callback
-now drops the client lock and goes back into the poll() loop
-waiting for more I/O events.
-Meanwhile one of the worker threads wakes up from its condition
-variable sleep, holding the server lock. It now searches for a
-client in state QEMUD_MODE_WAIT_DISPATCH. If it doesn't find
-one, it goes back to sleep. If it does find one, then it calls
-into the remoteDispatchClientRequest() method de-serialize the
-incoming message into an XDR object and invoke the helper method
-for the associated RPC call.
-While the helper method is executing, no locks are held on either
-the client or server, but the ref count on the 'struct qemud_client'
-object is incremented to ensure its not deleted. The helper can
-now safely invoke the necessary libvirt API call.
+-- End
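
The job hand-off that the rewritten THREADING.txt describes (a lock
plus a condition variable, with the worker dropping the lock before
doing the real work) might look roughly like the sketch below. It is a
deliberately reduced model and not the daemon's implementation: a
single counter replaces the per-client 'dx' queue, and the names
demo_pool, demo_worker() and demo_dispatch() are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define DEMO_NWORKERS 2

/* Hypothetical, reduced model of the server state shared with workers. */
struct demo_pool {
    pthread_mutex_t lock;   /* plays the role of the server lock */
    pthread_cond_t job;     /* signalled when a job is queued */
    int pending;            /* jobs queued but not yet taken */
    int processed;          /* jobs completed */
    int quit;               /* set when workers should exit */
};

static void *demo_worker(void *opaque)
{
    struct demo_pool *pool = opaque;

    pthread_mutex_lock(&pool->lock);
    for (;;) {
        /* Sleep until there is work or a shutdown request. */
        while (pool->pending == 0 && !pool->quit)
            pthread_cond_wait(&pool->job, &pool->lock);
        if (pool->pending == 0)
            break;              /* quit requested and queue drained */

        assert(pool->pending > 0);
        pool->pending--;

        /* Drop the lock before the (potentially slow) call, so the
         * event loop thread is never blocked behind a worker. */
        pthread_mutex_unlock(&pool->lock);
        /* ... the RPC helper / API call would run here, lock-free ... */
        pthread_mutex_lock(&pool->lock);
        pool->processed++;
    }
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

/* Queues njobs jobs from the calling thread (acting as the event loop)
 * and returns how many the workers processed, or -1 on setup failure. */
int demo_dispatch(int njobs)
{
    struct demo_pool pool = { .pending = 0 };
    pthread_t workers[DEMO_NWORKERS];
    int started = 0, i;

    if (pthread_mutex_init(&pool.lock, NULL) != 0)
        return -1;
    if (pthread_cond_init(&pool.job, NULL) != 0) {
        pthread_mutex_destroy(&pool.lock);
        return -1;
    }

    for (i = 0; i < DEMO_NWORKERS; i++)
        if (pthread_create(&workers[started], NULL, demo_worker, &pool) == 0)
            started++;

    for (i = 0; i < njobs; i++) {
        pthread_mutex_lock(&pool.lock);
        pool.pending++;
        pthread_cond_signal(&pool.job);   /* wake one worker */
        pthread_mutex_unlock(&pool.lock);
    }

    pthread_mutex_lock(&pool.lock);
    pool.quit = 1;
    pthread_cond_broadcast(&pool.job);    /* wake everyone to exit */
    pthread_mutex_unlock(&pool.lock);

    for (i = 0; i < started; i++)
        pthread_join(workers[i], NULL);

    pthread_cond_destroy(&pool.job);
    pthread_mutex_destroy(&pool.lock);
    return started > 0 ? pool.processed : -1;
}
```

The wait is wrapped in a while loop because pthread_cond_wait() may
wake spuriously; the predicate is rechecked each time under the lock.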

daemon/libvirtd.c

@@ -384,7 +384,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
     case SIGQUIT:
     case SIGTERM:
         VIR_WARN(_("Shutting down on signal %d"), siginfo.si_signo);
-        server->shutdown = 1;
+        server->quitEventThread = 1;
         break;
     default:
@@ -393,7 +393,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
     }
     if (ret != 0)
-        server->shutdown = 1;
+        server->quitEventThread = 1;
     virMutexUnlock(&server->lock);
 }
@@ -579,16 +579,6 @@ static int qemudListenUnix(struct qemud_server *server,
         goto cleanup;
     }
-    if ((sock->watch = virEventAddHandleImpl(sock->fd,
-                                             VIR_EVENT_HANDLE_READABLE |
-                                             VIR_EVENT_HANDLE_ERROR |
-                                             VIR_EVENT_HANDLE_HANGUP,
-                                             qemudDispatchServerEvent,
-                                             server, NULL)) < 0) {
-        VIR_ERROR0(_("Failed to add server event callback"));
-        goto cleanup;
-    }
     sock->next = server->sockets;
     server->sockets = sock;
     server->nsockets++;
@@ -713,17 +703,6 @@ remoteListenTCP (struct qemud_server *server,
                        virStrerror (errno, ebuf, sizeof ebuf));
             goto cleanup;
         }
-        if ((sock->watch = virEventAddHandleImpl(sock->fd,
-                                                 VIR_EVENT_HANDLE_READABLE |
-                                                 VIR_EVENT_HANDLE_ERROR |
-                                                 VIR_EVENT_HANDLE_HANGUP,
-                                                 qemudDispatchServerEvent,
-                                                 server, NULL)) < 0) {
-            VIR_ERROR0(_("Failed to add server event callback"));
-            goto cleanup;
-        }
     }
     return 0;
@@ -1037,6 +1016,25 @@ static int qemudNetworkInit(struct qemud_server *server) {
     return -1;
 }
+static int qemudNetworkEnable(struct qemud_server *server) {
+    struct qemud_socket *sock;
+    sock = server->sockets;
+    while (sock) {
+        if ((sock->watch = virEventAddHandleImpl(sock->fd,
+                                                 VIR_EVENT_HANDLE_READABLE |
+                                                 VIR_EVENT_HANDLE_ERROR |
+                                                 VIR_EVENT_HANDLE_HANGUP,
+                                                 qemudDispatchServerEvent,
+                                                 server, NULL)) < 0) {
+            VIR_ERROR0(_("Failed to add server event callback"));
+            return -1;
+        }
+        sock = sock->next;
+    }
+    return 0;
+}
 static gnutls_session_t
 remoteInitializeTLSSession (void)
@@ -2182,7 +2180,7 @@ static void qemudInactiveTimer(int timerid, void *data) {
         virEventUpdateTimeoutImpl(timerid, -1);
     } else {
         DEBUG0("Timer expired and inactive, shutting down");
-        server->shutdown = 1;
+        server->quitEventThread = 1;
     }
 }
@@ -2212,9 +2210,10 @@ static void qemudFreeClient(struct qemud_client *client) {
     VIR_FREE(client);
 }
-static int qemudRunLoop(struct qemud_server *server) {
+static void *qemudRunLoop(void *opaque) {
+    struct qemud_server *server = opaque;
     int timerid = -1;
-    int ret = -1, i;
+    int i;
     int timerActive = 0;
     virMutexLock(&server->lock);
@@ -2224,7 +2223,7 @@ static int qemudRunLoop(struct qemud_server *server) {
                                             qemudInactiveTimer,
                                             server, NULL)) < 0) {
         VIR_ERROR0(_("Failed to register shutdown timeout"));
-        return -1;
+        return NULL;
     }
     if (min_workers > max_workers)
@@ -2233,7 +2232,7 @@ static int qemudRunLoop(struct qemud_server *server) {
     server->nworkers = max_workers;
     if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
         VIR_ERROR0(_("Failed to allocate workers"));
-        return -1;
+        return NULL;
     }
     for (i = 0 ; i < min_workers ; i++) {
@@ -2242,7 +2241,7 @@ static int qemudRunLoop(struct qemud_server *server) {
         server->nactiveworkers++;
     }
-    for (;;) {
+    for (;!server->quitEventThread;) {
         /* A shutdown timeout is specified, so check
          * if any drivers have active state, if not
          * shutdown after timeout seconds
@@ -2314,11 +2313,6 @@ static int qemudRunLoop(struct qemud_server *server) {
                 server->nactiveworkers--;
             }
         }
-        if (server->shutdown) {
-            ret = 0;
-            break;
-        }
     }
 cleanup:
@@ -2337,9 +2331,28 @@ cleanup:
     VIR_FREE(server->workers);
     virMutexUnlock(&server->lock);
-    return ret;
+    return NULL;
 }
+static int qemudStartEventLoop(struct qemud_server *server) {
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    /* We want to join the eventloop, so don't detach it */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+    if (pthread_create(&server->eventThread,
+                       &attr,
+                       qemudRunLoop,
+                       server) != 0)
+        return -1;
+    server->hasEventThread = 1;
+    return 0;
+}
 static void qemudCleanup(struct qemud_server *server) {
     struct qemud_socket *sock;
@@ -3120,6 +3133,13 @@ int main(int argc, char **argv) {
         statuswrite = -1;
     }
+    /* Start the event loop in a background thread, since
+     * state initialization needs events to be being processed */
+    if (qemudStartEventLoop(server) < 0) {
+        VIR_ERROR0("Event thread startup failed");
+        goto error;
+    }
     /* Start the stateful HV drivers
      * This is delibrately done after telling the parent process
      * we're ready, since it can take a long time and this will
@@ -3129,9 +3149,32 @@ int main(int argc, char **argv) {
         goto error;
     }
-    qemudRunLoop(server);
+    /* Start accepting new clients from network */
+    virMutexLock(&server->lock);
+    if (qemudNetworkEnable(server) < 0) {
+        VIR_ERROR0("Network event loop enablement failed");
+        goto shutdown;
+    }
+    virMutexUnlock(&server->lock);
     ret = 0;
+shutdown:
+    /* In a non-0 shutdown scenario we need to tell event loop
+     * to quit immediately. Otherwise in normal case we just
+     * sit in the thread join forever. Sure this means the
+     * main thread doesn't do anything useful ever, but that's
+     * not too much of drain on resources
+     */
+    if (ret != 0) {
+        virMutexLock(&server->lock);
+        if (server->hasEventThread)
+            /* This SIGQUIT triggers the shutdown process */
+            kill(getpid(), SIGQUIT);
+        virMutexUnlock(&server->lock);
+    }
+    pthread_join(server->eventThread, NULL);
 error:
     if (statuswrite != -1) {
         if (ret != 0) {

daemon/libvirtd.h

@@ -269,7 +269,9 @@ struct qemud_server {
     int sigread;
     int sigwrite;
     char *logDir;
-    unsigned int shutdown : 1;
+    pthread_t eventThread;
+    unsigned int hasEventThread :1;
+    unsigned int quitEventThread :1;
 #ifdef HAVE_AVAHI
     struct libvirtd_mdns *mdns;
 #endif