cloud-hypervisor/vhost_user_backend/src/lib.rs
Sebastien Boeuf 0249e8641a Move Cloud Hypervisor to virtio-queue crate
Relying on the vm-virtio/virtio-queue crate from rust-vmm which has been
copied inside the Cloud Hypervisor tree, the entire codebase is moved to
the new definition of a Queue and other related structures.

The reason for this move is to follow the upstream until we get some
agreement for the patches that we need on top of that to make it
properly work with Cloud Hypervisor.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
2021-10-22 11:38:55 +02:00

961 lines
33 KiB
Rust

// Copyright 2019 Intel Corporation. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Copyright 2019 Alibaba Cloud Computing. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
#[macro_use]
extern crate log;
use std::error;
use std::fs::File;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::result;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use vhost::vhost_user::message::{
VhostUserConfigFlags, VhostUserInflight, VhostUserMemoryRegion, VhostUserProtocolFeatures,
VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags,
VhostUserVringState,
};
use vhost::vhost_user::SlaveReqHandler;
use vhost::vhost_user::{
Error as VhostUserError, Listener, Result as VhostUserResult, SlaveFsCacheReq, SlaveListener,
VhostUserSlaveReqHandlerMut,
};
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::Queue;
use vm_memory::guest_memory::FileOffset;
use vm_memory::GuestAddressSpace;
use vm_memory::{bitmap::AtomicBitmap, GuestAddress, GuestMemoryAtomic, MmapRegion};
use vmm_sys_util::eventfd::EventFd;
/// Guest memory type used by this crate: `vm_memory::GuestMemoryMmap`
/// parameterized with an `AtomicBitmap`.
pub type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;
/// Guest memory region type used by this crate: `vm_memory::GuestRegionMmap`
/// parameterized with an `AtomicBitmap`.
pub type GuestRegionMmap = vm_memory::GuestRegionMmap<AtomicBitmap>;
/// Value returned by the handler in response to `get_max_mem_slots()`.
const MAX_MEM_SLOTS: u64 = 32;
/// Errors related to vhost-user daemon.
#[derive(Debug)]
pub enum Error {
    /// Failed to create a new vhost-user handler.
    NewVhostUserHandler(VhostUserHandlerError),
    /// Failed creating vhost-user slave listener.
    CreateSlaveListener(VhostUserError),
    /// Failed creating vhost-user slave handler.
    CreateSlaveReqHandler(VhostUserError),
    /// Failed starting daemon thread.
    StartDaemon(io::Error),
    /// Failed waiting for daemon thread; carries the payload returned by a
    /// failed `join()` on that thread.
    WaitDaemon(Box<dyn std::any::Any + Send>),
    /// Failed handling a vhost-user request.
    HandleRequest(VhostUserError),
    /// Failed to process queue.
    ProcessQueue(VringEpollHandlerError),
    /// Failed to register listener.
    RegisterListener(io::Error),
    /// Failed to unregister listener.
    UnregisterListener(io::Error),
}

