vmm: dbus: implement the D-Bus API

This commit introduces three new dependencies: `zbus`, `futures`
and `blocking`. `blocking` is used to call the Internal API in zbus'
async context which is driven by `futures::executor`. They are all
behind the `dbus_api` feature flag.

The D-Bus API implementation is behind the same `dbus_api` feature
flag as well.

Signed-off-by: Omer Faruk Bayram <omer.faruk@sartura.hr>
This commit is contained in:
Omer Faruk Bayram 2023-03-23 00:03:02 +03:00 committed by Bo Chen
parent 5c96fbb19b
commit c016a0d4d3
8 changed files with 1173 additions and 98 deletions

View File

@ -42,6 +42,9 @@ jobs:
- name: Build (default features + tdx) - name: Build (default features + tdx)
run: cargo rustc --locked --bin cloud-hypervisor --features "tdx" -- -D warnings -D clippy::undocumented_unsafe_blocks run: cargo rustc --locked --bin cloud-hypervisor --features "tdx" -- -D warnings -D clippy::undocumented_unsafe_blocks
- name: Build (default features + dbus_api)
run: cargo rustc --locked --bin cloud-hypervisor --features "dbus_api" -- -D warnings -D clippy::undocumented_unsafe_blocks
- name: Build (default features + guest_debug) - name: Build (default features + guest_debug)
run: cargo rustc --locked --bin cloud-hypervisor --features "guest_debug" -- -D warnings -D clippy::undocumented_unsafe_blocks run: cargo rustc --locked --bin cloud-hypervisor --features "guest_debug" -- -D warnings -D clippy::undocumented_unsafe_blocks

935
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -65,6 +65,7 @@ wait-timeout = "0.2.0"
[features] [features]
default = ["kvm"] default = ["kvm"]
dbus_api = ["vmm/dbus_api"]
dhat-heap = ["dhat"] # For heap profiling dhat-heap = ["dhat"] # For heap profiling
guest_debug = ["vmm/guest_debug"] guest_debug = ["vmm/guest_debug"]
kvm = ["vmm/kvm"] kvm = ["vmm/kvm"]

View File

