From 23e3b022eb9f68e6f9da7305a3b02a863d41bc68 Mon Sep 17 00:00:00 2001 From: Sebastien Boeuf Date: Wed, 20 Jan 2021 10:55:39 +0100 Subject: [PATCH] block_util: Implement asynchronous traits for RAW disk file This provides the implementation of DiskFile and AsyncIo for the RAW file format. Signed-off-by: Sebastien Boeuf --- block_util/src/lib.rs | 1 + block_util/src/raw_async.rs | 170 ++++++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 block_util/src/raw_async.rs diff --git a/block_util/src/lib.rs b/block_util/src/lib.rs index 71b89a994..1ea9703b1 100644 --- a/block_util/src/lib.rs +++ b/block_util/src/lib.rs @@ -14,6 +14,7 @@ extern crate log; extern crate serde_derive; pub mod async_io; +pub mod raw_async; #[cfg(feature = "io_uring")] use io_uring::Probe; diff --git a/block_util/src/raw_async.rs b/block_util/src/raw_async.rs new file mode 100644 index 000000000..6c585d03c --- /dev/null +++ b/block_util/src/raw_async.rs @@ -0,0 +1,170 @@ +// Copyright © 2021 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause + +use crate::async_io::{ + AsyncIo, AsyncIoError, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult, +}; +use io_uring::{opcode, squeue, IoUring}; +use std::fs::File; +use std::io::{Seek, SeekFrom}; +use std::os::unix::io::{AsRawFd, RawFd}; +use vmm_sys_util::eventfd::EventFd; + +pub struct RawFileDisk { + file: File, +} + +impl RawFileDisk { + pub fn new(file: File) -> Self { + RawFileDisk { file } + } +} + +impl DiskFile for RawFileDisk { + fn size(&mut self) -> DiskFileResult { + Ok(self + .file + .seek(SeekFrom::End(0)) + .map_err(DiskFileError::Size)? as u64) + } + + fn new_async_io(&self, ring_depth: u32) -> DiskFileResult> { + Ok(Box::new( + RawFileAsync::new(self.file.as_raw_fd(), ring_depth) + .map_err(DiskFileError::NewAsyncIo)?, + ) as Box) + } +} + +pub struct RawFileAsync { + fd: RawFd, + io_uring: IoUring, + eventfd: EventFd, +} + +impl RawFileAsync { + pub fn new(fd: RawFd, ring_depth: u32) -> std::io::Result { + let io_uring = IoUring::new(ring_depth)?; + let eventfd = EventFd::new(libc::EFD_NONBLOCK)?; + + // Register the io_uring eventfd that will notify when something in + // the completion queue is ready. + io_uring.submitter().register_eventfd(eventfd.as_raw_fd())?; + + Ok(RawFileAsync { + fd, + io_uring, + eventfd, + }) + } +} + +impl AsyncIo for RawFileAsync { + fn notifier(&self) -> &EventFd { + &self.eventfd + } + + fn read_vectored( + &mut self, + offset: libc::off_t, + iovecs: Vec, + user_data: u64, + ) -> AsyncIoResult<()> { + let (submitter, sq, _) = self.io_uring.split(); + let mut avail_sq = sq.available(); + + // Safe because we know the file descriptor is valid and we + // relied on vm-memory to provide the buffer address. + let _ = unsafe { + avail_sq.push( + opcode::Readv::new( + opcode::types::Fd(self.fd), + iovecs.as_ptr(), + iovecs.len() as u32, + ) + .offset(offset) + .build() + .flags(squeue::Flags::ASYNC) + .user_data(user_data), + ) + }; + + // Update the submission queue and submit new operations to the + // io_uring instance. + avail_sq.sync(); + submitter.submit().map_err(AsyncIoError::ReadVectored)?; + + Ok(()) + } + + fn write_vectored( + &mut self, + offset: libc::off_t, + iovecs: Vec, + user_data: u64, + ) -> AsyncIoResult<()> { + let (submitter, sq, _) = self.io_uring.split(); + let mut avail_sq = sq.available(); + + // Safe because we know the file descriptor is valid and we + // relied on vm-memory to provide the buffer address. + let _ = unsafe { + avail_sq.push( + opcode::Writev::new( + opcode::types::Fd(self.fd), + iovecs.as_ptr(), + iovecs.len() as u32, + ) + .offset(offset) + .build() + .flags(squeue::Flags::ASYNC) + .user_data(user_data), + ) + }; + + // Update the submission queue and submit new operations to the + // io_uring instance. + avail_sq.sync(); + submitter.submit().map_err(AsyncIoError::WriteVectored)?; + + Ok(()) + } + + fn fsync(&mut self, user_data: Option) -> AsyncIoResult<()> { + if let Some(user_data) = user_data { + let (submitter, sq, _) = self.io_uring.split(); + let mut avail_sq = sq.available(); + + // Safe because we know the file descriptor is valid. + let _ = unsafe { + avail_sq.push( + opcode::Fsync::new(opcode::types::Fd(self.fd)) + .build() + .flags(squeue::Flags::ASYNC) + .user_data(user_data), + ) + }; + + // Update the submission queue and submit new operations to the + // io_uring instance. + avail_sq.sync(); + submitter.submit().map_err(AsyncIoError::Fsync)?; + } else { + unsafe { libc::fsync(self.fd) }; + } + + Ok(()) + } + + fn complete(&mut self) -> Vec<(u64, i32)> { + let mut completion_list = Vec::new(); + + let cq = self.io_uring.completion(); + for cq_entry in cq.available() { + completion_list.push((cq_entry.user_data(), cq_entry.result())); + } + + completion_list + } +}