vmm: use RateLimiterGroup for virtio-blk devices

Add a 'rate_limit_groups' field to VmConfig that defines a set of
named RateLimiterGroups.

When the 'rate_limit_group' field of DiskConfig is defined, all
virtio-blk queues will be rate-limited by a shared RateLimiterGroup.
The lifecycle of all RateLimiterGroups is tied to the Vm.
A RateLimiterGroup may exist even if no Disks are configured to use
the RateLimiterGroup. Disks may be hot-added or hot-removed from the
RateLimiterGroup.

When the 'rate_limiter' field of DiskConfig is defined, we construct
an anonymous RateLimiterGroup whose lifecycle is tied to the Disk.
This is primarily done for API backwards compatibility. Importantly,
the behavior is not the same! This implementation rate_limits the
aggregate bandwidth / iops of an individual disk rather than the
bandwidth / iops of an individual queue of a disk.

When neither the 'rate_limit_group' nor the 'rate_limiter' field of
DiskConfig is defined, the Disk is not rate-limited.

Signed-off-by: Thomas Barrett <tbarrett@crusoeenergy.com>
This commit is contained in:
Thomas Barrett 2023-12-07 19:45:08 +00:00 committed by Bo Chen
parent c71da496c0
commit c297d8d796
10 changed files with 358 additions and 18 deletions

1
Cargo.lock generated
View File

