fix(virtq): only enable_notification when about to stop consumption

Signed-off-by: ihciah <ihciah@gmail.com>
This commit is contained in:
ihciah 2024-08-12 15:01:11 +00:00
parent 1c7997c5c3
commit 698de9315b
10 changed files with 107 additions and 57 deletions

10
Cargo.lock generated
View File

@ -1318,6 +1318,7 @@ dependencies = [
"serde_json", "serde_json",
"thiserror", "thiserror",
"virtio-bindings", "virtio-bindings",
"virtio-ext",
"virtio-queue", "virtio-queue",
"vm-memory", "vm-memory",
"vm-virtio", "vm-virtio",
@ -2368,6 +2369,7 @@ dependencies = [
"thiserror", "thiserror",
"vhost", "vhost",
"virtio-bindings", "virtio-bindings",
"virtio-ext",
"virtio-queue", "virtio-queue",
"vm-allocator", "vm-allocator",
"vm-device", "vm-device",
@ -2377,6 +2379,14 @@ dependencies = [
"vmm-sys-util", "vmm-sys-util",
] ]
[[package]]
name = "virtio-ext"
version = "0.1.0"
dependencies = [
"virtio-queue",
"vm-memory",
]
[[package]] [[package]]
name = "virtio-queue" name = "virtio-queue"
version = "0.12.0" version = "0.12.0"

View File

@ -94,6 +94,7 @@ members = [
"vhost_user_block", "vhost_user_block",
"vhost_user_net", "vhost_user_net",
"virtio-devices", "virtio-devices",
"virtio-ext",
"vm-allocator", "vm-allocator",
"vm-device", "vm-device",
"vm-migration", "vm-migration",

View File

@ -14,6 +14,7 @@ rate_limiter = { path = "../rate_limiter" }
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
thiserror = "1.0.62" thiserror = "1.0.62"
virtio-bindings = "0.2.2" virtio-bindings = "0.2.2"
virtio-ext = { path = "../virtio-ext" }
virtio-queue = "0.12.0" virtio-queue = "0.12.0"
vm-memory = { version = "0.14.1", features = [ vm-memory = { version = "0.14.1", features = [
"backend-atomic", "backend-atomic",

View File

@ -13,6 +13,7 @@ use virtio_bindings::virtio_net::{
VIRTIO_NET_F_GUEST_ECN, VIRTIO_NET_F_GUEST_TSO4, VIRTIO_NET_F_GUEST_TSO6, VIRTIO_NET_F_GUEST_ECN, VIRTIO_NET_F_GUEST_TSO4, VIRTIO_NET_F_GUEST_TSO6,
VIRTIO_NET_F_GUEST_UFO, VIRTIO_NET_OK, VIRTIO_NET_F_GUEST_UFO, VIRTIO_NET_OK,
}; };
use virtio_ext::QueueExt;
use virtio_queue::{Queue, QueueT}; use virtio_queue::{Queue, QueueT};
use vm_memory::{ByteValued, Bytes, GuestMemoryError}; use vm_memory::{ByteValued, Bytes, GuestMemoryError};
use vm_virtio::{AccessPlatform, Translatable}; use vm_virtio::{AccessPlatform, Translatable};
@ -62,7 +63,10 @@ impl CtrlQueue {
queue: &mut Queue, queue: &mut Queue,
access_platform: Option<&Arc<dyn AccessPlatform>>, access_platform: Option<&Arc<dyn AccessPlatform>>,
) -> Result<()> { ) -> Result<()> {
while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) { while let Some(mut desc_chain) = queue
.pop_desc_chain_with_notification(mem)
.map_err(Error::QueueEnableNotification)?
{
let ctrl_desc = desc_chain.next().ok_or(Error::NoControlHeaderDescriptor)?; let ctrl_desc = desc_chain.next().ok_or(Error::NoControlHeaderDescriptor)?;
let ctrl_hdr: ControlHeader = desc_chain let ctrl_hdr: ControlHeader = desc_chain
@ -142,13 +146,6 @@ impl CtrlQueue {
queue queue
.add_used(desc_chain.memory(), desc_chain.head_index(), len) .add_used(desc_chain.memory(), desc_chain.head_index(), len)
.map_err(Error::QueueAddUsed)?; .map_err(Error::QueueAddUsed)?;
if !queue
.enable_notification(mem)
.map_err(Error::QueueEnableNotification)?
{
break;
}
} }
Ok(()) Ok(())

View File

@ -10,6 +10,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use thiserror::Error; use thiserror::Error;
use virtio_ext::QueueExt;
use virtio_queue::{Queue, QueueOwnedT, QueueT}; use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::bitmap::Bitmap; use vm_memory::bitmap::Bitmap;
use vm_memory::{Bytes, GuestMemory}; use vm_memory::{Bytes, GuestMemory};
@ -46,7 +47,10 @@ impl TxVirtio {
let mut retry_write = false; let mut retry_write = false;
let mut rate_limit_reached = false; let mut rate_limit_reached = false;
while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) { while let Some(mut desc_chain) = queue
.pop_desc_chain_with_notification(mem)
.map_err(NetQueuePairError::QueueEnableNotification)?
{
if rate_limit_reached { if rate_limit_reached {
queue.go_to_previous_position(); queue.go_to_previous_position();
break; break;
@ -128,13 +132,6 @@ impl TxVirtio {
queue queue
.add_used(desc_chain.memory(), desc_chain.head_index(), len) .add_used(desc_chain.memory(), desc_chain.head_index(), len)
.map_err(NetQueuePairError::QueueAddUsed)?; .map_err(NetQueuePairError::QueueAddUsed)?;
if !queue
.enable_notification(mem)
.map_err(NetQueuePairError::QueueEnableNotification)?
{
break;
}
} }
Ok(retry_write) Ok(retry_write)
@ -172,7 +169,10 @@ impl RxVirtio {
let mut exhausted_descs = true; let mut exhausted_descs = true;
let mut rate_limit_reached = false; let mut rate_limit_reached = false;
while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) { while let Some(mut desc_chain) = queue
.pop_desc_chain_with_notification(mem)
.map_err(NetQueuePairError::QueueEnableNotification)?
{
if rate_limit_reached { if rate_limit_reached {
exhausted_descs = false; exhausted_descs = false;
queue.go_to_previous_position(); queue.go_to_previous_position();
@ -275,13 +275,6 @@ impl RxVirtio {
queue queue
.add_used(desc_chain.memory(), desc_chain.head_index(), len) .add_used(desc_chain.memory(), desc_chain.head_index(), len)
.map_err(NetQueuePairError::QueueAddUsed)?; .map_err(NetQueuePairError::QueueAddUsed)?;
if !queue
.enable_notification(mem)
.map_err(NetQueuePairError::QueueEnableNotification)?
{
break;
}
} }
Ok(exhausted_descs) Ok(exhausted_descs)

View File

@ -384,11 +384,12 @@ impl VhostUserBackendMut for VhostUserBlkBackend {
// calling process_queue() until it stops finding new // calling process_queue() until it stops finding new
// requests on the queue. // requests on the queue.
loop { loop {
vring thread.process_queue(&mut vring);
if !vring
.get_queue_mut() .get_queue_mut()
.enable_notification(self.mem.memory().deref()) .enable_notification(self.mem.memory().deref())
.unwrap(); .unwrap()
if !thread.process_queue(&mut vring) { {
break; break;
} }
} }

View File

@ -36,6 +36,7 @@ vhost = { version = "0.11.0", features = [
"vhost-vdpa", "vhost-vdpa",
] } ] }
virtio-bindings = { version = "0.2.2", features = ["virtio-v5_0_0"] } virtio-bindings = { version = "0.2.2", features = ["virtio-v5_0_0"] }
virtio-ext = { path = "../virtio-ext" }
virtio-queue = "0.12.0" virtio-queue = "0.12.0"
vm-allocator = { path = "../vm-allocator" } vm-allocator = { path = "../vm-allocator" }
vm-device = { path = "../vm-device" } vm-device = { path = "../vm-device" }

View File

@ -41,6 +41,7 @@ use thiserror::Error;
use virtio_bindings::virtio_blk::*; use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*; use virtio_bindings::virtio_config::*;
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_ext::QueueExt;
use virtio_queue::{Queue, QueueOwnedT, QueueT}; use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError}; use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable}; use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
@ -143,7 +144,10 @@ impl BlockEpollHandler {
fn process_queue_submit(&mut self) -> Result<()> { fn process_queue_submit(&mut self) -> Result<()> {
let queue = &mut self.queue; let queue = &mut self.queue;
while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) { while let Some(mut desc_chain) = queue
.pop_desc_chain_with_notification(self.mem.memory())
.map_err(Error::QueueEnableNotification)?
{
let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref()) let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
.map_err(Error::RequestParsing)?; .map_err(Error::RequestParsing)?;
@ -164,9 +168,6 @@ impl BlockEpollHandler {
queue queue
.add_used(desc_chain.memory(), desc_chain.head_index(), 0) .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
.map_err(Error::QueueAddUsed)?; .map_err(Error::QueueAddUsed)?;
queue
.enable_notification(self.mem.memory().deref())
.map_err(Error::QueueEnableNotification)?;
continue; continue;
} }
@ -226,9 +227,6 @@ impl BlockEpollHandler {
queue queue
.add_used(desc_chain.memory(), desc_chain.head_index(), 0) .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
.map_err(Error::QueueAddUsed)?; .map_err(Error::QueueAddUsed)?;
queue
.enable_notification(self.mem.memory().deref())
.map_err(Error::QueueEnableNotification)?;
} }
} }
@ -240,22 +238,7 @@ impl BlockEpollHandler {
EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e)) EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
})?; })?;
if self self.try_signal_used_queue()
.queue
.needs_notification(self.mem.memory().deref())
.map_err(|e| {
EpollHelperError::HandleEvent(anyhow!(
"Failed to check needs_notification: {:?}",
e
))
})?
{
self.signal_used_queue().map_err(|e| {
EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
})?;
}
Ok(())
} }
#[inline] #[inline]
@ -387,14 +370,9 @@ impl BlockEpollHandler {
mem.write_obj(status, request.status_addr) mem.write_obj(status, request.status_addr)
.map_err(Error::RequestStatus)?; .map_err(Error::RequestStatus)?;
let queue = &mut self.queue; self.queue
queue
.add_used(mem.deref(), desc_index, len) .add_used(mem.deref(), desc_index, len)
.map_err(Error::QueueAddUsed)?; .map_err(Error::QueueAddUsed)?;
queue
.enable_notification(mem.deref())
.map_err(Error::QueueEnableNotification)?;
} }
self.counters self.counters
@ -423,6 +401,25 @@ impl BlockEpollHandler {
}) })
} }
fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
if self
.queue
.needs_notification(self.mem.memory().deref())
.map_err(|e| {
EpollHelperError::HandleEvent(anyhow!(
"Failed to check needs_notification: {:?}",
e
))
})?
{
self.signal_used_queue().map_err(|e| {
EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
})?;
}
Ok(())
}
fn set_queue_thread_affinity(&self) { fn set_queue_thread_affinity(&self) {
// Prepare the CPU set the current queue thread is expected to run onto. // Prepare the CPU set the current queue thread is expected to run onto.
let cpuset = self.host_cpus.as_ref().map(|host_cpus| { let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
@ -514,8 +511,14 @@ impl EpollHelperHandler for BlockEpollHandler {
// Process the queue only when the rate limit is not reached // Process the queue only when the rate limit is not reached
if !rate_limit_reached { if !rate_limit_reached {
self.process_queue_submit_and_signal()? self.process_queue_submit().map_err(|e| {
EpollHelperError::HandleEvent(anyhow!(
"Failed to process queue (submit): {:?}",
e
))
})?;
} }
self.try_signal_used_queue()?;
} }
RATE_LIMITER_EVENT => { RATE_LIMITER_EVENT => {
if let Some(rate_limiter) = &mut self.rate_limiter { if let Some(rate_limiter) = &mut self.rate_limiter {

11
virtio-ext/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
authors = ["The Cloud Hypervisor Authors"]
edition = "2021"
name = "virtio-ext"
version = "0.1.0"
description = "virtio helper traits"
[dependencies]
virtio-queue = "0.12.0"
vm-memory = "0.14.1"

32
virtio-ext/src/lib.rs Normal file
View File

@ -0,0 +1,32 @@
use std::ops::Deref;
use virtio_queue::{DescriptorChain, Error, Queue, QueueT};
use vm_memory::GuestMemory;
pub trait QueueExt {
fn pop_desc_chain_with_notification<M>(
&mut self,
mem: M,
) -> Result<Option<DescriptorChain<M>>, Error>
where
// TODO: remove Sized bound once `Queue::enable_notification<M>` add `?Sized` for `M`.
M: Clone + Deref<Target: GuestMemory + Sized>;
}
impl QueueExt for Queue {
fn pop_desc_chain_with_notification<M>(
&mut self,
mem: M,
) -> Result<Option<DescriptorChain<M>>, Error>
where
M: Clone + Deref<Target: GuestMemory + Sized>,
{
if let Some(dc) = self.pop_descriptor_chain(mem.clone()) {
return Ok(Some(dc));
}
if self.enable_notification(mem.deref())? {
return Ok(self.pop_descriptor_chain(mem));
}
Ok(None)
}
}