vhost_user_backend: Add the ability to start multiple threads

In order to support multiqueue devices running across multiple threads to
increase IO performance, this commit introduces a new function,
queues_per_thread(), to the VhostUserBackend trait.

This gives each backend implementation the opportunity to define which
queues belong to which thread.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
commit d9eec0de14
parent 40e4dc6339
Author: Sebastien Boeuf <sebastien.boeuf@intel.com>
Date:   2020-04-09 10:52:54 +02:00

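As a rough illustration of the new hook (not part of this patch; the MyBackend type and queue count below are assumptions), a multiqueue backend could override queues_per_thread() to hand each queue to its own worker thread by returning one single-bit mask per queue, while the trait's default vec![0xffff_ffff] keeps every queue on a single thread:

// Hypothetical backend, for illustration only; in a real backend this
// method would live in its `impl VhostUserBackend` block.
struct MyBackend {
    num_queues: usize,
}

impl MyBackend {
    // One worker thread per queue: mask i has only bit i set.
    fn queues_per_thread(&self) -> Vec<u64> {
        (0..self.num_queues).map(|i| 1u64 << i).collect()
    }
}

fn main() {
    let backend = MyBackend { num_queues: 4 };
    // Four threads, owning queues 0, 1, 2 and 3 respectively.
    assert_eq!(backend.queues_per_thread(), vec![0b0001, 0b0010, 0b0100, 0b1000]);
}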

@@ -113,6 +113,10 @@ pub trait VhostUserBackend: Send + Sync + 'static {
     /// A default implementation is provided as we cannot expect all backends
     /// to implement this function.
     fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
+
+    fn queues_per_thread(&self) -> Vec<u64> {
+        vec![0xffff_ffff]
+    }
 }
 
 /// This structure is the public API the backend is allowed to interact with
@@ -453,22 +457,24 @@ type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
 
 struct VhostUserHandler<S: VhostUserBackend> {
     backend: Arc<RwLock<S>>,
-    worker: Arc<VringWorker>,
+    workers: Vec<Arc<VringWorker>>,
     owned: bool,
     features_acked: bool,
     acked_features: u64,
     acked_protocol_features: u64,
     num_queues: usize,
     max_queue_size: usize,
+    queues_per_thread: Vec<u64>,
     memory: Option<Memory>,
     vrings: Vec<Arc<RwLock<Vring>>>,
-    worker_thread: Option<thread::JoinHandle<VringWorkerResult<()>>>,
+    worker_threads: Vec<thread::JoinHandle<VringWorkerResult<()>>>,
 }
 
 impl<S: VhostUserBackend> VhostUserHandler<S> {
     fn new(backend: Arc<RwLock<S>>) -> VhostUserHandlerResult<Self> {
         let num_queues = backend.read().unwrap().num_queues();
         let max_queue_size = backend.read().unwrap().max_queue_size();
+        let queues_per_thread = backend.read().unwrap().queues_per_thread();
 
         let mut vrings: Vec<Arc<RwLock<Vring>>> = Vec::new();
         for _ in 0..num_queues {
@@ -476,14 +482,18 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
             vrings.push(vring);
         }
 
-        // Create the epoll file descriptor
-        let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
+        let mut workers = Vec::new();
+        let mut worker_threads = Vec::new();
+        for queues_mask in queues_per_thread.iter() {
+            // Create the epoll file descriptor
+            let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
 
-        let vring_worker = Arc::new(VringWorker { epoll_fd });
-        let worker = vring_worker.clone();
+            let vring_worker = Arc::new(VringWorker { epoll_fd });
+            let worker = vring_worker.clone();
 
-        let exit_event_id =
-            if let Some((exit_event_fd, exit_event_id)) = backend.read().unwrap().exit_event() {
+            let exit_event_id = if let Some((exit_event_fd, exit_event_id)) =
+                backend.read().unwrap().exit_event()
+            {
                 let exit_event_id = exit_event_id.unwrap_or(num_queues as u16);
                 worker
                     .register_listener(
@@ -497,36 +507,46 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
                 None
             };
 
-        let vring_handler = VringEpollHandler {
-            backend: backend.clone(),
-            vrings: vrings.clone(),
-            exit_event_id,
-        };
+            let mut thread_vrings: Vec<Arc<RwLock<Vring>>> = Vec::new();
+            for (index, vring) in vrings.iter().enumerate() {
+                if (queues_mask >> index) & 1u64 == 1u64 {
+                    thread_vrings.push(vring.clone());
+                }
+            }
+
+            let vring_handler = VringEpollHandler {
+                backend: backend.clone(),
+                vrings: thread_vrings,
+                exit_event_id,
+            };
 
-        let worker_thread = Some(
-            thread::Builder::new()
+            let worker_thread = thread::Builder::new()
                 .name("vring_worker".to_string())
                 .spawn(move || vring_worker.run(vring_handler))
-                .map_err(VhostUserHandlerError::SpawnVringWorker)?,
-        );
+                .map_err(VhostUserHandlerError::SpawnVringWorker)?;
+
+            workers.push(worker);
+            worker_threads.push(worker_thread);
+        }
 
         Ok(VhostUserHandler {
             backend,
-            worker,
+            workers,
             owned: false,
             features_acked: false,
             acked_features: 0,
             acked_protocol_features: 0,
             num_queues,
             max_queue_size,
+            queues_per_thread,
             memory: None,
             vrings,
-            worker_thread,
+            worker_threads,
         })
     }
 
     fn get_vring_worker(&self) -> Arc<VringWorker> {
-        self.worker.clone()
+        self.workers[0].clone()
     }
 
     fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
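The loop above builds one VringEpollHandler per entry of queues_per_thread(), giving each worker thread only the vrings whose bit is set in its mask. A minimal standalone sketch of that mask filtering, with assumed example masks (two threads, four queues; the helper name is not part of the patch):

// Standalone sketch of the `(queues_mask >> index) & 1` test used above.
fn queues_for_thread(queues_mask: u64, num_queues: usize) -> Vec<usize> {
    (0..num_queues)
        .filter(|&index| (queues_mask >> index) & 1u64 == 1u64)
        .collect()
}

fn main() {
    // Assumed masks: the first thread owns queues 0-1, the second owns 2-3.
    let queues_per_thread = [0b0011u64, 0b1100u64];
    let per_thread: Vec<Vec<usize>> = queues_per_thread
        .iter()
        .map(|mask| queues_for_thread(*mask, 4))
        .collect();
    assert_eq!(per_thread, vec![vec![0, 1], vec![2, 3]]);
}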
@@ -718,9 +738,20 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
         // VHOST_USER_GET_VRING_BASE.
         self.vrings[index as usize].write().unwrap().queue.ready = false;
         if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() {
-            self.worker
-                .unregister_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, u64::from(index))
-                .map_err(VhostUserError::ReqHandlerError)?;
+            for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
+                let shifted_queues_mask = queues_mask >> index;
+                if shifted_queues_mask & 1u64 == 1u64 {
+                    let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
+                    self.workers[thread_index]
+                        .unregister_listener(
+                            fd.as_raw_fd(),
+                            epoll::Events::EPOLLIN,
+                            u64::from(evt_idx),
+                        )
+                        .map_err(VhostUserError::ReqHandlerError)?;
+                    break;
+                }
+            }
         }
 
         let next_avail = self.vrings[index as usize]
@@ -752,9 +783,20 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
         // VHOST_USER_GET_VRING_BASE.
         self.vrings[index as usize].write().unwrap().queue.ready = true;
         if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() {
-            self.worker
-                .register_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, u64::from(index))
-                .map_err(VhostUserError::ReqHandlerError)?;
+            for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
+                let shifted_queues_mask = queues_mask >> index;
+                if shifted_queues_mask & 1u64 == 1u64 {
+                    let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
+                    self.workers[thread_index]
+                        .register_listener(
+                            fd.as_raw_fd(),
+                            epoll::Events::EPOLLIN,
+                            u64::from(evt_idx),
+                        )
+                        .map_err(VhostUserError::ReqHandlerError)?;
+                    break;
+                }
+            }
         }
 
         Ok(())
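In both hunks above, evt_idx translates a global queue index into the per-thread epoll event index by counting how many of the thread's queues sit below it in the mask. A standalone sketch of that arithmetic (the helper name is hypothetical, not part of the patch):

// Given a thread's queue bitmap and a global queue index owned by that
// thread, return the event index the kick fd was registered with.
fn per_thread_event_index(queues_mask: u64, queue_index: u16) -> u64 {
    let shifted_queues_mask = queues_mask >> queue_index;
    // Owned queues strictly below `queue_index` determine the slot.
    u64::from(queues_mask.count_ones() - shifted_queues_mask.count_ones())
}

fn main() {
    // Thread owning queues 1, 3 and 5 (mask 0b10_1010): queue 3 is its
    // second queue (event index 1), queue 5 its third (event index 2).
    assert_eq!(per_thread_event_index(0b10_1010, 3), 1);
    assert_eq!(per_thread_event_index(0b10_1010, 5), 2);
}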
@@ -837,7 +879,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
 
 impl<S: VhostUserBackend> Drop for VhostUserHandler<S> {
     fn drop(&mut self) {
-        if let Some(thread) = self.worker_thread.take() {
+        for thread in self.worker_threads.drain(..) {
             if let Err(e) = thread.join() {
                 error!("Error in vring worker: {:?}", e);
             }