vhost_user: Adapt backends to let handle_event be immutable

The blk, net and fs backends have been updated so that handle_event()
no longer requires &mut self. This will allow the backend crate to
avoid taking a write lock on the backend object, removing a potential
contention point when multiple threads handle multiple queues.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
Author: Sebastien Boeuf <sebastien.boeuf@intel.com>
Date: 2020-04-08 18:27:26 +02:00
parent b1554642e4
commit 8f434df1fb
3 changed files with 157 additions and 61 deletions

File 1/3: vhost-user-fs backend

@@ -15,7 +15,7 @@ use futures::executor::{ThreadPool, ThreadPoolBuilder};
 use libc::EFD_NONBLOCK;
 use log::*;
 use std::num::Wrapping;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 use std::{convert, error, fmt, io, process};
 use vhost_rs::vhost_user::message::*;
@@ -83,7 +83,7 @@ impl convert::From<Error> for io::Error {
     }
 }
 
-struct VhostUserFsBackend<F: FileSystem + Send + Sync + 'static> {
+struct VhostUserFsThread<F: FileSystem + Send + Sync + 'static> {
     mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
     kill_evt: EventFd,
     server: Arc<Server<F>>,
@@ -93,9 +93,9 @@ struct VhostUserFsBackend<F: FileSystem + Send + Sync + 'static> {
     pool: ThreadPool,
 }
 
-impl<F: FileSystem + Send + Sync + 'static> Clone for VhostUserFsBackend<F> {
+impl<F: FileSystem + Send + Sync + 'static> Clone for VhostUserFsThread<F> {
     fn clone(&self) -> Self {
-        VhostUserFsBackend {
+        VhostUserFsThread {
            mem: self.mem.clone(),
            kill_evt: self.kill_evt.try_clone().unwrap(),
            server: self.server.clone(),
@@ -106,9 +106,9 @@ impl<F: FileSystem + Send + Sync + 'static> Clone for VhostUserFsBackend<F> {
     }
 }
 
-impl<F: FileSystem + Send + Sync + 'static> VhostUserFsBackend<F> {
+impl<F: FileSystem + Send + Sync + 'static> VhostUserFsThread<F> {
     fn new(fs: F, thread_pool_size: usize) -> Result<Self> {
-        Ok(VhostUserFsBackend {
+        Ok(VhostUserFsThread {
             mem: None,
             kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
             server: Arc::new(Server::new(fs)),
@@ -176,6 +176,17 @@ impl<F: FileSystem + Send + Sync + 'static> VhostUserFsBackend<F> {
     }
 }
 
+struct VhostUserFsBackend<F: FileSystem + Send + Sync + 'static> {
+    thread: Mutex<VhostUserFsThread<F>>,
+}
+
+impl<F: FileSystem + Send + Sync + 'static> VhostUserFsBackend<F> {
+    fn new(fs: F, thread_pool_size: usize) -> Result<Self> {
+        let thread = Mutex::new(VhostUserFsThread::new(fs, thread_pool_size)?);
+        Ok(VhostUserFsBackend { thread })
+    }
+}
+
 impl<F: FileSystem + Send + Sync + 'static> VhostUserBackend for VhostUserFsBackend<F> {
     fn num_queues(&self) -> usize {
         NUM_QUEUES
@@ -197,11 +208,11 @@ impl<F: FileSystem + Send + Sync + 'static> VhostUserBackend for VhostUserFsBack
     }
 
     fn set_event_idx(&mut self, enabled: bool) {
-        self.event_idx = enabled;
+        self.thread.lock().unwrap().event_idx = enabled;
     }
 
     fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> {
-        self.mem = Some(GuestMemoryAtomic::new(mem));
+        self.thread.lock().unwrap().mem = Some(GuestMemoryAtomic::new(mem));
         Ok(())
     }
@@ -215,7 +226,8 @@ impl<F: FileSystem + Send + Sync + 'static> VhostUserBackend for VhostUserFsBack
             return Err(Error::HandleEventNotEpollIn.into());
         }
 
-        let mem = match &self.mem {
+        let mut thread = self.thread.lock().unwrap();
+        let mem = match &thread.mem {
             Some(m) => m.memory(),
             None => return Err(Error::NoMemoryConfigured.into()),
         };
@@ -232,7 +244,7 @@ impl<F: FileSystem + Send + Sync + 'static> VhostUserBackend for VhostUserFsBack
             _ => return Err(Error::HandleEventUnknownEvent.into()),
         };
 
-        if self.event_idx {
+        if thread.event_idx {
             // vm-virtio's Queue implementation only checks avail_index
             // once, so to properly support EVENT_IDX we need to keep
             // calling process_queue() until it stops finding new
@@ -242,24 +254,27 @@ impl<F: FileSystem + Send + Sync + 'static> VhostUserBackend for VhostUserFsBack
                     let mut vring = vring_lock.write().unwrap();
                     vring.mut_queue().update_avail_event(&mem);
                 }
-                if !self.process_queue(vring_lock.clone())? {
+                if !thread.process_queue(vring_lock.clone())? {
                     break;
                 }
             }
         } else {
             // Without EVENT_IDX, a single call is enough.
-            self.process_queue(vring_lock)?;
+            thread.process_queue(vring_lock)?;
         }
 
         Ok(false)
     }
 
     fn exit_event(&self) -> Option<(EventFd, Option<u16>)> {
-        Some((self.kill_evt.try_clone().unwrap(), Some(KILL_EVENT)))
+        Some((
+            self.thread.lock().unwrap().kill_evt.try_clone().unwrap(),
+            Some(KILL_EVENT),
+        ))
     }
 
     fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) {
-        self.vu_req = Some(vu_req);
+        self.thread.lock().unwrap().vu_req = Some(vu_req);
     }
 }
