net_util: Simplify TX/RX queue handling

The main idea behind this commit is to remove all the complexity
associated with TX/RX handling for virtio-net. By using writev() and
readv() syscalls, we could get rid of intermediate buffers for both
queues.

The complexity regarding the TAP registration has been simplified as
well. The RX queue is only processed when some data are ready to be
read from TAP. The event related to the RX queue getting more
descriptors only serves the purpose to register the TAP file if it's not
already.

With all these simplifications, the code is more readable but more
performant as well. We can see an improvement of 10% for a single
queue device.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
Sebastien Boeuf 2021-02-16 22:17:37 +01:00 committed by Rob Bradford
parent 5ed2a654e8
commit 4ed0e1a3c8
5 changed files with 160 additions and 265 deletions

View File

@ -2,27 +2,17 @@
// //
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
use super::{register_listener, unregister_listener, vnet_hdr_len, Tap}; use super::{unregister_listener, vnet_hdr_len, Tap};
use libc::EAGAIN;
use std::cmp;
use std::io; use std::io;
use std::io::{Read, Write};
use std::num::Wrapping; use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use vm_memory::{Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; use vm_memory::{Bytes, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap};
use vm_virtio::{DescriptorChain, Queue}; use vm_virtio::Queue;
/// The maximum buffer size when segmentation offload is enabled. This
/// includes the 12-byte virtio net header.
/// http://docs.oasis-open.org/virtio/virtio/v1.0/virtio-v1.0.html#x1-1740003
const MAX_BUFFER_SIZE: usize = 65562;
#[derive(Clone)] #[derive(Clone)]
pub struct TxVirtio { pub struct TxVirtio {
pub iovec: Vec<(GuestAddress, usize)>,
pub frame_buf: [u8; MAX_BUFFER_SIZE],
pub counter_bytes: Wrapping<u64>, pub counter_bytes: Wrapping<u64>,
pub counter_frames: Wrapping<u64>, pub counter_frames: Wrapping<u64>,
} }
@ -36,73 +26,66 @@ impl Default for TxVirtio {
impl TxVirtio { impl TxVirtio {
pub fn new() -> Self { pub fn new() -> Self {
TxVirtio { TxVirtio {
iovec: Vec::new(),
frame_buf: [0u8; MAX_BUFFER_SIZE],
counter_bytes: Wrapping(0), counter_bytes: Wrapping(0),
counter_frames: Wrapping(0), counter_frames: Wrapping(0),
} }
} }
pub fn process_desc_chain(&mut self, mem: &GuestMemoryMmap, tap: &mut Tap, queue: &mut Queue) { pub fn process_desc_chain(
&mut self,
mem: &GuestMemoryMmap,
tap: &mut Tap,
queue: &mut Queue,
) -> Result<(), NetQueuePairError> {
while let Some(avail_desc) = queue.iter(&mem).next() { while let Some(avail_desc) = queue.iter(&mem).next() {
let head_index = avail_desc.index; let head_index = avail_desc.index;
let mut read_count = 0;
let mut next_desc = Some(avail_desc); let mut next_desc = Some(avail_desc);
self.iovec.clear(); let mut iovecs = Vec::new();
while let Some(desc) = next_desc { while let Some(desc) = next_desc {
if desc.is_write_only() { if !desc.is_write_only() {
break; let buf = mem
.get_slice(desc.addr, desc.len as usize)
.map_err(NetQueuePairError::GuestMemory)?
.as_ptr();
let iovec = libc::iovec {
iov_base: buf as *mut libc::c_void,
iov_len: desc.len as libc::size_t,
};
iovecs.push(iovec);
} }
self.iovec.push((desc.addr, desc.len as usize));
read_count += desc.len as usize;
next_desc = desc.next_descriptor(); next_desc = desc.next_descriptor();
} }
read_count = 0; if !iovecs.is_empty() {
// Copy buffer from across multiple descriptors. let result = unsafe {
// TODO(performance - Issue #420): change this to use `writev()` instead of `write()` libc::writev(
// and get rid of the intermediate buffer. tap.as_raw_fd() as libc::c_int,
for (desc_addr, desc_len) in self.iovec.drain(..) { iovecs.as_ptr() as *const libc::iovec,
let limit = cmp::min((read_count + desc_len) as usize, self.frame_buf.len()); iovecs.len() as libc::c_int,
)
let read_result = };
mem.read_slice(&mut self.frame_buf[read_count..limit as usize], desc_addr); if result < 0 {
match read_result { let e = std::io::Error::last_os_error();
Ok(_) => { error!("net: tx: failed writing to tap: {}", e);
// Increment by number of bytes actually read queue.go_to_previous_position();
read_count += limit - read_count; return Err(NetQueuePairError::WriteTap(e));
}
Err(e) => {
println!("Failed to read slice: {:?}", e);
break;
}
} }
self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
self.counter_frames += Wrapping(1);
} }
let write_result = tap.write(&self.frame_buf[..read_count]);
match write_result {
Ok(_) => {}
Err(e) => {
println!("net: tx: error failed to write to tap: {}", e);
}
};
self.counter_bytes += Wrapping((read_count - vnet_hdr_len()) as u64);
self.counter_frames += Wrapping(1);
queue.add_used(&mem, head_index, 0); queue.add_used(&mem, head_index, 0);
queue.update_avail_event(&mem); queue.update_avail_event(&mem);
} }
Ok(())
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RxVirtio { pub struct RxVirtio {
pub deferred_frame: bool,
pub deferred_irqs: bool,
pub bytes_read: usize,
pub frame_buf: [u8; MAX_BUFFER_SIZE],
pub counter_bytes: Wrapping<u64>, pub counter_bytes: Wrapping<u64>,
pub counter_frames: Wrapping<u64>, pub counter_frames: Wrapping<u64>,
} }
@ -116,10 +99,6 @@ impl Default for RxVirtio {
impl RxVirtio { impl RxVirtio {
pub fn new() -> Self { pub fn new() -> Self {
RxVirtio { RxVirtio {
deferred_frame: false,
deferred_irqs: false,
bytes_read: 0,
frame_buf: [0u8; MAX_BUFFER_SIZE],
counter_bytes: Wrapping(0), counter_bytes: Wrapping(0),
counter_frames: Wrapping(0), counter_frames: Wrapping(0),
} }
@ -128,63 +107,72 @@ impl RxVirtio {
pub fn process_desc_chain( pub fn process_desc_chain(
&mut self, &mut self,
mem: &GuestMemoryMmap, mem: &GuestMemoryMmap,
mut next_desc: Option<DescriptorChain>, tap: &mut Tap,
queue: &mut Queue, queue: &mut Queue,
) -> bool { ) -> Result<bool, NetQueuePairError> {
let head_index = next_desc.as_ref().unwrap().index; let mut exhausted_descs = true;
let mut write_count = 0; while let Some(avail_desc) = queue.iter(&mem).next() {
let head_index = avail_desc.index;
let num_buffers_addr = mem.checked_offset(avail_desc.addr, 10).unwrap();
let mut next_desc = Some(avail_desc);
// Copy from frame into buffer, which may span multiple descriptors. let mut iovecs = Vec::new();
loop { while let Some(desc) = next_desc {
match next_desc { if desc.is_write_only() {
Some(desc) => { let buf = mem
if !desc.is_write_only() { .get_slice(desc.addr, desc.len as usize)
break; .map_err(NetQueuePairError::GuestMemory)?
} .as_ptr();
let limit = cmp::min(write_count + desc.len as usize, self.bytes_read); let iovec = libc::iovec {
let source_slice = &self.frame_buf[write_count..limit]; iov_base: buf as *mut libc::c_void,
let write_result = mem.write_slice(source_slice, desc.addr); iov_len: desc.len as libc::size_t,
};
iovecs.push(iovec);
}
next_desc = desc.next_descriptor();
}
match write_result { let len = if !iovecs.is_empty() {
Ok(_) => { let result = unsafe {
write_count = limit; libc::readv(
} tap.as_raw_fd() as libc::c_int,
Err(e) => { iovecs.as_ptr() as *const libc::iovec,
error!("Failed to write slice: {:?}", e); iovecs.len() as libc::c_int,
)
};
if result < 0 {
let e = std::io::Error::last_os_error();
exhausted_descs = false;
queue.go_to_previous_position();
if let Some(raw_err) = e.raw_os_error() {
if raw_err == libc::EAGAIN {
break; break;
} }
};
if write_count >= self.bytes_read {
break;
} }
next_desc = desc.next_descriptor();
error!("net: rx: failed reading from tap: {}", e);
return Err(NetQueuePairError::ReadTap(e));
} }
None => {
warn!("Receiving buffer is too small to hold frame of current size"); // Write num_buffers to guest memory. We simply write 1 as we
break; // never spread the frame over more than one descriptor chain.
} mem.write_obj(1u16, num_buffers_addr)
} .map_err(NetQueuePairError::GuestMemory)?;
self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
self.counter_frames += Wrapping(1);
result as u32
} else {
0
};
queue.add_used(&mem, head_index, len);
queue.update_avail_event(&mem);
} }
self.counter_bytes += Wrapping((write_count - vnet_hdr_len()) as u64); Ok(exhausted_descs)
self.counter_frames += Wrapping(1);
queue.add_used(&mem, head_index, write_count as u32);
queue.update_avail_event(&mem);
// Mark that we have at least one pending packet and we need to interrupt the guest.
self.deferred_irqs = true;
// Update the frame_buf buffer.
if write_count < self.bytes_read {
self.frame_buf.copy_within(write_count..self.bytes_read, 0);
self.bytes_read -= write_count;
false
} else {
self.bytes_read = 0;
true
}
} }
} }
@ -204,8 +192,12 @@ pub enum NetQueuePairError {
RegisterListener(io::Error), RegisterListener(io::Error),
/// Error unregistering listener /// Error unregistering listener
UnregisterListener(io::Error), UnregisterListener(io::Error),
/// Error writing to the TAP device
WriteTap(io::Error),
/// Error reading from the TAP device /// Error reading from the TAP device
FailedReadTap, ReadTap(io::Error),
/// Error related to guest memory
GuestMemory(vm_memory::GuestMemoryError),
} }
pub struct NetQueuePair { pub struct NetQueuePair {
@ -220,128 +212,15 @@ pub struct NetQueuePair {
} }
impl NetQueuePair { impl NetQueuePair {
// Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
// if a buffer was used, and false if the frame must be deferred until a buffer
// is made available by the driver.
fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
let mem = self
.mem
.as_ref()
.ok_or(NetQueuePairError::NoMemoryConfigured)
.map(|m| m.memory())?;
let next_desc = queue.iter(&mem).next();
if next_desc.is_none() {
// Queue has no available descriptors
if self.rx_tap_listening {
unregister_listener(
self.epoll_fd.unwrap(),
self.tap.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(self.tap_event_id),
)
.map_err(NetQueuePairError::UnregisterListener)?;
self.rx_tap_listening = false;
info!("Listener unregistered");
}
return Ok(false);
}
Ok(self.rx.process_desc_chain(&mem, next_desc, &mut queue))
}
fn process_rx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
// Read as many frames as possible.
loop {
match self.read_tap() {
Ok(count) => {
self.rx.bytes_read = count;
if !self.rx_single_frame(queue)? {
self.rx.deferred_frame = true;
break;
}
}
Err(e) => {
// The tap device is non-blocking, so any error aside from EAGAIN is
// unexpected.
match e.raw_os_error() {
Some(err) if err == EAGAIN => (),
_ => {
error!("Failed to read tap: {:?}", e);
return Err(NetQueuePairError::FailedReadTap);
}
};
break;
}
}
}
// Consume the counters from the Rx/Tx queues and accumulate into
// the counters for the device as whole. This consumption is needed
// to handle MQ.
self.counters
.rx_bytes
.fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
self.counters
.rx_frames
.fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
self.rx.counter_bytes = Wrapping(0);
self.rx.counter_frames = Wrapping(0);
if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
let mem = self
.mem
.as_ref()
.ok_or(NetQueuePairError::NoMemoryConfigured)
.map(|m| m.memory())?;
Ok(queue.needs_notification(&mem, queue.next_used))
} else {
Ok(false)
}
}
pub fn resume_rx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
if !self.rx_tap_listening {
register_listener(
self.epoll_fd.unwrap(),
self.tap.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(self.tap_event_id),
)
.map_err(NetQueuePairError::RegisterListener)?;
self.rx_tap_listening = true;
info!("Listener registered");
}
if self.rx.deferred_frame {
if self.rx_single_frame(queue)? {
self.rx.deferred_frame = false;
// process_rx() was interrupted possibly before consuming all
// packets in the tap; try continuing now.
self.process_rx(queue)
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
let mem = self
.mem
.as_ref()
.ok_or(NetQueuePairError::NoMemoryConfigured)
.map(|m| m.memory())?;
Ok(queue.needs_notification(&mem, queue.next_used))
} else {
Ok(false)
}
} else {
Ok(false)
}
}
pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> { pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
let mem = self let mem = self
.mem .mem
.as_ref() .as_ref()
.ok_or(NetQueuePairError::NoMemoryConfigured) .ok_or(NetQueuePairError::NoMemoryConfigured)
.map(|m| m.memory())?; .map(|m| m.memory())?;
self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue);
self.tx
.process_desc_chain(&mem, &mut self.tap, &mut queue)?;
self.counters self.counters
.tx_bytes .tx_bytes
@ -355,26 +234,37 @@ impl NetQueuePair {
Ok(queue.needs_notification(&mem, queue.next_used)) Ok(queue.needs_notification(&mem, queue.next_used))
} }
pub fn process_rx_tap(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> { pub fn process_rx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
if self.rx.deferred_frame let mem = self
// Process a deferred frame first if available. Don't read from tap again .mem
// until we manage to receive this deferred frame. .as_ref()
{ .ok_or(NetQueuePairError::NoMemoryConfigured)
if self.rx_single_frame(&mut queue)? { .map(|m| m.memory())?;
self.rx.deferred_frame = false;
self.process_rx(&mut queue)
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
Ok(true)
} else {
Ok(false)
}
} else {
self.process_rx(&mut queue)
}
}
fn read_tap(&mut self) -> io::Result<usize> { if self
self.tap.read(&mut self.rx.frame_buf) .rx
.process_desc_chain(&mem, &mut self.tap, &mut queue)?
&& self.rx_tap_listening
{
unregister_listener(
self.epoll_fd.unwrap(),
self.tap.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(self.tap_event_id),
)
.map_err(NetQueuePairError::UnregisterListener)?;
self.rx_tap_listening = false;
}
self.counters
.rx_bytes
.fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
self.counters
.rx_frames
.fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
self.rx.counter_bytes = Wrapping(0);
self.rx.counter_frames = Wrapping(0);
Ok(queue.needs_notification(&mem, queue.next_used))
} }
} }

View File

@ -70,6 +70,8 @@ pub enum Error {
SocketParameterMissing, SocketParameterMissing,
/// Underlying QueuePair error /// Underlying QueuePair error
NetQueuePair(net_util::NetQueuePairError), NetQueuePair(net_util::NetQueuePairError),
/// Failed registering the TAP listener
RegisterTapListener(io::Error),
} }
pub const SYNTAX: &str = "vhost-user-net backend parameters \ pub const SYNTAX: &str = "vhost-user-net backend parameters \
@ -212,15 +214,15 @@ impl VhostUserBackend for VhostUserNetBackend {
let mut thread = self.threads[thread_id].lock().unwrap(); let mut thread = self.threads[thread_id].lock().unwrap();
match device_event { match device_event {
0 => { 0 => {
let mut vring = vrings[0].write().unwrap(); if !thread.net.rx_tap_listening {
if thread net_util::register_listener(
.net thread.net.epoll_fd.unwrap(),
.resume_rx(&mut vring.mut_queue()) thread.net.tap.as_raw_fd(),
.map_err(Error::NetQueuePair)? epoll::Events::EPOLLIN,
{ u64::from(thread.net.tap_event_id),
vring )
.signal_used_queue() .map_err(Error::RegisterTapListener)?;
.map_err(Error::FailedSignalingUsedQueue)? thread.net.rx_tap_listening = true;
} }
} }
1 => { 1 => {
@ -239,7 +241,7 @@ impl VhostUserBackend for VhostUserNetBackend {
let mut vring = vrings[0].write().unwrap(); let mut vring = vrings[0].write().unwrap();
if thread if thread
.net .net
.process_rx_tap(&mut vring.mut_queue()) .process_rx(&mut vring.mut_queue())
.map_err(Error::NetQueuePair)? .map_err(Error::NetQueuePair)?
{ {
vring vring

View File

@ -87,16 +87,15 @@ impl NetEpollHandler {
error!("Failed to get rx queue event: {:?}", e); error!("Failed to get rx queue event: {:?}", e);
} }
if self if !self.net.rx_tap_listening {
.net net_util::register_listener(
.resume_rx(&mut self.queue_pair[0]) self.net.epoll_fd.unwrap(),
.map_err(DeviceError::NetQueuePair)? self.net.tap.as_raw_fd(),
|| !self.driver_awake epoll::Events::EPOLLIN,
{ u64::from(self.net.tap_event_id),
self.signal_used_queue(&self.queue_pair[0])?; )
debug!("Signalling RX queue"); .map_err(DeviceError::IoError)?;
} else { self.net.rx_tap_listening = true;
debug!("Not signalling RX queue");
} }
Ok(()) Ok(())
@ -124,7 +123,7 @@ impl NetEpollHandler {
fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> { fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> {
if self if self
.net .net
.process_rx_tap(&mut self.queue_pair[0]) .process_rx(&mut self.queue_pair[0])
.map_err(DeviceError::NetQueuePair)? .map_err(DeviceError::NetQueuePair)?
|| !self.driver_awake || !self.driver_awake
{ {

View File

@ -203,9 +203,11 @@ fn virtio_net_thread_rules() -> Vec<SyscallRuleSet> {
allow_syscall(libc::SYS_munmap), allow_syscall(libc::SYS_munmap),
allow_syscall(libc::SYS_openat), allow_syscall(libc::SYS_openat),
allow_syscall(libc::SYS_read), allow_syscall(libc::SYS_read),
allow_syscall(libc::SYS_readv),
allow_syscall(libc::SYS_rt_sigprocmask), allow_syscall(libc::SYS_rt_sigprocmask),
allow_syscall(libc::SYS_sigaltstack), allow_syscall(libc::SYS_sigaltstack),
allow_syscall(libc::SYS_write), allow_syscall(libc::SYS_write),
allow_syscall(libc::SYS_writev),
] ]
} }

View File

@ -353,6 +353,7 @@ fn vmm_thread_rules() -> Result<Vec<SyscallRuleSet>, Error> {
allow_syscall(libc::SYS_pwrite64), allow_syscall(libc::SYS_pwrite64),
allow_syscall(libc::SYS_pwritev), allow_syscall(libc::SYS_pwritev),
allow_syscall(libc::SYS_read), allow_syscall(libc::SYS_read),
allow_syscall(libc::SYS_readv),
#[cfg(target_arch = "x86_64")] #[cfg(target_arch = "x86_64")]
allow_syscall(libc::SYS_readlink), allow_syscall(libc::SYS_readlink),
allow_syscall(libc::SYS_recvfrom), allow_syscall(libc::SYS_recvfrom),
@ -392,6 +393,7 @@ fn vmm_thread_rules() -> Result<Vec<SyscallRuleSet>, Error> {
allow_syscall(libc::SYS_unlinkat), allow_syscall(libc::SYS_unlinkat),
allow_syscall(libc::SYS_wait4), allow_syscall(libc::SYS_wait4),
allow_syscall(libc::SYS_write), allow_syscall(libc::SYS_write),
allow_syscall(libc::SYS_writev),
]) ])
} }