diff --git a/block_util/src/lib.rs b/block_util/src/lib.rs index 1ea9703b1..aecdc4b5b 100644 --- a/block_util/src/lib.rs +++ b/block_util/src/lib.rs @@ -16,16 +16,15 @@ extern crate serde_derive; pub mod async_io; pub mod raw_async; +use crate::async_io::{AsyncIo, AsyncIoError}; #[cfg(feature = "io_uring")] -use io_uring::Probe; -use io_uring::{opcode, squeue, IoUring}; +use io_uring::{opcode, IoUring, Probe}; use serde::ser::{Serialize, SerializeStruct, Serializer}; use std::cmp; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::os::linux::fs::MetadataExt; #[cfg(feature = "io_uring")] use std::os::unix::io::AsRawFd; -use std::os::unix::io::RawFd; use std::path::PathBuf; use std::result; use virtio_bindings::bindings::virtio_blk::*; @@ -101,6 +100,9 @@ pub enum ExecuteError { Unsupported(u32), SubmitIoUring(io::Error), GetHostAddress(GuestMemoryError), + AsyncRead(AsyncIoError), + AsyncWrite(AsyncIoError), + AsyncFlush(AsyncIoError), } impl ExecuteError { @@ -114,6 +116,9 @@ impl ExecuteError { ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP, ExecuteError::SubmitIoUring(_) => VIRTIO_BLK_S_IOERR, ExecuteError::GetHostAddress(_) => VIRTIO_BLK_S_IOERR, + ExecuteError::AsyncRead(_) => VIRTIO_BLK_S_IOERR, + ExecuteError::AsyncWrite(_) => VIRTIO_BLK_S_IOERR, + ExecuteError::AsyncFlush(_) => VIRTIO_BLK_S_IOERR, } } } @@ -271,12 +276,11 @@ impl Request { Ok(len) } - pub fn execute_io_uring( + pub fn execute_async( &self, mem: &GuestMemoryMmap, - io_uring: &mut IoUring, disk_nsectors: u64, - disk_image_fd: RawFd, + disk_image: &mut dyn AsyncIo, disk_id: &[u8], user_data: u64, ) -> result::Result { @@ -284,9 +288,6 @@ impl Request { let request_type = self.request_type; let offset = (sector << SECTOR_SHIFT) as libc::off_t; - let (submitter, sq, _) = io_uring.split(); - let mut avail_sq = sq.available(); - let mut iovecs = Vec::new(); for (data_addr, data_len) in &self.data_descriptors { let mut top: u64 = u64::from(*data_len) / SECTOR_SIZE; @@ -314,49 +315,19 @@ impl Request { // Queue operations expected to be submitted. match request_type { RequestType::In => { - // Safe because we know the file descriptor is valid and we - // relied on vm-memory to provide the buffer address. - let _ = unsafe { - avail_sq.push( - opcode::Readv::new( - opcode::types::Fd(disk_image_fd), - iovecs.as_ptr(), - iovecs.len() as u32, - ) - .offset(offset) - .build() - .flags(squeue::Flags::ASYNC) - .user_data(user_data), - ) - }; + disk_image + .read_vectored(offset, iovecs, user_data) + .map_err(ExecuteError::AsyncRead)?; } RequestType::Out => { - // Safe because we know the file descriptor is valid and we - // relied on vm-memory to provide the buffer address. - let _ = unsafe { - avail_sq.push( - opcode::Writev::new( - opcode::types::Fd(disk_image_fd), - iovecs.as_ptr(), - iovecs.len() as u32, - ) - .offset(offset) - .build() - .flags(squeue::Flags::ASYNC) - .user_data(user_data), - ) - }; + disk_image + .write_vectored(offset, iovecs, user_data) + .map_err(ExecuteError::AsyncWrite)?; } RequestType::Flush => { - // Safe because we know the file descriptor is valid. - let _ = unsafe { - avail_sq.push( - opcode::Fsync::new(opcode::types::Fd(disk_image_fd)) - .build() - .flags(squeue::Flags::ASYNC) - .user_data(user_data), - ) - }; + disk_image + .fsync(Some(user_data)) + .map_err(ExecuteError::AsyncFlush)?; } RequestType::GetDeviceID => { let (data_addr, data_len) = if self.data_descriptors.len() == 1 { @@ -374,11 +345,6 @@ impl Request { RequestType::Unsupported(t) => return Err(ExecuteError::Unsupported(t)), } - // Update the submission queue and submit new operations to the - // io_uring instance. - avail_sq.sync(); - submitter.submit().map_err(ExecuteError::SubmitIoUring)?; - Ok(true) } diff --git a/virtio-devices/src/block_io_uring.rs b/virtio-devices/src/block_io_uring.rs index 96f64ef58..d18d09726 100644 --- a/virtio-devices/src/block_io_uring.rs +++ b/virtio-devices/src/block_io_uring.rs @@ -16,15 +16,15 @@ use super::{ use crate::seccomp_filters::{get_seccomp_filter, Thread}; use crate::VirtioInterrupt; use anyhow::anyhow; -use block_util::{build_disk_image_id, Request, RequestType, VirtioBlockConfig}; -use io_uring::IoUring; -use libc::EFD_NONBLOCK; +use block_util::{ + async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request, + RequestType, VirtioBlockConfig, +}; use seccomp::{SeccompAction, SeccompFilter}; use std::collections::HashMap; -use std::fs::File; -use std::io::{self, Seek, SeekFrom}; +use std::io; use std::num::Wrapping; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::result; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; @@ -47,7 +47,7 @@ pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT; // New descriptors are pending on the virtio queue. const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1; // New completed tasks are pending on the completion ring. -const IO_URING_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2; +const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2; #[derive(Debug)] pub enum Error { @@ -77,6 +77,8 @@ pub enum Error { MissingEntryRequestList, /// The asynchronous request returned with failure. AsyncRequestFailure, + /// Failed synchronizing the file + Fsync(AsyncIoError), } pub type Result = result::Result; @@ -92,7 +94,7 @@ pub struct BlockCounters { struct BlockIoUringEpollHandler { queue: Queue, mem: GuestMemoryAtomic, - disk_image_fd: RawFd, + disk_image: Box, disk_nsectors: u64, interrupt_cb: Arc, disk_image_id: Vec, @@ -101,8 +103,6 @@ struct BlockIoUringEpollHandler { writeback: Arc, counters: BlockCounters, queue_evt: EventFd, - io_uring: IoUring, - io_uring_evt: EventFd, request_list: HashMap, } @@ -117,12 +117,12 @@ impl BlockIoUringEpollHandler { for avail_desc in queue.iter(&mem) { let mut request = Request::parse(&avail_desc, &mem).map_err(Error::RequestParsing)?; request.set_writeback(self.writeback.load(Ordering::Acquire)); + if request - .execute_io_uring( + .execute_async( &mem, - &mut self.io_uring, self.disk_nsectors, - self.disk_image_fd, + self.disk_image.as_mut(), &self.disk_image_id, avail_desc.index as u64, ) @@ -159,10 +159,9 @@ impl BlockIoUringEpollHandler { let mut read_ops = Wrapping(0); let mut write_ops = Wrapping(0); - let cq = self.io_uring.completion(); - for cq_entry in cq.available() { - let result = cq_entry.result(); - let desc_index = cq_entry.user_data() as u16; + let completion_list = self.disk_image.complete(); + for (user_data, result) in completion_list { + let desc_index = user_data as u16; let request = self .request_list .remove(&desc_index) @@ -178,7 +177,7 @@ impl BlockIoUringEpollHandler { } RequestType::Out => { if !request.writeback { - unsafe { libc::fsync(self.disk_image_fd) }; + self.disk_image.fsync(None).map_err(Error::Fsync)?; } for (_, data_len) in &request.data_descriptors { write_bytes += Wrapping(*data_len as u64); @@ -201,7 +200,7 @@ impl BlockIoUringEpollHandler { // checked that the status_addr was valid. mem.write_obj(status, request.status_addr).unwrap(); - used_desc_heads.push((desc_index, len)); + used_desc_heads.push((desc_index as u16, len)); used_count += 1; } @@ -242,7 +241,7 @@ impl BlockIoUringEpollHandler { ) -> result::Result<(), EpollHelperError> { let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?; helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?; - helper.add_event(self.io_uring_evt.as_raw_fd(), IO_URING_EVENT)?; + helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?; helper.run(paused, paused_sync, self)?; Ok(()) @@ -274,8 +273,8 @@ impl EpollHelperHandler for BlockIoUringEpollHandler { } } } - IO_URING_EVENT => { - if let Err(e) = self.io_uring_evt.read() { + COMPLETION_EVENT => { + if let Err(e) = self.disk_image.notifier().read() { error!("Failed to get queue event: {:?}", e); return true; } @@ -308,7 +307,7 @@ impl EpollHelperHandler for BlockIoUringEpollHandler { pub struct BlockIoUring { common: VirtioCommon, id: String, - disk_image: File, + disk_image: Box, disk_path: PathBuf, disk_nsectors: u64, config: VirtioBlockConfig, @@ -331,7 +330,7 @@ impl BlockIoUring { #[allow(clippy::too_many_arguments)] pub fn new( id: String, - mut disk_image: File, + mut disk_image: Box, disk_path: PathBuf, is_disk_read_only: bool, iommu: bool, @@ -339,7 +338,12 @@ impl BlockIoUring { queue_size: u16, seccomp_action: SeccompAction, ) -> io::Result { - let disk_size = disk_image.seek(SeekFrom::End(0))? as u64; + let disk_size = disk_image.size().map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("Failed getting disk size: {}", e), + ) + })?; if disk_size % SECTOR_SIZE != 0 { warn!( "Disk size {} is not a multiple of sector size {}; \ @@ -498,10 +502,6 @@ impl VirtioDevice for BlockIoUring { let queue_evt = queue_evts.remove(0); let queue = queues.remove(0); let queue_size = queue.size; - let io_uring = IoUring::new(queue_size as u32).map_err(|e| { - error!("failed to create io_uring instance: {}", e); - ActivateError::BadActivate - })?; let kill_evt = self .common .kill_evt @@ -526,7 +526,13 @@ impl VirtioDevice for BlockIoUring { let mut handler = BlockIoUringEpollHandler { queue, mem: mem.clone(), - disk_image_fd: self.disk_image.as_raw_fd(), + disk_image: self + .disk_image + .new_async_io(queue_size as u32) + .map_err(|e| { + error!("failed to create new AsyncIo: {}", e); + ActivateError::BadActivate + })?, disk_nsectors: self.disk_nsectors, interrupt_cb: interrupt_cb.clone(), disk_image_id: disk_image_id.clone(), @@ -535,28 +541,12 @@ impl VirtioDevice for BlockIoUring { writeback: self.writeback.clone(), counters: self.counters.clone(), queue_evt, - io_uring, - io_uring_evt: EventFd::new(EFD_NONBLOCK).map_err(|e| { - error!("failed to create io_uring eventfd: {}", e); - ActivateError::BadActivate - })?, request_list: HashMap::with_capacity(queue_size.into()), }; let paused = self.common.paused.clone(); let paused_sync = self.common.paused_sync.clone(); - // Register the io_uring eventfd that will notify the epoll loop - // when something in the completion queue is ready. - handler - .io_uring - .submitter() - .register_eventfd(handler.io_uring_evt.as_raw_fd()) - .map_err(|e| { - error!("failed to register eventfd for io_uring: {}", e); - ActivateError::BadActivate - })?; - // Retrieve seccomp filter for virtio_blk_io_uring thread let virtio_blk_io_uring_seccomp_filter = get_seccomp_filter(&self.seccomp_action, Thread::VirtioBlkIoUring) diff --git a/vmm/src/device_manager.rs b/vmm/src/device_manager.rs index 1b0e5379f..932cc8412 100644 --- a/vmm/src/device_manager.rs +++ b/vmm/src/device_manager.rs @@ -38,7 +38,7 @@ use arch::layout; use arch::layout::{APIC_START, IOAPIC_SIZE, IOAPIC_START}; #[cfg(target_arch = "aarch64")] use arch::DeviceType; -use block_util::block_io_uring_is_supported; +use block_util::{async_io::DiskFile, block_io_uring_is_supported, raw_async::RawFileDisk}; #[cfg(target_arch = "aarch64")] use devices::gic; #[cfg(target_arch = "x86_64")] @@ -1655,6 +1655,7 @@ impl DeviceManager { // Use asynchronous backend relying on io_uring if the // syscalls are supported. if block_io_uring_is_supported() && !disk_cfg.disable_io_uring { + let image = Box::new(RawFileDisk::new(image)) as Box; let dev = Arc::new(Mutex::new( virtio_devices::BlockIoUring::new( id.clone(),