diff --git a/Cargo.lock b/Cargo.lock index 4834d4a88..31b5a3328 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2319,6 +2319,7 @@ dependencies = [ "devices", "epoll", "event_monitor", + "flume", "futures", "gdbstub", "gdbstub_arch", diff --git a/event_monitor/src/lib.rs b/event_monitor/src/lib.rs index b8f9b9ab9..c6cf10f6a 100644 --- a/event_monitor/src/lib.rs +++ b/event_monitor/src/lib.rs @@ -24,12 +24,12 @@ struct Event<'a> { pub struct Monitor { pub rx: flume::Receiver, - pub file: File, + pub file: Option, pub broadcast: Vec>>, } impl Monitor { - pub fn new(rx: flume::Receiver, file: File) -> Self { + pub fn new(rx: flume::Receiver, file: Option) -> Self { Self { rx, file, @@ -68,11 +68,14 @@ fn set_file_nonblocking(file: &File) -> io::Result<()> { /// This function must only be called once from the main thread before any threads /// are created to avoid race conditions. -pub fn set_monitor(file: File) -> io::Result { +pub fn set_monitor(file: Option) -> io::Result { // SAFETY: there is only one caller of this function, so MONITOR is written to only once assert!(unsafe { MONITOR.is_none() }); - set_file_nonblocking(&file)?; + if let Some(ref file) = file { + set_file_nonblocking(file)?; + } + let (tx, rx) = flume::unbounded(); let monitor = Monitor::new(rx, file); diff --git a/src/main.rs b/src/main.rs index 780b282b3..ba49a4c14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -449,18 +449,6 @@ fn start_vmm(toplevel: TopLevel) -> Result, Error> { (None, None) }; - #[cfg(feature = "dbus_api")] - let dbus_options = match (&toplevel.dbus_name, &toplevel.dbus_path) { - (Some(ref name), Some(ref path)) => Ok(Some(DBusApiOptions { - service_name: name.to_owned(), - object_path: path.to_owned(), - system_bus: toplevel.dbus_system_bus, - })), - (Some(_), None) => Err(Error::MissingDBusObjectPath), - (None, Some(_)) => Err(Error::MissingDBusServiceName), - (None, None) => Ok(None), - }?; - let (api_request_sender, api_request_receiver) = channel(); let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?; @@ -530,31 +518,67 @@ fn start_vmm(toplevel: TopLevel) -> Result, Error> { let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?; - if let Some(ref monitor_config) = toplevel.event_monitor { - let mut parser = OptionParser::new(); - parser.add("path").add("fd"); - parser - .parse(monitor_config) - .map_err(Error::ParsingEventMonitor)?; + #[allow(unused_mut)] + let mut event_monitor = toplevel + .event_monitor + .as_ref() + .map(|monitor_config| { + let mut parser = OptionParser::new(); + parser.add("path").add("fd"); + parser + .parse(monitor_config) + .map_err(Error::ParsingEventMonitor)?; - let file = if parser.is_set("fd") { - let fd = parser - .convert("fd") - .map_err(Error::ParsingEventMonitor)? - .unwrap(); - // SAFETY: fd is valid - unsafe { File::from_raw_fd(fd) } - } else if parser.is_set("path") { - std::fs::OpenOptions::new() - .write(true) - .create(true) - .open(parser.get("path").unwrap()) - .map_err(Error::EventMonitorIo)? - } else { - return Err(Error::BareEventMonitor); - }; + if parser.is_set("fd") { + let fd = parser + .convert("fd") + .map_err(Error::ParsingEventMonitor)? + .unwrap(); + // SAFETY: fd is valid + Ok(Some(unsafe { File::from_raw_fd(fd) })) + } else if parser.is_set("path") { + Ok(Some( + std::fs::OpenOptions::new() + .write(true) + .create(true) + .open(parser.get("path").unwrap()) + .map_err(Error::EventMonitorIo)?, + )) + } else { + Err(Error::BareEventMonitor) + } + }) + .transpose()? + .map(|event_monitor_file| { + event_monitor::set_monitor(event_monitor_file).map_err(Error::EventMonitorIo) + }) + .transpose()?; - let monitor = event_monitor::set_monitor(file).map_err(Error::EventMonitorIo)?; + #[cfg(feature = "dbus_api")] + let dbus_options = match (&toplevel.dbus_name, &toplevel.dbus_path) { + (Some(ref name), Some(ref path)) => { + // monitor is either set (file based) or not. + // if it's not set, create one without file support. + let mut monitor = match event_monitor.take() { + Some(monitor) => monitor, + None => event_monitor::set_monitor(None).map_err(Error::EventMonitorIo)?, + }; + let options = DBusApiOptions { + service_name: name.to_owned(), + object_path: path.to_owned(), + system_bus: toplevel.dbus_system_bus, + event_monitor_rx: monitor.subscribe(), + }; + + event_monitor = Some(monitor); + Ok(Some(options)) + } + (Some(_), None) => Err(Error::MissingDBusObjectPath), + (None, Some(_)) => Err(Error::MissingDBusServiceName), + (None, None) => Ok(None), + }?; + + if let Some(monitor) = event_monitor { vmm::start_event_monitor_thread( monitor, &seccomp_action, diff --git a/vmm/Cargo.toml b/vmm/Cargo.toml index 15a6a06a3..8437792ca 100644 --- a/vmm/Cargo.toml +++ b/vmm/Cargo.toml @@ -25,6 +25,7 @@ blocking = { version = "1.3.0", optional = true } devices = { path = "../devices" } epoll = "4.3.3" event_monitor = { path = "../event_monitor" } +flume = "0.10.14" futures = { version = "0.3.27", optional = true } gdbstub = { version = "0.6.4", optional = true } gdbstub_arch = { version = "0.2.4", optional = true } diff --git a/vmm/src/api/dbus/mod.rs b/vmm/src/api/dbus/mod.rs index 5306a0970..c063e1008 100644 --- a/vmm/src/api/dbus/mod.rs +++ b/vmm/src/api/dbus/mod.rs @@ -25,6 +25,7 @@ pub struct DBusApiOptions { pub service_name: String, pub object_path: String, pub system_bus: bool, + pub event_monitor_rx: flume::Receiver>, } pub struct DBusApi { @@ -297,6 +298,10 @@ impl DBusApi { .await .map(|_| ()) } + + // implementation of this function is provided by the `dbus_interface` macro + #[dbus_interface(signal)] + async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc) -> zbus::Result<()>; } pub fn start_dbus_thread( @@ -308,19 +313,26 @@ pub fn start_dbus_thread( hypervisor_type: HypervisorType, ) -> VmmResult<(thread::JoinHandle>, DBusApiShutdownChannels)> { let dbus_iface = DBusApi::new(api_notifier, api_sender); - let connection = executor::block_on(async move { + let (connection, iface_ref) = executor::block_on(async move { let conn_builder = if dbus_options.system_bus { ConnectionBuilder::system()? } else { ConnectionBuilder::session()? }; - conn_builder + let conn = conn_builder .internal_executor(false) .name(dbus_options.service_name)? - .serve_at(dbus_options.object_path, dbus_iface)? + .serve_at(dbus_options.object_path.as_str(), dbus_iface)? .build() - .await + .await?; + + let iface_ref = conn + .object_server() + .interface::<_, DBusApi>(dbus_options.object_path) + .await?; + + Ok((conn, iface_ref)) }) .map_err(VmmError::CreateDBusSession)?; @@ -359,6 +371,11 @@ pub fn start_dbus_thread( send_done.send(()).ok(); break; }, + ret = dbus_options.event_monitor_rx.recv_async() => { + if let Ok(event) = ret { + DBusApi::event(iface_ref.signal_context(), event).await.ok(); + } + } } } }) diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index 06d13cd51..23c9298f4 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -321,8 +321,10 @@ pub fn start_event_monitor_thread( while let Ok(event) = monitor.rx.recv() { let event = Arc::new(event); - monitor.file.write_all(event.as_bytes().as_ref()).ok(); - monitor.file.write_all(b"\n\n").ok(); + if let Some(ref mut file) = monitor.file { + file.write_all(event.as_bytes().as_ref()).ok(); + file.write_all(b"\n\n").ok(); + } for tx in monitor.broadcast.iter() { tx.send(event.clone()).ok();