virtio-devices: Improve queue handling with pop_descriptor_chain()

Using pop_descriptor_chain() is much more appropriate than iter() since
it doesn't require the iterator to be recreated on every call, it no
longer keeps the queue borrowed while descriptor chains are processed,
and it allows the virtio-net implementation to match all the other
device implementations.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
Authored by Sebastien Boeuf on 2022-07-08 14:30:50 +02:00; committed by Rob Bradford
parent a423bf13ad
commit 87f57f7c1e
12 changed files with 271 additions and 302 deletions
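For reference, here is a minimal sketch of the two patterns. It assumes the QueueT/QueueOwnedT traits from the virtio-queue crate as used in this tree and a plain GuestMemoryMmap; the function names and trivial loop bodies are illustrative only, not code from this commit.

use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::GuestMemoryMmap; // needs the vm-memory "backend-mmap" feature

// Old pattern: iter() returns an AvailIter that keeps the queue mutably
// borrowed, so the queue itself (add_used(), ...) cannot be touched until
// the iterator goes out of scope; rewinding has to go through the
// iterator's own go_to_previous_position().
fn drain_with_iter(queue: &mut Queue, mem: &GuestMemoryMmap) {
    let mut avail_iter = queue.iter(mem).unwrap();
    for desc_chain in &mut avail_iter {
        let _head = desc_chain.head_index();
        // process the descriptor chain here...
    }
    // ...and only call add_used() once the iterator is no longer used.
}

// New pattern: pop_descriptor_chain() hands back one chain at a time and
// releases the borrow between calls, so add_used() or
// go_to_previous_position() can be called on the queue inside the loop,
// which is what all the other virtio device implementations do.
fn drain_with_pop(queue: &mut Queue, mem: &GuestMemoryMmap) {
    while let Some(desc_chain) = queue.pop_descriptor_chain(mem) {
        let head = desc_chain.head_index();
        // process the descriptor chain, then mark it used right away.
        queue.add_used(mem, head, 0).unwrap();
    }
}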


@@ -13,7 +13,7 @@ use virtio_bindings::bindings::virtio_net::{
VIRTIO_NET_F_GUEST_ECN, VIRTIO_NET_F_GUEST_TSO4, VIRTIO_NET_F_GUEST_TSO6,
VIRTIO_NET_F_GUEST_UFO, VIRTIO_NET_OK,
};
- use virtio_queue::{Queue, QueueOwnedT, QueueT};
+ use virtio_queue::{Queue, QueueT};
use vm_memory::{ByteValued, Bytes, GuestMemoryError};
use vm_virtio::{AccessPlatform, Translatable};
@@ -63,91 +63,83 @@ impl CtrlQueue {
access_platform: Option<&Arc<dyn AccessPlatform>>,
) -> Result<()> {
let mut used_desc_heads = Vec::new();
loop {
- for mut desc_chain in queue.iter(mem).map_err(Error::QueueIterator)? {
+ while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
let ctrl_desc = desc_chain.next().ok_or(Error::NoControlHeaderDescriptor)?;
let ctrl_hdr: ControlHeader = desc_chain
.memory()
.read_obj(
ctrl_desc
.addr()
.translate_gva(access_platform, ctrl_desc.len() as usize),
)
.map_err(Error::GuestMemory)?;
let data_desc = desc_chain.next().ok_or(Error::NoDataDescriptor)?;
let data_desc_addr = data_desc
.addr()
.translate_gva(access_platform, data_desc.len() as usize);
let status_desc = desc_chain.next().ok_or(Error::NoStatusDescriptor)?;
let ok = match u32::from(ctrl_hdr.class) {
VIRTIO_NET_CTRL_MQ => {
let queue_pairs = desc_chain
.memory()
.read_obj::<u16>(data_desc_addr)
.map_err(Error::GuestMemory)?;
if u32::from(ctrl_hdr.cmd) != VIRTIO_NET_CTRL_MQ_VQ_PAIRS_SET {
warn!("Unsupported command: {}", ctrl_hdr.cmd);
false
} else if (queue_pairs < VIRTIO_NET_CTRL_MQ_VQ_PAIRS_MIN as u16)
|| (queue_pairs > VIRTIO_NET_CTRL_MQ_VQ_PAIRS_MAX as u16)
{
warn!("Number of MQ pairs out of range: {}", queue_pairs);
false
} else {
info!("Number of MQ pairs requested: {}", queue_pairs);
true
}
}
VIRTIO_NET_CTRL_GUEST_OFFLOADS => {
let features = desc_chain
.memory()
.read_obj::<u64>(data_desc_addr)
.map_err(Error::GuestMemory)?;
if u32::from(ctrl_hdr.cmd) != VIRTIO_NET_CTRL_GUEST_OFFLOADS_SET {
warn!("Unsupported command: {}", ctrl_hdr.cmd);
false
} else {
let mut ok = true;
for tap in self.taps.iter_mut() {
info!("Reprogramming tap offload with features: {}", features);
tap.set_offload(virtio_features_to_tap_offload(features))
.map_err(|e| {
error!("Error programming tap offload: {:?}", e);
ok = false
})
.ok();
}
ok
}
}
_ => {
warn!("Unsupported command {:?}", ctrl_hdr);
false
}
};
desc_chain
.memory()
.write_obj(
if ok { VIRTIO_NET_OK } else { VIRTIO_NET_ERR } as u8,
status_desc
.addr()
.translate_gva(access_platform, status_desc.len() as usize),
)
.map_err(Error::GuestMemory)?;
let len = ctrl_desc.len() + data_desc.len() + status_desc.len();
used_desc_heads.push((desc_chain.head_index(), len));
}
- for (desc_index, len) in used_desc_heads.iter() {
- queue
- .add_used(mem, *desc_index, *len)
- .map_err(Error::QueueAddUsed)?;
- }
if !queue
.enable_notification(mem)
@@ -157,6 +149,12 @@ impl CtrlQueue {
}
}
+ for (desc_index, len) in used_desc_heads.iter() {
+ queue
+ .add_used(mem, *desc_index, *len)
+ .map_err(Error::QueueAddUsed)?;
+ }
Ok(())
}
}


@@ -45,94 +45,84 @@ impl TxVirtio {
let mut retry_write = false;
let mut rate_limit_reached = false;
loop {
- let used_desc_head: (u16, u32);
- let mut avail_iter = queue
- .iter(mem)
- .map_err(NetQueuePairError::QueueIteratorFailed)?;
- if let Some(mut desc_chain) = avail_iter.next() {
+ while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
if rate_limit_reached {
- avail_iter.go_to_previous_position();
+ queue.go_to_previous_position();
break;
}
let mut next_desc = desc_chain.next();
let mut iovecs = Vec::new();
while let Some(desc) = next_desc {
let desc_addr = desc
.addr()
.translate_gva(access_platform, desc.len() as usize);
if !desc.is_write_only() && desc.len() > 0 {
let buf = desc_chain
.memory()
.get_slice(desc_addr, desc.len() as usize)
.map_err(NetQueuePairError::GuestMemory)?
.as_ptr();
let iovec = libc::iovec {
iov_base: buf as *mut libc::c_void,
iov_len: desc.len() as libc::size_t,
};
iovecs.push(iovec);
} else {
error!(
"Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
desc_addr.0,
desc.len(),
desc.is_write_only()
);
return Err(NetQueuePairError::DescriptorChainInvalid);
}
next_desc = desc_chain.next();
}
let len = if !iovecs.is_empty() {
let result = unsafe {
libc::writev(
tap.as_raw_fd() as libc::c_int,
iovecs.as_ptr() as *const libc::iovec,
iovecs.len() as libc::c_int,
)
};
if result < 0 {
let e = std::io::Error::last_os_error();
/* EAGAIN */
if e.kind() == std::io::ErrorKind::WouldBlock {
- avail_iter.go_to_previous_position();
+ queue.go_to_previous_position();
retry_write = true;
break;
}
error!("net: tx: failed writing to tap: {}", e);
return Err(NetQueuePairError::WriteTap(e));
}
self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
self.counter_frames += Wrapping(1);
result as u32
} else {
0
};
- used_desc_head = (desc_chain.head_index(), len);
// For the sake of simplicity (similar to the RX rate limiting), we always
// let the 'last' descriptor chain go-through even if it was over the rate
// limit, and simply stop processing oncoming `avail_desc` if any.
if let Some(rate_limiter) = rate_limiter {
rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
|| !rate_limiter.consume(len as u64, TokenType::Bytes);
}
- } else {
queue
- .add_used(mem, used_desc_head.0, used_desc_head.1)
+ .add_used(mem, desc_chain.head_index(), len)
.map_err(NetQueuePairError::QueueAddUsed)?;
if !queue
.enable_notification(mem)
.map_err(NetQueuePairError::QueueEnableNotification)?
@@ -176,115 +166,105 @@ impl RxVirtio {
let mut exhausted_descs = true;
let mut rate_limit_reached = false;
loop {
- let used_desc_head: (u16, u32);
- let mut avail_iter = queue
- .iter(mem)
- .map_err(NetQueuePairError::QueueIteratorFailed)?;
- if let Some(mut desc_chain) = avail_iter.next() {
+ while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
if rate_limit_reached {
exhausted_descs = false;
- avail_iter.go_to_previous_position();
+ queue.go_to_previous_position();
break;
}
let desc = desc_chain
.next()
.ok_or(NetQueuePairError::DescriptorChainTooShort)?;
let num_buffers_addr = desc_chain
.memory()
.checked_offset(
desc.addr()
.translate_gva(access_platform, desc.len() as usize),
10,
)
.unwrap();
let mut next_desc = Some(desc);
let mut iovecs = Vec::new();
while let Some(desc) = next_desc {
let desc_addr = desc
.addr()
.translate_gva(access_platform, desc.len() as usize);
if desc.is_write_only() && desc.len() > 0 {
let buf = desc_chain
.memory()
.get_slice(desc_addr, desc.len() as usize)
.map_err(NetQueuePairError::GuestMemory)?
.as_ptr();
let iovec = libc::iovec {
iov_base: buf as *mut libc::c_void,
iov_len: desc.len() as libc::size_t,
};
iovecs.push(iovec);
} else {
error!(
"Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
desc_addr.0,
desc.len(),
desc.is_write_only()
);
return Err(NetQueuePairError::DescriptorChainInvalid);
}
next_desc = desc_chain.next();
}
let len = if !iovecs.is_empty() {
let result = unsafe {
libc::readv(
tap.as_raw_fd() as libc::c_int,
iovecs.as_ptr() as *const libc::iovec,
iovecs.len() as libc::c_int,
)
};
if result < 0 {
let e = std::io::Error::last_os_error();
exhausted_descs = false;
- avail_iter.go_to_previous_position();
+ queue.go_to_previous_position();
/* EAGAIN */
if e.kind() == std::io::ErrorKind::WouldBlock {
break;
}
error!("net: rx: failed reading from tap: {}", e);
return Err(NetQueuePairError::ReadTap(e));
}
// Write num_buffers to guest memory. We simply write 1 as we
// never spread the frame over more than one descriptor chain.
desc_chain
.memory()
.write_obj(1u16, num_buffers_addr)
.map_err(NetQueuePairError::GuestMemory)?;
self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
self.counter_frames += Wrapping(1);
result as u32
} else {
0
};
- used_desc_head = (desc_chain.head_index(), len);
// For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
// RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
// chain go-through even if it was over the rate limit, and simply stop
// processing oncoming `avail_desc` if any.
if let Some(rate_limiter) = rate_limiter {
rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
|| !rate_limiter.consume(len as u64, TokenType::Bytes);
}
- } else {
queue
- .add_used(mem, used_desc_head.0, used_desc_head.1)
+ .add_used(mem, desc_chain.head_index(), len)
.map_err(NetQueuePairError::QueueAddUsed)?;
if !queue
.enable_notification(mem)
.map_err(NetQueuePairError::QueueEnableNotification)?


@@ -35,7 +35,7 @@ use vhost::vhost_user::Listener;
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
use virtio_bindings::bindings::virtio_blk::*;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
- use virtio_queue::{QueueOwnedT, QueueT};
+ use virtio_queue::QueueT;
use vm_memory::GuestAddressSpace;
use vm_memory::{bitmap::AtomicBitmap, ByteValued, Bytes, GuestMemoryAtomic};
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
@@ -126,7 +126,10 @@ impl VhostUserBlkThread {
) -> bool {
let mut used_desc_heads = Vec::new();
- for mut desc_chain in vring.get_queue_mut().iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = vring
+ .get_queue_mut()
+ .pop_descriptor_chain(self.mem.memory())
+ {
debug!("got an element in the queue");
let len;
match Request::parse(&mut desc_chain, None) {


@@ -247,11 +247,9 @@ impl BalloonEpollHandler {
}
fn process_queue(&mut self, queue_index: usize) -> result::Result<(), Error> {
let mem = self.mem.memory();
let mut used_descs = Vec::new();
- for mut desc_chain in self.queues[queue_index]
- .iter(mem)
- .map_err(Error::QueueIterator)?
+ while let Some(mut desc_chain) =
+ self.queues[queue_index].pop_descriptor_chain(self.mem.memory())
{
let desc = desc_chain.next().ok_or(Error::DescriptorChainTooShort)?;


@@ -108,10 +108,7 @@ impl BlockEpollHandler {
let mut used_desc_heads = Vec::new();
let mut used_count = 0;
- let mut avail_iter = queue
- .iter(self.mem.memory())
- .map_err(Error::QueueIterator)?;
- for mut desc_chain in &mut avail_iter {
+ while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
.map_err(Error::RequestParsing)?;
@@ -121,7 +118,7 @@ impl BlockEpollHandler {
if !rate_limiter.consume(1, TokenType::Ops) {
// Stop processing the queue and return this descriptor chain to the
// avail ring, for later processing.
- avail_iter.go_to_previous_position();
+ queue.go_to_previous_position();
break;
}
// Exercise the rate limiter only if this request is of data transfer type.
@@ -140,7 +137,7 @@ impl BlockEpollHandler {
rate_limiter.manual_replenish(1, TokenType::Ops);
// Stop processing the queue and return this descriptor chain to the
// avail ring, for later processing.
- avail_iter.go_to_previous_position();
+ queue.go_to_previous_position();
break;
}
};


@@ -144,8 +144,7 @@ impl ConsoleEpollHandler {
return false;
}
- let mut avail_iter = recv_queue.iter(self.mem.memory()).unwrap();
- for mut desc_chain in &mut avail_iter {
+ while let Some(mut desc_chain) = recv_queue.pop_descriptor_chain(self.mem.memory()) {
let desc = desc_chain.next().unwrap();
let len = cmp::min(desc.len() as u32, in_buffer.len() as u32);
let source_slice = in_buffer.drain(..len as usize).collect::<Vec<u8>>();
@@ -156,7 +155,7 @@ impl ConsoleEpollHandler {
.translate_gva(self.access_platform.as_ref(), desc.len() as usize),
) {
error!("Failed to write slice: {:?}", e);
- avail_iter.go_to_previous_position();
+ recv_queue.go_to_previous_position();
break;
}
@@ -188,7 +187,7 @@ impl ConsoleEpollHandler {
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- for mut desc_chain in trans_queue.iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = trans_queue.pop_descriptor_chain(self.mem.memory()) {
let desc = desc_chain.next().unwrap();
if let Some(ref mut out) = self.endpoint.out_file() {
let _ = desc_chain.memory().write_to(


@@ -24,7 +24,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier, Mutex, RwLock};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
- use virtio_queue::{DescriptorChain, Queue, QueueOwnedT, QueueT};
+ use virtio_queue::{DescriptorChain, Queue, QueueT};
use vm_device::dma_mapping::ExternalDmaMapping;
use vm_memory::{
Address, ByteValued, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic,
@@ -676,7 +676,7 @@ impl IommuEpollHandler {
fn request_queue(&mut self) -> bool {
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- for mut desc_chain in self.queues[0].iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = self.queues[0].pop_descriptor_chain(self.mem.memory()) {
let len = match Request::parse(
&mut desc_chain,
&self.mapping,


@@ -35,7 +35,7 @@ use std::sync::mpsc;
use std::sync::{Arc, Barrier, Mutex};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
- use virtio_queue::{DescriptorChain, Queue, QueueOwnedT, QueueT};
+ use virtio_queue::{DescriptorChain, Queue, QueueT};
use vm_device::dma_mapping::ExternalDmaMapping;
use vm_memory::{
Address, ByteValued, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic,
@@ -668,7 +668,7 @@ impl MemEpollHandler {
let mut request_list = Vec::new();
let mut used_count = 0;
- for mut desc_chain in self.queue.iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = self.queue.pop_descriptor_chain(self.mem.memory()) {
request_list.push((
desc_chain.head_index(),
Request::parse(&mut desc_chain),


@@ -28,7 +28,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Barrier};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
- use virtio_queue::{DescriptorChain, Queue, QueueOwnedT, QueueT};
+ use virtio_queue::{DescriptorChain, Queue, QueueT};
use vm_memory::{
Address, ByteValued, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic,
GuestMemoryError, GuestMemoryLoadGuard,
@@ -180,7 +180,7 @@ impl PmemEpollHandler {
fn process_queue(&mut self) -> bool {
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- for mut desc_chain in self.queue.iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = self.queue.pop_descriptor_chain(self.mem.memory()) {
let len = match Request::parse(&mut desc_chain, self.access_platform.as_ref()) {
Ok(ref req) if (req.type_ == RequestType::Flush) => {
let status_code = match self.disk.sync_all() {


@@ -22,7 +22,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Barrier};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
- use virtio_queue::{Queue, QueueOwnedT, QueueT};
+ use virtio_queue::{Queue, QueueT};
use vm_memory::{Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
@@ -52,7 +52,7 @@ impl RngEpollHandler {
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- for mut desc_chain in queue.iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
let desc = desc_chain.next().unwrap();
let mut len = 0;


@@ -128,10 +128,7 @@ where
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- let mut avail_iter = self.queues[0]
- .iter(self.mem.memory())
- .map_err(DeviceError::QueueIterator)?;
- for mut desc_chain in &mut avail_iter {
+ while let Some(mut desc_chain) = self.queues[0].pop_descriptor_chain(self.mem.memory()) {
let used_len = match VsockPacket::from_rx_virtq_head(
&mut desc_chain,
self.access_platform.as_ref(),
@@ -142,7 +139,7 @@ where
} else {
// We are using a consuming iterator over the virtio buffers, so, if we can't
// fill in this buffer, we'll need to undo the last iterator step.
- avail_iter.go_to_previous_position();
+ self.queues[0].go_to_previous_position();
break;
}
}
@@ -179,10 +176,7 @@ where
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- let mut avail_iter = self.queues[1]
- .iter(self.mem.memory())
- .map_err(DeviceError::QueueIterator)?;
- for mut desc_chain in &mut avail_iter {
+ while let Some(mut desc_chain) = self.queues[1].pop_descriptor_chain(self.mem.memory()) {
let pkt = match VsockPacket::from_tx_virtq_head(
&mut desc_chain,
self.access_platform.as_ref(),
@@ -197,7 +191,7 @@ where
};
if self.backend.write().unwrap().send_pkt(&pkt).is_err() {
- avail_iter.go_to_previous_position();
+ self.queues[1].go_to_previous_position();
break;
}


@@ -26,7 +26,7 @@ use std::sync::{Arc, Barrier, Mutex};
use std::time::Instant;
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
- use virtio_queue::{Queue, QueueOwnedT, QueueT};
+ use virtio_queue::{Queue, QueueT};
use vm_memory::{Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
@@ -66,7 +66,7 @@ impl WatchdogEpollHandler {
let queue = &mut self.queue;
let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
let mut used_count = 0;
- for mut desc_chain in queue.iter(self.mem.memory()).unwrap() {
+ while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
let desc = desc_chain.next().unwrap();
let mut len = 0;