mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-01-15 00:55:17 +00:00
93f77250b3
The existing python demo for domain events does not fully implement the event loop contract. This makes the code useless for real world applications. This change re-writes the demo so that it has a full event loop implementation which is suitable for application usage & better demonstrates integration * examples/domain-events/events-python/event-test.py: Rewrite to include a real world usable event loop implementation
466 lines
16 KiB
Python
466 lines
16 KiB
Python
#!/usr/bin/python -u
|
|
#
|
|
#
|
|
#
|
|
#################################################################################
|
|
# Start off by implementing a general purpose event loop for anyone's use
|
|
#################################################################################
|
|
|
|
import sys
|
|
import getopt
|
|
import os
|
|
import libvirt
|
|
import select
|
|
import errno
|
|
import time
|
|
import threading
|
|
|
|
#
|
|
# This general purpose event loop will support waiting for file handle
|
|
# I/O and errors events, as well as scheduling repeatable timers with
|
|
# a fixed interval.
|
|
#
|
|
# It is a pure python implementation based around the poll() API
|
|
#
|
|
class virEventLoopPure:
    """General purpose event loop built around select.poll().

    It watches file handles for I/O and error events and fires
    repeating timers with a fixed interval. All times handled by the
    loop (timer intervals, last-fired stamps, timeouts) are expressed
    in milliseconds.
    """

    # This class contains the data we need to track for a
    # single file handle
    class virEventLoopPureHandle:
        def __init__(self, handle, fd, events, cb, opaque):
            self.handle = handle
            self.fd = fd
            self.events = events
            self.cb = cb
            # 'opaque' must be indexable: dispatch() hands its first
            # two elements to the callback
            self.opaque = opaque

        def get_id(self):
            return self.handle

        def get_fd(self):
            return self.fd

        def get_events(self):
            return self.events

        def set_events(self, events):
            self.events = events

        def dispatch(self, events):
            """Invoke the callback for 'events' seen on this handle."""
            self.cb(self.handle,
                    self.fd,
                    events,
                    self.opaque[0],
                    self.opaque[1])

    # This class contains the data we need to track for a
    # single periodic timer
    class virEventLoopPureTimer:
        def __init__(self, timer, interval, cb, opaque):
            self.timer = timer
            self.interval = interval
            self.cb = cb
            # 'opaque' must be indexable: dispatch() hands its first
            # two elements to the callback
            self.opaque = opaque
            # A new timer has never fired, so it is treated as overdue
            # the first time the loop looks at it
            self.lastfired = 0

        def get_id(self):
            return self.timer

        def get_interval(self):
            return self.interval

        def set_interval(self, interval):
            self.interval = interval

        def get_last_fired(self):
            return self.lastfired

        def set_last_fired(self, now):
            self.lastfired = now

        def dispatch(self):
            """Invoke the callback for this timer."""
            self.cb(self.timer,
                    self.opaque[0],
                    self.opaque[1])

    def __init__(self, debug=False):
        """Create the poll object and the self-pipe; no handles or timers yet.

        If 'debug' is true, debug() prints its messages.
        """
        self.debugOn = debug
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []
        self.timers = []
        self.quit = False

        # The event loop can be used from multiple threads at once.
        # Specifically while the main thread is sleeping in poll()
        # waiting for events to occur, another thread may come along
        # and add/update/remove a file handle, or timer. When this
        # happens we need to interrupt the poll() sleep in the other
        # thread, so that it'll see the file handle / timer changes.
        #
        # Using OS level signals for this is very unreliable and
        # hard to implement correctly. Thus we use the real classic
        # "self pipe" trick. An anonymous pipe, with one end registered
        # with the event loop for input events. When we need to force
        # the main thread out of a poll() sleep, we simply write a
        # single byte of data to the other end of the pipe.
        self.debug("Self pipe watch %d write %d" % (self.pipetrick[0], self.pipetrick[1]))
        self.poll.register(self.pipetrick[0], select.POLLIN)

    def debug(self, msg):
        """Print 'msg' if debugging was enabled at construction time."""
        if self.debugOn:
            print(msg)

    def next_timeout(self):
        """Calculate when the next timeout is due to occur.

        Returns the absolute timestamp (ms) of the earliest enabled
        timer, or 0 if there is no timeout due. Timers with a negative
        interval are disabled and ignored.
        """
        due = 0
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue
            expiry = t.get_last_fired() + interval
            if due == 0 or expiry < due:
                due = expiry
        return due

    def get_handle_by_fd(self, fd):
        """Lookup a virEventLoopPureHandle by file descriptor, or None."""
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    def get_handle_by_id(self, handleID):
        """Lookup a virEventLoopPureHandle by its event loop ID, or None."""
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None

    def run_once(self):
        """Perform one single iteration of the event loop.

        It asks when the next timeout is due, and then calculates the
        maximum amount of time it is able to sleep for in poll()
        pending file handle events. When poll() returns, there will be
        zero or more file handle events which need to be dispatched to
        registered callbacks. It may also be time to fire some
        periodic timers.

        Due to the coarse granularity of scheduler timeslices, if we
        ask for a sleep of 500ms in order to satisfy a timer, we may
        return up to 1 scheduler timeslice early. So even though our
        sleep timeout was reached, the registered timer may not
        technically be at its expiry point. This leads to us going
        back around the loop with a crazy 5ms sleep. So when checking
        if timeouts are due, we allow a margin of 20ms, to avoid these
        pointless repeated tiny sleeps.
        """
        sleep = -1
        due = self.next_timeout()
        self.debug("Next timeout due at %d" % due)
        if due > 0:
            now = int(time.time() * 1000)
            if now >= due:
                sleep = 0
            else:
                sleep = due - now

        self.debug("Poll with a sleep of %d" % sleep)
        # poll() takes its timeout in milliseconds, which is the unit
        # 'sleep' is already in, so it must be passed through unscaled.
        # A negative value blocks until an event arrives.
        events = self.poll.poll(sleep)

        # Dispatch any file handle events that occurred
        for (fd, revents) in events:
            # See if the event was from the self-pipe telling us to
            # wake up. If so, discard the byte and just continue
            if fd == self.pipetrick[0]:
                os.read(fd, 1)
                continue

            h = self.get_handle_by_fd(fd)
            if h:
                self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                h.dispatch(self.events_from_poll(revents))

        now = int(time.time() * 1000)
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue

            want = t.get_last_fired() + interval
            # Deduct 20ms, since scheduler timeslice
            # means we could be ever so slightly early
            if now >= (want - 20):
                self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                t.set_last_fired(now)
                t.dispatch()

    def run_loop(self):
        """Actually run the event loop, until self.quit becomes true."""
        self.quit = False
        while not self.quit:
            self.run_once()

    def interrupt(self):
        """Wake up a poll() sleep by writing one byte to the self-pipe."""
        # os.write() needs bytes on Python 3; b'c' is a plain str on Python 2
        os.write(self.pipetrick[1], b'c')

    def add_handle(self, fd, events, cb, opaque):
        """Register a new file handle 'fd', monitoring for 'events'.

        'events' uses the libvirt event constants; cb() is fired when
        an event occurs. Returns a unique integer identifier for this
        handle, that should be used to later update/remove it.
        """
        handleID = self.nextHandleID + 1
        self.nextHandleID = handleID

        h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque)
        self.handles.append(h)

        self.poll.register(fd, self.events_to_poll(events))
        self.interrupt()

        self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))

        return handleID

    def add_timer(self, interval, cb, opaque):
        """Register a new timer with periodic expiry at 'interval' ms.

        cb() is fired each time the timer expires. If 'interval' is
        -1, the timer is registered, but not enabled. Returns a unique
        integer identifier for this timer, that should be used to
        later update/remove it.
        """
        timerID = self.nextTimerID + 1
        self.nextTimerID = timerID

        t = self.virEventLoopPureTimer(timerID, interval, cb, opaque)
        self.timers.append(t)
        self.interrupt()

        self.debug("Add timer %d interval %d" % (timerID, interval))

        return timerID

    def update_handle(self, handleID, events):
        """Change the set of events to be monitored on the file handle."""
        h = self.get_handle_by_id(handleID)
        if h:
            h.set_events(events)
            self.poll.unregister(h.get_fd())
            self.poll.register(h.get_fd(), self.events_to_poll(events))
            self.interrupt()

            self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))

    def update_timer(self, timerID, interval):
        """Change the periodic frequency of the timer."""
        for t in self.timers:
            if t.get_id() == timerID:
                t.set_interval(interval)
                self.interrupt()

                self.debug("Update timer %d interval %d" % (timerID, interval))
                break

    def remove_handle(self, handleID):
        """Stop monitoring for events on the file handle."""
        remaining = []
        for h in self.handles:
            if h.get_id() == handleID:
                self.poll.unregister(h.get_fd())
                self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
            else:
                remaining.append(h)
        # Rebind rather than mutate in place, so a loop currently
        # iterating the old list is not disturbed
        self.handles = remaining
        self.interrupt()

    def remove_timer(self, timerID):
        """Stop firing the periodic timer."""
        remaining = []
        for t in self.timers:
            if t.get_id() != timerID:
                remaining.append(t)
            else:
                self.debug("Remove timer %d" % timerID)
        # Rebind rather than mutate in place, so run_once() can keep
        # iterating the old list if a callback removes a timer
        self.timers = remaining
        self.interrupt()

    def events_to_poll(self, events):
        """Convert from libvirt event constants, to poll() event constants."""
        ret = 0
        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            ret |= select.POLLIN
        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            ret |= select.POLLOUT
        if events & libvirt.VIR_EVENT_HANDLE_ERROR:
            ret |= select.POLLERR
        if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
            ret |= select.POLLHUP
        return ret

    def events_from_poll(self, events):
        """Convert from poll() event constants, to libvirt event constants."""
        ret = 0
        if events & select.POLLIN:
            ret |= libvirt.VIR_EVENT_HANDLE_READABLE
        if events & select.POLLOUT:
            ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE
        # Both an invalid descriptor and an error condition are
        # reported to libvirt as an error event
        if events & select.POLLNVAL:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLERR:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLHUP:
            ret |= libvirt.VIR_EVENT_HANDLE_HANGUP
        return ret
