vm-virtio: vhost: Implement the Pausable trait for all vhost-user devices

Due to the amount of code currently duplicated across vhost-user devices,
the stats for this commit is on the large side but it's mostly more
duplicated code, unfortunately.

Migratable and Snapshotable placeholder implementations are provided as
well, making all vhost-user devices Migratable.

Signed-off-by: Samuel Ortiz <sameo@linux.intel.com>
This commit is contained in:
Samuel Ortiz 2019-12-02 21:08:53 +01:00
parent dae0b2ef72
commit a122da4bef
4 changed files with 154 additions and 56 deletions

View File

@ -6,12 +6,17 @@ use libc::EFD_NONBLOCK;
use std::cmp;
use std::io::Write;
use std::ptr::null;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::vec::Vec;
use crate::VirtioInterrupt;
use super::Error as DeviceError;
use vm_device::{Migratable, MigratableError, Pausable, Snapshotable};
use vm_memory::GuestMemoryMmap;
use vmm_sys_util::eventfd::EventFd;
@ -38,12 +43,15 @@ impl VhostUserMasterReqHandler for SlaveReqHandler {}
pub struct Blk {
vhost_user_blk: Master,
kill_evt: Option<EventFd>,
pause_evt: Option<EventFd>,
avail_features: u64,
acked_features: u64,
config_space: Vec<u8>,
queue_sizes: Vec<u16>,
queue_evts: Option<Vec<EventFd>>,
interrupt_cb: Option<Arc<VirtioInterrupt>>,
epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
paused: Arc<AtomicBool>,
}
impl Blk {
@ -118,12 +126,15 @@ impl Blk {
Ok(Blk {
vhost_user_blk,
kill_evt: None,
pause_evt: None,
avail_features,
acked_features,
config_space,
queue_sizes: vec![vu_cfg.queue_size; vu_cfg.num_queues],
queue_evts: None,
interrupt_cb: None,
epoll_thread: None,
paused: Arc::new(AtomicBool::new(false)),
})
}
}
@ -216,16 +227,22 @@ impl VirtioDevice for Blk {
queues: Vec<Queue>,
queue_evts: Vec<EventFd>,
) -> ActivateResult {
let (self_kill_evt, kill_evt) =
match EventFd::new(EFD_NONBLOCK).and_then(|e| Ok((e.try_clone()?, e))) {
Ok(v) => v,
Err(e) => {
error!("failed creating kill EventFd pair: {}", e);
return Err(ActivateError::BadActivate);
}
};
let (self_kill_evt, kill_evt) = EventFd::new(EFD_NONBLOCK)
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(|e| {
error!("failed creating kill EventFd pair: {}", e);
ActivateError::BadActivate
})?;
self.kill_evt = Some(self_kill_evt);
let (self_pause_evt, pause_evt) = EventFd::new(EFD_NONBLOCK)
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(|e| {
error!("failed creating pause EventFd pair: {}", e);
ActivateError::BadActivate
})?;
self.pause_evt = Some(self_pause_evt);
// Save the interrupt EventFD as we need to return it on reset
// but clone it to pass into the thread.
self.interrupt_cb = Some(interrupt_cb.clone());
@ -253,24 +270,30 @@ impl VirtioDevice for Blk {
let mut handler = VhostUserEpollHandler::<SlaveReqHandler>::new(VhostUserEpollConfig {
interrupt_cb,
kill_evt,
pause_evt,
vu_interrupt_list,
slave_req_handler: None,
});
let handler_result = thread::Builder::new()
let paused = self.paused.clone();
thread::Builder::new()
.name("vhost_user_blk".to_string())
.spawn(move || {
if let Err(e) = handler.run() {
error!("net worker thread exited with error {:?}!", e);
}
});
if let Err(e) = handler_result {
error!("vhost-user blk thread create failed with error {:?}", e);
}
.spawn(move || handler.run(paused))
.map(|thread| self.epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone virtio epoll thread: {}", e);
ActivateError::BadActivate
})?;
Ok(())
}
fn reset(&mut self) -> Option<(Arc<VirtioInterrupt>, Vec<EventFd>)> {
// We first must resume the virtio thread if it was paused.
if self.pause_evt.take().is_some() {
self.resume().ok()?;
}
if let Err(e) = reset_vhost_user(&mut self.vhost_user_blk, self.queue_sizes.len()) {
error!("Failed to reset vhost-user daemon: {:?}", e);
return None;
@ -288,3 +311,7 @@ impl VirtioDevice for Blk {
))
}
}
virtio_pausable!(Blk);
impl Snapshotable for Blk {}
impl Migratable for Blk {}

View File

