Dynamically adjust worker threads in daemon

Daniel P. Berrange 2009-01-20 19:27:11 +00:00
parent f61341173b
commit d82071710e
3 changed files with 111 additions and 16 deletions

ChangeLog

@@ -1,3 +1,8 @@
+Tue Jan 20 19:26:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
+
+    * qemud/qemud.c, qemud/qemud.h: Dynamic spawn/cleanup threads
+    for processing RPC calls as number of clients changes
+
 Tue Jan 20 19:24:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
 
     * qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Allow the
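The policy the qemud.c hunks below implement amounts to a clamp: the number of active workers tracks the number of connected clients, never dropping below min_workers and never exceeding max_workers (stored as server->nworkers). A minimal sketch of that sizing rule, using a hypothetical helper name that is not part of the patch:

    /* Hypothetical summary of the sizing rule this commit implements:
     * active workers track the client count, clamped to the
     * [min_workers, max_workers] range. */
    static int desired_workers(int nclients, int min_workers, int max_workers)
    {
        int n = nclients;
        if (n < min_workers)
            n = min_workers;
        if (n > max_workers)
            n = max_workers;
        return n;
    }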

qemud/qemud.c

@@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
+static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1247,6 +1247,20 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 
     server->clients[server->nclients++] = client;
 
+    if (server->nclients > server->nactiveworkers &&
+        server->nactiveworkers < server->nworkers) {
+        int i;
+        for (i = 0 ; i < server->nworkers ; i++) {
+            if (!server->workers[i].hasThread) {
+                if (qemudStartWorker(server, &server->workers[i]) < 0)
+                    return -1;
+                server->nactiveworkers++;
+                break;
+            }
+        }
+    }
+
     return 0;
 
 cleanup:
@@ -1302,19 +1316,28 @@ static struct qemud_client *qemudPendingJob(struct qemud_server *server)
 
 static void *qemudWorker(void *data)
 {
-    struct qemud_server *server = data;
+    struct qemud_worker *worker = data;
+    struct qemud_server *server = worker->server;
 
     while (1) {
         struct qemud_client *client = NULL;
         struct qemud_client_message *reply;
 
         virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
+        while (((client = qemudPendingJob(server)) == NULL) &&
+               !worker->quitRequest) {
             if (virCondWait(&server->job, &server->lock) < 0) {
                 virMutexUnlock(&server->lock);
                 return NULL;
             }
         }
+        if (worker->quitRequest) {
+            if (client)
+                virMutexUnlock(&client->lock);
+            virMutexUnlock(&server->lock);
+            return NULL;
+        }
+        worker->processingCall = 1;
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
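The reworked qemudWorker loop is the standard pthreads idiom for a cancellable condition wait: the quit flag becomes part of the wait predicate, so a thread woken by virCondBroadcast re-tests both "is there a job?" and "was I asked to quit?" before it can sleep again. A self-contained sketch of the same pattern in raw pthreads (illustrative names, not libvirt APIs; the patch keeps the quit flag per-worker, while this sketch uses a single global for brevity):

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  job  = PTHREAD_COND_INITIALIZER;
    static int pending_jobs;   /* stands in for qemudPendingJob() */
    static int quit_request;   /* stands in for worker->quitRequest */

    static void *worker(void *opaque)
    {
        (void)opaque;
        for (;;) {
            pthread_mutex_lock(&lock);
            /* Sleep only while there is neither work nor a quit request;
             * both conditions are re-tested after every wakeup. */
            while (pending_jobs == 0 && !quit_request)
                pthread_cond_wait(&job, &lock);
            if (quit_request) {
                pthread_mutex_unlock(&lock);
                return NULL;
            }
            pending_jobs--;
            pthread_mutex_unlock(&lock);
            /* ... process one job outside the lock ... */
        }
    }

    int main(void)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, worker, NULL);
        pthread_mutex_lock(&lock);
        quit_request = 1;              /* ask the worker to exit... */
        pthread_cond_broadcast(&job);  /* ...and wake it, as the patch does */
        pthread_mutex_unlock(&lock);
        pthread_join(tid, NULL);
        return 0;
    }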
@@ -1341,9 +1364,40 @@ static void *qemudWorker(void *data)
 
         client->refs--;
         virMutexUnlock(&client->lock);
+
+        virMutexLock(&server->lock);
+        worker->processingCall = 0;
+        virMutexUnlock(&server->lock);
     }
 }
 
+static int qemudStartWorker(struct qemud_server *server,
+                            struct qemud_worker *worker) {
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    /* We want to join workers, so don't detach them */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+
+    if (worker->hasThread)
+        return -1;
+
+    worker->server = server;
+    worker->hasThread = 1;
+    worker->quitRequest = 0;
+    worker->processingCall = 0;
+
+    if (pthread_create(&worker->thread,
+                       &attr,
+                       qemudWorker,
+                       worker) != 0) {
+        worker->hasThread = 0;
+        worker->server = NULL;
+        return -1;
+    }
+
+    return 0;
+}
+
 /*
  * Read data into buffer using wire decoding (plain or TLS)
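Note the design change hiding in qemudStartWorker: workers used to be created detached, but the shrink path needs pthread_join, so the pthread_attr_setdetachstate call is now commented out and threads stay joinable (the default). One caveat: pthread_attr_init is normally paired with pthread_attr_destroy, which the patch omits. A minimal sketch of the stricter shape, with illustrative names rather than libvirt code:

    #include <pthread.h>

    static void *run(void *opaque) { return opaque; }

    /* Start a joinable thread; hypothetical helper, not from the patch. */
    static int start_joinable(pthread_t *tid)
    {
        pthread_attr_t attr;
        int ret = -1;

        if (pthread_attr_init(&attr) != 0)
            return -1;
        /* Leave the default PTHREAD_CREATE_JOINABLE state alone so the
         * creator can reclaim the thread later with pthread_join(). */
        if (pthread_create(tid, &attr, run, NULL) == 0)
            ret = 0;
        pthread_attr_destroy(&attr);   /* pairs with pthread_attr_init() */
        return ret;
    }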
@@ -1940,21 +1994,19 @@ static int qemudRunLoop(struct qemud_server *server) {
 
     virMutexLock(&server->lock);
 
-    server->nworkers = min_workers;
+    if (min_workers > max_workers)
+        max_workers = min_workers;
+
+    server->nworkers = max_workers;
+
     if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
         VIR_ERROR0(_("Failed to allocate workers"));
         return -1;
     }
 
-    for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_attr_t attr;
-        pthread_attr_init(&attr);
-        pthread_attr_setdetachstate(&attr, 1);
-        pthread_create(&server->workers[i],
-                       &attr,
-                       qemudWorker,
-                       server);
+    for (i = 0 ; i < min_workers ; i++) {
+        if (qemudStartWorker(server, &server->workers[i]) < 0)
+            goto cleanup;
+        server->nactiveworkers++;
     }
 
     for (;;) {
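The allocation strategy matters here: the workers array is sized for max_workers slots up front, so a slot's address never changes while a worker thread holds a pointer to it, but only min_workers threads are started; the spare slots sit with hasThread == 0 until qemudDispatchServer fills them. A compact sketch of the same shape (hypothetical stand-in types, not the qemud structs):

    #include <pthread.h>
    #include <stdlib.h>

    struct slot { pthread_t thread; int hasThread; };

    static void *work(void *opaque) { return opaque; }

    /* Allocate every slot, start only the minimum; *nactive must be 0. */
    static struct slot *init_pool(int min_workers, int max_workers, int *nactive)
    {
        struct slot *pool;
        int i;

        if (min_workers > max_workers)
            max_workers = min_workers;             /* same clamp as the patch */

        pool = calloc(max_workers, sizeof(*pool)); /* all slots, mostly idle */
        if (!pool)
            return NULL;

        for (i = 0; i < min_workers; i++) {
            if (pthread_create(&pool[i].thread, NULL, work, NULL) != 0)
                break;
            pool[i].hasThread = 1;
            (*nactive)++;
        }
        return pool;
    }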
@@ -2000,6 +2052,26 @@ static int qemudRunLoop(struct qemud_server *server) {
         }
     }
 
+    /* If number of active workers exceeds both the min_workers
+     * threshold and the number of clients, then kill some
+     * off */
+    for (i = 0 ; (i < server->nworkers &&
+                  server->nactiveworkers > server->nclients &&
+                  server->nactiveworkers > min_workers) ; i++) {
+
+        if (server->workers[i].hasThread &&
+            !server->workers[i].processingCall) {
+            server->workers[i].quitRequest = 1;
+
+            virCondBroadcast(&server->job);
+            virMutexUnlock(&server->lock);
+            pthread_join(server->workers[i].thread, NULL);
+            virMutexLock(&server->lock);
+            server->workers[i].hasThread = 0;
+            server->nactiveworkers--;
+        }
+    }
+
     /* Unregister any timeout that's active, since we
      * just had an event processed
      */
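Dropping server->lock around pthread_join is what makes this shrink loop deadlock-free: the worker being reaped must reacquire server->lock (virCondWait relocks it on wakeup) before it can observe quitRequest and return, so joining while still holding the lock would wait forever. The pattern in isolation, with illustrative names:

    #include <pthread.h>

    struct w { pthread_t thread; int quit_request; int has_thread; };

    /* Reap one worker; the caller holds *lock on entry and on exit. */
    static void reap(pthread_mutex_t *lock, pthread_cond_t *job, struct w *w)
    {
        w->quit_request = 1;
        pthread_cond_broadcast(job);    /* wake every sleeping worker */
        pthread_mutex_unlock(lock);     /* let the target run to its return */
        pthread_join(w->thread, NULL);  /* safe: lock is not held here */
        pthread_mutex_lock(lock);
        w->has_thread = 0;
    }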
@@ -2015,11 +2087,18 @@ static int qemudRunLoop(struct qemud_server *server) {
         }
     }
 
 cleanup:
     for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_t thread = server->workers[i];
+        if (!server->workers[i].hasThread)
+            continue;
+
+        server->workers[i].quitRequest = 1;
+        virCondBroadcast(&server->job);
+
         virMutexUnlock(&server->lock);
-        pthread_join(thread, NULL);
+        pthread_join(server->workers[i].thread, NULL);
         virMutexLock(&server->lock);
+
+        server->workers[i].hasThread = 0;
     }
     VIR_FREE(server->workers);

qemud/qemud.h

@@ -157,13 +157,24 @@ struct qemud_socket {
     struct qemud_socket *next;
 };
 
+struct qemud_worker {
+    pthread_t thread;
+    int hasThread :1;
+    int processingCall :1;
+    int quitRequest : 1;
+
+    /* back-pointer to our server */
+    struct qemud_server *server;
+};
+
 /* Main server state */
 struct qemud_server {
     virMutex lock;
     virCond job;
     int nworkers;
-    pthread_t *workers;
+    int nactiveworkers;
+    struct qemud_worker *workers;
     int nsockets;
     struct qemud_socket *sockets;
     int nclients;
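A small portability footnote on the new struct: hasThread, processingCall and quitRequest are plain int bitfields of width 1, whose signedness is implementation-defined; on common compilers they are signed, so a stored 1 reads back as -1. The flags are only ever tested for truth, so the patch is correct as written, but unsigned fields express the intent more directly. An alternative spelling, offered as a sketch rather than what the commit uses:

    #include <pthread.h>

    struct qemud_server;   /* opaque here; defined in qemud.h */

    struct qemud_worker_alt {
        pthread_t thread;
        unsigned int hasThread :1;       /* a stored 1 reads back as 1 */
        unsigned int processingCall :1;
        unsigned int quitRequest :1;

        /* back-pointer to our server */
        struct qemud_server *server;
    };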