|
|
|
|
|
|
###########################################################################
|
|
# Now glue an instance of the general event loop into libvirt's event loop
|
|
###########################################################################
|
|
|
|
# This single global instance of the event loop will be used for
# monitoring libvirt events
eventLoop = virEventLoopPure(debug=False)

# This keeps track of what thread is running the event loop,
# (if it is run in a background thread)
eventLoopThread = None
|
|
|
|
|
|
# These next set of 6 methods are the glue between the official
|
|
# libvirt events API, and our particular impl of the event loop
|
|
#
|
|
# There is no reason why the 'virEventLoopPure' has to be used.
|
|
# An application could easily make these 6 glue methods hook into
|
|
# another event loop such as GLib's, or something like the python
|
|
# Twisted event framework.
|
|
|
|
def virEventAddHandleImpl(fd, events, cb, opaque):
    """Glue for libvirt: add a file handle watch to the global event loop."""
    handleID = eventLoop.add_handle(fd, events, cb, opaque)
    return handleID
|
|
|
|
def virEventUpdateHandleImpl(handleID, events):
    """Glue for libvirt: change the events watched on a handle in the global event loop."""
    result = eventLoop.update_handle(handleID, events)
    return result
|
|
|
|
def virEventRemoveHandleImpl(handleID):
    """Glue for libvirt: remove a file handle watch from the global event loop."""
    result = eventLoop.remove_handle(handleID)
    return result
