virtio-devices: vhost_user: blk: Support socket reconnection handling

This commit enables socket reconnection for vhost-user-blk backends.

Note that, till this commit, inflight I/O trakcing and restoring is not
supported. Therefore, only vhost-user-blk backend that are not processing
inflight requests can work normally after reconnection.

Signed-off-by: Jiachen Zhang <zhangjiachen.jaycee@bytedance.com>
This commit is contained in:
Jiachen Zhang 2021-06-03 12:10:31 +08:00 committed by Sebastien Boeuf
parent 058946772a
commit 650cbce017

View File

@ -5,10 +5,11 @@ use super::super::{
ActivateError, ActivateResult, Queue, VirtioCommon, VirtioDevice, VirtioDeviceType,
};
use super::vu_common_ctrl::{
add_memory_region, negotiate_features_vhost_user, reset_vhost_user, setup_vhost_user,
update_mem_table, VhostUserConfig,
add_memory_region, connect_vhost_user, negotiate_features_vhost_user, reset_vhost_user,
setup_vhost_user, update_mem_table, VhostUserConfig,
};
use super::{Error, Result, DEFAULT_VIRTIO_FEATURES};
use crate::vhost_user::ReconnectEpollHandler;
use crate::VirtioInterrupt;
use crate::{GuestMemoryMmap, GuestRegionMmap};
use block_util::VirtioBlockConfig;
@ -16,7 +17,8 @@ use std::mem;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::{Arc, Barrier};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::vec::Vec;
use vhost::vhost_user::message::VhostUserConfigFlags;
use vhost::vhost_user::message::VHOST_USER_CONFIG_OFFSET;
@ -40,10 +42,12 @@ impl VhostUserMasterReqHandler for SlaveReqHandler {}
pub struct Blk {
common: VirtioCommon,
id: String,
vhost_user_blk: Master,
vhost_user_blk: Arc<Mutex<Master>>,
config: VirtioBlockConfig,
guest_memory: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
acked_protocol_features: u64,
socket_path: String,
reconnect_epoll_thread: Option<thread::JoinHandle<()>>,
}
impl Blk {
@ -51,8 +55,8 @@ impl Blk {
pub fn new(id: String, vu_cfg: VhostUserConfig) -> Result<Blk> {
let num_queues = vu_cfg.num_queues;
let mut vhost_user_blk = Master::connect(&vu_cfg.socket, num_queues as u64)
.map_err(Error::VhostUserCreateMaster)?;
let mut vhost_user_blk =
connect_vhost_user(false, &vu_cfg.socket, num_queues as u64, false)?;
// Filling device and vring features VMM supports.
let mut avail_features = 1 << VIRTIO_BLK_F_SIZE_MAX
@ -128,15 +132,17 @@ impl Blk {
queue_sizes: vec![vu_cfg.queue_size; num_queues],
avail_features: acked_features,
acked_features: 0,
paused_sync: Some(Arc::new(Barrier::new(1))),
paused_sync: Some(Arc::new(Barrier::new(2))),
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
id,
vhost_user_blk,
vhost_user_blk: Arc::new(Mutex::new(vhost_user_blk)),
config,
guest_memory: None,
acked_protocol_features,
socket_path: vu_cfg.socket,
reconnect_epoll_thread: None,
})
}
}
@ -189,6 +195,8 @@ impl VirtioDevice for Blk {
self.config.writeback = data[0];
if let Err(e) = self
.vhost_user_blk
.lock()
.unwrap()
.set_config(offset as u32, VhostUserConfigFlags::WRITABLE, data)
.map_err(Error::VhostUserSetConfig)
{
@ -207,17 +215,56 @@ impl VirtioDevice for Blk {
self.guest_memory = Some(mem.clone());
// The backend acknowledged features must contain the protocol feature
// bit in case it was initially set but lost through the features
// negotiation with the guest.
let backend_acked_features = self.common.acked_features
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits());
setup_vhost_user(
&mut self.vhost_user_blk,
&mut self.vhost_user_blk.lock().unwrap(),
&mem.memory(),
queues,
queue_evts,
queues.clone(),
queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(),
&interrupt_cb,
self.common.acked_features
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()),
backend_acked_features,
)
.map_err(ActivateError::VhostUserBlkSetup)?;
// Run a dedicated thread for handling potential reconnections with
// the backend.
let (kill_evt, pause_evt) = self.common.dup_eventfds();
let mut reconnect_handler = ReconnectEpollHandler {
vu: self.vhost_user_blk.clone(),
mem,
kill_evt,
pause_evt,
queues,
queue_evts,
virtio_interrupt: interrupt_cb,
acked_features: backend_acked_features,
acked_protocol_features: self.acked_protocol_features,
socket_path: self.socket_path.clone(),
server: false,
};
let paused = self.common.paused.clone();
let paused_sync = self.common.paused_sync.clone();
thread::Builder::new()
.name(format!("{}_reconnect", self.id))
.spawn(move || {
if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) {
error!("Error running reconnection worker: {:?}", e);
}
})
.map(|thread| self.reconnect_epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
Ok(())
}
@ -227,7 +274,10 @@ impl VirtioDevice for Blk {
self.common.resume().ok()?;
}
if let Err(e) = reset_vhost_user(&mut self.vhost_user_blk, self.common.queue_sizes.len()) {
if let Err(e) = reset_vhost_user(
&mut self.vhost_user_blk.lock().unwrap(),
self.common.queue_sizes.len(),
) {
error!("Failed to reset vhost-user daemon: {:?}", e);
return None;
}
@ -244,7 +294,7 @@ impl VirtioDevice for Blk {
}
fn shutdown(&mut self) {
let _ = unsafe { libc::close(self.vhost_user_blk.as_raw_fd()) };
let _ = unsafe { libc::close(self.vhost_user_blk.lock().unwrap().as_raw_fd()) };
}
fn add_memory_region(
@ -253,11 +303,14 @@ impl VirtioDevice for Blk {
) -> std::result::Result<(), crate::Error> {
if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits() != 0
{
add_memory_region(&mut self.vhost_user_blk, region)
add_memory_region(&mut self.vhost_user_blk.lock().unwrap(), region)
.map_err(crate::Error::VhostUserAddMemoryRegion)
} else if let Some(guest_memory) = &self.guest_memory {
update_mem_table(&mut self.vhost_user_blk, guest_memory.memory().deref())
.map_err(crate::Error::VhostUserUpdateMemory)
update_mem_table(
&mut self.vhost_user_blk.lock().unwrap(),
guest_memory.memory().deref(),
)
.map_err(crate::Error::VhostUserUpdateMemory)
} else {
Ok(())
}
@ -270,7 +323,12 @@ impl Pausable for Blk {
}
fn resume(&mut self) -> result::Result<(), MigratableError> {
self.common.resume()
self.common.resume()?;
if let Some(reconnect_epoll_thread) = &self.reconnect_epoll_thread {
reconnect_epoll_thread.thread().unpark();
}
Ok(())
}
}