vhost_user_net: Use NetQueuePair from vm-virtio

The logic for handling the networking queues can now be shared between
the version running in vhost-user-net and vm-virtio.

Signed-off-by: Rob Bradford <robert.bradford@intel.com>
This commit is contained in:
Rob Bradford 2020-05-29 15:30:18 +01:00 committed by Sebastien Boeuf
parent fcc62efc41
commit cc51fdb8a7
2 changed files with 48 additions and 130 deletions

View File

@ -310,6 +310,12 @@ pub struct VringWorker {
epoll_file: File, epoll_file: File,
} }
impl AsRawFd for VringWorker {
fn as_raw_fd(&self) -> RawFd {
self.epoll_file.as_raw_fd()
}
}
impl VringWorker { impl VringWorker {
fn run<S: VhostUserBackend>(&self, handler: VringEpollHandler<S>) -> VringWorkerResult<()> { fn run<S: VhostUserBackend>(&self, handler: VringEpollHandler<S>) -> VringWorkerResult<()> {
const EPOLL_EVENTS_LEN: usize = 100; const EPOLL_EVENTS_LEN: usize = 100;

View File

@ -13,11 +13,10 @@ extern crate vhost_user_backend;
extern crate vm_virtio; extern crate vm_virtio;
extern crate vmm; extern crate vmm;
use libc::{self, EAGAIN, EFD_NONBLOCK}; use libc::{self, EFD_NONBLOCK};
use log::*; use log::*;
use net_util::{MacAddr, Tap}; use net_util::{MacAddr, Tap};
use std::fmt; use std::fmt;
use std::io::Read;
use std::io::{self}; use std::io::{self};
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
@ -28,9 +27,9 @@ use vhost_rs::vhost_user::message::*;
use vhost_rs::vhost_user::{Error as VhostUserError, Listener}; use vhost_rs::vhost_user::{Error as VhostUserError, Listener};
use vhost_user_backend::{VhostUserBackend, VhostUserDaemon, Vring, VringWorker}; use vhost_user_backend::{VhostUserBackend, VhostUserDaemon, Vring, VringWorker};
use virtio_bindings::bindings::virtio_net::*; use virtio_bindings::bindings::virtio_net::*;
use vm_memory::GuestMemoryMmap; use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
use vm_virtio::net_util::{open_tap, RxVirtio, TxVirtio}; use vm_virtio::net_util::{open_tap, RxVirtio, TxVirtio};
use vm_virtio::Queue; use vm_virtio::NetQueuePair;
use vmm::config::{OptionParser, OptionParserError}; use vmm::config::{OptionParser, OptionParserError};
use vmm_sys_util::eventfd::EventFd; use vmm_sys_util::eventfd::EventFd;
@ -55,7 +54,7 @@ pub enum Error {
/// Failed to parse configuration string /// Failed to parse configuration string
FailedConfigParse(OptionParserError), FailedConfigParse(OptionParserError),
/// Failed to signal used queue. /// Failed to signal used queue.
FailedSignalingUsedQueue, FailedSignalingUsedQueue(io::Error),
/// Failed to handle event other than input event. /// Failed to handle event other than input event.
HandleEventNotEpollIn, HandleEventNotEpollIn,
/// Failed to handle unknown event. /// Failed to handle unknown event.
@ -70,6 +69,8 @@ pub enum Error {
OpenTap(vm_virtio::net_util::Error), OpenTap(vm_virtio::net_util::Error),
/// No socket provided /// No socket provided
SocketParameterMissing, SocketParameterMissing,
/// Underlying QueuePair error
NetQueuePair(vm_virtio::Error),
} }
pub const SYNTAX: &str = "vhost-user-net backend parameters \ pub const SYNTAX: &str = "vhost-user-net backend parameters \
@ -91,121 +92,30 @@ impl std::convert::From<Error> for std::io::Error {
} }
struct VhostUserNetThread { struct VhostUserNetThread {
mem: Option<GuestMemoryMmap>, net: NetQueuePair,
vring_worker: Option<Arc<VringWorker>>, vring_worker: Option<Arc<VringWorker>>,
kill_evt: EventFd, kill_evt: EventFd,
tap: Tap,
rx: RxVirtio,
tx: TxVirtio,
rx_tap_listening: bool,
} }
impl VhostUserNetThread { impl VhostUserNetThread {
/// Create a new virtio network device with the given TAP interface. /// Create a new virtio network device with the given TAP interface.
fn new(tap: Tap) -> Result<Self> { fn new(tap: Tap) -> Result<Self> {
Ok(VhostUserNetThread { Ok(VhostUserNetThread {
mem: None,
vring_worker: None, vring_worker: None,
kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
tap, net: NetQueuePair {
rx: RxVirtio::new(), mem: None,
tx: TxVirtio::new(), tap,
rx_tap_listening: false, rx: RxVirtio::new(),
tx: TxVirtio::new(),
rx_tap_listening: false,
epoll_fd: None,
},
}) })
} }
// Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
// if a buffer was used, and false if the frame must be deferred until a buffer
// is made available by the driver.
fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result<bool> {
let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
let next_desc = queue.iter(&mem).next();
if next_desc.is_none() {
// Queue has no available descriptors
if self.rx_tap_listening {
self.vring_worker
.as_ref()
.unwrap()
.unregister_listener(self.tap.as_raw_fd(), epoll::Events::EPOLLIN, 2)
.unwrap();
self.rx_tap_listening = false;
}
return Ok(false);
}
let write_complete = self.rx.process_desc_chain(&mem, next_desc, &mut queue);
Ok(write_complete)
}
fn process_rx(&mut self, vring: &mut Vring) -> Result<()> {
// Read as many frames as possible.
loop {
match self.read_tap() {
Ok(count) => {
self.rx.bytes_read = count;
if !self.rx_single_frame(&mut vring.mut_queue())? {
self.rx.deferred_frame = true;
break;
}
}
Err(e) => {
// The tap device is non-blocking, so any error aside from EAGAIN is
// unexpected.
match e.raw_os_error() {
Some(err) if err == EAGAIN => (),
_ => {
error!("Failed to read tap: {:?}", e);
return Err(Error::FailedReadTap);
}
};
break;
}
}
}
if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
vring.signal_used_queue().unwrap();
Ok(())
} else {
Ok(())
}
}
fn resume_rx(&mut self, vring: &mut Vring) -> Result<()> {
if self.rx.deferred_frame {
if self.rx_single_frame(&mut vring.mut_queue())? {
self.rx.deferred_frame = false;
// process_rx() was interrupted possibly before consuming all
// packets in the tap; try continuing now.
self.process_rx(vring)
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
vring.signal_used_queue().unwrap();
Ok(())
} else {
Ok(())
}
} else {
Ok(())
}
}
fn process_tx(&mut self, mut queue: &mut Queue) -> Result<()> {
let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue);
Ok(())
}
fn read_tap(&mut self) -> io::Result<usize> {
self.tap.read(&mut self.rx.frame_buf)
}
pub fn set_vring_worker(&mut self, vring_worker: Option<Arc<VringWorker>>) { pub fn set_vring_worker(&mut self, vring_worker: Option<Arc<VringWorker>>) {
self.net.epoll_fd = Some(vring_worker.as_ref().unwrap().as_raw_fd());
self.vring_worker = vring_worker; self.vring_worker = vring_worker;
} }
} }
@ -280,7 +190,7 @@ impl VhostUserBackend for VhostUserNetBackend {
fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> {
for thread in self.threads.iter() { for thread in self.threads.iter() {
thread.lock().unwrap().mem = Some(mem.clone()); thread.lock().unwrap().net.mem = Some(GuestMemoryAtomic::new(mem.clone()));
} }
Ok(()) Ok(())
} }
@ -299,37 +209,39 @@ impl VhostUserBackend for VhostUserNetBackend {
let mut thread = self.threads[thread_id].lock().unwrap(); let mut thread = self.threads[thread_id].lock().unwrap();
match device_event { match device_event {
0 => { 0 => {
thread.resume_rx(&mut vrings[0].write().unwrap())?; let mut vring = vrings[0].write().unwrap();
if thread
if !thread.rx_tap_listening { .net
thread.vring_worker.as_ref().unwrap().register_listener( .resume_rx(&mut vring.mut_queue())
thread.tap.as_raw_fd(), .map_err(Error::NetQueuePair)?
epoll::Events::EPOLLIN, {
2, vring
)?; .signal_used_queue()
thread.rx_tap_listening = true; .map_err(Error::FailedSignalingUsedQueue)?
} }
} }
1 => { 1 => {
let mut vring = vrings[1].write().unwrap(); let mut vring = vrings[1].write().unwrap();
thread.process_tx(vring.mut_queue())?; if thread
vring.signal_used_queue()?; .net
.process_tx(&mut vring.mut_queue())
.map_err(Error::NetQueuePair)?
{
vring
.signal_used_queue()
.map_err(Error::FailedSignalingUsedQueue)?
}
} }
2 => { 2 => {
let mut vring = vrings[0].write().unwrap(); let mut vring = vrings[0].write().unwrap();
if thread.rx.deferred_frame if thread
// Process a deferred frame first if available. Don't read from tap again .net
// until we manage to receive this deferred frame. .process_rx_tap(&mut vring.mut_queue())
.map_err(Error::NetQueuePair)?
{ {
if thread.rx_single_frame(&mut vring.mut_queue())? { vring
thread.rx.deferred_frame = false; .signal_used_queue()
thread.process_rx(&mut vring)?; .map_err(Error::FailedSignalingUsedQueue)?
} else if thread.rx.deferred_irqs {
thread.rx.deferred_irqs = false;
vring.signal_used_queue()?;
}
} else {
thread.process_rx(&mut vring)?;
} }
} }
_ => return Err(Error::HandleEventUnknownEvent.into()), _ => return Err(Error::HandleEventUnknownEvent.into()),