vmm: dbus: graceful shutdown of the DBusApi thread

This commit adds support for graceful shutdown of the DBusApi thread
using `futures::channel::oneshot` channels. By using oneshot channels,
we ensure that the thread has enough time to send a response to the
`VmmShutdown` method call before it is terminated. Without this step,
the thread may be terminated before it can send a response, resulting
in an error message on the client side stating that the message
recipient disconnected from the message bus without providing a reply.

Also changes the default values for DBus service name, object path
and interface name.

Signed-off-by: Omer Faruk Bayram <omer.faruk@sartura.hr>
This commit is contained in:
Omer Faruk Bayram 2023-03-25 15:18:13 +03:00 committed by Bo Chen
parent c016a0d4d3
commit f00df25d40
3 changed files with 70 additions and 13 deletions

View File

@ -18,6 +18,8 @@ use std::os::unix::io::{FromRawFd, RawFd};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use thiserror::Error;
#[cfg(feature = "dbus_api")]
use vmm::api::dbus::dbus_api_graceful_shutdown;
use vmm::config;
use vmm_sys_util::eventfd::EventFd;
use vmm_sys_util::signal::block_signal;
@ -513,7 +515,7 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?;
let vmm_thread = vmm::start_vmm_thread(
let vmm_thread_handle = vmm::start_vmm_thread(
vmm::VmmVersionInfo::new(env!("BUILD_VERSION"), env!("CARGO_PKG_VERSION")),
&api_socket_path,
api_socket_fd,
@ -568,11 +570,15 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
}
}
vmm_thread
vmm_thread_handle
.thread_handle
.join()
.map_err(Error::ThreadJoin)?
.map_err(Error::VmmThread)?;
#[cfg(feature = "dbus_api")]
dbus_api_graceful_shutdown(vmm_thread_handle.dbus_shutdown_chs);
r.map(|_| api_socket_path)
}

View File

