net_util, vhost_user_net, virtio-devices: Move NetQueuePair

Move NetQueuePair and the related NetCounters into the net_util crate.
As a result, the vhost_user_net crate no longer depends on
virtio-devices, and therefore no longer pulls in the pci, qcow and other
similar crates. This significantly simplifies the build chain for this backend.

Signed-off-by: Rob Bradford <robert.bradford@intel.com>
This commit is contained in:
Rob Bradford 2020-07-07 16:50:13 +01:00
parent 1237784a8f
commit 17766fcea4
8 changed files with 238 additions and 208 deletions

1
Cargo.lock generated
View File

@ -1486,7 +1486,6 @@ dependencies = [
"vhost",
"vhost_user_backend",
"virtio-bindings",
"virtio-devices",
"vm-memory",
"vmm-sys-util",
]

View File

@ -32,7 +32,10 @@ use std::{io, mem, net};
pub use mac::{MacAddr, MAC_ADDR_LEN};
pub use open_tap::{open_tap, Error as OpenTapError};
pub use queue_pair::{RxVirtio, TxVirtio};
pub use queue_pair::{
NetCounters, NetQueuePair, NetQueuePairError, RxVirtio, TxVirtio, RX_QUEUE_EVENT, RX_TAP_EVENT,
TX_QUEUE_EVENT,
};
pub use tap::{Error as TapError, Tap};
#[derive(Debug)]

View File

@ -2,11 +2,16 @@
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
use super::{vnet_hdr_len, Tap};
use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use libc::EAGAIN;
use std::cmp;
use std::io::Write;
use std::io;
use std::io::{Read, Write};
use std::num::Wrapping;
use vm_memory::{Bytes, GuestAddress, GuestMemoryMmap};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use vm_memory::{Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vm_virtio::{DescriptorChain, Queue};
/// The maximum buffer size when segmentation offload is enabled. This
@ -182,3 +187,201 @@ impl RxVirtio {
}
}
}
/// Traffic counters for a network device.
///
/// Every field is an `Arc<AtomicU64>`, so cloning a `NetCounters` yields a
/// handle onto the same underlying counters rather than a copy of the values.
#[derive(Default, Clone)]
pub struct NetCounters {
/// Transmitted bytes; `NetQueuePair::process_tx` adds the TX queue's byte count here.
pub tx_bytes: Arc<AtomicU64>,
/// Transmitted frames; `NetQueuePair::process_tx` adds the TX queue's frame count here.
pub tx_frames: Arc<AtomicU64>,
/// Received bytes; `NetQueuePair::process_rx` adds the RX queue's byte count here.
pub rx_bytes: Arc<AtomicU64>,
/// Received frames; `NetQueuePair::process_rx` adds the RX queue's frame count here.
pub rx_frames: Arc<AtomicU64>,
}
/// Errors returned by the `NetQueuePair` processing methods.
#[derive(Debug)]
pub enum NetQueuePairError {
/// No memory configured
///
/// Returned when `NetQueuePair::mem` is `None` at the point guest memory is needed.
NoMemoryConfigured,
/// Error registering listener
///
/// Wraps the error from `register_listener` when re-adding the tap fd to epoll.
RegisterListener(io::Error),
/// Error unregistering listener
///
/// Wraps the error from `unregister_listener` when removing the tap fd from epoll.
UnregisterListener(io::Error),
/// Error reading from the TAP device
///
/// Returned for any tap read error other than `EAGAIN`; the underlying
/// `io::Error` is logged at the failure site and not carried in this variant.
FailedReadTap,
}
/// State for one RX/TX virtqueue pair backed by a tap device.
pub struct NetQueuePair {
/// Guest memory handle. While this is `None`, the processing methods fail
/// with `NetQueuePairError::NoMemoryConfigured`.
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
/// Tap device that received frames are read from and that is handed to the
/// TX side when processing the transmit queue.
pub tap: Tap,
/// Receive-side state (frame buffer, deferred-frame/IRQ flags, per-queue counters).
pub rx: RxVirtio,
/// Transmit-side state (per-queue counters).
pub tx: TxVirtio,
/// Epoll fd the tap fd is registered with / unregistered from.
/// NOTE: it is `unwrap()`ed when (un)registering the tap listener, so it
/// must be `Some` by then or the call panics.
pub epoll_fd: Option<RawFd>,
/// Whether the tap fd is currently registered as an epoll listener.
pub rx_tap_listening: bool,
/// Device-wide counters that the per-queue RX/TX counters are folded into.
pub counters: NetCounters,
}
// Integer type used for the device event identifiers below.
pub type DeviceEventT = u16;
// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: DeviceEventT = 0;
// The transmit queue has a frame that is ready to send from the guest.
pub const TX_QUEUE_EVENT: DeviceEventT = 1;
// A frame is available for reading from the tap device to receive in the guest.
// `NetQueuePair` passes this value (widened to `u64`) as the event data when it
// registers or unregisters the tap fd as an epoll listener.
pub const RX_TAP_EVENT: DeviceEventT = 2;
impl NetQueuePair {
    /// Delivers the frame currently held in `self.rx.frame_buf` to the guest.
    ///
    /// Returns `Ok(true)` if a buffer was used, and `Ok(false)` if the frame
    /// must be deferred until the driver makes a buffer available. In the
    /// latter case the tap fd is also removed from epoll (if it was
    /// registered) so no further tap events arrive until RX is resumed.
    ///
    /// # Errors
    /// `NoMemoryConfigured` if `self.mem` is `None`; `UnregisterListener` if
    /// removing the tap fd from epoll fails.
    ///
    /// # Panics
    /// If the listener has to be unregistered while `self.epoll_fd` is `None`.
    fn rx_single_frame(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        let mem = self
            .mem
            .as_ref()
            .map(|m| m.memory())
            .ok_or(NetQueuePairError::NoMemoryConfigured)?;

        let next_desc = queue.iter(&mem).next();
        if next_desc.is_some() {
            return Ok(self.rx.process_desc_chain(&mem, next_desc, queue));
        }

        // Queue has no available descriptors
        if self.rx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(RX_TAP_EVENT),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
            info!("Listener unregistered");
        }

