virtio-devices: Use asynchronous traits for virtio-blk io_uring

Based on the new DiskFile and AsyncIo traits, the implementation of
asynchronous block support no longer has to be tied to io_uring. The
virtio-blk implementation now only knows that it is driving an
asynchronous implementation of the underlying disk file.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
Author:    Sebastien Boeuf <sebastien.boeuf@intel.com>
Date:      2021-01-20 10:58:30 +01:00
Committer: Rob Bradford
parent 23e3b022eb
commit da8ce25abf
3 changed files with 57 additions and 100 deletions
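
The calls introduced below (size() and new_async_io() on the disk file; read_vectored(), write_vectored(), fsync(), complete() and notifier() on the I/O context) imply trait shapes roughly like the following sketch. This is reconstructed from the call sites only, not copied from block_util's async_io module, so the error types and exact signatures are assumptions.

// Sketch reconstructed from the call sites in this commit; error variant
// names and exact signatures are assumptions, not the real async_io code.
use vmm_sys_util::eventfd::EventFd;

#[derive(Debug)]
pub enum DiskFileError {
    Size(std::io::Error),
    NewAsyncIo(std::io::Error),
}

#[derive(Debug)]
pub enum AsyncIoError {
    ReadVectored(std::io::Error),
    WriteVectored(std::io::Error),
    Fsync(std::io::Error),
}

// One handle per disk image; mints per-queue asynchronous I/O contexts.
pub trait DiskFile: Send + Sync {
    // Size of the backing file in bytes.
    fn size(&mut self) -> Result<u64, DiskFileError>;
    // Build an asynchronous I/O context sized for `ring_depth` in-flight ops.
    fn new_async_io(&self, ring_depth: u32) -> Result<Box<dyn AsyncIo>, DiskFileError>;
}

// One context per virtqueue: submissions are non-blocking and tagged with
// `user_data`; completions are reaped after the notifier eventfd fires.
pub trait AsyncIo: Send {
    // Eventfd signaled when at least one operation has completed.
    fn notifier(&self) -> &EventFd;
    fn read_vectored(
        &mut self,
        offset: libc::off_t,
        iovecs: Vec<libc::iovec>,
        user_data: u64,
    ) -> Result<(), AsyncIoError>;
    fn write_vectored(
        &mut self,
        offset: libc::off_t,
        iovecs: Vec<libc::iovec>,
        user_data: u64,
    ) -> Result<(), AsyncIoError>;
    fn fsync(&mut self, user_data: Option<u64>) -> Result<(), AsyncIoError>;
    // Drain (user_data, result) pairs; a negative result carries -errno,
    // mirroring io_uring completion-queue semantics.
    fn complete(&mut self) -> Vec<(u64, i32)>;
}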

File 1 of 3

@@ -16,16 +16,15 @@ extern crate serde_derive;
 pub mod async_io;
 pub mod raw_async;
 
+use crate::async_io::{AsyncIo, AsyncIoError};
 #[cfg(feature = "io_uring")]
-use io_uring::Probe;
-use io_uring::{opcode, squeue, IoUring};
+use io_uring::{opcode, IoUring, Probe};
 use serde::ser::{Serialize, SerializeStruct, Serializer};
 use std::cmp;
 use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::os::linux::fs::MetadataExt;
 #[cfg(feature = "io_uring")]
 use std::os::unix::io::AsRawFd;
-use std::os::unix::io::RawFd;
 use std::path::PathBuf;
 use std::result;
 use virtio_bindings::bindings::virtio_blk::*;
@@ -101,6 +100,9 @@ pub enum ExecuteError {
     Unsupported(u32),
     SubmitIoUring(io::Error),
     GetHostAddress(GuestMemoryError),
+    AsyncRead(AsyncIoError),
+    AsyncWrite(AsyncIoError),
+    AsyncFlush(AsyncIoError),
 }
 
 impl ExecuteError {
@@ -114,6 +116,9 @@ impl ExecuteError {
             ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
             ExecuteError::SubmitIoUring(_) => VIRTIO_BLK_S_IOERR,
             ExecuteError::GetHostAddress(_) => VIRTIO_BLK_S_IOERR,
+            ExecuteError::AsyncRead(_) => VIRTIO_BLK_S_IOERR,
+            ExecuteError::AsyncWrite(_) => VIRTIO_BLK_S_IOERR,
+            ExecuteError::AsyncFlush(_) => VIRTIO_BLK_S_IOERR,
         }
     }
 }
@@ -271,12 +276,11 @@ impl Request {
         Ok(len)
     }
 
-    pub fn execute_io_uring(
+    pub fn execute_async(
        &self,
         mem: &GuestMemoryMmap,
-        io_uring: &mut IoUring,
         disk_nsectors: u64,
-        disk_image_fd: RawFd,
+        disk_image: &mut dyn AsyncIo,
         disk_id: &[u8],
         user_data: u64,
     ) -> result::Result<bool, ExecuteError> {
@@ -284,9 +288,6 @@ impl Request {
         let request_type = self.request_type;
         let offset = (sector << SECTOR_SHIFT) as libc::off_t;
 
-        let (submitter, sq, _) = io_uring.split();
-        let mut avail_sq = sq.available();
-
         let mut iovecs = Vec::new();
         for (data_addr, data_len) in &self.data_descriptors {
             let mut top: u64 = u64::from(*data_len) / SECTOR_SIZE;
@@ -314,49 +315,19 @@ impl Request {
         // Queue operations expected to be submitted.
         match request_type {
             RequestType::In => {
-                // Safe because we know the file descriptor is valid and we
-                // relied on vm-memory to provide the buffer address.
-                let _ = unsafe {
-                    avail_sq.push(
-                        opcode::Readv::new(
-                            opcode::types::Fd(disk_image_fd),
-                            iovecs.as_ptr(),
-                            iovecs.len() as u32,
-                        )
-                        .offset(offset)
-                        .build()
-                        .flags(squeue::Flags::ASYNC)
-                        .user_data(user_data),
-                    )
-                };
+                disk_image
+                    .read_vectored(offset, iovecs, user_data)
+                    .map_err(ExecuteError::AsyncRead)?;
             }
             RequestType::Out => {
-                // Safe because we know the file descriptor is valid and we
-                // relied on vm-memory to provide the buffer address.
-                let _ = unsafe {
-                    avail_sq.push(
-                        opcode::Writev::new(
-                            opcode::types::Fd(disk_image_fd),
-                            iovecs.as_ptr(),
-                            iovecs.len() as u32,
-                        )
-                        .offset(offset)
-                        .build()
-                        .flags(squeue::Flags::ASYNC)
-                        .user_data(user_data),
-                    )
-                };
+                disk_image
+                    .write_vectored(offset, iovecs, user_data)
+                    .map_err(ExecuteError::AsyncWrite)?;
             }
             RequestType::Flush => {
-                // Safe because we know the file descriptor is valid.
-                let _ = unsafe {
-                    avail_sq.push(
-                        opcode::Fsync::new(opcode::types::Fd(disk_image_fd))
-                            .build()
-                            .flags(squeue::Flags::ASYNC)
-                            .user_data(user_data),
-                    )
-                };
+                disk_image
+                    .fsync(Some(user_data))
+                    .map_err(ExecuteError::AsyncFlush)?;
             }
             RequestType::GetDeviceID => {
                 let (data_addr, data_len) = if self.data_descriptors.len() == 1 {
@@ -374,11 +345,6 @@ impl Request {
             RequestType::Unsupported(t) => return Err(ExecuteError::Unsupported(t)),
         }
 
-        // Update the submission queue and submit new operations to the
-        // io_uring instance.
-        avail_sq.sync();
-        submitter.submit().map_err(ExecuteError::SubmitIoUring)?;
-
         Ok(true)
     }
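
The point of the refactor shows here: execute_async() now programs only against the AsyncIo trait, so any backend able to satisfy it can be plugged in. As an illustration, a deliberately naive backend that fakes asynchrony with blocking preadv/pwritev would slot in unchanged. This is a hypothetical sketch against the trait shapes assumed above; SyncFakeAsync does not exist in the tree.

// Hypothetical: completes every operation inline, then signals its own
// notifier, so the virtio-blk side cannot tell it apart from io_uring.
use std::os::unix::io::RawFd;
use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK};

struct SyncFakeAsync {
    fd: RawFd,
    evt: EventFd,
    completed: Vec<(u64, i32)>,
}

impl SyncFakeAsync {
    fn new(fd: RawFd) -> std::io::Result<Self> {
        Ok(SyncFakeAsync {
            fd,
            evt: EventFd::new(EFD_NONBLOCK)?,
            completed: Vec::new(),
        })
    }

    fn push_completion(&mut self, user_data: u64, ret: isize) {
        // Mirror io_uring CQE semantics: >= 0 is bytes done, < 0 is -errno.
        let result = if ret < 0 {
            -std::io::Error::last_os_error().raw_os_error().unwrap_or(libc::EIO)
        } else {
            ret as i32
        };
        self.completed.push((user_data, result));
        self.evt.write(1).ok();
    }
}

impl AsyncIo for SyncFakeAsync {
    fn notifier(&self) -> &EventFd {
        &self.evt
    }

    fn read_vectored(
        &mut self,
        offset: libc::off_t,
        iovecs: Vec<libc::iovec>,
        user_data: u64,
    ) -> Result<(), AsyncIoError> {
        // Valid only while the iovecs point at live guest memory, exactly as
        // in the io_uring path above.
        let ret = unsafe { libc::preadv(self.fd, iovecs.as_ptr(), iovecs.len() as i32, offset) };
        self.push_completion(user_data, ret);
        Ok(())
    }

    fn write_vectored(
        &mut self,
        offset: libc::off_t,
        iovecs: Vec<libc::iovec>,
        user_data: u64,
    ) -> Result<(), AsyncIoError> {
        let ret = unsafe { libc::pwritev(self.fd, iovecs.as_ptr(), iovecs.len() as i32, offset) };
        self.push_completion(user_data, ret);
        Ok(())
    }

    fn fsync(&mut self, user_data: Option<u64>) -> Result<(), AsyncIoError> {
        let ret = unsafe { libc::fsync(self.fd) } as isize;
        if let Some(user_data) = user_data {
            self.push_completion(user_data, ret);
        }
        Ok(())
    }

    fn complete(&mut self) -> Vec<(u64, i32)> {
        std::mem::take(&mut self.completed)
    }
}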

File 2 of 3

@@ -16,15 +16,15 @@ use super::{
 use crate::seccomp_filters::{get_seccomp_filter, Thread};
 use crate::VirtioInterrupt;
 use anyhow::anyhow;
-use block_util::{build_disk_image_id, Request, RequestType, VirtioBlockConfig};
-use io_uring::IoUring;
-use libc::EFD_NONBLOCK;
+use block_util::{
+    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request,
+    RequestType, VirtioBlockConfig,
+};
 use seccomp::{SeccompAction, SeccompFilter};
 use std::collections::HashMap;
-use std::fs::File;
-use std::io::{self, Seek, SeekFrom};
+use std::io;
 use std::num::Wrapping;
-use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::io::AsRawFd;
 use std::path::PathBuf;
 use std::result;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
@@ -47,7 +47,7 @@ pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
 // New descriptors are pending on the virtio queue.
 const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
 // New completed tasks are pending on the completion ring.
-const IO_URING_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
+const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
 
 #[derive(Debug)]
 pub enum Error {
@@ -77,6 +77,8 @@ pub enum Error {
     MissingEntryRequestList,
     /// The asynchronous request returned with failure.
     AsyncRequestFailure,
+    /// Failed synchronizing the file
+    Fsync(AsyncIoError),
 }
 
 pub type Result<T> = result::Result<T, Error>;
@@ -92,7 +94,7 @@ pub struct BlockCounters {
 struct BlockIoUringEpollHandler {
     queue: Queue,
     mem: GuestMemoryAtomic<GuestMemoryMmap>,
-    disk_image_fd: RawFd,
+    disk_image: Box<dyn AsyncIo>,
     disk_nsectors: u64,
     interrupt_cb: Arc<dyn VirtioInterrupt>,
     disk_image_id: Vec<u8>,
@@ -101,8 +103,6 @@ struct BlockIoUringEpollHandler {
     writeback: Arc<AtomicBool>,
     counters: BlockCounters,
     queue_evt: EventFd,
-    io_uring: IoUring,
-    io_uring_evt: EventFd,
     request_list: HashMap<u16, Request>,
 }
@@ -117,12 +117,12 @@ impl BlockIoUringEpollHandler {
         for avail_desc in queue.iter(&mem) {
             let mut request = Request::parse(&avail_desc, &mem).map_err(Error::RequestParsing)?;
             request.set_writeback(self.writeback.load(Ordering::Acquire));
             if request
-                .execute_io_uring(
+                .execute_async(
                     &mem,
-                    &mut self.io_uring,
                     self.disk_nsectors,
-                    self.disk_image_fd,
+                    self.disk_image.as_mut(),
                     &self.disk_image_id,
                     avail_desc.index as u64,
                 )
@@ -159,10 +159,9 @@ impl BlockIoUringEpollHandler {
         let mut read_ops = Wrapping(0);
         let mut write_ops = Wrapping(0);
 
-        let cq = self.io_uring.completion();
-        for cq_entry in cq.available() {
-            let result = cq_entry.result();
-            let desc_index = cq_entry.user_data() as u16;
+        let completion_list = self.disk_image.complete();
+        for (user_data, result) in completion_list {
+            let desc_index = user_data as u16;
 
             let request = self
                 .request_list
                 .remove(&desc_index)
@@ -178,7 +177,7 @@ impl BlockIoUringEpollHandler {
                 }
                 RequestType::Out => {
                     if !request.writeback {
-                        unsafe { libc::fsync(self.disk_image_fd) };
+                        self.disk_image.fsync(None).map_err(Error::Fsync)?;
                     }
                     for (_, data_len) in &request.data_descriptors {
                         write_bytes += Wrapping(*data_len as u64);
@@ -201,7 +200,7 @@ impl BlockIoUringEpollHandler {
             // checked that the status_addr was valid.
             mem.write_obj(status, request.status_addr).unwrap();
 
-            used_desc_heads.push((desc_index, len));
+            used_desc_heads.push((desc_index as u16, len));
             used_count += 1;
         }
@@ -242,7 +241,7 @@ impl BlockIoUringEpollHandler {
     ) -> result::Result<(), EpollHelperError> {
         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
         helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
-        helper.add_event(self.io_uring_evt.as_raw_fd(), IO_URING_EVENT)?;
+        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
         helper.run(paused, paused_sync, self)?;
 
         Ok(())
@@ -274,8 +273,8 @@ impl EpollHelperHandler for BlockIoUringEpollHandler {
                     }
                 }
             }
-            IO_URING_EVENT => {
-                if let Err(e) = self.io_uring_evt.read() {
+            COMPLETION_EVENT => {
+                if let Err(e) = self.disk_image.notifier().read() {
                     error!("Failed to get queue event: {:?}", e);
                     return true;
                 }
@@ -308,7 +307,7 @@ impl EpollHelperHandler for BlockIoUringEpollHandler {
 pub struct BlockIoUring {
     common: VirtioCommon,
     id: String,
-    disk_image: File,
+    disk_image: Box<dyn DiskFile>,
     disk_path: PathBuf,
     disk_nsectors: u64,
     config: VirtioBlockConfig,
@@ -331,7 +330,7 @@ impl BlockIoUring {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         id: String,
-        mut disk_image: File,
+        mut disk_image: Box<dyn DiskFile>,
         disk_path: PathBuf,
         is_disk_read_only: bool,
         iommu: bool,
@@ -339,7 +338,12 @@ impl BlockIoUring {
         queue_size: u16,
         seccomp_action: SeccompAction,
     ) -> io::Result<Self> {
-        let disk_size = disk_image.seek(SeekFrom::End(0))? as u64;
+        let disk_size = disk_image.size().map_err(|e| {
+            io::Error::new(
+                io::ErrorKind::Other,
+                format!("Failed getting disk size: {}", e),
+            )
+        })?;
         if disk_size % SECTOR_SIZE != 0 {
             warn!(
                 "Disk size {} is not a multiple of sector size {}; \
@@ -498,10 +502,6 @@ impl VirtioDevice for BlockIoUring {
         let queue_evt = queue_evts.remove(0);
         let queue = queues.remove(0);
         let queue_size = queue.size;
-        let io_uring = IoUring::new(queue_size as u32).map_err(|e| {
-            error!("failed to create io_uring instance: {}", e);
-            ActivateError::BadActivate
-        })?;
         let kill_evt = self
             .common
             .kill_evt
@@ -526,7 +526,13 @@ impl VirtioDevice for BlockIoUring {
         let mut handler = BlockIoUringEpollHandler {
             queue,
             mem: mem.clone(),
-            disk_image_fd: self.disk_image.as_raw_fd(),
+            disk_image: self
+                .disk_image
+                .new_async_io(queue_size as u32)
+                .map_err(|e| {
+                    error!("failed to create new AsyncIo: {}", e);
+                    ActivateError::BadActivate
+                })?,
             disk_nsectors: self.disk_nsectors,
             interrupt_cb: interrupt_cb.clone(),
             disk_image_id: disk_image_id.clone(),
@@ -535,28 +541,12 @@ impl VirtioDevice for BlockIoUring {
             writeback: self.writeback.clone(),
             counters: self.counters.clone(),
             queue_evt,
-            io_uring,
-            io_uring_evt: EventFd::new(EFD_NONBLOCK).map_err(|e| {
-                error!("failed to create io_uring eventfd: {}", e);
-                ActivateError::BadActivate
-            })?,
             request_list: HashMap::with_capacity(queue_size.into()),
         };
 
         let paused = self.common.paused.clone();
         let paused_sync = self.common.paused_sync.clone();
 
-        // Register the io_uring eventfd that will notify the epoll loop
-        // when something in the completion queue is ready.
-        handler
-            .io_uring
-            .submitter()
-            .register_eventfd(handler.io_uring_evt.as_raw_fd())
-            .map_err(|e| {
-                error!("failed to register eventfd for io_uring: {}", e);
-                ActivateError::BadActivate
-            })?;
-
         // Retrieve seccomp filter for virtio_blk_io_uring thread
         let virtio_blk_io_uring_seccomp_filter =
             get_seccomp_filter(&self.seccomp_action, Thread::VirtioBlkIoUring)
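
With the explicit io_uring eventfd registration gone, the handler thread simply multiplexes two eventfds: the guest's queue kick (QUEUE_AVAIL_EVENT) and the backend's completion notifier (COMPLETION_EVENT). Below is a stripped-down sketch of that loop using raw epoll; it is hypothetical, as the real handler goes through the EpollHelper wrapper on a seccomp-filtered thread.

use std::io;
use std::os::unix::io::RawFd;

const QUEUE_AVAIL_EVENT: u64 = 1;
const COMPLETION_EVENT: u64 = 2;

fn add_fd(epoll_fd: RawFd, fd: RawFd, token: u64) -> io::Result<()> {
    let mut ev = libc::epoll_event {
        events: libc::EPOLLIN as u32,
        u64: token,
    };
    if unsafe { libc::epoll_ctl(epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev) } < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}

fn run(queue_evt: RawFd, completion_evt: RawFd) -> io::Result<()> {
    let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
    if epoll_fd < 0 {
        return Err(io::Error::last_os_error());
    }
    add_fd(epoll_fd, queue_evt, QUEUE_AVAIL_EVENT)?;
    add_fd(epoll_fd, completion_evt, COMPLETION_EVENT)?;

    let mut events = [libc::epoll_event { events: 0, u64: 0 }; 2];
    loop {
        let num = unsafe { libc::epoll_wait(epoll_fd, events.as_mut_ptr(), 2, -1) };
        if num < 0 {
            return Err(io::Error::last_os_error());
        }
        for ev in &events[..num as usize] {
            match ev.u64 {
                // Drain the virtqueue and submit via Request::execute_async().
                QUEUE_AVAIL_EVENT => {}
                // Reap disk_image.complete() and ack the guest.
                COMPLETION_EVENT => {}
                _ => {}
            }
        }
    }
}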

File 3 of 3

@@ -38,7 +38,7 @@ use arch::layout;
 use arch::layout::{APIC_START, IOAPIC_SIZE, IOAPIC_START};
 #[cfg(target_arch = "aarch64")]
 use arch::DeviceType;
-use block_util::block_io_uring_is_supported;
+use block_util::{async_io::DiskFile, block_io_uring_is_supported, raw_async::RawFileDisk};
 #[cfg(target_arch = "aarch64")]
 use devices::gic;
 #[cfg(target_arch = "x86_64")]
@@ -1655,6 +1655,7 @@ impl DeviceManager {
                 // Use asynchronous backend relying on io_uring if the
                 // syscalls are supported.
                 if block_io_uring_is_supported() && !disk_cfg.disable_io_uring {
+                    let image = Box::new(RawFileDisk::new(image)) as Box<dyn DiskFile>;
                     let dev = Arc::new(Mutex::new(
                         virtio_devices::BlockIoUring::new(
                             id.clone(),
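
On the DeviceManager side the decoupling amounts to one wrap: the opened file becomes a RawFileDisk, the io_uring-backed DiskFile implementation from block_util::raw_async, before being handed to BlockIoUring::new. In isolation the shape is the following sketch (the surrounding DeviceManager plumbing is omitted):

use block_util::{async_io::DiskFile, raw_async::RawFileDisk};
use std::fs::File;

// `image` is the disk's opened File; past this point virtio-blk sees only
// the DiskFile trait object, never the io_uring specifics.
fn wrap_disk(image: File) -> Box<dyn DiskFile> {
    Box::new(RawFileDisk::new(image)) as Box<dyn DiskFile>
}

Any other DiskFile implementation could be substituted at this call site without touching the virtio-blk code.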