@@ -338,7 +353,15 @@ fn main() {
         error!("Waiting for daemon failed: {:?}", e);
     }
 
-    let kill_evt = &fs_backend.read().unwrap().kill_evt;
+    let kill_evt = fs_backend
+        .read()
+        .unwrap()
+        .thread
+        .lock()
+        .unwrap()
+        .kill_evt
+        .try_clone()
+        .unwrap();
     if let Err(e) = kill_evt.write(1) {
         error!("Error shutting down worker thread: {:?}", e)
     }

File 2/3: vhost-user-block backend

@@ -27,7 +27,7 @@ use std::os::unix::fs::OpenOptionsExt;
 use std::path::PathBuf;
 use std::process;
 use std::slice;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 use std::time::Instant;
 use std::vec::Vec;
 use std::{convert, error, fmt, io};
@@ -96,7 +96,7 @@ impl convert::From<Error> for io::Error {
     }
 }
 
-pub struct VhostUserBlkBackend {
+pub struct VhostUserBlkThread {
     mem: Option<GuestMemoryMmap>,
     vring_worker: Option<Arc<VringWorker>>,
     disk_image: Box<dyn DiskFile>,
@@ -109,7 +109,7 @@ pub struct VhostUserBlkBackend {
     kill_evt: EventFd,
 }
 
-impl VhostUserBlkBackend {
+impl VhostUserBlkThread {
     pub fn new(
         image_path: String,
         num_queues: usize,
@@ -145,7 +145,7 @@ impl VhostUserBlkBackend {
         config.num_queues = num_queues as u16;
         config.wce = 1;
 
-        Ok(VhostUserBlkBackend {
+        Ok(VhostUserBlkThread {
             mem: None,
             vring_worker: None,
             disk_image: image,
@@ -221,9 +221,29 @@ impl VhostUserBlkBackend {
     }
 }
 
+pub struct VhostUserBlkBackend {
+    thread: Mutex<VhostUserBlkThread>,
+}
+
+impl VhostUserBlkBackend {
+    pub fn new(
+        image_path: String,
+        num_queues: usize,
+        rdonly: bool,
+        direct: bool,
+        poll_queue: bool,
+    ) -> Result<Self> {
+        let thread = Mutex::new(VhostUserBlkThread::new(
+            image_path, num_queues, rdonly, direct, poll_queue,
+        )?);
+        Ok(VhostUserBlkBackend { thread })
+    }
+}
+
 impl VhostUserBackend for VhostUserBlkBackend {
     fn num_queues(&self) -> usize {
-        self.config.num_queues as usize
+        self.thread.lock().unwrap().config.num_queues as usize
     }
 
     fn max_queue_size(&self) -> usize {
@@ -237,7 +257,7 @@ impl VhostUserBackend for VhostUserBlkBackend {
             | 1 << VIRTIO_F_VERSION_1
             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
 
-        if self.rdonly {
+        if self.thread.lock().unwrap().rdonly {
             avail_features |= 1 << VIRTIO_BLK_F_RO;
         }
         avail_features
@@ -248,11 +268,11 @@ impl VhostUserBackend for VhostUserBlkBackend {
     }
 
     fn set_event_idx(&mut self, enabled: bool) {
-        self.event_idx = enabled;
+        self.thread.lock().unwrap().event_idx = enabled;
     }
 
     fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> {
-        self.mem = Some(mem);
+        self.thread.lock().unwrap().mem = Some(mem);
         Ok(())
     }
@@ -268,16 +288,17 @@ impl VhostUserBackend for VhostUserBlkBackend {
         debug!("event received: {:?}", device_event);
 
+        let mut thread = self.thread.lock().unwrap();
         match device_event {
-            q if device_event < self.config.num_queues => {
+            q if device_event < thread.config.num_queues => {
                 let mut vring = vrings[q as usize].write().unwrap();
 
-                if self.poll_queue {
+                if thread.poll_queue {
                     // Actively poll the queue until POLL_QUEUE_US has passed
                     // without seeing a new request.
                     let mut now = Instant::now();
                     loop {
-                        if self.process_queue(&mut vring) {
+                        if thread.process_queue(&mut vring) {
                             now = Instant::now();
                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
                             break;
@@ -285,7 +306,7 @@ impl VhostUserBackend for VhostUserBlkBackend {
                     }
                 }
 
-                if self.event_idx {
+                if thread.event_idx {
                     // vm-virtio's Queue implementation only checks avail_index
                     // once, so to properly support EVENT_IDX we need to keep
                     // calling process_queue() until it stops finding new
@@ -293,14 +314,14 @@ impl VhostUserBackend for VhostUserBlkBackend {
                     loop {
                         vring
                             .mut_queue()
-                            .update_avail_event(self.mem.as_ref().unwrap());
-                        if !self.process_queue(&mut vring) {
+                            .update_avail_event(thread.mem.as_ref().unwrap());
+                        if !thread.process_queue(&mut vring) {
                             break;
                         }
                     }
                 } else {
                     // Without EVENT_IDX, a single call is enough.
-                    self.process_queue(&mut vring);
+                    thread.process_queue(&mut vring);
                 }
 
                 Ok(false)
@@ -313,7 +334,7 @@ impl VhostUserBackend for VhostUserBlkBackend {
         // self.config is a statically allocated virtio_blk_config
         let buf = unsafe {
             slice::from_raw_parts(
-                &self.config as *const virtio_blk_config as *const _,
+                &self.thread.lock().unwrap().config as *const virtio_blk_config as *const _,
                 mem::size_of::<virtio_blk_config>(),
             )
         };
@@ -322,7 +343,10 @@ impl VhostUserBackend for VhostUserBlkBackend {
     }
 
     fn exit_event(&self) -> Option<(EventFd, Option<u16>)> {
-        Some((self.kill_evt.try_clone().unwrap(), None))
+        Some((
+            self.thread.lock().unwrap().kill_evt.try_clone().unwrap(),
+            None,
+        ))
     }
 }
@@ -429,6 +453,9 @@ pub fn start_block_backend(backend_command: &str) {
     blk_backend
         .write()
         .unwrap()
+        .thread
+        .lock()
+        .unwrap()
         .set_vring_worker(Some(vring_worker));
 
     if let Err(e) = blk_daemon.start() {
@@ -443,7 +470,15 @@ pub fn start_block_backend(backend_command: &str) {
         error!("Error from the main thread: {:?}", e);
     }
 
-    let kill_evt = &blk_backend.write().unwrap().kill_evt;
+    let kill_evt = blk_backend
+        .write()
+        .unwrap()
+        .thread
+        .lock()
+        .unwrap()
+        .kill_evt
+        .try_clone()
+        .unwrap();
     if let Err(e) = kill_evt.write(1) {
         error!("Error shutting down worker thread: {:?}", e)
     }

File 3/3: vhost-user-net backend

@@ -23,7 +23,7 @@ use std::io::{self};
 use std::net::Ipv4Addr;
 use std::os::unix::io::AsRawFd;
 use std::process;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 use std::vec::Vec;
 use vhost_rs::vhost_user::message::*;
 use vhost_rs::vhost_user::Error as VhostUserError;
@@ -92,7 +92,7 @@ impl std::convert::From<Error> for std::io::Error {
     }
 }
 
-pub struct VhostUserNetBackend {
+struct VhostUserNetThread {
     mem: Option<GuestMemoryMmap>,
     vring_worker: Option<Arc<VringWorker>>,
     kill_evt: EventFd,
@@ -101,12 +101,11 @@ pub struct VhostUserNetBackend {
     txs: Vec<TxVirtio>,
     rx_tap_listenings: Vec<bool>,
     num_queues: usize,
-    queue_size: u16,
 }
 
-impl VhostUserNetBackend {
+impl VhostUserNetThread {
     /// Create a new virtio network device with the given TAP interface.
-    pub fn new_with_tap(taps: Vec<Tap>, num_queues: usize, queue_size: u16) -> Result<Self> {
+    fn new_with_tap(taps: Vec<Tap>, num_queues: usize) -> Result<Self> {
         let mut taps_v: Vec<(Tap, usize)> = Vec::new();
         for (i, tap) in taps.iter().enumerate() {
             taps_v.push((tap.clone(), num_queues + i));
@@ -124,7 +123,7 @@ impl VhostUserNetBackend {
         rx_tap_listenings.push(false);
     }
 
-        Ok(VhostUserNetBackend {
+        Ok(VhostUserNetThread {
             mem: None,
             vring_worker: None,
             kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
@@ -133,23 +132,21 @@ impl VhostUserNetBackend {
             txs,
             rx_tap_listenings,
             num_queues,
-            queue_size,
         })
     }
 
     /// Create a new virtio network device with the given IP address and
     /// netmask.
-    pub fn new(
+    fn new(
         ip_addr: Ipv4Addr,
         netmask: Ipv4Addr,
         num_queues: usize,
-        queue_size: u16,
         ifname: Option<&str>,
     ) -> Result<Self> {
         let taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2)
             .map_err(Error::OpenTap)?;
-        Self::new_with_tap(taps, num_queues, queue_size)
+        Self::new_with_tap(taps, num_queues)
     }
 
     // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
@@ -252,6 +249,32 @@ impl VhostUserNetBackend {
     }
 }
 
+pub struct VhostUserNetBackend {
+    thread: Mutex<VhostUserNetThread>,
+    num_queues: usize,
+    queue_size: u16,
+}
+
+impl VhostUserNetBackend {
+    fn new(
+        ip_addr: Ipv4Addr,
+        netmask: Ipv4Addr,
+        num_queues: usize,
+        queue_size: u16,
+        ifname: Option<&str>,
+    ) -> Result<Self> {
+        let thread = Mutex::new(VhostUserNetThread::new(
+            ip_addr, netmask, num_queues, ifname,
+        )?);
+        Ok(VhostUserNetBackend {
+            thread,
+            num_queues,
+            queue_size,
+        })
+    }
+}
+
 impl VhostUserBackend for VhostUserNetBackend {
     fn num_queues(&self) -> usize {
         self.num_queues
@@ -279,7 +302,7 @@ impl VhostUserBackend for VhostUserNetBackend {
     fn set_event_idx(&mut self, _enabled: bool) {}
 
     fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> {
-        self.mem = Some(mem);
+        self.thread.lock().unwrap().mem = Some(mem);
         Ok(())
     }
@@ -296,42 +319,43 @@ impl VhostUserBackend for VhostUserNetBackend {
         let tap_start_index = self.num_queues as u16;
         let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16;
 
+        let mut thread = self.thread.lock().unwrap();
         match device_event {
             x if ((x < self.num_queues as u16) && (x % 2 == 0)) => {
                 let index = (x / 2) as usize;
                 let mut vring = vrings[x as usize].write().unwrap();
-                self.resume_rx(&mut vring, index)?;
+                thread.resume_rx(&mut vring, index)?;
 
-                if !self.rx_tap_listenings[index] {
-                    self.vring_worker.as_ref().unwrap().register_listener(
-                        self.taps[index].0.as_raw_fd(),
+                if !thread.rx_tap_listenings[index] {
+                    thread.vring_worker.as_ref().unwrap().register_listener(
+                        thread.taps[index].0.as_raw_fd(),
                         epoll::Events::EPOLLIN,
-                        u64::try_from(self.taps[index].1).unwrap(),
+                        u64::try_from(thread.taps[index].1).unwrap(),
                     )?;
-                    self.rx_tap_listenings[index] = true;
+                    thread.rx_tap_listenings[index] = true;
                 }
             }
-            x if ((x < self.num_queues as u16) && (x % 2 != 0)) => {
+            x if ((x < thread.num_queues as u16) && (x % 2 != 0)) => {
                 let index = ((x - 1) / 2) as usize;
                 let mut vring = vrings[x as usize].write().unwrap();
-                self.process_tx(&mut vring.mut_queue(), index)?;
+                thread.process_tx(&mut vring.mut_queue(), index)?;
             }
             x if x >= tap_start_index && x <= tap_end_index => {
                 let index = x as usize - self.num_queues;
                 let mut vring = vrings[2 * index].write().unwrap();
-                if self.rxs[index].deferred_frame
+                if thread.rxs[index].deferred_frame
                 // Process a deferred frame first if available. Don't read from tap again
                 // until we manage to receive this deferred frame.
                 {
-                    if self.rx_single_frame(&mut vring.mut_queue(), index)? {
-                        self.rxs[index].deferred_frame = false;
-                        self.process_rx(&mut vring, index)?;
-                    } else if self.rxs[index].deferred_irqs {
-                        self.rxs[index].deferred_irqs = false;
+                    if thread.rx_single_frame(&mut vring.mut_queue(), index)? {
+                        thread.rxs[index].deferred_frame = false;
+                        thread.process_rx(&mut vring, index)?;
+                    } else if thread.rxs[index].deferred_irqs {
+                        thread.rxs[index].deferred_irqs = false;
                         vring.signal_used_queue()?;
                     }
                 } else {
-                    self.process_rx(&mut vring, index)?;
+                    thread.process_rx(&mut vring, index)?;
                 }
             }
             _ => return Err(Error::HandleEventUnknownEvent.into()),
@@ -343,7 +367,10 @@ impl VhostUserBackend for VhostUserNetBackend {
     fn exit_event(&self) -> Option<(EventFd, Option<u16>)> {
         let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16;
         let kill_index = tap_end_index + 1;
-        Some((self.kill_evt.try_clone().unwrap(), Some(kill_index)))
+        Some((
+            self.thread.lock().unwrap().kill_evt.try_clone().unwrap(),
+            Some(kill_index),
+        ))
     }
 }
@@ -451,6 +478,9 @@ pub fn start_net_backend(backend_command: &str) {
     net_backend
         .write()
        .unwrap()
+        .thread
+        .lock()
+        .unwrap()
         .set_vring_worker(Some(vring_worker));
 
     if let Err(e) = net_daemon.start() {
@@ -465,7 +495,15 @@ pub fn start_net_backend(backend_command: &str) {
         error!("Error from the main thread: {:?}", e);
    }
 
-    let kill_evt = &net_backend.write().unwrap().kill_evt;
+    let kill_evt = net_backend
+        .write()
+        .unwrap()
+        .thread
+        .lock()
+        .unwrap()
+        .kill_evt
+        .try_clone()
+        .unwrap();
     if let Err(e) = kill_evt.write(1) {
         error!("Error shutting down worker thread: {:?}", e)
     }