diff --git a/ChangeLog b/ChangeLog index 0855957e6b..9f7ae1f0b2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +Tue Jan 20 19:26:53 GMT 2009 Daniel P. Berrange + + * qemud/qemud.c, qemud/qemud.h: Dynamic spawn/cleanup threads + for processing RPC calls as number of clients changes + Tue Jan 20 19:24:53 GMT 2009 Daniel P. Berrange * qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Allow the diff --git a/qemud/qemud.c b/qemud/qemud.c index 21cecf2397..d60cb3526c 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo_t * siginfo, static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); - +static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker); void qemudClientMessageQueuePush(struct qemud_client_message **queue, @@ -1247,6 +1247,20 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket server->clients[server->nclients++] = client; + if (server->nclients > server->nactiveworkers && + server->nactiveworkers < server->nworkers) { + int i; + for (i = 0 ; i < server->nworkers ; i++) { + if (!server->workers[i].hasThread) { + if (qemudStartWorker(server, &server->workers[i]) < 0) + return -1; + server->nactiveworkers++; + break; + } + } + } + + return 0; cleanup: @@ -1302,19 +1316,28 @@ static struct qemud_client *qemudPendingJob(struct qemud_server *server) static void *qemudWorker(void *data) { - struct qemud_server *server = data; + struct qemud_worker *worker = data; + struct qemud_server *server = worker->server; while (1) { struct qemud_client *client = NULL; struct qemud_client_message *reply; virMutexLock(&server->lock); - while ((client = qemudPendingJob(server)) == NULL) { + while (((client = qemudPendingJob(server)) == NULL) && + !worker->quitRequest) { if (virCondWait(&server->job, &server->lock) < 0) { virMutexUnlock(&server->lock); return NULL; } } + if (worker->quitRequest) { + if (client) + virMutexUnlock(&client->lock); + virMutexUnlock(&server->lock); + return NULL; + } + worker->processingCall = 1; virMutexUnlock(&server->lock); /* We own a locked client now... */ @@ -1341,9 +1364,40 @@ static void *qemudWorker(void *data) client->refs--; virMutexUnlock(&client->lock); + + virMutexLock(&server->lock); + worker->processingCall = 0; + virMutexUnlock(&server->lock); } } +static int qemudStartWorker(struct qemud_server *server, + struct qemud_worker *worker) { + pthread_attr_t attr; + pthread_attr_init(&attr); + /* We want to join workers, so don't detach them */ + /*pthread_attr_setdetachstate(&attr, 1);*/ + + if (worker->hasThread) + return -1; + + worker->server = server; + worker->hasThread = 1; + worker->quitRequest = 0; + worker->processingCall = 0; + + if (pthread_create(&worker->thread, + &attr, + qemudWorker, + worker) != 0) { + worker->hasThread = 0; + worker->server = NULL; + return -1; + } + + return 0; +} + /* * Read data into buffer using wire decoding (plain or TLS) @@ -1940,21 +1994,19 @@ static int qemudRunLoop(struct qemud_server *server) { virMutexLock(&server->lock); - server->nworkers = min_workers; + if (min_workers > max_workers) + max_workers = min_workers; + + server->nworkers = max_workers; if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) { VIR_ERROR0(_("Failed to allocate workers")); return -1; } - for (i = 0 ; i < server->nworkers ; i++) { - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, 1); - - pthread_create(&server->workers[i], - &attr, - qemudWorker, - server); + for (i = 0 ; i < min_workers ; i++) { + if (qemudStartWorker(server, &server->workers[i]) < 0) + goto cleanup; + server->nactiveworkers++; } for (;;) { @@ -2000,6 +2052,26 @@ static int qemudRunLoop(struct qemud_server *server) { } } + /* If number of active workers exceeds both the min_workers + * threshold and the number of clients, then kill some + * off */ + for (i = 0 ; (i < server->nworkers && + server->nactiveworkers > server->nclients && + server->nactiveworkers > min_workers) ; i++) { + + if (server->workers[i].hasThread && + !server->workers[i].processingCall) { + server->workers[i].quitRequest = 1; + + virCondBroadcast(&server->job); + virMutexUnlock(&server->lock); + pthread_join(server->workers[i].thread, NULL); + virMutexLock(&server->lock); + server->workers[i].hasThread = 0; + server->nactiveworkers--; + } + } + /* Unregister any timeout that's active, since we * just had an event processed */ @@ -2015,11 +2087,18 @@ static int qemudRunLoop(struct qemud_server *server) { } } +cleanup: for (i = 0 ; i < server->nworkers ; i++) { - pthread_t thread = server->workers[i]; + if (!server->workers[i].hasThread) + continue; + + server->workers[i].quitRequest = 1; + virCondBroadcast(&server->job); + virMutexUnlock(&server->lock); - pthread_join(thread, NULL); + pthread_join(server->workers[i].thread, NULL); virMutexLock(&server->lock); + server->workers[i].hasThread = 0; } VIR_FREE(server->workers); diff --git a/qemud/qemud.h b/qemud/qemud.h index 9a2ff80f23..7938f89ed8 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -157,13 +157,24 @@ struct qemud_socket { struct qemud_socket *next; }; +struct qemud_worker { + pthread_t thread; + int hasThread :1; + int processingCall :1; + int quitRequest : 1; + + /* back-pointer to our server */ + struct qemud_server *server; +}; + /* Main server state */ struct qemud_server { virMutex lock; virCond job; int nworkers; - pthread_t *workers; + int nactiveworkers; + struct qemud_worker *workers; int nsockets; struct qemud_socket *sockets; int nclients;