@ -4,7 +4,8 @@
//
use super::{ApiRequest, VmAction};
use crate::{Error as VmmError, Result as VmmResult};
use futures::executor;
use futures::channel::oneshot;
use futures::{executor, FutureExt};
use hypervisor::HypervisorType;
use seccompiler::SeccompAction;
use std::sync::mpsc::Sender;
@ -15,6 +16,8 @@ use zbus::fdo::{self, Result};
use zbus::zvariant::Optional;
use zbus::{dbus_interface, ConnectionBuilder};
pub type DBusApiShutdownChannels = (oneshot::Sender<()>, oneshot::Receiver<()>);
pub struct DBusApi {
api_notifier: EventFd,
api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
@ -24,6 +27,26 @@ fn api_error(error: impl std::fmt::Debug) -> fdo::Error {
fdo::Error::Failed(format!("{error:?}"))
}
// This method is intended to ensure that the DBusApi thread has enough time to
// send a response to the VmmShutdown method call before it is terminated. If
// this step is omitted, the thread may be terminated before it can send a
// response, resulting in an error message stating that the message recipient
// disconnected from the message bus without providing a reply.
pub fn dbus_api_graceful_shutdown(ch: DBusApiShutdownChannels) {
let (send_shutdown, mut recv_done) = ch;
// send the shutdown signal and return
// if it errors out
if send_shutdown.send(()).is_err() {
return;
}
// loop until `recv_err` errors out
// or as long as the return value indicates
// "immediately stale" (None)
while let Ok(None) = recv_done.try_recv() {}
}
impl DBusApi {
pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
Self {
@ -60,7 +83,7 @@ impl DBusApi {
}
}
#[dbus_interface]
#[dbus_interface(name = "org.cloudhypervisor.DBusApi1")]
impl DBusApi {
async fn vmm_ping(&self) -> Result<String> {
let api_sender = self.clone_api_sender().await;
@ -261,26 +284,42 @@ pub fn start_dbus_thread(
_seccomp_action: &SeccompAction,
_exit_evt: EventFd,
_hypervisor_type: HypervisorType,
) -> VmmResult<thread::JoinHandle<VmmResult<()>>> {
) -> VmmResult<(thread::JoinHandle<()>, DBusApiShutdownChannels)> {
let dbus_iface = DBusApi::new(api_notifier, api_sender);
let connection = executor::block_on(async move {
ConnectionBuilder::session()?
.internal_executor(false)
.name("org.cloudhypervisor.ZBUS")?
.serve_at("/org/cloudhypervisor/ZBUS", dbus_iface)?
.name("org.cloudhypervisor.DBusApi")?
.serve_at("/org/cloudhypervisor/DBusApi", dbus_iface)?
.build()
.await
})
.map_err(VmmError::CreateDBusSession)?;
thread::Builder::new()
let (send_shutdown, recv_shutdown) = oneshot::channel::<()>();
let (send_done, recv_done) = oneshot::channel::<()>();
let thread_join_handle = thread::Builder::new()
.name("dbus-thread".to_string())
.spawn(move || {
executor::block_on(async move {
let recv_shutdown = recv_shutdown.fuse();
let executor_tick = futures::future::Fuse::terminated();
futures::pin_mut!(recv_shutdown, executor_tick);
executor_tick.set(connection.executor().tick().fuse());
loop {
connection.executor().tick().await;
futures::select! {
_ = executor_tick => executor_tick.set(connection.executor().tick().fuse()),
_ = recv_shutdown => {
send_done.send(()).ok();
break;
},
}
}
})
})
.map_err(VmmError::DBusThreadSpawn)
.map_err(VmmError::DBusThreadSpawn)?;
Ok((thread_join_handle, (send_shutdown, recv_done)))
}

View File

@ -25,6 +25,8 @@ use crate::migration::{recv_vm_config, recv_vm_state};
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::vm::{Error as VmError, Vm, VmState};
use anyhow::anyhow;
#[cfg(feature = "dbus_api")]
use api::dbus::DBusApiShutdownChannels;
use libc::{tcsetattr, termios, EFD_NONBLOCK, SIGINT, SIGTERM, TCSANOW};
use memory_manager::MemoryManagerSnapshotData;
use pci::PciBdf;
@ -301,7 +303,7 @@ pub fn start_vmm_thread(
exit_event: EventFd,
seccomp_action: &SeccompAction,
hypervisor: Arc<dyn hypervisor::Hypervisor>,
) -> Result<thread::JoinHandle<Result<()>>> {
) -> Result<VmmThreadHandle> {
#[cfg(feature = "guest_debug")]
let gdb_hw_breakpoints = hypervisor.get_guest_debug_hw_bps();
#[cfg(feature = "guest_debug")]
@ -355,7 +357,7 @@ pub fn start_vmm_thread(
// The VMM thread is started, we can start the dbus thread
// and start serving HTTP requests
#[cfg(feature = "dbus_api")]
api::start_dbus_thread(
let (_, dbus_shutdown_chs) = api::start_dbus_thread(
api_event_clone.try_clone().map_err(Error::EventFdClone)?,
api_sender.clone(),
seccomp_action,
@ -397,7 +399,11 @@ pub fn start_vmm_thread(
.map_err(Error::GdbThreadSpawn)?;
}
Ok(thread)
Ok(VmmThreadHandle {
thread_handle: thread,
#[cfg(feature = "dbus_api")]
dbus_shutdown_chs,
})
}
#[derive(Clone, Deserialize, Serialize)]
@ -423,6 +429,12 @@ impl VmmVersionInfo {
}
}
pub struct VmmThreadHandle {
pub thread_handle: thread::JoinHandle<Result<()>>,
#[cfg(feature = "dbus_api")]
pub dbus_shutdown_chs: DBusApiShutdownChannels,
}
pub struct Vmm {
epoll: EpollContext,
exit_evt: EventFd,