@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use super::vu_common_ctrl::{reset_vhost_user, setup_vhost_user};
use super::Error as DeviceError;
use super::{Error, Result};
use crate::vhost_user::handler::{VhostUserEpollConfig, VhostUserEpollHandler};
use crate::{
@ -13,6 +14,8 @@ use std::cmp;
use std::io;
use std::io::Write;
use std::os::unix::io::RawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use vhost_rs::vhost_user::message::{
@ -22,6 +25,7 @@ use vhost_rs::vhost_user::{
HandlerResult, Master, MasterReqHandler, VhostUserMaster, VhostUserMasterReqHandler,
};
use vhost_rs::VhostBackend;
use vm_device::{Migratable, MigratableError, Pausable, Snapshotable};
use vm_memory::GuestMemoryMmap;
use vmm_sys_util::eventfd::EventFd;
@ -116,10 +120,13 @@ pub struct Fs {
acked_features: u64,
config_space: Vec<u8>,
kill_evt: Option<EventFd>,
pause_evt: Option<EventFd>,
cache: Option<(VirtioSharedMemoryList, u64)>,
slave_req_support: bool,
queue_evts: Option<Vec<EventFd>>,
interrupt_cb: Option<Arc<VirtioInterrupt>>,
epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
paused: Arc<AtomicBool>,
}
impl Fs {
@ -199,10 +206,13 @@ impl Fs {
acked_features,
config_space,
kill_evt: None,
pause_evt: None,
cache,
slave_req_support,
queue_evts: None,
interrupt_cb: None,
epoll_thread: None,
paused: Arc::new(AtomicBool::new(false)),
})
}
}
@ -299,16 +309,22 @@ impl VirtioDevice for Fs {
return Err(ActivateError::BadActivate);
}
let (self_kill_evt, kill_evt) =
match EventFd::new(EFD_NONBLOCK).and_then(|e| Ok((e.try_clone()?, e))) {
Ok(v) => v,
Err(e) => {
error!("failed creating kill EventFd pair: {}", e);
return Err(ActivateError::BadActivate);
}
};
let (self_kill_evt, kill_evt) = EventFd::new(EFD_NONBLOCK)
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(|e| {
error!("failed creating kill EventFd pair: {}", e);
ActivateError::BadActivate
})?;
self.kill_evt = Some(self_kill_evt);
let (self_pause_evt, pause_evt) = EventFd::new(EFD_NONBLOCK)
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(|e| {
error!("failed creating pause EventFd pair: {}", e);
ActivateError::BadActivate
})?;
self.pause_evt = Some(self_pause_evt);
// Save the interrupt EventFD as we need to return it on reset
// but clone it to pass into the thread.
self.interrupt_cb = Some(interrupt_cb.clone());
@ -361,26 +377,29 @@ impl VirtioDevice for Fs {
vu_interrupt_list: vu_call_evt_queue_list,
interrupt_cb,
kill_evt,
pause_evt,
slave_req_handler,
});
let worker_result = thread::Builder::new()
let paused = self.paused.clone();
thread::Builder::new()
.name("virtio_fs".to_string())
.spawn(move || {
if let Err(e) = handler.run() {
error!("net worker thread exited with error {:?}!", e);
}
});
if let Err(e) = worker_result {
error!("failed to spawn virtio-fs worker: {}", e);
return Err(ActivateError::BadActivate);
}
.spawn(move || handler.run(paused))
.map(|thread| self.epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
Ok(())
}
fn reset(&mut self) -> Option<(Arc<VirtioInterrupt>, Vec<EventFd>)> {
// We first must resume the virtio thread if it was paused.
if self.pause_evt.take().is_some() {
self.resume().ok()?;
}
if let Err(e) = reset_vhost_user(&mut self.vu, self.queue_sizes.len()) {
error!("Failed to reset vhost-user daemon: {:?}", e);
return None;
@ -406,3 +425,7 @@ impl VirtioDevice for Fs {
}
}
}
virtio_pausable!(Fs);
impl Snapshotable for Fs {}
impl Migratable for Fs {}

View File

