vmm: dbus: broadcast event_monitor events over the DBus API

This commit builds on top of the `Monitor::subscribe` function and
makes it possible to broadcast events published from `event-monitor`
over D-Bus.

The broadcasting functionality is enabled if the D-Bus API is enabled
and users who wish to also enable the file based `event-monitor` can do
so with the CLI arg `--event-monitor`.

Signed-off-by: Omer Faruk Bayram <omer.faruk@sartura.hr>
This commit is contained in:
Omer Faruk Bayram 2023-08-13 20:32:44 +00:00
parent 0a07f6e4c5
commit 5c89d5b610
6 changed files with 93 additions and 45 deletions

1
Cargo.lock generated
View File

@ -2319,6 +2319,7 @@ dependencies = [
"devices", "devices",
"epoll", "epoll",
"event_monitor", "event_monitor",
"flume",
"futures", "futures",
"gdbstub", "gdbstub",
"gdbstub_arch", "gdbstub_arch",

View File

@ -24,12 +24,12 @@ struct Event<'a> {
pub struct Monitor { pub struct Monitor {
pub rx: flume::Receiver<String>, pub rx: flume::Receiver<String>,
pub file: File, pub file: Option<File>,
pub broadcast: Vec<flume::Sender<Arc<String>>>, pub broadcast: Vec<flume::Sender<Arc<String>>>,
} }
impl Monitor { impl Monitor {
pub fn new(rx: flume::Receiver<String>, file: File) -> Self { pub fn new(rx: flume::Receiver<String>, file: Option<File>) -> Self {
Self { Self {
rx, rx,
file, file,
@ -68,11 +68,14 @@ fn set_file_nonblocking(file: &File) -> io::Result<()> {
/// This function must only be called once from the main thread before any threads /// This function must only be called once from the main thread before any threads
/// are created to avoid race conditions. /// are created to avoid race conditions.
pub fn set_monitor(file: File) -> io::Result<Monitor> { pub fn set_monitor(file: Option<File>) -> io::Result<Monitor> {
// SAFETY: there is only one caller of this function, so MONITOR is written to only once // SAFETY: there is only one caller of this function, so MONITOR is written to only once
assert!(unsafe { MONITOR.is_none() }); assert!(unsafe { MONITOR.is_none() });
set_file_nonblocking(&file)?; if let Some(ref file) = file {
set_file_nonblocking(file)?;
}
let (tx, rx) = flume::unbounded(); let (tx, rx) = flume::unbounded();
let monitor = Monitor::new(rx, file); let monitor = Monitor::new(rx, file);

View File

@ -449,18 +449,6 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
(None, None) (None, None)
}; };
#[cfg(feature = "dbus_api")]
let dbus_options = match (&toplevel.dbus_name, &toplevel.dbus_path) {
(Some(ref name), Some(ref path)) => Ok(Some(DBusApiOptions {
service_name: name.to_owned(),
object_path: path.to_owned(),
system_bus: toplevel.dbus_system_bus,
})),
(Some(_), None) => Err(Error::MissingDBusObjectPath),
(None, Some(_)) => Err(Error::MissingDBusServiceName),
(None, None) => Ok(None),
}?;
let (api_request_sender, api_request_receiver) = channel(); let (api_request_sender, api_request_receiver) = channel();
let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?; let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?;
@ -530,31 +518,67 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?; let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?;
if let Some(ref monitor_config) = toplevel.event_monitor { #[allow(unused_mut)]
let mut event_monitor = toplevel
.event_monitor
.as_ref()
.map(|monitor_config| {
let mut parser = OptionParser::new(); let mut parser = OptionParser::new();
parser.add("path").add("fd"); parser.add("path").add("fd");
parser parser
.parse(monitor_config) .parse(monitor_config)
.map_err(Error::ParsingEventMonitor)?; .map_err(Error::ParsingEventMonitor)?;
let file = if parser.is_set("fd") { if parser.is_set("fd") {
let fd = parser let fd = parser
.convert("fd") .convert("fd")
.map_err(Error::ParsingEventMonitor)? .map_err(Error::ParsingEventMonitor)?
.unwrap(); .unwrap();
// SAFETY: fd is valid // SAFETY: fd is valid
unsafe { File::from_raw_fd(fd) } Ok(Some(unsafe { File::from_raw_fd(fd) }))
} else if parser.is_set("path") { } else if parser.is_set("path") {
Ok(Some(
std::fs::OpenOptions::new() std::fs::OpenOptions::new()
.write(true) .write(true)
.create(true) .create(true)
.open(parser.get("path").unwrap()) .open(parser.get("path").unwrap())
.map_err(Error::EventMonitorIo)? .map_err(Error::EventMonitorIo)?,
))
} else { } else {
return Err(Error::BareEventMonitor); Err(Error::BareEventMonitor)
}
})
.transpose()?
.map(|event_monitor_file| {
event_monitor::set_monitor(event_monitor_file).map_err(Error::EventMonitorIo)
})
.transpose()?;
#[cfg(feature = "dbus_api")]
let dbus_options = match (&toplevel.dbus_name, &toplevel.dbus_path) {
(Some(ref name), Some(ref path)) => {
// monitor is either set (file based) or not.
// if it's not set, create one without file support.
let mut monitor = match event_monitor.take() {
Some(monitor) => monitor,
None => event_monitor::set_monitor(None).map_err(Error::EventMonitorIo)?,
};
let options = DBusApiOptions {
service_name: name.to_owned(),
object_path: path.to_owned(),
system_bus: toplevel.dbus_system_bus,
event_monitor_rx: monitor.subscribe(),
}; };
let monitor = event_monitor::set_monitor(file).map_err(Error::EventMonitorIo)?; event_monitor = Some(monitor);
Ok(Some(options))
}
(Some(_), None) => Err(Error::MissingDBusObjectPath),
(None, Some(_)) => Err(Error::MissingDBusServiceName),
(None, None) => Ok(None),
}?;
if let Some(monitor) = event_monitor {
vmm::start_event_monitor_thread( vmm::start_event_monitor_thread(
monitor, monitor,
&seccomp_action, &seccomp_action,

View File

@ -25,6 +25,7 @@ blocking = { version = "1.3.0", optional = true }
devices = { path = "../devices" } devices = { path = "../devices" }
epoll = "4.3.3" epoll = "4.3.3"
event_monitor = { path = "../event_monitor" } event_monitor = { path = "../event_monitor" }
flume = "0.10.14"
futures = { version = "0.3.27", optional = true } futures = { version = "0.3.27", optional = true }
gdbstub = { version = "0.6.4", optional = true } gdbstub = { version = "0.6.4", optional = true }
gdbstub_arch = { version = "0.2.4", optional = true } gdbstub_arch = { version = "0.2.4", optional = true }

View File

@ -25,6 +25,7 @@ pub struct DBusApiOptions {
pub service_name: String, pub service_name: String,
pub object_path: String, pub object_path: String,
pub system_bus: bool, pub system_bus: bool,
pub event_monitor_rx: flume::Receiver<Arc<String>>,
} }
pub struct DBusApi { pub struct DBusApi {
@ -297,6 +298,10 @@ impl DBusApi {
.await .await
.map(|_| ()) .map(|_| ())
} }
// implementation of this function is provided by the `dbus_interface` macro
#[dbus_interface(signal)]
async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>;
} }
pub fn start_dbus_thread( pub fn start_dbus_thread(
@ -308,19 +313,26 @@ pub fn start_dbus_thread(
hypervisor_type: HypervisorType, hypervisor_type: HypervisorType,
) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> { ) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
let dbus_iface = DBusApi::new(api_notifier, api_sender); let dbus_iface = DBusApi::new(api_notifier, api_sender);
let connection = executor::block_on(async move { let (connection, iface_ref) = executor::block_on(async move {
let conn_builder = if dbus_options.system_bus { let conn_builder = if dbus_options.system_bus {
ConnectionBuilder::system()? ConnectionBuilder::system()?
} else { } else {
ConnectionBuilder::session()? ConnectionBuilder::session()?
}; };
conn_builder let conn = conn_builder
.internal_executor(false) .internal_executor(false)
.name(dbus_options.service_name)? .name(dbus_options.service_name)?
.serve_at(dbus_options.object_path, dbus_iface)? .serve_at(dbus_options.object_path.as_str(), dbus_iface)?
.build() .build()
.await .await?;
let iface_ref = conn
.object_server()
.interface::<_, DBusApi>(dbus_options.object_path)
.await?;
Ok((conn, iface_ref))
}) })
.map_err(VmmError::CreateDBusSession)?; .map_err(VmmError::CreateDBusSession)?;
@ -359,6 +371,11 @@ pub fn start_dbus_thread(
send_done.send(()).ok(); send_done.send(()).ok();
break; break;
}, },
ret = dbus_options.event_monitor_rx.recv_async() => {
if let Ok(event) = ret {
DBusApi::event(iface_ref.signal_context(), event).await.ok();
}
}
} }
} }
}) })

View File

@ -321,8 +321,10 @@ pub fn start_event_monitor_thread(
while let Ok(event) = monitor.rx.recv() { while let Ok(event) = monitor.rx.recv() {
let event = Arc::new(event); let event = Arc::new(event);
monitor.file.write_all(event.as_bytes().as_ref()).ok(); if let Some(ref mut file) = monitor.file {
monitor.file.write_all(b"\n\n").ok(); file.write_all(event.as_bytes().as_ref()).ok();
file.write_all(b"\n\n").ok();
}
for tx in monitor.broadcast.iter() { for tx in monitor.broadcast.iter() {
tx.send(event.clone()).ok(); tx.send(event.clone()).ok();