virtio-devices, net_util: Migrate virtio-net to EpollHelper

EpollHelper allows the removal of much duplicated loop handling code and
instead the device specific even handling is delegated via an
implementation of EpollHelperHandler.

Signed-off-by: Rob Bradford <robert.bradford@intel.com>
This commit is contained in:
Rob Bradford 2020-07-23 17:16:10 +01:00 committed by Sebastien Boeuf
parent d66fa942be
commit 56bfe2700d
4 changed files with 73 additions and 153 deletions

View File

@ -32,10 +32,7 @@ use std::{io, mem, net};
pub use mac::{MacAddr, MAC_ADDR_LEN}; pub use mac::{MacAddr, MAC_ADDR_LEN};
pub use open_tap::{open_tap, Error as OpenTapError}; pub use open_tap::{open_tap, Error as OpenTapError};
pub use queue_pair::{ pub use queue_pair::{NetCounters, NetQueuePair, NetQueuePairError, RxVirtio, TxVirtio};
NetCounters, NetQueuePair, NetQueuePairError, RxVirtio, TxVirtio, RX_QUEUE_EVENT, RX_TAP_EVENT,
TX_QUEUE_EVENT,
};
pub use tap::{Error as TapError, Tap}; pub use tap::{Error as TapError, Tap};
#[derive(Debug)] #[derive(Debug)]

View File

@ -216,16 +216,9 @@ pub struct NetQueuePair {
pub epoll_fd: Option<RawFd>, pub epoll_fd: Option<RawFd>,
pub rx_tap_listening: bool, pub rx_tap_listening: bool,
pub counters: NetCounters, pub counters: NetCounters,
pub tap_event_id: u16,
} }
pub type DeviceEventT = u16;
// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: DeviceEventT = 0;
// The transmit queue has a frame that is ready to send from the guest.
pub const TX_QUEUE_EVENT: DeviceEventT = 1;
// A frame is available for reading from the tap device to receive in the guest.
pub const RX_TAP_EVENT: DeviceEventT = 2;
impl NetQueuePair { impl NetQueuePair {
// Copies a single frame from `self.rx.frame_buf` into the guest. Returns true // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
// if a buffer was used, and false if the frame must be deferred until a buffer // if a buffer was used, and false if the frame must be deferred until a buffer
@ -245,7 +238,7 @@ impl NetQueuePair {
self.epoll_fd.unwrap(), self.epoll_fd.unwrap(),
self.tap.as_raw_fd(), self.tap.as_raw_fd(),
epoll::Events::EPOLLIN, epoll::Events::EPOLLIN,
u64::from(RX_TAP_EVENT), u64::from(self.tap_event_id),
) )
.map_err(NetQueuePairError::UnregisterListener)?; .map_err(NetQueuePairError::UnregisterListener)?;
self.rx_tap_listening = false; self.rx_tap_listening = false;
@ -314,7 +307,7 @@ impl NetQueuePair {
self.epoll_fd.unwrap(), self.epoll_fd.unwrap(),
self.tap.as_raw_fd(), self.tap.as_raw_fd(),
epoll::Events::EPOLLIN, epoll::Events::EPOLLIN,
u64::from(RX_TAP_EVENT), u64::from(self.tap_event_id),
) )
.map_err(NetQueuePairError::RegisterListener)?; .map_err(NetQueuePairError::RegisterListener)?;
self.rx_tap_listening = true; self.rx_tap_listening = true;

View File

@ -110,6 +110,7 @@ impl VhostUserNetThread {
rx_tap_listening: false, rx_tap_listening: false,
epoll_fd: None, epoll_fd: None,
counters: NetCounters::default(), counters: NetCounters::default(),
tap_event_id: 2,
}, },
}) })
} }

View File

