virtio-devices: Add rate limiter for the RX queue of virtio-net

Fixes: #1286

Signed-off-by: Bo Chen <chen.bo@intel.com>
Author:    Bo Chen <chen.bo@intel.com>, 2021-03-25 14:17:05 -07:00
Committer: Sebastien Boeuf
Parent:    5d8de62362
Commit:    32ad4982dd
4 changed files with 97 additions and 11 deletions


@@ -1,13 +1,16 @@
 # I/O Throttling
 
-Cloud Hypervisor now supports I/O throttling on virtio-block
+Cloud Hypervisor now supports I/O throttling on virtio-block and virtio-net
 devices. This support is based on the [`rate-limiter` module](https://github.com/firecracker-microvm/firecracker/tree/master/src/rate_limiter)
 from Firecracker. This document explains the user interface of this
 feature, and highlights some internal implementations that can help users
 better understand the expected behavior of I/O throttling in practice.
 
 Cloud Hypervisor allows users to limit both the I/O bandwidth (e.g. bytes/s)
-and I/O operations (ops/s) independently. To limit the I/O bandwidth, it
+and I/O operations (ops/s) independently. For virtio-net devices, while
+sharing the same "rate limit" from user inputs (on both bandwidth and
+operations), the RX and TX queues are throttled independently.
+
+To limit the I/O bandwidth, Cloud Hypervisor
 provides three user options, i.e., `bw_size` (bytes), `bw_one_time_burst`
 (bytes), and `bw_refill_time` (ms). Both `bw_size` and `bw_refill_time`
 are required, while `bw_one_time_burst` is optional.
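
(For example, a virtio-net device could be capped at roughly 10 MiB/s with something like `--net tap=tap0,bw_size=10485760,bw_refill_time=1000`. This is a hypothetical invocation: the three option names come from this document, but check the exact `--net` syntax of your Cloud Hypervisor build.)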
@@ -29,8 +32,8 @@ empty, it will stop I/O operations for a fixed amount of time
 (`cool_down_time`). The `cool_down_time` is now fixed at `100 ms`, which
 can have big implications for the actual rate limit (which can be a lot
 different from the expected "refill-rate" derived from user inputs). For
-example, to have a 1000 IOPS limit, users should be able to provide
-either of the following two options:
+example, to have a 1000 IOPS limit on a virtio-blk device, users should
+be able to provide either of the following two options:
 `ops_size=1000,ops_refill_time=1000` or
 `ops_size=10,ops_refill_time=10`. However, the actual IOPS limits are
 likely to be ~1000 IOPS and ~100 IOPS respectively. The reason is the
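
To make that arithmetic concrete, here is a back-of-the-envelope model (an editorial illustration, not the actual `rate-limiter` code) of how a fixed 100 ms cool-down bounds the effective rate once the bucket runs dry:

```rust
// Effective ops/s when the workload is persistently over the limit and the
// limiter only wakes up every `cool_down_ms` to refill the bucket.
fn effective_iops(ops_size: u64, ops_refill_time_ms: u64, cool_down_ms: u64) -> u64 {
    // Tokens accumulated during one cool-down period, capped at the bucket size.
    let refilled = (ops_size * cool_down_ms / ops_refill_time_ms).min(ops_size);
    // That budget is spent once per cool-down period.
    refilled * 1000 / cool_down_ms
}

fn main() {
    assert_eq!(effective_iops(1000, 1000, 100), 1000); // ~1000 IOPS, as expected
    assert_eq!(effective_iops(10, 10, 100), 100); // ~100 IOPS, not 1000
}
```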


@@ -134,9 +134,18 @@ impl RxVirtio {
         mem: &GuestMemoryMmap,
         tap: &mut Tap,
         queue: &mut Queue,
+        rate_limiter: &mut Option<RateLimiter>,
     ) -> Result<bool, NetQueuePairError> {
         let mut exhausted_descs = true;
+        let mut rate_limit_reached = false;
+
         while let Some(avail_desc) = queue.iter(&mem).next() {
+            if rate_limit_reached {
+                exhausted_descs = false;
+                queue.go_to_previous_position();
+                break;
+            }
+
             let head_index = avail_desc.index;
             let num_buffers_addr = mem.checked_offset(avail_desc.addr, 10).unwrap();
             let mut next_desc = Some(avail_desc);
@@ -195,6 +204,15 @@ impl RxVirtio {
             queue.add_used(&mem, head_index, len);
             queue.update_avail_event(&mem);
+
+            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
+            // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
+            // chain go through even if it was over the rate limit, and simply stop
+            // processing incoming `avail_desc` entries if any.
+            if let Some(rate_limiter) = rate_limiter {
+                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
+                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
+            }
         }
 
         Ok(exhausted_descs)
@@ -234,6 +252,8 @@ pub struct NetQueuePair {
     pub rx_tap_listening: bool,
     pub counters: NetCounters,
     pub tap_event_id: u16,
+    pub rx_desc_avail: bool,
+    pub rx_rate_limiter: Option<RateLimiter>,
     pub tx_rate_limiter: Option<RateLimiter>,
 }
@ -267,11 +287,21 @@ impl NetQueuePair {
.ok_or(NetQueuePairError::NoMemoryConfigured) .ok_or(NetQueuePairError::NoMemoryConfigured)
.map(|m| m.memory())?; .map(|m| m.memory())?;
if self self.rx_desc_avail = !self.rx.process_desc_chain(
.rx &mem,
.process_desc_chain(&mem, &mut self.tap, &mut queue)? &mut self.tap,
&& self.rx_tap_listening &mut queue,
{ &mut self.rx_rate_limiter,
)?;
let rate_limit_reached = self
.rx_rate_limiter
.as_ref()
.map_or(false, |r| r.is_blocked());
// Stop listening on the `RX_TAP_EVENT` when:
// 1) there is no available describles, or
// 2) the RX rate limit is reached.
if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
unregister_listener( unregister_listener(
self.epoll_fd.unwrap(), self.epoll_fd.unwrap(),
self.tap.as_raw_fd(), self.tap.as_raw_fd(),
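
The RX path above charges the limiter one ops token per descriptor chain and `len` bytes tokens per received frame, short-circuiting once either bucket is empty. Below is a minimal standalone sketch of that consume pattern; the `consume`, `TokenType`, and `is_blocked` calls appear in this diff, while the `RateLimiter::new` signature is assumed from the Firecracker `rate_limiter` crate and may differ:

```rust
use rate_limiter::{RateLimiter, TokenType};

fn main() -> std::io::Result<()> {
    // Assumed constructor layout: (bytes size, bytes one-time burst, bytes
    // refill time in ms, ops size, ops one-time burst, ops refill time in ms).
    // Here: ~1 MiB/s and ~1000 ops/s, each bucket refilled over 1000 ms.
    let mut limiter = RateLimiter::new(1 << 20, 0, 1000, 1000, 0, 1000)?;

    let frame_len: u64 = 1514; // one full Ethernet frame, for illustration

    // Mirror the RX path: one ops token per descriptor chain, `frame_len`
    // bytes tokens per frame; short-circuit once either bucket is empty.
    let rate_limit_reached = !limiter.consume(1, TokenType::Ops)
        || !limiter.consume(frame_len, TokenType::Bytes);

    if rate_limit_reached {
        // A failed consume arms the limiter's internal timer fd; the device
        // then stops polling the TAP fd until RX_RATE_LIMITER_EVENT fires.
        assert!(limiter.is_blocked());
    }
    Ok(())
}
```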


@@ -111,6 +111,8 @@ impl VhostUserNetThread {
                 epoll_fd: None,
                 counters: NetCounters::default(),
                 tap_event_id: 2,
+                rx_desc_avail: false,
+                rx_rate_limiter: None,
                 tx_rate_limiter: None,
             },
         })


@@ -46,8 +46,10 @@ pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
 pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
 // A frame is available for reading from the tap device to receive in the guest.
 pub const RX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
+// New 'wake up' event from the rx rate limiter
+pub const RX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4;
 // New 'wake up' event from the tx rate limiter
-pub const TX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4;
+pub const TX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 5;
 
 #[derive(Debug)]
 pub enum Error {
@@ -90,7 +92,16 @@ impl NetEpollHandler {
             error!("Failed to get rx queue event: {:?}", e);
         }
 
-        if !self.net.rx_tap_listening {
+        self.net.rx_desc_avail = true;
+
+        let rate_limit_reached = self
+            .net
+            .rx_rate_limiter
+            .as_ref()
+            .map_or(false, |r| r.is_blocked());
+
+        // Start listening on RX_TAP_EVENT only when the rate limit is not reached.
+        if !self.net.rx_tap_listening && !rate_limit_reached {
             net_util::register_listener(
                 self.net.epoll_fd.unwrap(),
                 self.net.tap.as_raw_fd(),
@@ -161,6 +172,9 @@ impl NetEpollHandler {
         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
         helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?;
         helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?;
+        if let Some(rate_limiter) = &self.net.rx_rate_limiter {
+            helper.add_event(rate_limiter.as_raw_fd(), RX_RATE_LIMITER_EVENT)?;
+        }
         if let Some(rate_limiter) = &self.net.tx_rate_limiter {
             helper.add_event(rate_limiter.as_raw_fd(), TX_RATE_LIMITER_EVENT)?;
         }
@@ -209,6 +223,35 @@ impl EpollHelperHandler for NetEpollHandler {
                     return true;
                 }
             }
+            RX_RATE_LIMITER_EVENT => {
+                if let Some(rate_limiter) = &mut self.net.rx_rate_limiter {
+                    // Upon rate limiter event, call the rate limiter handler and register the
+                    // TAP fd for further processing if some RX buffers are available.
+                    match rate_limiter.event_handler() {
+                        Ok(_) => {
+                            if !self.net.rx_tap_listening && self.net.rx_desc_avail {
+                                if let Err(e) = net_util::register_listener(
+                                    self.net.epoll_fd.unwrap(),
+                                    self.net.tap.as_raw_fd(),
+                                    epoll::Events::EPOLLIN,
+                                    u64::from(self.net.tap_event_id),
+                                ) {
+                                    error!("Error registering listener for `RX_RATE_LIMITER_EVENT`: {:?}", e);
+                                    return true;
+                                }
+                                self.net.rx_tap_listening = true;
+                            }
+                        }
+                        Err(e) => {
+                            error!("Error from 'rate_limiter.event_handler()': {:?}", e);
+                            return true;
+                        }
+                    }
+                } else {
+                    error!("Unexpected RX_RATE_LIMITER_EVENT");
+                    return true;
+                }
+            }
             TX_RATE_LIMITER_EVENT => {
                 if let Some(rate_limiter) = &mut self.net.tx_rate_limiter {
                     // Upon rate limiter event, call the rate limiter handler
@@ -527,6 +570,12 @@ impl VirtioDevice for Net {
                 ActivateError::BadActivate
             })?;
 
+        let rx_rate_limiter: Option<rate_limiter::RateLimiter> = self
+            .rate_limiter_config
+            .map(RateLimiterConfig::try_into)
+            .transpose()
+            .map_err(ActivateError::CreateRateLimiter)?;
+
         let tx_rate_limiter: Option<rate_limiter::RateLimiter> = self
             .rate_limiter_config
             .map(RateLimiterConfig::try_into)
@@ -543,6 +592,8 @@ impl VirtioDevice for Net {
                 rx_tap_listening,
                 counters: self.counters.clone(),
                 tap_event_id: RX_TAP_EVENT,
+                rx_desc_avail: false,
+                rx_rate_limiter,
                 tx_rate_limiter,
             },
             queue_pair,