From 075bb5f1aabfb7d046faaae33d0e19a352dd7d35 Mon Sep 17 00:00:00 2001
From: "Daniel P. Berrange"
Date: Fri, 16 Oct 2009 16:34:37 +0100
Subject: [PATCH] Move libvirtd event loop into background thread

The virStateInitialize() call for starting up stateful drivers may
require that the event loop is running already. Thus it is necessary
to start the event loop before this call. At the same time, network
clients must not be processed until after virStateInitialize has
completed. The qemudListenUnix() and remoteListenTCP() methods must
therefore not register file handle watches, merely open the network
sockets & listen() on them. This means clients can connect and are
queued, pending completion of initialization.

The qemudRunLoop() method is moved into a background thread that is
started early to allow access to the event loop during driver
initialization. The main process leader thread pretty much does
nothing once the daemon is running; it merely waits for the event
loop thread to quit.

* daemon/libvirtd.c, daemon/libvirtd.h: Move event loop into a
  background thread
* daemon/THREADING.txt: Rewrite docs to better reflect reality
---
 daemon/THREADING.txt |  82 ++++++++++++++----------------
 daemon/libvirtd.c    | 115 +++++++++++++++++++++++++++++--------------
 daemon/libvirtd.h    |   4 +-
 3 files changed, 118 insertions(+), 83 deletions(-)

diff --git a/daemon/THREADING.txt b/daemon/THREADING.txt
index 4be71d63ad..6386941da5 100644
--- a/daemon/THREADING.txt
+++ b/daemon/THREADING.txt
@@ -1,9 +1,29 @@
-   Threading: the RULES.
-   ====================
+   Threading in the libvirtd daemon
+   ================================
 
-If you don't understand this, don't touch the code. Ask for
-further advice / explanation on the mailing list first.
+To allow efficient processing of RPC requests, the libvirtd daemon
+makes use of threads.
+
+  - The process leader. This is the initial thread of control
+    when the daemon starts running. It is responsible for
+    initializing all the state, and starting the event loop.
+    Once that's all done, this thread does nothing except
+    wait for the event loop to quit, thus indicating an orderly
+    shutdown is required.
+
+  - The event loop. This thread runs the event loop, sitting
+    in poll() on all monitored file handles, and calculating
+    and dispatching any timers that may be registered. When
+    this thread quits, the entire daemon will shut down.
+
+  - The workers. These 'n' threads all sit around waiting to
+    process incoming RPC requests. Since RPC requests may take
+    a long time to complete, with long idle periods, there will
+    be quite a few workers running.
+
+The use of threads obviously requires locking to ensure safety when
+accessing/changing data structures.
 
  - the top level lock is on 'struct qemud_server'. This must be held
   before acquiring any other lock
 
@@ -16,47 +36,17 @@ further advice / explanation on the mailing list first.
   this as a caller of virEvent APIs.
 
 
-The server lock is only needed / used once the daemon has entered
-its main loop, which is the qemudRunLoop() . The initial thread
-acquires the lock upon entering this method.
+The server lock is used in conjunction with a condition variable
+to pass jobs from the event loop thread to the workers. The main
+event loop thread handles I/O from the client socket, and once a
+complete RPC message has been read off the wire (and optionally
+decrypted), it will be placed onto the 'dx' job queue for the
+associated client object. The job condition will be signalled and
+a worker will wake up and process it.
 
-It immediatelty spawns 'n' worker threads, whose main loop is
-the qemudWorker() method. The workers will immediately try to
-acquire the server lock, and thus block since its held by the
-initial thread.
+The worker thread must quickly drop its locks on the server and
+client to allow the main event loop thread to continue running
+with its other work. Critically important is that no libvirt
+API call will ever be made with the server or client locks held.
 
-When the initial thread enters the poll() call, it drops the
-server lock. The worker locks now each wakeup, acquire the
-server lock and go into a condition wait on the 'job' condition
-variable. The workers are now all 'primed' for incoming RPC
-calls.
-
-
-
-A file descriptor event now occurrs, causing the initial thread
-to exit poll(). It invokes the registered callback associated
-with the file descriptors on which the event occurrs. The callbacks
-are required to immediately acquire the server lock.
-
-If the callback is dealing with a client event, it will then
-acquire the client lock, and drop the server lock.
-
-The callback will now handle the I/O event, reading or writing
-a RPC message. Once a complete RPC message has been read the
-client is marked as being in state QEMUD_MODE_WAIT_DISPATCH,
-and the 'job' condition variable is signaled. The callback
-now drops the client lock and goes back into the poll() loop
-waiting for more I/O events.
-
-Meanwhile one of the worker threads wakes up from its condition
-variable sleep, holding the server lock. It now searches for a
-client in state QEMUD_MODE_WAIT_DISPATCH. If it doesn't find
-one, it goes back to sleep. If it does find one, then it calls
-into the remoteDispatchClientRequest() method de-serialize the
-incoming message into an XDR object and invoke the helper method
-for the associated RPC call.
-
-While the helper method is executing, no locks are held on either
-the client or server, but the ref count on the 'struct qemud_client'
-object is incremented to ensure its not deleted. The helper can
-now safely invoke the necessary libvirt API call.
 
+-- End

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 0615cd2d81..b5a62cce32 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -384,7 +384,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
     case SIGQUIT:
     case SIGTERM:
         VIR_WARN(_("Shutting down on signal %d"), siginfo.si_signo);
-        server->shutdown = 1;
+        server->quitEventThread = 1;
         break;
 
     default:
@@ -393,7 +393,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
     }
 
     if (ret != 0)
-        server->shutdown = 1;
+        server->quitEventThread = 1;
 
     virMutexUnlock(&server->lock);
 }
@@ -579,16 +579,6 @@ static int qemudListenUnix(struct qemud_server *server,
         goto cleanup;
     }
 
-    if ((sock->watch = virEventAddHandleImpl(sock->fd,
-                                             VIR_EVENT_HANDLE_READABLE |
-                                             VIR_EVENT_HANDLE_ERROR |
-                                             VIR_EVENT_HANDLE_HANGUP,
-                                             qemudDispatchServerEvent,
-                                             server, NULL)) < 0) {
-        VIR_ERROR0(_("Failed to add server event callback"));
-        goto cleanup;
-    }
-
     sock->next = server->sockets;
     server->sockets = sock;
     server->nsockets++;
@@ -713,17 +703,6 @@ remoteListenTCP (struct qemud_server *server,
                           virStrerror (errno, ebuf, sizeof ebuf));
             goto cleanup;
         }
-
-        if ((sock->watch = virEventAddHandleImpl(sock->fd,
-                                                 VIR_EVENT_HANDLE_READABLE |
-                                                 VIR_EVENT_HANDLE_ERROR |
-                                                 VIR_EVENT_HANDLE_HANGUP,
-                                                 qemudDispatchServerEvent,
-                                                 server, NULL)) < 0) {
-            VIR_ERROR0(_("Failed to add server event callback"));
-            goto cleanup;
-        }
-
     }
 
     return 0;
@@ -1037,6 +1016,25 @@ static int qemudNetworkInit(struct qemud_server *server) {
     return -1;
 }
 
+static int qemudNetworkEnable(struct qemud_server *server) {
+    struct qemud_socket *sock;
+
+    sock = server->sockets;
+    while (sock) {
+        if ((sock->watch = virEventAddHandleImpl(sock->fd,
+                                                 VIR_EVENT_HANDLE_READABLE |
+                                                 VIR_EVENT_HANDLE_ERROR |
+                                                 VIR_EVENT_HANDLE_HANGUP,
+                                                 qemudDispatchServerEvent,
+                                                 server, NULL)) < 0) {
+            VIR_ERROR0(_("Failed to add server event callback"));
+            return -1;
+        }
+
+        sock = sock->next;
+    }
+    return 0;
+}
 
 static gnutls_session_t
 remoteInitializeTLSSession (void)
@@ -2182,7 +2180,7 @@ static void qemudInactiveTimer(int timerid, void *data) {
         virEventUpdateTimeoutImpl(timerid, -1);
     } else {
         DEBUG0("Timer expired and inactive, shutting down");
-        server->shutdown = 1;
+        server->quitEventThread = 1;
     }
 }
 
@@ -2212,9 +2210,10 @@ static void qemudFreeClient(struct qemud_client *client) {
     VIR_FREE(client);
 }
 
-static int qemudRunLoop(struct qemud_server *server) {
+static void *qemudRunLoop(void *opaque) {
+    struct qemud_server *server = opaque;
     int timerid = -1;
-    int ret = -1, i;
+    int i;
     int timerActive = 0;
 
     virMutexLock(&server->lock);
@@ -2224,7 +2223,7 @@ static int qemudRunLoop(struct qemud_server *server) {
                                           qemudInactiveTimer,
                                           server, NULL)) < 0) {
         VIR_ERROR0(_("Failed to register shutdown timeout"));
-        return -1;
+        return NULL;
     }
 
     if (min_workers > max_workers)
@@ -2233,7 +2232,7 @@ static int qemudRunLoop(struct qemud_server *server) {
     server->nworkers = max_workers;
     if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
         VIR_ERROR0(_("Failed to allocate workers"));
-        return -1;
+        return NULL;
     }
 
     for (i = 0 ; i < min_workers ; i++) {
@@ -2242,7 +2241,7 @@
         server->nactiveworkers++;
     }
 
-    for (;;) {
+    for (;!server->quitEventThread;) {
        /* A shutdown timeout is specified, so check
         * if any drivers have active state, if not
         * shutdown after timeout seconds
@@ -2314,11 +2313,6 @@
                 server->nactiveworkers--;
             }
         }
-
-        if (server->shutdown) {
-            ret = 0;
-            break;
-        }
     }
 
 cleanup:
@@ -2337,9 +2331,28 @@ cleanup:
     VIR_FREE(server->workers);
     virMutexUnlock(&server->lock);
-    return ret;
+    return NULL;
 }
+
+static int qemudStartEventLoop(struct qemud_server *server) {
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    /* We want to join the event loop, so don't detach it */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+
+    if (pthread_create(&server->eventThread,
+                       &attr,
+                       qemudRunLoop,
+                       server) != 0)
+        return -1;
+
+    server->hasEventThread = 1;
+
+    return 0;
+}
+
+
 static void qemudCleanup(struct qemud_server *server) {
     struct qemud_socket *sock;
 
@@ -3120,6 +3133,13 @@ int main(int argc, char **argv) {
         statuswrite = -1;
     }
 
+    /* Start the event loop in a background thread, since
+     * state initialization needs events to be processed */
+    if (qemudStartEventLoop(server) < 0) {
+        VIR_ERROR0("Event thread startup failed");
+        goto error;
+    }
+
     /* Start the stateful HV drivers
      * This is delibrately done after telling the parent process
      * we're ready, since it can take a long time and this will
@@ -3129,9 +3149,32 @@ int main(int argc, char **argv) {
         goto error;
     }
 
-    qemudRunLoop(server);
+    /* Start accepting new clients from network */
+    virMutexLock(&server->lock);
+    if (qemudNetworkEnable(server) < 0) {
+        VIR_ERROR0("Network event loop enablement failed");
+        goto shutdown;
+    }
+    virMutexUnlock(&server->lock);
+
     ret = 0;
 
+shutdown:
+    /* In a non-0 shutdown scenario we need to tell the event loop
+     * to quit immediately. Otherwise in the normal case we just
+     * sit in the thread join forever. Sure this means the
+     * main thread doesn't do anything useful ever, but that's
+     * not too much of a drain on resources
+     */
+    if (ret != 0) {
+        virMutexLock(&server->lock);
+        if (server->hasEventThread)
+            /* This SIGQUIT triggers the shutdown process */
+            kill(getpid(), SIGQUIT);
+        virMutexUnlock(&server->lock);
+    }
+
+    pthread_join(server->eventThread, NULL);
+
 error:
     if (statuswrite != -1) {
         if (ret != 0) {
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index c0784d8ef2..e3624c64a0 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -269,7 +269,9 @@ struct qemud_server {
     int sigread;
     int sigwrite;
     char *logDir;
-    unsigned int shutdown : 1;
+    pthread_t eventThread;
+    unsigned int hasEventThread :1;
+    unsigned int quitEventThread :1;
 #ifdef HAVE_AVAHI
     struct libvirtd_mdns *mdns;
 #endif
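
A note for readers new to the threading model documented above: the handoff
between the event loop thread and the worker threads described in
THREADING.txt is, at its core, the classic pthread mutex plus condition
variable pattern. What follows is a minimal, self-contained sketch of that
pattern only; the names (struct job, queue_push, worker_main), the LIFO
queue and the quit flag are illustrative stand-ins, not libvirtd's actual
data structures or APIs.

/* Minimal sketch of a condition-variable job hand-off, loosely mirroring
 * the "server lock" + 'job' condition described in THREADING.txt.
 * All names here are illustrative, not libvirtd's real data structures.
 * Build with: cc -pthread sketch.c */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct job {
    int id;
    struct job *next;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;    /* stand-in for the server lock */
static pthread_cond_t  job_cond = PTHREAD_COND_INITIALIZER; /* stand-in for the 'job' condition */
static struct job *pending;  /* queued jobs (pushed/popped at the head for brevity) */
static int quit;             /* asks the worker to exit once the queue drains */

/* Worker: sleep on the condition, then process each job outside the lock */
static void *worker_main(void *opaque)
{
    (void)opaque;
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!pending && !quit)
            pthread_cond_wait(&job_cond, &lock);
        if (!pending && quit)
            break;

        struct job *j = pending;
        pending = j->next;

        /* Drop the lock while processing, as libvirtd's workers must */
        pthread_mutex_unlock(&lock);
        printf("processed job %d\n", j->id);
        free(j);
        pthread_mutex_lock(&lock);
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Event-loop side: queue a job and wake one worker */
static void queue_push(int id)
{
    struct job *j = malloc(sizeof(*j));
    if (!j)
        abort();
    j->id = id;
    pthread_mutex_lock(&lock);
    j->next = pending;
    pending = j;
    pthread_cond_signal(&job_cond);
    pthread_mutex_unlock(&lock);
}

int main(void)
{
    pthread_t worker;
    if (pthread_create(&worker, NULL, worker_main, NULL) != 0)
        return 1;

    for (int i = 0; i < 3; i++)
        queue_push(i);

    pthread_mutex_lock(&lock);
    quit = 1;                  /* analogous to a quit flag checked under the lock */
    pthread_cond_broadcast(&job_cond);
    pthread_mutex_unlock(&lock);

    pthread_join(worker, NULL);
    return 0;
}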
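
Similarly, the commit message's point that clients connecting during driver
initialization are simply queued relies on ordinary socket semantics: once
listen() has been called, the kernel accepts connections into the backlog
even though nothing is polling the socket yet, and qemudNetworkEnable()
only later registers the watches that start servicing them. Below is a
rough standalone sketch of that "listen early, watch late" ordering, using
plain poll() and a sleep() as a stand-in for slow initialization; it is a
hypothetical example, not code taken from libvirtd.

/* Sketch of "listen early, watch late": open and listen() on a socket
 * before a slow initialization phase, and only start polling/accepting
 * afterwards.  Connections made in between wait in the kernel backlog.
 * Hypothetical example code, not taken from libvirtd. */
#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

int main(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(16509),               /* example port */
        .sin_addr.s_addr = htonl(INADDR_LOOPBACK),
    };

    if (fd < 0 ||
        bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        listen(fd, 30) < 0) {                   /* clients queue here */
        perror("listen setup");
        return 1;
    }

    sleep(5);   /* stand-in for long-running driver initialization */

    /* Only now start watching the socket and accepting clients */
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    while (poll(&pfd, 1, -1) > 0) {
        int client = accept(fd, NULL, NULL);
        if (client >= 0)
            close(client);                      /* real code would service it */
    }
    close(fd);
    return 0;
}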