/// Result of vhost-user daemon operations.
pub type Result<T> = result::Result<T, Error>;
/// This trait must be implemented by the caller in order to provide backend
/// specific implementation.
///
/// The backend is shared, behind an `Arc<RwLock<_>>`, between the vhost-user
/// request handler and the vring worker threads, hence the
/// `Send + Sync + 'static` bounds.
pub trait VhostUserBackend: Send + Sync + 'static {
/// Number of queues.
///
/// One vring is created per queue when the handler is constructed.
fn num_queues(&self) -> usize;
/// Depth of each queue.
///
/// Used as the maximum size of every vring; a larger size requested through
/// `set_vring_num` is rejected.
fn max_queue_size(&self) -> usize;
/// Available virtio features.
///
/// A feature set containing bits outside of this value is refused by
/// `set_features`.
fn features(&self) -> u64;
/// Acked virtio features.
///
/// Called with the feature set accepted by `set_features`. The default
/// implementation ignores it.
fn acked_features(&mut self, _features: u64) {}
/// Vhost-user protocol features supported by the backend.
fn protocol_features(&self) -> VhostUserProtocolFeatures;
/// Tell the backend if EVENT_IDX has been negotiated.
///
/// Invoked from `set_vring_base`; `enabled` is `true` when
/// `VIRTIO_RING_F_EVENT_IDX` is part of the acked features.
fn set_event_idx(&mut self, enabled: bool);
/// This function gets called if the backend registered some additional
/// listeners onto specific file descriptors. The library can handle
/// virtqueues on its own, but does not know what to do with events
/// happening on custom listeners.
///
/// It is also called for vring kick events, after the kick EventFd has been
/// read; `device_event` is then the position of the vring inside `vrings`.
/// Events targeting a vring that is not enabled are not forwarded.
///
/// * `device_event` - event id, taken from the low 16 bits of the data
///   registered along with the file descriptor.
/// * `evset` - epoll events reported for the file descriptor.
/// * `vrings` - vrings served by the calling worker thread.
/// * `thread_id` - index of the calling worker thread.
///
/// Returning `Ok(true)` makes the worker thread leave its epoll loop, while
/// returning an error terminates the worker thread with that error.
fn handle_event(
&self,
device_event: u16,
evset: epoll::Events,
vrings: &[Arc<RwLock<Vring>>],
thread_id: usize,
) -> result::Result<bool, io::Error>;
/// Get virtio device configuration.
/// A default implementation is provided as we cannot expect all backends
/// to implement this function. It returns an empty buffer.
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
Vec::new()
}
/// Set virtio device configuration.
/// A default implementation is provided as we cannot expect all backends
/// to implement this function. It accepts and ignores the request.
fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> {
Ok(())
}
/// Provide an exit EventFd.
/// When this EventFd is written to, the worker thread will exit. An optional
/// id may also be provided; if it is not provided then the exit event will
/// be the first event id after the last queue.
/// The default implementation provides no exit event.
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, Option<u16>)> {
None
}
/// Set slave fd.
/// A default implementation is provided as we cannot expect all backends
/// to implement this function.
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
/// Assignment of the queues to the worker threads.
/// One worker thread is spawned per entry of the returned vector. Each
/// entry is a bitmask where bit `n` being set means that queue `n` is served
/// by that thread. The default implementation returns a single mask with
/// the 32 lowest bits set, i.e. one thread serving queues 0 to 31.
fn queues_per_thread(&self) -> Vec<u64> {
vec![0xffff_ffff]
}
}
/// This structure is the public API the backend is allowed to interact with
/// in order to run a fully functional vhost-user daemon.
pub struct VhostUserDaemon<S: VhostUserBackend> {
// Name given to the thread serving the vhost-user socket.
name: String,
// Request handler shared with the thread serving the vhost-user socket.
handler: Arc<Mutex<VhostUserHandler<S>>>,
// Handle of the thread serving the vhost-user socket; `None` until
// `start_server()` or `start_client()` succeeds, and again after `wait()`.
main_thread: Option<thread::JoinHandle<Result<()>>>,
}
impl<S: VhostUserBackend> VhostUserDaemon<S> {
/// Create the daemon instance, providing the backend implementation of
/// VhostUserBackend.
/// Under the hood, this will start a dedicated thread responsible for
/// listening onto registered event. Those events can be vring events or
/// custom events from the backend, but they get to be registered later
/// during the sequence.
pub fn new(name: String, backend: Arc<RwLock<S>>) -> Result<Self> {
let handler = Arc::new(Mutex::new(
VhostUserHandler::new(backend).map_err(Error::NewVhostUserHandler)?,
));
Ok(VhostUserDaemon {
name,
handler,
main_thread: None,
})
}
/// Listen to the vhost-user socket and run a dedicated thread handling
/// all requests coming through this socket. This runs in an infinite loop
/// that should be terminating once the other end of the socket (the VMM)
/// disconnects.
pub fn start_server(&mut self, listener: Listener) -> Result<()> {
let mut slave_listener = SlaveListener::new(listener, self.handler.clone())
.map_err(Error::CreateSlaveListener)?;
let mut slave_handler = slave_listener
.accept()
.map_err(Error::CreateSlaveReqHandler)?
.unwrap();
let handle = thread::Builder::new()
.name(self.name.clone())
.spawn(move || loop {
slave_handler
.handle_request()
.map_err(Error::HandleRequest)?;
})
.map_err(Error::StartDaemon)?;
self.main_thread = Some(handle);
Ok(())
}
/// Connect to the vhost-user socket and run a dedicated thread handling
/// all requests coming through this socket. This runs in an infinite loop
/// that should be terminating once the other end of the socket (the VMM)
/// hangs up.
pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
let mut slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone())
.map_err(Error::CreateSlaveReqHandler)?;
let handle = thread::Builder::new()
.name(self.name.clone())
.spawn(move || loop {
slave_handler
.handle_request()
.map_err(Error::HandleRequest)?;
})
.map_err(Error::StartDaemon)?;
self.main_thread = Some(handle);
Ok(())
}
/// Wait for the thread handling the vhost-user socket connection to
/// terminate.
pub fn wait(&mut self) -> Result<()> {
if let Some(handle) = self.main_thread.take() {
match handle.join().map_err(Error::WaitDaemon)? {
Ok(()) => Ok(()),
Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
Err(e) => Err(e),
}
} else {
Ok(())
}
}
/// Retrieve the vring worker. This is necessary to perform further
/// actions like registering and unregistering some extra event file
/// descriptors.
pub fn get_vring_workers(&self) -> Vec<Arc<VringWorker>> {
self.handler.lock().unwrap().get_vring_workers()
}
}
// Describes one memory region received from the master, in order to
// translate an address of the VMM address space into a guest physical one.
struct AddrMapping {
// Start of the region in the VMM address space (`user_addr` of the region).
vmm_addr: u64,
// Size of the region (`memory_size` of the region).
size: u64,
// Guest physical address of the region (`guest_phys_addr` of the region).
gpa_base: u64,
}
/// A virtqueue along with the event file descriptors and the state the
/// handler tracks for it.
pub struct Vring {
// The virtqueue itself.
queue: Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
// EventFd provided through `set_vring_kick`; it is read by the worker
// thread before an event is handed over to the backend.
kick: Option<EventFd>,
// EventFd provided through `set_vring_call`; it is written to by
// `signal_used_queue()`.
call: Option<EventFd>,
// EventFd provided through `set_vring_err`; only stored.
#[allow(dead_code)]
err: Option<EventFd>,
// Events of a vring that is not enabled are not processed.
enabled: bool,
}
impl Vring {
    /// Build a vring backed by `mem`, holding at most `max_queue_size`
    /// descriptors. It starts disabled and without any event file descriptor.
    fn new(mem: GuestMemoryAtomic<GuestMemoryMmap>, max_queue_size: u16) -> Self {
        let queue = Queue::new(mem, max_queue_size);

        Vring {
            queue,
            kick: None,
            call: None,
            err: None,
            enabled: false,
        }
    }

