virtio-devices, vmm: Add rate limiter for the TX queue of virtio-net

Partially fixes: #1286

Signed-off-by: Bo Chen <chen.bo@intel.com>
This commit is contained in:
Bo Chen 2021-03-17 15:41:52 -07:00 committed by Sebastien Boeuf
parent bfa37f89c4
commit b176ddfe2a
9 changed files with 154 additions and 6 deletions

1
Cargo.lock generated
View File

@ -632,6 +632,7 @@ dependencies = [
"log 0.4.14",
"net_gen",
"pnet",
"rate_limiter",
"serde",
"serde_json",
"virtio-bindings",

View File

@ -8,6 +8,7 @@ epoll = ">=4.0.1"
libc = "0.2.92"
log = "0.4.14"
net_gen = { path = "../net_gen" }
rate_limiter = { path = "../rate_limiter" }
serde = "1.0.125"
virtio-bindings = "0.1.0"
vm-memory = { version = "0.5.0", features = ["backend-mmap", "backend-atomic"] }

View File

@ -14,6 +14,7 @@ extern crate libc;
#[macro_use]
extern crate log;
extern crate net_gen;
extern crate rate_limiter;
extern crate serde;
extern crate virtio_bindings;
extern crate vm_memory;

View File

@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
use super::{unregister_listener, vnet_hdr_len, Tap};
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
@ -36,11 +37,35 @@ impl TxVirtio {
mem: &GuestMemoryMmap,
tap: &mut Tap,
queue: &mut Queue,
rate_limiter: &mut Option<RateLimiter>,
) -> Result<(), NetQueuePairError> {
while let Some(avail_desc) = queue.iter(&mem).next() {
let head_index = avail_desc.index;
let mut next_desc = Some(avail_desc);
if let Some(rate_limiter) = rate_limiter {
    // Each descriptor chain is charged as one operation against the ops
    // bucket. If that is refused, step the queue back to its previous
    // position and stop processing for now.
    if !rate_limiter.consume(1, TokenType::Ops) {
        queue.go_to_previous_position();
        break;
    }

    // Walk a copy of the chain and total the length of every descriptor
    // that is not write-only, leaving `next_desc` untouched for the
    // transmit code that follows.
    let mut bytes: u64 = 0;
    let mut tmp_next_desc = next_desc.clone();
    while let Some(desc) = tmp_next_desc {
        if !desc.is_write_only() {
            bytes = bytes.wrapping_add(desc.len as u64);
        }
        tmp_next_desc = desc.next_descriptor();
    }

    // The vnet header is not charged against the byte budget. Saturate at
    // zero so that a chain shorter than the header does not wrap around to
    // a value near u64::MAX and get charged as an enormous transfer.
    let bytes = bytes.saturating_sub(vnet_hdr_len() as u64);
    if !rate_limiter.consume(bytes, TokenType::Bytes) {
        // Revert the OPS consume().
        rate_limiter.manual_replenish(1, TokenType::Ops);
        queue.go_to_previous_position();
        break;
    }
}
let mut iovecs = Vec::new();
while let Some(desc) = next_desc {
if !desc.is_write_only() && desc.len > 0 {
@ -209,6 +234,7 @@ pub struct NetQueuePair {
pub rx_tap_listening: bool,
pub counters: NetCounters,
pub tap_event_id: u16,
pub tx_rate_limiter: Option<RateLimiter>,
}
impl NetQueuePair {
@ -220,7 +246,7 @@ impl NetQueuePair {
.map(|m| m.memory())?;
self.tx
.process_desc_chain(&mem, &mut self.tap, &mut queue)?;
.process_desc_chain(&mem, &mut self.tap, &mut queue, &mut self.tx_rate_limiter)?;
self.counters
.tx_bytes

View File

@ -111,6 +111,7 @@ impl VhostUserNetThread {
epoll_fd: None,
counters: NetCounters::default(),
tap_event_id: 2,
tx_rate_limiter: None,
},
})
}

View File

@ -12,7 +12,8 @@ use super::net_util::{
use super::Error as DeviceError;
use super::{
ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::VirtioInterrupt;
@ -21,7 +22,6 @@ use net_util::{
open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TapError, TxVirtio,
};
use seccomp::{SeccompAction, SeccompFilter};
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
@ -30,6 +30,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::vec::Vec;
use std::{collections::HashMap, convert::TryInto};
use virtio_bindings::bindings::virtio_net::*;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use vm_memory::{ByteValued, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
@ -45,6 +46,8 @@ pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// A frame is available for reading from the tap device to receive in the guest.
pub const RX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
// New 'wake up' event from the tx rate limiter
pub const TX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4;
#[derive(Debug)]
pub enum Error {
@ -122,7 +125,15 @@ impl NetEpollHandler {
error!("Failed to get tx queue event: {:?}", e);
}
self.process_tx()?;
// Only drain the TX queue when no rate limiter is configured or the
// configured one does not report itself as blocked.
let tx_blocked = match self.net.tx_rate_limiter.as_ref() {
    Some(limiter) => limiter.is_blocked(),
    None => false,
};
if !tx_blocked {
    self.process_tx()?;
}
Ok(())
}
@ -150,6 +161,9 @@ impl NetEpollHandler {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?;
helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?;
// When a TX rate limiter is configured, register its file descriptor with
// the epoll helper so its events are delivered as TX_RATE_LIMITER_EVENT.
if let Some(rate_limiter) = &self.net.tx_rate_limiter {
helper.add_event(rate_limiter.as_raw_fd(), TX_RATE_LIMITER_EVENT)?;
}
// If there are some already available descriptors on the RX queue,
// then we can start the thread while listening onto the TAP.
@ -195,6 +209,28 @@ impl EpollHelperHandler for NetEpollHandler {
return true;
}
}
TX_RATE_LIMITER_EVENT => {
    // This event is only expected when a TX rate limiter exists; treat
    // its absence as an error and bail out of the handler.
    let handled = match self.net.tx_rate_limiter.as_mut() {
        Some(limiter) => limiter.event_handler(),
        None => {
            error!("Unexpected TX_RATE_LIMITER_EVENT");
            return true;
        }
    };

    // Let the rate limiter handle its own event first.
    if let Err(e) = handled {
        error!("Error from 'rate_limiter.event_handler()': {:?}", e);
        return true;
    }

    // The limiter event was handled: resume processing of the TX queue.
    self.driver_awake = true;
    if let Err(e) = self.process_tx() {
        error!("Error processing TX queue: {:?}", e);
        return true;
    }
}
_ => {
error!("Unknown event: {}", ev_type);
return true;
@ -212,6 +248,7 @@ pub struct Net {
ctrl_queue_epoll_thread: Option<thread::JoinHandle<()>>,
counters: NetCounters,
seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>,
}
#[derive(Serialize, Deserialize)]
@ -224,6 +261,7 @@ pub struct NetState {
impl Net {
/// Create a new virtio network device with the given TAP interface.
#[allow(clippy::too_many_arguments)]
pub fn new_with_tap(
id: String,
taps: Vec<Tap>,
@ -232,6 +270,7 @@ impl Net {
num_queues: usize,
queue_size: u16,
seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>,
) -> Result<Self> {
let mut avail_features = 1 << VIRTIO_NET_F_GUEST_CSUM
| 1 << VIRTIO_NET_F_CSUM
@ -271,6 +310,7 @@ impl Net {
ctrl_queue_epoll_thread: None,
counters: NetCounters::default(),
seccomp_action,
rate_limiter_config,
})
}
@ -288,6 +328,7 @@ impl Net {
num_queues: usize,
queue_size: u16,
seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>,
) -> Result<Self> {
let taps = open_tap(if_name, ip_addr, netmask, host_mac, num_queues / 2, None)
.map_err(Error::OpenTap)?;
@ -300,6 +341,7 @@ impl Net {
num_queues,
queue_size,
seccomp_action,
rate_limiter_config,
)
}
@ -310,6 +352,7 @@ impl Net {
iommu: bool,
queue_size: u16,
seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>,
) -> Result<Self> {
let mut taps: Vec<Tap> = Vec::new();
let num_queue_pairs = fds.len();
@ -327,6 +370,7 @@ impl Net {
num_queue_pairs * 2,
queue_size,
seccomp_action,
rate_limiter_config,
)
}
@ -483,6 +527,12 @@ impl VirtioDevice for Net {
ActivateError::BadActivate
})?;
// Build the TX rate limiter from the optional configuration stored on the
// device. No configuration yields `None`; a failed conversion is reported
// as `ActivateError::CreateRateLimiter`.
let tx_rate_limiter: Option<rate_limiter::RateLimiter> = self
.rate_limiter_config
.map(RateLimiterConfig::try_into)
.transpose()
.map_err(ActivateError::CreateRateLimiter)?;
let mut handler = NetEpollHandler {
net: NetQueuePair {
mem: Some(mem.clone()),
@ -493,6 +543,7 @@ impl VirtioDevice for Net {
rx_tap_listening,
counters: self.counters.clone(),
tap_event_id: RX_TAP_EVENT,
tx_rate_limiter,
},
queue_pair,
queue_evt_pair,

View File

@ -228,6 +228,7 @@ fn virtio_net_thread_rules() -> Vec<SyscallRuleSet> {
allow_syscall(libc::SYS_readv),
allow_syscall(libc::SYS_rt_sigprocmask),
allow_syscall(libc::SYS_sigaltstack),
allow_syscall(libc::SYS_timerfd_settime),
allow_syscall(libc::SYS_write),
allow_syscall(libc::SYS_writev),
]

View File

@ -887,6 +887,8 @@ pub struct NetConfig {
pub id: Option<String>,
#[serde(default)]
pub fds: Option<Vec<i32>>,
#[serde(default)]
pub rate_limiter_config: Option<RateLimiterConfig>,
}
fn default_netconfig_tap() -> Option<String> {
@ -928,6 +930,7 @@ impl Default for NetConfig {
vhost_socket: None,
id: None,
fds: None,
rate_limiter_config: None,
}
}
}
@ -936,7 +939,9 @@ impl NetConfig {
pub const SYNTAX: &'static str = "Network parameters \
\"tap=<if_name>,ip=<ip_addr>,mask=<net_mask>,mac=<mac_addr>,fd=<fd1:fd2...>,iommu=on|off,\
num_queues=<number_of_queues>,queue_size=<size_of_each_queue>,\
vhost_user=<vhost_user_enable>,socket=<vhost_user_socket_path>,id=<device_id>\"";
vhost_user=<vhost_user_enable>,socket=<vhost_user_socket_path>,id=<device_id>,\
bw_size=<bytes>,bw_one_time_burst=<bytes>,bw_refill_time=<ms>,\
ops_size=<io_ops>,ops_one_time_burst=<io_ops>,ops_refill_time=<ms>\"";
pub fn parse(net: &str) -> Result<Self> {
let mut parser = OptionParser::new();
@ -953,7 +958,13 @@ impl NetConfig {
.add("vhost_user")
.add("socket")
.add("id")
.add("fd");
.add("fd")
.add("bw_size")
.add("bw_one_time_burst")
.add("bw_refill_time")
.add("ops_size")
.add("ops_one_time_burst")
.add("ops_refill_time");
parser.parse(net).map_err(Error::ParseNetwork)?;
let tap = parser.get("tap");
@ -995,6 +1006,57 @@ impl NetConfig {
.map_err(Error::ParseNetwork)?
.map(|v| v.0.iter().map(|e| *e as i32).collect());
// Rate limiter options. Any option left unspecified falls back to the
// type's default (0). Conversion failures are reported as network parsing
// errors, like every other option handled by this parser.
let bw_size = parser
    .convert("bw_size")
    .map_err(Error::ParseNetwork)?
    .unwrap_or_default();
let bw_one_time_burst = parser
    .convert("bw_one_time_burst")
    .map_err(Error::ParseNetwork)?
    .unwrap_or_default();
let bw_refill_time = parser
    .convert("bw_refill_time")
    .map_err(Error::ParseNetwork)?
    .unwrap_or_default();
let ops_size = parser
    .convert("ops_size")
    .map_err(Error::ParseNetwork)?
    .unwrap_or_default();
let ops_one_time_burst = parser
    .convert("ops_one_time_burst")
    .map_err(Error::ParseNetwork)?
    .unwrap_or_default();
let ops_refill_time = parser
    .convert("ops_refill_time")
    .map_err(Error::ParseNetwork)?
    .unwrap_or_default();

// A token bucket is only configured when both its size and its refill time
// are non-zero; the one-time burst is always passed along, even when 0.
let bw_tb_config = if bw_size != 0 && bw_refill_time != 0 {
    Some(TokenBucketConfig {
        size: bw_size,
        one_time_burst: Some(bw_one_time_burst),
        refill_time: bw_refill_time,
    })
} else {
    None
};
let ops_tb_config = if ops_size != 0 && ops_refill_time != 0 {
    Some(TokenBucketConfig {
        size: ops_size,
        one_time_burst: Some(ops_one_time_burst),
        refill_time: ops_refill_time,
    })
} else {
    None
};

// Only produce a rate limiter configuration when at least one of the two
// buckets (bandwidth or ops) is configured.
let rate_limiter_config = if bw_tb_config.is_some() || ops_tb_config.is_some() {
    Some(RateLimiterConfig {
        bandwidth: bw_tb_config,
        ops: ops_tb_config,
    })
} else {
    None
};
let config = NetConfig {
tap,
ip,
@ -1008,6 +1070,7 @@ impl NetConfig {
vhost_socket,
id,
fds,
rate_limiter_config,
};
config.validate().map_err(Error::Validation)?;
Ok(config)

View File

@ -2076,6 +2076,7 @@ impl DeviceManager {
net_cfg.num_queues,
net_cfg.queue_size,
self.seccomp_action.clone(),
net_cfg.rate_limiter_config,
)
.map_err(DeviceManagerError::CreateVirtioNet)?,
))
@ -2088,6 +2089,7 @@ impl DeviceManager {
net_cfg.iommu,
net_cfg.queue_size,
self.seccomp_action.clone(),
net_cfg.rate_limiter_config,
)
.map_err(DeviceManagerError::CreateVirtioNet)?,
))
@ -2104,6 +2106,7 @@ impl DeviceManager {
net_cfg.num_queues,
net_cfg.queue_size,
self.seccomp_action.clone(),
net_cfg.rate_limiter_config,
)
.map_err(DeviceManagerError::CreateVirtioNet)?,
))