From 17766fcea4559101b604e37dded4ea6c08d65fa1 Mon Sep 17 00:00:00 2001 From: Rob Bradford Date: Tue, 7 Jul 2020 16:50:13 +0100 Subject: [PATCH] net_util, vhost_user_net, virtio-devices: Move NetQueuePair Move NetQueuePair and the related NetCounters into the net_util crate. This means that the vhost_user_net crate now no longer depends on virtio-devices and so does not depend on the pci, qcow or other similar crates. This significantly simplifies the build chain for this backend. Signed-off-by: Rob Bradford --- Cargo.lock | 1 - net_util/src/lib.rs | 5 +- net_util/src/queue_pair.rs | 209 +++++++++++++++++++++++++++++++- vhost_user_net/Cargo.toml | 1 - vhost_user_net/src/lib.rs | 8 +- virtio-devices/src/lib.rs | 4 +- virtio-devices/src/net.rs | 212 ++++----------------------------- virtio-devices/src/net_util.rs | 6 - 8 files changed, 238 insertions(+), 208 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3dda9b0f7..16fbba464 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1486,7 +1486,6 @@ dependencies = [ "vhost", "vhost_user_backend", "virtio-bindings", - "virtio-devices", "vm-memory", "vmm-sys-util", ] diff --git a/net_util/src/lib.rs b/net_util/src/lib.rs index 1f31c1a5e..5c022cc11 100644 --- a/net_util/src/lib.rs +++ b/net_util/src/lib.rs @@ -32,7 +32,10 @@ use std::{io, mem, net}; pub use mac::{MacAddr, MAC_ADDR_LEN}; pub use open_tap::{open_tap, Error as OpenTapError}; -pub use queue_pair::{RxVirtio, TxVirtio}; +pub use queue_pair::{ + NetCounters, NetQueuePair, NetQueuePairError, RxVirtio, TxVirtio, RX_QUEUE_EVENT, RX_TAP_EVENT, + TX_QUEUE_EVENT, +}; pub use tap::{Error as TapError, Tap}; #[derive(Debug)] diff --git a/net_util/src/queue_pair.rs b/net_util/src/queue_pair.rs index 42c883f30..288f0c693 100644 --- a/net_util/src/queue_pair.rs +++ b/net_util/src/queue_pair.rs @@ -2,11 +2,16 @@ // // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause -use super::{vnet_hdr_len, Tap}; +use super::{register_listener, unregister_listener, vnet_hdr_len, Tap}; +use libc::EAGAIN; use std::cmp; -use std::io::Write; +use std::io; +use std::io::{Read, Write}; use std::num::Wrapping; -use vm_memory::{Bytes, GuestAddress, GuestMemoryMmap}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use vm_memory::{Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; use vm_virtio::{DescriptorChain, Queue}; /// The maximum buffer size when segmentation offload is enabled. This @@ -182,3 +187,201 @@ impl RxVirtio { } } } + +#[derive(Default, Clone)] +pub struct NetCounters { + pub tx_bytes: Arc, + pub tx_frames: Arc, + pub rx_bytes: Arc, + pub rx_frames: Arc, +} + +#[derive(Debug)] +pub enum NetQueuePairError { + /// No memory configured + NoMemoryConfigured, + /// Error registering listener + RegisterListener(io::Error), + /// Error unregistering listener + UnregisterListener(io::Error), + /// Error reading from the TAP device + FailedReadTap, +} + +pub struct NetQueuePair { + pub mem: Option>, + pub tap: Tap, + pub rx: RxVirtio, + pub tx: TxVirtio, + pub epoll_fd: Option, + pub rx_tap_listening: bool, + pub counters: NetCounters, +} + +pub type DeviceEventT = u16; +// The guest has made a buffer available to receive a frame into. +pub const RX_QUEUE_EVENT: DeviceEventT = 0; +// The transmit queue has a frame that is ready to send from the guest. +pub const TX_QUEUE_EVENT: DeviceEventT = 1; +// A frame is available for reading from the tap device to receive in the guest. +pub const RX_TAP_EVENT: DeviceEventT = 2; + +impl NetQueuePair { + // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true + // if a buffer was used, and false if the frame must be deferred until a buffer + // is made available by the driver. + fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result { + let mem = self + .mem + .as_ref() + .ok_or(NetQueuePairError::NoMemoryConfigured) + .map(|m| m.memory())?; + let next_desc = queue.iter(&mem).next(); + + if next_desc.is_none() { + // Queue has no available descriptors + if self.rx_tap_listening { + unregister_listener( + self.epoll_fd.unwrap(), + self.tap.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(RX_TAP_EVENT), + ) + .map_err(NetQueuePairError::UnregisterListener)?; + self.rx_tap_listening = false; + info!("Listener unregistered"); + } + return Ok(false); + } + + Ok(self.rx.process_desc_chain(&mem, next_desc, &mut queue)) + } + + fn process_rx(&mut self, queue: &mut Queue) -> Result { + // Read as many frames as possible. + loop { + match self.read_tap() { + Ok(count) => { + self.rx.bytes_read = count; + if !self.rx_single_frame(queue)? { + self.rx.deferred_frame = true; + break; + } + } + Err(e) => { + // The tap device is non-blocking, so any error aside from EAGAIN is + // unexpected. + match e.raw_os_error() { + Some(err) if err == EAGAIN => (), + _ => { + error!("Failed to read tap: {:?}", e); + return Err(NetQueuePairError::FailedReadTap); + } + }; + break; + } + } + } + + // Consume the counters from the Rx/Tx queues and accumulate into + // the counters for the device as whole. This consumption is needed + // to handle MQ. + self.counters + .rx_bytes + .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel); + self.counters + .rx_frames + .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel); + self.rx.counter_bytes = Wrapping(0); + self.rx.counter_frames = Wrapping(0); + + if self.rx.deferred_irqs { + self.rx.deferred_irqs = false; + let mem = self + .mem + .as_ref() + .ok_or(NetQueuePairError::NoMemoryConfigured) + .map(|m| m.memory())?; + Ok(queue.needs_notification(&mem, queue.next_used)) + } else { + Ok(false) + } + } + + pub fn resume_rx(&mut self, queue: &mut Queue) -> Result { + if !self.rx_tap_listening { + register_listener( + self.epoll_fd.unwrap(), + self.tap.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(RX_TAP_EVENT), + ) + .map_err(NetQueuePairError::RegisterListener)?; + self.rx_tap_listening = true; + info!("Listener registered"); + } + if self.rx.deferred_frame { + if self.rx_single_frame(queue)? { + self.rx.deferred_frame = false; + // process_rx() was interrupted possibly before consuming all + // packets in the tap; try continuing now. + self.process_rx(queue) + } else if self.rx.deferred_irqs { + self.rx.deferred_irqs = false; + let mem = self + .mem + .as_ref() + .ok_or(NetQueuePairError::NoMemoryConfigured) + .map(|m| m.memory())?; + Ok(queue.needs_notification(&mem, queue.next_used)) + } else { + Ok(false) + } + } else { + Ok(false) + } + } + + pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result { + let mem = self + .mem + .as_ref() + .ok_or(NetQueuePairError::NoMemoryConfigured) + .map(|m| m.memory())?; + self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue); + + self.counters + .tx_bytes + .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel); + self.counters + .tx_frames + .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel); + self.tx.counter_bytes = Wrapping(0); + self.tx.counter_frames = Wrapping(0); + + Ok(queue.needs_notification(&mem, queue.next_used)) + } + + pub fn process_rx_tap(&mut self, mut queue: &mut Queue) -> Result { + if self.rx.deferred_frame + // Process a deferred frame first if available. Don't read from tap again + // until we manage to receive this deferred frame. + { + if self.rx_single_frame(&mut queue)? { + self.rx.deferred_frame = false; + self.process_rx(&mut queue) + } else if self.rx.deferred_irqs { + self.rx.deferred_irqs = false; + Ok(true) + } else { + Ok(false) + } + } else { + self.process_rx(&mut queue) + } + } + + fn read_tap(&mut self) -> io::Result { + self.tap.read(&mut self.rx.frame_buf) + } +} diff --git a/vhost_user_net/Cargo.toml b/vhost_user_net/Cargo.toml index 41012ac92..034ecb854 100644 --- a/vhost_user_net/Cargo.toml +++ b/vhost_user_net/Cargo.toml @@ -14,6 +14,5 @@ option_parser = { path = "../option_parser" } vhost_user_backend = { path = "../vhost_user_backend" } vhost_rs = { git = "https://github.com/cloud-hypervisor/vhost", branch = "dragonball", package = "vhost", features = ["vhost-user-slave"] } virtio-bindings = "0.1.0" -virtio-devices = { path = "../virtio-devices" } vm-memory = "0.2.1" vmm-sys-util = ">=0.3.1" diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs index 6353b0a7b..1522118bd 100644 --- a/vhost_user_net/src/lib.rs +++ b/vhost_user_net/src/lib.rs @@ -10,11 +10,12 @@ extern crate log; extern crate net_util; extern crate vhost_rs; extern crate vhost_user_backend; -extern crate virtio_devices; use libc::{self, EFD_NONBLOCK}; use log::*; -use net_util::{open_tap, MacAddr, OpenTapError, RxVirtio, Tap, TxVirtio}; +use net_util::{ + open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TxVirtio, +}; use option_parser::{OptionParser, OptionParserError}; use std::fmt; use std::io::{self}; @@ -28,7 +29,6 @@ use vhost_rs::vhost_user::{Error as VhostUserError, Listener}; use vhost_user_backend::{VhostUserBackend, VhostUserDaemon, Vring, VringWorker}; use virtio_bindings::bindings::virtio_net::*; use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; -use virtio_devices::{NetCounters, NetQueuePair}; use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap}; use vmm_sys_util::eventfd::EventFd; @@ -69,7 +69,7 @@ pub enum Error { /// No socket provided SocketParameterMissing, /// Underlying QueuePair error - NetQueuePair(virtio_devices::Error), + NetQueuePair(net_util::NetQueuePairError), } pub const SYNTAX: &str = "vhost-user-net backend parameters \ diff --git a/virtio-devices/src/lib.rs b/virtio-devices/src/lib.rs index d3f5c07a6..f4960517e 100644 --- a/virtio-devices/src/lib.rs +++ b/virtio-devices/src/lib.rs @@ -103,7 +103,6 @@ pub enum Error { event_type: &'static str, underlying: io::Error, }, - FailedReadTap, FailedSignalingUsedQueue(io::Error), PayloadExpected, UnknownEvent { @@ -111,8 +110,6 @@ pub enum Error { event: DeviceEventT, }, IoError(io::Error), - RegisterListener(io::Error), - UnregisterListener(io::Error), EpollCreateFd(io::Error), EpollCtl(io::Error), EpollWait(io::Error), @@ -122,4 +119,5 @@ pub enum Error { SetShmRegionsNotSupported, EpollHander(String), NoMemoryConfigured, + NetQueuePair(::net_util::NetQueuePairError), } diff --git a/virtio-devices/src/net.rs b/virtio-devices/src/net.rs index 01aa74b6c..0019b6f7d 100644 --- a/virtio-devices/src/net.rs +++ b/virtio-devices/src/net.rs @@ -7,8 +7,7 @@ use super::net_util::{ build_net_config_space, build_net_config_space_with_mq, CtrlVirtio, NetCtrlEpollHandler, - VirtioNetConfig, KILL_EVENT, NET_EVENTS_COUNT, PAUSE_EVENT, RX_QUEUE_EVENT, RX_TAP_EVENT, - TX_QUEUE_EVENT, + VirtioNetConfig, KILL_EVENT, NET_EVENTS_COUNT, PAUSE_EVENT, }; use super::Error as DeviceError; use super::{ @@ -16,22 +15,20 @@ use super::{ }; use crate::VirtioInterrupt; use anyhow::anyhow; -use libc::EAGAIN; use libc::EFD_NONBLOCK; use net_util::{ - open_tap, register_listener, unregister_listener, MacAddr, OpenTapError, RxVirtio, Tap, - TxVirtio, + open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TxVirtio, + RX_QUEUE_EVENT, RX_TAP_EVENT, TX_QUEUE_EVENT, }; use std::cmp; use std::collections::HashMap; use std::fs::File; -use std::io::Read; use std::io::{self, Write}; use std::net::Ipv4Addr; use std::num::Wrapping; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::os::unix::io::{AsRawFd, FromRawFd}; use std::result; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::vec::Vec; @@ -52,176 +49,6 @@ pub enum Error { pub type Result = result::Result; -pub struct NetQueuePair { - pub mem: Option>, - pub tap: Tap, - pub rx: RxVirtio, - pub tx: TxVirtio, - pub epoll_fd: Option, - pub rx_tap_listening: bool, - pub counters: NetCounters, -} - -impl NetQueuePair { - // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true - // if a buffer was used, and false if the frame must be deferred until a buffer - // is made available by the driver. - fn rx_single_frame(&mut self, mut queue: &mut Queue) -> result::Result { - let mem = self - .mem - .as_ref() - .ok_or(DeviceError::NoMemoryConfigured) - .map(|m| m.memory())?; - let next_desc = queue.iter(&mem).next(); - - if next_desc.is_none() { - // Queue has no available descriptors - if self.rx_tap_listening { - unregister_listener( - self.epoll_fd.unwrap(), - self.tap.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(RX_TAP_EVENT), - ) - .map_err(DeviceError::UnregisterListener)?; - self.rx_tap_listening = false; - info!("Listener unregistered"); - } - return Ok(false); - } - - Ok(self.rx.process_desc_chain(&mem, next_desc, &mut queue)) - } - - fn process_rx(&mut self, queue: &mut Queue) -> result::Result { - // Read as many frames as possible. - loop { - match self.read_tap() { - Ok(count) => { - self.rx.bytes_read = count; - if !self.rx_single_frame(queue)? { - self.rx.deferred_frame = true; - break; - } - } - Err(e) => { - // The tap device is non-blocking, so any error aside from EAGAIN is - // unexpected. - match e.raw_os_error() { - Some(err) if err == EAGAIN => (), - _ => { - error!("Failed to read tap: {:?}", e); - return Err(DeviceError::FailedReadTap); - } - }; - break; - } - } - } - - // Consume the counters from the Rx/Tx queues and accumulate into - // the counters for the device as whole. This consumption is needed - // to handle MQ. - self.counters - .rx_bytes - .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel); - self.counters - .rx_frames - .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel); - self.rx.counter_bytes = Wrapping(0); - self.rx.counter_frames = Wrapping(0); - - if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - let mem = self - .mem - .as_ref() - .ok_or(DeviceError::NoMemoryConfigured) - .map(|m| m.memory())?; - Ok(queue.needs_notification(&mem, queue.next_used)) - } else { - Ok(false) - } - } - - pub fn resume_rx(&mut self, queue: &mut Queue) -> result::Result { - if !self.rx_tap_listening { - register_listener( - self.epoll_fd.unwrap(), - self.tap.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(RX_TAP_EVENT), - ) - .map_err(DeviceError::RegisterListener)?; - self.rx_tap_listening = true; - info!("Listener registered"); - } - if self.rx.deferred_frame { - if self.rx_single_frame(queue)? { - self.rx.deferred_frame = false; - // process_rx() was interrupted possibly before consuming all - // packets in the tap; try continuing now. - self.process_rx(queue) - } else if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - let mem = self - .mem - .as_ref() - .ok_or(DeviceError::NoMemoryConfigured) - .map(|m| m.memory())?; - Ok(queue.needs_notification(&mem, queue.next_used)) - } else { - Ok(false) - } - } else { - Ok(false) - } - } - - pub fn process_tx(&mut self, mut queue: &mut Queue) -> result::Result { - let mem = self - .mem - .as_ref() - .ok_or(DeviceError::NoMemoryConfigured) - .map(|m| m.memory())?; - self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue); - - self.counters - .tx_bytes - .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel); - self.counters - .tx_frames - .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel); - self.tx.counter_bytes = Wrapping(0); - self.tx.counter_frames = Wrapping(0); - - Ok(queue.needs_notification(&mem, queue.next_used)) - } - - pub fn process_rx_tap(&mut self, mut queue: &mut Queue) -> result::Result { - if self.rx.deferred_frame - // Process a deferred frame first if available. Don't read from tap again - // until we manage to receive this deferred frame. - { - if self.rx_single_frame(&mut queue)? { - self.rx.deferred_frame = false; - self.process_rx(&mut queue) - } else if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - Ok(true) - } else { - Ok(false) - } - } else { - self.process_rx(&mut queue) - } - } - - fn read_tap(&mut self) -> io::Result { - self.tap.read(&mut self.rx.frame_buf) - } -} - struct NetEpollHandler { net: NetQueuePair, interrupt_cb: Arc, @@ -254,7 +81,12 @@ impl NetEpollHandler { error!("Failed to get rx queue event: {:?}", e); } - if self.net.resume_rx(&mut queue)? || !self.driver_awake { + if self + .net + .resume_rx(&mut queue) + .map_err(DeviceError::NetQueuePair)? + || !self.driver_awake + { self.signal_used_queue(queue)?; info!("Signalling RX queue"); } else { @@ -272,7 +104,12 @@ impl NetEpollHandler { if let Err(e) = queue_evt.read() { error!("Failed to get tx queue event: {:?}", e); } - if self.net.process_tx(&mut queue)? || !self.driver_awake { + if self + .net + .process_tx(&mut queue) + .map_err(DeviceError::NetQueuePair)? + || !self.driver_awake + { self.signal_used_queue(queue)?; info!("Signalling TX queue"); } else { @@ -282,7 +119,12 @@ impl NetEpollHandler { } fn handle_rx_tap_event(&mut self, queue: &mut Queue) -> result::Result<(), DeviceError> { - if self.net.process_rx_tap(queue)? || !self.driver_awake { + if self + .net + .process_rx_tap(queue) + .map_err(DeviceError::NetQueuePair)? + || !self.driver_awake + { self.signal_used_queue(queue)?; info!("Signalling RX queue"); } else { @@ -422,14 +264,6 @@ impl NetEpollHandler { } } -#[derive(Default, Clone)] -pub struct NetCounters { - tx_bytes: Arc, - tx_frames: Arc, - rx_bytes: Arc, - rx_frames: Arc, -} - pub struct Net { id: String, kill_evt: Option, diff --git a/virtio-devices/src/net_util.rs b/virtio-devices/src/net_util.rs index 0060a30b9..7014c96fa 100644 --- a/virtio-devices/src/net_util.rs +++ b/virtio-devices/src/net_util.rs @@ -21,12 +21,6 @@ type Result = std::result::Result; const QUEUE_SIZE: usize = 256; -// The guest has made a buffer available to receive a frame into. -pub const RX_QUEUE_EVENT: DeviceEventT = 0; -// The transmit queue has a frame that is ready to send from the guest. -pub const TX_QUEUE_EVENT: DeviceEventT = 1; -// A frame is available for reading from the tap device to receive in the guest. -pub const RX_TAP_EVENT: DeviceEventT = 2; // The device has been dropped. pub const KILL_EVENT: DeviceEventT = 3; // The device should be paused.