    /// Give mutable access to the underlying virtqueue.
    pub fn mut_queue(&mut self) -> &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>> {
        &mut self.queue
    }

    /// Notify the other end by writing to the `call` EventFd. This is a
    /// successful no-op when no `call` EventFd has been provided.
    pub fn signal_used_queue(&mut self) -> result::Result<(), io::Error> {
        match &self.call {
            Some(call) => call.write(1),
            None => Ok(()),
        }
    }
}
/// Errors related to vring epoll handler.
#[derive(Debug)]
pub enum VringEpollHandlerError {
    /// Failed to process the queue from the backend.
    ProcessQueueBackendProcessing(io::Error),
    /// Failed to signal used queue.
    SignalUsedQueue(io::Error),
    /// Failed to read the event from kick EventFd.
    HandleEventReadKick(io::Error),
    /// Failed to handle the event from the backend.
    HandleEventBackendHandling(io::Error),
}

/// Result of vring epoll handler operations.
type VringEpollHandlerResult<T> = result::Result<T, VringEpollHandlerError>;
// Per worker thread state needed to dispatch the events reported by epoll.
struct VringEpollHandler<S: VhostUserBackend> {
// Backend the events are forwarded to.
backend: Arc<RwLock<S>>,
// Vrings served by the worker thread.
vrings: Vec<Arc<RwLock<Vring>>>,
// Event id making the worker thread exit, if the backend provided one.
exit_event_id: Option<u16>,
// Index of the worker thread, passed along to the backend.
thread_id: usize,
}
impl<S: VhostUserBackend> VringEpollHandler<S> {
    /// Dispatch one epoll event.
    ///
    /// Returns `Ok(true)` when the worker thread must exit, which is the case
    /// for the exit event or when the backend asks for it.
    fn handle_event(
        &self,
        device_event: u16,
        evset: epoll::Events,
    ) -> VringEpollHandlerResult<bool> {
        if Some(device_event) == self.exit_event_id {
            return Ok(true);
        }

        // Event ids lower than the number of vrings are kick events.
        if let Some(vring) = self.vrings.get(usize::from(device_event)) {
            let vring = vring.read().unwrap();

            // If the vring is not enabled, it should not be processed.
            // But let's not read it (hence lose it) in case it is later enabled.
            if !vring.enabled {
                return Ok(false);
            }

            if let Some(kick) = vring.kick.as_ref() {
                kick.read()
                    .map_err(VringEpollHandlerError::HandleEventReadKick)?;
            }
            // The vring lock is released here, before calling into the backend.
        }

        let backend = self.backend.read().unwrap();
        backend
            .handle_event(device_event, evset, &self.vrings, self.thread_id)
            .map_err(VringEpollHandlerError::HandleEventBackendHandling)
    }
}
#[derive(Debug)]
/// Errors related to vring worker.
enum VringWorkerError {
/// Failed while waiting for events.
EpollWait(io::Error),
/// Failed to handle the event.
HandleEvent(VringEpollHandlerError),
}
/// Result of vring worker operations.
type VringWorkerResult<T> = std::result::Result<T, VringWorkerError>;
/// Worker owning the epoll file descriptor the event file descriptors get
/// registered with.
pub struct VringWorker {
    // `File` wrapper making sure the epoll file descriptor gets closed.
    epoll_file: File,
}