        Ok(false)
    }

    /// Clears a pending deferred RX interrupt, if any, and reports whether
    /// the queue says the guest needs to be notified.
    ///
    /// Returns `Ok(false)` without touching guest memory when no interrupt
    /// was deferred.
    fn flush_deferred_irq(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        if !self.rx.deferred_irqs {
            return Ok(false);
        }

        self.rx.deferred_irqs = false;
        let mem = self
            .mem
            .as_ref()
            .map(|m| m.memory())
            .ok_or(NetQueuePairError::NoMemoryConfigured)?;
        Ok(queue.needs_notification(&mem, queue.next_used))
    }

    /// Drains the tap device into the RX queue, then folds the RX queue's
    /// counters into the device-wide counters.
    ///
    /// Returns whether the guest should be notified.
    fn process_rx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        // Read as many frames as possible.
        loop {
            let count = match self.read_tap() {
                Ok(count) => count,
                Err(e) => {
                    // The tap device is non-blocking, so any error aside from EAGAIN is
                    // unexpected.
                    if e.raw_os_error() == Some(EAGAIN) {
                        break;
                    }
                    error!("Failed to read tap: {:?}", e);
                    return Err(NetQueuePairError::FailedReadTap);
                }
            };

            self.rx.bytes_read = count;
            if !self.rx_single_frame(queue)? {
                // No buffer for this frame: remember it and stop reading.
                self.rx.deferred_frame = true;
                break;
            }
        }

        // Consume the counters from the Rx/Tx queues and accumulate into
        // the counters for the device as whole. This consumption is needed
        // to handle MQ.
        let rx_bytes = self.rx.counter_bytes.0;
        let rx_frames = self.rx.counter_frames.0;
        self.counters.rx_bytes.fetch_add(rx_bytes, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(rx_frames, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        self.flush_deferred_irq(queue)
    }

    /// Resumes reception: re-registers the tap fd with epoll if needed and
    /// retries any deferred frame.
    ///
    /// Returns whether the guest should be notified.
    ///
    /// # Panics
    /// If the listener has to be registered while `self.epoll_fd` is `None`.
    pub fn resume_rx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        if !self.rx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(RX_TAP_EVENT),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.rx_tap_listening = true;
            info!("Listener registered");
        }

        if !self.rx.deferred_frame {
            return Ok(false);
        }

        if self.rx_single_frame(queue)? {
            self.rx.deferred_frame = false;
            // process_rx() was interrupted possibly before consuming all
            // packets in the tap; try continuing now.
            return self.process_rx(queue);
        }

        self.flush_deferred_irq(queue)
    }

    /// Processes the TX queue, sending frames to the tap device, and folds
    /// the TX queue's counters into the device-wide counters.
    ///
    /// Returns whether the queue reports that the guest needs a notification.
    pub fn process_tx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        let mem = self
            .mem
            .as_ref()
            .map(|m| m.memory())
            .ok_or(NetQueuePairError::NoMemoryConfigured)?;
        self.tx.process_desc_chain(&mem, &mut self.tap, queue);

        let tx_bytes = self.tx.counter_bytes.0;
        let tx_frames = self.tx.counter_frames.0;
        self.counters.tx_bytes.fetch_add(tx_bytes, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(tx_frames, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        Ok(queue.needs_notification(&mem, queue.next_used))
    }

    /// Handles a "tap readable" event.
    ///
    /// Returns whether the guest should be notified.
    pub fn process_rx_tap(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        if !self.rx.deferred_frame {
            return self.process_rx(queue);
        }

        // Process a deferred frame first if available. Don't read from tap again
        // until we manage to receive this deferred frame.
        if self.rx_single_frame(queue)? {
            self.rx.deferred_frame = false;
            return self.process_rx(queue);
        }

        // Still deferred: report (and clear) any interrupt that was held back.
        let irq_pending = self.rx.deferred_irqs;
        self.rx.deferred_irqs = false;
        Ok(irq_pending)
    }

    /// Reads one frame from the tap device into `self.rx.frame_buf`,
    /// returning the number of bytes read.
    fn read_tap(&mut self) -> io::Result<usize> {
        self.tap.read(&mut self.rx.frame_buf)
    }
}