|
|
|
|
def virEventAddTimerImpl(interval, cb, opaque):
    """Glue for libvirt: add a timer to the global event loop."""
    timerID = eventLoop.add_timer(interval, cb, opaque)
    return timerID
|
|
|
|
def virEventUpdateTimerImpl(timerID, interval):
    """Glue for libvirt: change the interval of a timer in the global event loop."""
    result = eventLoop.update_timer(timerID, interval)
    return result
|
|
|
|
def virEventRemoveTimerImpl(timerID):
    """Glue for libvirt: remove a timer from the global event loop."""
    result = eventLoop.remove_timer(timerID)
    return result
|
|
|
|
# This tells libvirt what event loop implementation it
|
|
# should use
|
|
def virEventLoopPureRegister():
    """Tell libvirt which event loop implementation it should use."""
    # Order matters: add/update/remove for handles, then for timers
    handle_impls = (virEventAddHandleImpl,
                    virEventUpdateHandleImpl,
                    virEventRemoveHandleImpl)
    timer_impls = (virEventAddTimerImpl,
                   virEventUpdateTimerImpl,
                   virEventRemoveTimerImpl)
    libvirt.virEventRegisterImpl(*(handle_impls + timer_impls))
|
|
|
|
# Directly run the event loop in the current thread
|
|
def virEventLoopPureRun():
    """Directly run the global event loop in the current thread."""
    loop = eventLoop
    loop.run_loop()
