From b176ddfe2ac6424298c4d6dbc68406373418a91d Mon Sep 17 00:00:00 2001 From: Bo Chen Date: Wed, 17 Mar 2021 15:41:52 -0700 Subject: [PATCH] virtio-devices, vmm: Add rate limiter for the TX queue of virtio-net Partially fixes: #1286 Signed-off-by: Bo Chen --- Cargo.lock | 1 + net_util/Cargo.toml | 1 + net_util/src/lib.rs | 1 + net_util/src/queue_pair.rs | 28 ++++++++++- vhost_user_net/src/lib.rs | 1 + virtio-devices/src/net.rs | 57 +++++++++++++++++++++-- virtio-devices/src/seccomp_filters.rs | 1 + vmm/src/config.rs | 67 ++++++++++++++++++++++++++- vmm/src/device_manager.rs | 3 ++ 9 files changed, 154 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1574a447..486bae8ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -632,6 +632,7 @@ dependencies = [ "log 0.4.14", "net_gen", "pnet", + "rate_limiter", "serde", "serde_json", "virtio-bindings", diff --git a/net_util/Cargo.toml b/net_util/Cargo.toml index 2df908c15..9657c588d 100644 --- a/net_util/Cargo.toml +++ b/net_util/Cargo.toml @@ -8,6 +8,7 @@ epoll = ">=4.0.1" libc = "0.2.92" log = "0.4.14" net_gen = { path = "../net_gen" } +rate_limiter = { path = "../rate_limiter" } serde = "1.0.125" virtio-bindings = "0.1.0" vm-memory = { version = "0.5.0", features = ["backend-mmap", "backend-atomic"] } diff --git a/net_util/src/lib.rs b/net_util/src/lib.rs index a226a4f64..603d9f265 100644 --- a/net_util/src/lib.rs +++ b/net_util/src/lib.rs @@ -14,6 +14,7 @@ extern crate libc; #[macro_use] extern crate log; extern crate net_gen; +extern crate rate_limiter; extern crate serde; extern crate virtio_bindings; extern crate vm_memory; diff --git a/net_util/src/queue_pair.rs b/net_util/src/queue_pair.rs index 7a7437c42..808d3a2e9 100644 --- a/net_util/src/queue_pair.rs +++ b/net_util/src/queue_pair.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause use super::{unregister_listener, vnet_hdr_len, Tap}; +use rate_limiter::{RateLimiter, TokenType}; use std::io; use std::num::Wrapping; use std::os::unix::io::{AsRawFd, RawFd}; @@ -36,11 +37,35 @@ impl TxVirtio { mem: &GuestMemoryMmap, tap: &mut Tap, queue: &mut Queue, + rate_limiter: &mut Option, ) -> Result<(), NetQueuePairError> { while let Some(avail_desc) = queue.iter(&mem).next() { let head_index = avail_desc.index; let mut next_desc = Some(avail_desc); + if let Some(rate_limiter) = rate_limiter { + if !rate_limiter.consume(1, TokenType::Ops) { + queue.go_to_previous_position(); + break; + } + + let mut bytes = Wrapping(0); + let mut tmp_next_desc = next_desc.clone(); + while let Some(desc) = tmp_next_desc { + if !desc.is_write_only() { + bytes += Wrapping(desc.len as u64); + } + tmp_next_desc = desc.next_descriptor(); + } + bytes -= Wrapping(vnet_hdr_len() as u64); + if !rate_limiter.consume(bytes.0, TokenType::Bytes) { + // Revert the OPS consume(). + rate_limiter.manual_replenish(1, TokenType::Ops); + queue.go_to_previous_position(); + break; + } + } + let mut iovecs = Vec::new(); while let Some(desc) = next_desc { if !desc.is_write_only() && desc.len > 0 { @@ -209,6 +234,7 @@ pub struct NetQueuePair { pub rx_tap_listening: bool, pub counters: NetCounters, pub tap_event_id: u16, + pub tx_rate_limiter: Option, } impl NetQueuePair { @@ -220,7 +246,7 @@ impl NetQueuePair { .map(|m| m.memory())?; self.tx - .process_desc_chain(&mem, &mut self.tap, &mut queue)?; + .process_desc_chain(&mem, &mut self.tap, &mut queue, &mut self.tx_rate_limiter)?; self.counters .tx_bytes diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs index f51e7c9a2..a247c662d 100644 --- a/vhost_user_net/src/lib.rs +++ b/vhost_user_net/src/lib.rs @@ -111,6 +111,7 @@ impl VhostUserNetThread { epoll_fd: None, counters: NetCounters::default(), tap_event_id: 2, + tx_rate_limiter: None, }, }) } diff --git a/virtio-devices/src/net.rs b/virtio-devices/src/net.rs index 3f3abd9f5..85522f2c1 100644 --- a/virtio-devices/src/net.rs +++ b/virtio-devices/src/net.rs @@ -12,7 +12,8 @@ use super::net_util::{ use super::Error as DeviceError; use super::{ ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue, - VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST, + RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType, + EPOLL_HELPER_EVENT_LAST, }; use crate::seccomp_filters::{get_seccomp_filter, Thread}; use crate::VirtioInterrupt; @@ -21,7 +22,6 @@ use net_util::{ open_tap, MacAddr, NetCounters, NetQueuePair, OpenTapError, RxVirtio, Tap, TapError, TxVirtio, }; use seccomp::{SeccompAction, SeccompFilter}; -use std::collections::HashMap; use std::net::Ipv4Addr; use std::num::Wrapping; use std::os::unix::io::{AsRawFd, RawFd}; @@ -30,6 +30,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Barrier}; use std::thread; use std::vec::Vec; +use std::{collections::HashMap, convert::TryInto}; use virtio_bindings::bindings::virtio_net::*; use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{ByteValued, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; @@ -45,6 +46,8 @@ pub const RX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1; pub const TX_QUEUE_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2; // A frame is available for reading from the tap device to receive in the guest. pub const RX_TAP_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3; +// New 'wake up' event from the tx rate limiter +pub const TX_RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4; #[derive(Debug)] pub enum Error { @@ -122,7 +125,15 @@ impl NetEpollHandler { error!("Failed to get tx queue event: {:?}", e); } - self.process_tx()?; + let rate_limit_reached = self + .net + .tx_rate_limiter + .as_ref() + .map_or(false, |r| r.is_blocked()); + + if !rate_limit_reached { + self.process_tx()?; + } Ok(()) } @@ -150,6 +161,9 @@ impl NetEpollHandler { let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?; helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?; helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?; + if let Some(rate_limiter) = &self.net.tx_rate_limiter { + helper.add_event(rate_limiter.as_raw_fd(), TX_RATE_LIMITER_EVENT)?; + } // If there are some already available descriptors on the RX queue, // then we can start the thread while listening onto the TAP. @@ -195,6 +209,28 @@ impl EpollHelperHandler for NetEpollHandler { return true; } } + TX_RATE_LIMITER_EVENT => { + if let Some(rate_limiter) = &mut self.net.tx_rate_limiter { + // Upon rate limiter event, call the rate limiter handler + // and restart processing the queue. + match rate_limiter.event_handler() { + Ok(_) => { + self.driver_awake = true; + if let Err(e) = self.process_tx() { + error!("Error processing TX queue: {:?}", e); + return true; + } + } + Err(e) => { + error!("Error from 'rate_limiter.event_handler()': {:?}", e); + return true; + } + } + } else { + error!("Unexpected TX_RATE_LIMITER_EVENT"); + return true; + } + } _ => { error!("Unknown event: {}", ev_type); return true; @@ -212,6 +248,7 @@ pub struct Net { ctrl_queue_epoll_thread: Option>, counters: NetCounters, seccomp_action: SeccompAction, + rate_limiter_config: Option, } #[derive(Serialize, Deserialize)] @@ -224,6 +261,7 @@ pub struct NetState { impl Net { /// Create a new virtio network device with the given TAP interface. + #[allow(clippy::too_many_arguments)] pub fn new_with_tap( id: String, taps: Vec, @@ -232,6 +270,7 @@ impl Net { num_queues: usize, queue_size: u16, seccomp_action: SeccompAction, + rate_limiter_config: Option, ) -> Result { let mut avail_features = 1 << VIRTIO_NET_F_GUEST_CSUM | 1 << VIRTIO_NET_F_CSUM @@ -271,6 +310,7 @@ impl Net { ctrl_queue_epoll_thread: None, counters: NetCounters::default(), seccomp_action, + rate_limiter_config, }) } @@ -288,6 +328,7 @@ impl Net { num_queues: usize, queue_size: u16, seccomp_action: SeccompAction, + rate_limiter_config: Option, ) -> Result { let taps = open_tap(if_name, ip_addr, netmask, host_mac, num_queues / 2, None) .map_err(Error::OpenTap)?; @@ -300,6 +341,7 @@ impl Net { num_queues, queue_size, seccomp_action, + rate_limiter_config, ) } @@ -310,6 +352,7 @@ impl Net { iommu: bool, queue_size: u16, seccomp_action: SeccompAction, + rate_limiter_config: Option, ) -> Result { let mut taps: Vec = Vec::new(); let num_queue_pairs = fds.len(); @@ -327,6 +370,7 @@ impl Net { num_queue_pairs * 2, queue_size, seccomp_action, + rate_limiter_config, ) } @@ -483,6 +527,12 @@ impl VirtioDevice for Net { ActivateError::BadActivate })?; + let tx_rate_limiter: Option = self + .rate_limiter_config + .map(RateLimiterConfig::try_into) + .transpose() + .map_err(ActivateError::CreateRateLimiter)?; + let mut handler = NetEpollHandler { net: NetQueuePair { mem: Some(mem.clone()), @@ -493,6 +543,7 @@ impl VirtioDevice for Net { rx_tap_listening, counters: self.counters.clone(), tap_event_id: RX_TAP_EVENT, + tx_rate_limiter, }, queue_pair, queue_evt_pair, diff --git a/virtio-devices/src/seccomp_filters.rs b/virtio-devices/src/seccomp_filters.rs index aa9703b52..37a068561 100644 --- a/virtio-devices/src/seccomp_filters.rs +++ b/virtio-devices/src/seccomp_filters.rs @@ -228,6 +228,7 @@ fn virtio_net_thread_rules() -> Vec { allow_syscall(libc::SYS_readv), allow_syscall(libc::SYS_rt_sigprocmask), allow_syscall(libc::SYS_sigaltstack), + allow_syscall(libc::SYS_timerfd_settime), allow_syscall(libc::SYS_write), allow_syscall(libc::SYS_writev), ] diff --git a/vmm/src/config.rs b/vmm/src/config.rs index c6ab2b126..714230564 100644 --- a/vmm/src/config.rs +++ b/vmm/src/config.rs @@ -887,6 +887,8 @@ pub struct NetConfig { pub id: Option, #[serde(default)] pub fds: Option>, + #[serde(default)] + pub rate_limiter_config: Option, } fn default_netconfig_tap() -> Option { @@ -928,6 +930,7 @@ impl Default for NetConfig { vhost_socket: None, id: None, fds: None, + rate_limiter_config: None, } } } @@ -936,7 +939,9 @@ impl NetConfig { pub const SYNTAX: &'static str = "Network parameters \ \"tap=,ip=,mask=,mac=,fd=,iommu=on|off,\ num_queues=,queue_size=,\ - vhost_user=,socket=,id=\""; + vhost_user=,socket=,id=,\ + bw_size=,bw_one_time_burst=,bw_refill_time=,\ + ops_size=,ops_one_time_burst=,ops_refill_time=\""; pub fn parse(net: &str) -> Result { let mut parser = OptionParser::new(); @@ -953,7 +958,13 @@ impl NetConfig { .add("vhost_user") .add("socket") .add("id") - .add("fd"); + .add("fd") + .add("bw_size") + .add("bw_one_time_burst") + .add("bw_refill_time") + .add("ops_size") + .add("ops_one_time_burst") + .add("ops_refill_time"); parser.parse(net).map_err(Error::ParseNetwork)?; let tap = parser.get("tap"); @@ -995,6 +1006,57 @@ impl NetConfig { .map_err(Error::ParseNetwork)? .map(|v| v.0.iter().map(|e| *e as i32).collect()); + let bw_size = parser + .convert("bw_size") + .map_err(Error::ParseDisk)? + .unwrap_or_default(); + let bw_one_time_burst = parser + .convert("bw_one_time_burst") + .map_err(Error::ParseDisk)? + .unwrap_or_default(); + let bw_refill_time = parser + .convert("bw_refill_time") + .map_err(Error::ParseDisk)? + .unwrap_or_default(); + let ops_size = parser + .convert("ops_size") + .map_err(Error::ParseDisk)? + .unwrap_or_default(); + let ops_one_time_burst = parser + .convert("ops_one_time_burst") + .map_err(Error::ParseDisk)? + .unwrap_or_default(); + let ops_refill_time = parser + .convert("ops_refill_time") + .map_err(Error::ParseDisk)? + .unwrap_or_default(); + let bw_tb_config = if bw_size != 0 && bw_refill_time != 0 { + Some(TokenBucketConfig { + size: bw_size, + one_time_burst: Some(bw_one_time_burst), + refill_time: bw_refill_time, + }) + } else { + None + }; + let ops_tb_config = if ops_size != 0 && ops_refill_time != 0 { + Some(TokenBucketConfig { + size: ops_size, + one_time_burst: Some(ops_one_time_burst), + refill_time: ops_refill_time, + }) + } else { + None + }; + let rate_limiter_config = if bw_tb_config.is_some() || ops_tb_config.is_some() { + Some(RateLimiterConfig { + bandwidth: bw_tb_config, + ops: ops_tb_config, + }) + } else { + None + }; + let config = NetConfig { tap, ip, @@ -1008,6 +1070,7 @@ impl NetConfig { vhost_socket, id, fds, + rate_limiter_config, }; config.validate().map_err(Error::Validation)?; Ok(config) diff --git a/vmm/src/device_manager.rs b/vmm/src/device_manager.rs index b0919a029..8df956c06 100644 --- a/vmm/src/device_manager.rs +++ b/vmm/src/device_manager.rs @@ -2076,6 +2076,7 @@ impl DeviceManager { net_cfg.num_queues, net_cfg.queue_size, self.seccomp_action.clone(), + net_cfg.rate_limiter_config, ) .map_err(DeviceManagerError::CreateVirtioNet)?, )) @@ -2088,6 +2089,7 @@ impl DeviceManager { net_cfg.iommu, net_cfg.queue_size, self.seccomp_action.clone(), + net_cfg.rate_limiter_config, ) .map_err(DeviceManagerError::CreateVirtioNet)?, )) @@ -2104,6 +2106,7 @@ impl DeviceManager { net_cfg.num_queues, net_cfg.queue_size, self.seccomp_action.clone(), + net_cfg.rate_limiter_config, ) .map_err(DeviceManagerError::CreateVirtioNet)?, ))