util: Refactor thread creation by introducing virThreadPoolExpand

When either creating a threadpool, or creating a new thread to accomplish a job
that had been placed into the jobqueue, every time thread-specific data need to
be allocated, threadpool needs to be (re)-allocated and thread count indicators
updated. Make the code clearer to read by compressing these operations into a
more complex one.

Signed-off-by: Erik Skultety <eskultet@redhat.com>
This commit is contained in:
Erik Skultety 2016-02-25 12:45:32 +01:00
parent 84d21591a8
commit 396f80519e

View File

@ -157,6 +157,43 @@ static void virThreadPoolWorker(void *opaque)
virMutexUnlock(&pool->mutex); virMutexUnlock(&pool->mutex);
} }
static int
virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
{
virThreadPtr workers = priority ? pool->prioWorkers : pool->workers;
size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
size_t i = 0;
struct virThreadPoolWorkerData *data = NULL;
if (VIR_EXPAND_N(workers, *curWorkers, gain) < 0)
return -1;
for (i = 0; i < gain; i++) {
if (VIR_ALLOC(data) < 0)
goto error;
data->pool = pool;
data->cond = priority ? &pool->prioCond : &pool->cond;
data->priority = priority;
if (virThreadCreateFull(&workers[i],
false,
virThreadPoolWorker,
pool->jobFuncName,
true,
data) < 0) {
VIR_FREE(data);
goto error;
}
}
return 0;
error:
*curWorkers -= gain - i;
return -1;
}
virThreadPoolPtr virThreadPoolPtr
virThreadPoolNewFull(size_t minWorkers, virThreadPoolNewFull(size_t minWorkers,
size_t maxWorkers, size_t maxWorkers,
@ -166,8 +203,6 @@ virThreadPoolNewFull(size_t minWorkers,
void *opaque) void *opaque)
{ {
virThreadPoolPtr pool; virThreadPoolPtr pool;
size_t i;
struct virThreadPoolWorkerData *data = NULL;
if (minWorkers > maxWorkers) if (minWorkers > maxWorkers)
minWorkers = maxWorkers; minWorkers = maxWorkers;
@ -188,58 +223,23 @@ virThreadPoolNewFull(size_t minWorkers,
if (virCondInit(&pool->quit_cond) < 0) if (virCondInit(&pool->quit_cond) < 0)
goto error; goto error;
if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
goto error;
pool->minWorkers = minWorkers; pool->minWorkers = minWorkers;
pool->maxWorkers = maxWorkers; pool->maxWorkers = maxWorkers;
for (i = 0; i < minWorkers; i++) { if (virThreadPoolExpand(pool, minWorkers, false) < 0)
if (VIR_ALLOC(data) < 0) goto error;
goto error;
data->pool = pool;
data->cond = &pool->cond;
if (virThreadCreateFull(&pool->workers[i],
false,
virThreadPoolWorker,
pool->jobFuncName,
true,
data) < 0) {
goto error;
}
pool->nWorkers++;
}
if (prioWorkers) { if (prioWorkers) {
if (virCondInit(&pool->prioCond) < 0) if (virCondInit(&pool->prioCond) < 0)
goto error; goto error;
if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
if (virThreadPoolExpand(pool, prioWorkers, true) < 0)
goto error; goto error;
for (i = 0; i < prioWorkers; i++) {
if (VIR_ALLOC(data) < 0)
goto error;
data->pool = pool;
data->cond = &pool->prioCond;
data->priority = true;
if (virThreadCreateFull(&pool->prioWorkers[i],
false,
virThreadPoolWorker,
pool->jobFuncName,
true,
data) < 0) {
goto error;
}
pool->nPrioWorkers++;
}
} }
return pool; return pool;
error: error:
VIR_FREE(data);
virThreadPoolFree(pool); virThreadPoolFree(pool);
return NULL; return NULL;
@ -307,36 +307,15 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
void *jobData) void *jobData)
{ {
virThreadPoolJobPtr job; virThreadPoolJobPtr job;
struct virThreadPoolWorkerData *data = NULL;
virMutexLock(&pool->mutex); virMutexLock(&pool->mutex);
if (pool->quit) if (pool->quit)
goto error; goto error;
if (pool->freeWorkers - pool->jobQueueDepth <= 0 && if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
pool->nWorkers < pool->maxWorkers) { pool->nWorkers < pool->maxWorkers &&
if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) virThreadPoolExpand(pool, 1, false) < 0)
goto error; goto error;
if (VIR_ALLOC(data) < 0) {
pool->nWorkers--;
goto error;
}
data->pool = pool;
data->cond = &pool->cond;
if (virThreadCreateFull(&pool->workers[pool->nWorkers - 1],
false,
virThreadPoolWorker,
pool->jobFuncName,
true,
data) < 0) {
VIR_FREE(data);
pool->nWorkers--;
goto error;
}
}
if (VIR_ALLOC(job) < 0) if (VIR_ALLOC(job) < 0)
goto error; goto error;