From 8f434df1fb0f1f127af408fa4ebe27384051e028 Mon Sep 17 00:00:00 2001 From: Sebastien Boeuf Date: Wed, 8 Apr 2020 18:27:26 +0200 Subject: [PATCH] vhost_user: Adapt backends to let handle_event be immutable The blk, net and fs backends have all been updated to avoid the requirement of having handle_event(&mut self). This will allow the backend crate to avoid taking a write lock on the backend object, which will remove a potential contention point when multiple threads are handling multiple queues. Signed-off-by: Sebastien Boeuf --- src/bin/vhost_user_fs.rs | 53 +++++++++++++++------ vhost_user_block/src/lib.rs | 71 +++++++++++++++++++++------- vhost_user_net/src/lib.rs | 94 ++++++++++++++++++++++++++----------- 3 files changed, 157 insertions(+), 61 deletions(-) diff --git a/src/bin/vhost_user_fs.rs b/src/bin/vhost_user_fs.rs index b738de182..03a279ac2 100644 --- a/src/bin/vhost_user_fs.rs +++ b/src/bin/vhost_user_fs.rs @@ -15,7 +15,7 @@ use futures::executor::{ThreadPool, ThreadPoolBuilder}; use libc::EFD_NONBLOCK; use log::*; use std::num::Wrapping; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::{convert, error, fmt, io, process}; use vhost_rs::vhost_user::message::*; @@ -83,7 +83,7 @@ impl convert::From for io::Error { } } -struct VhostUserFsBackend { +struct VhostUserFsThread { mem: Option>, kill_evt: EventFd, server: Arc>, @@ -93,9 +93,9 @@ struct VhostUserFsBackend { pool: ThreadPool, } -impl Clone for VhostUserFsBackend { +impl Clone for VhostUserFsThread { fn clone(&self) -> Self { - VhostUserFsBackend { + VhostUserFsThread { mem: self.mem.clone(), kill_evt: self.kill_evt.try_clone().unwrap(), server: self.server.clone(), @@ -106,9 +106,9 @@ impl Clone for VhostUserFsBackend { } } -impl VhostUserFsBackend { +impl VhostUserFsThread { fn new(fs: F, thread_pool_size: usize) -> Result { - Ok(VhostUserFsBackend { + Ok(VhostUserFsThread { mem: None, kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, server: 
Arc::new(Server::new(fs)), @@ -176,6 +176,17 @@ impl VhostUserFsBackend { } } +struct VhostUserFsBackend { + thread: Mutex>, +} + +impl VhostUserFsBackend { + fn new(fs: F, thread_pool_size: usize) -> Result { + let thread = Mutex::new(VhostUserFsThread::new(fs, thread_pool_size)?); + Ok(VhostUserFsBackend { thread }) + } +} + impl VhostUserBackend for VhostUserFsBackend { fn num_queues(&self) -> usize { NUM_QUEUES @@ -197,11 +208,11 @@ impl VhostUserBackend for VhostUserFsBack } fn set_event_idx(&mut self, enabled: bool) { - self.event_idx = enabled; + self.thread.lock().unwrap().event_idx = enabled; } fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { - self.mem = Some(GuestMemoryAtomic::new(mem)); + self.thread.lock().unwrap().mem = Some(GuestMemoryAtomic::new(mem)); Ok(()) } @@ -215,7 +226,8 @@ impl VhostUserBackend for VhostUserFsBack return Err(Error::HandleEventNotEpollIn.into()); } - let mem = match &self.mem { + let mut thread = self.thread.lock().unwrap(); + let mem = match &thread.mem { Some(m) => m.memory(), None => return Err(Error::NoMemoryConfigured.into()), }; @@ -232,7 +244,7 @@ impl VhostUserBackend for VhostUserFsBack _ => return Err(Error::HandleEventUnknownEvent.into()), }; - if self.event_idx { + if thread.event_idx { // vm-virtio's Queue implementation only checks avail_index // once, so to properly support EVENT_IDX we need to keep // calling process_queue() until it stops finding new @@ -242,24 +254,27 @@ impl VhostUserBackend for VhostUserFsBack let mut vring = vring_lock.write().unwrap(); vring.mut_queue().update_avail_event(&mem); } - if !self.process_queue(vring_lock.clone())? { + if !thread.process_queue(vring_lock.clone())? { break; } } } else { // Without EVENT_IDX, a single call is enough. 
- self.process_queue(vring_lock)?; + thread.process_queue(vring_lock)?; } Ok(false) } fn exit_event(&self) -> Option<(EventFd, Option)> { - Some((self.kill_evt.try_clone().unwrap(), Some(KILL_EVENT))) + Some(( + self.thread.lock().unwrap().kill_evt.try_clone().unwrap(), + Some(KILL_EVENT), + )) } fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) { - self.vu_req = Some(vu_req); + self.thread.lock().unwrap().vu_req = Some(vu_req); } } @@ -338,7 +353,15 @@ fn main() { error!("Waiting for daemon failed: {:?}", e); } - let kill_evt = &fs_backend.read().unwrap().kill_evt; + let kill_evt = fs_backend + .read() + .unwrap() + .thread + .lock() + .unwrap() + .kill_evt + .try_clone() + .unwrap(); if let Err(e) = kill_evt.write(1) { error!("Error shutting down worker thread: {:?}", e) } diff --git a/vhost_user_block/src/lib.rs b/vhost_user_block/src/lib.rs index 395df9f44..46d364622 100644 --- a/vhost_user_block/src/lib.rs +++ b/vhost_user_block/src/lib.rs @@ -27,7 +27,7 @@ use std::os::unix::fs::OpenOptionsExt; use std::path::PathBuf; use std::process; use std::slice; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; use std::vec::Vec; use std::{convert, error, fmt, io}; @@ -96,7 +96,7 @@ impl convert::From for io::Error { } } -pub struct VhostUserBlkBackend { +pub struct VhostUserBlkThread { mem: Option, vring_worker: Option>, disk_image: Box, @@ -109,7 +109,7 @@ pub struct VhostUserBlkBackend { kill_evt: EventFd, } -impl VhostUserBlkBackend { +impl VhostUserBlkThread { pub fn new( image_path: String, num_queues: usize, @@ -145,7 +145,7 @@ impl VhostUserBlkBackend { config.num_queues = num_queues as u16; config.wce = 1; - Ok(VhostUserBlkBackend { + Ok(VhostUserBlkThread { mem: None, vring_worker: None, disk_image: image, @@ -221,9 +221,29 @@ impl VhostUserBlkBackend { } } +pub struct VhostUserBlkBackend { + thread: Mutex, +} + +impl VhostUserBlkBackend { + pub fn new( + image_path: String, + num_queues: usize, + rdonly: bool, 
+ direct: bool, + poll_queue: bool, + ) -> Result { + let thread = Mutex::new(VhostUserBlkThread::new( + image_path, num_queues, rdonly, direct, poll_queue, + )?); + + Ok(VhostUserBlkBackend { thread }) + } +} + impl VhostUserBackend for VhostUserBlkBackend { fn num_queues(&self) -> usize { - self.config.num_queues as usize + self.thread.lock().unwrap().config.num_queues as usize } fn max_queue_size(&self) -> usize { @@ -237,7 +257,7 @@ impl VhostUserBackend for VhostUserBlkBackend { | 1 << VIRTIO_F_VERSION_1 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); - if self.rdonly { + if self.thread.lock().unwrap().rdonly { avail_features |= 1 << VIRTIO_BLK_F_RO; } avail_features @@ -248,11 +268,11 @@ impl VhostUserBackend for VhostUserBlkBackend { } fn set_event_idx(&mut self, enabled: bool) { - self.event_idx = enabled; + self.thread.lock().unwrap().event_idx = enabled; } fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { - self.mem = Some(mem); + self.thread.lock().unwrap().mem = Some(mem); Ok(()) } @@ -268,16 +288,17 @@ impl VhostUserBackend for VhostUserBlkBackend { debug!("event received: {:?}", device_event); + let mut thread = self.thread.lock().unwrap(); match device_event { - q if device_event < self.config.num_queues => { + q if device_event < thread.config.num_queues => { let mut vring = vrings[q as usize].write().unwrap(); - if self.poll_queue { + if thread.poll_queue { // Actively poll the queue until POLL_QUEUE_US has passed // without seeing a new request. 
let mut now = Instant::now(); loop { - if self.process_queue(&mut vring) { + if thread.process_queue(&mut vring) { now = Instant::now(); } else if now.elapsed().as_micros() > POLL_QUEUE_US { break; @@ -285,7 +306,7 @@ impl VhostUserBackend for VhostUserBlkBackend { } } - if self.event_idx { + if thread.event_idx { // vm-virtio's Queue implementation only checks avail_index // once, so to properly support EVENT_IDX we need to keep // calling process_queue() until it stops finding new @@ -293,14 +314,14 @@ impl VhostUserBackend for VhostUserBlkBackend { loop { vring .mut_queue() - .update_avail_event(self.mem.as_ref().unwrap()); - if !self.process_queue(&mut vring) { + .update_avail_event(thread.mem.as_ref().unwrap()); + if !thread.process_queue(&mut vring) { break; } } } else { // Without EVENT_IDX, a single call is enough. - self.process_queue(&mut vring); + thread.process_queue(&mut vring); } Ok(false) @@ -313,7 +334,7 @@ impl VhostUserBackend for VhostUserBlkBackend { // self.config is a statically allocated virtio_blk_config let buf = unsafe { slice::from_raw_parts( - &self.config as *const virtio_blk_config as *const _, + &self.thread.lock().unwrap().config as *const virtio_blk_config as *const _, mem::size_of::(), ) }; @@ -322,7 +343,10 @@ impl VhostUserBackend for VhostUserBlkBackend { } fn exit_event(&self) -> Option<(EventFd, Option)> { - Some((self.kill_evt.try_clone().unwrap(), None)) + Some(( + self.thread.lock().unwrap().kill_evt.try_clone().unwrap(), + None, + )) } } @@ -429,6 +453,9 @@ pub fn start_block_backend(backend_command: &str) { blk_backend .write() .unwrap() + .thread + .lock() + .unwrap() .set_vring_worker(Some(vring_worker)); if let Err(e) = blk_daemon.start() { @@ -443,7 +470,15 @@ pub fn start_block_backend(backend_command: &str) { error!("Error from the main thread: {:?}", e); } - let kill_evt = &blk_backend.write().unwrap().kill_evt; + let kill_evt = blk_backend + .write() + .unwrap() + .thread + .lock() + .unwrap() + .kill_evt + 
.try_clone() + .unwrap(); if let Err(e) = kill_evt.write(1) { error!("Error shutting down worker thread: {:?}", e) } diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs index 59aaec0a0..f3dc74e0c 100644 --- a/vhost_user_net/src/lib.rs +++ b/vhost_user_net/src/lib.rs @@ -23,7 +23,7 @@ use std::io::{self}; use std::net::Ipv4Addr; use std::os::unix::io::AsRawFd; use std::process; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::vec::Vec; use vhost_rs::vhost_user::message::*; use vhost_rs::vhost_user::Error as VhostUserError; @@ -92,7 +92,7 @@ impl std::convert::From for std::io::Error { } } -pub struct VhostUserNetBackend { +struct VhostUserNetThread { mem: Option, vring_worker: Option>, kill_evt: EventFd, @@ -101,12 +101,11 @@ pub struct VhostUserNetBackend { txs: Vec, rx_tap_listenings: Vec, num_queues: usize, - queue_size: u16, } -impl VhostUserNetBackend { +impl VhostUserNetThread { /// Create a new virtio network device with the given TAP interface. - pub fn new_with_tap(taps: Vec, num_queues: usize, queue_size: u16) -> Result { + fn new_with_tap(taps: Vec, num_queues: usize) -> Result { let mut taps_v: Vec<(Tap, usize)> = Vec::new(); for (i, tap) in taps.iter().enumerate() { taps_v.push((tap.clone(), num_queues + i)); @@ -124,7 +123,7 @@ impl VhostUserNetBackend { rx_tap_listenings.push(false); } - Ok(VhostUserNetBackend { + Ok(VhostUserNetThread { mem: None, vring_worker: None, kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, @@ -133,23 +132,21 @@ impl VhostUserNetBackend { txs, rx_tap_listenings, num_queues, - queue_size, }) } /// Create a new virtio network device with the given IP address and /// netmask. 
- pub fn new( + fn new( ip_addr: Ipv4Addr, netmask: Ipv4Addr, num_queues: usize, - queue_size: u16, ifname: Option<&str>, ) -> Result { let taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2) .map_err(Error::OpenTap)?; - Self::new_with_tap(taps, num_queues, queue_size) + Self::new_with_tap(taps, num_queues) } // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true @@ -252,6 +249,32 @@ impl VhostUserNetBackend { } } +pub struct VhostUserNetBackend { + thread: Mutex, + num_queues: usize, + queue_size: u16, +} + +impl VhostUserNetBackend { + fn new( + ip_addr: Ipv4Addr, + netmask: Ipv4Addr, + num_queues: usize, + queue_size: u16, + ifname: Option<&str>, + ) -> Result { + let thread = Mutex::new(VhostUserNetThread::new( + ip_addr, netmask, num_queues, ifname, + )?); + + Ok(VhostUserNetBackend { + thread, + num_queues, + queue_size, + }) + } +} + impl VhostUserBackend for VhostUserNetBackend { fn num_queues(&self) -> usize { self.num_queues @@ -279,7 +302,7 @@ impl VhostUserBackend for VhostUserNetBackend { fn set_event_idx(&mut self, _enabled: bool) {} fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { - self.mem = Some(mem); + self.thread.lock().unwrap().mem = Some(mem); Ok(()) } @@ -296,42 +319,43 @@ impl VhostUserBackend for VhostUserNetBackend { let tap_start_index = self.num_queues as u16; let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16; + let mut thread = self.thread.lock().unwrap(); match device_event { x if ((x < self.num_queues as u16) && (x % 2 == 0)) => { let index = (x / 2) as usize; let mut vring = vrings[x as usize].write().unwrap(); - self.resume_rx(&mut vring, index)?; + thread.resume_rx(&mut vring, index)?; - if !self.rx_tap_listenings[index] { - self.vring_worker.as_ref().unwrap().register_listener( - self.taps[index].0.as_raw_fd(), + if !thread.rx_tap_listenings[index] { + thread.vring_worker.as_ref().unwrap().register_listener( + 
thread.taps[index].0.as_raw_fd(), epoll::Events::EPOLLIN, - u64::try_from(self.taps[index].1).unwrap(), + u64::try_from(thread.taps[index].1).unwrap(), )?; - self.rx_tap_listenings[index] = true; + thread.rx_tap_listenings[index] = true; } } - x if ((x < self.num_queues as u16) && (x % 2 != 0)) => { + x if ((x < thread.num_queues as u16) && (x % 2 != 0)) => { let index = ((x - 1) / 2) as usize; let mut vring = vrings[x as usize].write().unwrap(); - self.process_tx(&mut vring.mut_queue(), index)?; + thread.process_tx(&mut vring.mut_queue(), index)?; } x if x >= tap_start_index && x <= tap_end_index => { let index = x as usize - self.num_queues; let mut vring = vrings[2 * index].write().unwrap(); - if self.rxs[index].deferred_frame + if thread.rxs[index].deferred_frame // Process a deferred frame first if available. Don't read from tap again // until we manage to receive this deferred frame. { - if self.rx_single_frame(&mut vring.mut_queue(), index)? { - self.rxs[index].deferred_frame = false; - self.process_rx(&mut vring, index)?; - } else if self.rxs[index].deferred_irqs { - self.rxs[index].deferred_irqs = false; + if thread.rx_single_frame(&mut vring.mut_queue(), index)? 
{ + thread.rxs[index].deferred_frame = false; + thread.process_rx(&mut vring, index)?; + } else if thread.rxs[index].deferred_irqs { + thread.rxs[index].deferred_irqs = false; vring.signal_used_queue()?; } } else { - self.process_rx(&mut vring, index)?; + thread.process_rx(&mut vring, index)?; } } _ => return Err(Error::HandleEventUnknownEvent.into()), @@ -343,7 +367,10 @@ impl VhostUserBackend for VhostUserNetBackend { fn exit_event(&self) -> Option<(EventFd, Option)> { let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16; let kill_index = tap_end_index + 1; - Some((self.kill_evt.try_clone().unwrap(), Some(kill_index))) + Some(( + self.thread.lock().unwrap().kill_evt.try_clone().unwrap(), + Some(kill_index), + )) } } @@ -451,6 +478,9 @@ pub fn start_net_backend(backend_command: &str) { net_backend .write() .unwrap() + .thread + .lock() + .unwrap() .set_vring_worker(Some(vring_worker)); if let Err(e) = net_daemon.start() { @@ -465,7 +495,15 @@ pub fn start_net_backend(backend_command: &str) { error!("Error from the main thread: {:?}", e); } - let kill_evt = &net_backend.write().unwrap().kill_evt; + let kill_evt = net_backend + .write() + .unwrap() + .thread + .lock() + .unwrap() + .kill_evt + .try_clone() + .unwrap(); if let Err(e) = kill_evt.write(1) { error!("Error shutting down worker thread: {:?}", e) }