impl AsRawFd for VringWorker {
    /// Raw file descriptor of the underlying epoll instance.
    fn as_raw_fd(&self) -> RawFd {
        let VringWorker { epoll_file } = self;
        epoll_file.as_raw_fd()
    }
}
impl VringWorker {
fn run<S: VhostUserBackend>(&self, handler: VringEpollHandler<S>) -> VringWorkerResult<()> {
const EPOLL_EVENTS_LEN: usize = 100;
let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
'epoll: loop {
let num_events = match epoll::wait(self.epoll_file.as_raw_fd(), -1, &mut events[..]) {
Ok(res) => res,
Err(e) => {
if e.kind() == io::ErrorKind::Interrupted {
// It's well defined from the epoll_wait() syscall
// documentation that the epoll loop can be interrupted
// before any of the requested events occurred or the
// timeout expired. In both those cases, epoll_wait()
// returns an error of type EINTR, but this should not
// be considered as a regular error. Instead it is more
// appropriate to retry, by calling into epoll_wait().
continue;
}
return Err(VringWorkerError::EpollWait(e));
}
};
for event in events.iter().take(num_events) {
let evset = match epoll::Events::from_bits(event.events) {
Some(evset) => evset,
None => {
let evbits = event.events;
println!("epoll: ignoring unknown event set: 0x{:x}", evbits);
continue;
}
};
let ev_type = event.data as u16;
if handler
.handle_event(ev_type, evset)
.map_err(VringWorkerError::HandleEvent)?
{
break 'epoll;
}
}
}
Ok(())
}
/// Register a custom event only meaningful to the caller. When this event
/// is later triggered, and because only the caller knows what to do about
/// it, the backend implementation of `handle_event` will be called.
/// This lets entire control to the caller about what needs to be done for
/// this special event, without forcing it to run its own dedicated epoll
/// loop for it.
pub fn register_listener(
&self,
fd: RawFd,
ev_type: epoll::Events,
data: u64,
) -> result::Result<(), io::Error> {
epoll::ctl(
self.epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_ADD,
fd,
epoll::Event::new(ev_type, data),
)
}
/// Unregister a custom event. If the custom event is triggered after this
/// function has been called, nothing will happen as it will be removed
/// from the list of file descriptors the epoll loop is listening to.
pub fn unregister_listener(
&self,
fd: RawFd,
ev_type: epoll::Events,
data: u64,
) -> result::Result<(), io::Error> {
epoll::ctl(
self.epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_DEL,
fd,
epoll::Event::new(ev_type, data),
)
}
}
/// Errors related to vhost-user handler.
#[derive(Debug)]
pub enum VhostUserHandlerError {
    /// Failed to create epoll file descriptor.
    EpollCreateFd(io::Error),
    /// Failed to spawn vring worker.
    SpawnVringWorker(io::Error),
    /// Could not find the mapping from memory regions.
    MissingMemoryMapping,
    /// Could not register exit event
    RegisterExitEvent(io::Error),
}

impl std::fmt::Display for VhostUserHandlerError {
    /// Write the human readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        use VhostUserHandlerError::*;

        match self {
            EpollCreateFd(err) => write!(f, "failed creating epoll fd: {}", err),
            SpawnVringWorker(err) => write!(f, "failed spawning the vring worker: {}", err),
            MissingMemoryMapping => f.write_str("Missing memory mapping"),
            RegisterExitEvent(err) => write!(f, "Failed to register exit event: {}", err),
        }
    }
}

impl error::Error for VhostUserHandlerError {}

