Make event loop thread safe & re-entrant safe

This commit is contained in:
Daniel P. Berrange 2008-12-04 22:14:15 +00:00
parent dd6b3318f3
commit f8a5192347
5 changed files with 174 additions and 28 deletions

View File

@ -1,3 +1,10 @@
Thu Dec 4 22:14:41 GMT 2008 Daniel P. Berrange <berrange@redhat.com>
* src/event.c, src/event.h: Make all event handling thread
safe and re-entrant safe, and allow poll() to be woken up
by other threads.
* src/qemud.c, src/qemud.h: Initialize event loop explicitly
Thu Dec 4 22:12:41 GMT 2008 Daniel P. Berrange <berrange@redhat.com>
* qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Make all

View File

@ -28,13 +28,17 @@
#include <poll.h>
#include <sys/time.h>
#include <errno.h>
#include <unistd.h>
#include "qemud.h"
#include "event.h"
#include "memory.h"
#include "util.h"
#define EVENT_DEBUG(fmt, ...) qemudDebug("EVENT: " fmt, __VA_ARGS__)
static int virEventInterruptLocked(void);
/* State for a single file handle being monitored */
struct virEventHandle {
int watch;
@ -63,6 +67,9 @@ struct virEventTimeout {
/* State for the main event loop */
struct virEventLoop {
pthread_mutex_t lock;
pthread_t leader;
int wakeupfd[2];
int handlesCount;
int handlesAlloc;
struct virEventHandle *handles;
@ -80,6 +87,16 @@ static int nextWatch = 0;
/* Unique ID for the next timer to be registered */
static int nextTimer = 0;
static void virEventLock(void)
{
pthread_mutex_lock(&eventLoop.lock);
}
static void virEventUnlock(void)
{
pthread_mutex_unlock(&eventLoop.lock);
}
/*
* Register a callback for monitoring file handle events.
* NB, it *must* be safe to call this from within a callback
@ -89,17 +106,23 @@ int virEventAddHandleImpl(int fd, int events,
virEventHandleCallback cb,
void *opaque,
virFreeCallback ff) {
int watch;
EVENT_DEBUG("Add handle %d %d %p %p", fd, events, cb, opaque);
virEventLock();
if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
EVENT_DEBUG("Used %d handle slots, adding %d more",
eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.handles,
(eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0)
(eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) {
virEventUnlock();
return -1;
}
eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT;
}
eventLoop.handles[eventLoop.handlesCount].watch = nextWatch++;
watch = nextWatch++;
eventLoop.handles[eventLoop.handlesCount].watch = watch;
eventLoop.handles[eventLoop.handlesCount].fd = fd;
eventLoop.handles[eventLoop.handlesCount].events =
virEventHandleTypeToPollEvent(events);
@ -110,11 +133,15 @@ int virEventAddHandleImpl(int fd, int events,
eventLoop.handlesCount++;
return nextWatch-1;
virEventInterruptLocked();
virEventUnlock();
return watch;
}
void virEventUpdateHandleImpl(int watch, int events) {
int i;
virEventLock();
for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].watch == watch) {
eventLoop.handles[i].events =
@ -122,6 +149,8 @@ void virEventUpdateHandleImpl(int watch, int events) {
break;
}
}
virEventInterruptLocked();
virEventUnlock();
}
/*
@ -133,6 +162,7 @@ void virEventUpdateHandleImpl(int watch, int events) {
int virEventRemoveHandleImpl(int watch) {
int i;
EVENT_DEBUG("Remove handle %d", watch);
virEventLock();
for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].deleted)
continue;
@ -140,9 +170,12 @@ int virEventRemoveHandleImpl(int watch) {
if (eventLoop.handles[i].watch == watch) {
EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd);
eventLoop.handles[i].deleted = 1;
virEventUnlock();
return 0;
}
}
virEventInterruptLocked();
virEventUnlock();
return -1;
}
@ -157,17 +190,21 @@ int virEventAddTimeoutImpl(int frequency,
void *opaque,
virFreeCallback ff) {
struct timeval now;
int ret;
EVENT_DEBUG("Adding timer %d with %d ms freq", nextTimer, frequency);
if (gettimeofday(&now, NULL) < 0) {
return -1;
}
virEventLock();
if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
EVENT_DEBUG("Used %d timeout slots, adding %d more",
eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.timeouts,
(eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0)
(eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) {
virEventUnlock();
return -1;
}
eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
}
@ -183,8 +220,10 @@ int virEventAddTimeoutImpl(int frequency,
(((unsigned long long)now.tv_usec)/1000) : 0;
eventLoop.timeoutsCount++;
return nextTimer-1;
ret = nextTimer-1;
virEventInterruptLocked();
virEventUnlock();
return ret;
}
void virEventUpdateTimeoutImpl(int timer, int frequency) {
@ -195,6 +234,7 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
return;
}
virEventLock();
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].frequency = frequency;
@ -205,6 +245,8 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
break;
}
}
virEventInterruptLocked();
virEventUnlock();
}
/*
@ -216,15 +258,19 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
int virEventRemoveTimeoutImpl(int timer) {
int i;
EVENT_DEBUG("Remove timer %d", timer);
virEventLock();
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].deleted)
continue;
if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].deleted = 1;
virEventUnlock();
return 0;
}
}
virEventInterruptLocked();
virEventUnlock();
return -1;
}
@ -336,10 +382,15 @@ static int virEventDispatchTimeouts(void) {
continue;
if (eventLoop.timeouts[i].expiresAt <= now) {
(eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
eventLoop.timeouts[i].opaque);
virEventTimeoutCallback cb = eventLoop.timeouts[i].cb;
int timer = eventLoop.timeouts[i].timer;
void *opaque = eventLoop.timeouts[i].opaque;
eventLoop.timeouts[i].expiresAt =
now + eventLoop.timeouts[i].frequency;
virEventUnlock();
(cb)(timer, opaque);
virEventLock();
}
}
return 0;
@ -356,28 +407,25 @@ static int virEventDispatchTimeouts(void) {
*
* Returns 0 upon success, -1 if an error occurred
*/
static int virEventDispatchHandles(struct pollfd *fds) {
static int virEventDispatchHandles(int nfds, struct pollfd *fds) {
int i;
virEventHandleType hEvents;
/* Save this now - it may be changed during dispatch */
int nhandles = eventLoop.handlesCount;
for (i = 0 ; i < nhandles ; i++) {
for (i = 0 ; i < nfds ; i++) {
if (eventLoop.handles[i].deleted) {
EVENT_DEBUG("Skip deleted %d", eventLoop.handles[i].fd);
continue;
}
if (fds[i].revents) {
hEvents = virPollEventToEventHandleType(fds[i].revents);
EVENT_DEBUG("Dispatch %d %d %d %p",
eventLoop.handles[i].watch,
fds[i].fd, fds[i].revents,
eventLoop.handles[i].opaque);
(eventLoop.handles[i].cb)(eventLoop.handles[i].watch,
fds[i].fd,
hEvents,
eventLoop.handles[i].opaque);
virEventHandleCallback cb = eventLoop.handles[i].cb;
void *opaque = eventLoop.handles[i].opaque;
int hEvents = virPollEventToEventHandleType(fds[i].revents);
EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd,
fds[i].revents, eventLoop.handles[i].opaque);
virEventUnlock();
(cb)(eventLoop.handles[i].watch,
fds[i].fd, hEvents, opaque);
virEventLock();
}
}
@ -472,14 +520,21 @@ int virEventRunOnce(void) {
struct pollfd *fds;
int ret, timeout, nfds;
if ((nfds = virEventMakePollFDs(&fds)) < 0)
virEventLock();
eventLoop.leader = pthread_self();
if ((nfds = virEventMakePollFDs(&fds)) < 0) {
virEventUnlock();
return -1;
}
if (virEventCalculateTimeout(&timeout) < 0) {
VIR_FREE(fds);
virEventUnlock();
return -1;
}
virEventUnlock();
retry:
EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout);
ret = poll(fds, nfds, timeout);
@ -491,27 +546,88 @@ int virEventRunOnce(void) {
VIR_FREE(fds);
return -1;
}
virEventLock();
if (virEventDispatchTimeouts() < 0) {
VIR_FREE(fds);
virEventUnlock();
return -1;
}
if (ret > 0 &&
virEventDispatchHandles(fds) < 0) {
virEventDispatchHandles(nfds, fds) < 0) {
VIR_FREE(fds);
virEventUnlock();
return -1;
}
VIR_FREE(fds);
if (virEventCleanupTimeouts() < 0)
if (virEventCleanupTimeouts() < 0) {
virEventUnlock();
return -1;
}
if (virEventCleanupHandles() < 0) {
virEventUnlock();
return -1;
}
eventLoop.leader = 0;
virEventUnlock();
return 0;
}
static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED,
int fd,
int events ATTRIBUTE_UNUSED,
void *opaque ATTRIBUTE_UNUSED)
{
char c;
virEventLock();
saferead(fd, &c, sizeof(c));
virEventUnlock();
}
int virEventInit(void)
{
if (pthread_mutex_init(&eventLoop.lock, NULL) != 0)
return -1;
if (virEventCleanupHandles() < 0)
if (pipe(eventLoop.wakeupfd) < 0 ||
qemudSetNonBlock(eventLoop.wakeupfd[0]) < 0 ||
qemudSetNonBlock(eventLoop.wakeupfd[1]) < 0 ||
qemudSetCloseExec(eventLoop.wakeupfd[0]) < 0 ||
qemudSetCloseExec(eventLoop.wakeupfd[1]) < 0)
return -1;
if (virEventAddHandleImpl(eventLoop.wakeupfd[0],
VIR_EVENT_HANDLE_READABLE,
virEventHandleWakeup, NULL, NULL) < 0)
return -1;
return 0;
}
static int virEventInterruptLocked(void)
{
char c = '\0';
if (pthread_self() == eventLoop.leader)
return 0;
if (safewrite(eventLoop.wakeupfd[1], &c, sizeof(c)) != sizeof(c))
return -1;
return 0;
}
int virEventInterrupt(void)
{
int ret;
virEventLock();
ret = virEventInterruptLocked();
virEventUnlock();
return ret;
}
int
virEventHandleTypeToPollEvent(int events)
{

View File

@ -100,6 +100,13 @@ void virEventUpdateTimeoutImpl(int timer, int frequency);
*/
int virEventRemoveTimeoutImpl(int timer);
/**
* virEventInit: Initialize the event loop
*
* returns -1 if initialization failed
*/
int virEventInit(void);
/**
* virEventRunOnce: run a single iteration of the event loop.
*
@ -116,5 +123,12 @@ int
virPollEventToEventHandleType(int events);
/**
* virEventInterrupt: wakeup any thread waiting in poll()
*
* return -1 if wakup failed
*/
int virEventInterrupt(void);
#endif /* __VIRTD_EVENT_H__ */

View File

@ -296,7 +296,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
server->shutdown = 1;
}
static int qemudSetCloseExec(int fd) {
int qemudSetCloseExec(int fd) {
int flags;
if ((flags = fcntl(fd, F_GETFD)) < 0)
goto error;
@ -311,7 +311,7 @@ static int qemudSetCloseExec(int fd) {
}
static int qemudSetNonBlock(int fd) {
int qemudSetNonBlock(int fd) {
int flags;
if ((flags = fcntl(fd, F_GETFL)) < 0)
goto error;
@ -753,6 +753,12 @@ static struct qemud_server *qemudInitialize(int sigread) {
server->sigread = sigread;
if (virEventInit() < 0) {
qemudLog(QEMUD_ERR, "%s", _("Failed to initialize event system"));
VIR_FREE(server);
return NULL;
}
virInitialize();
/*

View File

@ -177,6 +177,9 @@ void qemudLog(int priority, const char *fmt, ...)
#define qemudDebug(fmt, ...) do {} while(0)
#endif
int qemudSetCloseExec(int fd);
int qemudSetNonBlock(int fd);
unsigned int
remoteDispatchClientRequest (struct qemud_server *server,
struct qemud_client *client);