util: add stop/drain functions to thread pool

Stop just send signal for threads to exit when they finish with
current task. Drain waits when all threads will finish.

Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
Reviewed-by: Daniel Henrique Barboza <danielhb413@gmail.com>
This commit is contained in:
Nikolay Shirokovskiy 2020-07-09 10:58:02 +03:00
parent 018e213f5d
commit 255437eeb7
3 changed files with 43 additions and 5 deletions

View File

@ -3332,6 +3332,7 @@ virThreadJobSetWorker;
# util/virthreadpool.h
virThreadPoolDrain;
virThreadPoolFree;
virThreadPoolGetCurrentWorkers;
virThreadPoolGetFreeWorkers;
@ -3342,6 +3343,7 @@ virThreadPoolGetPriorityWorkers;
virThreadPoolNewFull;
virThreadPoolSendJob;
virThreadPoolSetParameters;
virThreadPoolStop;
# util/virtime.h

View File

@ -268,19 +268,27 @@ virThreadPoolNewFull(size_t minWorkers,
}
void virThreadPoolFree(virThreadPoolPtr pool)
{
virThreadPoolJobPtr job;
if (!pool)
static void
virThreadPoolStopLocked(virThreadPoolPtr pool)
{
if (pool->quit)
return;
virMutexLock(&pool->mutex);
pool->quit = true;
if (pool->nWorkers > 0)
virCondBroadcast(&pool->cond);
if (pool->nPrioWorkers > 0)
virCondBroadcast(&pool->prioCond);
}
static void
virThreadPoolDrainLocked(virThreadPoolPtr pool)
{
virThreadPoolJobPtr job;
virThreadPoolStopLocked(pool);
while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
@ -289,6 +297,15 @@ void virThreadPoolFree(virThreadPoolPtr pool)
pool->jobList.head = pool->jobList.head->next;
VIR_FREE(job);
}
}
void virThreadPoolFree(virThreadPoolPtr pool)
{
if (!pool)
return;
virMutexLock(&pool->mutex);
virThreadPoolDrainLocked(pool);
VIR_FREE(pool->workers);
virMutexUnlock(&pool->mutex);
@ -475,3 +492,19 @@ virThreadPoolSetParameters(virThreadPoolPtr pool,
virMutexUnlock(&pool->mutex);
return -1;
}
void
virThreadPoolStop(virThreadPoolPtr pool)
{
virMutexLock(&pool->mutex);
virThreadPoolStopLocked(pool);
virMutexUnlock(&pool->mutex);
}
void
virThreadPoolDrain(virThreadPoolPtr pool)
{
virMutexLock(&pool->mutex);
virThreadPoolDrainLocked(pool);
virMutexUnlock(&pool->mutex);
}

View File

@ -56,3 +56,6 @@ int virThreadPoolSetParameters(virThreadPoolPtr pool,
long long int minWorkers,
long long int maxWorkers,
long long int prioWorkers);
void virThreadPoolStop(virThreadPoolPtr pool);
void virThreadPoolDrain(virThreadPoolPtr pool);