From e1151482fc38835563a05eb304f3ef13b91d74cf Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Sun, 5 Dec 2021 17:53:18 +0000 Subject: [PATCH] block_util: handle synchronized read/write/fsync idiomatically Previously mutex (semaphore) and file were separated. The code needed to create artificial scopes to use mutex to protect file. Rewrite the code to be idiomatic. The file itself is turned into a trait object and placed inside the mutex. This requires providing a new ReadWriteSeekFile trait to unify all helper functions. The rewrite further simplified vhdx_sync code. The original code contained two mutex'es for no apparent reason. No functional change intended. Signed-off-by: Wei Liu --- block_util/src/lib.rs | 38 +++++++++++++++--------------- block_util/src/qcow_sync.rs | 33 ++++++++------------------ block_util/src/vhdx_sync.rs | 46 ++++++++++++++++++------------------- 3 files changed, 49 insertions(+), 68 deletions(-) diff --git a/block_util/src/lib.rs b/block_util/src/lib.rs index fc9eebfdf..b58f941d1 100644 --- a/block_util/src/lib.rs +++ b/block_util/src/lib.rs @@ -23,6 +23,7 @@ pub mod vhdx_sync; use crate::async_io::{AsyncIo, AsyncIoError, AsyncIoResult}; #[cfg(feature = "io_uring")] use io_uring::{opcode, IoUring, Probe}; +use std::any::Any; use std::cmp; use std::convert::TryInto; use std::fs::File; @@ -476,17 +477,25 @@ pub fn block_io_uring_is_supported() -> bool { false } -pub trait ReadSeekFile: Read + Seek {} -impl ReadSeekFile for F {} +pub trait ReadWriteSeekFile: Read + Write + Seek { + // Provides a mutable reference to the Any trait. This is useful to let + // the caller have access to the underlying type behind the trait. + fn as_any(&mut self) -> &mut dyn Any; +} + +impl ReadWriteSeekFile for F { + fn as_any(&mut self) -> &mut dyn Any { + self + } +} pub fn read_vectored_sync( offset: libc::off_t, iovecs: Vec, user_data: u64, - file: &mut dyn ReadSeekFile, + file: &mut Arc>, eventfd: &EventFd, completion_list: &mut Vec<(u64, i32)>, - semaphore: &mut Arc>, ) -> AsyncIoResult<()> { // Convert libc::iovec into IoSliceMut let mut slices = Vec::new(); @@ -495,9 +504,7 @@ pub fn read_vectored_sync( } let result = { - // Take the semaphore to ensure other threads are not interacting - // with the underlying file. - let _lock = semaphore.lock().unwrap(); + let mut file = file.lock().unwrap(); // Move the cursor to the right offset file.seek(SeekFrom::Start(offset as u64)) @@ -514,17 +521,13 @@ pub fn read_vectored_sync( Ok(()) } -pub trait WriteSeekFile: Write + Seek {} -impl WriteSeekFile for F {} - pub fn write_vectored_sync( offset: libc::off_t, iovecs: Vec, user_data: u64, - file: &mut dyn WriteSeekFile, + file: &mut Arc>, eventfd: &EventFd, completion_list: &mut Vec<(u64, i32)>, - semaphore: &mut Arc>, ) -> AsyncIoResult<()> { // Convert libc::iovec into IoSlice let mut slices = Vec::new(); @@ -533,9 +536,7 @@ pub fn write_vectored_sync( } let result = { - // Take the semaphore to ensure other threads are not interacting - // with the underlying file. - let _lock = semaphore.lock().unwrap(); + let mut file = file.lock().unwrap(); // Move the cursor to the right offset file.seek(SeekFrom::Start(offset as u64)) @@ -554,15 +555,12 @@ pub fn write_vectored_sync( pub fn fsync_sync( user_data: Option, - file: &mut dyn Write, + file: &mut Arc>, eventfd: &EventFd, completion_list: &mut Vec<(u64, i32)>, - semaphore: &mut Arc>, ) -> AsyncIoResult<()> { let result: i32 = { - // Take the semaphore to ensure other threads are not interacting - // with the underlying file. - let _lock = semaphore.lock().unwrap(); + let mut file = file.lock().unwrap(); // Flush file.flush().map_err(AsyncIoError::Fsync)?; diff --git a/block_util/src/qcow_sync.rs b/block_util/src/qcow_sync.rs index f3d749c32..654c37118 100644 --- a/block_util/src/qcow_sync.rs +++ b/block_util/src/qcow_sync.rs @@ -3,62 +3,50 @@ // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause use crate::async_io::{AsyncIo, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult}; -use crate::{fsync_sync, read_vectored_sync, write_vectored_sync}; +use crate::{fsync_sync, read_vectored_sync, write_vectored_sync, ReadWriteSeekFile}; use qcow::{QcowFile, RawFile, Result as QcowResult}; use std::fs::File; -use std::io::{Seek, SeekFrom}; +use std::io::SeekFrom; use std::sync::{Arc, Mutex}; use vmm_sys_util::eventfd::EventFd; pub struct QcowDiskSync { - qcow_file: QcowFile, - semaphore: Arc>, + qcow_file: Arc>, } impl QcowDiskSync { pub fn new(file: File, direct_io: bool) -> QcowResult { Ok(QcowDiskSync { - qcow_file: QcowFile::from(RawFile::new(file, direct_io))?, - semaphore: Arc::new(Mutex::new(())), + qcow_file: Arc::new(Mutex::new(QcowFile::from(RawFile::new(file, direct_io))?)), }) } } impl DiskFile for QcowDiskSync { fn size(&mut self) -> DiskFileResult { - // Take the semaphore to ensure other threads are not interacting with - // the underlying file. - let _lock = self.semaphore.lock().unwrap(); + let mut file = self.qcow_file.lock().unwrap(); - Ok(self - .qcow_file - .seek(SeekFrom::End(0)) - .map_err(DiskFileError::Size)? as u64) + Ok(file.seek(SeekFrom::End(0)).map_err(DiskFileError::Size)? as u64) } fn new_async_io(&self, _ring_depth: u32) -> DiskFileResult> { - Ok(Box::new(QcowSync::new( - self.qcow_file.clone(), - self.semaphore.clone(), - )) as Box) + Ok(Box::new(QcowSync::new(self.qcow_file.clone())) as Box) } } pub struct QcowSync { - qcow_file: QcowFile, + qcow_file: Arc>, eventfd: EventFd, completion_list: Vec<(u64, i32)>, - semaphore: Arc>, } impl QcowSync { - pub fn new(qcow_file: QcowFile, semaphore: Arc>) -> Self { + pub fn new(qcow_file: Arc>) -> Self { QcowSync { qcow_file, eventfd: EventFd::new(libc::EFD_NONBLOCK) .expect("Failed creating EventFd for QcowSync"), completion_list: Vec::new(), - semaphore, } } } @@ -81,7 +69,6 @@ impl AsyncIo for QcowSync { &mut self.qcow_file, &self.eventfd, &mut self.completion_list, - &mut self.semaphore, ) } @@ -98,7 +85,6 @@ impl AsyncIo for QcowSync { &mut self.qcow_file, &self.eventfd, &mut self.completion_list, - &mut self.semaphore, ) } @@ -108,7 +94,6 @@ impl AsyncIo for QcowSync { &mut self.qcow_file, &self.eventfd, &mut self.completion_list, - &mut self.semaphore, ) } diff --git a/block_util/src/vhdx_sync.rs b/block_util/src/vhdx_sync.rs index 1d36157fa..6846b6dcf 100644 --- a/block_util/src/vhdx_sync.rs +++ b/block_util/src/vhdx_sync.rs @@ -3,57 +3,58 @@ // SPDX-License-Identifier: Apache-2.0 use crate::async_io::{AsyncIo, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult}; -use crate::{fsync_sync, read_vectored_sync, write_vectored_sync}; +use crate::{fsync_sync, read_vectored_sync, write_vectored_sync, ReadWriteSeekFile}; use std::fs::File; -use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use vhdx::vhdx::{Result as VhdxResult, Vhdx}; use vmm_sys_util::eventfd::EventFd; pub struct VhdxDiskSync { - vhdx_file: Arc>, - semaphore: Arc>, + vhdx_file: Arc>, } impl VhdxDiskSync { pub fn new(f: File) -> VhdxResult { - let vhdx = Vhdx::new(f)?; - let vhdx_file = Arc::new(Mutex::new(vhdx)); - Ok(VhdxDiskSync { - vhdx_file, - semaphore: Arc::new(Mutex::new(())), + vhdx_file: Arc::new(Mutex::new(Vhdx::new(f)?)), }) } } impl DiskFile for VhdxDiskSync { fn size(&mut self) -> DiskFileResult { - Ok(self.vhdx_file.lock().unwrap().virtual_disk_size()) + Ok(self + .vhdx_file + .lock() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .virtual_disk_size()) } fn new_async_io(&self, _ring_depth: u32) -> DiskFileResult> { - Ok(Box::new( - VhdxSync::new(self.vhdx_file.clone(), self.semaphore.clone()) - .map_err(DiskFileError::NewAsyncIo)?, - ) as Box) + Ok( + Box::new(VhdxSync::new(self.vhdx_file.clone()).map_err(DiskFileError::NewAsyncIo)?) + as Box, + ) } } pub struct VhdxSync { - vhdx_file: Arc>, + vhdx_file: Arc>, eventfd: EventFd, completion_list: Vec<(u64, i32)>, - semaphore: Arc>, } impl VhdxSync { - pub fn new(vhdx_file: Arc>, semaphore: Arc>) -> std::io::Result { + pub fn new( + vhdx_file: Arc>, + ) -> std::io::Result { Ok(VhdxSync { vhdx_file, eventfd: EventFd::new(libc::EFD_NONBLOCK)?, completion_list: Vec::new(), - semaphore, }) } } @@ -73,10 +74,9 @@ impl AsyncIo for VhdxSync { offset, iovecs, user_data, - self.vhdx_file.lock().unwrap().deref_mut(), + &mut self.vhdx_file, &self.eventfd, &mut self.completion_list, - &mut self.semaphore, ) } @@ -90,20 +90,18 @@ impl AsyncIo for VhdxSync { offset, iovecs, user_data, - self.vhdx_file.lock().unwrap().deref_mut(), + &mut self.vhdx_file, &self.eventfd, &mut self.completion_list, - &mut self.semaphore, ) } fn fsync(&mut self, user_data: Option) -> AsyncIoResult<()> { fsync_sync( user_data, - self.vhdx_file.lock().unwrap().deref_mut(), + &mut self.vhdx_file, &self.eventfd, &mut self.completion_list, - &mut self.semaphore, ) }