From deca5705448e7265f75209db0ae518b08b3ae6eb Mon Sep 17 00:00:00 2001 From: Jiachen Zhang Date: Mon, 7 Jun 2021 14:27:06 +0800 Subject: [PATCH] virtio-devices: vhost_user: fs: Support socket reconnection handling This commit enables socket reconnection for vhost-user-fs backends. Note that, as of this commit: - Re-establishing the slave communication channel is not supported, so socket reconnection does not work with virtiofsd when DAX is enabled. - Inflight I/O tracking and restoring are not supported. Therefore, only virtio-fs daemons that are not processing inflight requests can work normally after reconnection. - To make the restarted virtiofsd work normally after reconnection, the internal status of virtiofsd should also be recovered. This is outside the scope of cloud-hypervisor. If the virtio-fs daemon does not support saving or restoring its internal status, then a re-mount in the guest should be performed after socket reconnection. Signed-off-by: Jiachen Zhang --- virtio-devices/src/vhost_user/fs.rs | 90 +++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 19 deletions(-) diff --git a/virtio-devices/src/vhost_user/fs.rs b/virtio-devices/src/vhost_user/fs.rs index f4252dacc..a530fb304 100644 --- a/virtio-devices/src/vhost_user/fs.rs +++ b/virtio-devices/src/vhost_user/fs.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use super::vu_common_ctrl::{ - add_memory_region, negotiate_features_vhost_user, reset_vhost_user, setup_vhost_user, - update_mem_table, + add_memory_region, connect_vhost_user, negotiate_features_vhost_user, reset_vhost_user, + setup_vhost_user, update_mem_table, }; use super::{Error, Result, DEFAULT_VIRTIO_FEATURES}; use crate::seccomp_filters::{get_seccomp_filter, Thread}; use crate::vhost_user::handler::{VhostUserEpollConfig, VhostUserEpollHandler}; +use crate::vhost_user::ReconnectEpollHandler; use crate::{ ActivateError, ActivateResult, Queue, UserspaceMapping, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterrupt, 
VirtioSharedMemoryList, @@ -19,7 +20,7 @@ use std::io; use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; use std::result; -use std::sync::{Arc, Barrier}; +use std::sync::{Arc, Barrier, Mutex}; use std::thread; use vhost::vhost_user::message::{ VhostUserFSSlaveMsg, VhostUserFSSlaveMsgFlags, VhostUserProtocolFeatures, @@ -272,7 +273,7 @@ unsafe impl ByteValued for VirtioFsConfig {} pub struct Fs { common: VirtioCommon, id: String, - vu: Master, + vu: Arc>, config: VirtioFsConfig, // Hold ownership of the memory that is allocated for the device // which will be automatically dropped when the device is dropped @@ -281,6 +282,8 @@ pub struct Fs { seccomp_action: SeccompAction, guest_memory: Option>, acked_protocol_features: u64, + socket_path: String, + reconnect_epoll_thread: Option>, } impl Fs { @@ -300,8 +303,7 @@ impl Fs { let num_queues = NUM_QUEUE_OFFSET + req_num_queues; // Connect to the vhost-user socket. - let mut vhost_user_fs = - Master::connect(path, num_queues as u64).map_err(Error::VhostUserCreateMaster)?; + let mut vhost_user_fs = connect_vhost_user(false, &path, num_queues as u64, false)?; // Filling device and vring features VMM supports. 
let avail_features = DEFAULT_VIRTIO_FEATURES; @@ -356,18 +358,20 @@ impl Fs { avail_features: acked_features, acked_features: 0, queue_sizes: vec![queue_size; num_queues], - paused_sync: Some(Arc::new(Barrier::new(2))), + paused_sync: Some(Arc::new(Barrier::new(3))), min_queues: DEFAULT_QUEUE_NUMBER as u16, ..Default::default() }, id, - vu: vhost_user_fs, + vu: Arc::new(Mutex::new(vhost_user_fs)), config, cache, slave_req_support, seccomp_action, guest_memory: None, acked_protocol_features, + socket_path: path.to_string(), + reconnect_epoll_thread: None, }) } } @@ -413,14 +417,19 @@ impl VirtioDevice for Fs { let (kill_evt, pause_evt) = self.common.dup_eventfds(); self.guest_memory = Some(mem.clone()); + // The backend acknowledged features must contain the protocol feature + // bit in case it was initially set but lost through the features + // negotiation with the guest. + let backend_acked_features = self.common.acked_features + | (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()); + setup_vhost_user( - &mut self.vu, + &mut self.vu.lock().unwrap(), &mem.memory(), - queues, - queue_evts, + queues.clone(), + queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(), &interrupt_cb, - self.common.acked_features - | (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()), + backend_acked_features, ) .map_err(ActivateError::VhostUserFsSetup)?; @@ -431,7 +440,7 @@ impl VirtioDevice for Fs { cache_offset: cache.0.addr, cache_size: cache.0.len, mmap_cache_addr: cache.0.host_addr, - mem, + mem: mem.clone(), }); let mut req_handler = @@ -440,6 +449,8 @@ impl VirtioDevice for Fs { })?; req_handler.set_reply_ack_flag(true); self.vu + .lock() + .unwrap() .set_slave_request_fd(req_handler.get_tx_raw_fd()) .map_err(|e| { ActivateError::VhostUserFsSetup(Error::VhostUserSetSlaveRequestFd(e)) @@ -481,6 +492,39 @@ impl VirtioDevice for Fs { self.common.epoll_threads = Some(epoll_threads); + // Run a dedicated thread for 
handling potential reconnections with + // the backend. + let (kill_evt, pause_evt) = self.common.dup_eventfds(); + let mut reconnect_handler = ReconnectEpollHandler { + vu: self.vu.clone(), + mem, + kill_evt, + pause_evt, + queues, + queue_evts, + virtio_interrupt: interrupt_cb, + acked_features: backend_acked_features, + acked_protocol_features: self.acked_protocol_features, + socket_path: self.socket_path.clone(), + server: false, + }; + + let paused = self.common.paused.clone(); + let paused_sync = self.common.paused_sync.clone(); + + thread::Builder::new() + .name(format!("{}_reconnect", self.id)) + .spawn(move || { + if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) { + error!("Error running reconnection worker: {:?}", e); + } + }) + .map(|thread| self.reconnect_epoll_thread = Some(thread)) + .map_err(|e| { + error!("failed to clone queue EventFd: {}", e); + ActivateError::BadActivate + })?; + event!("virtio-device", "activated", "id", &self.id); Ok(()) } @@ -491,7 +535,9 @@ impl VirtioDevice for Fs { self.common.resume().ok()?; } - if let Err(e) = reset_vhost_user(&mut self.vu, self.common.queue_sizes.len()) { + if let Err(e) = + reset_vhost_user(&mut self.vu.lock().unwrap(), self.common.queue_sizes.len()) + { error!("Failed to reset vhost-user daemon: {:?}", e); return None; } @@ -508,7 +554,7 @@ impl VirtioDevice for Fs { } fn shutdown(&mut self) { - let _ = unsafe { libc::close(self.vu.as_raw_fd()) }; + let _ = unsafe { libc::close(self.vu.lock().unwrap().as_raw_fd()) }; } fn get_shm_regions(&self) -> Option { @@ -533,9 +579,10 @@ impl VirtioDevice for Fs { ) -> std::result::Result<(), crate::Error> { if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits() != 0 { - add_memory_region(&mut self.vu, region).map_err(crate::Error::VhostUserAddMemoryRegion) + add_memory_region(&mut self.vu.lock().unwrap(), region) + .map_err(crate::Error::VhostUserAddMemoryRegion) } else if let Some(guest_memory) = 
&self.guest_memory { - update_mem_table(&mut self.vu, guest_memory.memory().deref()) + update_mem_table(&mut self.vu.lock().unwrap(), guest_memory.memory().deref()) .map_err(crate::Error::VhostUserUpdateMemory) } else { Ok(()) @@ -564,7 +611,12 @@ impl Pausable for Fs { } fn resume(&mut self) -> result::Result<(), MigratableError> { - self.common.resume() + self.common.resume()?; + + if let Some(reconnect_epoll_thread) = &self.reconnect_epoll_thread { + reconnect_epoll_thread.thread().unpark(); + } + Ok(()) } }