Allow concurrent processing of RPC calls in daemon

This commit is contained in:
Daniel P. Berrange 2009-01-20 19:25:15 +00:00
parent 84ef468ba8
commit f61341173b
8 changed files with 604 additions and 322 deletions

View File

@ -1,3 +1,12 @@
Tue Jan 20 19:24:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
* qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Allow the
processing of multiple concurrent RPC calls per client
connection.
* qemud/libvirtd.conf, qemud/libvirtd.aug,
qemud/test_libvirtd.aug: Add config param for controlling
number of requests per client.
Tue Jan 20 18:16:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
* src/xm_internal.c: Fix 2 misleading comments & potential

View File

@ -53,6 +53,8 @@ module Libvirtd =
let processing_entry = int_entry "min_workers"
| int_entry "max_workers"
| int_entry "max_clients"
| int_entry "max_requests"
| int_entry "max_client_requests"
let logging_entry = int_entry "log_level"
| str_entry "log_filters"

View File

@ -247,6 +247,22 @@
#min_workers = 5
#max_workers = 20
# Total global limit on concurrent RPC calls. Should be
# at least as large as max_workers. Beyond this, RPC requests
# will be read into memory and queued. This directly impact
# memory usage, currently each request requires 256 KB of
# memory. So by default upto 5 MB of memory is used
#
# XXX this isn't actually enforced yet, only the per-client
# limit is used so far
#max_requests = 20
# Limit on concurrent requests from a single client
# connection. To avoid one client monopolizing the server
# this should be a small fraction of the global max_requests
# and max_workers parameter
#max_client_requests = 5
#################################################################
#
# Logging controls

View File

