diff --git a/virtio-devices/src/vhost_user/fs.rs b/virtio-devices/src/vhost_user/fs.rs index b20d4164d..23f77d851 100644 --- a/virtio-devices/src/vhost_user/fs.rs +++ b/virtio-devices/src/vhost_user/fs.rs @@ -9,12 +9,11 @@ use crate::{ ActivateError, ActivateResult, Queue, UserspaceMapping, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterrupt, VirtioSharedMemoryList, VIRTIO_F_VERSION_1, }; -use libc::{self, c_void, off64_t, pread64, pwrite64, EFD_NONBLOCK}; +use libc::{self, c_void, off64_t, pread64, pwrite64}; use seccomp::{SeccompAction, SeccompFilter}; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::result; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use vhost_rs::vhost_user::message::{ @@ -269,19 +268,11 @@ pub struct Fs { common: VirtioCommon, id: String, vu: Master, - queue_sizes: Vec, config: VirtioFsConfig, - kill_evt: Option, - pause_evt: Option, // Hold ownership of the memory that is allocated for the device // which will be automatically dropped when the device is dropped cache: Option<(VirtioSharedMemoryList, MmapRegion)>, slave_req_support: bool, - queue_evts: Option>, - interrupt_cb: Option>, - epoll_threads: Option>>, - paused: Arc, - paused_sync: Arc, seccomp_action: SeccompAction, } @@ -356,23 +347,18 @@ impl Fs { Ok(Fs { common: VirtioCommon { + device_type: VirtioDeviceType::TYPE_FS as u32, avail_features, acked_features, + queue_sizes: vec![queue_size; num_queues], + paused_sync: Some(Arc::new(Barrier::new(2))), ..Default::default() }, id, vu: master, - queue_sizes: vec![queue_size; num_queues], config, - kill_evt: None, - pause_evt: None, cache, slave_req_support, - queue_evts: None, - interrupt_cb: None, - epoll_threads: None, - paused: Arc::new(AtomicBool::new(false)), - paused_sync: Arc::new(Barrier::new(2)), seccomp_action, }) } @@ -380,7 +366,7 @@ impl Fs { impl Drop for Fs { fn drop(&mut self) { - if let Some(kill_evt) = self.kill_evt.take() { + if let Some(kill_evt) = self.common.kill_evt.take() { // Ignore the result because there is nothing we can do about it. let _ = kill_evt.write(1); } @@ -389,11 +375,11 @@ impl Drop for Fs { impl VirtioDevice for Fs { fn device_type(&self) -> u32 { - VirtioDeviceType::TYPE_FS as u32 + self.common.device_type } fn queue_max_sizes(&self) -> &[u16] { - &self.queue_sizes.as_slice() + &self.common.queue_sizes } fn features(&self) -> u64 { @@ -415,45 +401,28 @@ impl VirtioDevice for Fs { queues: Vec, queue_evts: Vec, ) -> ActivateResult { - if queues.len() != self.queue_sizes.len() || queue_evts.len() != self.queue_sizes.len() { - error!( - "Cannot perform activate. Expected {} queue(s), got {}", - self.queue_sizes.len(), - queues.len() - ); - return Err(ActivateError::BadActivate); - } + self.common.activate(&queues, &queue_evts, &interrupt_cb)?; - let (self_kill_evt, kill_evt) = EventFd::new(EFD_NONBLOCK) - .and_then(|e| Ok((e.try_clone()?, e))) + let kill_evt = self + .common + .kill_evt + .as_ref() + .unwrap() + .try_clone() .map_err(|e| { - error!("failed creating kill EventFd pair: {}", e); + error!("failed to clone kill_evt eventfd: {}", e); ActivateError::BadActivate })?; - self.kill_evt = Some(self_kill_evt); - - let (self_pause_evt, pause_evt) = EventFd::new(EFD_NONBLOCK) - .and_then(|e| Ok((e.try_clone()?, e))) + let pause_evt = self + .common + .pause_evt + .as_ref() + .unwrap() + .try_clone() .map_err(|e| { - error!("failed creating pause EventFd pair: {}", e); + error!("failed to clone pause_evt eventfd: {}", e); ActivateError::BadActivate })?; - self.pause_evt = Some(self_pause_evt); - - // Save the interrupt EventFD as we need to return it on reset - // but clone it to pass into the thread. - self.interrupt_cb = Some(interrupt_cb.clone()); - - let mut tmp_queue_evts: Vec = Vec::new(); - for queue_evt in queue_evts.iter() { - // Save the queue EventFD as we need to return it on reset - // but clone it to pass into the thread. - tmp_queue_evts.push(queue_evt.try_clone().map_err(|e| { - error!("failed to clone queue EventFd: {}", e); - ActivateError::BadActivate - })?); - } - self.queue_evts = Some(tmp_queue_evts); let vu_call_evt_queue_list = setup_vhost_user( &mut self.vu, @@ -499,8 +468,8 @@ impl VirtioDevice for Fs { slave_req_handler, }); - let paused = self.paused.clone(); - let paused_sync = self.paused_sync.clone(); + let paused = self.common.paused.clone(); + let paused_sync = self.common.paused_sync.clone(); let mut epoll_threads = Vec::new(); let virtio_vhost_fs_seccomp_filter = get_seccomp_filter(&self.seccomp_action, Thread::VirtioVhostFs) @@ -510,7 +479,7 @@ impl VirtioDevice for Fs { .spawn(move || { if let Err(e) = SeccompFilter::apply(virtio_vhost_fs_seccomp_filter) { error!("Error applying seccomp filter: {:?}", e); - } else if let Err(e) = handler.run(paused, paused_sync) { + } else if let Err(e) = handler.run(paused, paused_sync.unwrap()) { error!("Error running worker: {:?}", e); } }) @@ -520,31 +489,31 @@ impl VirtioDevice for Fs { ActivateError::BadActivate })?; - self.epoll_threads = Some(epoll_threads); + self.common.epoll_threads = Some(epoll_threads); Ok(()) } fn reset(&mut self) -> Option<(Arc, Vec)> { // We first must resume the virtio thread if it was paused. - if self.pause_evt.take().is_some() { - self.resume().ok()?; + if self.common.pause_evt.take().is_some() { + self.common.resume().ok()?; } - if let Err(e) = reset_vhost_user(&mut self.vu, self.queue_sizes.len()) { + if let Err(e) = reset_vhost_user(&mut self.vu, self.common.queue_sizes.len()) { error!("Failed to reset vhost-user daemon: {:?}", e); return None; } - if let Some(kill_evt) = self.kill_evt.take() { + if let Some(kill_evt) = self.common.kill_evt.take() { // Ignore the result because there is nothing we can do about it. let _ = kill_evt.write(1); } // Return the interrupt and queue EventFDs Some(( - self.interrupt_cb.take().unwrap(), - self.queue_evts.take().unwrap(), + self.common.interrupt_cb.take().unwrap(), + self.common.queue_evts.take().unwrap(), )) } @@ -592,7 +561,16 @@ impl VirtioDevice for Fs { } } -virtio_pausable!(Fs); +impl Pausable for Fs { + fn pause(&mut self) -> result::Result<(), MigratableError> { + self.common.pause() + } + + fn resume(&mut self) -> result::Result<(), MigratableError> { + self.common.resume() + } +} + impl Snapshottable for Fs { fn id(&self) -> String { self.id.clone()