View File

@ -14,6 +14,5 @@ option_parser = { path = "../option_parser" }
vhost_user_backend = { path = "../vhost_user_backend" }
vhost_rs = { git = "https://github.com/cloud-hypervisor/vhost", branch = "dragonball", package = "vhost", features = ["vhost-user-slave"] }
virtio-bindings = "0.1.0"
virtio-devices = { path = "../virtio-devices" }
vm-memory = "0.2.1"
vmm-sys-util = ">=0.3.1"

View File

@ -10,11 +10,12 @@ extern crate log;
extern crate net_util;
extern crate vhost_rs;
extern crate vhost_user_backend;
extern crate virtio_devices;
use libc::{self, EFD_NONBLOCK};
use log::*;
use net_util::{open_tap, MacAddr, OpenTapError, RxVirtio, Tap, TxVirtio};
use net_util::{
open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TxVirtio,
};
use option_parser::{OptionParser, OptionParserError};
use std::fmt;
use std::io::{self};
@ -28,7 +29,6 @@ use vhost_rs::vhost_user::{Error as VhostUserError, Listener};
use vhost_user_backend::{VhostUserBackend, VhostUserDaemon, Vring, VringWorker};
use virtio_bindings::bindings::virtio_net::*;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_devices::{NetCounters, NetQueuePair};
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::eventfd::EventFd;
@ -69,7 +69,7 @@ pub enum Error {
/// No socket provided
SocketParameterMissing,
/// Underlying QueuePair error
NetQueuePair(virtio_devices::Error),
NetQueuePair(net_util::NetQueuePairError),
}
pub const SYNTAX: &str = "vhost-user-net backend parameters \

View File

