Remove all linked list handling from remote client event loop

Directly messing around with the linked list is potentially
dangerous. Introduce some helper APIs to deal with list
manipulating the list

* src/rpc/virnetclient.c: Create linked list handlers
This commit is contained in:
Daniel P. Berrange 2011-11-11 15:26:16 +00:00
parent d776170012
commit 2501d27e18

View File

@ -110,6 +110,97 @@ static void virNetClientIncomingEvent(virNetSocketPtr sock,
int events, int events,
void *opaque); void *opaque);
/* Append a call to the end of the list */
static void virNetClientCallQueue(virNetClientCallPtr *head,
virNetClientCallPtr call)
{
virNetClientCallPtr tmp = *head;
while (tmp && tmp->next) {
tmp = tmp->next;
}
if (tmp)
tmp->next = call;
else
*head = call;
call->next = NULL;
}
#if 0
/* Obtain a call from the head of the list */
static virNetClientCallPtr virNetClientCallServe(virNetClientCallPtr *head)
{
virNetClientCallPtr tmp = *head;
if (tmp)
*head = tmp->next;
else
*head = NULL;
tmp->next = NULL;
return tmp;
}
#endif
/* Remove a call from anywhere in the list */
static void virNetClientCallRemove(virNetClientCallPtr *head,
virNetClientCallPtr call)
{
virNetClientCallPtr tmp = *head;
virNetClientCallPtr prev = NULL;
while (tmp) {
if (tmp == call) {
if (prev)
prev->next = tmp->next;
else
*head = tmp->next;
tmp->next = NULL;
return;
}
prev = tmp;
tmp = tmp->next;
}
}
/* Predicate returns true if matches */
typedef bool (*virNetClientCallPredicate)(virNetClientCallPtr call, void *opaque);
/* Remove a list of calls from the list based on a predicate */
static void virNetClientCallRemovePredicate(virNetClientCallPtr *head,
virNetClientCallPredicate pred,
void *opaque)
{
virNetClientCallPtr tmp = *head;
virNetClientCallPtr prev = NULL;
while (tmp) {
virNetClientCallPtr next = tmp->next;
tmp->next = NULL; /* Temp unlink */
if (pred(tmp, opaque)) {
if (prev)
prev->next = next;
else
*head = next;
} else {
tmp->next = next; /* Reverse temp unlink */
prev = tmp;
}
tmp = next;
}
}
/* Returns true if the predicate matches at least one call in the list */
static bool virNetClientCallMatchPredicate(virNetClientCallPtr head,
virNetClientCallPredicate pred,
void *opaque)
{
virNetClientCallPtr tmp = head;
while (tmp) {
if (pred(tmp, opaque)) {
return true;
}
tmp = tmp->next;
}
return false;
}
static void virNetClientEventFree(void *opaque) static void virNetClientEventFree(void *opaque)
{ {
virNetClientPtr client = opaque; virNetClientPtr client = opaque;
@ -896,6 +987,42 @@ virNetClientIOHandleInput(virNetClientPtr client)
} }
static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
void *opaque)
{
struct pollfd *fd = opaque;
if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
fd->events |= POLLIN;
if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
fd->events |= POLLOUT;
return false;
}
static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
void *opaque)
{
virNetClientCallPtr thiscall = opaque;
if (call == thiscall)
return false;
if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE)
return false;
/*
* ...they won't actually wakeup until
* we release our mutex a short while
* later...
*/
VIR_DEBUG("Waking up sleeping call %p", call);
virCondSignal(&call->cond);
return true;
}
/* /*
* Process all calls pending dispatch/receive until we * Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck * get a reply to our own call. Then quit and pass the buck
@ -911,8 +1038,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[1].fd = client->wakeupReadFD; fds[1].fd = client->wakeupReadFD;
for (;;) { for (;;) {
virNetClientCallPtr tmp = client->waitDispatch;
virNetClientCallPtr prev;
char ignore; char ignore;
sigset_t oldmask, blockedsigs; sigset_t oldmask, blockedsigs;
int timeout = -1; int timeout = -1;
@ -928,14 +1053,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[1].events = fds[1].revents = 0; fds[1].events = fds[1].revents = 0;
fds[1].events = POLLIN; fds[1].events = POLLIN;
while (tmp) {
if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
fds[0].events |= POLLIN;
if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
fds[0].events |= POLLOUT;
tmp = tmp->next; /* Calculate poll events for calls */
} virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopPollEvents,
&fds[0]);
/* We have to be prepared to receive stream data /* We have to be prepared to receive stream data
* regardless of whether any of the calls waiting * regardless of whether any of the calls waiting
@ -1008,37 +1130,16 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
/* Iterate through waiting threads and if /* Iterate through waiting threads and if
* any are complete then tell 'em to wakeup * any are complete then tell 'em to wakeup
*/ */
tmp = client->waitDispatch; virNetClientCallRemovePredicate(&client->waitDispatch,
prev = NULL; virNetClientIOEventLoopRemoveDone,
while (tmp) { thiscall);
if (tmp != thiscall &&
tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
/* Take them out of the list */
if (prev)
prev->next = tmp->next;
else
client->waitDispatch = tmp->next;
/* And wake them up....
* ...they won't actually wakeup until
* we release our mutex a short while
* later...
*/
VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch);
virCondSignal(&tmp->cond);
} else {
prev = tmp;
}
tmp = tmp->next;
}
/* Now see if *we* are done */ /* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
/* We're at head of the list already, so virNetClientCallRemove(&client->waitDispatch, thiscall);
* remove us
*/
client->waitDispatch = thiscall->next;
VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
/* See if someone else is still waiting /* See if someone else is still waiting
* and if so, then pass the buck ! */ * and if so, then pass the buck ! */
if (client->waitDispatch) { if (client->waitDispatch) {
@ -1058,7 +1159,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
error: error:
client->waitDispatch = thiscall->next; virNetClientCallRemove(&client->waitDispatch, thiscall);
VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch);
/* See if someone else is still waiting /* See if someone else is still waiting
* and if so, then pass the buck ! */ * and if so, then pass the buck ! */
@ -1119,22 +1220,13 @@ static int virNetClientIO(virNetClientPtr client,
/* Check to see if another thread is dispatching */ /* Check to see if another thread is dispatching */
if (client->waitDispatch) { if (client->waitDispatch) {
/* Stick ourselves on the end of the wait queue */
virNetClientCallPtr tmp = client->waitDispatch;
char ignore = 1; char ignore = 1;
while (tmp && tmp->next) /* Stick ourselves on the end of the wait queue */
tmp = tmp->next; virNetClientCallQueue(&client->waitDispatch, thiscall);
if (tmp)
tmp->next = thiscall;
else
client->waitDispatch = thiscall;
/* Force other thread to wakeup from poll */ /* Force other thread to wakeup from poll */
if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
if (tmp) virNetClientCallRemove(&client->waitDispatch, thiscall);
tmp->next = NULL;
else
client->waitDispatch = NULL;
virReportSystemError(errno, "%s", virReportSystemError(errno, "%s",
_("failed to wake up polling thread")); _("failed to wake up polling thread"));
return -1; return -1;
@ -1143,17 +1235,7 @@ static int virNetClientIO(virNetClientPtr client,
VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
/* Go to sleep while other thread is working... */ /* Go to sleep while other thread is working... */
if (virCondWait(&thiscall->cond, &client->lock) < 0) { if (virCondWait(&thiscall->cond, &client->lock) < 0) {
if (client->waitDispatch == thiscall) { virNetClientCallRemove(&client->waitDispatch, thiscall);
client->waitDispatch = thiscall->next;
} else {
tmp = client->waitDispatch;
while (tmp && tmp->next &&
tmp->next != thiscall) {
tmp = tmp->next;
}
if (tmp && tmp->next == thiscall)
tmp->next = thiscall->next;
}
virNetError(VIR_ERR_INTERNAL_ERROR, "%s", virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("failed to wait on condition")); _("failed to wait on condition"));
return -1; return -1;
@ -1177,10 +1259,9 @@ static int virNetClientIO(virNetClientPtr client,
} }
/* Grr, someone passed the buck onto us ... */ /* Grr, someone passed the buck onto us ... */
} else { } else {
/* We're first to catch the buck */ /* We're the first to arrive */
client->waitDispatch = thiscall; virNetClientCallQueue(&client->waitDispatch, thiscall);
} }
VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);