virtio-devices: Improve error handling for virtio-blk io_uring

Instead of just logging error messages but continue the processing of
the queues, this patch returns errors right away. This allows for a
quicker detection of an error happening on the virtqueue.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
Sebastien Boeuf 2020-07-30 11:58:29 +02:00 committed by Rob Bradford
parent 917027c55b
commit a85304196e

View File

@ -67,8 +67,18 @@ pub enum Error {
InvalidOffset,
/// Unsupported operation on the disk.
Unsupported(u32),
/// Failed to parse the request.
RequestParsing(block_util::Error),
/// Failed to execute the request.
RequestExecuting(block_util::ExecuteError),
/// Missing the expected entry in the list of requests.
MissingEntryRequestList,
/// The asynchronous request returned with failure.
AsyncRequestFailure,
}
pub type Result<T> = result::Result<T, Error>;
#[derive(Default, Clone)]
pub struct BlockCounters {
read_bytes: Arc<AtomicU64>,
@ -95,7 +105,7 @@ struct BlockIoUringEpollHandler {
}
impl BlockIoUringEpollHandler {
fn process_queue_submit(&mut self) -> bool {
fn process_queue_submit(&mut self) -> Result<bool> {
let queue = &mut self.queue;
let mem = self.mem.memory();
@ -103,44 +113,29 @@ impl BlockIoUringEpollHandler {
let mut used_count = 0;
for avail_desc in queue.iter(&mem) {
match Request::parse(&avail_desc, &mem) {
Ok(mut request) => {
request.set_writeback(self.writeback.load(Ordering::SeqCst));
let (status, len) = match request.execute_io_uring(
&mem,
&mut self.io_uring,
self.disk_nsectors,
self.disk_image_fd,
&self.disk_image_id,
avail_desc.index as u64,
) {
Ok(async_submitted) => {
if async_submitted {
self.request_list.insert(avail_desc.index, request);
continue;
} else {
// If no asynchronous operation has been
// submitted, we can simply return the used
// descriptor.
(VIRTIO_BLK_S_OK, 0)
}
}
Err(e) => {
error!("Failed to execute request: {:?}", e);
(e.status(), 1)
}
};
let mut request = Request::parse(&avail_desc, &mem).map_err(Error::RequestParsing)?;
request.set_writeback(self.writeback.load(Ordering::SeqCst));
if request
.execute_io_uring(
&mem,
&mut self.io_uring,
self.disk_nsectors,
self.disk_image_fd,
&self.disk_image_id,
avail_desc.index as u64,
)
.map_err(Error::RequestExecuting)?
{
self.request_list.insert(avail_desc.index, request);
} else {
// We use unwrap because the request parsing process already
// checked that the status_addr was valid.
mem.write_obj(VIRTIO_BLK_S_OK, request.status_addr).unwrap();
used_desc_heads.push((avail_desc.index, len));
used_count += 1;
// We use unwrap because the request parsing process
// already checked that the status_addr was valid.
mem.write_obj(status, request.status_addr).unwrap();
}
Err(e) => {
error!("Failed to parse available descriptor chain: {:?}", e);
}
// If no asynchronous operation has been submitted, we can
// simply return the used descriptor.
used_desc_heads.push((avail_desc.index, 0));
used_count += 1;
}
}
@ -148,10 +143,10 @@ impl BlockIoUringEpollHandler {
queue.add_used(&mem, desc_index, len);
}
used_count > 0
Ok(used_count > 0)
}
fn process_queue_complete(&mut self) -> bool {
fn process_queue_complete(&mut self) -> Result<bool> {
let queue = &mut self.queue;
let mut used_desc_heads = Vec::new();
@ -166,16 +161,10 @@ impl BlockIoUringEpollHandler {
for cq_entry in cq.available() {
let result = cq_entry.result();
let desc_index = cq_entry.user_data() as u16;
let request = match self.request_list.remove(&desc_index) {
Some(req) => req,
None => {
error!(
"Could not find the descriptor index {} from the request list",
desc_index
);
continue;
}
};
let request = self
.request_list
.remove(&desc_index)
.ok_or(Error::MissingEntryRequestList)?;
let (status, len) = if result >= 0 {
match request.request_type {
@ -191,15 +180,17 @@ impl BlockIoUringEpollHandler {
write_ops += Wrapping(1);
}
_ => {}
};
}
(VIRTIO_BLK_S_OK, result as u32)
} else {
error!(
"Request failed: {:?}",
io::Error::from_raw_os_error(-result)
);
(VIRTIO_BLK_S_IOERR, 1)
return Err(Error::AsyncRequestFailure);
};
// We use unwrap because the request parsing process already
// checked that the status_addr was valid.
mem.write_obj(status, request.status_addr).unwrap();
@ -226,7 +217,7 @@ impl BlockIoUringEpollHandler {
.read_ops
.fetch_add(read_ops.0, Ordering::AcqRel);
used_count > 0
Ok(used_count > 0)
}
fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
@ -255,9 +246,19 @@ impl EpollHelperHandler for BlockIoUringEpollHandler {
if let Err(e) = self.queue_evt.read() {
error!("Failed to get queue event: {:?}", e);
return true;
} else if self.process_queue_submit() {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
}
match self.process_queue_submit() {
Ok(needs_notification) => {
if needs_notification {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
return true;
}
}
}
Err(e) => {
error!("Failed to process queue (submit): {:?}", e);
return true;
}
}
@ -266,9 +267,19 @@ impl EpollHelperHandler for BlockIoUringEpollHandler {
if let Err(e) = self.io_uring_evt.read() {
error!("Failed to get queue event: {:?}", e);
return true;
} else if self.process_queue_complete() {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
}
match self.process_queue_complete() {
Ok(needs_notification) => {
if needs_notification {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
return true;
}
}
}
Err(e) => {
error!("Failed to process queue (complete): {:?}", e);
return true;
}
}