@ -138,6 +138,11 @@ static int min_workers = 5;
static int max_workers = 20;
static int max_clients = 20;
/* Total number of 'in-process' RPC calls allowed across all clients */
static int max_requests = 20;
/* Total number of 'in-process' RPC calls allowed by a single client*/
static int max_client_requests = 5;
#define DH_BITS 1024
static sig_atomic_t sig_errors = 0;
@ -162,9 +167,35 @@ static void sig_handler(int sig, siginfo_t * siginfo,
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
static int qemudRegisterClientEvent(struct qemud_server *server,
struct qemud_client *client,
int removeFirst);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
struct qemud_client_message *msg)
{
struct qemud_client_message *tmp = *queue;
if (tmp) {
while (tmp->next)
tmp = tmp->next;
tmp->next = msg;
} else {
*queue = msg;
}
}
static struct qemud_client_message *
qemudClientMessageQueueServe(struct qemud_client_message **queue)
{
struct qemud_client_message *tmp = *queue;
if (tmp) {
*queue = tmp->next;
tmp->next = NULL;
}
return tmp;
}
static int
remoteCheckCertFile(const char *type, const char *file)
@ -1042,6 +1073,8 @@ remoteCheckCertificate (gnutls_session_t session)
static int
remoteCheckAccess (struct qemud_client *client)
{
struct qemud_client_message *confirm;
/* Verify client certificate. */
if (remoteCheckCertificate (client->tlssession) == -1) {
VIR_ERROR0(_("remoteCheckCertificate: "
@ -1051,14 +1084,25 @@ remoteCheckAccess (struct qemud_client *client)
"is set so the bad certificate is ignored"));
}
if (client->tx) {
VIR_INFO("%s",
_("client had unexpected data pending tx after access check"));
return -1;
}
if (VIR_ALLOC(confirm) < 0)
return -1;
/* Checks have succeeded. Write a '\1' byte back to the client to
* indicate this (otherwise the socket is abruptly closed).
* (NB. The '\1' byte is sent in an encrypted record).
*/
client->bufferLength = 1;
client->bufferOffset = 0;
client->buffer[0] = '\1';
client->mode = QEMUD_MODE_TX_PACKET;
confirm->async = 1;
confirm->bufferLength = 1;
confirm->bufferOffset = 0;
confirm->buffer[0] = '\1';
client->tx = confirm;
return 0;
}
@ -1084,6 +1128,7 @@ int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid) {
}
#endif
static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket *sock) {
int fd;
struct sockaddr_storage addr;
@ -1099,7 +1144,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
}
if (server->nclients >= max_clients) {
VIR_ERROR0(_("Too many active clients, dropping connection"));
VIR_ERROR(_("Too many active clients (%d), dropping connection"), max_clients);
close(fd);
return -1;
}
@ -1137,6 +1182,12 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
client->addrlen = addrlen;
client->server = server;
/* Prepare one for packet receive */
if (VIR_ALLOC(client->rx) < 0)
goto cleanup;
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
#if HAVE_POLKIT
/* Only do policy checks for non-root - allow root user
through with no checks, as a fail-safe - root can easily
@ -1158,9 +1209,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
#endif
if (client->type != QEMUD_SOCK_TYPE_TLS) {
client->mode = QEMUD_MODE_RX_HEADER;
client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
/* Plain socket, so prepare to read first message */
if (qemudRegisterClientEvent (server, client, 0) < 0)
goto cleanup;
} else {
@ -1180,12 +1229,12 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
if (remoteCheckAccess (client) == -1)
goto cleanup;
/* Handshake & cert check OK, so prepare to read first message */
if (qemudRegisterClientEvent(server, client, 0) < 0)
goto cleanup;
} else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
/* Most likely. */
client->mode = QEMUD_MODE_TLS_HANDSHAKE;
client->bufferLength = -1;
/* Most likely, need to do more handshake data */
client->handshake = 1;
if (qemudRegisterClientEvent (server, client, 0) < 0)
goto cleanup;
@ -1204,7 +1253,8 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
if (client &&
client->tlssession) gnutls_deinit (client->tlssession);
close (fd);
free (client);
VIR_FREE(client->rx);
VIR_FREE(client);
return -1;
}
@ -1216,8 +1266,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
* We keep the libvirt connection open until any async
* jobs have finished, then clean it up elsehwere
*/
static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED,
struct qemud_client *client) {
void qemudDispatchClientFailure(struct qemud_client *client) {
virEventRemoveHandleImpl(client->watch);
/* Deregister event delivery callback */
@ -1242,7 +1291,7 @@ static struct qemud_client *qemudPendingJob(struct qemud_server *server)
int i;
for (i = 0 ; i < server->nclients ; i++) {
virMutexLock(&server->clients[i]->lock);
if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) {
if (server->clients[i]->dx) {
/* Delibrately don't unlock client - caller wants the lock */
return server->clients[i];
}
@ -1256,8 +1305,9 @@ static void *qemudWorker(void *data)
struct qemud_server *server = data;
while (1) {
struct qemud_client *client;
int len;
struct qemud_client *client = NULL;
struct qemud_client_message *reply;
virMutexLock(&server->lock);
while ((client = qemudPendingJob(server)) == NULL) {
if (virCondWait(&server->job, &server->lock) < 0) {
@ -1268,55 +1318,75 @@ static void *qemudWorker(void *data)
virMutexUnlock(&server->lock);
/* We own a locked client now... */
client->mode = QEMUD_MODE_IN_DISPATCH;
client->refs++;
if ((len = remoteDispatchClientRequest (server, client)) == 0)
qemudDispatchClientFailure(server, client);
/* Remove our message from dispatch queue while we use it */
reply = qemudClientMessageQueueServe(&client->dx);
/* Set up the output buffer. */
client->mode = QEMUD_MODE_TX_PACKET;
client->bufferLength = len;
client->bufferOffset = 0;
/* This function drops the lock during dispatch,
* and re-acquires it before returning */
if (remoteDispatchClientRequest (server, client, reply) < 0) {
VIR_FREE(reply);
qemudDispatchClientFailure(client);
client->refs--;
virMutexUnlock(&client->lock);
continue;
}
/* Put reply on end of tx queue to send out */
qemudClientMessageQueuePush(&client->tx, reply);
if (qemudRegisterClientEvent(server, client, 1) < 0)
qemudDispatchClientFailure(server, client);
qemudDispatchClientFailure(client);
client->refs--;
virMutexUnlock(&client->lock);
virMutexUnlock(&server->lock);
}
}
static int qemudClientReadBuf(struct qemud_server *server,
struct qemud_client *client,
char *data, unsigned len) {
int ret;
/*
* Read data into buffer using wire decoding (plain or TLS)
*
* Returns:
* -1 on error or EOF
* 0 on EAGAIN
* n number of bytes
*/
static ssize_t qemudClientReadBuf(struct qemud_client *client,
char *data, ssize_t len) {
ssize_t ret;
if (len < 0) {
VIR_ERROR(_("unexpected negative length request %d"), len);
qemudDispatchClientFailure(client);
return -1;
}
/*qemudDebug ("qemudClientRead: len = %d", len);*/
if (!client->tlssession) {
if ((ret = read (client->fd, data, len)) <= 0) {
if (ret == 0 || errno != EAGAIN) {
ret = read (client->fd, data, len);
if (ret == -1 && (errno == EAGAIN ||
errno == EINTR))
return 0;
if (ret <= 0) {
if (ret != 0)
VIR_ERROR(_("read: %s"), strerror (errno));
qemudDispatchClientFailure(server, client);
}
qemudDispatchClientFailure(client);
return -1;
}
} else {
ret = gnutls_record_recv (client->tlssession, data, len);
if (qemudRegisterClientEvent (server, client, 1) < 0)
qemudDispatchClientFailure (server, client);
else if (ret <= 0) {
if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
ret != GNUTLS_E_INTERRUPTED)) {
if (ret < 0 && (ret == GNUTLS_E_AGAIN ||
ret == GNUTLS_E_INTERRUPTED))
return 0;
if (ret <= 0) {
if (ret != 0)
VIR_ERROR(_("gnutls_record_recv: %s"),
gnutls_strerror (ret));
qemudDispatchClientFailure (server, client);
}
qemudDispatchClientFailure(client);
return -1;
}
}
@ -1324,22 +1394,37 @@ static int qemudClientReadBuf(struct qemud_server *server,
return ret;
}
static int qemudClientReadPlain(struct qemud_server *server,
struct qemud_client *client) {
int ret;
ret = qemudClientReadBuf(server, client,
client->buffer + client->bufferOffset,
client->bufferLength - client->bufferOffset);
if (ret < 0)
/*
* Read data into buffer without decoding
*
* Returns:
* -1 on error or EOF
* 0 on EAGAIN
* n number of bytes
*/
static ssize_t qemudClientReadPlain(struct qemud_client *client) {
ssize_t ret;
ret = qemudClientReadBuf(client,
client->rx->buffer + client->rx->bufferOffset,
client->rx->bufferLength - client->rx->bufferOffset);
if (ret <= 0)
return ret; /* -1 error, 0 eagain */
client->rx->bufferOffset += ret;
return ret;
client->bufferOffset += ret;
return 0;
}
#if HAVE_SASL
static int qemudClientReadSASL(struct qemud_server *server,
struct qemud_client *client) {
int got, want;
/*
* Read data into buffer decoding with SASL
*
* Returns:
* -1 on error or EOF
* 0 on EAGAIN
* n number of bytes
*/
static ssize_t qemudClientReadSASL(struct qemud_client *client) {
ssize_t got, want;
/* We're doing a SSF data read, so now its times to ensure
* future writes are under SSF too.
@ -1350,166 +1435,176 @@ static int qemudClientReadSASL(struct qemud_server *server,
/* Need to read some more data off the wire */
if (client->saslDecoded == NULL) {
int ret;
char encoded[8192];
int encodedLen = sizeof(encoded);
encodedLen = qemudClientReadBuf(server, client, encoded, encodedLen);
ssize_t encodedLen = sizeof(encoded);
encodedLen = qemudClientReadBuf(client, encoded, encodedLen);
if (encodedLen < 0)
return -1;
if (encodedLen <= 0)
return encodedLen;
sasl_decode(client->saslconn, encoded, encodedLen,
ret = sasl_decode(client->saslconn, encoded, encodedLen,
&client->saslDecoded, &client->saslDecodedLength);
if (ret != SASL_OK) {
VIR_ERROR(_("failed to decode SASL data %s"),
sasl_errstring(ret, NULL, NULL));
qemudDispatchClientFailure(client);
return -1;
}
client->saslDecodedOffset = 0;
}
/* Some buffered decoded data to return now */
got = client->saslDecodedLength - client->saslDecodedOffset;
want = client->bufferLength - client->bufferOffset;
want = client->rx->bufferLength - client->rx->bufferOffset;
if (want > got)
want = got;
memcpy(client->buffer + client->bufferOffset,
memcpy(client->rx->buffer + client->rx->bufferOffset,
client->saslDecoded + client->saslDecodedOffset, want);
client->saslDecodedOffset += want;
client->bufferOffset += want;
client->rx->bufferOffset += want;
if (client->saslDecodedOffset == client->saslDecodedLength) {
client->saslDecoded = NULL;
client->saslDecodedOffset = client->saslDecodedLength = 0;
}
return 0;
return want;
}
#endif
static int qemudClientRead(struct qemud_server *server,
struct qemud_client *client) {
/*
* Read as much data off wire as possible till we fill our
* buffer, or would block on I/O
*/
static ssize_t qemudClientRead(struct qemud_client *client) {
#if HAVE_SASL
if (client->saslSSF & QEMUD_SASL_SSF_READ)
return qemudClientReadSASL(server, client);
return qemudClientReadSASL(client);
else
#endif
return qemudClientReadPlain(server, client);
return qemudClientReadPlain(client);
}
static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_client *client) {
unsigned int len;
/*
* Read data until we get a complete message to process
*/
static void qemudDispatchClientRead(struct qemud_server *server,
struct qemud_client *client) {
/*qemudDebug ("qemudDispatchClientRead: mode = %d", client->mode);*/
switch (client->mode) {
case QEMUD_MODE_RX_HEADER: {
readmore:
if (qemudClientRead(client) < 0)
return; /* Error */
if (client->rx->bufferOffset < client->rx->bufferLength)
return; /* Still not read enough */
/* Either done with length word header */
if (client->rx->bufferLength == REMOTE_MESSAGE_HEADER_XDR_LEN) {
unsigned int len;
XDR x;
if (qemudClientRead(server, client) < 0)
return; /* Error, or blocking */
if (client->bufferOffset < client->bufferLength)
return; /* Not read enough */
xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE);
xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE);
if (!xdr_u_int(&x, &len)) {
xdr_destroy (&x);
DEBUG0("Failed to decode packet length");
qemudDispatchClientFailure(server, client);
qemudDispatchClientFailure(client);
return;
}
xdr_destroy (&x);
if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) {
DEBUG("Packet length %u too small", len);
qemudDispatchClientFailure(client);
return;
}
/* Length includes the size of the length word itself */
len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
if (len > REMOTE_MESSAGE_MAX) {
DEBUG("Packet length %u too large", len);
qemudDispatchClientFailure(server, client);
qemudDispatchClientFailure(client);
return;
}
/* Length include length of the length field itself, so
* check minimum size requirements */
if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
DEBUG("Packet length %u too small", len);
qemudDispatchClientFailure(server, client);
return;
}
client->mode = QEMUD_MODE_RX_PAYLOAD;
client->bufferLength = len - REMOTE_MESSAGE_HEADER_XDR_LEN;
client->bufferOffset = 0;
/* Prepare to read rest of message */
client->rx->bufferLength += len;
if (qemudRegisterClientEvent(server, client, 1) < 0) {
qemudDispatchClientFailure(server, client);
qemudDispatchClientFailure(client);
return;
}
/* Fall through */
}
case QEMUD_MODE_RX_PAYLOAD: {
if (qemudClientRead(server, client) < 0)
return; /* Error, or blocking */
if (client->bufferOffset < client->bufferLength)
return; /* Not read enough */
client->mode = QEMUD_MODE_WAIT_DISPATCH;
if (qemudRegisterClientEvent(server, client, 1) < 0)
qemudDispatchClientFailure(server, client);
virCondSignal(&server->job);
break;
}
case QEMUD_MODE_TLS_HANDSHAKE: {
int ret;
/* Continue the handshake. */
ret = gnutls_handshake (client->tlssession);
if (ret == 0) {
/* Finished. Next step is to check the certificate. */
if (remoteCheckAccess (client) == -1)
qemudDispatchClientFailure (server, client);
else if (qemudRegisterClientEvent (server, client, 1) < 0)
qemudDispatchClientFailure (server, client);
} else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
VIR_ERROR(_("TLS handshake failed: %s"),
gnutls_strerror (ret));
qemudDispatchClientFailure (server, client);
/* Try and read payload immediately instead of going back
into poll() because chances are the data is already
waiting for us */
goto readmore;
} else {
if (qemudRegisterClientEvent (server ,client, 1) < 0)
qemudDispatchClientFailure (server, client);
}
/* Move completed message to the end of the dispatch queue */
qemudClientMessageQueuePush(&client->dx, client->rx);
client->rx = NULL;
client->nrequests++;
break;
}
/* Possibly need to create another receive buffer */
if ((client->nrequests < max_client_requests &&
VIR_ALLOC(client->rx) < 0)) {
qemudDispatchClientFailure(client);
} else {
if (client->rx)
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
default:
DEBUG("Got unexpected data read while in %d mode", client->mode);
qemudDispatchClientFailure(server, client);
if (qemudRegisterClientEvent(server, client, 1) < 0)
qemudDispatchClientFailure(client);
else
/* Tell one of the workers to get on with it... */
virCondSignal(&server->job);
}
}
}
static int qemudClientWriteBuf(struct qemud_server *server,
struct qemud_client *client,
const char *data, int len) {
int ret;
/*
* Send a chunk of data using wire encoding (plain or TLS)
*
* Returns:
* -1 on error
* 0 on EAGAIN
* n number of bytes
*/
static ssize_t qemudClientWriteBuf(struct qemud_client *client,
const char *data, ssize_t len) {
ssize_t ret;
if (len < 0) {
VIR_ERROR(_("unexpected negative length request %d"), len);
qemudDispatchClientFailure(client);
return -1;
}
if (!client->tlssession) {
if ((ret = safewrite(client->fd, data, len)) == -1) {
if ((ret = write(client->fd, data, len)) == -1) {
if (errno == EAGAIN || errno == EINTR)
return 0;
VIR_ERROR(_("write: %s"), strerror (errno));
qemudDispatchClientFailure(server, client);
qemudDispatchClientFailure(client);
return -1;
}
} else {
ret = gnutls_record_send (client->tlssession, data, len);
if (qemudRegisterClientEvent (server, client, 1) < 0)
qemudDispatchClientFailure (server, client);
else if (ret < 0) {
if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
if (ret < 0) {
if (ret == GNUTLS_E_INTERRUPTED ||
ret == GNUTLS_E_AGAIN)
return 0;
VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
qemudDispatchClientFailure (server, client);
}
qemudDispatchClientFailure(client);
return -1;
}
}
@ -1517,42 +1612,62 @@ static int qemudClientWriteBuf(struct qemud_server *server,
}
static int qemudClientWritePlain(struct qemud_server *server,
struct qemud_client *client) {
int ret = qemudClientWriteBuf(server, client,
client->buffer + client->bufferOffset,
client->bufferLength - client->bufferOffset);
if (ret < 0)
return -1;
client->bufferOffset += ret;
return 0;
/*
* Send client->tx using no encoding
*
* Returns:
* -1 on error or EOF
* 0 on EAGAIN
* n number of bytes
*/
static int qemudClientWritePlain(struct qemud_client *client) {
int ret = qemudClientWriteBuf(client,
client->tx->buffer + client->tx->bufferOffset,
client->tx->bufferLength - client->tx->bufferOffset);
if (ret <= 0)
return ret; /* -1 error, 0 = egain */
client->tx->bufferOffset += ret;
return ret;
}
#if HAVE_SASL
static int qemudClientWriteSASL(struct qemud_server *server,
struct qemud_client *client) {
/*
* Send client->tx using SASL encoding
*
* Returns:
* -1 on error
* 0 on EAGAIN
* n number of bytes
*/
static int qemudClientWriteSASL(struct qemud_client *client) {
int ret;
/* Not got any pending encoded data, so we need to encode raw stuff */
if (client->saslEncoded == NULL) {
int err;
err = sasl_encode(client->saslconn,
client->buffer + client->bufferOffset,
client->bufferLength - client->bufferOffset,
ret = sasl_encode(client->saslconn,
client->tx->buffer + client->tx->bufferOffset,
client->tx->bufferLength - client->tx->bufferOffset,
&client->saslEncoded,
&client->saslEncodedLength);
if (ret != SASL_OK) {
VIR_ERROR(_("failed to encode SASL data %s"),
sasl_errstring(ret, NULL, NULL));
qemudDispatchClientFailure(client);
return -1;
}
client->saslEncodedOffset = 0;
}
/* Send some of the encoded stuff out on the wire */
ret = qemudClientWriteBuf(server, client,
ret = qemudClientWriteBuf(client,
client->saslEncoded + client->saslEncodedOffset,
client->saslEncodedLength - client->saslEncodedOffset);
if (ret < 0)
return -1;
if (ret <= 0)
return ret; /* -1 error, 0 == egain */
/* Note how much we sent */
client->saslEncodedOffset += ret;
@ -1561,78 +1676,107 @@ static int qemudClientWriteSASL(struct qemud_server *server,
if (client->saslEncodedOffset == client->saslEncodedLength) {
client->saslEncoded = NULL;
client->saslEncodedOffset = client->saslEncodedLength = 0;
client->bufferOffset = client->bufferLength;
/* Mark as complete, so caller detects completion */
client->tx->bufferOffset = client->tx->bufferLength;
}
return 0;
return ret;
}
#endif
static int qemudClientWrite(struct qemud_server *server,
struct qemud_client *client) {
/*
* Send as much data in the client->tx as possible
*
* Returns:
* -1 on error or EOF
* 0 on EAGAIN
* n number of bytes
*/
static ssize_t qemudClientWrite(struct qemud_client *client) {
#if HAVE_SASL
if (client->saslSSF & QEMUD_SASL_SSF_WRITE)
return qemudClientWriteSASL(server, client);
return qemudClientWriteSASL(client);
else
#endif
return qemudClientWritePlain(server, client);
return qemudClientWritePlain(client);
}
void
/*
* Process all queued client->tx messages until
* we would block on I/O
*/
static void
qemudDispatchClientWrite(struct qemud_server *server,
struct qemud_client *client) {
switch (client->mode) {
case QEMUD_MODE_TX_PACKET: {
if (qemudClientWrite(server, client) < 0)
while (client->tx) {
ssize_t ret;
ret = qemudClientWrite(client);
if (ret < 0) {
qemudDispatchClientFailure(client);
return;
}
if (ret == 0)
return; /* Would block on write EAGAIN */
if (client->bufferOffset == client->bufferLength) {
if (client->closing) {
qemudDispatchClientFailure (server, client);
if (client->tx->bufferOffset == client->tx->bufferLength) {
struct qemud_client_message *reply;
/* Get finished reply from head of tx queue */
reply = qemudClientMessageQueueServe(&client->tx);
/* If its not an async message, then we have
* just completed an RPC request */
if (!reply->async)
client->nrequests--;
/* Move record to end of 'rx' ist */
if (!client->rx &&
client->nrequests < max_client_requests) {
/* Reset message record for next RX attempt */
client->rx = reply;
client->rx->bufferOffset = 0;
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
} else {
/* Done writing, switch back to receive */
client->mode = QEMUD_MODE_RX_HEADER;
client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
client->bufferOffset = 0;
if (qemudRegisterClientEvent (server, client, 1) < 0)
qemudDispatchClientFailure (server, client);
}
}
/* Still writing */
break;
VIR_FREE(reply);
}
case QEMUD_MODE_TLS_HANDSHAKE: {
if (client->closing ||
qemudRegisterClientEvent (server, client, 1) < 0)
qemudDispatchClientFailure(client);
}
}
}
static void
qemudDispatchClientHandshake(struct qemud_server *server,
struct qemud_client *client) {
int ret;
/* Continue the handshake. */
ret = gnutls_handshake (client->tlssession);
if (ret == 0) {
/* Finished. Next step is to check the certificate. */
if (remoteCheckAccess (client) == -1)
qemudDispatchClientFailure (server, client);
qemudDispatchClientFailure(client);
else if (qemudRegisterClientEvent (server, client, 1))
qemudDispatchClientFailure (server, client);
} else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
VIR_ERROR(_("TLS handshake failed: %s"), gnutls_strerror (ret));
qemudDispatchClientFailure (server, client);
} else {
qemudDispatchClientFailure(client);
} else if (ret == GNUTLS_E_AGAIN ||
ret == GNUTLS_E_INTERRUPTED) {
/* Carry on waiting for more handshake. Update
the events just in case handshake data flow
direction has changed */
if (qemudRegisterClientEvent (server, client, 1))
qemudDispatchClientFailure (server, client);
}
break;
}
default:
DEBUG("Got unexpected data write while in %d mode", client->mode);
qemudDispatchClientFailure(server, client);
qemudDispatchClientFailure(client);
} else {
/* Fatal error in handshake */
VIR_ERROR(_("TLS handshake failed: %s"),
gnutls_strerror (ret));
qemudDispatchClientFailure(client);
}
}
static void
qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
struct qemud_server *server = (struct qemud_server *)opaque;
@ -1642,59 +1786,66 @@ qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
virMutexLock(&server->lock);
for (i = 0 ; i < server->nclients ; i++) {
virMutexLock(&server->clients[i]->lock);
if (server->clients[i]->watch == watch) {
client = server->clients[i];
break;
}
virMutexUnlock(&server->clients[i]->lock);
}
virMutexUnlock(&server->lock);
if (!client) {
virMutexUnlock(&server->lock);
return;
}
virMutexLock(&client->lock);
virMutexUnlock(&server->lock);
if (client->fd != fd)
if (client->fd != fd) {
virMutexUnlock(&client->lock);
return;
}
if (events == VIR_EVENT_HANDLE_WRITABLE)
if (events & (VIR_EVENT_HANDLE_WRITABLE |
VIR_EVENT_HANDLE_READABLE)) {
if (client->handshake) {
qemudDispatchClientHandshake(server, client);
} else {
if (events & VIR_EVENT_HANDLE_WRITABLE)
qemudDispatchClientWrite(server, client);
else if (events == VIR_EVENT_HANDLE_READABLE)
if (events & VIR_EVENT_HANDLE_READABLE)
qemudDispatchClientRead(server, client);
else
qemudDispatchClientFailure(server, client);
}
}
/* NB, will get HANGUP + READABLE at same time upon
* disconnect */
if (events & (VIR_EVENT_HANDLE_ERROR |
VIR_EVENT_HANDLE_HANGUP))
qemudDispatchClientFailure(client);
virMutexUnlock(&client->lock);
}
static int qemudRegisterClientEvent(struct qemud_server *server,
int qemudRegisterClientEvent(struct qemud_server *server,
struct qemud_client *client,
int update) {
int mode;
switch (client->mode) {
case QEMUD_MODE_TLS_HANDSHAKE:
int mode = 0;
if (client->handshake) {
if (gnutls_record_get_direction (client->tlssession) == 0)
mode = VIR_EVENT_HANDLE_READABLE;
mode |= VIR_EVENT_HANDLE_READABLE;
else
mode = VIR_EVENT_HANDLE_WRITABLE;
break;
mode |= VIR_EVENT_HANDLE_WRITABLE;
} else {
/* If there is a message on the rx queue then
* we're wanting more input */
if (client->rx)
mode |= VIR_EVENT_HANDLE_READABLE;
case QEMUD_MODE_RX_HEADER:
case QEMUD_MODE_RX_PAYLOAD:
mode = VIR_EVENT_HANDLE_READABLE;
break;
case QEMUD_MODE_TX_PACKET:
mode = VIR_EVENT_HANDLE_WRITABLE;
break;
case QEMUD_MODE_WAIT_DISPATCH:
mode = 0;
break;
default:
return -1;
/* If there are one or more messages to send back to client,
then monitor for writability on socket */
if (client->tx)
mode |= VIR_EVENT_HANDLE_WRITABLE;
}
if (update) {
@ -1760,6 +1911,29 @@ static void qemudInactiveTimer(int timer ATTRIBUTE_UNUSED, void *data) {
}
}
static void qemudFreeClient(struct qemud_client *client) {
while (client->rx) {
struct qemud_client_message *msg
= qemudClientMessageQueueServe(&client->rx);
VIR_FREE(msg);
}
while (client->dx) {
struct qemud_client_message *msg
= qemudClientMessageQueueServe(&client->dx);
VIR_FREE(msg);
}
while (client->tx) {
struct qemud_client_message *msg
= qemudClientMessageQueueServe(&client->tx);
VIR_FREE(msg);
}
if (client->conn)
virConnectClose(client->conn);
virMutexDestroy(&client->lock);
VIR_FREE(client);
}
static int qemudRunLoop(struct qemud_server *server) {
int timerid = -1;
int ret = -1, i;
@ -1796,8 +1970,11 @@ static int qemudRunLoop(struct qemud_server *server) {
}
virMutexUnlock(&server->lock);
if (qemudOneLoop() < 0)
if (qemudOneLoop() < 0) {
virMutexLock(&server->lock);
DEBUG0("Loop iteration error, exiting\n");
break;
}
virMutexLock(&server->lock);
reprocess:
@ -1808,17 +1985,18 @@ static int qemudRunLoop(struct qemud_server *server) {
&& server->clients[i]->refs == 0;
virMutexUnlock(&server->clients[i]->lock);
if (inactive) {
if (server->clients[i]->conn)
virConnectClose(server->clients[i]->conn);
virMutexDestroy(&server->clients[i]->lock);
VIR_FREE(server->clients[i]);
qemudFreeClient(server->clients[i]);
server->nclients--;
if (i < server->nclients) {
if (i < server->nclients)
memmove(server->clients + i,
server->clients + i + 1,
server->nclients - i);
goto reprocess;
sizeof (*server->clients) * (server->nclients - i));
if (VIR_REALLOC_N(server->clients,
server->nclients) < 0) {
; /* ignore */
}
goto reprocess;
}
}
@ -1843,6 +2021,7 @@ static int qemudRunLoop(struct qemud_server *server) {
pthread_join(thread, NULL);
virMutexLock(&server->lock);
}
VIR_FREE(server->workers);
free(server->workers);
virMutexUnlock(&server->lock);
@ -2223,6 +2402,9 @@ remoteReadConfigFile (struct qemud_server *server, const char *filename)
GET_CONF_INT (conf, filename, max_workers);
GET_CONF_INT (conf, filename, max_clients);
GET_CONF_INT (conf, filename, max_requests);
GET_CONF_INT (conf, filename, max_client_requests);
virConfFree (conf);
return 0;

View File

@ -65,15 +65,6 @@
#define qemudDebug DEBUG
enum qemud_mode {
QEMUD_MODE_RX_HEADER, /* Receiving the fixed length RPC header data */
QEMUD_MODE_RX_PAYLOAD, /* Receiving the variable length RPC payload data */
QEMUD_MODE_WAIT_DISPATCH, /* Message received, waiting for worker to process */
QEMUD_MODE_IN_DISPATCH, /* RPC call being processed */
QEMUD_MODE_TX_PACKET, /* Transmitting reply to RPC call */
QEMUD_MODE_TLS_HANDSHAKE, /* Performing TLS handshake */
};
/* Whether we're passing reads & writes through a sasl SSF */
enum qemud_sasl_ssf {
QEMUD_SASL_SSF_NONE = 0,
@ -87,6 +78,16 @@ enum qemud_sock_type {
QEMUD_SOCK_TYPE_TLS = 2,
};
struct qemud_client_message {
char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN];
unsigned int bufferLength;
unsigned int bufferOffset;
int async : 1;
struct qemud_client_message *next;
};
/* Stores the per-client connection state */
struct qemud_client {
virMutex lock;
@ -97,7 +98,6 @@ struct qemud_client {
int watch;
int readonly:1;
int closing:1;
enum qemud_mode mode;
struct sockaddr_storage addr;
socklen_t addrlen;
@ -105,6 +105,7 @@ struct qemud_client {
int type; /* qemud_sock_type */
gnutls_session_t tlssession;
int auth;
int handshake : 1; /* If we're in progress for TLS handshake */
#if HAVE_SASL
sasl_conn_t *saslconn;
int saslSSF;
@ -117,12 +118,20 @@ struct qemud_client {
char *saslUsername;
#endif
unsigned int incomingSerial;
unsigned int outgoingSerial;
char buffer [REMOTE_MESSAGE_MAX];
unsigned int bufferLength;
unsigned int bufferOffset;
/* Count of meages in 'dx' or 'tx' queue
* ie RPC calls in progress. Does not count
* async events which are not used for
* throttling calculations */
int nrequests;
/* Zero or one messages being received. Zero if
* nrequests >= max_clients and throttling */
struct qemud_client_message *rx;
/* Zero or many messages waiting for a worker
* to process them */
struct qemud_client_message *dx;
/* Zero or many messages waiting for transmit
* back to client, including async events */
struct qemud_client_message *tx;
/* This is only valid if a remote open call has been made on this
* connection, otherwise it will be NULL. Also if remote close is
@ -181,16 +190,20 @@ void qemudLog(int priority, const char *fmt, ...)
int qemudSetCloseExec(int fd);
int qemudSetNonBlock(int fd);
unsigned int
int
remoteDispatchClientRequest (struct qemud_server *server,
struct qemud_client *client);
struct qemud_client *client,
struct qemud_client_message *req);
void qemudDispatchClientWrite(struct qemud_server *server,
struct qemud_client *client);
int qemudRegisterClientEvent(struct qemud_server *server,
struct qemud_client *client,
int update);
#if HAVE_POLKIT
int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
#endif
void qemudDispatchClientFailure(struct qemud_client *client);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
struct qemud_client_message *msg);
int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
virDomainPtr dom,
@ -198,4 +211,9 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
int detail,
void *opaque);
#if HAVE_POLKIT
int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
#endif
#endif

View File

@ -111,6 +111,7 @@ static const dispatch_data const dispatch_table[] = {
/* Prototypes */
static void
remoteDispatchDomainEventSend (struct qemud_client *client,
struct qemud_client_message *msg,
virDomainPtr dom,
int event,
int detail);
@ -219,9 +220,10 @@ remoteDispatchConnError (remote_error *rerr,
* Server object is unlocked
* Client object is locked
*/
unsigned int
int
remoteDispatchClientRequest (struct qemud_server *server,
struct qemud_client *client)
struct qemud_client *client,
struct qemud_client_message *msg)
{
XDR xdr;
remote_message_header req, rep;
@ -229,7 +231,8 @@ remoteDispatchClientRequest (struct qemud_server *server,
dispatch_args args;
dispatch_ret ret;
const dispatch_data *data = NULL;
int rv = -1, len;
int rv = -1;
unsigned int len;
virConnectPtr conn = NULL;
memset(&args, 0, sizeof args);
@ -237,7 +240,10 @@ remoteDispatchClientRequest (struct qemud_server *server,
memset(&rerr, 0, sizeof rerr);
/* Parse the header. */
xdrmem_create (&xdr, client->buffer, client->bufferLength, XDR_DECODE);
xdrmem_create (&xdr,
msg->buffer + REMOTE_MESSAGE_HEADER_XDR_LEN,
msg->bufferLength - REMOTE_MESSAGE_HEADER_XDR_LEN,
XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &req))
goto fatal_error;
@ -333,10 +339,10 @@ rpc_error:
rep.status = rv < 0 ? REMOTE_ERROR : REMOTE_OK;
/* Serialise the return header. */
xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
len = 0; /* We'll come back and write this later. */
if (!xdr_int (&xdr, &len)) {
if (!xdr_u_int (&xdr, &len)) {
if (rv == 0) xdr_free (data->ret_filter, (char*)&ret);
goto fatal_error;
}
@ -364,17 +370,21 @@ rpc_error:
if (xdr_setpos (&xdr, 0) == 0)
goto fatal_error;
if (!xdr_int (&xdr, &len))
if (!xdr_u_int (&xdr, &len))
goto fatal_error;
xdr_destroy (&xdr);
return len;
msg->bufferLength = len;
msg->bufferOffset = 0;
return 0;
fatal_error:
/* Seriously bad stuff happened, so we'll kill off this client
and not send back any RPC error */
xdr_destroy (&xdr);
return 0;
return -1;
}
int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
@ -386,9 +396,20 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
struct qemud_client *client = opaque;
REMOTE_DEBUG("Relaying domain event %d %d", event, detail);
if(client) {
remoteDispatchDomainEventSend (client, dom, event, detail);
qemudDispatchClientWrite(client->server,client);
if (client) {
struct qemud_client_message *ev;
if (VIR_ALLOC(ev) < 0)
return -1;
virMutexLock(&client->lock);
remoteDispatchDomainEventSend (client, ev, dom, event, detail);
if (qemudRegisterClientEvent(client->server, client, 1) < 0)
qemudDispatchClientFailure(client);
virMutexUnlock(&client->lock);
}
return 0;
}
@ -4202,13 +4223,14 @@ remoteDispatchDomainEventsDeregister (struct qemud_server *server ATTRIBUTE_UNUS
static void
remoteDispatchDomainEventSend (struct qemud_client *client,
struct qemud_client_message *msg,
virDomainPtr dom,
int event,
int detail)
{
remote_message_header rep;
XDR xdr;
int len;
unsigned int len;
remote_domain_event_ret data;
if (!client)
@ -4222,11 +4244,11 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
rep.status = REMOTE_OK;
/* Serialise the return header and event. */
xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
len = 0; /* We'll come back and write this later. */
if (!xdr_int (&xdr, &len)) {
/*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (1)"));*/
if (!xdr_u_int (&xdr, &len)) {
/*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (1)"));*/
xdr_destroy (&xdr);
return;
}
@ -4254,8 +4276,8 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
return;
}
if (!xdr_int (&xdr, &len)) {
/*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (2)"));*/
if (!xdr_u_int (&xdr, &len)) {
/*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (2)"));*/
xdr_destroy (&xdr);
return;
}
@ -4263,9 +4285,10 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
xdr_destroy (&xdr);
/* Send it. */
client->mode = QEMUD_MODE_TX_PACKET;
client->bufferLength = len;
client->bufferOffset = 0;
msg->async = 1;
msg->bufferLength = len;
msg->bufferOffset = 0;
qemudClientMessageQueuePush(&client->tx, msg);
}
/*----- Helpers. -----*/

View File

@ -246,6 +246,19 @@ max_clients = 20
# of clients allowed
min_workers = 5
max_workers = 20
# Total global limit on concurrent RPC calls. Should be
# at least as large as max_workers. Beyond this, RPC requests
# will be read into memory and queued. This directly impact
# memory usage, currently each request requires 256 KB of
# memory. So by default upto 5 MB of memory is used
max_requests = 20
# Limit on concurrent requests from a single client
# connection. To avoid one client monopolizing the server
# this should be a small fraction of the global max_requests
# and max_workers parameter
max_client_requests = 5
"
test Libvirtd.lns get conf =
@ -499,3 +512,16 @@ max_workers = 20
{ "#comment" = "of clients allowed"}
{ "min_workers" = "5" }
{ "max_workers" = "20" }
{ "#empty" }
{ "#comment" = "Total global limit on concurrent RPC calls. Should be" }
{ "#comment" = "at least as large as max_workers. Beyond this, RPC requests" }
{ "#comment" = "will be read into memory and queued. This directly impact" }
{ "#comment" = "memory usage, currently each request requires 256 KB of" }
{ "#comment" = "memory. So by default upto 5 MB of memory is used" }
{ "max_requests" = "20" }
{ "#empty" }
{ "#comment" = "Limit on concurrent requests from a single client" }
{ "#comment" = "connection. To avoid one client monopolizing the server" }
{ "#comment" = "this should be a small fraction of the global max_requests" }
{ "#comment" = "and max_workers parameter" }
{ "max_client_requests" = "5" }

View File

@ -5663,13 +5663,13 @@ prepareCall(virConnectPtr conn,
/* Length must include the length word itself (always encoded in
* 4 bytes as per RFC 4506).
*/
rv->bufferLength += 4;
rv->bufferLength += REMOTE_MESSAGE_HEADER_XDR_LEN;
/* Encode the length word. */
xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
xdrmem_create (&xdr, rv->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE);
if (!xdr_u_int (&xdr, &rv->bufferLength)) {
error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
_("xdr_int (length word)"));
_("xdr_u_int (length word)"));
goto error;
}
xdr_destroy (&xdr);
@ -5965,20 +5965,26 @@ static int
processCallRecvLen(virConnectPtr conn, struct private_data *priv,
int in_open) {
XDR xdr;
int len;
unsigned int len;
xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
if (!xdr_int (&xdr, &len)) {
if (!xdr_u_int (&xdr, &len)) {
error (in_open ? NULL : conn,
VIR_ERR_RPC, _("xdr_int (length word, reply)"));
VIR_ERR_RPC, _("xdr_u_int (length word, reply)"));
return -1;
}
xdr_destroy (&xdr);
/* Length includes length word - adjust to real length to read. */
len -= 4;
if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) {
error (in_open ? NULL : conn,
VIR_ERR_RPC, _("packet received from server too small"));
return -1;
}
if (len < 0 || len > REMOTE_MESSAGE_MAX) {
/* Length includes length word - adjust to real length to read. */
len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
if (len > REMOTE_MESSAGE_MAX) {
error (in_open ? NULL : conn,
VIR_ERR_RPC, _("packet received from server too large"));
return -1;