/// Result of vhost-user handler operations.
type VhostUserHandlerResult<T> = result::Result<T, VhostUserHandlerError>;
// State of the vhost-user device: it handles the requests coming from the
// master and owns the worker threads processing the vrings.
struct VhostUserHandler<S: VhostUserBackend> {
// Backend implementation the requests and events are forwarded to.
backend: Arc<RwLock<S>>,
// One worker per entry of `queues_per_thread`, in the same order.
workers: Vec<Arc<VringWorker>>,
// Set by `set_owner()`, cleared by `reset_owner()`.
owned: bool,
// Features accepted through `set_features()`.
acked_features: u64,
// Number of queues, as reported by the backend at construction time.
num_queues: usize,
// Maximum queue size, as reported by the backend at construction time.
max_queue_size: usize,
// Bitmasks of the queues served by each worker thread, as reported by the
// backend at construction time.
queues_per_thread: Vec<u64>,
// Known memory regions, used to translate the addresses of the VMM address
// space into guest physical addresses.
mappings: Vec<AddrMapping>,
// Guest memory, shared with the queue of every vring.
guest_memory: GuestMemoryAtomic<GuestMemoryMmap>,
// One vring per queue.
vrings: Vec<Arc<RwLock<Vring>>>,
// Handles of the worker threads, joined when the handler is dropped.
worker_threads: Vec<thread::JoinHandle<VringWorkerResult<()>>>,
}
impl<S: VhostUserBackend> VhostUserHandler<S> {
    /// Create the handler along with its vrings and worker threads.
    ///
    /// One vring is created per queue of the backend, and one worker thread,
    /// with its own epoll instance, is spawned per entry returned by
    /// `queues_per_thread()`. The exit event of the backend, if any, is
    /// registered with the epoll instance of the matching worker.
    ///
    /// # Errors
    ///
    /// * `VhostUserHandlerError::EpollCreateFd` if an epoll instance cannot
    ///   be created.
    /// * `VhostUserHandlerError::RegisterExitEvent` if the exit event cannot
    ///   be registered.
    /// * `VhostUserHandlerError::SpawnVringWorker` if a worker thread cannot
    ///   be spawned.
    fn new(backend: Arc<RwLock<S>>) -> VhostUserHandlerResult<Self> {
        let (num_queues, max_queue_size, queues_per_thread) = {
            let backend = backend.read().unwrap();
            (
                backend.num_queues(),
                backend.max_queue_size(),
                backend.queues_per_thread(),
            )
        };

        let guest_memory = GuestMemoryAtomic::new(GuestMemoryMmap::new());

        // NOTE(review): `max_queue_size` is truncated to 16 bits here.
        let vrings: Vec<Arc<RwLock<Vring>>> = (0..num_queues)
            .map(|_| {
                Arc::new(RwLock::new(Vring::new(
                    guest_memory.clone(),
                    max_queue_size as u16,
                )))
            })
            .collect();

        let mut workers = Vec::with_capacity(queues_per_thread.len());
        let mut worker_threads = Vec::with_capacity(queues_per_thread.len());
        for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
            // Create the epoll file descriptor
            let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
            // Use 'File' to enforce closing on 'epoll_fd'
            // SAFETY: `epoll_fd` has just been returned by a successful
            // `epoll::create()` and is not used anywhere else.
            let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };

            let vring_worker = Arc::new(VringWorker { epoll_file });
            let worker = vring_worker.clone();

            let exit_event_id = if let Some((exit_event_fd, exit_event_id)) =
                backend.read().unwrap().exit_event(thread_id)
            {
                // Without an explicit id, the exit event takes the first id
                // after the last queue.
                let exit_event_id = exit_event_id.unwrap_or(num_queues as u16);
                worker
                    .register_listener(
                        exit_event_fd.as_raw_fd(),
                        epoll::Events::EPOLLIN,
                        u64::from(exit_event_id),
                    )
                    .map_err(VhostUserHandlerError::RegisterExitEvent)?;
                Some(exit_event_id)
            } else {
                None
            };

            // Vrings whose bit is set in the mask are served by this thread.
            let thread_vrings: Vec<Arc<RwLock<Vring>>> = vrings
                .iter()
                .enumerate()
                .filter(|(index, _)| (queues_mask >> *index) & 1u64 == 1u64)
                .map(|(_, vring)| vring.clone())
                .collect();

            let vring_handler = VringEpollHandler {
                backend: backend.clone(),
                vrings: thread_vrings,
                exit_event_id,
                thread_id,
            };

            let worker_thread = thread::Builder::new()
                .name("vring_worker".to_string())
                .spawn(move || vring_worker.run(vring_handler))
                .map_err(VhostUserHandlerError::SpawnVringWorker)?;

