From d9eec0de14a05766be379b66d2919470d0b7c7d3 Mon Sep 17 00:00:00 2001
From: Sebastien Boeuf <sebastien.boeuf@intel.com>
Date: Thu, 9 Apr 2020 10:52:54 +0200
Subject: [PATCH] vhost_user_backend: Add the ability to start multiple threads

In order to support multiple queues running on multiple threads to
increase IO performance, this commit introduces a new function
queues_per_thread() to the VhostUserBackend trait. This gives each
backend implementation the opportunity to define which queues belong
to which thread.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
---
 vhost_user_backend/src/lib.rs | 96 +++++++++++++++++++++++++----------
 1 file changed, 69 insertions(+), 27 deletions(-)

diff --git a/vhost_user_backend/src/lib.rs b/vhost_user_backend/src/lib.rs
index 1be1cb1f0..611d9e7d6 100644
--- a/vhost_user_backend/src/lib.rs
+++ b/vhost_user_backend/src/lib.rs
@@ -113,6 +113,10 @@ pub trait VhostUserBackend: Send + Sync + 'static {
     /// A default implementation is provided as we cannot expect all backends
     /// to implement this function.
     fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
+
+    fn queues_per_thread(&self) -> Vec<u64> {
+        vec![0xffff_ffff]
+    }
 }
 
 /// This structure is the public API the backend is allowed to interact with
@@ -453,22 +457,24 @@ type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
 
 struct VhostUserHandler<S: VhostUserBackend> {
     backend: Arc<RwLock<S>>,
-    worker: Arc<VringWorker>,
+    workers: Vec<Arc<VringWorker>>,
     owned: bool,
     features_acked: bool,
     acked_features: u64,
     acked_protocol_features: u64,
     num_queues: usize,
     max_queue_size: usize,
+    queues_per_thread: Vec<u64>,
     memory: Option<Memory>,
     vrings: Vec<Arc<RwLock<Vring>>>,
-    worker_thread: Option<thread::JoinHandle<VringWorkerResult<()>>>,
+    worker_threads: Vec<thread::JoinHandle<VringWorkerResult<()>>>,
 }
 
 impl<S: VhostUserBackend> VhostUserHandler<S> {
     fn new(backend: Arc<RwLock<S>>) -> VhostUserHandlerResult<Self> {
         let num_queues = backend.read().unwrap().num_queues();
         let max_queue_size = backend.read().unwrap().max_queue_size();
+        let queues_per_thread = backend.read().unwrap().queues_per_thread();
 
         let mut vrings: Vec<Arc<RwLock<Vring>>> = Vec::new();
         for _ in 0..num_queues {
@@ -476,14 +482,18 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
             vrings.push(vring);
         }
 
-        // Create the epoll file descriptor
-        let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
+        let mut workers = Vec::new();
+        let mut worker_threads = Vec::new();
+        for queues_mask in queues_per_thread.iter() {
+            // Create the epoll file descriptor
+            let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
 
-        let vring_worker = Arc::new(VringWorker { epoll_fd });
-        let worker = vring_worker.clone();
+            let vring_worker = Arc::new(VringWorker { epoll_fd });
+            let worker = vring_worker.clone();
 
-        let exit_event_id =
-            if let Some((exit_event_fd, exit_event_id)) = backend.read().unwrap().exit_event() {
+            let exit_event_id = if let Some((exit_event_fd, exit_event_id)) =
+                backend.read().unwrap().exit_event()
+            {
                 let exit_event_id = exit_event_id.unwrap_or(num_queues as u16);
                 worker
                     .register_listener(
@@ -497,36 +507,46 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
                 None
             };
 
-        let vring_handler = VringEpollHandler {
-            backend: backend.clone(),
-            vrings: vrings.clone(),
-            exit_event_id,
-        };
+            let mut thread_vrings: Vec<Arc<RwLock<Vring>>> = Vec::new();
+            for (index, vring) in vrings.iter().enumerate() {
+                if (queues_mask >> index) & 1u64 == 1u64 {
+                    thread_vrings.push(vring.clone());
+                }
+            }
 
-        let worker_thread = Some(
-            thread::Builder::new()
+            let vring_handler = VringEpollHandler {
+                backend: backend.clone(),
+                vrings: thread_vrings,
+                exit_event_id,
+            };
+
+            let worker_thread = thread::Builder::new()
                 .name("vring_worker".to_string())
                 .spawn(move || vring_worker.run(vring_handler))
-                .map_err(VhostUserHandlerError::SpawnVringWorker)?,
-        );
+                .map_err(VhostUserHandlerError::SpawnVringWorker)?;
+
+            workers.push(worker);
+            worker_threads.push(worker_thread);
+        }
 
         Ok(VhostUserHandler {
             backend,
-            worker,
+            workers,
             owned: false,
             features_acked: false,
             acked_features: 0,
             acked_protocol_features: 0,
             num_queues,
             max_queue_size,
+            queues_per_thread,
             memory: None,
             vrings,
-            worker_thread,
+            worker_threads,
         })
     }
 
     fn get_vring_worker(&self) -> Arc<VringWorker> {
-        self.worker.clone()
+        self.workers[0].clone()
     }
 
     fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
@@ -718,9 +738,20 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
         // VHOST_USER_GET_VRING_BASE.
         self.vrings[index as usize].write().unwrap().queue.ready = false;
         if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() {
-            self.worker
-                .unregister_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, u64::from(index))
-                .map_err(VhostUserError::ReqHandlerError)?;
+            for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
+                let shifted_queues_mask = queues_mask >> index;
+                if shifted_queues_mask & 1u64 == 1u64 {
+                    let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
+                    self.workers[thread_index]
+                        .unregister_listener(
+                            fd.as_raw_fd(),
+                            epoll::Events::EPOLLIN,
+                            u64::from(evt_idx),
+                        )
+                        .map_err(VhostUserError::ReqHandlerError)?;
+                    break;
+                }
+            }
         }
 
         let next_avail = self.vrings[index as usize]
@@ -752,9 +783,20 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
         // VHOST_USER_GET_VRING_BASE.
         self.vrings[index as usize].write().unwrap().queue.ready = true;
         if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() {
-            self.worker
-                .register_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, u64::from(index))
-                .map_err(VhostUserError::ReqHandlerError)?;
+            for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
+                let shifted_queues_mask = queues_mask >> index;
+                if shifted_queues_mask & 1u64 == 1u64 {
+                    let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
+                    self.workers[thread_index]
+                        .register_listener(
+                            fd.as_raw_fd(),
+                            epoll::Events::EPOLLIN,
+                            u64::from(evt_idx),
+                        )
+                        .map_err(VhostUserError::ReqHandlerError)?;
+                    break;
+                }
+            }
         }
 
         Ok(())
@@ -837,7 +879,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
 
 impl<S: VhostUserBackend> Drop for VhostUserHandler<S> {
     fn drop(&mut self) {
-        if let Some(thread) = self.worker_thread.take() {
+        for thread in self.worker_threads.drain(..) {
             if let Err(e) = thread.join() {
                 error!("Error in vring worker: {:?}", e);
             }
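
Note on the queue-to-thread mapping (a sketch, not part of the patch): each
u64 returned by queues_per_thread() is a bitmask with one bit per queue index,
and the handler derives a per-thread epoll event index for a queue by counting
the mask bits set below that queue's bit (the evt_idx computation above). The
standalone Rust sketch below mirrors that convention; the helpers
queue_masks() and event_index() are hypothetical names introduced only for
illustration.

    // Hypothetical helper: assign `num_queues` queues round-robin across
    // `num_threads` worker threads, producing one bitmask per thread in the
    // shape queues_per_thread() returns them (up to 64 queues per mask).
    fn queue_masks(num_queues: usize, num_threads: usize) -> Vec<u64> {
        let mut masks = vec![0u64; num_threads];
        for q in 0..num_queues {
            masks[q % num_threads] |= 1u64 << q;
        }
        masks
    }

    // Mirror of the patch's evt_idx computation: the rank of `queue` within
    // its owning thread's mask, i.e. how many of the mask's set bits lie
    // below that queue's bit.
    fn event_index(queues_mask: u64, queue: u16) -> u64 {
        let shifted = queues_mask >> queue;
        debug_assert_eq!(shifted & 1, 1, "queue must belong to this mask");
        u64::from(queues_mask.count_ones() - shifted.count_ones())
    }

    fn main() {
        // Four queues over two threads: thread 0 owns queues 0 and 2,
        // thread 1 owns queues 1 and 3.
        let masks = queue_masks(4, 2);
        assert_eq!(masks, vec![0b0101, 0b1010]);

        // Queue 2 is thread 0's second queue, so its event index is 1.
        assert_eq!(event_index(masks[0], 2), 1);
    }

A backend that keeps the default queues_per_thread() implementation (a single
0xffff_ffff mask) retains the previous behavior: one worker thread polling all
of the (up to 32) queues.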