diff --git a/vhost_user_block/src/lib.rs b/vhost_user_block/src/lib.rs index c9b35a3a8..5640d2379 100644 --- a/vhost_user_block/src/lib.rs +++ b/vhost_user_block/src/lib.rs @@ -175,10 +175,11 @@ impl VhostUserBlkThread { } struct VhostUserBlkBackend { - thread: Mutex, + threads: Vec>, config: virtio_blk_config, rdonly: bool, poll_queue: bool, + queues_per_thread: Vec, } impl VhostUserBlkBackend { @@ -219,13 +220,24 @@ impl VhostUserBlkBackend { config.num_queues = num_queues as u16; config.wce = 1; - let thread = Mutex::new(VhostUserBlkThread::new(image, image_id, nsectors)?); + let mut queues_per_thread = Vec::new(); + let mut threads = Vec::new(); + for i in 0..num_queues { + let thread = Mutex::new(VhostUserBlkThread::new( + image.clone(), + image_id.clone(), + nsectors, + )?); + threads.push(thread); + queues_per_thread.push(0b1 << i); + } Ok(VhostUserBlkBackend { - thread, + threads, config, rdonly, poll_queue, + queues_per_thread, }) } } @@ -257,11 +269,15 @@ impl VhostUserBackend for VhostUserBlkBackend { } fn set_event_idx(&mut self, enabled: bool) { - self.thread.lock().unwrap().event_idx = enabled; + for thread in self.threads.iter() { + thread.lock().unwrap().event_idx = enabled; + } } fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { - self.thread.lock().unwrap().mem = Some(mem); + for thread in self.threads.iter() { + thread.lock().unwrap().mem = Some(mem.clone()); + } Ok(()) } @@ -270,7 +286,7 @@ impl VhostUserBackend for VhostUserBlkBackend { device_event: u16, evset: epoll::Events, vrings: &[Arc>], - _thread_id: usize, + thread_id: usize, ) -> VhostUserBackendResult { if evset != epoll::Events::EPOLLIN { return Err(Error::HandleEventNotEpollIn.into()); @@ -278,10 +294,10 @@ impl VhostUserBackend for VhostUserBlkBackend { debug!("event received: {:?}", device_event); - let mut thread = self.thread.lock().unwrap(); + let mut thread = self.threads[thread_id].lock().unwrap(); match device_event { - q if device_event < self.config.num_queues => { - let mut vring = vrings[q as usize].write().unwrap(); + 0 => { + let mut vring = vrings[0].write().unwrap(); if self.poll_queue { // Actively poll the queue until POLL_QUEUE_US has passed @@ -332,12 +348,22 @@ impl VhostUserBackend for VhostUserBlkBackend { buf.to_vec() } - fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, Option)> { + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, Option)> { + // The exit event is placed after the queue, which is event index 1. Some(( - self.thread.lock().unwrap().kill_evt.try_clone().unwrap(), - None, + self.threads[thread_index] + .lock() + .unwrap() + .kill_evt + .try_clone() + .unwrap(), + Some(1), )) } + + fn queues_per_thread(&self) -> Vec { + self.queues_per_thread.clone() + } } struct VhostUserBlkBackendConfig<'a> { @@ -451,16 +477,9 @@ pub fn start_block_backend(backend_command: &str) { error!("Error from the main thread: {:?}", e); } - let kill_evt = blk_backend - .write() - .unwrap() - .thread - .lock() - .unwrap() - .kill_evt - .try_clone() - .unwrap(); - if let Err(e) = kill_evt.write(1) { - error!("Error shutting down worker thread: {:?}", e) + for thread in blk_backend.read().unwrap().threads.iter() { + if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { + error!("Error shutting down worker thread: {:?}", e) + } } }