@ -15,7 +15,9 @@ use vmm_sys_util::eventfd::EventFd;
use crate::VirtioInterrupt;
use std::io;
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use vhost_rs::vhost_user::{MasterReqHandler, VhostUserMasterReqHandler};
/// Collection of common parameters required by vhost-user devices while
@ -28,6 +30,7 @@ use vhost_rs::vhost_user::{MasterReqHandler, VhostUserMasterReqHandler};
pub struct VhostUserEpollConfig<S: VhostUserMasterReqHandler> {
pub interrupt_cb: Arc<VirtioInterrupt>,
pub kill_evt: EventFd,
pub pause_evt: EventFd,
pub vu_interrupt_list: Vec<(EventFd, Queue)>,
pub slave_req_handler: Option<MasterReqHandler<S>>,
}
@ -53,7 +56,7 @@ impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
.map_err(Error::FailedSignalingUsedQueue)
}
pub fn run(&mut self) -> Result<()> {
pub fn run(&mut self, paused: Arc<AtomicBool>) -> Result<()> {
// Create the epoll file descriptor
let epoll_fd = epoll::create(true).map_err(Error::EpollCreateFd)?;
@ -79,10 +82,20 @@ impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
)
.map_err(Error::EpollCtl)?;
let mut index = kill_evt_index;
let pause_evt_index = kill_evt_index + 1;
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_ADD,
self.vu_epoll_cfg.pause_evt.as_raw_fd(),
epoll::Event::new(epoll::Events::EPOLLIN, pause_evt_index as u64),
)
.map_err(Error::EpollCtl)?;
let mut index = pause_evt_index;
let slave_evt_index = if let Some(self_req_handler) = &self.vu_epoll_cfg.slave_req_handler {
index = kill_evt_index + 1;
index = pause_evt_index + 1;
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_ADD,
@ -136,6 +149,15 @@ impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
debug!("KILL_EVENT received, stopping epoll loop");
break 'poll;
}
x if pause_evt_index == x => {
debug!("PAUSE_EVENT received, pausing vhost-user epoll loop");
// We loop here to handle spurious park() returns.
// Until we have not resumed, the paused boolean will
// be true.
while paused.load(Ordering::SeqCst) {
thread::park();
}
}
x if (slave_evt_index.is_some() && slave_evt_index.unwrap() == x) => {
if let Some(slave_req_handler) =
self.vu_epoll_cfg.slave_req_handler.as_mut()

View File

@ -5,13 +5,17 @@ use libc;
use libc::EFD_NONBLOCK;
use std::cmp;
use std::io::Write;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::vec::Vec;
use super::Error as DeviceError;
use crate::VirtioInterrupt;
use net_util::{MacAddr, MAC_ADDR_LEN};
use vm_device::{Migratable, MigratableError, Pausable, Snapshotable};
use vm_memory::GuestMemoryMmap;
use vmm_sys_util::eventfd::EventFd;
@ -31,6 +35,7 @@ impl VhostUserMasterReqHandler for SlaveReqHandler {}
pub struct Net {
vhost_user_net: Master,
kill_evt: Option<EventFd>,
pause_evt: Option<EventFd>,
avail_features: u64,
acked_features: u64,
backend_features: u64,
@ -38,6 +43,8 @@ pub struct Net {
queue_sizes: Vec<u16>,
queue_evts: Option<Vec<EventFd>>,
interrupt_cb: Option<Arc<VirtioInterrupt>>,
epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
paused: Arc<AtomicBool>,
}
impl Net {
@ -109,6 +116,7 @@ impl Net {
Ok(Net {
vhost_user_net,
kill_evt: None,
pause_evt: None,
avail_features,
acked_features,
backend_features,
@ -116,6 +124,8 @@ impl Net {
queue_sizes: vec![vu_cfg.queue_size; vu_cfg.num_queues],
queue_evts: None,
interrupt_cb: None,
epoll_thread: None,
paused: Arc::new(AtomicBool::new(false)),
})
}
}
@ -201,16 +211,22 @@ impl VirtioDevice for Net {
queues: Vec<Queue>,
queue_evts: Vec<EventFd>,
) -> ActivateResult {
let (self_kill_evt, kill_evt) =
match EventFd::new(EFD_NONBLOCK).and_then(|e| Ok((e.try_clone()?, e))) {
Ok(v) => v,
Err(e) => {
error!("failed creating kill EventFd pair: {}", e);
return Err(ActivateError::BadActivate);
}
};
let (self_kill_evt, kill_evt) = EventFd::new(EFD_NONBLOCK)
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(|e| {
error!("failed creating kill EventFd pair: {}", e);
ActivateError::BadActivate
})?;
self.kill_evt = Some(self_kill_evt);
let (self_pause_evt, pause_evt) = EventFd::new(EFD_NONBLOCK)
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(|e| {
error!("failed creating pause EventFd pair: {}", e);
ActivateError::BadActivate
})?;
self.pause_evt = Some(self_pause_evt);
// Save the interrupt EventFD as we need to return it on reset
// but clone it to pass into the thread.
self.interrupt_cb = Some(interrupt_cb.clone());
@ -238,24 +254,30 @@ impl VirtioDevice for Net {
let mut handler = VhostUserEpollHandler::<SlaveReqHandler>::new(VhostUserEpollConfig {
interrupt_cb,
kill_evt,
pause_evt,
vu_interrupt_list,
slave_req_handler: None,
});
let handler_result = thread::Builder::new()
let paused = self.paused.clone();
thread::Builder::new()
.name("vhost_user_net".to_string())
.spawn(move || {
if let Err(e) = handler.run() {
error!("net worker thread exited with error {:?}!", e);
}
});
if let Err(e) = handler_result {
error!("vhost-user net thread create failed with error {:?}", e);
}
.spawn(move || handler.run(paused))
.map(|thread| self.epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
Ok(())
}
fn reset(&mut self) -> Option<(Arc<VirtioInterrupt>, Vec<EventFd>)> {
// We first must resume the virtio thread if it was paused.
if self.pause_evt.take().is_some() {
self.resume().ok()?;
}
if let Err(e) = reset_vhost_user(&mut self.vhost_user_net, self.queue_sizes.len()) {
error!("Failed to reset vhost-user daemon: {:?}", e);
return None;
@ -273,3 +295,7 @@ impl VirtioDevice for Net {
))
}
}
virtio_pausable!(Net);
impl Snapshotable for Net {}
impl Migratable for Net {}