diff --git a/virtio-devices/src/vhost_user/blk.rs b/virtio-devices/src/vhost_user/blk.rs index 052e559dd..3dc4226b2 100644 --- a/virtio-devices/src/vhost_user/blk.rs +++ b/virtio-devices/src/vhost_user/blk.rs @@ -5,10 +5,11 @@ use super::super::{ ActivateError, ActivateResult, Queue, VirtioCommon, VirtioDevice, VirtioDeviceType, }; use super::vu_common_ctrl::{ - add_memory_region, negotiate_features_vhost_user, reset_vhost_user, setup_vhost_user, - update_mem_table, VhostUserConfig, + add_memory_region, connect_vhost_user, negotiate_features_vhost_user, reset_vhost_user, + setup_vhost_user, update_mem_table, VhostUserConfig, }; use super::{Error, Result, DEFAULT_VIRTIO_FEATURES}; +use crate::vhost_user::ReconnectEpollHandler; use crate::VirtioInterrupt; use crate::{GuestMemoryMmap, GuestRegionMmap}; use block_util::VirtioBlockConfig; @@ -16,7 +17,8 @@ use std::mem; use std::ops::Deref; use std::os::unix::io::AsRawFd; use std::result; -use std::sync::{Arc, Barrier}; +use std::sync::{Arc, Barrier, Mutex}; +use std::thread; use std::vec::Vec; use vhost::vhost_user::message::VhostUserConfigFlags; use vhost::vhost_user::message::VHOST_USER_CONFIG_OFFSET; @@ -40,10 +42,12 @@ impl VhostUserMasterReqHandler for SlaveReqHandler {} pub struct Blk { common: VirtioCommon, id: String, - vhost_user_blk: Master, + vhost_user_blk: Arc>, config: VirtioBlockConfig, guest_memory: Option>, acked_protocol_features: u64, + socket_path: String, + reconnect_epoll_thread: Option>, } impl Blk { @@ -51,8 +55,8 @@ impl Blk { pub fn new(id: String, vu_cfg: VhostUserConfig) -> Result { let num_queues = vu_cfg.num_queues; - let mut vhost_user_blk = Master::connect(&vu_cfg.socket, num_queues as u64) - .map_err(Error::VhostUserCreateMaster)?; + let mut vhost_user_blk = + connect_vhost_user(false, &vu_cfg.socket, num_queues as u64, false)?; // Filling device and vring features VMM supports. let mut avail_features = 1 << VIRTIO_BLK_F_SIZE_MAX @@ -128,15 +132,17 @@ impl Blk { queue_sizes: vec![vu_cfg.queue_size; num_queues], avail_features: acked_features, acked_features: 0, - paused_sync: Some(Arc::new(Barrier::new(1))), + paused_sync: Some(Arc::new(Barrier::new(2))), min_queues: DEFAULT_QUEUE_NUMBER as u16, ..Default::default() }, id, - vhost_user_blk, + vhost_user_blk: Arc::new(Mutex::new(vhost_user_blk)), config, guest_memory: None, acked_protocol_features, + socket_path: vu_cfg.socket, + reconnect_epoll_thread: None, }) } } @@ -189,6 +195,8 @@ impl VirtioDevice for Blk { self.config.writeback = data[0]; if let Err(e) = self .vhost_user_blk + .lock() + .unwrap() .set_config(offset as u32, VhostUserConfigFlags::WRITABLE, data) .map_err(Error::VhostUserSetConfig) { @@ -207,17 +215,56 @@ impl VirtioDevice for Blk { self.guest_memory = Some(mem.clone()); + // The backend acknowledged features must contain the protocol feature + // bit in case it was initially set but lost through the features + // negotiation with the guest. + let backend_acked_features = self.common.acked_features + | (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()); + setup_vhost_user( - &mut self.vhost_user_blk, + &mut self.vhost_user_blk.lock().unwrap(), &mem.memory(), - queues, - queue_evts, + queues.clone(), + queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(), &interrupt_cb, - self.common.acked_features - | (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()), + backend_acked_features, ) .map_err(ActivateError::VhostUserBlkSetup)?; + // Run a dedicated thread for handling potential reconnections with + // the backend. + let (kill_evt, pause_evt) = self.common.dup_eventfds(); + + let mut reconnect_handler = ReconnectEpollHandler { + vu: self.vhost_user_blk.clone(), + mem, + kill_evt, + pause_evt, + queues, + queue_evts, + virtio_interrupt: interrupt_cb, + acked_features: backend_acked_features, + acked_protocol_features: self.acked_protocol_features, + socket_path: self.socket_path.clone(), + server: false, + }; + + let paused = self.common.paused.clone(); + let paused_sync = self.common.paused_sync.clone(); + + thread::Builder::new() + .name(format!("{}_reconnect", self.id)) + .spawn(move || { + if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) { + error!("Error running reconnection worker: {:?}", e); + } + }) + .map(|thread| self.reconnect_epoll_thread = Some(thread)) + .map_err(|e| { + error!("failed to clone queue EventFd: {}", e); + ActivateError::BadActivate + })?; + Ok(()) } @@ -227,7 +274,10 @@ impl VirtioDevice for Blk { self.common.resume().ok()?; } - if let Err(e) = reset_vhost_user(&mut self.vhost_user_blk, self.common.queue_sizes.len()) { + if let Err(e) = reset_vhost_user( + &mut self.vhost_user_blk.lock().unwrap(), + self.common.queue_sizes.len(), + ) { error!("Failed to reset vhost-user daemon: {:?}", e); return None; } @@ -244,7 +294,7 @@ impl VirtioDevice for Blk { } fn shutdown(&mut self) { - let _ = unsafe { libc::close(self.vhost_user_blk.as_raw_fd()) }; + let _ = unsafe { libc::close(self.vhost_user_blk.lock().unwrap().as_raw_fd()) }; } fn add_memory_region( @@ -253,11 +303,14 @@ impl VirtioDevice for Blk { ) -> std::result::Result<(), crate::Error> { if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits() != 0 { - add_memory_region(&mut self.vhost_user_blk, region) + add_memory_region(&mut self.vhost_user_blk.lock().unwrap(), region) .map_err(crate::Error::VhostUserAddMemoryRegion) } else if let Some(guest_memory) = &self.guest_memory { - update_mem_table(&mut self.vhost_user_blk, guest_memory.memory().deref()) - .map_err(crate::Error::VhostUserUpdateMemory) + update_mem_table( + &mut self.vhost_user_blk.lock().unwrap(), + guest_memory.memory().deref(), + ) + .map_err(crate::Error::VhostUserUpdateMemory) } else { Ok(()) } @@ -270,7 +323,12 @@ impl Pausable for Blk { } fn resume(&mut self) -> result::Result<(), MigratableError> { - self.common.resume() + self.common.resume()?; + + if let Some(reconnect_epoll_thread) = &self.reconnect_epoll_thread { + reconnect_epoll_thread.thread().unpark(); + } + Ok(()) } }