diff --git a/ChangeLog b/ChangeLog index d12c1b3600..0855957e6b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +Tue Jan 20 19:24:53 GMT 2009 Daniel P. Berrange + + * qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Allow the + processing of multiple concurrent RPC calls per client + connection. + * qemud/libvirtd.conf, qemud/libvirtd.aug, + qemud/test_libvirtd.aug: Add config param for controlling + number of requests per client. + Tue Jan 20 18:16:53 GMT 2009 Daniel P. Berrange * src/xm_internal.c: Fix 2 misleading comments & potential diff --git a/qemud/libvirtd.aug b/qemud/libvirtd.aug index 7cfd458efb..40acd9363a 100644 --- a/qemud/libvirtd.aug +++ b/qemud/libvirtd.aug @@ -53,6 +53,8 @@ module Libvirtd = let processing_entry = int_entry "min_workers" | int_entry "max_workers" | int_entry "max_clients" + | int_entry "max_requests" + | int_entry "max_client_requests" let logging_entry = int_entry "log_level" | str_entry "log_filters" diff --git a/qemud/libvirtd.conf b/qemud/libvirtd.conf index ecb28dcdf3..49320843ba 100644 --- a/qemud/libvirtd.conf +++ b/qemud/libvirtd.conf @@ -247,6 +247,22 @@ #min_workers = 5 #max_workers = 20 +# Total global limit on concurrent RPC calls. Should be +# at least as large as max_workers. Beyond this, RPC requests +# will be read into memory and queued. This directly impact +# memory usage, currently each request requires 256 KB of +# memory. So by default upto 5 MB of memory is used +# +# XXX this isn't actually enforced yet, only the per-client +# limit is used so far +#max_requests = 20 + +# Limit on concurrent requests from a single client +# connection. To avoid one client monopolizing the server +# this should be a small fraction of the global max_requests +# and max_workers parameter +#max_client_requests = 5 + ################################################################# # # Logging controls diff --git a/qemud/qemud.c b/qemud/qemud.c index 5eec6c0590..21cecf2397 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -138,6 +138,11 @@ static int min_workers = 5; static int max_workers = 20; static int max_clients = 20; +/* Total number of 'in-process' RPC calls allowed across all clients */ +static int max_requests = 20; +/* Total number of 'in-process' RPC calls allowed by a single client*/ +static int max_client_requests = 5; + #define DH_BITS 1024 static sig_atomic_t sig_errors = 0; @@ -162,9 +167,35 @@ static void sig_handler(int sig, siginfo_t * siginfo, static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); -static int qemudRegisterClientEvent(struct qemud_server *server, - struct qemud_client *client, - int removeFirst); + + +void +qemudClientMessageQueuePush(struct qemud_client_message **queue, + struct qemud_client_message *msg) +{ + struct qemud_client_message *tmp = *queue; + + if (tmp) { + while (tmp->next) + tmp = tmp->next; + tmp->next = msg; + } else { + *queue = msg; + } +} + +static struct qemud_client_message * +qemudClientMessageQueueServe(struct qemud_client_message **queue) +{ + struct qemud_client_message *tmp = *queue; + + if (tmp) { + *queue = tmp->next; + tmp->next = NULL; + } + + return tmp; +} static int remoteCheckCertFile(const char *type, const char *file) @@ -1042,6 +1073,8 @@ remoteCheckCertificate (gnutls_session_t session) static int remoteCheckAccess (struct qemud_client *client) { + struct qemud_client_message *confirm; + /* Verify client certificate. */ if (remoteCheckCertificate (client->tlssession) == -1) { VIR_ERROR0(_("remoteCheckCertificate: " @@ -1051,14 +1084,25 @@ remoteCheckAccess (struct qemud_client *client) "is set so the bad certificate is ignored")); } + if (client->tx) { + VIR_INFO("%s", + _("client had unexpected data pending tx after access check")); + return -1; + } + + if (VIR_ALLOC(confirm) < 0) + return -1; + /* Checks have succeeded. Write a '\1' byte back to the client to * indicate this (otherwise the socket is abruptly closed). * (NB. The '\1' byte is sent in an encrypted record). */ - client->bufferLength = 1; - client->bufferOffset = 0; - client->buffer[0] = '\1'; - client->mode = QEMUD_MODE_TX_PACKET; + confirm->async = 1; + confirm->bufferLength = 1; + confirm->bufferOffset = 0; + confirm->buffer[0] = '\1'; + + client->tx = confirm; return 0; } @@ -1084,6 +1128,7 @@ int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid) { } #endif + static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket *sock) { int fd; struct sockaddr_storage addr; @@ -1099,7 +1144,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket } if (server->nclients >= max_clients) { - VIR_ERROR0(_("Too many active clients, dropping connection")); + VIR_ERROR(_("Too many active clients (%d), dropping connection"), max_clients); close(fd); return -1; } @@ -1137,6 +1182,12 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket client->addrlen = addrlen; client->server = server; + /* Prepare one for packet receive */ + if (VIR_ALLOC(client->rx) < 0) + goto cleanup; + client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; + + #if HAVE_POLKIT /* Only do policy checks for non-root - allow root user through with no checks, as a fail-safe - root can easily @@ -1158,9 +1209,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket #endif if (client->type != QEMUD_SOCK_TYPE_TLS) { - client->mode = QEMUD_MODE_RX_HEADER; - client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; - + /* Plain socket, so prepare to read first message */ if (qemudRegisterClientEvent (server, client, 0) < 0) goto cleanup; } else { @@ -1180,12 +1229,12 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket if (remoteCheckAccess (client) == -1) goto cleanup; + /* Handshake & cert check OK, so prepare to read first message */ if (qemudRegisterClientEvent(server, client, 0) < 0) goto cleanup; } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) { - /* Most likely. */ - client->mode = QEMUD_MODE_TLS_HANDSHAKE; - client->bufferLength = -1; + /* Most likely, need to do more handshake data */ + client->handshake = 1; if (qemudRegisterClientEvent (server, client, 0) < 0) goto cleanup; @@ -1204,7 +1253,8 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket if (client && client->tlssession) gnutls_deinit (client->tlssession); close (fd); - free (client); + VIR_FREE(client->rx); + VIR_FREE(client); return -1; } @@ -1216,8 +1266,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket * We keep the libvirt connection open until any async * jobs have finished, then clean it up elsehwere */ -static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED, - struct qemud_client *client) { +void qemudDispatchClientFailure(struct qemud_client *client) { virEventRemoveHandleImpl(client->watch); /* Deregister event delivery callback */ @@ -1242,7 +1291,7 @@ static struct qemud_client *qemudPendingJob(struct qemud_server *server) int i; for (i = 0 ; i < server->nclients ; i++) { virMutexLock(&server->clients[i]->lock); - if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) { + if (server->clients[i]->dx) { /* Delibrately don't unlock client - caller wants the lock */ return server->clients[i]; } @@ -1256,8 +1305,9 @@ static void *qemudWorker(void *data) struct qemud_server *server = data; while (1) { - struct qemud_client *client; - int len; + struct qemud_client *client = NULL; + struct qemud_client_message *reply; + virMutexLock(&server->lock); while ((client = qemudPendingJob(server)) == NULL) { if (virCondWait(&server->job, &server->lock) < 0) { @@ -1268,55 +1318,75 @@ static void *qemudWorker(void *data) virMutexUnlock(&server->lock); /* We own a locked client now... */ - client->mode = QEMUD_MODE_IN_DISPATCH; client->refs++; - if ((len = remoteDispatchClientRequest (server, client)) == 0) - qemudDispatchClientFailure(server, client); + /* Remove our message from dispatch queue while we use it */ + reply = qemudClientMessageQueueServe(&client->dx); - /* Set up the output buffer. */ - client->mode = QEMUD_MODE_TX_PACKET; - client->bufferLength = len; - client->bufferOffset = 0; + /* This function drops the lock during dispatch, + * and re-acquires it before returning */ + if (remoteDispatchClientRequest (server, client, reply) < 0) { + VIR_FREE(reply); + qemudDispatchClientFailure(client); + client->refs--; + virMutexUnlock(&client->lock); + continue; + } + + /* Put reply on end of tx queue to send out */ + qemudClientMessageQueuePush(&client->tx, reply); if (qemudRegisterClientEvent(server, client, 1) < 0) - qemudDispatchClientFailure(server, client); + qemudDispatchClientFailure(client); client->refs--; virMutexUnlock(&client->lock); - virMutexUnlock(&server->lock); } } -static int qemudClientReadBuf(struct qemud_server *server, - struct qemud_client *client, - char *data, unsigned len) { - int ret; +/* + * Read data into buffer using wire decoding (plain or TLS) + * + * Returns: + * -1 on error or EOF + * 0 on EAGAIN + * n number of bytes + */ +static ssize_t qemudClientReadBuf(struct qemud_client *client, + char *data, ssize_t len) { + ssize_t ret; + + if (len < 0) { + VIR_ERROR(_("unexpected negative length request %d"), len); + qemudDispatchClientFailure(client); + return -1; + } /*qemudDebug ("qemudClientRead: len = %d", len);*/ if (!client->tlssession) { - if ((ret = read (client->fd, data, len)) <= 0) { - if (ret == 0 || errno != EAGAIN) { - if (ret != 0) - VIR_ERROR(_("read: %s"), strerror (errno)); - qemudDispatchClientFailure(server, client); - } + ret = read (client->fd, data, len); + if (ret == -1 && (errno == EAGAIN || + errno == EINTR)) + return 0; + if (ret <= 0) { + if (ret != 0) + VIR_ERROR(_("read: %s"), strerror (errno)); + qemudDispatchClientFailure(client); return -1; } } else { ret = gnutls_record_recv (client->tlssession, data, len); - if (qemudRegisterClientEvent (server, client, 1) < 0) - qemudDispatchClientFailure (server, client); - else if (ret <= 0) { - if (ret == 0 || (ret != GNUTLS_E_AGAIN && - ret != GNUTLS_E_INTERRUPTED)) { - if (ret != 0) - VIR_ERROR(_("gnutls_record_recv: %s"), - gnutls_strerror (ret)); - qemudDispatchClientFailure (server, client); - } + + if (ret < 0 && (ret == GNUTLS_E_AGAIN || + ret == GNUTLS_E_INTERRUPTED)) + return 0; + if (ret <= 0) { + if (ret != 0) + VIR_ERROR(_("gnutls_record_recv: %s"), + gnutls_strerror (ret)); + qemudDispatchClientFailure(client); return -1; } } @@ -1324,22 +1394,37 @@ static int qemudClientReadBuf(struct qemud_server *server, return ret; } -static int qemudClientReadPlain(struct qemud_server *server, - struct qemud_client *client) { - int ret; - ret = qemudClientReadBuf(server, client, - client->buffer + client->bufferOffset, - client->bufferLength - client->bufferOffset); - if (ret < 0) - return ret; - client->bufferOffset += ret; - return 0; +/* + * Read data into buffer without decoding + * + * Returns: + * -1 on error or EOF + * 0 on EAGAIN + * n number of bytes + */ +static ssize_t qemudClientReadPlain(struct qemud_client *client) { + ssize_t ret; + ret = qemudClientReadBuf(client, + client->rx->buffer + client->rx->bufferOffset, + client->rx->bufferLength - client->rx->bufferOffset); + if (ret <= 0) + return ret; /* -1 error, 0 eagain */ + + client->rx->bufferOffset += ret; + return ret; } #if HAVE_SASL -static int qemudClientReadSASL(struct qemud_server *server, - struct qemud_client *client) { - int got, want; +/* + * Read data into buffer decoding with SASL + * + * Returns: + * -1 on error or EOF + * 0 on EAGAIN + * n number of bytes + */ +static ssize_t qemudClientReadSASL(struct qemud_client *client) { + ssize_t got, want; /* We're doing a SSF data read, so now its times to ensure * future writes are under SSF too. @@ -1350,166 +1435,176 @@ static int qemudClientReadSASL(struct qemud_server *server, /* Need to read some more data off the wire */ if (client->saslDecoded == NULL) { + int ret; char encoded[8192]; - int encodedLen = sizeof(encoded); - encodedLen = qemudClientReadBuf(server, client, encoded, encodedLen); + ssize_t encodedLen = sizeof(encoded); + encodedLen = qemudClientReadBuf(client, encoded, encodedLen); - if (encodedLen < 0) + if (encodedLen <= 0) + return encodedLen; + + ret = sasl_decode(client->saslconn, encoded, encodedLen, + &client->saslDecoded, &client->saslDecodedLength); + if (ret != SASL_OK) { + VIR_ERROR(_("failed to decode SASL data %s"), + sasl_errstring(ret, NULL, NULL)); + qemudDispatchClientFailure(client); return -1; - - sasl_decode(client->saslconn, encoded, encodedLen, - &client->saslDecoded, &client->saslDecodedLength); + } client->saslDecodedOffset = 0; } /* Some buffered decoded data to return now */ got = client->saslDecodedLength - client->saslDecodedOffset; - want = client->bufferLength - client->bufferOffset; + want = client->rx->bufferLength - client->rx->bufferOffset; if (want > got) want = got; - memcpy(client->buffer + client->bufferOffset, + memcpy(client->rx->buffer + client->rx->bufferOffset, client->saslDecoded + client->saslDecodedOffset, want); client->saslDecodedOffset += want; - client->bufferOffset += want; + client->rx->bufferOffset += want; if (client->saslDecodedOffset == client->saslDecodedLength) { client->saslDecoded = NULL; client->saslDecodedOffset = client->saslDecodedLength = 0; } - return 0; + return want; } #endif -static int qemudClientRead(struct qemud_server *server, - struct qemud_client *client) { +/* + * Read as much data off wire as possible till we fill our + * buffer, or would block on I/O + */ +static ssize_t qemudClientRead(struct qemud_client *client) { #if HAVE_SASL if (client->saslSSF & QEMUD_SASL_SSF_READ) - return qemudClientReadSASL(server, client); + return qemudClientReadSASL(client); else #endif - return qemudClientReadPlain(server, client); + return qemudClientReadPlain(client); } -static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_client *client) { - unsigned int len; +/* + * Read data until we get a complete message to process + */ +static void qemudDispatchClientRead(struct qemud_server *server, + struct qemud_client *client) { /*qemudDebug ("qemudDispatchClientRead: mode = %d", client->mode);*/ - switch (client->mode) { - case QEMUD_MODE_RX_HEADER: { +readmore: + if (qemudClientRead(client) < 0) + return; /* Error */ + + if (client->rx->bufferOffset < client->rx->bufferLength) + return; /* Still not read enough */ + + /* Either done with length word header */ + if (client->rx->bufferLength == REMOTE_MESSAGE_HEADER_XDR_LEN) { + unsigned int len; XDR x; - if (qemudClientRead(server, client) < 0) - return; /* Error, or blocking */ - - if (client->bufferOffset < client->bufferLength) - return; /* Not read enough */ - - xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE); + xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE); if (!xdr_u_int(&x, &len)) { xdr_destroy (&x); DEBUG0("Failed to decode packet length"); - qemudDispatchClientFailure(server, client); + qemudDispatchClientFailure(client); return; } xdr_destroy (&x); + if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) { + DEBUG("Packet length %u too small", len); + qemudDispatchClientFailure(client); + return; + } + + /* Length includes the size of the length word itself */ + len -= REMOTE_MESSAGE_HEADER_XDR_LEN; + if (len > REMOTE_MESSAGE_MAX) { DEBUG("Packet length %u too large", len); - qemudDispatchClientFailure(server, client); + qemudDispatchClientFailure(client); return; } - /* Length include length of the length field itself, so - * check minimum size requirements */ - if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) { - DEBUG("Packet length %u too small", len); - qemudDispatchClientFailure(server, client); - return; - } - - client->mode = QEMUD_MODE_RX_PAYLOAD; - client->bufferLength = len - REMOTE_MESSAGE_HEADER_XDR_LEN; - client->bufferOffset = 0; + /* Prepare to read rest of message */ + client->rx->bufferLength += len; if (qemudRegisterClientEvent(server, client, 1) < 0) { - qemudDispatchClientFailure(server, client); + qemudDispatchClientFailure(client); return; } - /* Fall through */ - } + /* Try and read payload immediately instead of going back + into poll() because chances are the data is already + waiting for us */ + goto readmore; + } else { + /* Move completed message to the end of the dispatch queue */ + qemudClientMessageQueuePush(&client->dx, client->rx); + client->rx = NULL; + client->nrequests++; - case QEMUD_MODE_RX_PAYLOAD: { - if (qemudClientRead(server, client) < 0) - return; /* Error, or blocking */ - - if (client->bufferOffset < client->bufferLength) - return; /* Not read enough */ - - client->mode = QEMUD_MODE_WAIT_DISPATCH; - if (qemudRegisterClientEvent(server, client, 1) < 0) - qemudDispatchClientFailure(server, client); - - virCondSignal(&server->job); - - break; - } - - case QEMUD_MODE_TLS_HANDSHAKE: { - int ret; - - /* Continue the handshake. */ - ret = gnutls_handshake (client->tlssession); - if (ret == 0) { - /* Finished. Next step is to check the certificate. */ - if (remoteCheckAccess (client) == -1) - qemudDispatchClientFailure (server, client); - else if (qemudRegisterClientEvent (server, client, 1) < 0) - qemudDispatchClientFailure (server, client); - } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) { - VIR_ERROR(_("TLS handshake failed: %s"), - gnutls_strerror (ret)); - qemudDispatchClientFailure (server, client); + /* Possibly need to create another receive buffer */ + if ((client->nrequests < max_client_requests && + VIR_ALLOC(client->rx) < 0)) { + qemudDispatchClientFailure(client); } else { - if (qemudRegisterClientEvent (server ,client, 1) < 0) - qemudDispatchClientFailure (server, client); + if (client->rx) + client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; + + if (qemudRegisterClientEvent(server, client, 1) < 0) + qemudDispatchClientFailure(client); + else + /* Tell one of the workers to get on with it... */ + virCondSignal(&server->job); } - - break; - } - - default: - DEBUG("Got unexpected data read while in %d mode", client->mode); - qemudDispatchClientFailure(server, client); } } -static int qemudClientWriteBuf(struct qemud_server *server, - struct qemud_client *client, - const char *data, int len) { - int ret; +/* + * Send a chunk of data using wire encoding (plain or TLS) + * + * Returns: + * -1 on error + * 0 on EAGAIN + * n number of bytes + */ +static ssize_t qemudClientWriteBuf(struct qemud_client *client, + const char *data, ssize_t len) { + ssize_t ret; + + if (len < 0) { + VIR_ERROR(_("unexpected negative length request %d"), len); + qemudDispatchClientFailure(client); + return -1; + } + if (!client->tlssession) { - if ((ret = safewrite(client->fd, data, len)) == -1) { + if ((ret = write(client->fd, data, len)) == -1) { + if (errno == EAGAIN || errno == EINTR) + return 0; VIR_ERROR(_("write: %s"), strerror (errno)); - qemudDispatchClientFailure(server, client); + qemudDispatchClientFailure(client); return -1; } } else { ret = gnutls_record_send (client->tlssession, data, len); - if (qemudRegisterClientEvent (server, client, 1) < 0) - qemudDispatchClientFailure (server, client); - else if (ret < 0) { - if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) { - VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret)); - qemudDispatchClientFailure (server, client); - } + if (ret < 0) { + if (ret == GNUTLS_E_INTERRUPTED || + ret == GNUTLS_E_AGAIN) + return 0; + + VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret)); + qemudDispatchClientFailure(client); return -1; } } @@ -1517,42 +1612,62 @@ static int qemudClientWriteBuf(struct qemud_server *server, } -static int qemudClientWritePlain(struct qemud_server *server, - struct qemud_client *client) { - int ret = qemudClientWriteBuf(server, client, - client->buffer + client->bufferOffset, - client->bufferLength - client->bufferOffset); - if (ret < 0) - return -1; - client->bufferOffset += ret; - return 0; +/* + * Send client->tx using no encoding + * + * Returns: + * -1 on error or EOF + * 0 on EAGAIN + * n number of bytes + */ +static int qemudClientWritePlain(struct qemud_client *client) { + int ret = qemudClientWriteBuf(client, + client->tx->buffer + client->tx->bufferOffset, + client->tx->bufferLength - client->tx->bufferOffset); + if (ret <= 0) + return ret; /* -1 error, 0 = egain */ + client->tx->bufferOffset += ret; + return ret; } #if HAVE_SASL -static int qemudClientWriteSASL(struct qemud_server *server, - struct qemud_client *client) { +/* + * Send client->tx using SASL encoding + * + * Returns: + * -1 on error + * 0 on EAGAIN + * n number of bytes + */ +static int qemudClientWriteSASL(struct qemud_client *client) { int ret; /* Not got any pending encoded data, so we need to encode raw stuff */ if (client->saslEncoded == NULL) { - int err; - err = sasl_encode(client->saslconn, - client->buffer + client->bufferOffset, - client->bufferLength - client->bufferOffset, + ret = sasl_encode(client->saslconn, + client->tx->buffer + client->tx->bufferOffset, + client->tx->bufferLength - client->tx->bufferOffset, &client->saslEncoded, &client->saslEncodedLength); + if (ret != SASL_OK) { + VIR_ERROR(_("failed to encode SASL data %s"), + sasl_errstring(ret, NULL, NULL)); + qemudDispatchClientFailure(client); + return -1; + } + client->saslEncodedOffset = 0; } /* Send some of the encoded stuff out on the wire */ - ret = qemudClientWriteBuf(server, client, + ret = qemudClientWriteBuf(client, client->saslEncoded + client->saslEncodedOffset, client->saslEncodedLength - client->saslEncodedOffset); - if (ret < 0) - return -1; + if (ret <= 0) + return ret; /* -1 error, 0 == egain */ /* Note how much we sent */ client->saslEncodedOffset += ret; @@ -1561,77 +1676,106 @@ static int qemudClientWriteSASL(struct qemud_server *server, if (client->saslEncodedOffset == client->saslEncodedLength) { client->saslEncoded = NULL; client->saslEncodedOffset = client->saslEncodedLength = 0; - client->bufferOffset = client->bufferLength; + + /* Mark as complete, so caller detects completion */ + client->tx->bufferOffset = client->tx->bufferLength; } - return 0; + return ret; } #endif -static int qemudClientWrite(struct qemud_server *server, - struct qemud_client *client) { +/* + * Send as much data in the client->tx as possible + * + * Returns: + * -1 on error or EOF + * 0 on EAGAIN + * n number of bytes + */ +static ssize_t qemudClientWrite(struct qemud_client *client) { #if HAVE_SASL if (client->saslSSF & QEMUD_SASL_SSF_WRITE) - return qemudClientWriteSASL(server, client); + return qemudClientWriteSASL(client); else #endif - return qemudClientWritePlain(server, client); + return qemudClientWritePlain(client); } -void +/* + * Process all queued client->tx messages until + * we would block on I/O + */ +static void qemudDispatchClientWrite(struct qemud_server *server, struct qemud_client *client) { - switch (client->mode) { - case QEMUD_MODE_TX_PACKET: { - if (qemudClientWrite(server, client) < 0) + while (client->tx) { + ssize_t ret; + + ret = qemudClientWrite(client); + if (ret < 0) { + qemudDispatchClientFailure(client); return; + } + if (ret == 0) + return; /* Would block on write EAGAIN */ - if (client->bufferOffset == client->bufferLength) { - if (client->closing) { - qemudDispatchClientFailure (server, client); + if (client->tx->bufferOffset == client->tx->bufferLength) { + struct qemud_client_message *reply; + + /* Get finished reply from head of tx queue */ + reply = qemudClientMessageQueueServe(&client->tx); + + /* If its not an async message, then we have + * just completed an RPC request */ + if (!reply->async) + client->nrequests--; + + /* Move record to end of 'rx' ist */ + if (!client->rx && + client->nrequests < max_client_requests) { + /* Reset message record for next RX attempt */ + client->rx = reply; + client->rx->bufferOffset = 0; + client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; } else { - /* Done writing, switch back to receive */ - client->mode = QEMUD_MODE_RX_HEADER; - client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; - client->bufferOffset = 0; - - if (qemudRegisterClientEvent (server, client, 1) < 0) - qemudDispatchClientFailure (server, client); + VIR_FREE(reply); } - } - /* Still writing */ - break; - } - case QEMUD_MODE_TLS_HANDSHAKE: { - int ret; - - /* Continue the handshake. */ - ret = gnutls_handshake (client->tlssession); - if (ret == 0) { - /* Finished. Next step is to check the certificate. */ - if (remoteCheckAccess (client) == -1) - qemudDispatchClientFailure (server, client); - else if (qemudRegisterClientEvent (server, client, 1)) - qemudDispatchClientFailure (server, client); - } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) { - VIR_ERROR(_("TLS handshake failed: %s"), gnutls_strerror (ret)); - qemudDispatchClientFailure (server, client); - } else { - if (qemudRegisterClientEvent (server, client, 1)) - qemudDispatchClientFailure (server, client); - } - - break; - } - - default: - DEBUG("Got unexpected data write while in %d mode", client->mode); - qemudDispatchClientFailure(server, client); + if (client->closing || + qemudRegisterClientEvent (server, client, 1) < 0) + qemudDispatchClientFailure(client); + } } } +static void +qemudDispatchClientHandshake(struct qemud_server *server, + struct qemud_client *client) { + int ret; + /* Continue the handshake. */ + ret = gnutls_handshake (client->tlssession); + if (ret == 0) { + /* Finished. Next step is to check the certificate. */ + if (remoteCheckAccess (client) == -1) + qemudDispatchClientFailure(client); + else if (qemudRegisterClientEvent (server, client, 1)) + qemudDispatchClientFailure(client); + } else if (ret == GNUTLS_E_AGAIN || + ret == GNUTLS_E_INTERRUPTED) { + /* Carry on waiting for more handshake. Update + the events just in case handshake data flow + direction has changed */ + if (qemudRegisterClientEvent (server, client, 1)) + qemudDispatchClientFailure(client); + } else { + /* Fatal error in handshake */ + VIR_ERROR(_("TLS handshake failed: %s"), + gnutls_strerror (ret)); + qemudDispatchClientFailure(client); + } +} static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) { @@ -1642,59 +1786,66 @@ qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) { virMutexLock(&server->lock); for (i = 0 ; i < server->nclients ; i++) { + virMutexLock(&server->clients[i]->lock); if (server->clients[i]->watch == watch) { client = server->clients[i]; break; } + virMutexUnlock(&server->clients[i]->lock); } - if (!client) { - virMutexUnlock(&server->lock); - return; - } - - virMutexLock(&client->lock); virMutexUnlock(&server->lock); - if (client->fd != fd) + if (!client) { return; + } + + if (client->fd != fd) { + virMutexUnlock(&client->lock); + return; + } + + if (events & (VIR_EVENT_HANDLE_WRITABLE | + VIR_EVENT_HANDLE_READABLE)) { + if (client->handshake) { + qemudDispatchClientHandshake(server, client); + } else { + if (events & VIR_EVENT_HANDLE_WRITABLE) + qemudDispatchClientWrite(server, client); + if (events & VIR_EVENT_HANDLE_READABLE) + qemudDispatchClientRead(server, client); + } + } + + /* NB, will get HANGUP + READABLE at same time upon + * disconnect */ + if (events & (VIR_EVENT_HANDLE_ERROR | + VIR_EVENT_HANDLE_HANGUP)) + qemudDispatchClientFailure(client); - if (events == VIR_EVENT_HANDLE_WRITABLE) - qemudDispatchClientWrite(server, client); - else if (events == VIR_EVENT_HANDLE_READABLE) - qemudDispatchClientRead(server, client); - else - qemudDispatchClientFailure(server, client); virMutexUnlock(&client->lock); } -static int qemudRegisterClientEvent(struct qemud_server *server, - struct qemud_client *client, - int update) { - int mode; - switch (client->mode) { - case QEMUD_MODE_TLS_HANDSHAKE: +int qemudRegisterClientEvent(struct qemud_server *server, + struct qemud_client *client, + int update) { + int mode = 0; + + if (client->handshake) { if (gnutls_record_get_direction (client->tlssession) == 0) - mode = VIR_EVENT_HANDLE_READABLE; + mode |= VIR_EVENT_HANDLE_READABLE; else - mode = VIR_EVENT_HANDLE_WRITABLE; - break; + mode |= VIR_EVENT_HANDLE_WRITABLE; + } else { + /* If there is a message on the rx queue then + * we're wanting more input */ + if (client->rx) + mode |= VIR_EVENT_HANDLE_READABLE; - case QEMUD_MODE_RX_HEADER: - case QEMUD_MODE_RX_PAYLOAD: - mode = VIR_EVENT_HANDLE_READABLE; - break; - - case QEMUD_MODE_TX_PACKET: - mode = VIR_EVENT_HANDLE_WRITABLE; - break; - - case QEMUD_MODE_WAIT_DISPATCH: - mode = 0; - break; - - default: - return -1; + /* If there are one or more messages to send back to client, + then monitor for writability on socket */ + if (client->tx) + mode |= VIR_EVENT_HANDLE_WRITABLE; } if (update) { @@ -1760,6 +1911,29 @@ static void qemudInactiveTimer(int timer ATTRIBUTE_UNUSED, void *data) { } } +static void qemudFreeClient(struct qemud_client *client) { + while (client->rx) { + struct qemud_client_message *msg + = qemudClientMessageQueueServe(&client->rx); + VIR_FREE(msg); + } + while (client->dx) { + struct qemud_client_message *msg + = qemudClientMessageQueueServe(&client->dx); + VIR_FREE(msg); + } + while (client->tx) { + struct qemud_client_message *msg + = qemudClientMessageQueueServe(&client->tx); + VIR_FREE(msg); + } + + if (client->conn) + virConnectClose(client->conn); + virMutexDestroy(&client->lock); + VIR_FREE(client); +} + static int qemudRunLoop(struct qemud_server *server) { int timerid = -1; int ret = -1, i; @@ -1796,8 +1970,11 @@ static int qemudRunLoop(struct qemud_server *server) { } virMutexUnlock(&server->lock); - if (qemudOneLoop() < 0) + if (qemudOneLoop() < 0) { + virMutexLock(&server->lock); + DEBUG0("Loop iteration error, exiting\n"); break; + } virMutexLock(&server->lock); reprocess: @@ -1808,17 +1985,18 @@ static int qemudRunLoop(struct qemud_server *server) { && server->clients[i]->refs == 0; virMutexUnlock(&server->clients[i]->lock); if (inactive) { - if (server->clients[i]->conn) - virConnectClose(server->clients[i]->conn); - virMutexDestroy(&server->clients[i]->lock); - VIR_FREE(server->clients[i]); + qemudFreeClient(server->clients[i]); server->nclients--; - if (i < server->nclients) { + if (i < server->nclients) memmove(server->clients + i, server->clients + i + 1, - server->nclients - i); - goto reprocess; + sizeof (*server->clients) * (server->nclients - i)); + + if (VIR_REALLOC_N(server->clients, + server->nclients) < 0) { + ; /* ignore */ } + goto reprocess; } } @@ -1843,6 +2021,7 @@ static int qemudRunLoop(struct qemud_server *server) { pthread_join(thread, NULL); virMutexLock(&server->lock); } + VIR_FREE(server->workers); free(server->workers); virMutexUnlock(&server->lock); @@ -2223,6 +2402,9 @@ remoteReadConfigFile (struct qemud_server *server, const char *filename) GET_CONF_INT (conf, filename, max_workers); GET_CONF_INT (conf, filename, max_clients); + GET_CONF_INT (conf, filename, max_requests); + GET_CONF_INT (conf, filename, max_client_requests); + virConfFree (conf); return 0; diff --git a/qemud/qemud.h b/qemud/qemud.h index 12897a12bf..9a2ff80f23 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -65,15 +65,6 @@ #define qemudDebug DEBUG -enum qemud_mode { - QEMUD_MODE_RX_HEADER, /* Receiving the fixed length RPC header data */ - QEMUD_MODE_RX_PAYLOAD, /* Receiving the variable length RPC payload data */ - QEMUD_MODE_WAIT_DISPATCH, /* Message received, waiting for worker to process */ - QEMUD_MODE_IN_DISPATCH, /* RPC call being processed */ - QEMUD_MODE_TX_PACKET, /* Transmitting reply to RPC call */ - QEMUD_MODE_TLS_HANDSHAKE, /* Performing TLS handshake */ -}; - /* Whether we're passing reads & writes through a sasl SSF */ enum qemud_sasl_ssf { QEMUD_SASL_SSF_NONE = 0, @@ -87,6 +78,16 @@ enum qemud_sock_type { QEMUD_SOCK_TYPE_TLS = 2, }; +struct qemud_client_message { + char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN]; + unsigned int bufferLength; + unsigned int bufferOffset; + + int async : 1; + + struct qemud_client_message *next; +}; + /* Stores the per-client connection state */ struct qemud_client { virMutex lock; @@ -97,7 +98,6 @@ struct qemud_client { int watch; int readonly:1; int closing:1; - enum qemud_mode mode; struct sockaddr_storage addr; socklen_t addrlen; @@ -105,6 +105,7 @@ struct qemud_client { int type; /* qemud_sock_type */ gnutls_session_t tlssession; int auth; + int handshake : 1; /* If we're in progress for TLS handshake */ #if HAVE_SASL sasl_conn_t *saslconn; int saslSSF; @@ -117,12 +118,20 @@ struct qemud_client { char *saslUsername; #endif - unsigned int incomingSerial; - unsigned int outgoingSerial; - - char buffer [REMOTE_MESSAGE_MAX]; - unsigned int bufferLength; - unsigned int bufferOffset; + /* Count of meages in 'dx' or 'tx' queue + * ie RPC calls in progress. Does not count + * async events which are not used for + * throttling calculations */ + int nrequests; + /* Zero or one messages being received. Zero if + * nrequests >= max_clients and throttling */ + struct qemud_client_message *rx; + /* Zero or many messages waiting for a worker + * to process them */ + struct qemud_client_message *dx; + /* Zero or many messages waiting for transmit + * back to client, including async events */ + struct qemud_client_message *tx; /* This is only valid if a remote open call has been made on this * connection, otherwise it will be NULL. Also if remote close is @@ -181,16 +190,20 @@ void qemudLog(int priority, const char *fmt, ...) int qemudSetCloseExec(int fd); int qemudSetNonBlock(int fd); -unsigned int +int remoteDispatchClientRequest (struct qemud_server *server, - struct qemud_client *client); + struct qemud_client *client, + struct qemud_client_message *req); -void qemudDispatchClientWrite(struct qemud_server *server, - struct qemud_client *client); +int qemudRegisterClientEvent(struct qemud_server *server, + struct qemud_client *client, + int update); -#if HAVE_POLKIT -int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid); -#endif +void qemudDispatchClientFailure(struct qemud_client *client); + +void +qemudClientMessageQueuePush(struct qemud_client_message **queue, + struct qemud_client_message *msg); int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, virDomainPtr dom, @@ -198,4 +211,9 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, int detail, void *opaque); + +#if HAVE_POLKIT +int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid); +#endif + #endif diff --git a/qemud/remote.c b/qemud/remote.c index 25a6f4bd16..e41e2ee840 100644 --- a/qemud/remote.c +++ b/qemud/remote.c @@ -111,6 +111,7 @@ static const dispatch_data const dispatch_table[] = { /* Prototypes */ static void remoteDispatchDomainEventSend (struct qemud_client *client, + struct qemud_client_message *msg, virDomainPtr dom, int event, int detail); @@ -219,9 +220,10 @@ remoteDispatchConnError (remote_error *rerr, * Server object is unlocked * Client object is locked */ -unsigned int +int remoteDispatchClientRequest (struct qemud_server *server, - struct qemud_client *client) + struct qemud_client *client, + struct qemud_client_message *msg) { XDR xdr; remote_message_header req, rep; @@ -229,7 +231,8 @@ remoteDispatchClientRequest (struct qemud_server *server, dispatch_args args; dispatch_ret ret; const dispatch_data *data = NULL; - int rv = -1, len; + int rv = -1; + unsigned int len; virConnectPtr conn = NULL; memset(&args, 0, sizeof args); @@ -237,7 +240,10 @@ remoteDispatchClientRequest (struct qemud_server *server, memset(&rerr, 0, sizeof rerr); /* Parse the header. */ - xdrmem_create (&xdr, client->buffer, client->bufferLength, XDR_DECODE); + xdrmem_create (&xdr, + msg->buffer + REMOTE_MESSAGE_HEADER_XDR_LEN, + msg->bufferLength - REMOTE_MESSAGE_HEADER_XDR_LEN, + XDR_DECODE); if (!xdr_remote_message_header (&xdr, &req)) goto fatal_error; @@ -333,10 +339,10 @@ rpc_error: rep.status = rv < 0 ? REMOTE_ERROR : REMOTE_OK; /* Serialise the return header. */ - xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE); + xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE); len = 0; /* We'll come back and write this later. */ - if (!xdr_int (&xdr, &len)) { + if (!xdr_u_int (&xdr, &len)) { if (rv == 0) xdr_free (data->ret_filter, (char*)&ret); goto fatal_error; } @@ -364,17 +370,21 @@ rpc_error: if (xdr_setpos (&xdr, 0) == 0) goto fatal_error; - if (!xdr_int (&xdr, &len)) + if (!xdr_u_int (&xdr, &len)) goto fatal_error; xdr_destroy (&xdr); - return len; + + msg->bufferLength = len; + msg->bufferOffset = 0; + + return 0; fatal_error: /* Seriously bad stuff happened, so we'll kill off this client and not send back any RPC error */ xdr_destroy (&xdr); - return 0; + return -1; } int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, @@ -386,9 +396,20 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, struct qemud_client *client = opaque; REMOTE_DEBUG("Relaying domain event %d %d", event, detail); - if(client) { - remoteDispatchDomainEventSend (client, dom, event, detail); - qemudDispatchClientWrite(client->server,client); + if (client) { + struct qemud_client_message *ev; + + if (VIR_ALLOC(ev) < 0) + return -1; + + virMutexLock(&client->lock); + + remoteDispatchDomainEventSend (client, ev, dom, event, detail); + + if (qemudRegisterClientEvent(client->server, client, 1) < 0) + qemudDispatchClientFailure(client); + + virMutexUnlock(&client->lock); } return 0; } @@ -4202,13 +4223,14 @@ remoteDispatchDomainEventsDeregister (struct qemud_server *server ATTRIBUTE_UNUS static void remoteDispatchDomainEventSend (struct qemud_client *client, + struct qemud_client_message *msg, virDomainPtr dom, int event, int detail) { remote_message_header rep; XDR xdr; - int len; + unsigned int len; remote_domain_event_ret data; if (!client) @@ -4222,11 +4244,11 @@ remoteDispatchDomainEventSend (struct qemud_client *client, rep.status = REMOTE_OK; /* Serialise the return header and event. */ - xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE); + xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE); len = 0; /* We'll come back and write this later. */ - if (!xdr_int (&xdr, &len)) { - /*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (1)"));*/ + if (!xdr_u_int (&xdr, &len)) { + /*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (1)"));*/ xdr_destroy (&xdr); return; } @@ -4254,8 +4276,8 @@ remoteDispatchDomainEventSend (struct qemud_client *client, return; } - if (!xdr_int (&xdr, &len)) { - /*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (2)"));*/ + if (!xdr_u_int (&xdr, &len)) { + /*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (2)"));*/ xdr_destroy (&xdr); return; } @@ -4263,9 +4285,10 @@ remoteDispatchDomainEventSend (struct qemud_client *client, xdr_destroy (&xdr); /* Send it. */ - client->mode = QEMUD_MODE_TX_PACKET; - client->bufferLength = len; - client->bufferOffset = 0; + msg->async = 1; + msg->bufferLength = len; + msg->bufferOffset = 0; + qemudClientMessageQueuePush(&client->tx, msg); } /*----- Helpers. -----*/ diff --git a/qemud/test_libvirtd.aug b/qemud/test_libvirtd.aug index f3c00b7ff5..e2ea363ef6 100644 --- a/qemud/test_libvirtd.aug +++ b/qemud/test_libvirtd.aug @@ -246,6 +246,19 @@ max_clients = 20 # of clients allowed min_workers = 5 max_workers = 20 + +# Total global limit on concurrent RPC calls. Should be +# at least as large as max_workers. Beyond this, RPC requests +# will be read into memory and queued. This directly impact +# memory usage, currently each request requires 256 KB of +# memory. So by default upto 5 MB of memory is used +max_requests = 20 + +# Limit on concurrent requests from a single client +# connection. To avoid one client monopolizing the server +# this should be a small fraction of the global max_requests +# and max_workers parameter +max_client_requests = 5 " test Libvirtd.lns get conf = @@ -499,3 +512,16 @@ max_workers = 20 { "#comment" = "of clients allowed"} { "min_workers" = "5" } { "max_workers" = "20" } + { "#empty" } + { "#comment" = "Total global limit on concurrent RPC calls. Should be" } + { "#comment" = "at least as large as max_workers. Beyond this, RPC requests" } + { "#comment" = "will be read into memory and queued. This directly impact" } + { "#comment" = "memory usage, currently each request requires 256 KB of" } + { "#comment" = "memory. So by default upto 5 MB of memory is used" } + { "max_requests" = "20" } + { "#empty" } + { "#comment" = "Limit on concurrent requests from a single client" } + { "#comment" = "connection. To avoid one client monopolizing the server" } + { "#comment" = "this should be a small fraction of the global max_requests" } + { "#comment" = "and max_workers parameter" } + { "max_client_requests" = "5" } diff --git a/src/remote_internal.c b/src/remote_internal.c index 449bafa7e5..8c7dd7672b 100644 --- a/src/remote_internal.c +++ b/src/remote_internal.c @@ -5663,13 +5663,13 @@ prepareCall(virConnectPtr conn, /* Length must include the length word itself (always encoded in * 4 bytes as per RFC 4506). */ - rv->bufferLength += 4; + rv->bufferLength += REMOTE_MESSAGE_HEADER_XDR_LEN; /* Encode the length word. */ - xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); - if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { + xdrmem_create (&xdr, rv->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE); + if (!xdr_u_int (&xdr, &rv->bufferLength)) { error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("xdr_int (length word)")); + _("xdr_u_int (length word)")); goto error; } xdr_destroy (&xdr); @@ -5965,20 +5965,26 @@ static int processCallRecvLen(virConnectPtr conn, struct private_data *priv, int in_open) { XDR xdr; - int len; + unsigned int len; xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE); - if (!xdr_int (&xdr, &len)) { + if (!xdr_u_int (&xdr, &len)) { error (in_open ? NULL : conn, - VIR_ERR_RPC, _("xdr_int (length word, reply)")); + VIR_ERR_RPC, _("xdr_u_int (length word, reply)")); return -1; } xdr_destroy (&xdr); - /* Length includes length word - adjust to real length to read. */ - len -= 4; + if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("packet received from server too small")); + return -1; + } - if (len < 0 || len > REMOTE_MESSAGE_MAX) { + /* Length includes length word - adjust to real length to read. */ + len -= REMOTE_MESSAGE_HEADER_XDR_LEN; + + if (len > REMOTE_MESSAGE_MAX) { error (in_open ? NULL : conn, VIR_ERR_RPC, _("packet received from server too large")); return -1;