diff --git a/daemon/THREADING.txt b/daemon/THREADING.txt index 4be71d63ad..6386941da5 100644 --- a/daemon/THREADING.txt +++ b/daemon/THREADING.txt @@ -1,9 +1,29 @@ - Threading: the RULES. - ==================== + Threading in the libvirtd daemon + ================================ -If you don't understand this, don't touch the code. Ask for -further advice / explanation on the mailing list first. +To allow efficient processing of RPC requests, the libvirtd daemon +makes use of threads. + + - The process leader. This is the initial thread of control + when the daemon starts running. It is responsible for + initializing all the state, and starting the event loop. + Once that's all done, this thread does nothing except + wait for the event loop to quit, thus indicating an orderly + shutdown is required. + + - The event loop. This thread runs the event loop, sitting + in poll() on all monitored file handles, and calculating + and dispatching any timers that may be registered. When + this thread quits, the entire daemon will shutdown. + + - The workers. These 'n' threads all sit around waiting to + process incoming RPC requests. Since RPC requests may take + a long time to complete, with long idle periods, there will + be quite a few workers running. + +The use of threads obviously requires locking to ensure safety when +accessing/changing data structures. - the top level lock is on 'struct qemud_server'. This must be held before acquiring any other lock @@ -16,47 +36,17 @@ further advice / explanation on the mailing list first. this as a caller of virEvent APIs. -The server lock is only needed / used once the daemon has entered -its main loop, which is the qemudRunLoop() . The initial thread -acquires the lock upon entering this method. +The server lock is used in conjunction with a condition variable +to pass jobs from the event loop thread to the workers. 
The main +event loop thread handles I/O from the client socket, and once a +complete RPC message has been read off the wire (and optionally +decrypted), it will be placed onto the 'dx' job queue for the +associated client object. The job condition will be signalled and +a worker will wake up and process it. -It immediatelty spawns 'n' worker threads, whose main loop is -the qemudWorker() method. The workers will immediately try to -acquire the server lock, and thus block since its held by the -initial thread. +The worker thread must quickly drop its locks on the server and +client to allow the main event loop thread to continue running +with its other work. Critically important is that no libvirt +API call will ever be made with the server or client locks held. -When the initial thread enters the poll() call, it drops the -server lock. The worker locks now each wakeup, acquire the -server lock and go into a condition wait on the 'job' condition -variable. The workers are now all 'primed' for incoming RPC -calls. - - - -A file descriptor event now occurrs, causing the initial thread -to exit poll(). It invokes the registered callback associated -with the file descriptors on which the event occurrs. The callbacks -are required to immediately acquire the server lock. - -If the callback is dealing with a client event, it will then -acquire the client lock, and drop the server lock. - -The callback will now handle the I/O event, reading or writing -a RPC message. Once a complete RPC message has been read the -client is marked as being in state QEMUD_MODE_WAIT_DISPATCH, -and the 'job' condition variable is signaled. The callback -now drops the client lock and goes back into the poll() loop -waiting for more I/O events. - -Meanwhile one of the worker threads wakes up from its condition -variable sleep, holding the server lock. It now searches for a -client in state QEMUD_MODE_WAIT_DISPATCH. If it doesn't find -one, it goes back to sleep.
If it does find one, then it calls -into the remoteDispatchClientRequest() method de-serialize the -incoming message into an XDR object and invoke the helper method -for the associated RPC call. - -While the helper method is executing, no locks are held on either -the client or server, but the ref count on the 'struct qemud_client' -object is incremented to ensure its not deleted. The helper can -now safely invoke the necessary libvirt API call. +-- End diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index 0615cd2d81..b5a62cce32 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -384,7 +384,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED, case SIGQUIT: case SIGTERM: VIR_WARN(_("Shutting down on signal %d"), siginfo.si_signo); - server->shutdown = 1; + server->quitEventThread = 1; break; default: @@ -393,7 +393,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED, } if (ret != 0) - server->shutdown = 1; + server->quitEventThread = 1; virMutexUnlock(&server->lock); } @@ -579,16 +579,6 @@ static int qemudListenUnix(struct qemud_server *server, goto cleanup; } - if ((sock->watch = virEventAddHandleImpl(sock->fd, - VIR_EVENT_HANDLE_READABLE | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_HANGUP, - qemudDispatchServerEvent, - server, NULL)) < 0) { - VIR_ERROR0(_("Failed to add server event callback")); - goto cleanup; - } - sock->next = server->sockets; server->sockets = sock; server->nsockets++; @@ -713,17 +703,6 @@ remoteListenTCP (struct qemud_server *server, virStrerror (errno, ebuf, sizeof ebuf)); goto cleanup; } - - if ((sock->watch = virEventAddHandleImpl(sock->fd, - VIR_EVENT_HANDLE_READABLE | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_HANGUP, - qemudDispatchServerEvent, - server, NULL)) < 0) { - VIR_ERROR0(_("Failed to add server event callback")); - goto cleanup; - } - } return 0; @@ -1037,6 +1016,25 @@ static int qemudNetworkInit(struct qemud_server *server) { return -1; } +static int qemudNetworkEnable(struct qemud_server *server) { 
+ struct qemud_socket *sock; + + sock = server->sockets; + while (sock) { + if ((sock->watch = virEventAddHandleImpl(sock->fd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_ERROR | + VIR_EVENT_HANDLE_HANGUP, + qemudDispatchServerEvent, + server, NULL)) < 0) { + VIR_ERROR0(_("Failed to add server event callback")); + return -1; + } + + sock = sock->next; + } + return 0; +} static gnutls_session_t remoteInitializeTLSSession (void) @@ -2182,7 +2180,7 @@ static void qemudInactiveTimer(int timerid, void *data) { virEventUpdateTimeoutImpl(timerid, -1); } else { DEBUG0("Timer expired and inactive, shutting down"); - server->shutdown = 1; + server->quitEventThread = 1; } } @@ -2212,9 +2210,10 @@ static void qemudFreeClient(struct qemud_client *client) { VIR_FREE(client); } -static int qemudRunLoop(struct qemud_server *server) { +static void *qemudRunLoop(void *opaque) { + struct qemud_server *server = opaque; int timerid = -1; - int ret = -1, i; + int i; int timerActive = 0; virMutexLock(&server->lock); @@ -2224,7 +2223,7 @@ static int qemudRunLoop(struct qemud_server *server) { qemudInactiveTimer, server, NULL)) < 0) { VIR_ERROR0(_("Failed to register shutdown timeout")); - return -1; + return NULL; } if (min_workers > max_workers) @@ -2233,7 +2232,7 @@ static int qemudRunLoop(struct qemud_server *server) { server->nworkers = max_workers; if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) { VIR_ERROR0(_("Failed to allocate workers")); - return -1; + return NULL; } for (i = 0 ; i < min_workers ; i++) { @@ -2242,7 +2241,7 @@ static int qemudRunLoop(struct qemud_server *server) { server->nactiveworkers++; } - for (;;) { + for (;!server->quitEventThread;) { /* A shutdown timeout is specified, so check * if any drivers have active state, if not * shutdown after timeout seconds @@ -2314,11 +2313,6 @@ static int qemudRunLoop(struct qemud_server *server) { server->nactiveworkers--; } } - - if (server->shutdown) { - ret = 0; - break; - } } cleanup: @@ -2337,9 +2331,28 @@ 
cleanup: VIR_FREE(server->workers); virMutexUnlock(&server->lock); - return ret; + return NULL; } + +static int qemudStartEventLoop(struct qemud_server *server) { + pthread_attr_t attr; + pthread_attr_init(&attr); + /* We want to join the eventloop, so don't detach it */ + /*pthread_attr_setdetachstate(&attr, 1);*/ + + if (pthread_create(&server->eventThread, + &attr, + qemudRunLoop, + server) != 0) + return -1; + + server->hasEventThread = 1; + + return 0; +} + + static void qemudCleanup(struct qemud_server *server) { struct qemud_socket *sock; @@ -3120,6 +3133,13 @@ int main(int argc, char **argv) { statuswrite = -1; } + /* Start the event loop in a background thread, since + * state initialization needs events to be being processed */ + if (qemudStartEventLoop(server) < 0) { + VIR_ERROR0("Event thread startup failed"); + goto error; + } + /* Start the stateful HV drivers * This is delibrately done after telling the parent process * we're ready, since it can take a long time and this will @@ -3129,9 +3149,32 @@ int main(int argc, char **argv) { goto error; } - qemudRunLoop(server); + /* Start accepting new clients from network */ + virMutexLock(&server->lock); + if (qemudNetworkEnable(server) < 0) { + VIR_ERROR0("Network event loop enablement failed"); + goto shutdown; + } + virMutexUnlock(&server->lock); + ret = 0; +shutdown: + /* In a non-0 shutdown scenario we need to tell event loop + * to quit immediately. Otherwise in normal case we just + * sit in the thread join forever. 
Sure this means the + * main thread doesn't do anything useful ever, but that's + * not too much of drain on resources + */ + if (ret != 0) { + virMutexLock(&server->lock); + if (server->hasEventThread) + /* This SIGQUIT triggers the shutdown process */ + kill(getpid(), SIGQUIT); + virMutexUnlock(&server->lock); + } + pthread_join(server->eventThread, NULL); + error: if (statuswrite != -1) { if (ret != 0) { diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index c0784d8ef2..e3624c64a0 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -269,7 +269,9 @@ struct qemud_server { int sigread; int sigwrite; char *logDir; - unsigned int shutdown : 1; + pthread_t eventThread; + unsigned int hasEventThread :1; + unsigned int quitEventThread :1; #ifdef HAVE_AVAHI struct libvirtd_mdns *mdns; #endif