vhost_user_backend: Provide the thread ID to handle_event()

By adding a "thread_id" parameter to handle_event(), the backend crate
can now indicate to the backend implementation which thread triggered
the processing of some events.

This is applied to vhost-user-net backend and allows for simplifying a
lot the code since each thread is identical.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
Sebastien Boeuf 2020-04-09 18:20:38 +02:00
parent cfffb7edb0
commit 1a0a2c0182
4 changed files with 25 additions and 31 deletions

View File

@ -221,6 +221,7 @@ impl<F: FileSystem + Send + Sync + 'static> VhostUserBackend for VhostUserFsBack
device_event: u16,
evset: epoll::Events,
vrings: &[Arc<RwLock<Vring>>],
_thread_id: usize,
) -> VhostUserBackendResult<bool> {
if evset != epoll::Events::EPOLLIN {
return Err(Error::HandleEventNotEpollIn.into());

View File

@ -85,6 +85,7 @@ pub trait VhostUserBackend: Send + Sync + 'static {
device_event: u16,
evset: epoll::Events,
vrings: &[Arc<RwLock<Vring>>],
thread_id: usize,
) -> result::Result<bool, io::Error>;
/// Get virtio device configuration.
@ -287,6 +288,7 @@ struct VringEpollHandler<S: VhostUserBackend> {
backend: Arc<RwLock<S>>,
vrings: Vec<Arc<RwLock<Vring>>>,
exit_event_id: Option<u16>,
thread_id: usize,
}
impl<S: VhostUserBackend> VringEpollHandler<S> {
@ -316,7 +318,7 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
self.backend
.read()
.unwrap()
.handle_event(device_event, evset, &self.vrings)
.handle_event(device_event, evset, &self.vrings, self.thread_id)
.map_err(VringEpollHandlerError::HandleEventBackendHandling)
}
}
@ -484,7 +486,7 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
let mut workers = Vec::new();
let mut worker_threads = Vec::new();
for (thread_index, queues_mask) in queues_per_thread.iter().enumerate() {
for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
// Create the epoll file descriptor
let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?;
@ -492,7 +494,7 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
let worker = vring_worker.clone();
let exit_event_id = if let Some((exit_event_fd, exit_event_id)) =
backend.read().unwrap().exit_event(thread_index)
backend.read().unwrap().exit_event(thread_id)
{
let exit_event_id = exit_event_id.unwrap_or(num_queues as u16);
worker
@ -518,6 +520,7 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
backend: backend.clone(),
vrings: thread_vrings,
exit_event_id,
thread_id,
};
let worker_thread = thread::Builder::new()

View File

@ -281,6 +281,7 @@ impl VhostUserBackend for VhostUserBlkBackend {
device_event: u16,
evset: epoll::Events,
vrings: &[Arc<RwLock<Vring>>],
_thread_id: usize,
) -> VhostUserBackendResult<bool> {
if evset != epoll::Events::EPOLLIN {
return Err(Error::HandleEventNotEpollIn.into());

View File

@ -16,7 +16,6 @@ use epoll;
use libc::{self, EAGAIN, EFD_NONBLOCK};
use log::*;
use net_util::Tap;
use std::convert::TryFrom;
use std::fmt;
use std::io::Read;
use std::io::{self};
@ -96,7 +95,7 @@ struct VhostUserNetThread {
mem: Option<GuestMemoryMmap>,
vring_worker: Option<Arc<VringWorker>>,
kill_evt: EventFd,
tap: (Tap, usize),
tap: Tap,
rx: RxVirtio,
tx: TxVirtio,
rx_tap_listening: bool,
@ -104,12 +103,12 @@ struct VhostUserNetThread {
impl VhostUserNetThread {
/// Create a new virtio network device with the given TAP interface.
fn new(tap: Tap, tap_evt_index: usize) -> Result<Self> {
fn new(tap: Tap) -> Result<Self> {
Ok(VhostUserNetThread {
mem: None,
vring_worker: None,
kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
tap: (tap, tap_evt_index),
tap,
rx: RxVirtio::new(),
tx: TxVirtio::new(),
rx_tap_listening: false,
@ -130,11 +129,7 @@ impl VhostUserNetThread {
self.vring_worker
.as_ref()
.unwrap()
.unregister_listener(
self.tap.0.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::try_from(self.tap.1).unwrap(),
)
.unregister_listener(self.tap.as_raw_fd(), epoll::Events::EPOLLIN, 2)
.unwrap();
self.rx_tap_listening = false;
}
@ -202,14 +197,13 @@ impl VhostUserNetThread {
fn process_tx(&mut self, mut queue: &mut Queue) -> Result<()> {
let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
self.tx
.process_desc_chain(&mem, &mut self.tap.0, &mut queue);
self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue);
Ok(())
}
fn read_tap(&mut self) -> io::Result<usize> {
self.tap.0.read(&mut self.rx.frame_buf)
self.tap.read(&mut self.rx.frame_buf)
}
pub fn set_vring_worker(&mut self, vring_worker: Option<Arc<VringWorker>>) {
@ -236,7 +230,7 @@ impl VhostUserNetBackend {
let mut threads = Vec::new();
for tap in taps.drain(..) {
let thread = Mutex::new(VhostUserNetThread::new(tap, num_queues)?);
let thread = Mutex::new(VhostUserNetThread::new(tap)?);
threads.push(thread);
}
@ -286,36 +280,31 @@ impl VhostUserBackend for VhostUserNetBackend {
device_event: u16,
evset: epoll::Events,
vrings: &[Arc<RwLock<Vring>>],
thread_id: usize,
) -> VhostUserBackendResult<bool> {
if evset != epoll::Events::EPOLLIN {
return Err(Error::HandleEventNotEpollIn.into());
}
let tap_start_index = self.num_queues as u16;
let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16;
let mut thread = self.threads[0].lock().unwrap();
let mut thread = self.threads[thread_id].lock().unwrap();
match device_event {
x if ((x < self.num_queues as u16) && (x % 2 == 0)) => {
let mut vring = vrings[x as usize].write().unwrap();
thread.resume_rx(&mut vring)?;
x if x == 0 => {
thread.resume_rx(&mut vrings[0].write().unwrap())?;
if !thread.rx_tap_listening {
thread.vring_worker.as_ref().unwrap().register_listener(
thread.tap.0.as_raw_fd(),
thread.tap.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::try_from(thread.tap.1).unwrap(),
2,
)?;
thread.rx_tap_listening = true;
}
}
x if ((x < self.num_queues as u16) && (x % 2 != 0)) => {
let mut vring = vrings[x as usize].write().unwrap();
thread.process_tx(&mut vring.mut_queue())?;
x if x == 1 => {
thread.process_tx(&mut vrings[1].write().unwrap().mut_queue())?;
}
x if x >= tap_start_index && x <= tap_end_index => {
let index = x as usize - self.num_queues;
let mut vring = vrings[2 * index].write().unwrap();
x if x == 2 => {
let mut vring = vrings[0].write().unwrap();
if thread.rx.deferred_frame
// Process a deferred frame first if available. Don't read from tap again
// until we manage to receive this deferred frame.