mirror of
https://github.com/cloud-hypervisor/cloud-hypervisor.git
synced 2025-02-08 12:41:35 +00:00
block_util: handle synchronized read/write/fsync idiomatically
Previously mutex (semaphore) and file were separated. The code needed to create artificial scopes to use mutex to protect file. Rewrite the code to be idiomatic. The file itself is turned into a trait object and placed inside the mutex. This requires providing a new ReadWriteSeekFile trait to unify all helper functions. The rewrite further simplified vhdx_sync code. The original code contained two mutex'es for no apparent reason. No functional change intended. Signed-off-by: Wei Liu <liuwe@microsoft.com>
This commit is contained in:
parent
3e536f91eb
commit
e1151482fc
@ -23,6 +23,7 @@ pub mod vhdx_sync;
|
|||||||
use crate::async_io::{AsyncIo, AsyncIoError, AsyncIoResult};
|
use crate::async_io::{AsyncIo, AsyncIoError, AsyncIoResult};
|
||||||
#[cfg(feature = "io_uring")]
|
#[cfg(feature = "io_uring")]
|
||||||
use io_uring::{opcode, IoUring, Probe};
|
use io_uring::{opcode, IoUring, Probe};
|
||||||
|
use std::any::Any;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -476,17 +477,25 @@ pub fn block_io_uring_is_supported() -> bool {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ReadSeekFile: Read + Seek {}
|
pub trait ReadWriteSeekFile: Read + Write + Seek {
|
||||||
impl<F: Read + Seek> ReadSeekFile for F {}
|
// Provides a mutable reference to the Any trait. This is useful to let
|
||||||
|
// the caller have access to the underlying type behind the trait.
|
||||||
|
fn as_any(&mut self) -> &mut dyn Any;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: Read + Write + Seek + 'static> ReadWriteSeekFile for F {
|
||||||
|
fn as_any(&mut self) -> &mut dyn Any {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn read_vectored_sync(
|
pub fn read_vectored_sync(
|
||||||
offset: libc::off_t,
|
offset: libc::off_t,
|
||||||
iovecs: Vec<libc::iovec>,
|
iovecs: Vec<libc::iovec>,
|
||||||
user_data: u64,
|
user_data: u64,
|
||||||
file: &mut dyn ReadSeekFile,
|
file: &mut Arc<Mutex<dyn ReadWriteSeekFile + Send + Sync>>,
|
||||||
eventfd: &EventFd,
|
eventfd: &EventFd,
|
||||||
completion_list: &mut Vec<(u64, i32)>,
|
completion_list: &mut Vec<(u64, i32)>,
|
||||||
semaphore: &mut Arc<Mutex<()>>,
|
|
||||||
) -> AsyncIoResult<()> {
|
) -> AsyncIoResult<()> {
|
||||||
// Convert libc::iovec into IoSliceMut
|
// Convert libc::iovec into IoSliceMut
|
||||||
let mut slices = Vec::new();
|
let mut slices = Vec::new();
|
||||||
@ -495,9 +504,7 @@ pub fn read_vectored_sync(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let result = {
|
let result = {
|
||||||
// Take the semaphore to ensure other threads are not interacting
|
let mut file = file.lock().unwrap();
|
||||||
// with the underlying file.
|
|
||||||
let _lock = semaphore.lock().unwrap();
|
|
||||||
|
|
||||||
// Move the cursor to the right offset
|
// Move the cursor to the right offset
|
||||||
file.seek(SeekFrom::Start(offset as u64))
|
file.seek(SeekFrom::Start(offset as u64))
|
||||||
@ -514,17 +521,13 @@ pub fn read_vectored_sync(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait WriteSeekFile: Write + Seek {}
|
|
||||||
impl<F: Write + Seek> WriteSeekFile for F {}
|
|
||||||
|
|
||||||
pub fn write_vectored_sync(
|
pub fn write_vectored_sync(
|
||||||
offset: libc::off_t,
|
offset: libc::off_t,
|
||||||
iovecs: Vec<libc::iovec>,
|
iovecs: Vec<libc::iovec>,
|
||||||
user_data: u64,
|
user_data: u64,
|
||||||
file: &mut dyn WriteSeekFile,
|
file: &mut Arc<Mutex<dyn ReadWriteSeekFile + Sync + Send>>,
|
||||||
eventfd: &EventFd,
|
eventfd: &EventFd,
|
||||||
completion_list: &mut Vec<(u64, i32)>,
|
completion_list: &mut Vec<(u64, i32)>,
|
||||||
semaphore: &mut Arc<Mutex<()>>,
|
|
||||||
) -> AsyncIoResult<()> {
|
) -> AsyncIoResult<()> {
|
||||||
// Convert libc::iovec into IoSlice
|
// Convert libc::iovec into IoSlice
|
||||||
let mut slices = Vec::new();
|
let mut slices = Vec::new();
|
||||||
@ -533,9 +536,7 @@ pub fn write_vectored_sync(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let result = {
|
let result = {
|
||||||
// Take the semaphore to ensure other threads are not interacting
|
let mut file = file.lock().unwrap();
|
||||||
// with the underlying file.
|
|
||||||
let _lock = semaphore.lock().unwrap();
|
|
||||||
|
|
||||||
// Move the cursor to the right offset
|
// Move the cursor to the right offset
|
||||||
file.seek(SeekFrom::Start(offset as u64))
|
file.seek(SeekFrom::Start(offset as u64))
|
||||||
@ -554,15 +555,12 @@ pub fn write_vectored_sync(
|
|||||||
|
|
||||||
pub fn fsync_sync(
|
pub fn fsync_sync(
|
||||||
user_data: Option<u64>,
|
user_data: Option<u64>,
|
||||||
file: &mut dyn Write,
|
file: &mut Arc<Mutex<dyn ReadWriteSeekFile + Sync + Send>>,
|
||||||
eventfd: &EventFd,
|
eventfd: &EventFd,
|
||||||
completion_list: &mut Vec<(u64, i32)>,
|
completion_list: &mut Vec<(u64, i32)>,
|
||||||
semaphore: &mut Arc<Mutex<()>>,
|
|
||||||
) -> AsyncIoResult<()> {
|
) -> AsyncIoResult<()> {
|
||||||
let result: i32 = {
|
let result: i32 = {
|
||||||
// Take the semaphore to ensure other threads are not interacting
|
let mut file = file.lock().unwrap();
|
||||||
// with the underlying file.
|
|
||||||
let _lock = semaphore.lock().unwrap();
|
|
||||||
|
|
||||||
// Flush
|
// Flush
|
||||||
file.flush().map_err(AsyncIoError::Fsync)?;
|
file.flush().map_err(AsyncIoError::Fsync)?;
|
||||||
|
@ -3,62 +3,50 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
|
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
|
||||||
|
|
||||||
use crate::async_io::{AsyncIo, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult};
|
use crate::async_io::{AsyncIo, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult};
|
||||||
use crate::{fsync_sync, read_vectored_sync, write_vectored_sync};
|
use crate::{fsync_sync, read_vectored_sync, write_vectored_sync, ReadWriteSeekFile};
|
||||||
use qcow::{QcowFile, RawFile, Result as QcowResult};
|
use qcow::{QcowFile, RawFile, Result as QcowResult};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{Seek, SeekFrom};
|
use std::io::SeekFrom;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use vmm_sys_util::eventfd::EventFd;
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
pub struct QcowDiskSync {
|
pub struct QcowDiskSync {
|
||||||
qcow_file: QcowFile,
|
qcow_file: Arc<Mutex<dyn ReadWriteSeekFile + Send + Sync>>,
|
||||||
semaphore: Arc<Mutex<()>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QcowDiskSync {
|
impl QcowDiskSync {
|
||||||
pub fn new(file: File, direct_io: bool) -> QcowResult<Self> {
|
pub fn new(file: File, direct_io: bool) -> QcowResult<Self> {
|
||||||
Ok(QcowDiskSync {
|
Ok(QcowDiskSync {
|
||||||
qcow_file: QcowFile::from(RawFile::new(file, direct_io))?,
|
qcow_file: Arc::new(Mutex::new(QcowFile::from(RawFile::new(file, direct_io))?)),
|
||||||
semaphore: Arc::new(Mutex::new(())),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DiskFile for QcowDiskSync {
|
impl DiskFile for QcowDiskSync {
|
||||||
fn size(&mut self) -> DiskFileResult<u64> {
|
fn size(&mut self) -> DiskFileResult<u64> {
|
||||||
// Take the semaphore to ensure other threads are not interacting with
|
let mut file = self.qcow_file.lock().unwrap();
|
||||||
// the underlying file.
|
|
||||||
let _lock = self.semaphore.lock().unwrap();
|
|
||||||
|
|
||||||
Ok(self
|
Ok(file.seek(SeekFrom::End(0)).map_err(DiskFileError::Size)? as u64)
|
||||||
.qcow_file
|
|
||||||
.seek(SeekFrom::End(0))
|
|
||||||
.map_err(DiskFileError::Size)? as u64)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_async_io(&self, _ring_depth: u32) -> DiskFileResult<Box<dyn AsyncIo>> {
|
fn new_async_io(&self, _ring_depth: u32) -> DiskFileResult<Box<dyn AsyncIo>> {
|
||||||
Ok(Box::new(QcowSync::new(
|
Ok(Box::new(QcowSync::new(self.qcow_file.clone())) as Box<dyn AsyncIo>)
|
||||||
self.qcow_file.clone(),
|
|
||||||
self.semaphore.clone(),
|
|
||||||
)) as Box<dyn AsyncIo>)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct QcowSync {
|
pub struct QcowSync {
|
||||||
qcow_file: QcowFile,
|
qcow_file: Arc<Mutex<dyn ReadWriteSeekFile + Send + Sync>>,
|
||||||
eventfd: EventFd,
|
eventfd: EventFd,
|
||||||
completion_list: Vec<(u64, i32)>,
|
completion_list: Vec<(u64, i32)>,
|
||||||
semaphore: Arc<Mutex<()>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QcowSync {
|
impl QcowSync {
|
||||||
pub fn new(qcow_file: QcowFile, semaphore: Arc<Mutex<()>>) -> Self {
|
pub fn new(qcow_file: Arc<Mutex<dyn ReadWriteSeekFile + Send + Sync>>) -> Self {
|
||||||
QcowSync {
|
QcowSync {
|
||||||
qcow_file,
|
qcow_file,
|
||||||
eventfd: EventFd::new(libc::EFD_NONBLOCK)
|
eventfd: EventFd::new(libc::EFD_NONBLOCK)
|
||||||
.expect("Failed creating EventFd for QcowSync"),
|
.expect("Failed creating EventFd for QcowSync"),
|
||||||
completion_list: Vec::new(),
|
completion_list: Vec::new(),
|
||||||
semaphore,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -81,7 +69,6 @@ impl AsyncIo for QcowSync {
|
|||||||
&mut self.qcow_file,
|
&mut self.qcow_file,
|
||||||
&self.eventfd,
|
&self.eventfd,
|
||||||
&mut self.completion_list,
|
&mut self.completion_list,
|
||||||
&mut self.semaphore,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,7 +85,6 @@ impl AsyncIo for QcowSync {
|
|||||||
&mut self.qcow_file,
|
&mut self.qcow_file,
|
||||||
&self.eventfd,
|
&self.eventfd,
|
||||||
&mut self.completion_list,
|
&mut self.completion_list,
|
||||||
&mut self.semaphore,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,7 +94,6 @@ impl AsyncIo for QcowSync {
|
|||||||
&mut self.qcow_file,
|
&mut self.qcow_file,
|
||||||
&self.eventfd,
|
&self.eventfd,
|
||||||
&mut self.completion_list,
|
&mut self.completion_list,
|
||||||
&mut self.semaphore,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,57 +3,58 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use crate::async_io::{AsyncIo, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult};
|
use crate::async_io::{AsyncIo, AsyncIoResult, DiskFile, DiskFileError, DiskFileResult};
|
||||||
use crate::{fsync_sync, read_vectored_sync, write_vectored_sync};
|
use crate::{fsync_sync, read_vectored_sync, write_vectored_sync, ReadWriteSeekFile};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::ops::DerefMut;
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use vhdx::vhdx::{Result as VhdxResult, Vhdx};
|
use vhdx::vhdx::{Result as VhdxResult, Vhdx};
|
||||||
use vmm_sys_util::eventfd::EventFd;
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
pub struct VhdxDiskSync {
|
pub struct VhdxDiskSync {
|
||||||
vhdx_file: Arc<Mutex<Vhdx>>,
|
vhdx_file: Arc<Mutex<dyn ReadWriteSeekFile + Sync + Send>>,
|
||||||
semaphore: Arc<Mutex<()>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VhdxDiskSync {
|
impl VhdxDiskSync {
|
||||||
pub fn new(f: File) -> VhdxResult<Self> {
|
pub fn new(f: File) -> VhdxResult<Self> {
|
||||||
let vhdx = Vhdx::new(f)?;
|
|
||||||
let vhdx_file = Arc::new(Mutex::new(vhdx));
|
|
||||||
|
|
||||||
Ok(VhdxDiskSync {
|
Ok(VhdxDiskSync {
|
||||||
vhdx_file,
|
vhdx_file: Arc::new(Mutex::new(Vhdx::new(f)?)),
|
||||||
semaphore: Arc::new(Mutex::new(())),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DiskFile for VhdxDiskSync {
|
impl DiskFile for VhdxDiskSync {
|
||||||
fn size(&mut self) -> DiskFileResult<u64> {
|
fn size(&mut self) -> DiskFileResult<u64> {
|
||||||
Ok(self.vhdx_file.lock().unwrap().virtual_disk_size())
|
Ok(self
|
||||||
|
.vhdx_file
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<Vhdx>()
|
||||||
|
.unwrap()
|
||||||
|
.virtual_disk_size())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_async_io(&self, _ring_depth: u32) -> DiskFileResult<Box<dyn AsyncIo>> {
|
fn new_async_io(&self, _ring_depth: u32) -> DiskFileResult<Box<dyn AsyncIo>> {
|
||||||
Ok(Box::new(
|
Ok(
|
||||||
VhdxSync::new(self.vhdx_file.clone(), self.semaphore.clone())
|
Box::new(VhdxSync::new(self.vhdx_file.clone()).map_err(DiskFileError::NewAsyncIo)?)
|
||||||
.map_err(DiskFileError::NewAsyncIo)?,
|
as Box<dyn AsyncIo>,
|
||||||
) as Box<dyn AsyncIo>)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct VhdxSync {
|
pub struct VhdxSync {
|
||||||
vhdx_file: Arc<Mutex<Vhdx>>,
|
vhdx_file: Arc<Mutex<dyn ReadWriteSeekFile + Sync + Send>>,
|
||||||
eventfd: EventFd,
|
eventfd: EventFd,
|
||||||
completion_list: Vec<(u64, i32)>,
|
completion_list: Vec<(u64, i32)>,
|
||||||
semaphore: Arc<Mutex<()>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VhdxSync {
|
impl VhdxSync {
|
||||||
pub fn new(vhdx_file: Arc<Mutex<Vhdx>>, semaphore: Arc<Mutex<()>>) -> std::io::Result<Self> {
|
pub fn new(
|
||||||
|
vhdx_file: Arc<Mutex<dyn ReadWriteSeekFile + Sync + Send>>,
|
||||||
|
) -> std::io::Result<Self> {
|
||||||
Ok(VhdxSync {
|
Ok(VhdxSync {
|
||||||
vhdx_file,
|
vhdx_file,
|
||||||
eventfd: EventFd::new(libc::EFD_NONBLOCK)?,
|
eventfd: EventFd::new(libc::EFD_NONBLOCK)?,
|
||||||
completion_list: Vec::new(),
|
completion_list: Vec::new(),
|
||||||
semaphore,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -73,10 +74,9 @@ impl AsyncIo for VhdxSync {
|
|||||||
offset,
|
offset,
|
||||||
iovecs,
|
iovecs,
|
||||||
user_data,
|
user_data,
|
||||||
self.vhdx_file.lock().unwrap().deref_mut(),
|
&mut self.vhdx_file,
|
||||||
&self.eventfd,
|
&self.eventfd,
|
||||||
&mut self.completion_list,
|
&mut self.completion_list,
|
||||||
&mut self.semaphore,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,20 +90,18 @@ impl AsyncIo for VhdxSync {
|
|||||||
offset,
|
offset,
|
||||||
iovecs,
|
iovecs,
|
||||||
user_data,
|
user_data,
|
||||||
self.vhdx_file.lock().unwrap().deref_mut(),
|
&mut self.vhdx_file,
|
||||||
&self.eventfd,
|
&self.eventfd,
|
||||||
&mut self.completion_list,
|
&mut self.completion_list,
|
||||||
&mut self.semaphore,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fsync(&mut self, user_data: Option<u64>) -> AsyncIoResult<()> {
|
fn fsync(&mut self, user_data: Option<u64>) -> AsyncIoResult<()> {
|
||||||
fsync_sync(
|
fsync_sync(
|
||||||
user_data,
|
user_data,
|
||||||
self.vhdx_file.lock().unwrap().deref_mut(),
|
&mut self.vhdx_file,
|
||||||
&self.eventfd,
|
&self.eventfd,
|
||||||
&mut self.completion_list,
|
&mut self.completion_list,
|
||||||
&mut self.semaphore,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user