From 5a3472847d9d742d1da303f7ead8e1b8122fda16 Mon Sep 17 00:00:00 2001 From: Sebastien Boeuf Date: Wed, 4 Sep 2019 12:29:57 -0700 Subject: [PATCH] vm-virtio: vsock: Implement VsockEpollHandler This is the last step connecting the dots between the virtio-vsock device and the bulk of the logic hosted in the unix and csm modules. Signed-off-by: Sebastien Boeuf --- vm-virtio/src/vsock/device.rs | 287 +++++++++++++++++++++++++++------- 1 file changed, 233 insertions(+), 54 deletions(-) diff --git a/vm-virtio/src/vsock/device.rs b/vm-virtio/src/vsock/device.rs index 71e66611f..7d76e24ad 100644 --- a/vm-virtio/src/vsock/device.rs +++ b/vm-virtio/src/vsock/device.rs @@ -36,12 +36,12 @@ use std::result; use std::sync::{Arc, RwLock}; use std::thread; -use super::VsockBackend; +use super::{VsockBackend, VsockPacket}; use crate::Error as DeviceError; use crate::VirtioInterrupt; use crate::{ ActivateError, ActivateResult, DeviceEventT, Queue, VirtioDevice, VirtioDeviceType, - VIRTIO_F_IN_ORDER, VIRTIO_F_VERSION_1, + VirtioInterruptType, VIRTIO_F_IN_ORDER, VIRTIO_F_VERSION_1, }; use byteorder::{ByteOrder, LittleEndian}; use vm_memory::GuestMemoryMmap; @@ -52,24 +52,142 @@ const NUM_QUEUES: usize = 3; const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES]; // New descriptors are pending on the rx queue. -const RX_QUEUE_EVENT: DeviceEventT = 0; +pub const RX_QUEUE_EVENT: DeviceEventT = 0; // New descriptors are pending on the tx queue. -const TX_QUEUE_EVENT: DeviceEventT = 1; +pub const TX_QUEUE_EVENT: DeviceEventT = 1; // New descriptors are pending on the event queue. -const EVT_QUEUE_EVENT: DeviceEventT = 2; +pub const EVT_QUEUE_EVENT: DeviceEventT = 2; +// Notification coming from the backend. +pub const BACKEND_EVENT: DeviceEventT = 3; // The device has been dropped. -const KILL_EVENT: DeviceEventT = 3; +pub const KILL_EVENT: DeviceEventT = 4; +pub const EVENTS_LEN: usize = 5; -struct VsockEpollHandler { - _cid: u64, - _mem: Arc>, - _queues: Vec, - queue_evts: Vec, - kill_evt: EventFd, - _interrupt_cb: Arc, +/// The `VsockEpollHandler` implements the runtime logic of our vsock device: +/// 1. Respond to TX queue events by wrapping virtio buffers into `VsockPacket`s, then sending those +/// packets to the `VsockBackend`; +/// 2. Forward backend FD event notifications to the `VsockBackend`; +/// 3. Fetch incoming packets from the `VsockBackend` and place them into the virtio RX queue; +/// 4. Whenever we have processed some virtio buffers (either TX or RX), let the driver know by +/// raising our assigned IRQ. +/// +/// In a nutshell, the `VsockEpollHandler` logic looks like this: +/// - on TX queue event: +/// - fetch all packets from the TX queue and send them to the backend; then +/// - if the backend has queued up any incoming packets, fetch them into any available RX buffers. +/// - on RX queue event: +/// - fetch any incoming packets, queued up by the backend, into newly available RX buffers. +/// - on backend event: +/// - forward the event to the backend; then +/// - again, attempt to fetch any incoming packets queued by the backend into virtio RX buffers. +/// +pub struct VsockEpollHandler { + pub mem: Arc>, + pub queues: Vec, + pub queue_evts: Vec, + pub kill_evt: EventFd, + pub interrupt_cb: Arc, + pub backend: B, } -impl VsockEpollHandler { +impl VsockEpollHandler +where + B: VsockBackend, +{ + /// Signal the guest driver that we've used some virtio buffers that it had previously made + /// available. + /// + fn signal_used_queue(&self, queue: &Queue) -> result::Result<(), DeviceError> { + debug!("vsock: raising IRQ"); + + (self.interrupt_cb)(&VirtioInterruptType::Queue, Some(queue)).map_err(|e| { + error!("Failed to signal used queue: {:?}", e); + DeviceError::FailedSignalingUsedQueue(e) + }) + } + + /// Walk the driver-provided RX queue buffers and attempt to fill them up with any data that we + /// have pending. + /// + fn process_rx(&mut self) -> result::Result<(), DeviceError> { + debug!("vsock: epoll_handler::process_rx()"); + + let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize]; + let mut used_count = 0; + let mem = self.mem.read().unwrap(); + for avail_desc in self.queues[0].iter(&mem) { + let used_len = match VsockPacket::from_rx_virtq_head(&avail_desc) { + Ok(mut pkt) => { + if self.backend.recv_pkt(&mut pkt).is_ok() { + pkt.hdr().len() as u32 + pkt.len() + } else { + // We are using a consuming iterator over the virtio buffers, so, if we can't + // fill in this buffer, we'll need to undo the last iterator step. + self.queues[0].go_to_previous_position(); + break; + } + } + Err(e) => { + warn!("vsock: RX queue error: {:?}", e); + 0 + } + }; + + used_desc_heads[used_count] = (avail_desc.index, used_len); + used_count += 1; + } + + for &(desc_index, len) in &used_desc_heads[..used_count] { + self.queues[0].add_used(&mem, desc_index, len); + } + + if used_count > 0 { + self.signal_used_queue(&self.queues[0]) + } else { + Ok(()) + } + } + + /// Walk the driver-provided TX queue buffers, package them up as vsock packets, and send them to + /// the backend for processing. + /// + fn process_tx(&mut self) -> result::Result<(), DeviceError> { + debug!("vsock: epoll_handler::process_tx()"); + + let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize]; + let mut used_count = 0; + let mem = self.mem.read().unwrap(); + for avail_desc in self.queues[1].iter(&mem) { + let pkt = match VsockPacket::from_tx_virtq_head(&avail_desc) { + Ok(pkt) => pkt, + Err(e) => { + error!("vsock: error reading TX packet: {:?}", e); + used_desc_heads[used_count] = (avail_desc.index, 0); + used_count += 1; + continue; + } + }; + + if self.backend.send_pkt(&pkt).is_err() { + self.queues[1].go_to_previous_position(); + break; + } + + used_desc_heads[used_count] = (avail_desc.index, 0); + used_count += 1; + } + + for &(desc_index, len) in &used_desc_heads[..used_count] { + self.queues[1].add_used(&mem, desc_index, len); + } + + if used_count > 0 { + self.signal_used_queue(&self.queues[1]) + } else { + Ok(()) + } + } + fn run(&mut self) -> result::Result<(), DeviceError> { // Create the epoll file descriptor let epoll_fd = epoll::create(true).map_err(DeviceError::EpollCreateFd)?; @@ -96,6 +214,13 @@ impl VsockEpollHandler { epoll::Event::new(epoll::Events::EPOLLIN, u64::from(EVT_QUEUE_EVENT)), ) .map_err(DeviceError::EpollCtl)?; + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_ADD, + self.backend.get_polled_fd(), + epoll::Event::new(self.backend.get_polled_evset(), u64::from(BACKEND_EVENT)), + ) + .map_err(DeviceError::EpollCtl)?; epoll::ctl( epoll_fd, epoll::ControlOptions::EPOLL_CTL_ADD, @@ -104,7 +229,7 @@ impl VsockEpollHandler { ) .map_err(DeviceError::EpollCtl)?; - let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); 4]; + let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EVENTS_LEN]; 'epoll: loop { let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) { @@ -125,52 +250,106 @@ impl VsockEpollHandler { }; for event in events.iter().take(num_events) { - let ev_type = event.data as u16; + let evset = match epoll::Events::from_bits(event.events) { + Some(evset) => evset, + None => { + let evbits = event.events; + warn!("epoll: ignoring unknown event set: 0x{:x}", evbits); + continue; + } + }; - match ev_type { - RX_QUEUE_EVENT => { - if let Err(e) = self.queue_evts[0].read() { - error!("Failed to get queue event: {:?}", e); - break 'epoll; - } + let ev_type = event.data as DeviceEventT; - debug!("RX queue event received"); - } - TX_QUEUE_EVENT => { - if let Err(e) = self.queue_evts[1].read() { - error!("Failed to get queue event: {:?}", e); - break 'epoll; - } - - debug!("TX queue event received"); - } - EVT_QUEUE_EVENT => { - if let Err(e) = self.queue_evts[2].read() { - error!("Failed to get queue event: {:?}", e); - break 'epoll; - } - - debug!("EVT queue event received"); - } - KILL_EVENT => { - debug!("KILL_EVENT received, stopping epoll loop"); - break 'epoll; - } - _ => { - error!("Unknown event for virtio-vsock"); - } + if self.handle_event(ev_type, evset)? { + break 'epoll; } } } Ok(()) } + + pub fn handle_event( + &mut self, + device_event: DeviceEventT, + evset: epoll::Events, + ) -> Result { + match device_event { + RX_QUEUE_EVENT => { + debug!("vsock: RX queue event"); + if let Err(e) = self.queue_evts[0].read() { + error!("Failed to get RX queue event: {:?}", e); + return Err(DeviceError::FailedReadingQueue { + event_type: "rx queue event", + underlying: e, + }); + } else if self.backend.has_pending_rx() { + self.process_rx()?; + } + } + TX_QUEUE_EVENT => { + debug!("vsock: TX queue event"); + if let Err(e) = self.queue_evts[1].read() { + error!("Failed to get TX queue event: {:?}", e); + return Err(DeviceError::FailedReadingQueue { + event_type: "tx queue event", + underlying: e, + }); + } else { + self.process_tx()?; + // The backend may have queued up responses to the packets we sent during TX queue + // processing. If that happened, we need to fetch those responses and place them + // into RX buffers. + if self.backend.has_pending_rx() { + self.process_rx()?; + } + } + } + EVT_QUEUE_EVENT => { + debug!("vsock: EVT queue event"); + if let Err(e) = self.queue_evts[2].read() { + error!("Failed to get EVT queue event: {:?}", e); + return Err(DeviceError::FailedReadingQueue { + event_type: "evt queue event", + underlying: e, + }); + } + } + BACKEND_EVENT => { + debug!("vsock: backend event"); + self.backend.notify(evset); + // After the backend has been kicked, it might've freed up some resources, so we + // can attempt to send it more data to process. + // In particular, if `self.backend.send_pkt()` halted the TX queue processing (by + // reurning an error) at some point in the past, now is the time to try walking the + // TX queue again. + self.process_tx()?; + if self.backend.has_pending_rx() { + self.process_rx()?; + } + } + KILL_EVENT => { + debug!("KILL_EVENT received, stopping epoll loop"); + return Ok(true); + } + other => { + error!("Unknown event for virtio-vsock"); + return Err(DeviceError::UnknownEvent { + device: "vsock", + event: other, + }); + } + } + + Ok(false) + } } /// Virtio device exposing virtual socket to the guest. pub struct Vsock { cid: u64, - _backend: Option, + backend: Option, kill_evt: Option, avail_features: u64, acked_features: u64, @@ -187,7 +366,7 @@ where Ok(Vsock { cid, - _backend: Some(backend), + backend: Some(backend), kill_evt: None, avail_features, acked_features: 0u64, @@ -209,7 +388,7 @@ where impl VirtioDevice for Vsock where - B: VsockBackend, + B: VsockBackend + 'static, { fn device_type(&self) -> u32 { VirtioDeviceType::TYPE_VSOCK as u32 @@ -303,12 +482,12 @@ where self.kill_evt = Some(self_kill_evt); let mut handler = VsockEpollHandler { - _cid: self.cid, - _mem: mem, - _queues: queues, + mem, + queues, queue_evts, kill_evt, - _interrupt_cb: interrupt_cb, + interrupt_cb, + backend: self.backend.take().unwrap(), }; let worker_result = thread::Builder::new()