@ -7,25 +7,23 @@
use super::net_util::{ use super::net_util::{
build_net_config_space, build_net_config_space_with_mq, CtrlVirtio, NetCtrlEpollHandler, build_net_config_space, build_net_config_space_with_mq, CtrlVirtio, NetCtrlEpollHandler,
VirtioNetConfig, KILL_EVENT, NET_EVENTS_COUNT, PAUSE_EVENT, VirtioNetConfig,
}; };
use super::Error as DeviceError; use super::Error as DeviceError;
use super::{ use super::{
ActivateError, ActivateResult, Queue, VirtioDevice, VirtioDeviceType, VirtioInterruptType, ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
}; };
use crate::VirtioInterrupt; use crate::VirtioInterrupt;
use anyhow::anyhow; use anyhow::anyhow;
use libc::EFD_NONBLOCK; use libc::EFD_NONBLOCK;
use net_util::{ use net_util::{
open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TxVirtio, open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TxVirtio,
RX_QUEUE_EVENT, RX_TAP_EVENT, TX_QUEUE_EVENT,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::num::Wrapping; use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, FromRawFd}; use std::os::unix::io::AsRawFd;
use std::result; use std::result;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
@ -40,6 +38,13 @@ use vm_migration::{
}; };
use vmm_sys_util::eventfd::EventFd; use vmm_sys_util::eventfd::EventFd;
// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// The transmit queue has a frame that is ready to send from the guest.
pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// A frame is available for reading from the tap device to receive in the guest.
pub const RX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// Failed to open taps. /// Failed to open taps.
@ -53,7 +58,8 @@ struct NetEpollHandler {
interrupt_cb: Arc<dyn VirtioInterrupt>, interrupt_cb: Arc<dyn VirtioInterrupt>,
kill_evt: EventFd, kill_evt: EventFd,
pause_evt: EventFd, pause_evt: EventFd,
queue_pair: Vec<Queue>,
queue_evt_pair: Vec<EventFd>,
// Always generate interrupts until the driver has signalled to the device. // Always generate interrupts until the driver has signalled to the device.
// This mitigates a problem with interrupts from tap events being "lost" upon // This mitigates a problem with interrupts from tap events being "lost" upon
// a restore as the vCPU thread isn't ready to handle the interrupt. This causes // a restore as the vCPU thread isn't ready to handle the interrupt. This causes
@ -71,22 +77,19 @@ impl NetEpollHandler {
}) })
} }
fn handle_rx_event( fn handle_rx_event(&mut self) -> result::Result<(), DeviceError> {
&mut self, let queue_evt = &self.queue_evt_pair[0];
mut queue: &mut Queue,
queue_evt: &EventFd,
) -> result::Result<(), DeviceError> {
if let Err(e) = queue_evt.read() { if let Err(e) = queue_evt.read() {
error!("Failed to get rx queue event: {:?}", e); error!("Failed to get rx queue event: {:?}", e);
} }
if self if self
.net .net
.resume_rx(&mut queue) .resume_rx(&mut self.queue_pair[0])
.map_err(DeviceError::NetQueuePair)? .map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake || !self.driver_awake
{ {
self.signal_used_queue(queue)?; self.signal_used_queue(&self.queue_pair[0])?;
info!("Signalling RX queue"); info!("Signalling RX queue");
} else { } else {
info!("Not signalling RX queue"); info!("Not signalling RX queue");
@ -95,21 +98,18 @@ impl NetEpollHandler {
Ok(()) Ok(())
} }
fn handle_tx_event( fn handle_tx_event(&mut self) -> result::Result<(), DeviceError> {
&mut self, let queue_evt = &self.queue_evt_pair[1];
mut queue: &mut Queue,
queue_evt: &EventFd,
) -> result::Result<(), DeviceError> {
if let Err(e) = queue_evt.read() { if let Err(e) = queue_evt.read() {
error!("Failed to get tx queue event: {:?}", e); error!("Failed to get tx queue event: {:?}", e);
} }
if self if self
.net .net
.process_tx(&mut queue) .process_tx(&mut self.queue_pair[1])
.map_err(DeviceError::NetQueuePair)? .map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake || !self.driver_awake
{ {
self.signal_used_queue(queue)?; self.signal_used_queue(&self.queue_pair[1])?;
info!("Signalling TX queue"); info!("Signalling TX queue");
} else { } else {
info!("Not signalling TX queue"); info!("Not signalling TX queue");
@ -117,14 +117,14 @@ impl NetEpollHandler {
Ok(()) Ok(())
} }
fn handle_rx_tap_event(&mut self, queue: &mut Queue) -> result::Result<(), DeviceError> { fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> {
if self if self
.net .net
.process_rx_tap(queue) .process_rx_tap(&mut self.queue_pair[0])
.map_err(DeviceError::NetQueuePair)? .map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake || !self.driver_awake
{ {
self.signal_used_queue(queue)?; self.signal_used_queue(&self.queue_pair[0])?;
info!("Signalling RX queue"); info!("Signalling RX queue");
} else { } else {
info!("Not signalling RX queue"); info!("Not signalling RX queue");
@ -132,134 +132,60 @@ impl NetEpollHandler {
Ok(()) Ok(())
} }
fn run( fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
&mut self, let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
paused: Arc<AtomicBool>, helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?;
mut queues: Vec<Queue>, helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?;
queue_evts: Vec<EventFd>,
) -> result::Result<(), DeviceError> {
// Create the epoll file descriptor
let epoll_fd = epoll::create(true).map_err(DeviceError::EpollCreateFd)?;
// Use 'File' to enforce closing on 'epoll_fd'
let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
self.net.epoll_fd = Some(epoll_fd);
// Add events
epoll::ctl(
epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_ADD,
queue_evts[0].as_raw_fd(),
epoll::Event::new(epoll::Events::EPOLLIN, u64::from(RX_QUEUE_EVENT)),
)
.map_err(DeviceError::EpollCtl)?;
epoll::ctl(
epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_ADD,
queue_evts[1].as_raw_fd(),
epoll::Event::new(epoll::Events::EPOLLIN, u64::from(TX_QUEUE_EVENT)),
)
.map_err(DeviceError::EpollCtl)?;
epoll::ctl(
epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_ADD,
self.kill_evt.as_raw_fd(),
epoll::Event::new(epoll::Events::EPOLLIN, u64::from(KILL_EVENT)),
)
.map_err(DeviceError::EpollCtl)?;
epoll::ctl(
epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_ADD,
self.pause_evt.as_raw_fd(),
epoll::Event::new(epoll::Events::EPOLLIN, u64::from(PAUSE_EVENT)),
)
.map_err(DeviceError::EpollCtl)?;
// If there are some already available descriptors on the RX queue, // If there are some already available descriptors on the RX queue,
// then we can start the thread while listening onto the TAP. // then we can start the thread while listening onto the TAP.
if queues[0] if self.queue_pair[0]
.available_descriptors(&self.net.mem.as_ref().unwrap().memory()) .available_descriptors(&self.net.mem.as_ref().unwrap().memory())
.unwrap() .unwrap()
{ {
epoll::ctl( helper.add_event(self.net.tap.as_raw_fd(), RX_TAP_EVENT)?;
epoll_file.as_raw_fd(),
epoll::ControlOptions::EPOLL_CTL_ADD,
self.net.tap.as_raw_fd(),
epoll::Event::new(epoll::Events::EPOLLIN, u64::from(RX_TAP_EVENT)),
)
.map_err(DeviceError::EpollCtl)?;
self.net.rx_tap_listening = true; self.net.rx_tap_listening = true;
error!("Listener registered at start"); info!("Listener registered at start");
} }
let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); NET_EVENTS_COUNT]; // The NetQueuePair needs the epoll fd.
self.net.epoll_fd = Some(helper.as_raw_fd());
// Before jumping into the epoll loop, check if the device is expected helper.run(paused, self)?;
// to be in a paused state. This is helpful for the restore code path
// as the device thread should not start processing anything before the
// device has been resumed.
while paused.load(Ordering::SeqCst) {
thread::park();
}
'epoll: loop { Ok(())
let num_events = match epoll::wait(epoll_file.as_raw_fd(), -1, &mut events[..]) { }
Ok(res) => res, }
Err(e) => {
if e.kind() == io::ErrorKind::Interrupted {
// It's well defined from the epoll_wait() syscall
// documentation that the epoll loop can be interrupted
// before any of the requested events occurred or the
// timeout expired. In both those cases, epoll_wait()
// returns an error of type EINTR, but this should not
// be considered as a regular error. Instead it is more
// appropriate to retry, by calling into epoll_wait().
continue;
}
return Err(DeviceError::EpollWait(e));
}
};
for event in events.iter().take(num_events) { impl EpollHelperHandler for NetEpollHandler {
let ev_type = event.data as u16; fn handle_event(&mut self, _helper: &mut EpollHelper, event: u16) -> bool {
match event {
match ev_type { RX_QUEUE_EVENT => {
RX_QUEUE_EVENT => { self.driver_awake = true;
self.driver_awake = true; if let Err(e) = self.handle_rx_event() {
self.handle_rx_event(&mut queues[0], &queue_evts[0])?; error!("Error processing RX queue: {:?}", e);
} return true;
TX_QUEUE_EVENT => {
self.driver_awake = true;
self.handle_tx_event(&mut queues[1], &queue_evts[1])?;
}
RX_TAP_EVENT => {
self.handle_rx_tap_event(&mut queues[0])?;
}
KILL_EVENT => {
debug!("KILL_EVENT received, stopping epoll loop");
break 'epoll;
}
PAUSE_EVENT => {
debug!("PAUSE_EVENT received, pausing virtio-net epoll loop");
// We loop here to handle spurious park() returns.
// Until we have not resumed, the paused boolean will
// be true.
while paused.load(Ordering::SeqCst) {
thread::park();
}
// Drain pause event after the device has been resumed.
// This ensures the pause event has been seen by each
// and every thread related to this virtio device.
let _ = self.pause_evt.read();
}
_ => {
error!("Unknown event for virtio-net");
}
} }
} }
TX_QUEUE_EVENT => {
self.driver_awake = true;
if let Err(e) = self.handle_tx_event() {
error!("Error processing TX queue: {:?}", e);
return true;
}
}
RX_TAP_EVENT => {
if let Err(e) = self.handle_rx_tap_event() {
error!("Error processing tap queue: {:?}", e);
return true;
}
}
_ => {
error!("Unknown event: {}", event);
return true;
}
} }
Ok(()) false
} }
} }
@ -273,7 +199,7 @@ pub struct Net {
config: VirtioNetConfig, config: VirtioNetConfig,
queue_evts: Option<Vec<EventFd>>, queue_evts: Option<Vec<EventFd>>,
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>, interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), DeviceError>>>>, epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
ctrl_queue_epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>, ctrl_queue_epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
paused: Arc<AtomicBool>, paused: Arc<AtomicBool>,
queue_size: Vec<u16>, queue_size: Vec<u16>,
@ -515,7 +441,10 @@ impl VirtioDevice for Net {
epoll_fd: None, epoll_fd: None,
rx_tap_listening, rx_tap_listening,
counters: self.counters.clone(), counters: self.counters.clone(),
tap_event_id: RX_TAP_EVENT,
}, },
queue_pair,
queue_evt_pair,
interrupt_cb: interrupt_cb.clone(), interrupt_cb: interrupt_cb.clone(),
kill_evt: kill_evt.try_clone().unwrap(), kill_evt: kill_evt.try_clone().unwrap(),
pause_evt: pause_evt.try_clone().unwrap(), pause_evt: pause_evt.try_clone().unwrap(),
@ -525,7 +454,7 @@ impl VirtioDevice for Net {
let paused = self.paused.clone(); let paused = self.paused.clone();
thread::Builder::new() thread::Builder::new()
.name("virtio_net".to_string()) .name("virtio_net".to_string())
.spawn(move || handler.run(paused, queue_pair, queue_evt_pair)) .spawn(move || handler.run(paused))
.map(|thread| epoll_threads.push(thread)) .map(|thread| epoll_threads.push(thread))
.map_err(|e| { .map_err(|e| {
error!("failed to clone queue EventFd: {}", e); error!("failed to clone queue EventFd: {}", e);