From 4ed0e1a3c822ecc87702b91928a169ad3b6083e1 Mon Sep 17 00:00:00 2001 From: Sebastien Boeuf Date: Tue, 16 Feb 2021 22:17:37 +0100 Subject: [PATCH] net_util: Simplify TX/RX queue handling The main idea behind this commit is to remove all the complexity associated with TX/RX handling for virtio-net. By using the writev() and readv() syscalls, we can get rid of the intermediate buffers for both queues. The TAP registration has been simplified as well. The RX queue is only processed when some data is ready to be read from TAP. The event signalling that the RX queue has received more descriptors only serves to register the TAP file descriptor if it is not already registered. With all these simplifications, the code is not only more readable but also more performant: we can see an improvement of 10% for a single-queue device. Signed-off-by: Sebastien Boeuf --- net_util/src/queue_pair.rs | 378 +++++++++----------------- vhost_user_net/src/lib.rs | 22 +- virtio-devices/src/net.rs | 21 +- virtio-devices/src/seccomp_filters.rs | 2 + vmm/src/seccomp_filters.rs | 2 + 5 files changed, 160 insertions(+), 265 deletions(-) diff --git a/net_util/src/queue_pair.rs b/net_util/src/queue_pair.rs index bea5436c7..75df2eaeb 100644 --- a/net_util/src/queue_pair.rs +++ b/net_util/src/queue_pair.rs @@ -2,27 +2,17 @@ // // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause -use super::{register_listener, unregister_listener, vnet_hdr_len, Tap}; -use libc::EAGAIN; -use std::cmp; +use super::{unregister_listener, vnet_hdr_len, Tap}; use std::io; -use std::io::{Read, Write}; use std::num::Wrapping; use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use vm_memory::{Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; -use vm_virtio::{DescriptorChain, Queue}; - -/// The maximum buffer size when segmentation offload is enabled. This -/// includes the 12-byte virtio net header. 
-/// http://docs.oasis-open.org/virtio/virtio/v1.0/virtio-v1.0.html#x1-1740003 -const MAX_BUFFER_SIZE: usize = 65562; +use vm_memory::{Bytes, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap}; +use vm_virtio::Queue; #[derive(Clone)] pub struct TxVirtio { - pub iovec: Vec<(GuestAddress, usize)>, - pub frame_buf: [u8; MAX_BUFFER_SIZE], pub counter_bytes: Wrapping, pub counter_frames: Wrapping, } @@ -36,73 +26,66 @@ impl Default for TxVirtio { impl TxVirtio { pub fn new() -> Self { TxVirtio { - iovec: Vec::new(), - frame_buf: [0u8; MAX_BUFFER_SIZE], counter_bytes: Wrapping(0), counter_frames: Wrapping(0), } } - pub fn process_desc_chain(&mut self, mem: &GuestMemoryMmap, tap: &mut Tap, queue: &mut Queue) { + pub fn process_desc_chain( + &mut self, + mem: &GuestMemoryMmap, + tap: &mut Tap, + queue: &mut Queue, + ) -> Result<(), NetQueuePairError> { while let Some(avail_desc) = queue.iter(&mem).next() { let head_index = avail_desc.index; - let mut read_count = 0; let mut next_desc = Some(avail_desc); - self.iovec.clear(); + let mut iovecs = Vec::new(); while let Some(desc) = next_desc { - if desc.is_write_only() { - break; + if !desc.is_write_only() { + let buf = mem + .get_slice(desc.addr, desc.len as usize) + .map_err(NetQueuePairError::GuestMemory)? + .as_ptr(); + let iovec = libc::iovec { + iov_base: buf as *mut libc::c_void, + iov_len: desc.len as libc::size_t, + }; + iovecs.push(iovec); } - self.iovec.push((desc.addr, desc.len as usize)); - read_count += desc.len as usize; next_desc = desc.next_descriptor(); } - read_count = 0; - // Copy buffer from across multiple descriptors. - // TODO(performance - Issue #420): change this to use `writev()` instead of `write()` - // and get rid of the intermediate buffer. - for (desc_addr, desc_len) in self.iovec.drain(..) 
{ - let limit = cmp::min((read_count + desc_len) as usize, self.frame_buf.len()); - - let read_result = - mem.read_slice(&mut self.frame_buf[read_count..limit as usize], desc_addr); - match read_result { - Ok(_) => { - // Increment by number of bytes actually read - read_count += limit - read_count; - } - Err(e) => { - println!("Failed to read slice: {:?}", e); - break; - } + if !iovecs.is_empty() { + let result = unsafe { + libc::writev( + tap.as_raw_fd() as libc::c_int, + iovecs.as_ptr() as *const libc::iovec, + iovecs.len() as libc::c_int, + ) + }; + if result < 0 { + let e = std::io::Error::last_os_error(); + error!("net: tx: failed writing to tap: {}", e); + queue.go_to_previous_position(); + return Err(NetQueuePairError::WriteTap(e)); } + + self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64); + self.counter_frames += Wrapping(1); } - let write_result = tap.write(&self.frame_buf[..read_count]); - match write_result { - Ok(_) => {} - Err(e) => { - println!("net: tx: error failed to write to tap: {}", e); - } - }; - - self.counter_bytes += Wrapping((read_count - vnet_hdr_len()) as u64); - self.counter_frames += Wrapping(1); - queue.add_used(&mem, head_index, 0); queue.update_avail_event(&mem); } + + Ok(()) } } #[derive(Clone)] pub struct RxVirtio { - pub deferred_frame: bool, - pub deferred_irqs: bool, - pub bytes_read: usize, - pub frame_buf: [u8; MAX_BUFFER_SIZE], pub counter_bytes: Wrapping, pub counter_frames: Wrapping, } @@ -116,10 +99,6 @@ impl Default for RxVirtio { impl RxVirtio { pub fn new() -> Self { RxVirtio { - deferred_frame: false, - deferred_irqs: false, - bytes_read: 0, - frame_buf: [0u8; MAX_BUFFER_SIZE], counter_bytes: Wrapping(0), counter_frames: Wrapping(0), } @@ -128,63 +107,72 @@ impl RxVirtio { pub fn process_desc_chain( &mut self, mem: &GuestMemoryMmap, - mut next_desc: Option, + tap: &mut Tap, queue: &mut Queue, - ) -> bool { - let head_index = next_desc.as_ref().unwrap().index; - let mut write_count = 0; + ) -> Result 
{ + let mut exhausted_descs = true; + while let Some(avail_desc) = queue.iter(&mem).next() { + let head_index = avail_desc.index; + let num_buffers_addr = mem.checked_offset(avail_desc.addr, 10).unwrap(); + let mut next_desc = Some(avail_desc); - // Copy from frame into buffer, which may span multiple descriptors. - loop { - match next_desc { - Some(desc) => { - if !desc.is_write_only() { - break; - } - let limit = cmp::min(write_count + desc.len as usize, self.bytes_read); - let source_slice = &self.frame_buf[write_count..limit]; - let write_result = mem.write_slice(source_slice, desc.addr); + let mut iovecs = Vec::new(); + while let Some(desc) = next_desc { + if desc.is_write_only() { + let buf = mem + .get_slice(desc.addr, desc.len as usize) + .map_err(NetQueuePairError::GuestMemory)? + .as_ptr(); + let iovec = libc::iovec { + iov_base: buf as *mut libc::c_void, + iov_len: desc.len as libc::size_t, + }; + iovecs.push(iovec); + } + next_desc = desc.next_descriptor(); + } - match write_result { - Ok(_) => { - write_count = limit; - } - Err(e) => { - error!("Failed to write slice: {:?}", e); + let len = if !iovecs.is_empty() { + let result = unsafe { + libc::readv( + tap.as_raw_fd() as libc::c_int, + iovecs.as_ptr() as *const libc::iovec, + iovecs.len() as libc::c_int, + ) + }; + if result < 0 { + let e = std::io::Error::last_os_error(); + exhausted_descs = false; + queue.go_to_previous_position(); + + if let Some(raw_err) = e.raw_os_error() { + if raw_err == libc::EAGAIN { break; } - }; - - if write_count >= self.bytes_read { - break; } - next_desc = desc.next_descriptor(); + + error!("net: rx: failed reading from tap: {}", e); + return Err(NetQueuePairError::ReadTap(e)); } - None => { - warn!("Receiving buffer is too small to hold frame of current size"); - break; - } - } + + // Write num_buffers to guest memory. We simply write 1 as we + // never spread the frame over more than one descriptor chain. 
+ mem.write_obj(1u16, num_buffers_addr) + .map_err(NetQueuePairError::GuestMemory)?; + + self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64); + self.counter_frames += Wrapping(1); + + result as u32 + } else { + 0 + }; + + queue.add_used(&mem, head_index, len); + queue.update_avail_event(&mem); } - self.counter_bytes += Wrapping((write_count - vnet_hdr_len()) as u64); - self.counter_frames += Wrapping(1); - - queue.add_used(&mem, head_index, write_count as u32); - queue.update_avail_event(&mem); - - // Mark that we have at least one pending packet and we need to interrupt the guest. - self.deferred_irqs = true; - - // Update the frame_buf buffer. - if write_count < self.bytes_read { - self.frame_buf.copy_within(write_count..self.bytes_read, 0); - self.bytes_read -= write_count; - false - } else { - self.bytes_read = 0; - true - } + Ok(exhausted_descs) } } @@ -204,8 +192,12 @@ pub enum NetQueuePairError { RegisterListener(io::Error), /// Error unregistering listener UnregisterListener(io::Error), + /// Error writing to the TAP device + WriteTap(io::Error), /// Error reading from the TAP device - FailedReadTap, + ReadTap(io::Error), + /// Error related to guest memory + GuestMemory(vm_memory::GuestMemoryError), } pub struct NetQueuePair { @@ -220,128 +212,15 @@ pub struct NetQueuePair { } impl NetQueuePair { - // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true - // if a buffer was used, and false if the frame must be deferred until a buffer - // is made available by the driver. 
- fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result { - let mem = self - .mem - .as_ref() - .ok_or(NetQueuePairError::NoMemoryConfigured) - .map(|m| m.memory())?; - let next_desc = queue.iter(&mem).next(); - - if next_desc.is_none() { - // Queue has no available descriptors - if self.rx_tap_listening { - unregister_listener( - self.epoll_fd.unwrap(), - self.tap.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(self.tap_event_id), - ) - .map_err(NetQueuePairError::UnregisterListener)?; - self.rx_tap_listening = false; - info!("Listener unregistered"); - } - return Ok(false); - } - - Ok(self.rx.process_desc_chain(&mem, next_desc, &mut queue)) - } - - fn process_rx(&mut self, queue: &mut Queue) -> Result { - // Read as many frames as possible. - loop { - match self.read_tap() { - Ok(count) => { - self.rx.bytes_read = count; - if !self.rx_single_frame(queue)? { - self.rx.deferred_frame = true; - break; - } - } - Err(e) => { - // The tap device is non-blocking, so any error aside from EAGAIN is - // unexpected. - match e.raw_os_error() { - Some(err) if err == EAGAIN => (), - _ => { - error!("Failed to read tap: {:?}", e); - return Err(NetQueuePairError::FailedReadTap); - } - }; - break; - } - } - } - - // Consume the counters from the Rx/Tx queues and accumulate into - // the counters for the device as whole. This consumption is needed - // to handle MQ. 
- self.counters - .rx_bytes - .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel); - self.counters - .rx_frames - .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel); - self.rx.counter_bytes = Wrapping(0); - self.rx.counter_frames = Wrapping(0); - - if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - let mem = self - .mem - .as_ref() - .ok_or(NetQueuePairError::NoMemoryConfigured) - .map(|m| m.memory())?; - Ok(queue.needs_notification(&mem, queue.next_used)) - } else { - Ok(false) - } - } - - pub fn resume_rx(&mut self, queue: &mut Queue) -> Result { - if !self.rx_tap_listening { - register_listener( - self.epoll_fd.unwrap(), - self.tap.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(self.tap_event_id), - ) - .map_err(NetQueuePairError::RegisterListener)?; - self.rx_tap_listening = true; - info!("Listener registered"); - } - if self.rx.deferred_frame { - if self.rx_single_frame(queue)? { - self.rx.deferred_frame = false; - // process_rx() was interrupted possibly before consuming all - // packets in the tap; try continuing now. - self.process_rx(queue) - } else if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - let mem = self - .mem - .as_ref() - .ok_or(NetQueuePairError::NoMemoryConfigured) - .map(|m| m.memory())?; - Ok(queue.needs_notification(&mem, queue.next_used)) - } else { - Ok(false) - } - } else { - Ok(false) - } - } - pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result { let mem = self .mem .as_ref() .ok_or(NetQueuePairError::NoMemoryConfigured) .map(|m| m.memory())?; - self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue); + + self.tx + .process_desc_chain(&mem, &mut self.tap, &mut queue)?; self.counters .tx_bytes @@ -355,26 +234,37 @@ impl NetQueuePair { Ok(queue.needs_notification(&mem, queue.next_used)) } - pub fn process_rx_tap(&mut self, mut queue: &mut Queue) -> Result { - if self.rx.deferred_frame - // Process a deferred frame first if available. 
Don't read from tap again - // until we manage to receive this deferred frame. - { - if self.rx_single_frame(&mut queue)? { - self.rx.deferred_frame = false; - self.process_rx(&mut queue) - } else if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - Ok(true) - } else { - Ok(false) - } - } else { - self.process_rx(&mut queue) - } - } + pub fn process_rx(&mut self, mut queue: &mut Queue) -> Result { + let mem = self + .mem + .as_ref() + .ok_or(NetQueuePairError::NoMemoryConfigured) + .map(|m| m.memory())?; - fn read_tap(&mut self) -> io::Result { - self.tap.read(&mut self.rx.frame_buf) + if self + .rx + .process_desc_chain(&mem, &mut self.tap, &mut queue)? + && self.rx_tap_listening + { + unregister_listener( + self.epoll_fd.unwrap(), + self.tap.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(self.tap_event_id), + ) + .map_err(NetQueuePairError::UnregisterListener)?; + self.rx_tap_listening = false; + } + + self.counters + .rx_bytes + .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel); + self.counters + .rx_frames + .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel); + self.rx.counter_bytes = Wrapping(0); + self.rx.counter_frames = Wrapping(0); + + Ok(queue.needs_notification(&mem, queue.next_used)) } } diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs index d79d7deba..e3ec4242b 100644 --- a/vhost_user_net/src/lib.rs +++ b/vhost_user_net/src/lib.rs @@ -70,6 +70,8 @@ pub enum Error { SocketParameterMissing, /// Underlying QueuePair error NetQueuePair(net_util::NetQueuePairError), + /// Failed registering the TAP listener + RegisterTapListener(io::Error), } pub const SYNTAX: &str = "vhost-user-net backend parameters \ @@ -212,15 +214,15 @@ impl VhostUserBackend for VhostUserNetBackend { let mut thread = self.threads[thread_id].lock().unwrap(); match device_event { 0 => { - let mut vring = vrings[0].write().unwrap(); - if thread - .net - .resume_rx(&mut vring.mut_queue()) - .map_err(Error::NetQueuePair)? 
- { - vring - .signal_used_queue() - .map_err(Error::FailedSignalingUsedQueue)? + if !thread.net.rx_tap_listening { + net_util::register_listener( + thread.net.epoll_fd.unwrap(), + thread.net.tap.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(thread.net.tap_event_id), + ) + .map_err(Error::RegisterTapListener)?; + thread.net.rx_tap_listening = true; } } 1 => { @@ -239,7 +241,7 @@ impl VhostUserBackend for VhostUserNetBackend { let mut vring = vrings[0].write().unwrap(); if thread .net - .process_rx_tap(&mut vring.mut_queue()) + .process_rx(&mut vring.mut_queue()) .map_err(Error::NetQueuePair)? { vring diff --git a/virtio-devices/src/net.rs b/virtio-devices/src/net.rs index 99f10cdc6..da1b9c517 100644 --- a/virtio-devices/src/net.rs +++ b/virtio-devices/src/net.rs @@ -87,16 +87,15 @@ impl NetEpollHandler { error!("Failed to get rx queue event: {:?}", e); } - if self - .net - .resume_rx(&mut self.queue_pair[0]) - .map_err(DeviceError::NetQueuePair)? - || !self.driver_awake - { - self.signal_used_queue(&self.queue_pair[0])?; - debug!("Signalling RX queue"); - } else { - debug!("Not signalling RX queue"); + if !self.net.rx_tap_listening { + net_util::register_listener( + self.net.epoll_fd.unwrap(), + self.net.tap.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(self.net.tap_event_id), + ) + .map_err(DeviceError::IoError)?; + self.net.rx_tap_listening = true; } Ok(()) @@ -124,7 +123,7 @@ impl NetEpollHandler { fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> { if self .net - .process_rx_tap(&mut self.queue_pair[0]) + .process_rx(&mut self.queue_pair[0]) .map_err(DeviceError::NetQueuePair)? 
|| !self.driver_awake { diff --git a/virtio-devices/src/seccomp_filters.rs b/virtio-devices/src/seccomp_filters.rs index e114989ba..31b74950b 100644 --- a/virtio-devices/src/seccomp_filters.rs +++ b/virtio-devices/src/seccomp_filters.rs @@ -203,9 +203,11 @@ fn virtio_net_thread_rules() -> Vec { allow_syscall(libc::SYS_munmap), allow_syscall(libc::SYS_openat), allow_syscall(libc::SYS_read), + allow_syscall(libc::SYS_readv), allow_syscall(libc::SYS_rt_sigprocmask), allow_syscall(libc::SYS_sigaltstack), allow_syscall(libc::SYS_write), + allow_syscall(libc::SYS_writev), ] } diff --git a/vmm/src/seccomp_filters.rs b/vmm/src/seccomp_filters.rs index fe5ee4d64..86f6c50a2 100644 --- a/vmm/src/seccomp_filters.rs +++ b/vmm/src/seccomp_filters.rs @@ -353,6 +353,7 @@ fn vmm_thread_rules() -> Result, Error> { allow_syscall(libc::SYS_pwrite64), allow_syscall(libc::SYS_pwritev), allow_syscall(libc::SYS_read), + allow_syscall(libc::SYS_readv), #[cfg(target_arch = "x86_64")] allow_syscall(libc::SYS_readlink), allow_syscall(libc::SYS_recvfrom), @@ -392,6 +393,7 @@ fn vmm_thread_rules() -> Result, Error> { allow_syscall(libc::SYS_unlinkat), allow_syscall(libc::SYS_wait4), allow_syscall(libc::SYS_write), + allow_syscall(libc::SYS_writev), ]) }