virtio-devices: vhost_user: Refactor through VhostUserCommon

Introduce a new structure, VhostUserCommon, to factorize the large amount
of code shared between the vhost-user devices (block, fs and net).

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
Authored by Sebastien Boeuf on 2021-08-11 10:56:29 +02:00; committed by Bo Chen
parent 4918c1ca7f
commit 6d34ed03f7
4 changed files with 407 additions and 609 deletions
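
Before the diff, a minimal self-contained sketch of the pattern this commit applies (VhostUserHandle is reduced to a stand-in and the values in main() are purely illustrative): the connection handle, socket path, queue count, protocol-feature state and migration flag that block, fs and net each carried as separate fields move into a single struct that every device embeds.

use std::sync::{Arc, Mutex};

struct VhostUserHandle; // stand-in for vu_common_ctrl::VhostUserHandle

// Shared state that Blk, Fs and Net each used to duplicate.
#[derive(Default)]
struct VhostUserCommon {
    vu: Option<Arc<Mutex<VhostUserHandle>>>,
    acked_protocol_features: u64,
    socket_path: String,
    vu_num_queues: usize,
    migration_started: bool,
    server: bool,
}

// After the refactor, a device embeds the common block once...
struct Blk {
    vu_common: VhostUserCommon,
    // device-specific fields (id, config, guest_memory, ...) stay here
}

fn main() {
    // ...and initializes only what differs from the defaults, as Blk::new() does.
    let blk = Blk {
        vu_common: VhostUserCommon {
            socket_path: "/tmp/vhost-user-blk.sock".to_string(),
            vu_num_queues: 2,
            ..Default::default()
        },
    };
    assert!(blk.vu_common.vu.is_none()); // no backend connected yet
}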


@ -6,30 +6,28 @@ use super::super::{
};
use super::vu_common_ctrl::{VhostUserConfig, VhostUserHandle};
use super::{Error, Result, DEFAULT_VIRTIO_FEATURES};
use crate::vhost_user::{Inflight, VhostUserEpollHandler};
use crate::vhost_user::VhostUserCommon;
use crate::VirtioInterrupt;
use crate::{GuestMemoryMmap, GuestRegionMmap};
use anyhow::anyhow;
use block_util::VirtioBlockConfig;
use std::mem;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::vec::Vec;
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use vhost::vhost_user::message::VhostUserConfigFlags;
use vhost::vhost_user::message::VHOST_USER_CONFIG_OFFSET;
use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
use vhost::vhost_user::message::{
VhostUserConfigFlags, VhostUserProtocolFeatures, VhostUserVirtioFeatures,
VHOST_USER_CONFIG_OFFSET,
};
use vhost::vhost_user::{MasterReqHandler, VhostUserMaster, VhostUserMasterReqHandler};
use virtio_bindings::bindings::virtio_blk::{
VIRTIO_BLK_F_BLK_SIZE, VIRTIO_BLK_F_CONFIG_WCE, VIRTIO_BLK_F_DISCARD, VIRTIO_BLK_F_FLUSH,
VIRTIO_BLK_F_GEOMETRY, VIRTIO_BLK_F_MQ, VIRTIO_BLK_F_RO, VIRTIO_BLK_F_SEG_MAX,
VIRTIO_BLK_F_SIZE_MAX, VIRTIO_BLK_F_TOPOLOGY, VIRTIO_BLK_F_WRITE_ZEROES,
};
use vm_memory::{Address, ByteValued, GuestAddressSpace, GuestMemory, GuestMemoryAtomic};
use vm_memory::{ByteValued, GuestMemoryAtomic};
use vm_migration::{
protocol::MemoryRangeTable, Migratable, MigratableError, Pausable, Snapshot, Snapshottable,
Transportable, VersionMapped,
@ -54,15 +52,11 @@ impl VhostUserMasterReqHandler for SlaveReqHandler {}
pub struct Blk {
common: VirtioCommon,
vu_common: VhostUserCommon,
id: String,
vu: Option<Arc<Mutex<VhostUserHandle>>>,
config: VirtioBlockConfig,
guest_memory: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
acked_protocol_features: u64,
socket_path: String,
epoll_thread: Option<thread::JoinHandle<()>>,
vu_num_queues: usize,
migration_started: bool,
}
impl Blk {
@ -75,7 +69,6 @@ impl Blk {
// enough to handle all the potential queues. VirtioPciDevice::new()
// will create the actual queues based on this information.
return Ok(Blk {
id,
common: VirtioCommon {
device_type: VirtioDeviceType::Block as u32,
queue_sizes: vec![vu_cfg.queue_size; num_queues],
@ -83,14 +76,15 @@ impl Blk {
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
vu: None,
vu_common: VhostUserCommon {
socket_path: vu_cfg.socket,
vu_num_queues: num_queues,
..Default::default()
},
id,
config: VirtioBlockConfig::default(),
guest_memory: None,
acked_protocol_features: 0,
socket_path: vu_cfg.socket,
epoll_thread: None,
vu_num_queues: num_queues,
migration_started: false,
});
}
@ -166,15 +160,17 @@ impl Blk {
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
vu_common: VhostUserCommon {
vu: Some(Arc::new(Mutex::new(vu))),
acked_protocol_features,
socket_path: vu_cfg.socket,
vu_num_queues: num_queues,
..Default::default()
},
id,
vu: Some(Arc::new(Mutex::new(vu))),
config,
guest_memory: None,
acked_protocol_features,
socket_path: vu_cfg.socket,
epoll_thread: None,
vu_num_queues: num_queues,
migration_started: false,
})
}
@ -183,8 +179,8 @@ impl Blk {
avail_features: self.common.avail_features,
acked_features: self.common.acked_features,
config: self.config,
acked_protocol_features: self.acked_protocol_features,
vu_num_queues: self.vu_num_queues,
acked_protocol_features: self.vu_common.acked_protocol_features,
vu_num_queues: self.vu_common.vu_num_queues,
}
}
@ -192,31 +188,18 @@ impl Blk {
self.common.avail_features = state.avail_features;
self.common.acked_features = state.acked_features;
self.config = state.config;
self.acked_protocol_features = state.acked_protocol_features;
self.vu_num_queues = state.vu_num_queues;
self.vu_common.acked_protocol_features = state.acked_protocol_features;
self.vu_common.vu_num_queues = state.vu_num_queues;
let mut vu = match VhostUserHandle::connect_vhost_user(
false,
&self.socket_path,
self.vu_num_queues as u64,
false,
) {
Ok(r) => r,
Err(e) => {
error!("Failed connecting vhost-user backend: {:?}", e);
return;
}
};
if let Err(e) = vu.set_protocol_features_vhost_user(
self.common.acked_features,
self.acked_protocol_features,
) {
error!("Failed setting up vhost-user backend: {:?}", e);
return;
if let Err(e) = self
.vu_common
.restore_backend_connection(self.common.acked_features)
{
error!(
"Failed restoring connection with vhost-user backend: {:?}",
e
);
}
self.vu = Some(Arc::new(Mutex::new(vu)));
}
}
@ -266,7 +249,7 @@ impl VirtioDevice for Blk {
}
self.config.writeback = data[0];
if let Some(vu) = &self.vu {
if let Some(vu) = &self.vu_common.vu {
if let Err(e) = vu
.lock()
.unwrap()
@ -297,51 +280,20 @@ impl VirtioDevice for Blk {
let backend_acked_features = self.common.acked_features
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits());
let mut inflight: Option<Inflight> =
if self.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits() != 0
{
Some(Inflight::default())
} else {
None
};
if self.vu.is_none() {
error!("Missing vhost-user handle");
return Err(ActivateError::BadActivate);
}
let vu = self.vu.as_ref().unwrap();
vu.lock()
.unwrap()
.setup_vhost_user(
&mem.memory(),
queues.clone(),
queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(),
&interrupt_cb,
backend_acked_features,
&slave_req_handler,
inflight.as_mut(),
)
.map_err(ActivateError::VhostUserBlkSetup)?;
// Run a dedicated thread for handling potential reconnections with
// the backend.
let (kill_evt, pause_evt) = self.common.dup_eventfds();
let mut handler: VhostUserEpollHandler<SlaveReqHandler> = VhostUserEpollHandler {
vu: vu.clone(),
let mut handler = self.vu_common.activate(
mem,
kill_evt,
pause_evt,
queues,
queue_evts,
virtio_interrupt: interrupt_cb,
acked_features: backend_acked_features,
acked_protocol_features: self.acked_protocol_features,
socket_path: self.socket_path.clone(),
server: false,
slave_req_handler: None,
inflight,
};
interrupt_cb,
backend_acked_features,
slave_req_handler,
kill_evt,
pause_evt,
)?;
let paused = self.common.paused.clone();
let paused_sync = self.common.paused_sync.clone();
@ -368,7 +320,7 @@ impl VirtioDevice for Blk {
self.common.resume().ok()?;
}
if let Some(vu) = &self.vu {
if let Some(vu) = &self.vu_common.vu {
if let Err(e) = vu
.lock()
.unwrap()
@ -391,47 +343,20 @@ impl VirtioDevice for Blk {
}
fn shutdown(&mut self) {
if let Some(vu) = &self.vu {
let _ = unsafe { libc::close(vu.lock().unwrap().socket_handle().as_raw_fd()) };
}
self.vu_common.shutdown()
}
fn add_memory_region(
&mut self,
region: &Arc<GuestRegionMmap>,
) -> std::result::Result<(), crate::Error> {
if let Some(vu) = &self.vu {
if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits()
!= 0
{
return vu
.lock()
.unwrap()
.add_memory_region(region)
.map_err(crate::Error::VhostUserAddMemoryRegion);
} else if let Some(guest_memory) = &self.guest_memory {
return vu
.lock()
.unwrap()
.update_mem_table(guest_memory.memory().deref())
.map_err(crate::Error::VhostUserUpdateMemory);
}
}
Ok(())
self.vu_common.add_memory_region(&self.guest_memory, region)
}
}
impl Pausable for Blk {
fn pause(&mut self) -> result::Result<(), MigratableError> {
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.pause_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Pause(anyhow!("Error pausing vhost-user-blk backend: {:?}", e))
})?;
}
self.vu_common.pause()?;
self.common.pause()
}
@ -442,19 +367,7 @@ impl Pausable for Blk {
epoll_thread.thread().unpark();
}
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.resume_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Resume(anyhow!(
"Error resuming vhost-user-blk backend: {:?}",
e
))
})
} else {
Ok(())
}
self.vu_common.resume()
}
}
@ -464,13 +377,7 @@ impl Snapshottable for Blk {
}
fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
let snapshot = Snapshot::new_from_versioned_state(&self.id(), &self.state())?;
if self.migration_started {
self.shutdown();
}
Ok(snapshot)
self.vu_common.snapshot(&self.id(), &self.state())
}
fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
@ -482,77 +389,19 @@ impl Transportable for Blk {}
impl Migratable for Blk {
fn start_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = true;
if let Some(vu) = &self.vu {
if let Some(guest_memory) = &self.guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock()
.unwrap()
.start_dirty_log(last_ram_addr)
.map_err(|e| {
MigratableError::StartDirtyLog(anyhow!(
"Error starting migration for vhost-user-blk backend: {:?}",
e
))
})
} else {
Err(MigratableError::StartDirtyLog(anyhow!(
"Missing guest memory"
)))
}
} else {
Ok(())
}
self.vu_common.start_dirty_log(&self.guest_memory)
}
fn stop_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = false;
if let Some(vu) = &self.vu {
vu.lock().unwrap().stop_dirty_log().map_err(|e| {
MigratableError::StopDirtyLog(anyhow!(
"Error stopping migration for vhost-user-blk backend: {:?}",
e
))
})
} else {
Ok(())
}
self.vu_common.stop_dirty_log()
}
fn dirty_log(&mut self) -> std::result::Result<MemoryRangeTable, MigratableError> {
if let Some(vu) = &self.vu {
if let Some(guest_memory) = &self.guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock().unwrap().dirty_log(last_ram_addr).map_err(|e| {
MigratableError::DirtyLog(anyhow!(
"Error retrieving dirty ranges from vhost-user-blk backend: {:?}",
e
))
})
} else {
Err(MigratableError::DirtyLog(anyhow!("Missing guest memory")))
}
} else {
Ok(MemoryRangeTable::default())
}
self.vu_common.dirty_log(&self.guest_memory)
}
fn complete_migration(&mut self) -> std::result::Result<(), MigratableError> {
// Make sure the device thread is killed in order to prevent from
// reconnections to the socket.
if let Some(kill_evt) = self.common.kill_evt.take() {
kill_evt.write(1).map_err(|e| {
MigratableError::CompleteMigration(anyhow!(
"Error killing vhost-user-blk thread: {:?}",
e
))
})?;
}
// Drop the vhost-user handler to avoid further calls to fail because
// the connection with the backend has been closed.
self.vu = None;
Ok(())
self.vu_common
.complete_migration(self.common.kill_evt.take())
}
}


@ -4,17 +4,15 @@
use super::vu_common_ctrl::VhostUserHandle;
use super::{Error, Result, DEFAULT_VIRTIO_FEATURES};
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::vhost_user::{Inflight, VhostUserEpollHandler};
use crate::vhost_user::VhostUserCommon;
use crate::{
ActivateError, ActivateResult, Queue, UserspaceMapping, VirtioCommon, VirtioDevice,
VirtioDeviceType, VirtioInterrupt, VirtioSharedMemoryList,
};
use crate::{GuestMemoryMmap, GuestRegionMmap, MmapRegion};
use anyhow::anyhow;
use libc::{self, c_void, off64_t, pread64, pwrite64};
use seccomp::{SeccompAction, SeccompFilter};
use std::io;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::{Arc, Barrier, Mutex};
@ -300,8 +298,8 @@ unsafe impl ByteValued for VirtioFsConfig {}
pub struct Fs {
common: VirtioCommon,
vu_common: VhostUserCommon,
id: String,
vu: Option<Arc<Mutex<VhostUserHandle>>>,
config: VirtioFsConfig,
// Hold ownership of the memory that is allocated for the device
// which will be automatically dropped when the device is dropped
@ -309,11 +307,7 @@ pub struct Fs {
slave_req_support: bool,
seccomp_action: SeccompAction,
guest_memory: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
acked_protocol_features: u64,
socket_path: String,
epoll_thread: Option<thread::JoinHandle<()>>,
vu_num_queues: usize,
migration_started: bool,
}
impl Fs {
@ -339,7 +333,6 @@ impl Fs {
// enough to handle all the potential queues. VirtioPciDevice::new()
// will create the actual queues based on this information.
return Ok(Fs {
id,
common: VirtioCommon {
device_type: VirtioDeviceType::Fs as u32,
queue_sizes: vec![queue_size; num_queues],
@ -347,17 +340,18 @@ impl Fs {
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
vu: None,
vu_common: VhostUserCommon {
socket_path: path.to_string(),
vu_num_queues: num_queues,
..Default::default()
},
id,
config: VirtioFsConfig::default(),
cache,
slave_req_support,
seccomp_action,
guest_memory: None,
acked_protocol_features: 0,
socket_path: path.to_string(),
epoll_thread: None,
vu_num_queues: num_queues,
migration_started: false,
});
}
@ -420,18 +414,20 @@ impl Fs {
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
vu_common: VhostUserCommon {
vu: Some(Arc::new(Mutex::new(vu))),
acked_protocol_features,
socket_path: path.to_string(),
vu_num_queues: num_queues,
..Default::default()
},
id,
vu: Some(Arc::new(Mutex::new(vu))),
config,
cache,
slave_req_support,
seccomp_action,
guest_memory: None,
acked_protocol_features,
socket_path: path.to_string(),
epoll_thread: None,
vu_num_queues: num_queues,
migration_started: false,
})
}
@ -440,8 +436,8 @@ impl Fs {
avail_features: self.common.avail_features,
acked_features: self.common.acked_features,
config: self.config,
acked_protocol_features: self.acked_protocol_features,
vu_num_queues: self.vu_num_queues,
acked_protocol_features: self.vu_common.acked_protocol_features,
vu_num_queues: self.vu_common.vu_num_queues,
slave_req_support: self.slave_req_support,
}
}
@ -450,32 +446,19 @@ impl Fs {
self.common.avail_features = state.avail_features;
self.common.acked_features = state.acked_features;
self.config = state.config;
self.acked_protocol_features = state.acked_protocol_features;
self.vu_num_queues = state.vu_num_queues;
self.vu_common.acked_protocol_features = state.acked_protocol_features;
self.vu_common.vu_num_queues = state.vu_num_queues;
self.slave_req_support = state.slave_req_support;
let mut vu = match VhostUserHandle::connect_vhost_user(
false,
&self.socket_path,
self.vu_num_queues as u64,
false,
) {
Ok(r) => r,
Err(e) => {
error!("Failed connecting vhost-user backend: {:?}", e);
return;
}
};
if let Err(e) = vu.set_protocol_features_vhost_user(
self.common.acked_features,
self.acked_protocol_features,
) {
error!("Failed setting up vhost-user backend: {:?}", e);
return;
if let Err(e) = self
.vu_common
.restore_backend_connection(self.common.acked_features)
{
error!(
"Failed restoring connection with vhost-user backend: {:?}",
e
);
}
self.vu = Some(Arc::new(Mutex::new(vu)));
}
}
@ -548,50 +531,20 @@ impl VirtioDevice for Fs {
let backend_acked_features = self.common.acked_features
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits());
let mut inflight: Option<Inflight> =
if self.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits() != 0
{
Some(Inflight::default())
} else {
None
};
if self.vu.is_none() {
error!("Missing vhost-user handle");
return Err(ActivateError::BadActivate);
}
let vu = self.vu.as_ref().unwrap();
vu.lock()
.unwrap()
.setup_vhost_user(
&mem.memory(),
queues.clone(),
queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(),
&interrupt_cb,
backend_acked_features,
&slave_req_handler,
inflight.as_mut(),
)
.map_err(ActivateError::VhostUserFsSetup)?;
// Run a dedicated thread for handling potential reconnections with
// the backend as well as requests initiated by the backend.
// the backend.
let (kill_evt, pause_evt) = self.common.dup_eventfds();
let mut handler: VhostUserEpollHandler<SlaveReqHandler> = VhostUserEpollHandler {
vu: vu.clone(),
let mut handler = self.vu_common.activate(
mem,
kill_evt,
pause_evt,
queues,
queue_evts,
virtio_interrupt: interrupt_cb,
acked_features: backend_acked_features,
acked_protocol_features: self.acked_protocol_features,
socket_path: self.socket_path.clone(),
server: false,
interrupt_cb,
backend_acked_features,
slave_req_handler,
inflight,
};
kill_evt,
pause_evt,
)?;
let paused = self.common.paused.clone();
let paused_sync = self.common.paused_sync.clone();
@ -625,7 +578,7 @@ impl VirtioDevice for Fs {
self.common.resume().ok()?;
}
if let Some(vu) = &self.vu {
if let Some(vu) = &self.vu_common.vu {
if let Err(e) = vu
.lock()
.unwrap()
@ -648,9 +601,7 @@ impl VirtioDevice for Fs {
}
fn shutdown(&mut self) {
if let Some(vu) = &self.vu {
let _ = unsafe { libc::close(vu.lock().unwrap().socket_handle().as_raw_fd()) };
}
self.vu_common.shutdown()
}
fn get_shm_regions(&self) -> Option<VirtioSharedMemoryList> {
@ -673,24 +624,7 @@ impl VirtioDevice for Fs {
&mut self,
region: &Arc<GuestRegionMmap>,
) -> std::result::Result<(), crate::Error> {
if let Some(vu) = &self.vu {
if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits()
!= 0
{
return vu
.lock()
.unwrap()
.add_memory_region(region)
.map_err(crate::Error::VhostUserAddMemoryRegion);
} else if let Some(guest_memory) = &self.guest_memory {
return vu
.lock()
.unwrap()
.update_mem_table(guest_memory.memory().deref())
.map_err(crate::Error::VhostUserUpdateMemory);
}
}
Ok(())
self.vu_common.add_memory_region(&self.guest_memory, region)
}
fn userspace_mappings(&self) -> Vec<UserspaceMapping> {
@ -711,15 +645,7 @@ impl VirtioDevice for Fs {
impl Pausable for Fs {
fn pause(&mut self) -> result::Result<(), MigratableError> {
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.pause_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Pause(anyhow!("Error pausing vhost-user-fs backend: {:?}", e))
})?;
}
self.vu_common.pause()?;
self.common.pause()
}
@ -730,19 +656,7 @@ impl Pausable for Fs {
epoll_thread.thread().unpark();
}
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.resume_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Resume(anyhow!(
"Error resuming vhost-user-fs backend: {:?}",
e
))
})
} else {
Ok(())
}
self.vu_common.resume()
}
}
@ -752,13 +666,7 @@ impl Snapshottable for Fs {
}
fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
let snapshot = Snapshot::new_from_versioned_state(&self.id(), &self.state())?;
if self.migration_started {
self.shutdown();
}
Ok(snapshot)
self.vu_common.snapshot(&self.id(), &self.state())
}
fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
@ -770,77 +678,19 @@ impl Transportable for Fs {}
impl Migratable for Fs {
fn start_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = true;
if let Some(vu) = &self.vu {
if let Some(guest_memory) = &self.guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock()
.unwrap()
.start_dirty_log(last_ram_addr)
.map_err(|e| {
MigratableError::StartDirtyLog(anyhow!(
"Error starting migration for vhost-user-fs backend: {:?}",
e
))
})
} else {
Err(MigratableError::StartDirtyLog(anyhow!(
"Missing guest memory"
)))
}
} else {
Ok(())
}
self.vu_common.start_dirty_log(&self.guest_memory)
}
fn stop_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = false;
if let Some(vu) = &self.vu {
vu.lock().unwrap().stop_dirty_log().map_err(|e| {
MigratableError::StopDirtyLog(anyhow!(
"Error stopping migration for vhost-user-fs backend: {:?}",
e
))
})
} else {
Ok(())
}
self.vu_common.stop_dirty_log()
}
fn dirty_log(&mut self) -> std::result::Result<MemoryRangeTable, MigratableError> {
if let Some(vu) = &self.vu {
if let Some(guest_memory) = &self.guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock().unwrap().dirty_log(last_ram_addr).map_err(|e| {
MigratableError::DirtyLog(anyhow!(
"Error retrieving dirty ranges from vhost-user-fs backend: {:?}",
e
))
})
} else {
Err(MigratableError::DirtyLog(anyhow!("Missing guest memory")))
}
} else {
Ok(MemoryRangeTable::default())
}
self.vu_common.dirty_log(&self.guest_memory)
}
fn complete_migration(&mut self) -> std::result::Result<(), MigratableError> {
// Make sure the device thread is killed in order to prevent from
// reconnections to the socket.
if let Some(kill_evt) = self.common.kill_evt.take() {
kill_evt.write(1).map_err(|e| {
MigratableError::CompleteMigration(anyhow!(
"Error killing vhost-user-fs threads: {:?}",
e
))
})?;
}
// Drop the vhost-user handler to avoid further calls to fail because
// the connection with the backend has been closed.
self.vu = None;
Ok(())
self.vu_common
.complete_migration(self.common.kill_evt.take())
}
}


@ -2,19 +2,27 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
EpollHelper, EpollHelperError, EpollHelperHandler, GuestMemoryMmap, Queue, VirtioInterrupt,
EPOLL_HELPER_EVENT_LAST, VIRTIO_F_IN_ORDER, VIRTIO_F_NOTIFICATION_DATA,
VIRTIO_F_ORDER_PLATFORM, VIRTIO_F_RING_EVENT_IDX, VIRTIO_F_RING_INDIRECT_DESC,
VIRTIO_F_VERSION_1,
ActivateError, EpollHelper, EpollHelperError, EpollHelperHandler, GuestMemoryMmap,
GuestRegionMmap, Queue, VirtioInterrupt, EPOLL_HELPER_EVENT_LAST, VIRTIO_F_IN_ORDER,
VIRTIO_F_NOTIFICATION_DATA, VIRTIO_F_ORDER_PLATFORM, VIRTIO_F_RING_EVENT_IDX,
VIRTIO_F_RING_INDIRECT_DESC, VIRTIO_F_VERSION_1,
};
use anyhow::anyhow;
use std::io;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::sync::{atomic::AtomicBool, Arc, Barrier, Mutex};
use vhost::vhost_user::message::{VhostUserInflight, VhostUserVirtioFeatures};
use versionize::Versionize;
use vhost::vhost_user::message::{
VhostUserInflight, VhostUserProtocolFeatures, VhostUserVirtioFeatures,
};
use vhost::vhost_user::{MasterReqHandler, VhostUserMasterReqHandler};
use vhost::Error as VhostError;
use vm_memory::{mmap::MmapRegionError, Error as MmapError, GuestAddressSpace, GuestMemoryAtomic};
use vm_memory::{
mmap::MmapRegionError, Address, Error as MmapError, GuestAddressSpace, GuestMemory,
GuestMemoryAtomic,
};
use vm_migration::{protocol::MemoryRangeTable, MigratableError, Snapshot, VersionMapped};
use vm_virtio::Error as VirtioError;
use vmm_sys_util::eventfd::EventFd;
use vu_common_ctrl::VhostUserHandle;
@ -274,3 +282,251 @@ impl<S: VhostUserMasterReqHandler> EpollHelperHandler for VhostUserEpollHandler<
false
}
}
#[derive(Default)]
pub struct VhostUserCommon {
pub vu: Option<Arc<Mutex<VhostUserHandle>>>,
pub acked_protocol_features: u64,
pub socket_path: String,
pub vu_num_queues: usize,
pub migration_started: bool,
pub server: bool,
}
impl VhostUserCommon {
#[allow(clippy::too_many_arguments)]
pub fn activate<T: VhostUserMasterReqHandler>(
&mut self,
mem: GuestMemoryAtomic<GuestMemoryMmap>,
queues: Vec<Queue>,
queue_evts: Vec<EventFd>,
interrupt_cb: Arc<dyn VirtioInterrupt>,
acked_features: u64,
slave_req_handler: Option<MasterReqHandler<T>>,
kill_evt: EventFd,
pause_evt: EventFd,
) -> std::result::Result<VhostUserEpollHandler<T>, ActivateError> {
let mut inflight: Option<Inflight> =
if self.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits() != 0
{
Some(Inflight::default())
} else {
None
};
if self.vu.is_none() {
error!("Missing vhost-user handle");
return Err(ActivateError::BadActivate);
}
let vu = self.vu.as_ref().unwrap();
vu.lock()
.unwrap()
.setup_vhost_user(
&mem.memory(),
queues.clone(),
queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(),
&interrupt_cb,
acked_features,
&slave_req_handler,
inflight.as_mut(),
)
.map_err(ActivateError::VhostUserBlkSetup)?;
Ok(VhostUserEpollHandler {
vu: vu.clone(),
mem,
kill_evt,
pause_evt,
queues,
queue_evts,
virtio_interrupt: interrupt_cb,
acked_features,
acked_protocol_features: self.acked_protocol_features,
socket_path: self.socket_path.clone(),
server: self.server,
slave_req_handler,
inflight,
})
}
pub fn restore_backend_connection(&mut self, acked_features: u64) -> Result<()> {
let mut vu = VhostUserHandle::connect_vhost_user(
self.server,
&self.socket_path,
self.vu_num_queues as u64,
false,
)?;
vu.set_protocol_features_vhost_user(acked_features, self.acked_protocol_features)?;
self.vu = Some(Arc::new(Mutex::new(vu)));
Ok(())
}
pub fn shutdown(&mut self) {
if let Some(vu) = &self.vu {
let _ = unsafe { libc::close(vu.lock().unwrap().socket_handle().as_raw_fd()) };
}
// Remove socket path if needed
if self.server {
let _ = std::fs::remove_file(&self.socket_path);
}
}
pub fn add_memory_region(
&mut self,
guest_memory: &Option<GuestMemoryAtomic<GuestMemoryMmap>>,
region: &Arc<GuestRegionMmap>,
) -> std::result::Result<(), crate::Error> {
if let Some(vu) = &self.vu {
if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits()
!= 0
{
return vu
.lock()
.unwrap()
.add_memory_region(region)
.map_err(crate::Error::VhostUserAddMemoryRegion);
} else if let Some(guest_memory) = guest_memory {
return vu
.lock()
.unwrap()
.update_mem_table(guest_memory.memory().deref())
.map_err(crate::Error::VhostUserUpdateMemory);
}
}
Ok(())
}
pub fn pause(&mut self) -> std::result::Result<(), MigratableError> {
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.pause_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Pause(anyhow!("Error pausing vhost-user-blk backend: {:?}", e))
})
} else {
Ok(())
}
}
pub fn resume(&mut self) -> std::result::Result<(), MigratableError> {
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.resume_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Resume(anyhow!(
"Error resuming vhost-user-blk backend: {:?}",
e
))
})
} else {
Ok(())
}
}
pub fn snapshot<T>(
&mut self,
id: &str,
state: &T,
) -> std::result::Result<Snapshot, MigratableError>
where
T: Versionize + VersionMapped,
{
let snapshot = Snapshot::new_from_versioned_state(id, state)?;
if self.migration_started {
self.shutdown();
}
Ok(snapshot)
}
pub fn start_dirty_log(
&mut self,
guest_memory: &Option<GuestMemoryAtomic<GuestMemoryMmap>>,
) -> std::result::Result<(), MigratableError> {
self.migration_started = true;
if let Some(vu) = &self.vu {
if let Some(guest_memory) = guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock()
.unwrap()
.start_dirty_log(last_ram_addr)
.map_err(|e| {
MigratableError::StartDirtyLog(anyhow!(
"Error starting migration for vhost-user backend: {:?}",
e
))
})
} else {
Err(MigratableError::StartDirtyLog(anyhow!(
"Missing guest memory"
)))
}
} else {
Ok(())
}
}
pub fn stop_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = false;
if let Some(vu) = &self.vu {
vu.lock().unwrap().stop_dirty_log().map_err(|e| {
MigratableError::StopDirtyLog(anyhow!(
"Error stopping migration for vhost-user backend: {:?}",
e
))
})
} else {
Ok(())
}
}
pub fn dirty_log(
&mut self,
guest_memory: &Option<GuestMemoryAtomic<GuestMemoryMmap>>,
) -> std::result::Result<MemoryRangeTable, MigratableError> {
if let Some(vu) = &self.vu {
if let Some(guest_memory) = guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock().unwrap().dirty_log(last_ram_addr).map_err(|e| {
MigratableError::DirtyLog(anyhow!(
"Error retrieving dirty ranges from vhost-user backend: {:?}",
e
))
})
} else {
Err(MigratableError::DirtyLog(anyhow!("Missing guest memory")))
}
} else {
Ok(MemoryRangeTable::default())
}
}
pub fn complete_migration(
&mut self,
kill_evt: Option<EventFd>,
) -> std::result::Result<(), MigratableError> {
// Make sure the device thread is killed in order to prevent from
// reconnections to the socket.
if let Some(kill_evt) = kill_evt {
kill_evt.write(1).map_err(|e| {
MigratableError::CompleteMigration(anyhow!(
"Error killing vhost-user thread: {:?}",
e
))
})?;
}
// Drop the vhost-user handler to avoid further calls to fail because
// the connection with the backend has been closed.
self.vu = None;
Ok(())
}
}
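
With these helpers in place, the per-device trait implementations shrink to delegations, as the blk, fs and net changes show. A self-contained sketch of that shape, using simplified stand-ins for vm_migration's Pausable trait and error type:

use std::sync::{Arc, Mutex};

type MigratableError = String; // stand-in for vm_migration::MigratableError

struct VhostUserHandle; // stand-in for the real connection handle
impl VhostUserHandle {
    fn pause_vhost_user(&mut self, _num_queues: usize) -> Result<(), MigratableError> {
        Ok(())
    }
}

#[derive(Default)]
struct VhostUserCommon {
    vu: Option<Arc<Mutex<VhostUserHandle>>>,
    vu_num_queues: usize,
}

impl VhostUserCommon {
    // Mirrors pause() above: only acts when a backend is connected.
    fn pause(&mut self) -> Result<(), MigratableError> {
        if let Some(vu) = &self.vu {
            vu.lock().unwrap().pause_vhost_user(self.vu_num_queues)
        } else {
            Ok(())
        }
    }
}

trait Pausable {
    fn pause(&mut self) -> Result<(), MigratableError>;
}

struct Blk {
    vu_common: VhostUserCommon,
}

impl Pausable for Blk {
    fn pause(&mut self) -> Result<(), MigratableError> {
        // Pause the backend first; the real code then calls self.common.pause()
        // to pause the virtio device itself.
        self.vu_common.pause()
    }
}

fn main() {
    let mut blk = Blk { vu_common: VhostUserCommon::default() };
    assert!(blk.pause().is_ok()); // no backend connected: pause is a no-op
}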


@ -3,17 +3,15 @@
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::vhost_user::vu_common_ctrl::{VhostUserConfig, VhostUserHandle};
use crate::vhost_user::{Error, Inflight, Result, VhostUserEpollHandler};
use crate::vhost_user::{Error, Result, VhostUserCommon};
use crate::{
ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterrupt, EPOLL_HELPER_EVENT_LAST,
VIRTIO_F_RING_EVENT_IDX, VIRTIO_F_VERSION_1,
};
use crate::{GuestMemoryMmap, GuestRegionMmap};
use anyhow::anyhow;
use net_util::{build_net_config_space, CtrlQueue, MacAddr, VirtioNetConfig};
use seccomp::{SeccompAction, SeccompFilter};
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::AtomicBool;
@ -30,7 +28,7 @@ use virtio_bindings::bindings::virtio_net::{
VIRTIO_NET_F_HOST_ECN, VIRTIO_NET_F_HOST_TSO4, VIRTIO_NET_F_HOST_TSO6, VIRTIO_NET_F_HOST_UFO,
VIRTIO_NET_F_MAC, VIRTIO_NET_F_MRG_RXBUF,
};
use vm_memory::{Address, ByteValued, GuestAddressSpace, GuestMemory, GuestMemoryAtomic};
use vm_memory::{ByteValued, GuestAddressSpace, GuestMemoryAtomic};
use vm_migration::{
protocol::MemoryRangeTable, Migratable, MigratableError, Pausable, Snapshot, Snapshottable,
Transportable, VersionMapped,
@ -107,18 +105,13 @@ impl EpollHelperHandler for NetCtrlEpollHandler {
pub struct Net {
common: VirtioCommon,
vu_common: VhostUserCommon,
id: String,
vu: Option<Arc<Mutex<VhostUserHandle>>>,
config: VirtioNetConfig,
guest_memory: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
acked_protocol_features: u64,
socket_path: String,
server: bool,
ctrl_queue_epoll_thread: Option<thread::JoinHandle<()>>,
epoll_thread: Option<thread::JoinHandle<()>>,
seccomp_action: SeccompAction,
vu_num_queues: usize,
migration_started: bool,
}
impl Net {
@ -139,7 +132,6 @@ impl Net {
// queue (with +1) will guarantee that. VirtioPciDevice::new() will
// create the actual queues based on this information.
return Ok(Net {
id,
common: VirtioCommon {
device_type: VirtioDeviceType::Net as u32,
queue_sizes: vec![vu_cfg.queue_size; num_queues + 1],
@ -147,17 +139,18 @@ impl Net {
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
vu: None,
vu_common: VhostUserCommon {
socket_path: vu_cfg.socket,
vu_num_queues: num_queues,
server,
..Default::default()
},
id,
config: VirtioNetConfig::default(),
guest_memory: None,
acked_protocol_features: 0,
socket_path: vu_cfg.socket,
server,
ctrl_queue_epoll_thread: None,
epoll_thread: None,
seccomp_action,
vu_num_queues: num_queues,
migration_started: false,
});
}
@ -230,17 +223,19 @@ impl Net {
min_queues: DEFAULT_QUEUE_NUMBER as u16,
..Default::default()
},
vu: Some(Arc::new(Mutex::new(vu))),
vu_common: VhostUserCommon {
vu: Some(Arc::new(Mutex::new(vu))),
acked_protocol_features,
socket_path: vu_cfg.socket,
vu_num_queues,
server,
..Default::default()
},
config,
guest_memory: None,
acked_protocol_features,
socket_path: vu_cfg.socket,
server,
ctrl_queue_epoll_thread: None,
epoll_thread: None,
seccomp_action,
vu_num_queues,
migration_started: false,
})
}
@ -249,8 +244,8 @@ impl Net {
avail_features: self.common.avail_features,
acked_features: self.common.acked_features,
config: self.config,
acked_protocol_features: self.acked_protocol_features,
vu_num_queues: self.vu_num_queues,
acked_protocol_features: self.vu_common.acked_protocol_features,
vu_num_queues: self.vu_common.vu_num_queues,
}
}
@ -258,31 +253,18 @@ impl Net {
self.common.avail_features = state.avail_features;
self.common.acked_features = state.acked_features;
self.config = state.config;
self.acked_protocol_features = state.acked_protocol_features;
self.vu_num_queues = state.vu_num_queues;
self.vu_common.acked_protocol_features = state.acked_protocol_features;
self.vu_common.vu_num_queues = state.vu_num_queues;
let mut vu = match VhostUserHandle::connect_vhost_user(
self.server,
&self.socket_path,
self.vu_num_queues as u64,
false,
) {
Ok(r) => r,
Err(e) => {
error!("Failed connecting vhost-user backend: {:?}", e);
return;
}
};
if let Err(e) = vu.set_protocol_features_vhost_user(
self.common.acked_features,
self.acked_protocol_features,
) {
error!("Failed setting up vhost-user backend: {:?}", e);
return;
if let Err(e) = self
.vu_common
.restore_backend_connection(self.common.acked_features)
{
error!(
"Failed restoring connection with vhost-user backend: {:?}",
e
);
}
self.vu = Some(Arc::new(Mutex::new(vu)));
}
}
@ -379,51 +361,20 @@ impl VirtioDevice for Net {
let backend_acked_features = self.common.acked_features & !(1 << VIRTIO_NET_F_MAC)
| (self.common.avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits());
let mut inflight: Option<Inflight> =
if self.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits() != 0
{
Some(Inflight::default())
} else {
None
};
if self.vu.is_none() {
error!("Missing vhost-user handle");
return Err(ActivateError::BadActivate);
}
let vu = self.vu.as_ref().unwrap();
vu.lock()
.unwrap()
.setup_vhost_user(
&mem.memory(),
queues.clone(),
queue_evts.iter().map(|q| q.try_clone().unwrap()).collect(),
&interrupt_cb,
backend_acked_features,
&slave_req_handler,
inflight.as_mut(),
)
.map_err(ActivateError::VhostUserNetSetup)?;
// Run a dedicated thread for handling potential reconnections with
// the backend.
let (kill_evt, pause_evt) = self.common.dup_eventfds();
let mut handler: VhostUserEpollHandler<SlaveReqHandler> = VhostUserEpollHandler {
vu: vu.clone(),
let mut handler = self.vu_common.activate(
mem,
kill_evt,
pause_evt,
queues,
queue_evts,
virtio_interrupt: interrupt_cb,
acked_features: backend_acked_features,
acked_protocol_features: self.acked_protocol_features,
socket_path: self.socket_path.clone(),
server: self.server,
slave_req_handler: None,
inflight,
};
interrupt_cb,
backend_acked_features,
slave_req_handler,
kill_evt,
pause_evt,
)?;
let paused = self.common.paused.clone();
let paused_sync = self.common.paused_sync.clone();
@ -450,7 +401,7 @@ impl VirtioDevice for Net {
self.common.resume().ok()?;
}
if let Some(vu) = &self.vu {
if let Some(vu) = &self.vu_common.vu {
if let Err(e) = vu
.lock()
.unwrap()
@ -473,52 +424,20 @@ impl VirtioDevice for Net {
}
fn shutdown(&mut self) {
if let Some(vu) = &self.vu {
let _ = unsafe { libc::close(vu.lock().unwrap().socket_handle().as_raw_fd()) };
}
// Remove socket path if needed
if self.server {
let _ = std::fs::remove_file(&self.socket_path);
}
self.vu_common.shutdown();
}
fn add_memory_region(
&mut self,
region: &Arc<GuestRegionMmap>,
) -> std::result::Result<(), crate::Error> {
if let Some(vu) = &self.vu {
if self.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits()
!= 0
{
return vu
.lock()
.unwrap()
.add_memory_region(region)
.map_err(crate::Error::VhostUserAddMemoryRegion);
} else if let Some(guest_memory) = &self.guest_memory {
return vu
.lock()
.unwrap()
.update_mem_table(guest_memory.memory().deref())
.map_err(crate::Error::VhostUserUpdateMemory);
}
}
Ok(())
self.vu_common.add_memory_region(&self.guest_memory, region)
}
}
impl Pausable for Net {
fn pause(&mut self) -> result::Result<(), MigratableError> {
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.pause_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Pause(anyhow!("Error pausing vhost-user-net backend: {:?}", e))
})?;
}
self.vu_common.pause()?;
self.common.pause()
}
@ -533,19 +452,7 @@ impl Pausable for Net {
ctrl_queue_epoll_thread.thread().unpark();
}
if let Some(vu) = &self.vu {
vu.lock()
.unwrap()
.resume_vhost_user(self.vu_num_queues)
.map_err(|e| {
MigratableError::Resume(anyhow!(
"Error resuming vhost-user-net backend: {:?}",
e
))
})
} else {
Ok(())
}
self.vu_common.resume()
}
}
@ -555,13 +462,7 @@ impl Snapshottable for Net {
}
fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
let snapshot = Snapshot::new_from_versioned_state(&self.id(), &self.state())?;
if self.migration_started {
self.shutdown();
}
Ok(snapshot)
self.vu_common.snapshot(&self.id(), &self.state())
}
fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
@ -573,77 +474,19 @@ impl Transportable for Net {}
impl Migratable for Net {
fn start_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = true;
if let Some(vu) = &self.vu {
if let Some(guest_memory) = &self.guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock()
.unwrap()
.start_dirty_log(last_ram_addr)
.map_err(|e| {
MigratableError::StartDirtyLog(anyhow!(
"Error starting migration for vhost-user-net backend: {:?}",
e
))
})
} else {
Err(MigratableError::StartDirtyLog(anyhow!(
"Missing guest memory"
)))
}
} else {
Ok(())
}
self.vu_common.start_dirty_log(&self.guest_memory)
}
fn stop_dirty_log(&mut self) -> std::result::Result<(), MigratableError> {
self.migration_started = false;
if let Some(vu) = &self.vu {
vu.lock().unwrap().stop_dirty_log().map_err(|e| {
MigratableError::StopDirtyLog(anyhow!(
"Error stopping migration for vhost-user-net backend: {:?}",
e
))
})
} else {
Ok(())
}
self.vu_common.stop_dirty_log()
}
fn dirty_log(&mut self) -> std::result::Result<MemoryRangeTable, MigratableError> {
if let Some(vu) = &self.vu {
if let Some(guest_memory) = &self.guest_memory {
let last_ram_addr = guest_memory.memory().last_addr().raw_value();
vu.lock().unwrap().dirty_log(last_ram_addr).map_err(|e| {
MigratableError::DirtyLog(anyhow!(
"Error retrieving dirty ranges from vhost-user-net backend: {:?}",
e
))
})
} else {
Err(MigratableError::DirtyLog(anyhow!("Missing guest memory")))
}
} else {
Ok(MemoryRangeTable::default())
}
self.vu_common.dirty_log(&self.guest_memory)
}
fn complete_migration(&mut self) -> std::result::Result<(), MigratableError> {
// Make sure the device thread is killed in order to prevent from
// reconnections to the socket.
if let Some(kill_evt) = self.common.kill_evt.take() {
kill_evt.write(1).map_err(|e| {
MigratableError::CompleteMigration(anyhow!(
"Error killing vhost-user-net threads: {:?}",
e
))
})?;
}
// Drop the vhost-user handler to avoid further calls to fail because
// the connection with the backend has been closed.
self.vu = None;
Ok(())
self.vu_common
.complete_migration(self.common.kill_evt.take())
}
}