@ -445,7 +445,7 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
let (api_request_sender, api_request_receiver) = channel(); let (api_request_sender, api_request_receiver) = channel();
let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?; let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?;
let http_sender = api_request_sender.clone(); let api_request_sender_clone = api_request_sender.clone();
let seccomp_action = match &toplevel.seccomp as &str { let seccomp_action = match &toplevel.seccomp as &str {
"true" => SeccompAction::Trap, "true" => SeccompAction::Trap,
"false" => SeccompAction::Allow, "false" => SeccompAction::Allow,
@ -518,7 +518,7 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
&api_socket_path, &api_socket_path,
api_socket_fd, api_socket_fd,
api_evt.try_clone().unwrap(), api_evt.try_clone().unwrap(),
http_sender, api_request_sender_clone,
api_request_receiver, api_request_receiver,
#[cfg(feature = "guest_debug")] #[cfg(feature = "guest_debug")]
gdb_socket_path, gdb_socket_path,

View File

@ -6,6 +6,7 @@ edition = "2021"
[features] [features]
default = [] default = []
dbus_api = ["blocking", "futures", "zbus"]
guest_debug = ["kvm", "gdbstub", "gdbstub_arch"] guest_debug = ["kvm", "gdbstub", "gdbstub_arch"]
kvm = ["hypervisor/kvm", "vfio-ioctls/kvm", "vm-device/kvm", "pci/kvm"] kvm = ["hypervisor/kvm", "vfio-ioctls/kvm", "vm-device/kvm", "pci/kvm"]
mshv = ["hypervisor/mshv", "vfio-ioctls/mshv", "vm-device/mshv", "pci/mshv"] mshv = ["hypervisor/mshv", "vfio-ioctls/mshv", "vm-device/mshv", "pci/mshv"]
@ -19,9 +20,11 @@ arc-swap = "1.5.1"
arch = { path = "../arch" } arch = { path = "../arch" }
bitflags = "1.3.2" bitflags = "1.3.2"
block_util = { path = "../block_util" } block_util = { path = "../block_util" }
blocking = { version = "1.3.0", optional = true }
devices = { path = "../devices" } devices = { path = "../devices" }
epoll = "4.3.1" epoll = "4.3.1"
event_monitor = { path = "../event_monitor" } event_monitor = { path = "../event_monitor" }
futures = { version = "0.3.27", optional = true }
gdbstub = { version = "0.6.4", optional = true } gdbstub = { version = "0.6.4", optional = true }
gdbstub_arch = { version = "0.2.4", optional = true } gdbstub_arch = { version = "0.2.4", optional = true }
hypervisor = { path = "../hypervisor" } hypervisor = { path = "../hypervisor" }
@ -55,4 +58,5 @@ vm-memory = { version = "0.10.0", features = ["backend-mmap", "backend-atomic",
vm-migration = { path = "../vm-migration" } vm-migration = { path = "../vm-migration" }
vm-virtio = { path = "../vm-virtio" } vm-virtio = { path = "../vm-virtio" }
vmm-sys-util = { version = "0.11.0", features = ["with-serde"] } vmm-sys-util = { version = "0.11.0", features = ["with-serde"] }
zerocopy = "0.6.1" zbus = { version = "3.11.1", optional = true }
zerocopy = "0.6.1"

286
vmm/src/api/dbus/mod.rs Normal file
View File

@ -0,0 +1,286 @@
// Copyright © 2023 Sartura Ltd.
//
// SPDX-License-Identifier: Apache-2.0
//
use super::{ApiRequest, VmAction};
use crate::{Error as VmmError, Result as VmmResult};
use futures::executor;
use hypervisor::HypervisorType;
use seccompiler::SeccompAction;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;
use vmm_sys_util::eventfd::EventFd;
use zbus::fdo::{self, Result};
use zbus::zvariant::Optional;
use zbus::{dbus_interface, ConnectionBuilder};
pub struct DBusApi {
api_notifier: EventFd,
api_sender: futures::lock::Mutex<Sender<ApiRequest>>,
}
fn api_error(error: impl std::fmt::Debug) -> fdo::Error {
fdo::Error::Failed(format!("{error:?}"))
}
impl DBusApi {
pub fn new(api_notifier: EventFd, api_sender: Sender<ApiRequest>) -> Self {
Self {
api_notifier,
api_sender: futures::lock::Mutex::new(api_sender),
}
}
async fn clone_api_sender(&self) -> Sender<ApiRequest> {
// lock the async mutex, clone the `Sender` and then immediately
// drop the MutexGuard so that other tasks can clone the
// `Sender` as well
self.api_sender.lock().await.clone()
}
fn clone_api_notifier(&self) -> Result<EventFd> {
self.api_notifier
.try_clone()
.map_err(|err| fdo::Error::IOError(format!("{err:?}")))
}
async fn vm_action(&self, action: VmAction) -> Result<Optional<String>> {
let api_sender = self.clone_api_sender().await;
let api_notifier = self.clone_api_notifier()?;
let result = blocking::unblock(move || super::vm_action(api_notifier, api_sender, action))
.await
.map_err(api_error)?
// We're using `from_utf8_lossy` here to not deal with the
// error case of `from_utf8` as we know that `b.body` is valid JSON.
.map(|b| String::from_utf8_lossy(&b.body).to_string());
Ok(result.into())
}
}
#[dbus_interface]
impl DBusApi {
async fn vmm_ping(&self) -> Result<String> {
let api_sender = self.clone_api_sender().await;
let api_notifier = self.clone_api_notifier()?;
let result = blocking::unblock(move || super::vmm_ping(api_notifier, api_sender))
.await
.map_err(api_error)?;
serde_json::to_string(&result).map_err(api_error)
}
async fn vmm_shutdown(&self) -> Result<()> {
let api_sender = self.clone_api_sender().await;
let api_notifier = self.clone_api_notifier()?;
blocking::unblock(move || super::vmm_shutdown(api_notifier, api_sender))
.await
.map_err(api_error)
}
async fn vm_add_device(&self, device_config: String) -> Result<Optional<String>> {
let device_config = Arc::new(serde_json::from_str(&device_config).map_err(api_error)?);
self.vm_action(VmAction::AddDevice(device_config)).await
}
async fn vm_add_disk(&self, disk_config: String) -> Result<Optional<String>> {
let disk_config = Arc::new(serde_json::from_str(&disk_config).map_err(api_error)?);
self.vm_action(VmAction::AddDisk(disk_config)).await
}
async fn vm_add_fs(&self, fs_config: String) -> Result<Optional<String>> {
let fs_config = Arc::new(serde_json::from_str(&fs_config).map_err(api_error)?);
self.vm_action(VmAction::AddFs(fs_config)).await
}
async fn vm_add_net(&self, net_config: String) -> Result<Optional<String>> {
let net_config = Arc::new(serde_json::from_str(&net_config).map_err(api_error)?);
self.vm_action(VmAction::AddNet(net_config)).await
}
async fn vm_add_pmem(&self, pmem_config: String) -> Result<Optional<String>> {
let pmem_config = Arc::new(serde_json::from_str(&pmem_config).map_err(api_error)?);
self.vm_action(VmAction::AddPmem(pmem_config)).await
}
async fn vm_add_user_device(&self, vm_add_user_device: String) -> Result<Optional<String>> {
let vm_add_user_device =
Arc::new(serde_json::from_str(&vm_add_user_device).map_err(api_error)?);
self.vm_action(VmAction::AddUserDevice(vm_add_user_device))
.await
}
async fn vm_add_vdpa(&self, vdpa_config: String) -> Result<Optional<String>> {
let vdpa_config = Arc::new(serde_json::from_str(&vdpa_config).map_err(api_error)?);
self.vm_action(VmAction::AddVdpa(vdpa_config)).await
}
async fn vm_add_vsock(&self, vsock_config: String) -> Result<Optional<String>> {
let vsock_config = Arc::new(serde_json::from_str(&vsock_config).map_err(api_error)?);
self.vm_action(VmAction::AddVsock(vsock_config)).await
}
async fn vm_boot(&self) -> Result<()> {
self.vm_action(VmAction::Boot).await.map(|_| ())
}
#[allow(unused_variables)]
// zbus doesn't support cfg attributes on interface methods
// as a workaround, we make the *call to the internal API* conditionally
// compile and return an error on unsupported platforms.
async fn vm_coredump(&self, vm_coredump_data: String) -> Result<()> {
#[cfg(all(target_arch = "x86_64", feature = "guest_debug"))]
{
let vm_coredump_data =
Arc::new(serde_json::from_str(&vm_coredump_data).map_err(api_error)?);
self.vm_action(VmAction::Coredump(vm_coredump_data))
.await
.map(|_| ())
}
#[cfg(not(all(target_arch = "x86_64", feature = "guest_debug")))]
Err(api_error(
"VmCoredump only works on x86_64 with the `guest_debug` feature enabled",
))
}
async fn vm_counters(&self) -> Result<Optional<String>> {
self.vm_action(VmAction::Counters).await
}
async fn vm_create(&self, vm_config: String) -> Result<()> {
let api_sender = self.clone_api_sender().await;
let api_notifier = self.clone_api_notifier()?;
let vm_config = Arc::new(Mutex::new(
serde_json::from_str(&vm_config).map_err(api_error)?,
));
blocking::unblock(move || super::vm_create(api_notifier, api_sender, vm_config))
.await
.map_err(api_error)?;
Ok(())
}
async fn vm_delete(&self) -> Result<()> {
self.vm_action(VmAction::Delete).await.map(|_| ())
}
async fn vm_info(&self) -> Result<String> {
let api_sender = self.clone_api_sender().await;
let api_notifier = self.clone_api_notifier()?;
let result = blocking::unblock(move || super::vm_info(api_notifier, api_sender))
.await
.map_err(api_error)?;
serde_json::to_string(&result).map_err(api_error)
}
async fn vm_pause(&self) -> Result<()> {
self.vm_action(VmAction::Pause).await.map(|_| ())
}
async fn vm_power_button(&self) -> Result<()> {
self.vm_action(VmAction::PowerButton).await.map(|_| ())
}
async fn vm_reboot(&self) -> Result<()> {
self.vm_action(VmAction::Reboot).await.map(|_| ())
}
async fn vm_remove_device(&self, vm_remove_device: String) -> Result<()> {
let vm_remove_device =
Arc::new(serde_json::from_str(&vm_remove_device).map_err(api_error)?);
self.vm_action(VmAction::RemoveDevice(vm_remove_device))
.await
.map(|_| ())
}
async fn vm_resize(&self, vm_resize: String) -> Result<()> {
let vm_resize = Arc::new(serde_json::from_str(&vm_resize).map_err(api_error)?);
self.vm_action(VmAction::Resize(vm_resize))
.await
.map(|_| ())
}
async fn vm_resize_zone(&self, vm_resize_zone: String) -> Result<()> {
let vm_resize_zone = Arc::new(serde_json::from_str(&vm_resize_zone).map_err(api_error)?);
self.vm_action(VmAction::ResizeZone(vm_resize_zone))
.await
.map(|_| ())
}
async fn vm_restore(&self, restore_config: String) -> Result<()> {
let restore_config = Arc::new(serde_json::from_str(&restore_config).map_err(api_error)?);
self.vm_action(VmAction::Restore(restore_config))
.await
.map(|_| ())
}
async fn vm_receive_migration(&self, receive_migration_data: String) -> Result<()> {
let receive_migration_data =
Arc::new(serde_json::from_str(&receive_migration_data).map_err(api_error)?);
self.vm_action(VmAction::ReceiveMigration(receive_migration_data))
.await
.map(|_| ())
}
async fn vm_send_migration(&self, send_migration_data: String) -> Result<()> {
let send_migration_data =
Arc::new(serde_json::from_str(&send_migration_data).map_err(api_error)?);
self.vm_action(VmAction::SendMigration(send_migration_data))
.await
.map(|_| ())
}
async fn vm_resume(&self) -> Result<()> {
self.vm_action(VmAction::Resume).await.map(|_| ())
}
async fn vm_shutdown(&self) -> Result<()> {
self.vm_action(VmAction::Shutdown).await.map(|_| ())
}
async fn vm_snapshot(&self, vm_snapshot_config: String) -> Result<()> {
let vm_snapshot_config =
Arc::new(serde_json::from_str(&vm_snapshot_config).map_err(api_error)?);
self.vm_action(VmAction::Snapshot(vm_snapshot_config))
.await
.map(|_| ())
}
}
// TODO: add another field to the `seccomp_filters::Thread` enum for the D-Bus API
// TODO: add command line arguments to make this configurable
pub fn start_dbus_thread(
api_notifier: EventFd,
api_sender: Sender<ApiRequest>,
_seccomp_action: &SeccompAction,
_exit_evt: EventFd,
_hypervisor_type: HypervisorType,
) -> VmmResult<thread::JoinHandle<VmmResult<()>>> {
let dbus_iface = DBusApi::new(api_notifier, api_sender);
let connection = executor::block_on(async move {
ConnectionBuilder::session()?
.internal_executor(false)
.name("org.cloudhypervisor.ZBUS")?
.serve_at("/org/cloudhypervisor/ZBUS", dbus_iface)?
.build()
.await
})
.map_err(VmmError::CreateDBusSession)?;
thread::Builder::new()
.name("dbus-thread".to_string())
.spawn(move || {
executor::block_on(async move {
loop {
connection.executor().tick().await;
}
})
})
.map_err(VmmError::DBusThreadSpawn)
}

View File

@ -28,11 +28,15 @@
//! response channel Receiver. //! response channel Receiver.
//! 5. The thread handles the response and forwards potential errors. //! 5. The thread handles the response and forwards potential errors.
#[cfg(feature = "dbus_api")]
pub mod dbus;
pub mod http;
#[cfg(feature = "dbus_api")]
pub use self::dbus::start_dbus_thread;
pub use self::http::start_http_fd_thread; pub use self::http::start_http_fd_thread;
pub use self::http::start_http_path_thread; pub use self::http::start_http_path_thread;
pub mod http;
use crate::config::{ use crate::config::{
DeviceConfig, DiskConfig, FsConfig, NetConfig, PmemConfig, RestoreConfig, UserDeviceConfig, DeviceConfig, DiskConfig, FsConfig, NetConfig, PmemConfig, RestoreConfig, UserDeviceConfig,
VdpaConfig, VmConfig, VsockConfig, VdpaConfig, VmConfig, VsockConfig,

View File

@ -113,6 +113,16 @@ pub enum Error {
#[error("Error spawning HTTP thread: {0}")] #[error("Error spawning HTTP thread: {0}")]
HttpThreadSpawn(#[source] io::Error), HttpThreadSpawn(#[source] io::Error),
/// Cannot create D-Bus thread
#[cfg(feature = "dbus_api")]
#[error("Error spawning D-Bus thread: {0}")]
DBusThreadSpawn(#[source] io::Error),
/// Cannot start D-Bus session
#[cfg(feature = "dbus_api")]
#[error("Error starting D-Bus session: {0}")]
CreateDBusSession(#[source] zbus::Error),
/// Cannot handle the VM STDIN stream /// Cannot handle the VM STDIN stream
#[error("Error handling VM stdin: {0:?}")] #[error("Error handling VM stdin: {0:?}")]
Stdin(VmError), Stdin(VmError),
@ -301,7 +311,7 @@ pub fn start_vmm_thread(
#[cfg(feature = "guest_debug")] #[cfg(feature = "guest_debug")]
let gdb_vm_debug_event = vm_debug_event.try_clone().map_err(Error::EventFdClone)?; let gdb_vm_debug_event = vm_debug_event.try_clone().map_err(Error::EventFdClone)?;
let http_api_event = api_event.try_clone().map_err(Error::EventFdClone)?; let api_event_clone = api_event.try_clone().map_err(Error::EventFdClone)?;
let hypervisor_type = hypervisor.hypervisor_type(); let hypervisor_type = hypervisor.hypervisor_type();
// Retrieve seccomp filter // Retrieve seccomp filter
@ -342,11 +352,21 @@ pub fn start_vmm_thread(
.map_err(Error::VmmThreadSpawn)? .map_err(Error::VmmThreadSpawn)?
}; };
// The VMM thread is started, we can start serving HTTP requests // The VMM thread is started, we can start the dbus thread
// and start serving HTTP requests
#[cfg(feature = "dbus_api")]
api::start_dbus_thread(
api_event_clone.try_clone().map_err(Error::EventFdClone)?,
api_sender.clone(),
seccomp_action,
exit_event.try_clone().map_err(Error::EventFdClone)?,
hypervisor_type,
)?;
if let Some(http_path) = http_path { if let Some(http_path) = http_path {
api::start_http_path_thread( api::start_http_path_thread(
http_path, http_path,
http_api_event, api_event_clone,
api_sender, api_sender,
seccomp_action, seccomp_action,
exit_event, exit_event,
@ -355,7 +375,7 @@ pub fn start_vmm_thread(
} else if let Some(http_fd) = http_fd { } else if let Some(http_fd) = http_fd {
api::start_http_fd_thread( api::start_http_fd_thread(
http_fd, http_fd,
http_api_event, api_event_clone,
api_sender, api_sender,
seccomp_action, seccomp_action,
exit_event, exit_event,