diff --git a/vhost_user_backend/src/lib.rs b/vhost_user_backend/src/lib.rs index 6de50798d..f627e5d7f 100644 --- a/vhost_user_backend/src/lib.rs +++ b/vhost_user_backend/src/lib.rs @@ -310,6 +310,12 @@ pub struct VringWorker { epoll_file: File, } +impl AsRawFd for VringWorker { + fn as_raw_fd(&self) -> RawFd { + self.epoll_file.as_raw_fd() + } +} + impl VringWorker { fn run(&self, handler: VringEpollHandler) -> VringWorkerResult<()> { const EPOLL_EVENTS_LEN: usize = 100; diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs index c8845248f..df84a857b 100644 --- a/vhost_user_net/src/lib.rs +++ b/vhost_user_net/src/lib.rs @@ -13,11 +13,10 @@ extern crate vhost_user_backend; extern crate vm_virtio; extern crate vmm; -use libc::{self, EAGAIN, EFD_NONBLOCK}; +use libc::{self, EFD_NONBLOCK}; use log::*; use net_util::{MacAddr, Tap}; use std::fmt; -use std::io::Read; use std::io::{self}; use std::net::Ipv4Addr; use std::os::unix::io::AsRawFd; @@ -28,9 +27,9 @@ use vhost_rs::vhost_user::message::*; use vhost_rs::vhost_user::{Error as VhostUserError, Listener}; use vhost_user_backend::{VhostUserBackend, VhostUserDaemon, Vring, VringWorker}; use virtio_bindings::bindings::virtio_net::*; -use vm_memory::GuestMemoryMmap; +use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap}; use vm_virtio::net_util::{open_tap, RxVirtio, TxVirtio}; -use vm_virtio::Queue; +use vm_virtio::NetQueuePair; use vmm::config::{OptionParser, OptionParserError}; use vmm_sys_util::eventfd::EventFd; @@ -55,7 +54,7 @@ pub enum Error { /// Failed to parse configuration string FailedConfigParse(OptionParserError), /// Failed to signal used queue. - FailedSignalingUsedQueue, + FailedSignalingUsedQueue(io::Error), /// Failed to handle event other than input event. HandleEventNotEpollIn, /// Failed to handle unknown event. @@ -70,6 +69,8 @@ pub enum Error { OpenTap(vm_virtio::net_util::Error), /// No socket provided SocketParameterMissing, + /// Underlying QueuePair error + NetQueuePair(vm_virtio::Error), } pub const SYNTAX: &str = "vhost-user-net backend parameters \ @@ -91,121 +92,30 @@ impl std::convert::From for std::io::Error { } struct VhostUserNetThread { - mem: Option, + net: NetQueuePair, vring_worker: Option>, kill_evt: EventFd, - tap: Tap, - rx: RxVirtio, - tx: TxVirtio, - rx_tap_listening: bool, } impl VhostUserNetThread { /// Create a new virtio network device with the given TAP interface. fn new(tap: Tap) -> Result { Ok(VhostUserNetThread { - mem: None, vring_worker: None, kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, - tap, - rx: RxVirtio::new(), - tx: TxVirtio::new(), - rx_tap_listening: false, + net: NetQueuePair { + mem: None, + tap, + rx: RxVirtio::new(), + tx: TxVirtio::new(), + rx_tap_listening: false, + epoll_fd: None, + }, }) } - // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true - // if a buffer was used, and false if the frame must be deferred until a buffer - // is made available by the driver. - fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result { - let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?; - - let next_desc = queue.iter(&mem).next(); - - if next_desc.is_none() { - // Queue has no available descriptors - if self.rx_tap_listening { - self.vring_worker - .as_ref() - .unwrap() - .unregister_listener(self.tap.as_raw_fd(), epoll::Events::EPOLLIN, 2) - .unwrap(); - self.rx_tap_listening = false; - } - return Ok(false); - } - - let write_complete = self.rx.process_desc_chain(&mem, next_desc, &mut queue); - - Ok(write_complete) - } - - fn process_rx(&mut self, vring: &mut Vring) -> Result<()> { - // Read as many frames as possible. - loop { - match self.read_tap() { - Ok(count) => { - self.rx.bytes_read = count; - if !self.rx_single_frame(&mut vring.mut_queue())? { - self.rx.deferred_frame = true; - break; - } - } - Err(e) => { - // The tap device is non-blocking, so any error aside from EAGAIN is - // unexpected. - match e.raw_os_error() { - Some(err) if err == EAGAIN => (), - _ => { - error!("Failed to read tap: {:?}", e); - return Err(Error::FailedReadTap); - } - }; - break; - } - } - } - if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - vring.signal_used_queue().unwrap(); - Ok(()) - } else { - Ok(()) - } - } - - fn resume_rx(&mut self, vring: &mut Vring) -> Result<()> { - if self.rx.deferred_frame { - if self.rx_single_frame(&mut vring.mut_queue())? { - self.rx.deferred_frame = false; - // process_rx() was interrupted possibly before consuming all - // packets in the tap; try continuing now. - self.process_rx(vring) - } else if self.rx.deferred_irqs { - self.rx.deferred_irqs = false; - vring.signal_used_queue().unwrap(); - Ok(()) - } else { - Ok(()) - } - } else { - Ok(()) - } - } - - fn process_tx(&mut self, mut queue: &mut Queue) -> Result<()> { - let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?; - - self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue); - - Ok(()) - } - - fn read_tap(&mut self) -> io::Result { - self.tap.read(&mut self.rx.frame_buf) - } - pub fn set_vring_worker(&mut self, vring_worker: Option>) { + self.net.epoll_fd = Some(vring_worker.as_ref().unwrap().as_raw_fd()); self.vring_worker = vring_worker; } } @@ -280,7 +190,7 @@ impl VhostUserBackend for VhostUserNetBackend { fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { for thread in self.threads.iter() { - thread.lock().unwrap().mem = Some(mem.clone()); + thread.lock().unwrap().net.mem = Some(GuestMemoryAtomic::new(mem.clone())); } Ok(()) } @@ -299,37 +209,39 @@ impl VhostUserBackend for VhostUserNetBackend { let mut thread = self.threads[thread_id].lock().unwrap(); match device_event { 0 => { - thread.resume_rx(&mut vrings[0].write().unwrap())?; - - if !thread.rx_tap_listening { - thread.vring_worker.as_ref().unwrap().register_listener( - thread.tap.as_raw_fd(), - epoll::Events::EPOLLIN, - 2, - )?; - thread.rx_tap_listening = true; + let mut vring = vrings[0].write().unwrap(); + if thread + .net + .resume_rx(&mut vring.mut_queue()) + .map_err(Error::NetQueuePair)? + { + vring + .signal_used_queue() + .map_err(Error::FailedSignalingUsedQueue)? } } 1 => { let mut vring = vrings[1].write().unwrap(); - thread.process_tx(vring.mut_queue())?; - vring.signal_used_queue()?; + if thread + .net + .process_tx(&mut vring.mut_queue()) + .map_err(Error::NetQueuePair)? + { + vring + .signal_used_queue() + .map_err(Error::FailedSignalingUsedQueue)? + } } 2 => { let mut vring = vrings[0].write().unwrap(); - if thread.rx.deferred_frame - // Process a deferred frame first if available. Don't read from tap again - // until we manage to receive this deferred frame. + if thread + .net + .process_rx_tap(&mut vring.mut_queue()) + .map_err(Error::NetQueuePair)? { - if thread.rx_single_frame(&mut vring.mut_queue())? { - thread.rx.deferred_frame = false; - thread.process_rx(&mut vring)?; - } else if thread.rx.deferred_irqs { - thread.rx.deferred_irqs = false; - vring.signal_used_queue()?; - } - } else { - thread.process_rx(&mut vring)?; + vring + .signal_used_queue() + .map_err(Error::FailedSignalingUsedQueue)? } } _ => return Err(Error::HandleEventUnknownEvent.into()),