            workers.push(worker);
            worker_threads.push(worker_thread);
        }

        Ok(VhostUserHandler {
            backend,
            workers,
            owned: false,
            acked_features: 0,
            num_queues,
            max_queue_size,
            queues_per_thread,
            mappings: Vec::new(),
            guest_memory,
            vrings,
            worker_threads,
        })
    }

    /// Return the workers, one per worker thread.
    fn get_vring_workers(&self) -> Vec<Arc<VringWorker>> {
        self.workers.clone()
    }

    /// Translate an address of the VMM address space into a guest physical
    /// address, relying on the known memory regions.
    ///
    /// # Errors
    ///
    /// Returns `VhostUserHandlerError::MissingMemoryMapping` if no region
    /// contains `vmm_va`.
    fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
        self.mappings
            .iter()
            .find_map(|mapping| {
                // Work on the offset within the region rather than on the end
                // address of the region: the regions are described by the
                // master, and `vmm_addr + size` could overflow.
                let offset = vmm_va.checked_sub(mapping.vmm_addr)?;
                if offset < mapping.size {
                    // A region whose translation overflows cannot hold the
                    // address; skip it.
                    mapping.gpa_base.checked_add(offset)
                } else {
                    None
                }
            })
            .ok_or(VhostUserHandlerError::MissingMemoryMapping)
    }
}
impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
/// Mark the device as owned. Fails with `InvalidOperation` if it already is.
fn set_owner(&mut self) -> VhostUserResult<()> {
    if !self.owned {
        self.owned = true;
        return Ok(());
    }

    Err(VhostUserError::InvalidOperation)
}
/// Release the ownership of the device and forget the acked features.
fn reset_owner(&mut self) -> VhostUserResult<()> {
    self.acked_features = 0;
    self.owned = false;

    Ok(())
}
/// Report the virtio features offered by the backend.
fn get_features(&mut self) -> VhostUserResult<u64> {
    let backend = self.backend.read().unwrap();
    Ok(backend.features())
}
/// Record the features acked by the master and let the backend know.
///
/// Fails with `InvalidParam` if `features` contains bits the backend does
/// not offer.
fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
    let supported = self.backend.read().unwrap().features();
    if features & !supported != 0 {
        return Err(VhostUserError::InvalidParam);
    }

    self.acked_features = features;

    // If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated,
    // the ring is initialized in an enabled state.
    // If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated,
    // the ring is initialized in a disabled state. Client must not
    // pass data to/from the backend until ring is enabled by
    // VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has
    // been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0.
    let vring_enabled = (features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()) == 0;
    self.vrings
        .iter()
        .for_each(|vring| vring.write().unwrap().enabled = vring_enabled);

    let mut backend = self.backend.write().unwrap();
    backend.acked_features(features);

    Ok(())
}
/// Report the vhost-user protocol features offered by the backend.
fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
    let backend = self.backend.read().unwrap();
    Ok(backend.protocol_features())
}
/// Accept the protocol features acked by the master. They are not recorded.
fn set_protocol_features(&mut self, _features: u64) -> VhostUserResult<()> {
    // Note: a slave that reported VHOST_USER_F_PROTOCOL_FEATURES must
    // support this message even before VHOST_USER_SET_FEATURES was
    // called, hence there is nothing to check here.
    Ok(())
}
/// Replace the guest memory, and the address mappings derived from it, with
/// the regions described by `ctx`. The n-th file of `files` backs the n-th
/// region.
///
/// Fails with `InvalidParam` if there are fewer files than regions, and with
/// `ReqHandlerError` if the regions cannot be mapped.
fn set_mem_table(
    &mut self,
    ctx: &[VhostUserMemoryRegion],
    files: Vec<File>,
) -> VhostUserResult<()> {
    // Every region needs its backing file: refuse the request rather than
    // panicking when some are missing.
    if files.len() < ctx.len() {
        return Err(VhostUserError::InvalidParam);
    }

    // We need to create tuple of ranges from the list of VhostUserMemoryRegion
    // that we get from the caller.
    let mut regions: Vec<(GuestAddress, usize, Option<FileOffset>)> =
        Vec::with_capacity(ctx.len());
    let mut mappings: Vec<AddrMapping> = Vec::with_capacity(ctx.len());

    for (region, file) in ctx.iter().zip(files) {
        let g_addr = GuestAddress(region.guest_phys_addr);
        let len = region.memory_size as usize;
        let f_off = FileOffset::new(file, region.mmap_offset);

        regions.push((g_addr, len, Some(f_off)));
        mappings.push(AddrMapping {
            vmm_addr: region.user_addr,
            size: region.memory_size,
            gpa_base: region.guest_phys_addr,
        });
    }

    let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| {
        VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
    })?;

    // Only touch the current state once the new memory is known to be valid.
    self.guest_memory.lock().unwrap().replace(mem);
    self.mappings = mappings;

    Ok(())
}
/// Report the number of queues of the device.
fn get_queue_num(&mut self) -> VhostUserResult<u64> {
    let queue_count = self.num_queues as u64;
    Ok(queue_count)
}
/// Set the size of the vring `index` to `num` descriptors.
///
/// Fails with `InvalidParam` if `index` is not a valid queue index, or if
/// `num` is zero or larger than the maximum queue size.
fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> {
    let queue_index = index as usize;
    let size_is_valid = num != 0 && (num as usize) <= self.max_queue_size;

    if queue_index >= self.num_queues || !size_is_valid {
        return Err(VhostUserError::InvalidParam);
    }

    let mut vring = self.vrings[queue_index].write().unwrap();
    vring.queue.state.size = num as u16;

    Ok(())
}
fn set_vring_addr(
&mut self,
index: u32,
_flags: VhostUserVringAddrFlags,
descriptor: u64,
used: u64,
available: u64,
_log: u64,
) -> VhostUserResult<()> {
if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
if !self.mappings.is_empty() {
let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
let used_ring = self.vmm_va_to_gpa(used).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.vrings[index as usize]
.write()
.unwrap()
.queue
.state
.desc_table = GuestAddress(desc_table);
self.vrings[index as usize]
.write()
.unwrap()
.queue
.state
.avail_ring = GuestAddress(avail_ring);
self.vrings[index as usize]
.write()
.unwrap()
.queue
.state
.used_ring = GuestAddress(used_ring);
Ok(())
} else {
Err(VhostUserError::InvalidParam)
}
}
fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
self.vrings[index as usize]
.write()
.unwrap()
.queue
.set_next_avail(base as u16);
let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0;
self.vrings[index as usize]
.write()
.unwrap()
.mut_queue()
.set_event_idx(event_idx);
self.backend.write().unwrap().set_event_idx(event_idx);
Ok(())
}
/// Stop the vring `index` and report its next available index.
///
/// The kick EventFd of the vring, if any, is unregistered from the worker
/// thread serving the vring.
///
/// Fails with `InvalidParam` if `index` is not a valid queue index, and with
/// `ReqHandlerError` if the kick EventFd cannot be unregistered.
fn get_vring_base(&mut self, index: u32) -> VhostUserResult<VhostUserVringState> {
    let queue_index = index as usize;
    if queue_index >= self.num_queues {
        return Err(VhostUserError::InvalidParam);
    }
    let vring = &self.vrings[queue_index];

    // Quote from vhost-user specification:
    // Client must start ring upon receiving a kick (that is, detecting
    // that file descriptor is readable) on the descriptor specified by
    // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
    // VHOST_USER_GET_VRING_BASE.
    vring.write().unwrap().queue.state.ready = false;

    if let Some(kick_fd) = vring.read().unwrap().kick.as_ref() {
        // First worker thread whose mask contains the queue.
        let owner = self
            .queues_per_thread
            .iter()
            .enumerate()
            .find(|(_, mask)| (**mask >> index) & 1u64 == 1u64);

        if let Some((thread_index, queues_mask)) = owner {
            // The event id is the number of queues of lower index served by
            // the same thread.
            let evt_idx = queues_mask.count_ones() - (queues_mask >> index).count_ones();
            self.workers[thread_index]
                .unregister_listener(
                    kick_fd.as_raw_fd(),
                    epoll::Events::EPOLLIN,
                    u64::from(evt_idx),
                )
                .map_err(VhostUserError::ReqHandlerError)?;
        }
    }

    let next_avail = vring.read().unwrap().queue.next_avail();

    Ok(VhostUserVringState::new(index, u32::from(next_avail)))
}
/// Set the kick EventFd of the vring `index` and start the vring.
///
/// The EventFd, if any, is registered with the worker thread serving the
/// vring.
///
/// Fails with `InvalidParam` if `index` is not a valid queue index, and with
/// `ReqHandlerError` if the EventFd cannot be registered.
fn set_vring_kick(&mut self, index: u8, fd: Option<File>) -> VhostUserResult<()> {
    let queue_index = usize::from(index);
    if queue_index >= self.num_queues {
        return Err(VhostUserError::InvalidParam);
    }
    let vring = &self.vrings[queue_index];

    // The file gives up the ownership of its descriptor, which is handed
    // over to the EventFd.
    let kick = fd.map(|file| unsafe { EventFd::from_raw_fd(file.into_raw_fd()) });

    {
        let mut guard = vring.write().unwrap();
        guard.kick = kick;
        // Quote from vhost-user specification:
        // Client must start ring upon receiving a kick (that is, detecting
        // that file descriptor is readable) on the descriptor specified by
        // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
        // VHOST_USER_GET_VRING_BASE.
        guard.queue.state.ready = true;
    }

    if let Some(kick_fd) = vring.read().unwrap().kick.as_ref() {
        // First worker thread whose mask contains the queue.
        let owner = self
            .queues_per_thread
            .iter()
            .enumerate()
            .find(|(_, mask)| (**mask >> index) & 1u64 == 1u64);

        if let Some((thread_index, queues_mask)) = owner {
            // The event id is the number of queues of lower index served by
            // the same thread.
            let evt_idx = queues_mask.count_ones() - (queues_mask >> index).count_ones();
            self.workers[thread_index]
                .register_listener(
                    kick_fd.as_raw_fd(),
                    epoll::Events::EPOLLIN,
                    u64::from(evt_idx),
                )
                .map_err(VhostUserError::ReqHandlerError)?;
        }
    }

    Ok(())
}
/// Set, or clear when `fd` is `None`, the call EventFd of the vring `index`.
///
/// Fails with `InvalidParam` if `index` is not a valid queue index.
fn set_vring_call(&mut self, index: u8, fd: Option<File>) -> VhostUserResult<()> {
    let queue_index = usize::from(index);
    if queue_index >= self.num_queues {
        return Err(VhostUserError::InvalidParam);
    }

    // The file gives up the ownership of its descriptor, which is handed
    // over to the EventFd.
    let call = fd.map(|file| unsafe { EventFd::from_raw_fd(file.into_raw_fd()) });
    self.vrings[queue_index].write().unwrap().call = call;

    Ok(())
}
/// Set, or clear when `fd` is `None`, the error EventFd of the vring `index`.
///
/// Fails with `InvalidParam` if `index` is not a valid queue index.
fn set_vring_err(&mut self, index: u8, fd: Option<File>) -> VhostUserResult<()> {
    let queue_index = usize::from(index);
    if queue_index >= self.num_queues {
        return Err(VhostUserError::InvalidParam);
    }

    // The file gives up the ownership of its descriptor, which is handed
    // over to the EventFd.
    let err = fd.map(|file| unsafe { EventFd::from_raw_fd(file.into_raw_fd()) });
    self.vrings[queue_index].write().unwrap().err = err;

    Ok(())
}
/// Enable or disable the vring `index`.
///
/// Fails with `InvalidOperation` if VHOST_USER_F_PROTOCOL_FEATURES has not
/// been negotiated, and with `InvalidParam` if `index` is not a valid queue
/// index.
fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> {
    // This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES
    // has been negotiated.
    let protocol_features_acked =
        (self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()) != 0;
    if !protocol_features_acked {
        return Err(VhostUserError::InvalidOperation);
    }

    let queue_index = index as usize;
    if queue_index >= self.num_queues {
        return Err(VhostUserError::InvalidParam);
    }

    // Slave must not pass data to/from the backend until ring is
    // enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1,
    // or after it has been disabled by VHOST_USER_SET_VRING_ENABLE
    // with parameter 0.
    let mut vring = self.vrings[queue_index].write().unwrap();
    vring.enabled = enable;

    Ok(())
}
/// Fetch `size` bytes of the device configuration at `offset` from the
/// backend. The flags are ignored.
fn get_config(
    &mut self,
    offset: u32,
    size: u32,
    _flags: VhostUserConfigFlags,
) -> VhostUserResult<Vec<u8>> {
    let backend = self.backend.read().unwrap();
    let config = backend.get_config(offset, size);

    Ok(config)
}
/// Hand `buf` over to the backend as the new content of the device
/// configuration at `offset`. The flags are ignored.
///
/// A failure of the backend is reported as `ReqHandlerError`.
fn set_config(
    &mut self,
    offset: u32,
    buf: &[u8],
    _flags: VhostUserConfigFlags,
) -> VhostUserResult<()> {
    let mut backend = self.backend.write().unwrap();

    backend
        .set_config(offset, buf)
        .map_err(VhostUserError::ReqHandlerError)
}
/// Hand the slave request channel over to the backend.
fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) {
    let mut backend = self.backend.write().unwrap();
    backend.set_slave_req_fd(vu_req);
}
/// Report the maximum number of memory slots, `MAX_MEM_SLOTS`.
fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
    let max_slots = MAX_MEM_SLOTS;
    Ok(max_slots)
}
fn add_mem_region(
&mut self,
region: &VhostUserSingleMemoryRegion,
fd: File,
) -> VhostUserResult<()> {
let mmap_region = MmapRegion::from_file(
FileOffset::new(fd, region.mmap_offset),
region.memory_size as usize,
)
.map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?;
let guest_region = Arc::new(
GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err(
|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)),
)?,
);
let guest_memory = self
.guest_memory
.memory()
.insert_region(guest_region)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.guest_memory.lock().unwrap().replace(guest_memory);
self.mappings.push(AddrMapping {
vmm_addr: region.user_addr,
size: region.memory_size,
gpa_base: region.guest_phys_addr,
});
Ok(())
}
/// Remove the memory region described by `region` from the guest memory,
/// and drop the address mappings starting at its guest physical address.
///
/// A failure to remove the region is reported as `ReqHandlerError`.
fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> {
    let removed_base = region.guest_phys_addr;

    // Build the new guest memory out of the current one, then swap them.
    let removal = self
        .guest_memory
        .memory()
        .remove_region(GuestAddress(removed_base), region.memory_size);
    let (guest_memory, _) = removal.map_err(|e| {
        VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
    })?;
    self.guest_memory.lock().unwrap().replace(guest_memory);

    self.mappings
        .retain(|mapping| mapping.gpa_base != removed_base);

    Ok(())
}
fn get_inflight_fd(
&mut self,
_: &VhostUserInflight,
) -> VhostUserResult<(VhostUserInflight, File)> {
std::unimplemented!()
}
fn set_inflight_fd(&mut self, _: &VhostUserInflight, _: File) -> VhostUserResult<()> {
std::unimplemented!()
}
}
impl<S: VhostUserBackend> Drop for VhostUserHandler<S> {
fn drop(&mut self) {
for thread in self.worker_threads.drain(..) {
if let Err(e) = thread.join() {
error!("Error in vring worker: {:?}", e);
}
}
}
}