diff --git a/virtio-devices/src/block.rs b/virtio-devices/src/block.rs index 95411bb4d..8e3e5d6fe 100644 --- a/virtio-devices/src/block.rs +++ b/virtio-devices/src/block.rs @@ -40,6 +40,7 @@ use std::sync::{Arc, Barrier}; use thiserror::Error; use virtio_bindings::virtio_blk::*; use virtio_bindings::virtio_config::*; +use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use virtio_queue::{Queue, QueueOwnedT, QueueT}; use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError}; use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable}; @@ -79,6 +80,8 @@ pub enum Error { QueueIterator(virtio_queue::Error), #[error("Failed to update request status: {0}")] RequestStatus(GuestMemoryError), + #[error("Failed to enable notification: {0}")] + QueueEnableNotification(virtio_queue::Error), } pub type Result = result::Result; @@ -137,11 +140,9 @@ struct BlockEpollHandler { } impl BlockEpollHandler { - fn process_queue_submit(&mut self) -> Result { + fn process_queue_submit(&mut self) -> Result<()> { let queue = &mut self.queue; - let mut used_descs = false; - while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) { let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref()) .map_err(Error::RequestParsing)?; @@ -163,7 +164,9 @@ impl BlockEpollHandler { queue .add_used(desc_chain.memory(), desc_chain.head_index(), 0) .map_err(Error::QueueAddUsed)?; - used_descs = true; + queue + .enable_notification(self.mem.memory().deref()) + .map_err(Error::QueueEnableNotification)?; continue; } @@ -223,23 +226,34 @@ impl BlockEpollHandler { queue .add_used(desc_chain.memory(), desc_chain.head_index(), 0) .map_err(Error::QueueAddUsed)?; - used_descs = true; + queue + .enable_notification(self.mem.memory().deref()) + .map_err(Error::QueueEnableNotification)?; } } - Ok(used_descs) + Ok(()) } fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> { - let needs_notification = self.process_queue_submit().map_err(|e| { + self.process_queue_submit().map_err(|e| { EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e)) })?; - if needs_notification { + if self + .queue + .needs_notification(self.mem.memory().deref()) + .map_err(|e| { + EpollHelperError::HandleEvent(anyhow!( + "Failed to check needs_notification: {:?}", + e + )) + })? + { self.signal_used_queue().map_err(|e| { EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e)) - })? - }; + })?; + } Ok(()) } @@ -265,8 +279,7 @@ impl BlockEpollHandler { Err(Error::MissingEntryRequestList) } - fn process_queue_complete(&mut self) -> Result { - let mut used_descs = false; + fn process_queue_complete(&mut self) -> Result<()> { let mem = self.mem.memory(); let mut read_bytes = Wrapping(0); let mut write_bytes = Wrapping(0); @@ -379,7 +392,9 @@ impl BlockEpollHandler { queue .add_used(mem.deref(), desc_index, len) .map_err(Error::QueueAddUsed)?; - used_descs = true; + queue + .enable_notification(mem.deref()) + .map_err(Error::QueueEnableNotification)?; } self.counters @@ -396,7 +411,7 @@ impl BlockEpollHandler { .read_ops .fetch_add(read_ops.0, Ordering::AcqRel); - Ok(used_descs) + Ok(()) } fn signal_used_queue(&self) -> result::Result<(), DeviceError> { @@ -487,20 +502,19 @@ impl EpollHelperHandler for BlockEpollHandler { EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e)) })?; - let needs_notification = self.process_queue_complete().map_err(|e| { + self.process_queue_complete().map_err(|e| { EpollHelperError::HandleEvent(anyhow!( "Failed to process queue (complete): {:?}", e )) })?; - if needs_notification { - self.signal_used_queue().map_err(|e| { - EpollHelperError::HandleEvent(anyhow!( - "Failed to signal used queue: {:?}", - e - )) - })?; + let rate_limit_reached = + self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked()); + + // Process the queue only when the rate limit is not reached + if !rate_limit_reached { + self.process_queue_submit_and_signal()? } } RATE_LIMITER_EVENT => { @@ -606,7 +620,8 @@ impl Block { | (1u64 << VIRTIO_BLK_F_FLUSH) | (1u64 << VIRTIO_BLK_F_CONFIG_WCE) | (1u64 << VIRTIO_BLK_F_BLK_SIZE) - | (1u64 << VIRTIO_BLK_F_TOPOLOGY); + | (1u64 << VIRTIO_BLK_F_TOPOLOGY) + | (1u64 << VIRTIO_RING_F_EVENT_IDX); if iommu { avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM; @@ -779,8 +794,12 @@ impl VirtioDevice for Block { self.update_writeback(); let mut epoll_threads = Vec::new(); + let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into()); + for i in 0..queues.len() { - let (_, queue, queue_evt) = queues.remove(0); + let (_, mut queue, queue_evt) = queues.remove(0); + queue.set_event_idx(event_idx); + let queue_size = queue.size(); let (kill_evt, pause_evt) = self.common.dup_eventfds(); let queue_idx = i as u16;