virtio-devices: vhost_user: fs: Support socket reconnection handling

This commit enables socket reconnection for vhost-user-fs backends. Note
that, till this commit:

- The re-establish of the slave communication channel is no supported. So
the socket reconnection does not support virtiofsd with DAX enabled.

- Inflight I/O tracking and restoring is not supported. Therefore, only
virtio-fs daemons that are not processing inflight requests can work
normally after reconnection.

- To make the restarted virtiofsd work normally after reconnection, the
internal status of virtiofsd should also be recovered. This is not the
work of cloud-hypervisor. If the virtio-fs daemon does not support
saving or restoring its internal status, then a re-mount in guest after
socket reconnection should be performed.

Signed-off-by: Jiachen Zhang <zhangjiachen.jaycee@bytedance.com>
This commit is contained in:
Jiachen Zhang 2021-06-07 14:27:06 +08:00 committed by Sebastien Boeuf
parent 7cfb2139d0
commit deca570544

View File

@ -2,12 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use super::vu_common_ctrl::{
add_memory_region, negotiate_features_vhost_user, reset_vhost_user, setup_vhost_user,
update_mem_table,
add_memory_region, connect_vhost_user, negotiate_features_vhost_user, reset_vhost_user,
setup_vhost_user, update_mem_table,
};
use super::{Error, Result, DEFAULT_VIRTIO_FEATURES};
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::vhost_user::handler::{VhostUserEpollConfig, VhostUserEpollHandler};
use crate::vhost_user::ReconnectEpollHandler;
use crate::{
ActivateError, ActivateResult, Queue, UserspaceMapping, VirtioCommon, VirtioDevice,
VirtioDeviceType, VirtioInterrupt, VirtioSharedMemoryList,
@ -19,7 +20,7 @@ use std::io;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
use std::result;
use std::sync::{Arc, Barrier};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use vhost::vhost_user::message::{
VhostUserFSSlaveMsg, VhostUserFSSlaveMsgFlags, VhostUserProtocolFeatures,
@ -272,7 +273,7 @@ unsafe impl ByteValued for VirtioFsConfig {}
pub struct Fs {
common: VirtioCommon,
id: String,
vu: Master,
vu: Arc<Mutex<Master>>,
config: VirtioFsConfig,
// Hold ownership of the memory that is allocated for the device
// which will be automatically dropped when the device is dropped
@ -281,6 +282,8 @@ pub struct Fs {
seccomp_action: SeccompAction,
guest_memory: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
acked_protocol_features: u64,
socket_path: String,
reconnect_epoll_thread: Option<thread::JoinHandle<()>>,
}
impl Fs {
@ -300,8 +303,7 @@ impl Fs {
let num_queues = NUM_QUEUE_OFFSET + req_num_queues;
// Connect to the vhost-user socket.
let mut vhost_user_fs =
Master::connect(path, num_queues as u64).map_err(Error::VhostUserCreateMaster)?;
let mut vhost_user_fs = connect_vhost_user(false, &path, num_queues as u64, false)?;
// Filling device and vring features VMM supports.
let avail_features = DEFAULT_VIRTIO_FEATURES;
@ -356,18 +358,20 @@ impl Fs {
avail_features: acked_features,
acked_features: 0,
queue_sizes: vec![queue_size; num_queues],
paused_sync: Some(Arc::new(Barrier::new(2))),
paused_sync: Some(Arc::new(Barrier::new(3))),
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
id,
vu: vhost_user_fs,
vu: Arc::new(Mutex::new(vhost_user_fs)),
config,
cache,
slave_req_support,
seccomp_action,
guest_memory: None,
acked_protocol_features,
socket_path: path.to_string(),
reconnect_epoll_thread: None,
})
}
}
@ -413,14 +417,19 @@ impl VirtioDevice for Fs {
let (kill_evt, pause_evt) = self.common.dup_eventfds();
self.guest_memory = Some(mem.clone());
// The backend acknowledged features must contain the protocol feature
// bit in case it was initially set but lost through the features
// negotiation with the guest.
let backend_acked_features = self.common.acked_features
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits());
setup_vhost_user(
&mut self.vu,
&mut self.vu.lock().unwrap(),
&mem.memory(),
queues,
queue_evts,
queues.clone(),
queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(),
&interrupt_cb,
self.common.acked_features
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()),
backend_acked_features,
)
.map_err(ActivateError::VhostUserFsSetup)?;
@ -431,7 +440,7 @@ impl VirtioDevice for Fs {
cache_offset: cache.0.addr,
cache_size: cache.0.len,
mmap_cache_addr: cache.0.host_addr,
mem,
mem: mem.clone(),
});
let mut req_handler =
@ -440,6 +449,8 @@ impl VirtioDevice for Fs {
})?;
req_handler.set_reply_ack_flag(true);
self.vu
.lock()
.unwrap()
.set_slave_request_fd(req_handler.get_tx_raw_fd())
.map_err(|e| {
ActivateError::VhostUserFsSetup(Error::VhostUserSetSlaveRequestFd(e))
@ -481,6 +492,39 @@ impl VirtioDevice for Fs {
self.common.epoll_threads = Some(epoll_threads);
// Run a dedicated thread for handling potential reconnections with
// the backend.
let (kill_evt, pause_evt) = self.common.dup_eventfds();
let mut reconnect_handler = ReconnectEpollHandler {
vu: self.vu.clone(),
mem,
kill_evt,
pause_evt,
queues,
queue_evts,
virtio_interrupt: interrupt_cb,
acked_features: backend_acked_features,
acked_protocol_features: self.acked_protocol_features,
socket_path: self.socket_path.clone(),
server: false,
};
let paused = self.common.paused.clone();
let paused_sync = self.common.paused_sync.clone();
thread::Builder::new()
.name(format!("{}_reconnect", self.id))
.spawn(move || {
if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) {
error!("Error running reconnection worker: {:?}", e);
}
})
.map(|thread| self.reconnect_epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
event!("virtio-device", "activated", "id", &self.id);
Ok(())
}
@ -491,7 +535,9 @@ impl VirtioDevice for Fs {
self.common.resume().ok()?;
}
if let Err(e) = reset_vhost_user(&mut self.vu, self.common.queue_sizes.len()) {
if let Err(e) =
reset_vhost_user(&mut self.vu.lock().unwrap(), self.common.queue_sizes.len())
{
error!("Failed to reset vhost-user daemon: {:?}", e);
return None;
}
@ -508,7 +554,7 @@ impl VirtioDevice for Fs {
}
fn shutdown(&mut self) {
let _ = unsafe { libc::close(self.vu.as_raw_fd()) };
let _ = unsafe { libc::close(self.vu.lock().unwrap().as_raw_fd()) };
}
fn get_shm_regions(&self) -> Option<VirtioSharedMemoryList> {
@ -533,9 +579,10 @@ impl VirtioDevice for Fs {
) -> std::result::Result<(), crate::Error> {
if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits() != 0
{
add_memory_region(&mut self.vu, region).map_err(crate::Error::VhostUserAddMemoryRegion)
add_memory_region(&mut self.vu.lock().unwrap(), region)
.map_err(crate::Error::VhostUserAddMemoryRegion)
} else if let Some(guest_memory) = &self.guest_memory {
update_mem_table(&mut self.vu, guest_memory.memory().deref())
update_mem_table(&mut self.vu.lock().unwrap(), guest_memory.memory().deref())
.map_err(crate::Error::VhostUserUpdateMemory)
} else {
Ok(())
@ -564,7 +611,12 @@ impl Pausable for Fs {
}
fn resume(&mut self) -> result::Result<(), MigratableError> {
self.common.resume()
self.common.resume()?;
if let Some(reconnect_epoll_thread) = &self.reconnect_epoll_thread {
reconnect_epoll_thread.thread().unpark();
}
Ok(())
}
}