Allow RPC server to run single threaded

Refactor the RPC server dispatcher code so that if 'max_workers==0'
the entire server will run single threaded. This is useful for
use cases where there will only ever be 1 client connected
which serializes its requests

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrange 2012-03-15 18:18:07 +00:00
parent c6b2d5d082
commit dff6d809fb

View File

@ -127,6 +127,49 @@ static void virNetServerUnlock(virNetServerPtr srv)
}
static int virNetServerProcessMsg(virNetServerPtr srv,
virNetServerClientPtr client,
virNetServerProgramPtr prog,
virNetMessagePtr msg)
{
int ret = -1;
if (!prog) {
/* Only send back an error for type == CALL. Other
* message types are not expecting replies, so we
* must just log it & drop them
*/
if (msg->header.type == VIR_NET_CALL ||
msg->header.type == VIR_NET_CALL_WITH_FDS) {
if (virNetServerProgramUnknownError(client,
msg,
&msg->header) < 0)
goto cleanup;
} else {
VIR_INFO("Dropping client mesage, unknown program %d version %d type %d proc %d",
msg->header.prog, msg->header.vers,
msg->header.type, msg->header.proc);
/* Send a dummy reply to free up 'msg' & unblock client rx */
virNetMessageClear(msg);
msg->header.type = VIR_NET_REPLY;
if (virNetServerClientSendMessage(client, msg) < 0)
goto cleanup;
}
goto done;
}
if (virNetServerProgramDispatch(prog,
srv,
client,
msg) < 0)
goto cleanup;
done:
ret = 0;
cleanup:
return ret;
}
static void virNetServerHandleJob(void *jobOpaque, void *opaque)
{
virNetServerPtr srv = opaque;
@ -135,41 +178,13 @@ static void virNetServerHandleJob(void *jobOpaque, void *opaque)
VIR_DEBUG("server=%p client=%p message=%p prog=%p",
srv, job->client, job->msg, job->prog);
if (!job->prog) {
/* Only send back an error for type == CALL. Other
* message types are not expecting replies, so we
* must just log it & drop them
*/
if (job->msg->header.type == VIR_NET_CALL ||
job->msg->header.type == VIR_NET_CALL_WITH_FDS) {
if (virNetServerProgramUnknownError(job->client,
job->msg,
&job->msg->header) < 0)
goto error;
} else {
VIR_INFO("Dropping client mesage, unknown program %d version %d type %d proc %d",
job->msg->header.prog, job->msg->header.vers,
job->msg->header.type, job->msg->header.proc);
/* Send a dummy reply to free up 'msg' & unblock client rx */
virNetMessageClear(job->msg);
job->msg->header.type = VIR_NET_REPLY;
if (virNetServerClientSendMessage(job->client, job->msg) < 0)
goto error;
}
goto cleanup;
}
if (virNetServerProgramDispatch(job->prog,
srv,
job->client,
job->msg) < 0)
if (virNetServerProcessMsg(srv, job->client, job->prog, job->msg) < 0)
goto error;
virNetServerLock(srv);
virNetServerProgramFree(job->prog);
virNetServerUnlock(srv);
cleanup:
virNetServerClientFree(job->client);
VIR_FREE(job);
return;
@ -187,7 +202,6 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
void *opaque)
{
virNetServerPtr srv = opaque;
virNetServerJobPtr job;
virNetServerProgramPtr prog = NULL;
unsigned int priority = 0;
size_t i;
@ -196,34 +210,42 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
VIR_DEBUG("server=%p client=%p message=%p",
srv, client, msg);
if (VIR_ALLOC(job) < 0) {
virReportOOMError();
return -1;
}
job->client = client;
job->msg = msg;
virNetServerLock(srv);
for (i = 0 ; i < srv->nprograms ; i++) {
if (virNetServerProgramMatches(srv->programs[i], job->msg)) {
if (virNetServerProgramMatches(srv->programs[i], msg)) {
prog = srv->programs[i];
break;
}
}
if (prog) {
virNetServerProgramRef(prog);
job->prog = prog;
priority = virNetServerProgramGetPriority(prog, msg->header.proc);
if (srv->workers) {
virNetServerJobPtr job;
if (VIR_ALLOC(job) < 0) {
virReportOOMError();
goto cleanup;
}
job->client = client;
job->msg = msg;
if (prog) {
virNetServerProgramRef(prog);
job->prog = prog;
priority = virNetServerProgramGetPriority(prog, msg->header.proc);
}
ret = virThreadPoolSendJob(srv->workers, priority, job);
if (ret < 0) {
VIR_FREE(job);
virNetServerProgramFree(prog);
}
} else {
ret = virNetServerProcessMsg(srv, client, prog, msg);
}
ret = virThreadPoolSendJob(srv->workers, priority, job);
if (ret < 0) {
VIR_FREE(job);
virNetServerProgramFree(prog);
}
cleanup:
virNetServerUnlock(srv);
return ret;
@ -324,7 +346,8 @@ virNetServerPtr virNetServerNew(size_t min_workers,
srv->refs = 1;
if (!(srv->workers = virThreadPoolNew(min_workers, max_workers,
if (max_workers &&
!(srv->workers = virThreadPoolNew(min_workers, max_workers,
priority_workers,
virNetServerHandleJob,
srv)))