From f00df25d408edc27ee47a93a5c5148a874f71b9e Mon Sep 17 00:00:00 2001 From: Omer Faruk Bayram Date: Sat, 25 Mar 2023 15:18:13 +0300 Subject: [PATCH] vmm: dbus: graceful shutdown of the DBusApi thread This commit adds support for graceful shutdown of the DBusApi thread using `futures::channel::oneshot` channels. By using oneshot channels, we ensure that the thread has enough time to send a response to the `VmmShutdown` method call before it is terminated. Without this step, the thread may be terminated before it can send a response, resulting in an error message on the client side stating that the message recipient disconnected from the message bus without providing a reply. Also changes the default values for DBus service name, object path and interface name. Signed-off-by: Omer Faruk Bayram --- src/main.rs | 10 ++++++-- vmm/src/api/dbus/mod.rs | 55 +++++++++++++++++++++++++++++++++++------ vmm/src/lib.rs | 18 +++++++++++--- 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8455bfec5..cd5a8d202 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,8 @@ use std::os::unix::io::{FromRawFd, RawFd}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use thiserror::Error; +#[cfg(feature = "dbus_api")] +use vmm::api::dbus::dbus_api_graceful_shutdown; use vmm::config; use vmm_sys_util::eventfd::EventFd; use vmm_sys_util::signal::block_signal; @@ -513,7 +515,7 @@ fn start_vmm(toplevel: TopLevel) -> Result, Error> { let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?; - let vmm_thread = vmm::start_vmm_thread( + let vmm_thread_handle = vmm::start_vmm_thread( vmm::VmmVersionInfo::new(env!("BUILD_VERSION"), env!("CARGO_PKG_VERSION")), &api_socket_path, api_socket_fd, @@ -568,11 +570,15 @@ fn start_vmm(toplevel: TopLevel) -> Result, Error> { } } - vmm_thread + vmm_thread_handle + .thread_handle .join() .map_err(Error::ThreadJoin)? .map_err(Error::VmmThread)?; + #[cfg(feature = "dbus_api")] + dbus_api_graceful_shutdown(vmm_thread_handle.dbus_shutdown_chs); + r.map(|_| api_socket_path) } diff --git a/vmm/src/api/dbus/mod.rs b/vmm/src/api/dbus/mod.rs index 90d16a445..b84ae9ee7 100644 --- a/vmm/src/api/dbus/mod.rs +++ b/vmm/src/api/dbus/mod.rs @@ -4,7 +4,8 @@ // use super::{ApiRequest, VmAction}; use crate::{Error as VmmError, Result as VmmResult}; -use futures::executor; +use futures::channel::oneshot; +use futures::{executor, FutureExt}; use hypervisor::HypervisorType; use seccompiler::SeccompAction; use std::sync::mpsc::Sender; @@ -15,6 +16,8 @@ use zbus::fdo::{self, Result}; use zbus::zvariant::Optional; use zbus::{dbus_interface, ConnectionBuilder}; +pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>); + pub struct DBusApi { api_notifier: EventFd, api_sender: futures::lock::Mutex>, @@ -24,6 +27,26 @@ fn api_error(error: impl std::fmt::Debug) -> fdo::Error { fdo::Error::Failed(format!("{error:?}")) } +// This method is intended to ensure that the DBusApi thread has enough time to +// send a response to the VmmShutdown method call before it is terminated. If +// this step is omitted, the thread may be terminated before it can send a +// response, resulting in an error message stating that the message recipient +// disconnected from the message bus without providing a reply. +pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) { + let (send_shutdown, mut recv_done) = ch; + + // send the shutdown signal and return + // if it errors out + if send_shutdown.send(()).is_err() { + return; + } + + // loop until `recv_err` errors out + // or as long as the return value indicates + // "immediately stale" (None) + while let Ok(None) = recv_done.try_recv() {} +} + impl DBusApi { pub fn new(api_notifier: EventFd, api_sender: Sender) -> Self { Self { @@ -60,7 +83,7 @@ impl DBusApi { } } -#[dbus_interface] +#[dbus_interface(name = "org.cloudhypervisor.DBusApi1")] impl DBusApi { async fn vmm_ping(&self) -> Result { let api_sender = self.clone_api_sender().await; @@ -261,26 +284,42 @@ pub fn start_dbus_thread( _seccomp_action: &SeccompAction, _exit_evt: EventFd, _hypervisor_type: HypervisorType, -) -> VmmResult>> { +) -> VmmResult<(thread::JoinHandle<()>, DBusApiShutdownChannels)> { let dbus_iface = DBusApi::new(api_notifier, api_sender); let connection = executor::block_on(async move { ConnectionBuilder::session()? .internal_executor(false) - .name("org.cloudhypervisor.ZBUS")? - .serve_at("/org/cloudhypervisor/ZBUS", dbus_iface)? + .name("org.cloudhypervisor.DBusApi")? + .serve_at("/org/cloudhypervisor/DBusApi", dbus_iface)? .build() .await }) .map_err(VmmError::CreateDBusSession)?; - thread::Builder::new() + let (send_shutdown, recv_shutdown) = oneshot::channel::<()>(); + let (send_done, recv_done) = oneshot::channel::<()>(); + + let thread_join_handle = thread::Builder::new() .name("dbus-thread".to_string()) .spawn(move || { executor::block_on(async move { + let recv_shutdown = recv_shutdown.fuse(); + let executor_tick = futures::future::Fuse::terminated(); + futures::pin_mut!(recv_shutdown, executor_tick); + executor_tick.set(connection.executor().tick().fuse()); + loop { - connection.executor().tick().await; + futures::select! { + _ = executor_tick => executor_tick.set(connection.executor().tick().fuse()), + _ = recv_shutdown => { + send_done.send(()).ok(); + break; + }, + } } }) }) - .map_err(VmmError::DBusThreadSpawn) + .map_err(VmmError::DBusThreadSpawn)?; + + Ok((thread_join_handle, (send_shutdown, recv_done))) } diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index 983ba9ec8..4f90d22d5 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -25,6 +25,8 @@ use crate::migration::{recv_vm_config, recv_vm_state}; use crate::seccomp_filters::{get_seccomp_filter, Thread}; use crate::vm::{Error as VmError, Vm, VmState}; use anyhow::anyhow; +#[cfg(feature = "dbus_api")] +use api::dbus::DBusApiShutdownChannels; use libc::{tcsetattr, termios, EFD_NONBLOCK, SIGINT, SIGTERM, TCSANOW}; use memory_manager::MemoryManagerSnapshotData; use pci::PciBdf; @@ -301,7 +303,7 @@ pub fn start_vmm_thread( exit_event: EventFd, seccomp_action: &SeccompAction, hypervisor: Arc, -) -> Result>> { +) -> Result { #[cfg(feature = "guest_debug")] let gdb_hw_breakpoints = hypervisor.get_guest_debug_hw_bps(); #[cfg(feature = "guest_debug")] @@ -355,7 +357,7 @@ pub fn start_vmm_thread( // The VMM thread is started, we can start the dbus thread // and start serving HTTP requests #[cfg(feature = "dbus_api")] - api::start_dbus_thread( + let (_, dbus_shutdown_chs) = api::start_dbus_thread( api_event_clone.try_clone().map_err(Error::EventFdClone)?, api_sender.clone(), seccomp_action, @@ -397,7 +399,11 @@ pub fn start_vmm_thread( .map_err(Error::GdbThreadSpawn)?; } - Ok(thread) + Ok(VmmThreadHandle { + thread_handle: thread, + #[cfg(feature = "dbus_api")] + dbus_shutdown_chs, + }) } #[derive(Clone, Deserialize, Serialize)] @@ -423,6 +429,12 @@ impl VmmVersionInfo { } } +pub struct VmmThreadHandle { + pub thread_handle: thread::JoinHandle>, + #[cfg(feature = "dbus_api")] + pub dbus_shutdown_chs: DBusApiShutdownChannels, +} + pub struct Vmm { epoll: EpollContext, exit_evt: EventFd,