mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-03-07 17:28:15 +00:00
Define an API for registering incoming message dispatch filters
All incoming messages currently get routed to the generic method remoteDispatchClientRequest() for processing. To allow incoming data stream messages to bypass this and be routed to a specific location, a concept of dispatch filters is introduced. * qemud/qemud.h: Add a qemud_client_filter struct and a callback qemud_client_filter_func. Maintain a list of filters on every struct qemud_client * qemud/qemud.c: Move remoteDecodeClientMessageHeader() out of qemudWorker() into qemudDispatchClientRead(). Check registered message filters in qemudDispatchClientRead() to decide where to send incoming messages for dispatch.
This commit is contained in:
parent
47cab73499
commit
caaa1b8f13
@ -1457,8 +1457,7 @@ static void *qemudWorker(void *data)
|
||||
|
||||
/* This function drops the lock during dispatch,
|
||||
* and re-acquires it before returning */
|
||||
if (remoteDecodeClientMessageHeader(msg) < 0 ||
|
||||
remoteDispatchClientRequest (server, client, msg) < 0) {
|
||||
if (remoteDispatchClientRequest (server, client, msg) < 0) {
|
||||
VIR_FREE(msg);
|
||||
qemudDispatchClientFailure(client);
|
||||
client->refs--;
|
||||
@ -1705,9 +1704,30 @@ readmore:
|
||||
waiting for us */
|
||||
goto readmore;
|
||||
} else {
|
||||
/* Grab the completed message */
|
||||
struct qemud_client_message *msg = qemudClientMessageQueueServe(&client->rx);
|
||||
struct qemud_client_filter *filter;
|
||||
|
||||
/* Decode the header so we can use it for routing decisions */
|
||||
if (remoteDecodeClientMessageHeader(msg) < 0) {
|
||||
VIR_FREE(msg);
|
||||
qemudDispatchClientFailure(client);
|
||||
}
|
||||
|
||||
/* Check if any filters match this message */
|
||||
filter = client->filters;
|
||||
while (filter) {
|
||||
if ((filter->query)(msg, filter->opaque)) {
|
||||
qemudClientMessageQueuePush(&filter->dx, msg);
|
||||
msg = NULL;
|
||||
break;
|
||||
}
|
||||
filter = filter->next;
|
||||
}
|
||||
|
||||
/* Move completed message to the end of the dispatch queue */
|
||||
qemudClientMessageQueuePush(&client->dx, client->rx);
|
||||
client->rx = NULL;
|
||||
if (msg)
|
||||
qemudClientMessageQueuePush(&client->dx, msg);
|
||||
client->nrequests++;
|
||||
|
||||
/* Possibly need to create another receive buffer */
|
||||
|
@ -90,6 +90,19 @@ struct qemud_client_message {
|
||||
struct qemud_client_message *next;
|
||||
};
|
||||
|
||||
/* Allow for filtering of incoming messages to a custom
|
||||
* dispatch processing queue, instead of client->dx.
|
||||
*/
|
||||
typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque);
|
||||
struct qemud_client_filter {
|
||||
qemud_client_filter_func query;
|
||||
void *opaque;
|
||||
|
||||
struct qemud_client_message *dx;
|
||||
|
||||
struct qemud_client_filter *next;
|
||||
};
|
||||
|
||||
/* Stores the per-client connection state */
|
||||
struct qemud_client {
|
||||
virMutex lock;
|
||||
@ -134,6 +147,9 @@ struct qemud_client {
|
||||
/* Zero or many messages waiting for transmit
|
||||
* back to client, including async events */
|
||||
struct qemud_client_message *tx;
|
||||
/* Filters to capture messages that would otherwise
|
||||
* end up on the 'dx' queue */
|
||||
struct qemud_client_filter *filters;
|
||||
|
||||
/* This is only valid if a remote open call has been made on this
|
||||
* connection, otherwise it will be NULL. Also if remote close is
|
||||
|
Loading…
x
Reference in New Issue
Block a user