diff --git a/net_util/src/queue_pair.rs b/net_util/src/queue_pair.rs
index bea5436c7..75df2eaeb 100644
--- a/net_util/src/queue_pair.rs
+++ b/net_util/src/queue_pair.rs
@@ -2,27 +2,17 @@
 //
 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
 
-use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
-use libc::EAGAIN;
-use std::cmp;
+use super::{unregister_listener, vnet_hdr_len, Tap};
 use std::io;
-use std::io::{Read, Write};
 use std::num::Wrapping;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use vm_memory::{Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
-use vm_virtio::{DescriptorChain, Queue};
-
-/// The maximum buffer size when segmentation offload is enabled. This
-/// includes the 12-byte virtio net header.
-/// http://docs.oasis-open.org/virtio/virtio/v1.0/virtio-v1.0.html#x1-1740003
-const MAX_BUFFER_SIZE: usize = 65562;
+use vm_memory::{Bytes, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap};
+use vm_virtio::Queue;
 
 #[derive(Clone)]
 pub struct TxVirtio {
-    pub iovec: Vec<(GuestAddress, usize)>,
-    pub frame_buf: [u8; MAX_BUFFER_SIZE],
     pub counter_bytes: Wrapping<u64>,
     pub counter_frames: Wrapping<u64>,
 }
@@ -36,73 +26,66 @@ impl Default for TxVirtio {
 impl TxVirtio {
     pub fn new() -> Self {
         TxVirtio {
-            iovec: Vec::new(),
-            frame_buf: [0u8; MAX_BUFFER_SIZE],
             counter_bytes: Wrapping(0),
             counter_frames: Wrapping(0),
         }
     }
 
-    pub fn process_desc_chain(&mut self, mem: &GuestMemoryMmap, tap: &mut Tap, queue: &mut Queue) {
+    pub fn process_desc_chain(
+        &mut self,
+        mem: &GuestMemoryMmap,
+        tap: &mut Tap,
+        queue: &mut Queue,
+    ) -> Result<(), NetQueuePairError> {
         while let Some(avail_desc) = queue.iter(&mem).next() {
             let head_index = avail_desc.index;
-            let mut read_count = 0;
             let mut next_desc = Some(avail_desc);
 
-            self.iovec.clear();
+            let mut iovecs = Vec::new();
             while let Some(desc) = next_desc {
-                if desc.is_write_only() {
-                    break;
+                if !desc.is_write_only() {
+                    let buf = mem
+                        .get_slice(desc.addr, desc.len as usize)
+                        .map_err(NetQueuePairError::GuestMemory)?
+                        .as_ptr();
+                    let iovec = libc::iovec {
+                        iov_base: buf as *mut libc::c_void,
+                        iov_len: desc.len as libc::size_t,
+                    };
+                    iovecs.push(iovec);
                 }
-                self.iovec.push((desc.addr, desc.len as usize));
-                read_count += desc.len as usize;
                 next_desc = desc.next_descriptor();
             }
 
-            read_count = 0;
-            // Copy buffer from across multiple descriptors.
-            // TODO(performance - Issue #420): change this to use `writev()` instead of `write()`
-            // and get rid of the intermediate buffer.
-            for (desc_addr, desc_len) in self.iovec.drain(..) {
-                let limit = cmp::min((read_count + desc_len) as usize, self.frame_buf.len());
-
-                let read_result =
-                    mem.read_slice(&mut self.frame_buf[read_count..limit as usize], desc_addr);
-                match read_result {
-                    Ok(_) => {
-                        // Increment by number of bytes actually read
-                        read_count += limit - read_count;
-                    }
-                    Err(e) => {
-                        println!("Failed to read slice: {:?}", e);
-                        break;
-                    }
+            if !iovecs.is_empty() {
+                let result = unsafe {
+                    libc::writev(
+                        tap.as_raw_fd() as libc::c_int,
+                        iovecs.as_ptr() as *const libc::iovec,
+                        iovecs.len() as libc::c_int,
+                    )
+                };
+                if result < 0 {
+                    let e = std::io::Error::last_os_error();
+                    error!("net: tx: failed writing to tap: {}", e);
+                    queue.go_to_previous_position();
+                    return Err(NetQueuePairError::WriteTap(e));
                 }
+
+                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
+                self.counter_frames += Wrapping(1);
             }
 
-            let write_result = tap.write(&self.frame_buf[..read_count]);
-            match write_result {
-                Ok(_) => {}
-                Err(e) => {
-                    println!("net: tx: error failed to write to tap: {}", e);
-                }
-            };
-
-            self.counter_bytes += Wrapping((read_count - vnet_hdr_len()) as u64);
-            self.counter_frames += Wrapping(1);
-
             queue.add_used(&mem, head_index, 0);
             queue.update_avail_event(&mem);
         }
+
+        Ok(())
     }
 }
 
 #[derive(Clone)]
 pub struct RxVirtio {
-    pub deferred_frame: bool,
-    pub deferred_irqs: bool,
-    pub bytes_read: usize,
-    pub frame_buf: [u8; MAX_BUFFER_SIZE],
     pub counter_bytes: Wrapping<u64>,
     pub counter_frames: Wrapping<u64>,
 }
@@ -116,10 +99,6 @@ impl Default for RxVirtio {
 impl RxVirtio {
     pub fn new() -> Self {
         RxVirtio {
-            deferred_frame: false,
-            deferred_irqs: false,
-            bytes_read: 0,
-            frame_buf: [0u8; MAX_BUFFER_SIZE],
             counter_bytes: Wrapping(0),
             counter_frames: Wrapping(0),
         }
@@ -128,63 +107,72 @@ impl RxVirtio {
     pub fn process_desc_chain(
         &mut self,
         mem: &GuestMemoryMmap,
-        mut next_desc: Option<DescriptorChain>,
+        tap: &mut Tap,
         queue: &mut Queue,
-    ) -> bool {
-        let head_index = next_desc.as_ref().unwrap().index;
-        let mut write_count = 0;
+    ) -> Result<bool, NetQueuePairError> {
+        let mut exhausted_descs = true;
+        while let Some(avail_desc) = queue.iter(&mem).next() {
+            let head_index = avail_desc.index;
+            let num_buffers_addr = mem.checked_offset(avail_desc.addr, 10).unwrap();
+            let mut next_desc = Some(avail_desc);
 
-        // Copy from frame into buffer, which may span multiple descriptors.
-        loop {
-            match next_desc {
-                Some(desc) => {
-                    if !desc.is_write_only() {
-                        break;
-                    }
-                    let limit = cmp::min(write_count + desc.len as usize, self.bytes_read);
-                    let source_slice = &self.frame_buf[write_count..limit];
-                    let write_result = mem.write_slice(source_slice, desc.addr);
+            let mut iovecs = Vec::new();
+            while let Some(desc) = next_desc {
+                if desc.is_write_only() {
+                    let buf = mem
+                        .get_slice(desc.addr, desc.len as usize)
+                        .map_err(NetQueuePairError::GuestMemory)?
+                        .as_ptr();
+                    let iovec = libc::iovec {
+                        iov_base: buf as *mut libc::c_void,
+                        iov_len: desc.len as libc::size_t,
+                    };
+                    iovecs.push(iovec);
+                }
+                next_desc = desc.next_descriptor();
+            }
 
-                    match write_result {
-                        Ok(_) => {
-                            write_count = limit;
-                        }
-                        Err(e) => {
-                            error!("Failed to write slice: {:?}", e);
+            let len = if !iovecs.is_empty() {
+                let result = unsafe {
+                    libc::readv(
+                        tap.as_raw_fd() as libc::c_int,
+                        iovecs.as_ptr() as *const libc::iovec,
+                        iovecs.len() as libc::c_int,
+                    )
+                };
+                if result < 0 {
+                    let e = std::io::Error::last_os_error();
+                    exhausted_descs = false;
+                    queue.go_to_previous_position();
+
+                    if let Some(raw_err) = e.raw_os_error() {
+                        if raw_err == libc::EAGAIN {
                             break;
                         }
-                    };
-
-                    if write_count >= self.bytes_read {
-                        break;
                     }
-                    next_desc = desc.next_descriptor();
+
+                    error!("net: rx: failed reading from tap: {}", e);
+                    return Err(NetQueuePairError::ReadTap(e));
                 }
-                None => {
-                    warn!("Receiving buffer is too small to hold frame of current size");
-                    break;
-                }
-            }
+
+                // Write num_buffers to guest memory. We simply write 1 as we
+                // never spread the frame over more than one descriptor chain.
+                mem.write_obj(1u16, num_buffers_addr)
+                    .map_err(NetQueuePairError::GuestMemory)?;
+
+                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
+                self.counter_frames += Wrapping(1);
+
+                result as u32
+            } else {
+                0
+            };
+
+            queue.add_used(&mem, head_index, len);
+            queue.update_avail_event(&mem);
         }
 
-        self.counter_bytes += Wrapping((write_count - vnet_hdr_len()) as u64);
-        self.counter_frames += Wrapping(1);
-
-        queue.add_used(&mem, head_index, write_count as u32);
-        queue.update_avail_event(&mem);
-
-        // Mark that we have at least one pending packet and we need to interrupt the guest.
-        self.deferred_irqs = true;
-
-        // Update the frame_buf buffer.
-        if write_count < self.bytes_read {
-            self.frame_buf.copy_within(write_count..self.bytes_read, 0);
-            self.bytes_read -= write_count;
-            false
-        } else {
-            self.bytes_read = 0;
-            true
-        }
+        Ok(exhausted_descs)
     }
 }
 
@@ -204,8 +192,12 @@ pub enum NetQueuePairError {
     RegisterListener(io::Error),
     /// Error unregistering listener
    UnregisterListener(io::Error),
+    /// Error writing to the TAP device
+    WriteTap(io::Error),
     /// Error reading from the TAP device
-    FailedReadTap,
+    ReadTap(io::Error),
+    /// Error related to guest memory
+    GuestMemory(vm_memory::GuestMemoryError),
 }
 
 pub struct NetQueuePair {
@@ -220,128 +212,15 @@ pub struct NetQueuePair {
 }
 
 impl NetQueuePair {
-    // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
-    // if a buffer was used, and false if the frame must be deferred until a buffer
-    // is made available by the driver.
-    fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
-        let mem = self
-            .mem
-            .as_ref()
-            .ok_or(NetQueuePairError::NoMemoryConfigured)
-            .map(|m| m.memory())?;
-        let next_desc = queue.iter(&mem).next();
-
-        if next_desc.is_none() {
-            // Queue has no available descriptors
-            if self.rx_tap_listening {
-                unregister_listener(
-                    self.epoll_fd.unwrap(),
-                    self.tap.as_raw_fd(),
-                    epoll::Events::EPOLLIN,
-                    u64::from(self.tap_event_id),
-                )
-                .map_err(NetQueuePairError::UnregisterListener)?;
-                self.rx_tap_listening = false;
-                info!("Listener unregistered");
-            }
-            return Ok(false);
-        }
-
-        Ok(self.rx.process_desc_chain(&mem, next_desc, &mut queue))
-    }
-
-    fn process_rx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
-        // Read as many frames as possible.
-        loop {
-            match self.read_tap() {
-                Ok(count) => {
-                    self.rx.bytes_read = count;
-                    if !self.rx_single_frame(queue)? {
-                        self.rx.deferred_frame = true;
-                        break;
-                    }
-                }
-                Err(e) => {
-                    // The tap device is non-blocking, so any error aside from EAGAIN is
-                    // unexpected.
-                    match e.raw_os_error() {
-                        Some(err) if err == EAGAIN => (),
-                        _ => {
-                            error!("Failed to read tap: {:?}", e);
-                            return Err(NetQueuePairError::FailedReadTap);
-                        }
-                    };
-                    break;
-                }
-            }
-        }
-
-        // Consume the counters from the Rx/Tx queues and accumulate into
-        // the counters for the device as whole. This consumption is needed
-        // to handle MQ.
-        self.counters
-            .rx_bytes
-            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
-        self.counters
-            .rx_frames
-            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
-        self.rx.counter_bytes = Wrapping(0);
-        self.rx.counter_frames = Wrapping(0);
-
-        if self.rx.deferred_irqs {
-            self.rx.deferred_irqs = false;
-            let mem = self
-                .mem
-                .as_ref()
-                .ok_or(NetQueuePairError::NoMemoryConfigured)
-                .map(|m| m.memory())?;
-            Ok(queue.needs_notification(&mem, queue.next_used))
-        } else {
-            Ok(false)
-        }
-    }
-
-    pub fn resume_rx(&mut self, queue: &mut Queue) -> Result<bool, NetQueuePairError> {
-        if !self.rx_tap_listening {
-            register_listener(
-                self.epoll_fd.unwrap(),
-                self.tap.as_raw_fd(),
-                epoll::Events::EPOLLIN,
-                u64::from(self.tap_event_id),
-            )
-            .map_err(NetQueuePairError::RegisterListener)?;
-            self.rx_tap_listening = true;
-            info!("Listener registered");
-        }
-        if self.rx.deferred_frame {
-            if self.rx_single_frame(queue)? {
-                self.rx.deferred_frame = false;
-                // process_rx() was interrupted possibly before consuming all
-                // packets in the tap; try continuing now.
-                self.process_rx(queue)
-            } else if self.rx.deferred_irqs {
-                self.rx.deferred_irqs = false;
-                let mem = self
-                    .mem
-                    .as_ref()
-                    .ok_or(NetQueuePairError::NoMemoryConfigured)
-                    .map(|m| m.memory())?;
-                Ok(queue.needs_notification(&mem, queue.next_used))
-            } else {
-                Ok(false)
-            }
-        } else {
-            Ok(false)
-        }
-    }
-
     pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
         let mem = self
             .mem
             .as_ref()
             .ok_or(NetQueuePairError::NoMemoryConfigured)
             .map(|m| m.memory())?;
-        self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue);
+
+        self.tx
+            .process_desc_chain(&mem, &mut self.tap, &mut queue)?;
 
         self.counters
             .tx_bytes
@@ -355,26 +234,37 @@ impl NetQueuePair {
         Ok(queue.needs_notification(&mem, queue.next_used))
     }
 
-    pub fn process_rx_tap(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
-        if self.rx.deferred_frame
-        // Process a deferred frame first if available. Don't read from tap again
-        // until we manage to receive this deferred frame.
-        {
-            if self.rx_single_frame(&mut queue)? {
-                self.rx.deferred_frame = false;
-                self.process_rx(&mut queue)
-            } else if self.rx.deferred_irqs {
-                self.rx.deferred_irqs = false;
-                Ok(true)
-            } else {
-                Ok(false)
-            }
-        } else {
-            self.process_rx(&mut queue)
-        }
-    }
+    pub fn process_rx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
+        let mem = self
+            .mem
+            .as_ref()
+            .ok_or(NetQueuePairError::NoMemoryConfigured)
+            .map(|m| m.memory())?;
 
-    fn read_tap(&mut self) -> io::Result<usize> {
-        self.tap.read(&mut self.rx.frame_buf)
+        if self
+            .rx
+            .process_desc_chain(&mem, &mut self.tap, &mut queue)?
+            && self.rx_tap_listening
+        {
+            unregister_listener(
+                self.epoll_fd.unwrap(),
+                self.tap.as_raw_fd(),
+                epoll::Events::EPOLLIN,
+                u64::from(self.tap_event_id),
+            )
+            .map_err(NetQueuePairError::UnregisterListener)?;
+            self.rx_tap_listening = false;
+        }
+
+        self.counters
+            .rx_bytes
+            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
+        self.counters
+            .rx_frames
+            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
+        self.rx.counter_bytes = Wrapping(0);
+        self.rx.counter_frames = Wrapping(0);
+
+        Ok(queue.needs_notification(&mem, queue.next_used))
     }
 }
diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs
index d79d7deba..e3ec4242b 100644
--- a/vhost_user_net/src/lib.rs
+++ b/vhost_user_net/src/lib.rs
@@ -70,6 +70,8 @@ pub enum Error {
     SocketParameterMissing,
     /// Underlying QueuePair error
     NetQueuePair(net_util::NetQueuePairError),
+    /// Failed registering the TAP listener
+    RegisterTapListener(io::Error),
 }
 
 pub const SYNTAX: &str = "vhost-user-net backend parameters \
@@ -212,15 +214,15 @@ impl VhostUserBackend for VhostUserNetBackend {
         let mut thread = self.threads[thread_id].lock().unwrap();
         match device_event {
             0 => {
-                let mut vring = vrings[0].write().unwrap();
-                if thread
-                    .net
-                    .resume_rx(&mut vring.mut_queue())
-                    .map_err(Error::NetQueuePair)?
-                {
-                    vring
-                        .signal_used_queue()
-                        .map_err(Error::FailedSignalingUsedQueue)?
+                if !thread.net.rx_tap_listening {
+                    net_util::register_listener(
+                        thread.net.epoll_fd.unwrap(),
+                        thread.net.tap.as_raw_fd(),
+                        epoll::Events::EPOLLIN,
+                        u64::from(thread.net.tap_event_id),
+                    )
+                    .map_err(Error::RegisterTapListener)?;
+                    thread.net.rx_tap_listening = true;
                 }
             }
             1 => {
@@ -239,7 +241,7 @@ impl VhostUserBackend for VhostUserNetBackend {
                 let mut vring = vrings[0].write().unwrap();
                 if thread
                     .net
-                    .process_rx_tap(&mut vring.mut_queue())
+                    .process_rx(&mut vring.mut_queue())
                     .map_err(Error::NetQueuePair)?
                 {
                     vring
diff --git a/virtio-devices/src/net.rs b/virtio-devices/src/net.rs
index 99f10cdc6..da1b9c517 100644
--- a/virtio-devices/src/net.rs
+++ b/virtio-devices/src/net.rs
@@ -87,16 +87,15 @@ impl NetEpollHandler {
             error!("Failed to get rx queue event: {:?}", e);
         }
 
-        if self
-            .net
-            .resume_rx(&mut self.queue_pair[0])
-            .map_err(DeviceError::NetQueuePair)?
-            || !self.driver_awake
-        {
-            self.signal_used_queue(&self.queue_pair[0])?;
-            debug!("Signalling RX queue");
-        } else {
-            debug!("Not signalling RX queue");
+        if !self.net.rx_tap_listening {
+            net_util::register_listener(
+                self.net.epoll_fd.unwrap(),
+                self.net.tap.as_raw_fd(),
+                epoll::Events::EPOLLIN,
+                u64::from(self.net.tap_event_id),
+            )
+            .map_err(DeviceError::IoError)?;
+            self.net.rx_tap_listening = true;
         }
 
         Ok(())
@@ -124,7 +123,7 @@ impl NetEpollHandler {
     fn handle_rx_tap_event(&mut self) -> result::Result<(), DeviceError> {
         if self
             .net
-            .process_rx_tap(&mut self.queue_pair[0])
+            .process_rx(&mut self.queue_pair[0])
             .map_err(DeviceError::NetQueuePair)?
             || !self.driver_awake
         {
diff --git a/virtio-devices/src/seccomp_filters.rs b/virtio-devices/src/seccomp_filters.rs
index e114989ba..31b74950b 100644
--- a/virtio-devices/src/seccomp_filters.rs
+++ b/virtio-devices/src/seccomp_filters.rs
@@ -203,9 +203,11 @@ fn virtio_net_thread_rules() -> Vec<SyscallRuleSet> {
         allow_syscall(libc::SYS_munmap),
         allow_syscall(libc::SYS_openat),
         allow_syscall(libc::SYS_read),
+        allow_syscall(libc::SYS_readv),
         allow_syscall(libc::SYS_rt_sigprocmask),
         allow_syscall(libc::SYS_sigaltstack),
         allow_syscall(libc::SYS_write),
+        allow_syscall(libc::SYS_writev),
     ]
 }
 
diff --git a/vmm/src/seccomp_filters.rs b/vmm/src/seccomp_filters.rs
index fe5ee4d64..86f6c50a2 100644
--- a/vmm/src/seccomp_filters.rs
+++ b/vmm/src/seccomp_filters.rs
@@ -353,6 +353,7 @@ fn vmm_thread_rules() -> Result<Vec<SyscallRuleSet>, Error> {
         allow_syscall(libc::SYS_pwrite64),
         allow_syscall(libc::SYS_pwritev),
        allow_syscall(libc::SYS_read),
+        allow_syscall(libc::SYS_readv),
         #[cfg(target_arch = "x86_64")]
         allow_syscall(libc::SYS_readlink),
         allow_syscall(libc::SYS_recvfrom),
@@ -392,6 +393,7 @@ fn vmm_thread_rules() -> Result<Vec<SyscallRuleSet>, Error> {
         allow_syscall(libc::SYS_unlinkat),
         allow_syscall(libc::SYS_wait4),
         allow_syscall(libc::SYS_write),
+        allow_syscall(libc::SYS_writev),
     ])
 }
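
Note (illustration only, not part of the patch): the heart of this change is replacing the per-queue 65562-byte bounce buffer with scatter-gather I/O straight between guest memory and the TAP fd, one iovec per descriptor in the chain. The standalone sketch below shows the same writev(2)/readv(2) pattern against a plain pipe instead of a TAP device; every name in it is invented for the example, and it assumes only the libc crate as a dependency.

use std::io;
use std::os::unix::io::RawFd;

fn main() -> io::Result<()> {
    // A pipe stands in for the non-blocking TAP fd used by the patch.
    let mut fds = [0 as libc::c_int; 2];
    if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
        return Err(io::Error::last_os_error());
    }
    let (read_fd, write_fd): (RawFd, RawFd) = (fds[0], fds[1]);

    // Gather: two separate buffers go out in a single writev() call,
    // the same way the TX path builds one iovec per descriptor.
    let hdr = [0u8; 12]; // stand-in for the 12-byte virtio-net header
    let payload = b"hello from writev";
    let tx_iovecs = [
        libc::iovec {
            iov_base: hdr.as_ptr() as *mut libc::c_void,
            iov_len: hdr.len(),
        },
        libc::iovec {
            iov_base: payload.as_ptr() as *mut libc::c_void,
            iov_len: payload.len(),
        },
    ];
    let written = unsafe {
        libc::writev(write_fd, tx_iovecs.as_ptr(), tx_iovecs.len() as libc::c_int)
    };
    if written < 0 {
        return Err(io::Error::last_os_error());
    }

    // Scatter: one readv() call fills both destination buffers, the way
    // the RX path fills the write-only descriptors of a chain.
    let mut hdr_back = [0u8; 12];
    let mut payload_back = [0u8; 32];
    let rx_iovecs = [
        libc::iovec {
            iov_base: hdr_back.as_mut_ptr() as *mut libc::c_void,
            iov_len: hdr_back.len(),
        },
        libc::iovec {
            iov_base: payload_back.as_mut_ptr() as *mut libc::c_void,
            iov_len: payload_back.len(),
        },
    ];
    let read = unsafe {
        libc::readv(read_fd, rx_iovecs.as_ptr(), rx_iovecs.len() as libc::c_int)
    };
    if read < 0 {
        // On a non-blocking fd this is where the patch treats EAGAIN as
        // "no more frames" rather than an error.
        return Err(io::Error::last_os_error());
    }
    assert_eq!(read, written);

    unsafe {
        libc::close(read_fd);
        libc::close(write_fd);
    }
    Ok(())
}

In the patch the buffers come from GuestMemory::get_slice() on each descriptor, so a frame spread over several descriptors still moves in a single syscall with no intermediate copy.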
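
Note (illustration only, not part of the patch): the new RX path writes num_buffers at a fixed offset of 10 bytes into the first descriptor's buffer (mem.checked_offset(avail_desc.addr, 10)). That offset comes from the layout of the mergeable-rx-buffers virtio-net header; the sketch below spells the layout out and checks it adds up to the 12-byte header the removed MAX_BUFFER_SIZE comment referred to. The struct name is invented for the example.

// Layout of the virtio-net header used when mergeable RX buffers are
// negotiated (struct virtio_net_hdr_mrg_rxbuf in the virtio spec).
#[repr(C, packed)]
struct VirtioNetHdrMrgRxbuf {
    flags: u8,        // offset 0
    gso_type: u8,     // offset 1
    hdr_len: u16,     // offset 2
    gso_size: u16,    // offset 4
    csum_start: u16,  // offset 6
    csum_offset: u16, // offset 8
    num_buffers: u16, // offset 10 -- the field the RX path sets to 1
}

fn main() {
    // The whole header is 12 bytes, which is what vnet_hdr_len() accounts
    // for when the counters subtract the header from the readv()/writev()
    // return values.
    assert_eq!(std::mem::size_of::<VirtioNetHdrMrgRxbuf>(), 12);
}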