@ -2594,6 +2594,7 @@ dependencies = [
"option_parser", "option_parser",
"pci", "pci",
"range_map_vec", "range_map_vec",
"rate_limiter",
"seccompiler", "seccompiler",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -42,3 +42,14 @@ actual rate limit users get can be as low as
generally advisable to keep `bw/ops_refill_time` larger than `100 ms` generally advisable to keep `bw/ops_refill_time` larger than `100 ms`
(`cool_down_time`) to make sure the actual rate limit is close to users' (`cool_down_time`) to make sure the actual rate limit is close to users'
expectation ("refill-rate"). expectation ("refill-rate").
## Rate Limit Groups
It is possible to throttle the aggregate bandwidth or operations
of multiple virtio-blk devices using a `rate_limit_group`. virtio-blk devices may be
dynamically added and removed from a `rate_limit_group`. The following example
demonstrates how to throttle the aggregate bandwidth of two disks to 10 MiB/s.
```
--disk path=disk0.raw,rate_limit_group=group0 \
path=disk1.raw,rate_limit_group=group0 \
--rate-limit-group id=group0,bw_size=1048576,bw_refill_time=100
```

View File

@ -240,6 +240,13 @@ fn create_app(default_vcpus: String, default_memory: String, default_rng: String
.num_args(1) .num_args(1)
.group("vm-config"), .group("vm-config"),
) )
.arg(
Arg::new("rate-limit-group")
.long("rate-limit-group")
.help(config::RateLimiterGroupConfig::SYNTAX)
.num_args(1..)
.group("vm-config"),
)
.arg( .arg(
Arg::new("disk") Arg::new("disk")
.long("disk") .long("disk")
@ -845,6 +852,7 @@ mod unit_tests {
kernel: Some(PathBuf::from("/path/to/kernel")), kernel: Some(PathBuf::from("/path/to/kernel")),
..Default::default() ..Default::default()
}), }),
rate_limit_groups: None,
disks: None, disks: None,
net: None, net: None,
rng: RngConfig { rng: RngConfig {
@ -1116,6 +1124,29 @@ mod unit_tests {
}"#, }"#,
true, true,
), ),
(
vec![
"cloud-hypervisor",
"--kernel",
"/path/to/kernel",
"--disk",
"path=/path/to/disk/1,rate_limit_group=group0",
"path=/path/to/disk/2,rate_limit_group=group0",
"--rate-limit-group",
"id=group0,bw_size=1000,bw_refill_time=100",
],
r#"{
"payload": {"kernel": "/path/to/kernel"},
"disks": [
{"path": "/path/to/disk/1", "rate_limit_group": "group0"},
{"path": "/path/to/disk/2", "rate_limit_group": "group0"}
],
"rate_limit_groups": [
{"id": "group0", "rate_limiter_config": {"bandwidth": {"size": 1000, "one_time_burst": 0, "refill_time": 100}}}
]
}"#,
true,
),
] ]
.iter() .iter()
.for_each(|(cli, openapi, equal)| { .for_each(|(cli, openapi, equal)| {

View File

@ -10,9 +10,8 @@
use super::Error as DeviceError; use super::Error as DeviceError;
use super::{ use super::{
ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType, VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
EPOLL_HELPER_EVENT_LAST,
}; };
use crate::seccomp_filters::Thread; use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread; use crate::thread_helper::spawn_virtio_thread;
@ -23,7 +22,8 @@ use block::{
async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request, async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
RequestType, VirtioBlockConfig, RequestType, VirtioBlockConfig,
}; };
use rate_limiter::{RateLimiter, TokenType}; use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction; use seccompiler::SeccompAction;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io; use std::io;
@ -131,7 +131,7 @@ struct BlockEpollHandler {
counters: BlockCounters, counters: BlockCounters,
queue_evt: EventFd, queue_evt: EventFd,
inflight_requests: VecDeque<(u16, Request)>, inflight_requests: VecDeque<(u16, Request)>,
rate_limiter: Option<RateLimiter>, rate_limiter: Option<RateLimiterGroupHandle>,
access_platform: Option<Arc<dyn AccessPlatform>>, access_platform: Option<Arc<dyn AccessPlatform>>,
read_only: bool, read_only: bool,
} }
@ -507,7 +507,7 @@ pub struct Block {
writeback: Arc<AtomicBool>, writeback: Arc<AtomicBool>,
counters: BlockCounters, counters: BlockCounters,
seccomp_action: SeccompAction, seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>, rate_limiter: Option<Arc<RateLimiterGroup>>,
exit_evt: EventFd, exit_evt: EventFd,
read_only: bool, read_only: bool,
serial: Vec<u8>, serial: Vec<u8>,
@ -537,7 +537,7 @@ impl Block {
queue_size: u16, queue_size: u16,
serial: Option<String>, serial: Option<String>,
seccomp_action: SeccompAction, seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>, rate_limiter: Option<Arc<RateLimiterGroup>>,
exit_evt: EventFd, exit_evt: EventFd,
state: Option<BlockState>, state: Option<BlockState>,
) -> io::Result<Self> { ) -> io::Result<Self> {
@ -639,7 +639,7 @@ impl Block {
writeback: Arc::new(AtomicBool::new(true)), writeback: Arc::new(AtomicBool::new(true)),
counters: BlockCounters::default(), counters: BlockCounters::default(),
seccomp_action, seccomp_action,
rate_limiter_config, rate_limiter,
exit_evt, exit_evt,
read_only, read_only,
serial, serial,
@ -747,12 +747,6 @@ impl VirtioDevice for Block {
let queue_size = queue.size(); let queue_size = queue.size();
let (kill_evt, pause_evt) = self.common.dup_eventfds(); let (kill_evt, pause_evt) = self.common.dup_eventfds();
let rate_limiter: Option<RateLimiter> = self
.rate_limiter_config
.map(RateLimiterConfig::try_into)
.transpose()
.map_err(ActivateError::CreateRateLimiter)?;
let mut handler = BlockEpollHandler { let mut handler = BlockEpollHandler {
queue_index: i as u16, queue_index: i as u16,
queue, queue,
@ -776,7 +770,12 @@ impl VirtioDevice for Block {
// This gives head room for systems with slower I/O without // This gives head room for systems with slower I/O without
// compromising the cost of the reallocation or memory overhead // compromising the cost of the reallocation or memory overhead
inflight_requests: VecDeque::with_capacity(64), inflight_requests: VecDeque::with_capacity(64),
rate_limiter, rate_limiter: self
.rate_limiter
.as_ref()
.map(|r| r.new_handle())
.transpose()
.unwrap(),
access_platform: self.common.access_platform.clone(), access_platform: self.common.access_platform.clone(),
read_only: self.read_only, read_only: self.read_only,
}; };

View File

@ -47,6 +47,7 @@ once_cell = "1.19.0"
option_parser = { path = "../option_parser" } option_parser = { path = "../option_parser" }
pci = { path = "../pci" } pci = { path = "../pci" }
range_map_vec = { version = "0.1.0", optional = true } range_map_vec = { version = "0.1.0", optional = true }
rate_limiter = { path = "../rate_limiter" }
seccompiler = "0.4.0" seccompiler = "0.4.0"
serde = { version = "1.0.168", features = ["rc", "derive"] } serde = { version = "1.0.168", features = ["rc", "derive"] }
serde_json = "1.0.109" serde_json = "1.0.109"

View File

@ -554,6 +554,10 @@ components:
$ref: "#/components/schemas/MemoryConfig" $ref: "#/components/schemas/MemoryConfig"
payload: payload:
$ref: "#/components/schemas/PayloadConfig" $ref: "#/components/schemas/PayloadConfig"
rate_limit_groups:
type: array
items:
$ref: "#/components/schemas/RateLimitGroupConfig"
disks: disks:
type: array type: array
items: items:
@ -812,6 +816,17 @@ components:
Defines an IO rate limiter with independent bytes/s and ops/s limits. Defines an IO rate limiter with independent bytes/s and ops/s limits.
Limits are defined by configuring each of the _bandwidth_ and _ops_ token buckets. Limits are defined by configuring each of the _bandwidth_ and _ops_ token buckets.
RateLimitGroupConfig:
required:
- id
- rate_limiter_config
type: object
properties:
id:
type: string
rate_limiter_config:
$ref: "#/components/schemas/RateLimiterConfig"
DiskConfig: DiskConfig:
required: required:
- path - path
@ -848,6 +863,8 @@ components:
type: string type: string
serial: serial:
type: string type: string
rate_limit_group:
type: string
NetConfig: NetConfig:
type: object type: object

View File

@ -45,6 +45,8 @@ pub enum Error {
ParseMemoryZone(OptionParserError), ParseMemoryZone(OptionParserError),
/// Missing 'id' from memory zone /// Missing 'id' from memory zone
ParseMemoryZoneIdMissing, ParseMemoryZoneIdMissing,
/// Error parsing rate-limiter group options
ParseRateLimiterGroup(OptionParserError),
/// Error parsing disk options /// Error parsing disk options
ParseDisk(OptionParserError), ParseDisk(OptionParserError),
/// Error parsing network options /// Error parsing network options
@ -179,6 +181,8 @@ pub enum ValidationError {
PciSegmentReused(u16, u32, u32), PciSegmentReused(u16, u32, u32),
/// Default PCI segment is assigned to NUMA node other than 0. /// Default PCI segment is assigned to NUMA node other than 0.
DefaultPciSegmentInvalidNode(u32), DefaultPciSegmentInvalidNode(u32),
/// Invalid rate-limiter group
InvalidRateLimiterGroup,
} }
type ValidationResult<T> = std::result::Result<T, ValidationError>; type ValidationResult<T> = std::result::Result<T, ValidationError>;
@ -299,6 +303,9 @@ impl fmt::Display for ValidationError {
DefaultPciSegmentInvalidNode(u1) => { DefaultPciSegmentInvalidNode(u1) => {
write!(f, "Default PCI segment assigned to non-zero NUMA node {u1}") write!(f, "Default PCI segment assigned to non-zero NUMA node {u1}")
} }
InvalidRateLimiterGroup => {
write!(f, "Invalid rate-limiter group")
}
} }
} }
} }
@ -327,6 +334,7 @@ impl fmt::Display for Error {
ParseMemoryZone(o) => write!(f, "Error parsing --memory-zone: {o}"), ParseMemoryZone(o) => write!(f, "Error parsing --memory-zone: {o}"),
ParseMemoryZoneIdMissing => write!(f, "Error parsing --memory-zone: id missing"), ParseMemoryZoneIdMissing => write!(f, "Error parsing --memory-zone: id missing"),
ParseNetwork(o) => write!(f, "Error parsing --net: {o}"), ParseNetwork(o) => write!(f, "Error parsing --net: {o}"),
ParseRateLimiterGroup(o) => write!(f, "Error parsing --rate-limit-group: {o}"),
ParseDisk(o) => write!(f, "Error parsing --disk: {o}"), ParseDisk(o) => write!(f, "Error parsing --disk: {o}"),
ParseRng(o) => write!(f, "Error parsing --rng: {o}"), ParseRng(o) => write!(f, "Error parsing --rng: {o}"),
ParseBalloon(o) => write!(f, "Error parsing --balloon: {o}"), ParseBalloon(o) => write!(f, "Error parsing --balloon: {o}"),
@ -377,6 +385,7 @@ pub struct VmParams<'a> {
pub kernel: Option<&'a str>, pub kernel: Option<&'a str>,
pub initramfs: Option<&'a str>, pub initramfs: Option<&'a str>,
pub cmdline: Option<&'a str>, pub cmdline: Option<&'a str>,
pub rate_limit_groups: Option<Vec<&'a str>>,
pub disks: Option<Vec<&'a str>>, pub disks: Option<Vec<&'a str>>,
pub net: Option<Vec<&'a str>>, pub net: Option<Vec<&'a str>>,
pub rng: &'a str, pub rng: &'a str,
@ -416,6 +425,9 @@ impl<'a> VmParams<'a> {
let kernel = args.get_one::<String>("kernel").map(|x| x as &str); let kernel = args.get_one::<String>("kernel").map(|x| x as &str);
let initramfs = args.get_one::<String>("initramfs").map(|x| x as &str); let initramfs = args.get_one::<String>("initramfs").map(|x| x as &str);
let cmdline = args.get_one::<String>("cmdline").map(|x| x as &str); let cmdline = args.get_one::<String>("cmdline").map(|x| x as &str);
let rate_limit_groups: Option<Vec<&str>> = args
.get_many::<String>("rate-limit-group")
.map(|x| x.map(|y| y as &str).collect());
let disks: Option<Vec<&str>> = args let disks: Option<Vec<&str>> = args
.get_many::<String>("disk") .get_many::<String>("disk")
.map(|x| x.map(|y| y as &str).collect()); .map(|x| x.map(|y| y as &str).collect());
@ -463,6 +475,7 @@ impl<'a> VmParams<'a> {
kernel, kernel,
initramfs, initramfs,
cmdline, cmdline,
rate_limit_groups,
disks, disks,
net, net,
rng, rng,
@ -863,6 +876,93 @@ impl MemoryConfig {
} }
} }
impl RateLimiterGroupConfig {
    /// Help text describing the `--rate-limit-group` option syntax.
    pub const SYNTAX: &'static str = "Rate Limit Group parameters \
        \"bw_size=<bytes>,bw_one_time_burst=<bytes>,bw_refill_time=<ms>,\
        ops_size=<io_ops>,ops_one_time_burst=<io_ops>,ops_refill_time=<ms>,\
        id=<device_id>\"";

    /// Parses a `--rate-limit-group` option string into a group configuration.
    ///
    /// Every option is optional at this stage: a missing `id` becomes an
    /// empty string and missing numeric options become their default value.
    /// A token bucket (bandwidth or ops) is only produced when both its
    /// size and its refill time are non-zero. Semantic checks are left to
    /// [`Self::validate`].
    ///
    /// # Errors
    ///
    /// Returns `Error::ParseRateLimiterGroup` when the option string cannot
    /// be parsed or a value cannot be converted.
    pub fn parse(rate_limit_group: &str) -> Result<Self> {
        let mut parser = OptionParser::new();
        // Register the accepted option names in the same order as before.
        for key in [
            "bw_size",
            "bw_one_time_burst",
            "bw_refill_time",
            "ops_size",
            "ops_one_time_burst",
            "ops_refill_time",
            "id",
        ] {
            parser.add(key);
        }
        parser
            .parse(rate_limit_group)
            .map_err(Error::ParseRateLimiterGroup)?;

        let id = parser.get("id").unwrap_or_default();

        // Bandwidth bucket options.
        let bandwidth_size = parser
            .convert("bw_size")
            .map_err(Error::ParseRateLimiterGroup)?
            .unwrap_or_default();
        let bandwidth_burst = parser
            .convert("bw_one_time_burst")
            .map_err(Error::ParseRateLimiterGroup)?
            .unwrap_or_default();
        let bandwidth_refill = parser
            .convert("bw_refill_time")
            .map_err(Error::ParseRateLimiterGroup)?
            .unwrap_or_default();

        // Operations bucket options.
        let operations_size = parser
            .convert("ops_size")
            .map_err(Error::ParseRateLimiterGroup)?
            .unwrap_or_default();
        let operations_burst = parser
            .convert("ops_one_time_burst")
            .map_err(Error::ParseRateLimiterGroup)?
            .unwrap_or_default();
        let operations_refill = parser
            .convert("ops_refill_time")
            .map_err(Error::ParseRateLimiterGroup)?
            .unwrap_or_default();

        // A bucket is only configured when both size and refill time are set.
        let bandwidth = (bandwidth_size != 0 && bandwidth_refill != 0).then(|| TokenBucketConfig {
            size: bandwidth_size,
            one_time_burst: Some(bandwidth_burst),
            refill_time: bandwidth_refill,
        });
        let ops = (operations_size != 0 && operations_refill != 0).then(|| TokenBucketConfig {
            size: operations_size,
            one_time_burst: Some(operations_burst),
            refill_time: operations_refill,
        });

        Ok(Self {
            id,
            rate_limiter_config: RateLimiterConfig { bandwidth, ops },
        })
    }

    /// Checks that the group is usable.
    ///
    /// # Errors
    ///
    /// Returns `ValidationError::InvalidRateLimiterGroup` when neither a
    /// bandwidth nor an ops bucket is configured, or when the id is empty.
    pub fn validate(&self, _vm_config: &VmConfig) -> ValidationResult<()> {
        let limits = &self.rate_limiter_config;
        let has_any_limit = limits.bandwidth.is_some() || limits.ops.is_some();

        if !has_any_limit || self.id.is_empty() {
            return Err(ValidationError::InvalidRateLimiterGroup);
        }

        Ok(())
    }
}
impl DiskConfig { impl DiskConfig {
pub const SYNTAX: &'static str = "Disk parameters \ pub const SYNTAX: &'static str = "Disk parameters \
\"path=<disk_image_path>,readonly=on|off,direct=on|off,iommu=on|off,\ \"path=<disk_image_path>,readonly=on|off,direct=on|off,iommu=on|off,\
@ -870,7 +970,7 @@ impl DiskConfig {
vhost_user=on|off,socket=<vhost_user_socket_path>,\ vhost_user=on|off,socket=<vhost_user_socket_path>,\
bw_size=<bytes>,bw_one_time_burst=<bytes>,bw_refill_time=<ms>,\ bw_size=<bytes>,bw_one_time_burst=<bytes>,bw_refill_time=<ms>,\
ops_size=<io_ops>,ops_one_time_burst=<io_ops>,ops_refill_time=<ms>,\ ops_size=<io_ops>,ops_one_time_burst=<io_ops>,ops_refill_time=<ms>,\
id=<device_id>,pci_segment=<segment_id>\""; id=<device_id>,pci_segment=<segment_id>,rate_limit_group=<group_id>\"";
pub fn parse(disk: &str) -> Result<Self> { pub fn parse(disk: &str) -> Result<Self> {
let mut parser = OptionParser::new(); let mut parser = OptionParser::new();
@ -893,7 +993,8 @@ impl DiskConfig {
.add("_disable_io_uring") .add("_disable_io_uring")
.add("_disable_aio") .add("_disable_aio")
.add("pci_segment") .add("pci_segment")
.add("serial"); .add("serial")
.add("rate_limit_group");
parser.parse(disk).map_err(Error::ParseDisk)?; parser.parse(disk).map_err(Error::ParseDisk)?;
let path = parser.get("path").map(PathBuf::from); let path = parser.get("path").map(PathBuf::from);
@ -941,6 +1042,7 @@ impl DiskConfig {
.convert("pci_segment") .convert("pci_segment")
.map_err(Error::ParseDisk)? .map_err(Error::ParseDisk)?
.unwrap_or_default(); .unwrap_or_default();
let rate_limit_group = parser.get("rate_limit_group");
let bw_size = parser let bw_size = parser
.convert("bw_size") .convert("bw_size")
.map_err(Error::ParseDisk)? .map_err(Error::ParseDisk)?
@ -1002,6 +1104,7 @@ impl DiskConfig {
queue_size, queue_size,
vhost_user, vhost_user,
vhost_socket, vhost_socket,
rate_limit_group,
rate_limiter_config, rate_limiter_config,
id, id,
disable_io_uring, disable_io_uring,
@ -1032,6 +1135,10 @@ impl DiskConfig {
} }
} }
if self.rate_limiter_config.is_some() && self.rate_limit_group.is_some() {
return Err(ValidationError::InvalidRateLimiterGroup);
}
Ok(()) Ok(())
} }
} }
@ -1959,6 +2066,14 @@ impl VmConfig {
return Err(ValidationError::CpusMaxLowerThanBoot); return Err(ValidationError::CpusMaxLowerThanBoot);
} }
if let Some(rate_limit_groups) = &self.rate_limit_groups {
for rate_limit_group in rate_limit_groups {
rate_limit_group.validate(self)?;
Self::validate_identifier(&mut id_list, &Some(rate_limit_group.id.clone()))?;
}
}
if let Some(disks) = &self.disks { if let Some(disks) = &self.disks {
for disk in disks { for disk in disks {
if disk.vhost_socket.as_ref().and(disk.path.as_ref()).is_some() { if disk.vhost_socket.as_ref().and(disk.path.as_ref()).is_some() {
@ -1970,6 +2085,19 @@ impl VmConfig {
if disk.vhost_user && disk.vhost_socket.is_none() { if disk.vhost_user && disk.vhost_socket.is_none() {
return Err(ValidationError::VhostUserMissingSocket); return Err(ValidationError::VhostUserMissingSocket);
} }
if let Some(rate_limit_group) = &disk.rate_limit_group {
if let Some(rate_limit_groups) = &self.rate_limit_groups {
if !rate_limit_groups
.iter()
.any(|cfg| &cfg.id == rate_limit_group)
{
return Err(ValidationError::InvalidRateLimiterGroup);
}
} else {
return Err(ValidationError::InvalidRateLimiterGroup);
}
}
disk.validate(self)?; disk.validate(self)?;
self.iommu |= disk.iommu; self.iommu |= disk.iommu;
@ -2178,6 +2306,16 @@ impl VmConfig {
} }
pub fn parse(vm_params: VmParams) -> Result<Self> { pub fn parse(vm_params: VmParams) -> Result<Self> {
let mut rate_limit_groups: Option<Vec<RateLimiterGroupConfig>> = None;
if let Some(rate_limit_group_list) = &vm_params.rate_limit_groups {
let mut rate_limit_group_config_list = Vec::new();
for item in rate_limit_group_list.iter() {
let rate_limit_group_config = RateLimiterGroupConfig::parse(item)?;
rate_limit_group_config_list.push(rate_limit_group_config);
}
rate_limit_groups = Some(rate_limit_group_config_list);
}
let mut disks: Option<Vec<DiskConfig>> = None; let mut disks: Option<Vec<DiskConfig>> = None;
if let Some(disk_list) = &vm_params.disks { if let Some(disk_list) = &vm_params.disks {
let mut disk_config_list = Vec::new(); let mut disk_config_list = Vec::new();
@ -2324,6 +2462,7 @@ impl VmConfig {
cpus: CpusConfig::parse(vm_params.cpus)?, cpus: CpusConfig::parse(vm_params.cpus)?,
memory: MemoryConfig::parse(vm_params.memory, vm_params.memory_zones)?, memory: MemoryConfig::parse(vm_params.memory, vm_params.memory_zones)?,
payload, payload,
rate_limit_groups,
disks, disks,
net, net,
rng, rng,
@ -2447,6 +2586,7 @@ impl Clone for VmConfig {
cpus: self.cpus.clone(), cpus: self.cpus.clone(),
memory: self.memory.clone(), memory: self.memory.clone(),
payload: self.payload.clone(), payload: self.payload.clone(),
rate_limit_groups: self.rate_limit_groups.clone(),
disks: self.disks.clone(), disks: self.disks.clone(),
net: self.net.clone(), net: self.net.clone(),
rng: self.rng.clone(), rng: self.rng.clone(),
@ -2624,6 +2764,39 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn test_rate_limit_group_parsing() -> Result<()> {
    // Bucket expected from "size=1000, refill_time=100": the parser always
    // fills in an explicit zero one-time burst when none is given.
    let expected_bucket = || TokenBucketConfig {
        size: 1000,
        one_time_burst: Some(0),
        refill_time: 100,
    };

    // Only the bandwidth options are given, so no ops bucket is produced.
    let bandwidth_only =
        RateLimiterGroupConfig::parse("id=group0,bw_size=1000,bw_refill_time=100")?;
    assert_eq!(
        bandwidth_only,
        RateLimiterGroupConfig {
            id: "group0".to_string(),
            rate_limiter_config: RateLimiterConfig {
                bandwidth: Some(expected_bucket()),
                ops: None,
            }
        }
    );

    // Only the ops options are given, so no bandwidth bucket is produced.
    let ops_only = RateLimiterGroupConfig::parse("id=group0,ops_size=1000,ops_refill_time=100")?;
    assert_eq!(
        ops_only,
        RateLimiterGroupConfig {
            id: "group0".to_string(),
            rate_limiter_config: RateLimiterConfig {
                bandwidth: None,
                ops: Some(expected_bucket()),
            }
        }
    );

    Ok(())
}
#[test] #[test]
fn test_disk_parsing() -> Result<()> { fn test_disk_parsing() -> Result<()> {
assert_eq!( assert_eq!(
@ -2706,6 +2879,14 @@ mod tests {
..Default::default() ..Default::default()
} }
); );
assert_eq!(
DiskConfig::parse("path=/path/to_file,rate_limit_group=group0")?,
DiskConfig {
path: Some(PathBuf::from("/path/to_file")),
rate_limit_group: Some("group0".to_string()),
..Default::default()
}
);
Ok(()) Ok(())
} }
@ -3088,6 +3269,7 @@ mod tests {
kernel: Some(PathBuf::from("/path/to/kernel")), kernel: Some(PathBuf::from("/path/to/kernel")),
..Default::default() ..Default::default()
}), }),
rate_limit_groups: None,
disks: None, disks: None,
net: None, net: None,
rng: RngConfig { rng: RngConfig {
@ -3576,6 +3758,17 @@ mod tests {
Err(ValidationError::InvalidPciSegment(1)) Err(ValidationError::InvalidPciSegment(1))
); );
let mut invalid_config = valid_config.clone();
invalid_config.disks = Some(vec![DiskConfig {
path: Some(PathBuf::from("/path/to/image")),
rate_limit_group: Some("foo".into()),
..Default::default()
}]);
assert_eq!(
invalid_config.validate(),
Err(ValidationError::InvalidRateLimiterGroup)
);
let mut still_valid_config = valid_config.clone(); let mut still_valid_config = valid_config.clone();
still_valid_config.devices = Some(vec![ still_valid_config.devices = Some(vec![
DeviceConfig { DeviceConfig {

View File

@ -61,6 +61,7 @@ use pci::{
DeviceRelocation, PciBarRegionType, PciBdf, PciDevice, VfioPciDevice, VfioUserDmaMapping, DeviceRelocation, PciBarRegionType, PciBdf, PciDevice, VfioPciDevice, VfioUserDmaMapping,
VfioUserPciDevice, VfioUserPciDeviceError, VfioUserPciDevice, VfioUserPciDeviceError,
}; };
use rate_limiter::group::RateLimiterGroup;
use seccompiler::SeccompAction; use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
@ -479,7 +480,11 @@ pub enum DeviceManagerError {
/// Cannot create a PvPanic device /// Cannot create a PvPanic device
PvPanicCreate(devices::pvpanic::PvPanicError), PvPanicCreate(devices::pvpanic::PvPanicError),
/// Cannot create a RateLimiterGroup
RateLimiterGroupCreate(rate_limiter::group::Error),
} }
pub type DeviceManagerResult<T> = result::Result<T, DeviceManagerError>; pub type DeviceManagerResult<T> = result::Result<T, DeviceManagerError>;
const DEVICE_MANAGER_ACPI_SIZE: usize = 0x10; const DEVICE_MANAGER_ACPI_SIZE: usize = 0x10;
@ -945,6 +950,8 @@ pub struct DeviceManager {
acpi_platform_addresses: AcpiPlatformAddresses, acpi_platform_addresses: AcpiPlatformAddresses,
snapshot: Option<Snapshot>, snapshot: Option<Snapshot>,
rate_limit_groups: HashMap<String, Arc<RateLimiterGroup>>,
} }
impl DeviceManager { impl DeviceManager {
@ -1096,6 +1103,31 @@ impl DeviceManager {
cpu_manager.lock().unwrap().set_acpi_address(acpi_address); cpu_manager.lock().unwrap().set_acpi_address(acpi_address);
} }
let mut rate_limit_groups = HashMap::<String, Arc<RateLimiterGroup>>::new();
if let Some(rate_limit_groups_cfg) = config.lock().unwrap().rate_limit_groups.as_ref() {
for rate_limit_group_cfg in rate_limit_groups_cfg {
let rate_limit_cfg = rate_limit_group_cfg.rate_limiter_config;
let bw = rate_limit_cfg.bandwidth.unwrap_or_default();
let ops = rate_limit_cfg.ops.unwrap_or_default();
let mut rate_limit_group = RateLimiterGroup::new(
&rate_limit_group_cfg.id,
bw.size,
bw.one_time_burst.unwrap_or(0),
bw.refill_time,
ops.size,
ops.one_time_burst.unwrap_or(0),
ops.refill_time,
)
.map_err(DeviceManagerError::RateLimiterGroupCreate)?;
let exit_evt = exit_evt.try_clone().map_err(DeviceManagerError::EventFd)?;
rate_limit_group.start_thread(exit_evt).unwrap();
rate_limit_groups
.insert(rate_limit_group_cfg.id.clone(), Arc::new(rate_limit_group));
}
}
let device_manager = DeviceManager { let device_manager = DeviceManager {
hypervisor_type, hypervisor_type,
address_manager: Arc::clone(&address_manager), address_manager: Arc::clone(&address_manager),
@ -1148,6 +1180,7 @@ impl DeviceManager {
pending_activations: Arc::new(Mutex::new(Vec::default())), pending_activations: Arc::new(Mutex::new(Vec::default())),
acpi_platform_addresses: AcpiPlatformAddresses::default(), acpi_platform_addresses: AcpiPlatformAddresses::default(),
snapshot, snapshot,
rate_limit_groups,
}; };
let device_manager = Arc::new(Mutex::new(device_manager)); let device_manager = Arc::new(Mutex::new(device_manager));
@ -2332,6 +2365,38 @@ impl DeviceManager {
} }
}; };
let rate_limit_group =
if let Some(rate_limiter_cfg) = disk_cfg.rate_limiter_config.as_ref() {
// Create an anonymous RateLimiterGroup that is dropped when the Disk
// is dropped.
let bw = rate_limiter_cfg.bandwidth.unwrap_or_default();
let ops = rate_limiter_cfg.ops.unwrap_or_default();
let mut rate_limit_group = RateLimiterGroup::new(
disk_cfg.id.as_ref().unwrap(),
bw.size,
bw.one_time_burst.unwrap_or(0),
bw.refill_time,
ops.size,
ops.one_time_burst.unwrap_or(0),
ops.refill_time,
)
.map_err(DeviceManagerError::RateLimiterGroupCreate)?;
rate_limit_group
.start_thread(
self.exit_evt
.try_clone()
.map_err(DeviceManagerError::EventFd)?,
)
.unwrap();
Some(Arc::new(rate_limit_group))
} else if let Some(rate_limit_group) = disk_cfg.rate_limit_group.as_ref() {
self.rate_limit_groups.get(rate_limit_group).cloned()
} else {
None
};
let virtio_block = Arc::new(Mutex::new( let virtio_block = Arc::new(Mutex::new(
virtio_devices::Block::new( virtio_devices::Block::new(
id.clone(), id.clone(),
@ -2347,7 +2412,7 @@ impl DeviceManager {
disk_cfg.queue_size, disk_cfg.queue_size,
disk_cfg.serial.clone(), disk_cfg.serial.clone(),
self.seccomp_action.clone(), self.seccomp_action.clone(),
disk_cfg.rate_limiter_config, rate_limit_group,
self.exit_evt self.exit_evt
.try_clone() .try_clone()
.map_err(DeviceManagerError::EventFd)?, .map_err(DeviceManagerError::EventFd)?,

View File

@ -2285,6 +2285,7 @@ mod unit_tests {
kernel: Some(PathBuf::from("/path/to/kernel")), kernel: Some(PathBuf::from("/path/to/kernel")),
..Default::default() ..Default::default()
}), }),
rate_limit_groups: None,
disks: None, disks: None,
net: None, net: None,
rng: RngConfig { rng: RngConfig {

View File

@ -195,6 +195,23 @@ pub enum VhostMode {
Server, Server,
} }
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct RateLimiterGroupConfig {
#[serde(default)]
pub id: String,
#[serde(default)]
pub rate_limiter_config: RateLimiterConfig,
}
impl Default for RateLimiterGroupConfig {
fn default() -> Self {
RateLimiterGroupConfig {
id: "".to_string(),
rate_limiter_config: RateLimiterConfig::default(),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct DiskConfig { pub struct DiskConfig {
pub path: Option<PathBuf>, pub path: Option<PathBuf>,
@ -212,6 +229,8 @@ pub struct DiskConfig {
pub vhost_user: bool, pub vhost_user: bool,
pub vhost_socket: Option<String>, pub vhost_socket: Option<String>,
#[serde(default)] #[serde(default)]
pub rate_limit_group: Option<String>,
#[serde(default)]
pub rate_limiter_config: Option<RateLimiterConfig>, pub rate_limiter_config: Option<RateLimiterConfig>,
#[serde(default)] #[serde(default)]
pub id: Option<String>, pub id: Option<String>,
@ -253,6 +272,7 @@ impl Default for DiskConfig {
id: None, id: None,
disable_io_uring: false, disable_io_uring: false,
disable_aio: false, disable_aio: false,
rate_limit_group: None,
rate_limiter_config: None, rate_limiter_config: None,
pci_segment: 0, pci_segment: 0,
serial: None, serial: None,
@ -591,6 +611,7 @@ pub struct VmConfig {
#[serde(default)] #[serde(default)]
pub memory: MemoryConfig, pub memory: MemoryConfig,
pub payload: Option<PayloadConfig>, pub payload: Option<PayloadConfig>,
pub rate_limit_groups: Option<Vec<RateLimiterGroupConfig>>,
pub disks: Option<Vec<DiskConfig>>, pub disks: Option<Vec<DiskConfig>>,
pub net: Option<Vec<NetConfig>>, pub net: Option<Vec<NetConfig>>,
#[serde(default)] #[serde(default)]