mirror of
https://github.com/cloud-hypervisor/cloud-hypervisor.git
synced 2025-02-22 03:12:27 +00:00
vhost_user_backend: Fix remaining issues
This commit fixes all the remaining issues that were found as part of the integration with vhost-user-net. It fixes the way to notify that a vring is used, by using the proper EventFd. It removes the process_queue() function from the trait, since the complexity it was introducing was leading to deadlocks with mutexes. It moves the register/unregister functions for registering custom events from the backend, from the VringEpollHandler to the VringWorker. This allows for a lot of simplification and solve a deadlock issue. Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
parent
527dd68ce1
commit
09392f0530
@ -22,7 +22,7 @@ use vhost_rs::vhost_user::{
|
||||
};
|
||||
use vm_memory::guest_memory::FileOffset;
|
||||
use vm_memory::{GuestAddress, GuestMemoryMmap};
|
||||
use vm_virtio::{DescriptorChain, Queue};
|
||||
use vm_virtio::Queue;
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -63,6 +63,9 @@ pub trait VhostUserBackend: Send + Sync + 'static {
|
||||
/// Virtio features.
|
||||
fn features(&self) -> u64;
|
||||
|
||||
/// Update guest memory regions.
|
||||
fn update_memory(&mut self, mem: GuestMemoryMmap) -> result::Result<(), io::Error>;
|
||||
|
||||
/// This function gets called if the backend registered some additional
|
||||
/// listeners onto specific file descriptors. The library can handle
|
||||
/// virtqueues on its own, but does not know what to do with events
|
||||
@ -71,28 +74,20 @@ pub trait VhostUserBackend: Send + Sync + 'static {
|
||||
&mut self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &Vec<Arc<RwLock<Vring>>>,
|
||||
) -> result::Result<bool, io::Error>;
|
||||
|
||||
/// This function is responsible for the actual processing that needs to
|
||||
/// happen when one of the virtqueues is available.
|
||||
fn process_queue(
|
||||
&mut self,
|
||||
q_idx: u16,
|
||||
avail_desc: &DescriptorChain,
|
||||
mem: &GuestMemoryMmap,
|
||||
) -> result::Result<u32, io::Error>;
|
||||
|
||||
/// Get virtio device configuration.
|
||||
/// A default implementation is provided as we cannot expect all backends
|
||||
/// to implement this function.
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Set virtio device configuration.
|
||||
/// A default implementation is provided as we cannot expect all backends
|
||||
/// to implement this function.
|
||||
fn set_config(&mut self, offset: u32, buf: &[u8]) -> result::Result<(), io::Error> {
|
||||
fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -132,7 +127,7 @@ impl<S: VhostUserBackend> VhostUserDaemon<S> {
|
||||
/// disconnects.
|
||||
pub fn start(&mut self) -> Result<()> {
|
||||
let mut slave_listener =
|
||||
SlaveListener::new(self.sock_path.as_str(), false, self.handler.clone())
|
||||
SlaveListener::new(self.sock_path.as_str(), true, self.handler.clone())
|
||||
.map_err(Error::CreateSlaveListener)?;
|
||||
let mut slave_handler = slave_listener
|
||||
.accept()
|
||||
@ -161,11 +156,11 @@ impl<S: VhostUserBackend> VhostUserDaemon<S> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieve the vring handler. This is necessary to perform further
|
||||
/// Retrieve the vring worker. This is necessary to perform further
|
||||
/// actions like registering and unregistering some extra event file
|
||||
/// descriptors, as well as forcing some vring to be processed.
|
||||
pub fn get_vring_handler(&self) -> Arc<RwLock<VringEpollHandler<S>>> {
|
||||
self.handler.lock().unwrap().get_vring_handler()
|
||||
/// descriptors.
|
||||
pub fn get_vring_worker(&self) -> Arc<VringWorker> {
|
||||
self.handler.lock().unwrap().get_vring_worker()
|
||||
}
|
||||
}
|
||||
|
||||
@ -179,7 +174,7 @@ struct Memory {
|
||||
mappings: Vec<AddrMapping>,
|
||||
}
|
||||
|
||||
struct Vring {
|
||||
pub struct Vring {
|
||||
queue: Queue,
|
||||
kick: Option<EventFd>,
|
||||
call: Option<EventFd>,
|
||||
@ -197,6 +192,18 @@ impl Vring {
|
||||
enabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mut_queue(&mut self) -> &mut Queue {
|
||||
&mut self.queue
|
||||
}
|
||||
|
||||
pub fn signal_used_queue(&self) -> result::Result<(), io::Error> {
|
||||
if let Some(call) = self.call.as_ref() {
|
||||
return call.write(1);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -210,143 +217,104 @@ pub enum VringEpollHandlerError {
|
||||
HandleEventReadKick(io::Error),
|
||||
/// Failed to handle the event from the backend.
|
||||
HandleEventBackendHandling(io::Error),
|
||||
/// Failed to register vring listener.
|
||||
RegisterVringListener(io::Error),
|
||||
/// Failed to unregister vring listener.
|
||||
UnregisterVringListener(io::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for VringEpollHandlerError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
VringEpollHandlerError::ProcessQueueBackendProcessing(e) => {
|
||||
write!(f, "failed processing queue from backend: {}", e)
|
||||
}
|
||||
VringEpollHandlerError::SignalUsedQueue(e) => {
|
||||
write!(f, "failed signalling used queue: {}", e)
|
||||
}
|
||||
VringEpollHandlerError::HandleEventReadKick(e) => {
|
||||
write!(f, "failed reading from kick eventfd: {}", e)
|
||||
}
|
||||
VringEpollHandlerError::HandleEventBackendHandling(e) => {
|
||||
write!(f, "failed handling event from backend: {}", e)
|
||||
}
|
||||
VringEpollHandlerError::RegisterVringListener(e) => {
|
||||
write!(f, "failed registering vring listener: {}", e)
|
||||
}
|
||||
VringEpollHandlerError::UnregisterVringListener(e) => {
|
||||
write!(f, "failed unregistering vring listener: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for VringEpollHandlerError {}
|
||||
|
||||
/// Result of vring epoll handler operations.
|
||||
type VringEpollHandlerResult<T> = std::result::Result<T, VringEpollHandlerError>;
|
||||
|
||||
pub struct VringEpollHandler<S: VhostUserBackend> {
|
||||
struct VringEpollHandler<S: VhostUserBackend> {
|
||||
backend: Arc<RwLock<S>>,
|
||||
vrings: Vec<Arc<RwLock<Vring>>>,
|
||||
mem: Option<GuestMemoryMmap>,
|
||||
epoll_fd: RawFd,
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend> VringEpollHandler<S> {
|
||||
fn update_memory(&mut self, mem: Option<GuestMemoryMmap>) {
|
||||
self.mem = mem;
|
||||
}
|
||||
|
||||
/// Trigger the processing of a virtqueue. This function is meant to be
|
||||
/// used by the caller whenever it might need some available queues to
|
||||
/// send data back to the guest.
|
||||
/// A concrete example is a backend registering one extra listener for
|
||||
/// data that needs to be sent to the guest. When the associated event
|
||||
/// is triggered, the backend will be invoked through its `handle_event`
|
||||
/// implementation. And in this case, the way to handle the event is to
|
||||
/// call into `process_queue` to let it invoke the backend implementation
|
||||
/// of `process_queue`. With this twisted trick, all common parts related
|
||||
/// to the virtqueues can remain part of the library.
|
||||
pub fn process_queue(&mut self, q_idx: u16) -> VringEpollHandlerResult<()> {
|
||||
let vring = &mut self.vrings[q_idx as usize].write().unwrap();
|
||||
let mut used_desc_heads = vec![(0, 0); vring.queue.size as usize];
|
||||
let mut used_count = 0;
|
||||
if let Some(mem) = &self.mem {
|
||||
for avail_desc in vring.queue.iter(&mem) {
|
||||
let used_len = self
|
||||
.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.process_queue(q_idx, &avail_desc, &mem)
|
||||
.map_err(VringEpollHandlerError::ProcessQueueBackendProcessing)?;
|
||||
|
||||
used_desc_heads[used_count] = (avail_desc.index, used_len);
|
||||
used_count += 1;
|
||||
}
|
||||
|
||||
for &(desc_index, len) in &used_desc_heads[..used_count] {
|
||||
vring.queue.add_used(&mem, desc_index, len);
|
||||
}
|
||||
}
|
||||
|
||||
if used_count > 0 {
|
||||
if let Some(call) = &vring.call {
|
||||
call.write(1)
|
||||
.map_err(VringEpollHandlerError::SignalUsedQueue)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&mut self,
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
) -> VringEpollHandlerResult<bool> {
|
||||
let num_queues = self.vrings.len();
|
||||
match device_event as usize {
|
||||
x if x < num_queues => {
|
||||
if let Some(kick) = &self.vrings[device_event as usize].read().unwrap().kick {
|
||||
kick.read()
|
||||
.map_err(VringEpollHandlerError::HandleEventReadKick)?;
|
||||
}
|
||||
|
||||
// If the vring is not enabled, it should not be processed.
|
||||
// The event is only read to be discarded.
|
||||
if !self.vrings[device_event as usize].read().unwrap().enabled {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
self.process_queue(device_event)?;
|
||||
Ok(false)
|
||||
if (device_event as usize) < num_queues {
|
||||
if let Some(kick) = &self.vrings[device_event as usize].read().unwrap().kick {
|
||||
kick.read()
|
||||
.map_err(VringEpollHandlerError::HandleEventReadKick)?;
|
||||
}
|
||||
_ => self
|
||||
.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset)
|
||||
.map_err(VringEpollHandlerError::HandleEventBackendHandling),
|
||||
}
|
||||
}
|
||||
|
||||
fn register_vring_listener(&self, q_idx: usize) -> VringEpollHandlerResult<()> {
|
||||
if let Some(fd) = &self.vrings[q_idx].read().unwrap().kick {
|
||||
self.register_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, q_idx as u64)
|
||||
.map_err(VringEpollHandlerError::RegisterVringListener)
|
||||
} else {
|
||||
Ok(())
|
||||
// If the vring is not enabled, it should not be processed.
|
||||
// The event is only read to be discarded.
|
||||
if !self.vrings[device_event as usize].read().unwrap().enabled {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn unregister_vring_listener(&self, q_idx: usize) -> VringEpollHandlerResult<()> {
|
||||
if let Some(fd) = &self.vrings[q_idx].read().unwrap().kick {
|
||||
self.unregister_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, q_idx as u64)
|
||||
.map_err(VringEpollHandlerError::UnregisterVringListener)
|
||||
} else {
|
||||
Ok(())
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset, &self.vrings)
|
||||
.map_err(VringEpollHandlerError::HandleEventBackendHandling)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Errors related to vring worker.
|
||||
enum VringWorkerError {
|
||||
/// Failed while waiting for events.
|
||||
EpollWait(io::Error),
|
||||
}
|
||||
|
||||
/// Result of vring worker operations.
|
||||
type VringWorkerResult<T> = std::result::Result<T, VringWorkerError>;
|
||||
|
||||
pub struct VringWorker {
|
||||
epoll_fd: RawFd,
|
||||
}
|
||||
|
||||
impl VringWorker {
|
||||
fn run<S: VhostUserBackend>(&self, handler: VringEpollHandler<S>) -> VringWorkerResult<()> {
|
||||
const EPOLL_EVENTS_LEN: usize = 100;
|
||||
let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
|
||||
|
||||
'epoll: loop {
|
||||
let num_events = match epoll::wait(self.epoll_fd, -1, &mut events[..]) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::Interrupted {
|
||||
// It's well defined from the epoll_wait() syscall
|
||||
// documentation that the epoll loop can be interrupted
|
||||
// before any of the requested events occurred or the
|
||||
// timeout expired. In both those cases, epoll_wait()
|
||||
// returns an error of type EINTR, but this should not
|
||||
// be considered as a regular error. Instead it is more
|
||||
// appropriate to retry, by calling into epoll_wait().
|
||||
continue;
|
||||
}
|
||||
return Err(VringWorkerError::EpollWait(e));
|
||||
}
|
||||
};
|
||||
|
||||
for event in events.iter().take(num_events) {
|
||||
let evset = match epoll::Events::from_bits(event.events) {
|
||||
Some(evset) => evset,
|
||||
None => {
|
||||
let evbits = event.events;
|
||||
println!("epoll: ignoring unknown event set: 0x{:x}", evbits);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let ev_type = event.data as u16;
|
||||
|
||||
if let Err(e) = handler.handle_event(ev_type, evset) {
|
||||
println!(
|
||||
"vring handler handle event {} with error {:?}\n",
|
||||
ev_type, e
|
||||
);
|
||||
break 'epoll;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Register a custom event only meaningful to the caller. When this event
|
||||
@ -387,71 +355,6 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Errors related to vring worker.
|
||||
enum VringWorkerError {
|
||||
/// Failed while waiting for events.
|
||||
EpollWait(io::Error),
|
||||
/// Failed to handle event.
|
||||
HandleEvent(VringEpollHandlerError),
|
||||
}
|
||||
|
||||
/// Result of vring worker operations.
|
||||
type VringWorkerResult<T> = std::result::Result<T, VringWorkerError>;
|
||||
|
||||
struct VringWorker<S: VhostUserBackend> {
|
||||
handler: Arc<RwLock<VringEpollHandler<S>>>,
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend> VringWorker<S> {
|
||||
fn run(&self, epoll_fd: RawFd) -> VringWorkerResult<()> {
|
||||
const EPOLL_EVENTS_LEN: usize = 100;
|
||||
let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
|
||||
|
||||
'epoll: loop {
|
||||
let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::Interrupted {
|
||||
// It's well defined from the epoll_wait() syscall
|
||||
// documentation that the epoll loop can be interrupted
|
||||
// before any of the requested events occurred or the
|
||||
// timeout expired. In both those cases, epoll_wait()
|
||||
// returns an error of type EINTR, but this should not
|
||||
// be considered as a regular error. Instead it is more
|
||||
// appropriate to retry, by calling into epoll_wait().
|
||||
continue;
|
||||
}
|
||||
return Err(VringWorkerError::EpollWait(e));
|
||||
}
|
||||
};
|
||||
|
||||
for event in events.iter().take(num_events) {
|
||||
let evset = match epoll::Events::from_bits(event.events) {
|
||||
Some(evset) => evset,
|
||||
None => {
|
||||
let evbits = event.events;
|
||||
println!("epoll: ignoring unknown event set: 0x{:x}", evbits);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let ev_type = event.data as u16;
|
||||
|
||||
if let Err(e) = self.handler.write().unwrap().handle_event(ev_type, evset) {
|
||||
println!(
|
||||
"vring handler handle event {} with error {:?}\n",
|
||||
ev_type, e
|
||||
);
|
||||
break 'epoll;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Errors related to vhost-user handler.
|
||||
pub enum VhostUserHandlerError {
|
||||
@ -482,7 +385,7 @@ type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
|
||||
|
||||
struct VhostUserHandler<S: VhostUserBackend> {
|
||||
backend: Arc<RwLock<S>>,
|
||||
vring_handler: Arc<RwLock<VringEpollHandler<S>>>,
|
||||
worker: Arc<VringWorker>,
|
||||
owned: bool,
|
||||
features_acked: bool,
|
||||
acked_features: u64,
|
||||
@ -498,28 +401,30 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
|
||||
let num_queues = backend.read().unwrap().num_queues();
|
||||
let max_queue_size = backend.read().unwrap().max_queue_size();
|
||||
|
||||
let vrings = vec![Arc::new(RwLock::new(Vring::new(max_queue_size as u16))); num_queues];
|
||||
let mut vrings: Vec<Arc<RwLock<Vring>>> = Vec::new();
|
||||
for _ in 0..num_queues {
|
||||
let vring = Arc::new(RwLock::new(Vring::new(max_queue_size as u16)));
|
||||
vrings.push(vring);
|
||||
}
|
||||
|
||||
// Create the epoll file descriptor
|
||||
let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
|
||||
|
||||
let vring_handler = Arc::new(RwLock::new(VringEpollHandler {
|
||||
let vring_handler = VringEpollHandler {
|
||||
backend: backend.clone(),
|
||||
vrings: vrings.clone(),
|
||||
mem: None,
|
||||
epoll_fd,
|
||||
}));
|
||||
let worker = VringWorker {
|
||||
handler: vring_handler.clone(),
|
||||
};
|
||||
let vring_worker = Arc::new(VringWorker { epoll_fd });
|
||||
let worker = vring_worker.clone();
|
||||
|
||||
thread::Builder::new()
|
||||
.name("vring_worker".to_string())
|
||||
.spawn(move || worker.run(epoll_fd))
|
||||
.spawn(move || vring_worker.run(vring_handler))
|
||||
.map_err(VhostUserHandlerError::SpawnVringWorker)?;
|
||||
|
||||
Ok(VhostUserHandler {
|
||||
backend,
|
||||
vring_handler,
|
||||
worker,
|
||||
owned: false,
|
||||
features_acked: false,
|
||||
acked_features: 0,
|
||||
@ -531,8 +436,8 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
|
||||
})
|
||||
}
|
||||
|
||||
fn get_vring_handler(&self) -> Arc<RwLock<VringEpollHandler<S>>> {
|
||||
self.vring_handler.clone()
|
||||
fn get_vring_worker(&self) -> Arc<VringWorker> {
|
||||
self.worker.clone()
|
||||
}
|
||||
|
||||
fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
|
||||
@ -634,7 +539,13 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
|
||||
let mem = GuestMemoryMmap::with_files(regions).map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
self.vring_handler.write().unwrap().update_memory(Some(mem));
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_memory(mem)
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
self.memory = Some(Memory { mappings });
|
||||
|
||||
Ok(())
|
||||
@ -712,13 +623,11 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
|
||||
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
|
||||
// VHOST_USER_GET_VRING_BASE.
|
||||
self.vrings[index as usize].write().unwrap().queue.ready = false;
|
||||
self.vring_handler
|
||||
.read()
|
||||
.unwrap()
|
||||
.unregister_vring_listener(index as usize)
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() {
|
||||
self.worker
|
||||
.unregister_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, index as u64)
|
||||
.map_err(VhostUserError::ReqHandlerError)?;
|
||||
}
|
||||
|
||||
let next_avail = self.vrings[index as usize]
|
||||
.read()
|
||||
@ -748,13 +657,11 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
|
||||
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
|
||||
// VHOST_USER_GET_VRING_BASE.
|
||||
self.vrings[index as usize].write().unwrap().queue.ready = true;
|
||||
self.vring_handler
|
||||
.read()
|
||||
.unwrap()
|
||||
.register_vring_listener(index as usize)
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() {
|
||||
self.worker
|
||||
.register_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, index as u64)
|
||||
.map_err(VhostUserError::ReqHandlerError)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user