vmm: dbus: broadcast event_monitor events over the DBus API

This commit builds on top of the `Monitor::subscribe` function and
makes it possible to broadcast events published from `event-monitor`
over D-Bus.

The broadcasting functionality is enabled if the D-Bus API is enabled
and users who wish to also enable the file based `event-monitor` can do
so with the CLI arg `--event-monitor`.

Signed-off-by: Omer Faruk Bayram <omer.faruk@sartura.hr>
This commit is contained in:
Omer Faruk Bayram 2023-08-13 20:32:44 +00:00 committed by Bo Chen
parent e02efe9ba0
commit 2ed96cd3ed
6 changed files with 93 additions and 45 deletions

1
Cargo.lock generated
View File

@ -2344,6 +2344,7 @@ dependencies = [
"devices",
"epoll",
"event_monitor",
"flume",
"futures",
"gdbstub",
"gdbstub_arch",

View File

@ -24,12 +24,12 @@ struct Event<'a> {
pub struct Monitor {
pub rx: flume::Receiver<String>,
pub file: File,
pub file: Option<File>,
pub broadcast: Vec<flume::Sender<Arc<String>>>,
}
impl Monitor {
pub fn new(rx: flume::Receiver<String>, file: File) -> Self {
pub fn new(rx: flume::Receiver<String>, file: Option<File>) -> Self {
Self {
rx,
file,
@ -68,11 +68,14 @@ fn set_file_nonblocking(file: &File) -> io::Result<()> {
/// This function must only be called once from the main thread before any threads
/// are created to avoid race conditions.
pub fn set_monitor(file: File) -> io::Result<Monitor> {
pub fn set_monitor(file: Option<File>) -> io::Result<Monitor> {
// SAFETY: there is only one caller of this function, so MONITOR is written to only once
assert!(unsafe { MONITOR.is_none() });
set_file_nonblocking(&file)?;
if let Some(ref file) = file {
set_file_nonblocking(file)?;
}
let (tx, rx) = flume::unbounded();
let monitor = Monitor::new(rx, file);

View File

@ -449,18 +449,6 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
(None, None)
};
#[cfg(feature = "dbus_api")]
let dbus_options = match (&toplevel.dbus_name, &toplevel.dbus_path) {
(Some(ref name), Some(ref path)) => Ok(Some(DBusApiOptions {
service_name: name.to_owned(),
object_path: path.to_owned(),
system_bus: toplevel.dbus_system_bus,
})),
(Some(_), None) => Err(Error::MissingDBusObjectPath),
(None, Some(_)) => Err(Error::MissingDBusServiceName),
(None, None) => Ok(None),
}?;
let (api_request_sender, api_request_receiver) = channel();
let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?;
@ -530,31 +518,67 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?;
if let Some(ref monitor_config) = toplevel.event_monitor {
#[allow(unused_mut)]
let mut event_monitor = toplevel
.event_monitor
.as_ref()
.map(|monitor_config| {
let mut parser = OptionParser::new();
parser.add("path").add("fd");
parser
.parse(monitor_config)
.map_err(Error::ParsingEventMonitor)?;
let file = if parser.is_set("fd") {
if parser.is_set("fd") {
let fd = parser
.convert("fd")
.map_err(Error::ParsingEventMonitor)?
.unwrap();
// SAFETY: fd is valid
unsafe { File::from_raw_fd(fd) }
Ok(Some(unsafe { File::from_raw_fd(fd) }))
} else if parser.is_set("path") {
Ok(Some(
std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(parser.get("path").unwrap())
.map_err(Error::EventMonitorIo)?
.map_err(Error::EventMonitorIo)?,
))
} else {
return Err(Error::BareEventMonitor);
Err(Error::BareEventMonitor)
}
})
.transpose()?
.map(|event_monitor_file| {
event_monitor::set_monitor(event_monitor_file).map_err(Error::EventMonitorIo)
})
.transpose()?;
#[cfg(feature = "dbus_api")]
let dbus_options = match (&toplevel.dbus_name, &toplevel.dbus_path) {
(Some(ref name), Some(ref path)) => {
// monitor is either set (file based) or not.
// if it's not set, create one without file support.
let mut monitor = match event_monitor.take() {
Some(monitor) => monitor,
None => event_monitor::set_monitor(None).map_err(Error::EventMonitorIo)?,
};
let options = DBusApiOptions {
service_name: name.to_owned(),
object_path: path.to_owned(),
system_bus: toplevel.dbus_system_bus,
event_monitor_rx: monitor.subscribe(),
};
let monitor = event_monitor::set_monitor(file).map_err(Error::EventMonitorIo)?;
event_monitor = Some(monitor);
Ok(Some(options))
}
(Some(_), None) => Err(Error::MissingDBusObjectPath),
(None, Some(_)) => Err(Error::MissingDBusServiceName),
(None, None) => Ok(None),
}?;
if let Some(monitor) = event_monitor {
vmm::start_event_monitor_thread(
monitor,
&seccomp_action,

View File

@ -25,6 +25,7 @@ blocking = { version = "1.3.0", optional = true }
devices = { path = "../devices" }
epoll = "4.3.3"
event_monitor = { path = "../event_monitor" }
flume = "0.10.14"
futures = { version = "0.3.27", optional = true }
gdbstub = { version = "0.6.4", optional = true }
gdbstub_arch = { version = "0.2.4", optional = true }

View File

@ -25,6 +25,7 @@ pub struct DBusApiOptions {
pub service_name: String,
pub object_path: String,
pub system_bus: bool,
pub event_monitor_rx: flume::Receiver<Arc<String>>,
}
pub struct DBusApi {
@ -297,6 +298,10 @@ impl DBusApi {
.await
.map(|_| ())
}
// implementation of this function is provided by the `dbus_interface` macro
#[dbus_interface(signal)]
async fn event(ctxt: &zbus::SignalContext<'_>, event: Arc<String>) -> zbus::Result<()>;
}
pub fn start_dbus_thread(
@ -308,19 +313,26 @@ pub fn start_dbus_thread(
hypervisor_type: HypervisorType,
) -> VmmResult<(thread::JoinHandle<VmmResult<()>>, DBusApiShutdownChannels)> {
let dbus_iface = DBusApi::new(api_notifier, api_sender);
let connection = executor::block_on(async move {
let (connection, iface_ref) = executor::block_on(async move {
let conn_builder = if dbus_options.system_bus {
ConnectionBuilder::system()?
} else {
ConnectionBuilder::session()?
};
conn_builder
let conn = conn_builder
.internal_executor(false)
.name(dbus_options.service_name)?
.serve_at(dbus_options.object_path, dbus_iface)?
.serve_at(dbus_options.object_path.as_str(), dbus_iface)?
.build()
.await
.await?;
let iface_ref = conn
.object_server()
.interface::<_, DBusApi>(dbus_options.object_path)
.await?;
Ok((conn, iface_ref))
})
.map_err(VmmError::CreateDBusSession)?;
@ -359,6 +371,11 @@ pub fn start_dbus_thread(
send_done.send(()).ok();
break;
},
ret = dbus_options.event_monitor_rx.recv_async() => {
if let Ok(event) = ret {
DBusApi::event(iface_ref.signal_context(), event).await.ok();
}
}
}
}
})

View File

@ -321,8 +321,10 @@ pub fn start_event_monitor_thread(
while let Ok(event) = monitor.rx.recv() {
let event = Arc::new(event);
monitor.file.write_all(event.as_bytes().as_ref()).ok();
monitor.file.write_all(b"\n\n").ok();
if let Some(ref mut file) = monitor.file {
file.write_all(event.as_bytes().as_ref()).ok();
file.write_all(b"\n\n").ok();
}
for tx in monitor.broadcast.iter() {
tx.send(event.clone()).ok();