From 8fbc4b40041b4d9287d7a248e9c63e9abb9ce0f7 Mon Sep 17 00:00:00 2001 From: Sebastien Boeuf Date: Tue, 11 Aug 2020 18:38:21 +0200 Subject: [PATCH] virtio-devices: vsock: Port to EpollHelper Migrate to EpollHelper so as to remove code that is duplicated between multiple virtio devices. Signed-off-by: Sebastien Boeuf --- virtio-devices/src/vsock/device.rs | 303 ++++++++++------------------- virtio-devices/src/vsock/mod.rs | 27 ++- 2 files changed, 113 insertions(+), 217 deletions(-) diff --git a/virtio-devices/src/vsock/device.rs b/virtio-devices/src/vsock/device.rs index 1bf144f3f..5ff505932 100644 --- a/virtio-devices/src/vsock/device.rs +++ b/virtio-devices/src/vsock/device.rs @@ -12,8 +12,9 @@ use super::{VsockBackend, VsockPacket}; use crate::Error as DeviceError; use crate::VirtioInterrupt; use crate::{ - ActivateError, ActivateResult, DeviceEventT, Queue, VirtioDevice, VirtioDeviceType, - VirtioInterruptType, VIRTIO_F_IN_ORDER, VIRTIO_F_IOMMU_PLATFORM, VIRTIO_F_VERSION_1, + ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue, + VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST, + VIRTIO_F_IN_ORDER, VIRTIO_F_IOMMU_PLATFORM, VIRTIO_F_VERSION_1, }; use anyhow::anyhow; /// This is the `VirtioDevice` implementation for our vsock device. It handles the virtio-level @@ -37,9 +38,8 @@ use anyhow::anyhow; /// use byteorder::{ByteOrder, LittleEndian}; use libc::EFD_NONBLOCK; -use std::fs::File; use std::io; -use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::result; use std::sync::atomic::{AtomicBool, Ordering}; @@ -57,18 +57,13 @@ const NUM_QUEUES: usize = 3; const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES]; // New descriptors are pending on the rx queue. -pub const RX_QUEUE_EVENT: DeviceEventT = 0; +pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1; // New descriptors are pending on the tx queue. -pub const TX_QUEUE_EVENT: DeviceEventT = 1; +pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2; // New descriptors are pending on the event queue. -pub const EVT_QUEUE_EVENT: DeviceEventT = 2; +pub const EVT_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3; // Notification coming from the backend. -pub const BACKEND_EVENT: DeviceEventT = 3; -// The device has been dropped. -pub const KILL_EVENT: DeviceEventT = 4; -// The device should be paused. -const PAUSE_EVENT: DeviceEventT = 5; -pub const EVENTS_LEN: usize = 6; +pub const BACKEND_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4; /// The `VsockEpollHandler` implements the runtime logic of our vsock device: /// 1. Respond to TX queue events by wrapping virtio buffers into `VsockPacket`s, then sending those @@ -198,142 +193,64 @@ where } } - fn run(&mut self, paused: Arc) -> result::Result<(), DeviceError> { - // Create the epoll file descriptor - let epoll_fd = epoll::create(true).map_err(DeviceError::EpollCreateFd)?; - // Use 'File' to enforce closing on 'epoll_fd' - let epoll_file = unsafe { File::from_raw_fd(epoll_fd) }; - - // Add events - epoll::ctl( - epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - self.queue_evts[0].as_raw_fd(), - epoll::Event::new(epoll::Events::EPOLLIN, u64::from(RX_QUEUE_EVENT)), - ) - .map_err(DeviceError::EpollCtl)?; - epoll::ctl( - epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - self.queue_evts[1].as_raw_fd(), - epoll::Event::new(epoll::Events::EPOLLIN, u64::from(TX_QUEUE_EVENT)), - ) - .map_err(DeviceError::EpollCtl)?; - epoll::ctl( - epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - self.queue_evts[2].as_raw_fd(), - epoll::Event::new(epoll::Events::EPOLLIN, u64::from(EVT_QUEUE_EVENT)), - ) - .map_err(DeviceError::EpollCtl)?; - epoll::ctl( - epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - self.backend.read().unwrap().get_polled_fd(), - epoll::Event::new( - self.backend.read().unwrap().get_polled_evset(), - u64::from(BACKEND_EVENT), - ), - ) - .map_err(DeviceError::EpollCtl)?; - epoll::ctl( - epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - self.kill_evt.as_raw_fd(), - epoll::Event::new(epoll::Events::EPOLLIN, u64::from(KILL_EVENT)), - ) - .map_err(DeviceError::EpollCtl)?; - epoll::ctl( - epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - self.pause_evt.as_raw_fd(), - epoll::Event::new(epoll::Events::EPOLLIN, u64::from(PAUSE_EVENT)), - ) - .map_err(DeviceError::EpollCtl)?; - - let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EVENTS_LEN]; - - // Before jumping into the epoll loop, check if the device is expected - // to be in a paused state. This is helpful for the restore code path - // as the device thread should not start processing anything before the - // device has been resumed. - while paused.load(Ordering::SeqCst) { - thread::park(); - } - - 'epoll: loop { - let num_events = match epoll::wait(epoll_file.as_raw_fd(), -1, &mut events[..]) { - Ok(res) => res, - Err(e) => { - if e.kind() == io::ErrorKind::Interrupted { - // It's well defined from the epoll_wait() syscall - // documentation that the epoll loop can be interrupted - // before any of the requested events occurred or the - // timeout expired. In both those cases, epoll_wait() - // returns an error of type EINTR, but this should not - // be considered as a regular error. Instead it is more - // appropriate to retry, by calling into epoll_wait(). - continue; - } - return Err(DeviceError::EpollWait(e)); - } - }; - - for event in events.iter().take(num_events) { - let evset = match epoll::Events::from_bits(event.events) { - Some(evset) => evset, - None => { - let evbits = event.events; - warn!("epoll: ignoring unknown event set: 0x{:x}", evbits); - continue; - } - }; - - let ev_type = event.data as DeviceEventT; - - if self.handle_event(ev_type, evset, paused.clone())? { - break 'epoll; - } - } - } + fn run(&mut self, paused: Arc) -> result::Result<(), EpollHelperError> { + let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?; + helper.add_event(self.queue_evts[0].as_raw_fd(), RX_QUEUE_EVENT)?; + helper.add_event(self.queue_evts[1].as_raw_fd(), TX_QUEUE_EVENT)?; + helper.add_event(self.queue_evts[2].as_raw_fd(), EVT_QUEUE_EVENT)?; + helper.add_event(self.backend.read().unwrap().get_polled_fd(), BACKEND_EVENT)?; + helper.run(paused, self)?; Ok(()) } +} - pub fn handle_event( - &mut self, - device_event: DeviceEventT, - evset: epoll::Events, - paused: Arc, - ) -> Result { - match device_event { +impl EpollHelperHandler for VsockEpollHandler +where + B: VsockBackend, +{ + fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool { + let evset = match epoll::Events::from_bits(event.events) { + Some(evset) => evset, + None => { + let evbits = event.events; + warn!("epoll: ignoring unknown event set: 0x{:x}", evbits); + return false; + } + }; + + let ev_type = event.data as u16; + match ev_type { RX_QUEUE_EVENT => { debug!("vsock: RX queue event"); if let Err(e) = self.queue_evts[0].read() { error!("Failed to get RX queue event: {:?}", e); - return Err(DeviceError::FailedReadingQueue { - event_type: "rx queue event", - underlying: e, - }); + return true; } else if self.backend.read().unwrap().has_pending_rx() { - self.process_rx()?; + if let Err(e) = self.process_rx() { + error!("Failed to process RX queue: {:?}", e); + return true; + } } } TX_QUEUE_EVENT => { debug!("vsock: TX queue event"); if let Err(e) = self.queue_evts[1].read() { error!("Failed to get TX queue event: {:?}", e); - return Err(DeviceError::FailedReadingQueue { - event_type: "tx queue event", - underlying: e, - }); + return true; } else { - self.process_tx()?; + if let Err(e) = self.process_tx() { + error!("Failed to process TX queue: {:?}", e); + return true; + } // The backend may have queued up responses to the packets we sent during TX queue // processing. If that happened, we need to fetch those responses and place them // into RX buffers. if self.backend.read().unwrap().has_pending_rx() { - self.process_rx()?; + if let Err(e) = self.process_rx() { + error!("Failed to process RX queue: {:?}", e); + return true; + } } } } @@ -341,10 +258,7 @@ where debug!("vsock: EVT queue event"); if let Err(e) = self.queue_evts[2].read() { error!("Failed to get EVT queue event: {:?}", e); - return Err(DeviceError::FailedReadingQueue { - event_type: "evt queue event", - underlying: e, - }); + return true; } } BACKEND_EVENT => { @@ -355,39 +269,24 @@ where // In particular, if `self.backend.send_pkt()` halted the TX queue processing (by // reurning an error) at some point in the past, now is the time to try walking the // TX queue again. - self.process_tx()?; + if let Err(e) = self.process_tx() { + error!("Failed to process TX queue: {:?}", e); + return true; + } if self.backend.read().unwrap().has_pending_rx() { - self.process_rx()?; + if let Err(e) = self.process_rx() { + error!("Failed to process RX queue: {:?}", e); + return true; + } } } - KILL_EVENT => { - debug!("KILL_EVENT received, stopping epoll loop"); - return Ok(true); - } - PAUSE_EVENT => { - debug!("PAUSE_EVENT received, pausing virtio-vsock epoll loop"); - // We loop here to handle spurious park() returns. - // Until we have not resumed, the paused boolean will - // be true. - while paused.load(Ordering::SeqCst) { - thread::park(); - } - - // Drain pause event after the device has been resumed. - // This ensures the pause event has been seen by each - // and every thread related to this virtio device. - let _ = self.pause_evt.read(); - } - other => { + _ => { error!("Unknown event for virtio-vsock"); - return Err(DeviceError::UnknownEvent { - device: "vsock", - event: other, - }); + return true; } } - Ok(false) + false } } @@ -402,7 +301,7 @@ pub struct Vsock { acked_features: u64, queue_evts: Option>, interrupt_cb: Option>, - epoll_threads: Option>>>, + epoll_threads: Option>>>, paused: Arc, path: PathBuf, } @@ -840,13 +739,13 @@ mod tests { let test_ctx = TestContext::new(); let mut ctx = test_ctx.create_epoll_handler_context(); - match ctx.handler.handle_event( - TX_QUEUE_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) { - Err(DeviceError::FailedReadingQueue { .. }) => (), - other => panic!("{:?}", other), + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, TX_QUEUE_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap(); + + if !ctx.handler.handle_event(&mut epoll_helper, &event) { + panic!("handle_event() should have failed"); } } } @@ -907,13 +806,14 @@ mod tests { let test_ctx = TestContext::new(); let mut ctx = test_ctx.create_epoll_handler_context(); ctx.handler.backend.write().unwrap().set_pending_rx(false); - match ctx.handler.handle_event( - RX_QUEUE_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) { - Err(DeviceError::FailedReadingQueue { .. }) => (), - other => panic!("{:?}", other), + + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, RX_QUEUE_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap(); + + if !ctx.handler.handle_event(&mut epoll_helper, &event) { + panic!("handle_event() should have failed"); } } } @@ -925,13 +825,14 @@ mod tests { let test_ctx = TestContext::new(); let mut ctx = test_ctx.create_epoll_handler_context(); ctx.handler.backend.write().unwrap().set_pending_rx(false); - match ctx.handler.handle_event( - EVT_QUEUE_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) { - Err(DeviceError::FailedReadingQueue { .. }) => (), - other => panic!("{:?}", other), + + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, EVT_QUEUE_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap(); + + if !ctx.handler.handle_event(&mut epoll_helper, &event) { + panic!("handle_event() should have failed"); } } } @@ -946,13 +847,12 @@ mod tests { let mut ctx = test_ctx.create_epoll_handler_context(); ctx.handler.backend.write().unwrap().set_pending_rx(true); - ctx.handler - .handle_event( - BACKEND_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); + + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, BACKEND_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap(); + ctx.handler.handle_event(&mut epoll_helper, &event); // The backend should've received this event. assert_eq!( @@ -973,13 +873,12 @@ mod tests { let mut ctx = test_ctx.create_epoll_handler_context(); ctx.handler.backend.write().unwrap().set_pending_rx(false); - ctx.handler - .handle_event( - BACKEND_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); + + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, BACKEND_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap(); + ctx.handler.handle_event(&mut epoll_helper, &event); // The backend should've received this event. assert_eq!( @@ -998,13 +897,13 @@ mod tests { let test_ctx = TestContext::new(); let mut ctx = test_ctx.create_epoll_handler_context(); - match ctx.handler.handle_event( - 0xff, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) { - Err(DeviceError::UnknownEvent { .. }) => (), - other => panic!("{:?}", other), + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, 0xff); + let mut epoll_helper = + EpollHelper::new(&ctx.handler.kill_evt, &ctx.handler.pause_evt).unwrap(); + + if !ctx.handler.handle_event(&mut epoll_helper, &event) { + panic!("handle_event() should have failed"); } } } diff --git a/virtio-devices/src/vsock/mod.rs b/virtio-devices/src/vsock/mod.rs index b42bdc8a1..4ac1a764f 100644 --- a/virtio-devices/src/vsock/mod.rs +++ b/virtio-devices/src/vsock/mod.rs @@ -161,10 +161,11 @@ mod tests { use super::packet::VSOCK_PKT_HDR_SIZE; use super::*; use crate::device::{VirtioInterrupt, VirtioInterruptType}; + use crate::epoll_helper::EpollHelperHandler; + use crate::EpollHelper; use libc::EFD_NONBLOCK; use std::os::unix::io::AsRawFd; use std::path::PathBuf; - use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; use vm_virtio::queue::testing::VirtQueue as GuestQ; @@ -339,23 +340,19 @@ mod tests { impl<'a> EpollHandlerContext<'a> { pub fn signal_txq_event(&mut self) { self.handler.queue_evts[1].write(1).unwrap(); - self.handler - .handle_event( - TX_QUEUE_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, TX_QUEUE_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&self.handler.kill_evt, &self.handler.pause_evt).unwrap(); + self.handler.handle_event(&mut epoll_helper, &event); } pub fn signal_rxq_event(&mut self) { self.handler.queue_evts[0].write(1).unwrap(); - self.handler - .handle_event( - RX_QUEUE_EVENT, - epoll::Events::EPOLLIN, - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); + let events = epoll::Events::EPOLLIN; + let event = epoll::Event::new(events, RX_QUEUE_EVENT as u64); + let mut epoll_helper = + EpollHelper::new(&self.handler.kill_evt, &self.handler.pause_evt).unwrap(); + self.handler.handle_event(&mut epoll_helper, &event); } } }