|
|
|
|
# Spawn a background thread to run the event loop
|
|
def virEventLoopPureStart():
    """Register the event loop with libvirt and run it in a background thread.

    The thread object is stored in the module-level 'eventLoopThread'.
    """
    global eventLoopThread
    virEventLoopPureRegister()
    worker = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop")
    # Daemon thread, so it does not keep the process alive on exit
    worker.daemon = True
    eventLoopThread = worker
    worker.start()
|
|
|
|
|
|
##########################################################################
|
|
# Everything that now follows is a simple demo of domain lifecycle events
|
|
##########################################################################
|
|
def eventToString(event):
    """Return a human readable name for a domain event number.

    'event' is used as an index into a fixed table of names. A value
    outside the table yields "Unknown (<event>)": a negative value must
    not silently wrap around to the end of the table, and a larger one
    must not raise IndexError from inside an event callback.
    """
    eventStrings = ("Added",
                    "Removed",
                    "Started",
                    "Suspended",
                    "Resumed",
                    "Stopped",
                    "Saved",
                    "Restored")
    if 0 <= event < len(eventStrings):
        return eventStrings[event]
    return "Unknown (%s)" % event
|
|
|
|
def myDomainEventCallback1 (conn, dom, event, detail, opaque):
    """Print a one-line description of a domain lifecycle event."""
    fields = (dom.name(), dom.ID(), eventToString(event), detail)
    print("myDomainEventCallback1 EVENT: Domain %s(%s) %s %d" % fields)
|
|
|
|
def myDomainEventCallback2 (conn, dom, event, detail, opaque):
    """Print a one-line description of a domain lifecycle event."""
    fields = (dom.name(), dom.ID(), eventToString(event), detail)
    print("myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % fields)
|
|
|
|
def usage():
    """Print a short usage message naming this script."""
    prog = os.path.basename(sys.argv[0])
    print("usage: " + prog + " [uri]")
    print(" uri will default to qemu:///system")
|
|
|
|
def main():
    """Run the domain events demo.

    Parses the command line (-h/--help, optional URI argument which
    defaults to qemu:///system), starts the event loop in a background
    thread, opens the libvirt connection, registers two domain event
    callbacks and then sleeps forever.
    """
    # Function-scope import: only main() needs it
    import atexit

    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)
    for o, _ in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()

    # Take the URI from the arguments getopt left over, not from
    # sys.argv[1], so that "prog -- uri" does not use "--" as the URI
    if args:
        uri = args[0]
    else:
        uri = "qemu:///system"

    print("Using uri:" + uri)

    # Run a background thread with the event loop
    virEventLoopPureStart()

    vc = libvirt.open(uri)

    # Close connection on exit (to test cleanup paths)
    def close_connection():
        print("Closing " + str(vc))
        vc.close()
    atexit.register(close_connection)

    # Add 2 callbacks to prove this works with more than just one
    vc.domainEventRegister(myDomainEventCallback1, None)
    vc.domainEventRegister(myDomainEventCallback2, None)

    # The rest of your app would go here normally, but for sake
    # of demo we'll just go to sleep. The other option is to
    # run the event loop in your main thread if your app is
    # totally event based.
    while True:
        time.sleep(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|