diff --git a/daemon/admin.c b/daemon/admin.c index 4589eb618f..00e7dc3c00 100644 --- a/daemon/admin.c +++ b/daemon/admin.c @@ -178,4 +178,47 @@ adminDispatchServerGetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUS virObjectUnref(srv); return rv; } + +static int +adminDispatchServerSetThreadpoolParameters(virNetServerPtr server ATTRIBUTE_UNUSED, + virNetServerClientPtr client, + virNetMessagePtr msg ATTRIBUTE_UNUSED, + virNetMessageErrorPtr rerr, + struct admin_server_set_threadpool_parameters_args *args) +{ + int rv = -1; + virNetServerPtr srv = NULL; + virTypedParameterPtr params = NULL; + int nparams = 0; + struct daemonAdmClientPrivate *priv = + virNetServerClientGetPrivateData(client); + + if (!(srv = virNetDaemonGetServer(priv->dmn, args->srv.name))) { + virReportError(VIR_ERR_NO_SERVER, + _("no server with matching name '%s' found"), + args->srv.name); + goto cleanup; + } + + if (virTypedParamsDeserialize((virTypedParameterRemotePtr) args->params.params_val, + args->params.params_len, + ADMIN_SERVER_THREADPOOL_PARAMETERS_MAX, + ¶ms, + &nparams) < 0) + goto cleanup; + + + if (adminServerSetThreadPoolParameters(srv, params, + nparams, args->flags) < 0) + goto cleanup; + + rv = 0; + cleanup: + if (rv < 0) + virNetMessageSaveError(rerr); + + virTypedParamsFree(params, nparams); + virObjectUnref(srv); + return rv; +} #include "admin_dispatch.h" diff --git a/daemon/admin_server.c b/daemon/admin_server.c index 10c00f6386..e39a9bd805 100644 --- a/daemon/admin_server.c +++ b/daemon/admin_server.c @@ -32,6 +32,7 @@ #include "virnetserver.h" #include "virstring.h" #include "virthreadpool.h" +#include "virtypedparam.h" #define VIR_FROM_THIS VIR_FROM_ADMIN @@ -135,3 +136,45 @@ adminServerGetThreadPoolParameters(virNetServerPtr srv, virTypedParamsFree(tmpparams, *nparams); return ret; } + +int +adminServerSetThreadPoolParameters(virNetServerPtr srv, + virTypedParameterPtr params, + int nparams, + unsigned int flags) +{ + long long int minWorkers = -1; + long long int maxWorkers = -1; + long long int prioWorkers = -1; + virTypedParameterPtr param = NULL; + + virCheckFlags(0, -1); + + if (virTypedParamsValidate(params, nparams, + VIR_THREADPOOL_WORKERS_MIN, + VIR_TYPED_PARAM_UINT, + VIR_THREADPOOL_WORKERS_MAX, + VIR_TYPED_PARAM_UINT, + VIR_THREADPOOL_WORKERS_PRIORITY, + VIR_TYPED_PARAM_UINT, + NULL) < 0) + return -1; + + if ((param = virTypedParamsGet(params, nparams, + VIR_THREADPOOL_WORKERS_MIN))) + minWorkers = param->value.ui; + + if ((param = virTypedParamsGet(params, nparams, + VIR_THREADPOOL_WORKERS_MAX))) + maxWorkers = param->value.ui; + + if ((param = virTypedParamsGet(params, nparams, + VIR_THREADPOOL_WORKERS_PRIORITY))) + prioWorkers = param->value.ui; + + if (virNetServerSetThreadPoolParameters(srv, minWorkers, + maxWorkers, prioWorkers) < 0) + return -1; + + return 0; +} diff --git a/daemon/admin_server.h b/daemon/admin_server.h index 2ddaecc707..756e049ff0 100644 --- a/daemon/admin_server.h +++ b/daemon/admin_server.h @@ -40,5 +40,10 @@ adminServerGetThreadPoolParameters(virNetServerPtr srv, virTypedParameterPtr *params, int *nparams, unsigned int flags); +int +adminServerSetThreadPoolParameters(virNetServerPtr srv, + virTypedParameterPtr params, + int nparams, + unsigned int flags); #endif /* __LIBVIRTD_ADMIN_SERVER_H__ */ diff --git a/include/libvirt/libvirt-admin.h b/include/libvirt/libvirt-admin.h index bb13250513..bce6034d97 100644 --- a/include/libvirt/libvirt-admin.h +++ b/include/libvirt/libvirt-admin.h @@ -177,6 +177,11 @@ int virAdmServerGetThreadPoolParameters(virAdmServerPtr srv, int *nparams, unsigned int flags); +int virAdmServerSetThreadPoolParameters(virAdmServerPtr srv, + virTypedParameterPtr params, + int nparams, + unsigned int flags); + # ifdef __cplusplus } # endif diff --git a/src/admin/admin_protocol.x b/src/admin/admin_protocol.x index b1093d8889..c701698e9c 100644 --- a/src/admin/admin_protocol.x +++ b/src/admin/admin_protocol.x @@ -110,6 +110,12 @@ struct admin_server_get_threadpool_parameters_ret { admin_typed_param params; }; +struct admin_server_set_threadpool_parameters_args { + admin_nonnull_server srv; + admin_typed_param params; + unsigned int flags; +}; + /* Define the program number, protocol version and procedure numbers here. */ const ADMIN_PROGRAM = 0x06900690; const ADMIN_PROTOCOL_VERSION = 1; @@ -160,5 +166,10 @@ enum admin_procedure { /** * @generate: none */ - ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6 + ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6, + + /** + * @generate: none + */ + ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7 }; diff --git a/src/admin/admin_remote.c b/src/admin/admin_remote.c index ac38ce91a9..2dd692b342 100644 --- a/src/admin/admin_remote.c +++ b/src/admin/admin_remote.c @@ -267,3 +267,38 @@ remoteAdminServerGetThreadPoolParameters(virAdmServerPtr srv, virObjectUnlock(priv); return rv; } + +static int +remoteAdminServerSetThreadPoolParameters(virAdmServerPtr srv, + virTypedParameterPtr params, + int nparams, + unsigned int flags) +{ + int rv = -1; + remoteAdminPrivPtr priv = srv->conn->privateData; + admin_server_set_threadpool_parameters_args args; + + args.flags = flags; + make_nonnull_server(&args.srv, srv); + + virObjectLock(priv); + + if (virTypedParamsSerialize(params, nparams, + (virTypedParameterRemotePtr *) &args.params.params_val, + &args.params.params_len, + 0) < 0) + goto cleanup; + + + if (call(srv->conn, 0, ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS, + (xdrproc_t)xdr_admin_server_set_threadpool_parameters_args, (char *) &args, + (xdrproc_t)xdr_void, (char *) NULL) == -1) + goto cleanup; + + rv = 0; + cleanup: + virTypedParamsRemoteFree((virTypedParameterRemotePtr) args.params.params_val, + args.params.params_len); + virObjectUnlock(priv); + return rv; +} diff --git a/src/admin_protocol-structs b/src/admin_protocol-structs index c4e679a900..650d31d06a 100644 --- a/src/admin_protocol-structs +++ b/src/admin_protocol-structs @@ -61,6 +61,14 @@ struct admin_server_get_threadpool_parameters_ret { admin_typed_param * params_val; } params; }; +struct admin_server_set_threadpool_parameters_args { + admin_nonnull_server srv; + struct { + u_int params_len; + admin_typed_param * params_val; + } params; + u_int flags; +}; enum admin_procedure { ADMIN_PROC_CONNECT_OPEN = 1, ADMIN_PROC_CONNECT_CLOSE = 2, @@ -68,4 +76,5 @@ enum admin_procedure { ADMIN_PROC_CONNECT_LIST_SERVERS = 4, ADMIN_PROC_CONNECT_LOOKUP_SERVER = 5, ADMIN_PROC_SERVER_GET_THREADPOOL_PARAMETERS = 6, + ADMIN_PROC_SERVER_SET_THREADPOOL_PARAMETERS = 7, }; diff --git a/src/libvirt-admin.c b/src/libvirt-admin.c index 07d46c4db5..df71649c2e 100644 --- a/src/libvirt-admin.c +++ b/src/libvirt-admin.c @@ -721,3 +721,40 @@ virAdmServerGetThreadPoolParameters(virAdmServerPtr srv, virDispatchError(NULL); return -1; } + +/** + * virAdmServerSetThreadPoolParameters: + * @srv: a valid server object reference + * @params: pointer to threadpool typed parameter objects + * @nparams: number of parameters in @params + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Change server threadpool parameters according to @params. Note that some + * tunables are read-only, thus any attempt to set them will result in a + * failure. + * + * Returns 0 on success, -1 in case of an error. + */ +int +virAdmServerSetThreadPoolParameters(virAdmServerPtr srv, + virTypedParameterPtr params, + int nparams, + unsigned int flags) +{ + VIR_DEBUG("srv=%p, params=%p, nparams=%x, flags=%x", + srv, params, nparams, flags); + + virResetLastError(); + + virCheckAdmServerReturn(srv, -1); + virCheckNonNullArgGoto(params, error); + + if (remoteAdminServerSetThreadPoolParameters(srv, params, + nparams, flags) < 0) + goto error; + + return 0; + error: + virDispatchError(NULL); + return -1; +} diff --git a/src/libvirt_admin_private.syms b/src/libvirt_admin_private.syms index b05067c7e3..b150d8a861 100644 --- a/src/libvirt_admin_private.syms +++ b/src/libvirt_admin_private.syms @@ -14,6 +14,7 @@ xdr_admin_connect_lookup_server_ret; xdr_admin_connect_open_args; xdr_admin_server_get_threadpool_parameters_args; xdr_admin_server_get_threadpool_parameters_ret; +xdr_admin_server_set_threadpool_parameters_args; # datatypes.h virAdmConnectClass; diff --git a/src/libvirt_admin_public.syms b/src/libvirt_admin_public.syms index 0a12b5fb3f..0a16444816 100644 --- a/src/libvirt_admin_public.syms +++ b/src/libvirt_admin_public.syms @@ -26,4 +26,5 @@ LIBVIRT_ADMIN_1.3.0 { virAdmServerGetThreadPoolParameters; virAdmServerFree; virAdmConnectLookupServer; + virAdmServerSetThreadPoolParameters; }; diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 6d90eca675..f046fbff97 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -2373,6 +2373,7 @@ virThreadPoolGetMinWorkers; virThreadPoolGetPriorityWorkers; virThreadPoolNewFull; virThreadPoolSendJob; +virThreadPoolSetParameters; # util/virtime.h diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c index 3878547700..57bd95c22e 100644 --- a/src/rpc/virnetserver.c +++ b/src/rpc/virnetserver.c @@ -899,3 +899,18 @@ virNetServerGetThreadPoolParameters(virNetServerPtr srv, virObjectUnlock(srv); return 0; } + +int +virNetServerSetThreadPoolParameters(virNetServerPtr srv, + long long int minWorkers, + long long int maxWorkers, + long long int prioWorkers) +{ + int ret; + + virObjectLock(srv); + ret = virThreadPoolSetParameters(srv->workers, minWorkers, + maxWorkers, prioWorkers); + virObjectUnlock(srv); + return ret; +} diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h index 6f17d1cc4c..8b304f68e7 100644 --- a/src/rpc/virnetserver.h +++ b/src/rpc/virnetserver.h @@ -97,4 +97,9 @@ int virNetServerGetThreadPoolParameters(virNetServerPtr srv, size_t *nPrioWorkers, size_t *jobQueueDepth); +int virNetServerSetThreadPoolParameters(virNetServerPtr srv, + long long int minWorkers, + long long int maxWorkers, + long long int prioWorkers); + #endif /* __VIR_NET_SERVER_H__ */ diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c index fec8620c10..10f2bd2c3a 100644 --- a/src/util/virthreadpool.c +++ b/src/util/virthreadpool.c @@ -73,6 +73,7 @@ struct _virThreadPool { size_t nWorkers; virThreadPtr workers; + size_t maxPrioWorkers; size_t nPrioWorkers; virThreadPtr prioWorkers; virCond prioCond; @@ -84,12 +85,22 @@ struct virThreadPoolWorkerData { bool priority; }; +/* Test whether the worker needs to quit if the current number of workers @count + * is greater than @limit actually allows. + */ +static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit) +{ + return count > limit; +} + static void virThreadPoolWorker(void *opaque) { struct virThreadPoolWorkerData *data = opaque; virThreadPoolPtr pool = data->pool; virCondPtr cond = data->cond; bool priority = data->priority; + size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers; + size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers; virThreadPoolJobPtr job = NULL; VIR_FREE(data); @@ -97,6 +108,14 @@ static void virThreadPoolWorker(void *opaque) virMutexLock(&pool->mutex); while (1) { + /* In order to support async worker termination, we need ensure that + * both busy and free workers know if they need to terminated. Thus, + * busy workers need to check for this fact before they start waiting for + * another job (and before taking another one from the queue); and + * free workers need to check for this right after waking up. + */ + if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit)) + goto out; while (!pool->quit && ((!priority && !pool->jobList.head) || (priority && !pool->jobList.firstPrio))) { @@ -109,6 +128,9 @@ static void virThreadPoolWorker(void *opaque) } if (!priority) pool->freeWorkers--; + + if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit)) + goto out; } if (pool->quit) @@ -160,12 +182,12 @@ static void virThreadPoolWorker(void *opaque) static int virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority) { - virThreadPtr workers = priority ? pool->prioWorkers : pool->workers; + virThreadPtr *workers = priority ? &pool->prioWorkers : &pool->workers; size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers; size_t i = 0; struct virThreadPoolWorkerData *data = NULL; - if (VIR_EXPAND_N(workers, *curWorkers, gain) < 0) + if (VIR_EXPAND_N(*workers, *curWorkers, gain) < 0) return -1; for (i = 0; i < gain; i++) { @@ -176,7 +198,7 @@ virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority) data->cond = priority ? &pool->prioCond : &pool->cond; data->priority = priority; - if (virThreadCreateFull(&workers[i], + if (virThreadCreateFull(&(*workers)[i], false, virThreadPoolWorker, pool->jobFuncName, @@ -226,6 +248,7 @@ virThreadPoolNewFull(size_t minWorkers, pool->minWorkers = minWorkers; pool->maxWorkers = maxWorkers; + pool->maxPrioWorkers = prioWorkers; if (virThreadPoolExpand(pool, minWorkers, false) < 0) goto error; @@ -399,3 +422,54 @@ int virThreadPoolSendJob(virThreadPoolPtr pool, virMutexUnlock(&pool->mutex); return -1; } + +int +virThreadPoolSetParameters(virThreadPoolPtr pool, + long long int minWorkers, + long long int maxWorkers, + long long int prioWorkers) +{ + size_t max; + size_t min; + + virMutexLock(&pool->mutex); + + max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers; + min = minWorkers >= 0 ? minWorkers : pool->minWorkers; + if (min > max) { + virReportError(VIR_ERR_INVALID_ARG, "%s", + _("minWorkers cannot be larger than maxWorkers")); + goto error; + } + + if (minWorkers >= 0) { + if ((size_t) minWorkers > pool->nWorkers && + virThreadPoolExpand(pool, minWorkers - pool->nWorkers, + false) < 0) + goto error; + pool->minWorkers = minWorkers; + } + + if (maxWorkers >= 0) { + pool->maxWorkers = maxWorkers; + virCondBroadcast(&pool->cond); + } + + if (prioWorkers >= 0) { + if (prioWorkers < pool->nPrioWorkers) { + virCondBroadcast(&pool->prioCond); + } else if ((size_t) prioWorkers > pool->nPrioWorkers && + virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers, + true) < 0) { + goto error; + } + pool->maxPrioWorkers = prioWorkers; + } + + virMutexUnlock(&pool->mutex); + return 0; + + error: + virMutexUnlock(&pool->mutex); + return -1; +} diff --git a/src/util/virthreadpool.h b/src/util/virthreadpool.h index bc0c90771b..e1f362f5bb 100644 --- a/src/util/virthreadpool.h +++ b/src/util/virthreadpool.h @@ -57,4 +57,9 @@ int virThreadPoolSendJob(virThreadPoolPtr pool, void *jobdata) ATTRIBUTE_NONNULL(1) ATTRIBUTE_RETURN_CHECK; +int virThreadPoolSetParameters(virThreadPoolPtr pool, + long long int minWorkers, + long long int maxWorkers, + long long int prioWorkers); + #endif