@ -103,7 +103,6 @@ pub enum Error {
event_type: &'static str,
underlying: io::Error,
},
FailedReadTap,
FailedSignalingUsedQueue(io::Error),
PayloadExpected,
UnknownEvent {
@ -111,8 +110,6 @@ pub enum Error {
event: DeviceEventT,
},
IoError(io::Error),
RegisterListener(io::Error),
UnregisterListener(io::Error),
EpollCreateFd(io::Error),
EpollCtl(io::Error),
EpollWait(io::Error),
@ -122,4 +119,5 @@ pub enum Error {
SetShmRegionsNotSupported,
EpollHander(String),
NoMemoryConfigured,
NetQueuePair(::net_util::NetQueuePairError),
}

View File

@ -7,8 +7,7 @@
use super::net_util::{
build_net_config_space, build_net_config_space_with_mq, CtrlVirtio, NetCtrlEpollHandler,
VirtioNetConfig, KILL_EVENT, NET_EVENTS_COUNT, PAUSE_EVENT, RX_QUEUE_EVENT, RX_TAP_EVENT,
TX_QUEUE_EVENT,
VirtioNetConfig, KILL_EVENT, NET_EVENTS_COUNT, PAUSE_EVENT,
};
use super::Error as DeviceError;
use super::{
@ -16,22 +15,20 @@ use super::{
};
use crate::VirtioInterrupt;
use anyhow::anyhow;
use libc::EAGAIN;
use libc::EFD_NONBLOCK;
use net_util::{
open_tap, register_listener, unregister_listener, MacAddr, OpenTapError, RxVirtio, Tap,
TxVirtio,
open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TxVirtio,
RX_QUEUE_EVENT, RX_TAP_EVENT, TX_QUEUE_EVENT,
};
use std::cmp;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::io::{self, Write};
use std::net::Ipv4Addr;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::vec::Vec;
@ -52,176 +49,6 @@ pub enum Error {
pub type Result<T> = result::Result<T, Error>;
/// State for one RX/TX virtqueue pair backed by a tap device.
pub struct NetQueuePair {
/// Guest memory handle. While this is `None`, the processing methods fail
/// with `DeviceError::NoMemoryConfigured`.
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
/// Tap device that received frames are read from and that is handed to the
/// TX side when processing the transmit queue.
pub tap: Tap,
/// Receive-side state (frame buffer, deferred-frame/IRQ flags, per-queue counters).
pub rx: RxVirtio,
/// Transmit-side state (per-queue counters).
pub tx: TxVirtio,
/// Epoll fd the tap fd is registered with / unregistered from.
/// NOTE: it is `unwrap()`ed when (un)registering the tap listener, so it
/// must be `Some` by then or the call panics.
pub epoll_fd: Option<RawFd>,
/// Whether the tap fd is currently registered as an epoll listener.
pub rx_tap_listening: bool,
/// Device-wide counters that the per-queue RX/TX counters are folded into.
pub counters: NetCounters,
}
impl NetQueuePair {
// Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
// if a buffer was used, and false if the frame must be deferred until a buffer
// is made available by the driver.
fn rx_single_frame(&mut self, mut queue: &mut Queue) -> result::Result<bool, DeviceError> {
let mem = self
.mem
.as_ref()
.ok_or(DeviceError::NoMemoryConfigured)
.map(|m| m.memory())?;
let next_desc = queue.iter(&mem).next();
if next_desc.is_none() {
// Queue has no available descriptors
if self.rx_tap_listening {
unregister_listener(
self.epoll_fd.unwrap(),
self.tap.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(RX_TAP_EVENT),
)
.map_err(DeviceError::UnregisterListener)?;
self.rx_tap_listening = false;
info!("Listener unregistered");
}
return Ok(false);
}
Ok(self.rx.process_desc_chain(&mem, next_desc, &mut queue))
}
fn process_rx(&mut self, queue: &mut Queue) -> result::Result<bool, DeviceError> {
// Read as many frames as possible.
loop {
match self.read_tap() {
Ok(count) => {
self.rx.bytes_read = count;
if !self.rx_single_frame(queue)? {
self.rx.deferred_frame = true;
break;
}
}
Err(e) => {
// The tap device is non-blocking, so any error aside from EAGAIN is
// unexpected.
match e.raw_os_error() {
Some(err) if err == EAGAIN => (),
_ => {
error!("Failed to read tap: {:?}", e);
return Err(DeviceError::FailedReadTap);
}
};
break;
}
}
}
// Consume the counters from the Rx/Tx queues and accumulate into
// the counters for the device as whole. This consumption is needed
// to handle MQ.
self.counters
.rx_bytes
.fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
self.counters
.rx_frames
.fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
self.rx.counter_bytes = Wrapping(0);
self.rx.counter_frames = Wrapping(0);
if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
let mem = self
.mem
.as_ref()
.ok_or(DeviceError::NoMemoryConfigured)
.map(|m| m.memory())?;
Ok(queue.needs_notification(&mem, queue.next_used))
} else {
Ok(false)
}
}
pub fn resume_rx(&mut self, queue: &mut Queue) -> result::Result<bool, DeviceError> {
if !self.rx_tap_listening {
register_listener(
self.epoll_fd.unwrap(),
self.tap.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(RX_TAP_EVENT),
)
.map_err(DeviceError::RegisterListener)?;
self.rx_tap_listening = true;
info!("Listener registered");
}
if self.rx.deferred_frame {
if self.rx_single_frame(queue)? {
self.rx.deferred_frame = false;
// process_rx() was interrupted possibly before consuming all
// packets in the tap; try continuing now.
self.process_rx(queue)
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
let mem = self
.mem
.as_ref()
.ok_or(DeviceError::NoMemoryConfigured)
.map(|m| m.memory())?;
Ok(queue.needs_notification(&mem, queue.next_used))
} else {
Ok(false)
}
} else {
Ok(false)
}
}
pub fn process_tx(&mut self, mut queue: &mut Queue) -> result::Result<bool, DeviceError> {
let mem = self
.mem
.as_ref()
.ok_or(DeviceError::NoMemoryConfigured)
.map(|m| m.memory())?;
self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue);
self.counters
.tx_bytes
.fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
self.counters
.tx_frames
.fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
self.tx.counter_bytes = Wrapping(0);
self.tx.counter_frames = Wrapping(0);
Ok(queue.needs_notification(&mem, queue.next_used))
}
pub fn process_rx_tap(&mut self, mut queue: &mut Queue) -> result::Result<bool, DeviceError> {
if self.rx.deferred_frame
// Process a deferred frame first if available. Don't read from tap again
// until we manage to receive this deferred frame.
{
if self.rx_single_frame(&mut queue)? {
self.rx.deferred_frame = false;
self.process_rx(&mut queue)
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
Ok(true)
} else {
Ok(false)
}
} else {
self.process_rx(&mut queue)
}
}
fn read_tap(&mut self) -> io::Result<usize> {
self.tap.read(&mut self.rx.frame_buf)
}
}
struct NetEpollHandler {
net: NetQueuePair,
interrupt_cb: Arc<dyn VirtioInterrupt>,
@ -254,7 +81,12 @@ impl NetEpollHandler {
error!("Failed to get rx queue event: {:?}", e);
}
if self.net.resume_rx(&mut queue)? || !self.driver_awake {
if self
.net
.resume_rx(&mut queue)
.map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake
{
self.signal_used_queue(queue)?;
info!("Signalling RX queue");
} else {
@ -272,7 +104,12 @@ impl NetEpollHandler {
if let Err(e) = queue_evt.read() {
error!("Failed to get tx queue event: {:?}", e);
}
if self.net.process_tx(&mut queue)? || !self.driver_awake {
if self
.net
.process_tx(&mut queue)
.map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake
{
self.signal_used_queue(queue)?;
info!("Signalling TX queue");
} else {
@ -282,7 +119,12 @@ impl NetEpollHandler {
}
fn handle_rx_tap_event(&mut self, queue: &mut Queue) -> result::Result<(), DeviceError> {
if self.net.process_rx_tap(queue)? || !self.driver_awake {
if self
.net
.process_rx_tap(queue)
.map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake
{
self.signal_used_queue(queue)?;
info!("Signalling RX queue");
} else {
@ -422,14 +264,6 @@ impl NetEpollHandler {
}
}
/// Traffic counters for a network device.
///
/// Every field is an `Arc<AtomicU64>`, so cloning a `NetCounters` yields a
/// handle onto the same underlying counters rather than a copy of the values.
#[derive(Default, Clone)]
pub struct NetCounters {
// Transmitted bytes, accumulated from the TX queue's byte counter.
tx_bytes: Arc<AtomicU64>,
// Transmitted frames, accumulated from the TX queue's frame counter.
tx_frames: Arc<AtomicU64>,
// Received bytes, accumulated from the RX queue's byte counter.
rx_bytes: Arc<AtomicU64>,
// Received frames, accumulated from the RX queue's frame counter.
rx_frames: Arc<AtomicU64>,
}
pub struct Net {
id: String,
kill_evt: Option<EventFd>,

View File

@ -21,12 +21,6 @@ type Result<T> = std::result::Result<T, Error>;
// NOTE(review): presumably the number of descriptors per virtqueue; its users
// are outside this view, so confirm before relying on that.
const QUEUE_SIZE: usize = 256;
// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: DeviceEventT = 0;
// The transmit queue has a frame that is ready to send from the guest.
pub const TX_QUEUE_EVENT: DeviceEventT = 1;
// A frame is available for reading from the tap device to receive in the guest.
pub const RX_TAP_EVENT: DeviceEventT = 2;
// The device has been dropped.
pub const KILL_EVENT: DeviceEventT = 3;
// The device should be paused.