diff --git a/fuzz/fuzz_targets/block.rs b/fuzz/fuzz_targets/block.rs
index 0514a2c00..8714a8a40 100644
--- a/fuzz/fuzz_targets/block.rs
+++ b/fuzz/fuzz_targets/block.rs
@@ -21,6 +21,7 @@
 use virtio_devices::{Block, VirtioDevice, VirtioInterrupt, VirtioInterruptType};
 use virtio_queue::{Queue, QueueT};
 use vm_memory::{bitmap::AtomicBitmap, Bytes, GuestAddress, GuestMemoryAtomic};
 use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK};
+use std::collections::BTreeMap;
 
 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;
@@ -49,6 +50,7 @@ fuzz_target!(|bytes| {
     let shm = memfd_create(&ffi::CString::new("fuzz").unwrap(), 0).unwrap();
     let disk_file: File = unsafe { File::from_raw_fd(shm) };
     let qcow_disk = Box::new(RawFileDiskSync::new(disk_file)) as Box<dyn DiskFile>;
+    let queue_affinity = BTreeMap::new();
     let mut block = Block::new(
         "tmp".to_owned(),
         qcow_disk,
@@ -62,6 +64,7 @@ fuzz_target!(|bytes| {
         None,
         EventFd::new(EFD_NONBLOCK).unwrap(),
         None,
+        queue_affinity,
     )
     .unwrap();
 
diff --git a/tests/integration.rs b/tests/integration.rs
index d59ba5323..cea494055 100644
--- a/tests/integration.rs
+++ b/tests/integration.rs
@@ -2575,7 +2575,63 @@ mod common_parallel {
         let _ = child.kill();
         let output = child.wait_with_output().unwrap();
 
+        handle_child_output(r, &output);
+    }
+
+    #[test]
+    fn test_virtio_queue_affinity() {
+        let focal = UbuntuDiskConfig::new(FOCAL_IMAGE_NAME.to_string());
+        let guest = Guest::new(Box::new(focal));
+
+        // We need the host to have at least 4 CPUs if we want to be able
+        // to run this test.
+        let host_cpus_count = exec_host_command_output("nproc");
+        assert!(
+            String::from_utf8_lossy(&host_cpus_count.stdout)
+                .trim()
+                .parse::<u16>()
+                .unwrap_or(0)
+                >= 4
+        );
+
+        let mut child = GuestCommand::new(&guest)
+            .args(["--cpus", "boot=4"])
+            .args(["--memory", "size=512M"])
+            .args(["--kernel", direct_kernel_boot_path().to_str().unwrap()])
+            .args(["--cmdline", DIRECT_KERNEL_BOOT_CMDLINE])
+            .args([
+                "--disk",
+                format!(
+                    "path={}",
+                    guest.disk_config.disk(DiskType::OperatingSystem).unwrap()
+                )
+                .as_str(),
+                format!(
+                    "path={},num_queues=4,queue_affinity=[0@[0,2],1@[1,3],2@[1],3@[3]]",
+                    guest.disk_config.disk(DiskType::CloudInit).unwrap()
+                )
+                .as_str(),
+            ])
+            .default_net()
+            .capture_output()
+            .spawn()
+            .unwrap();
+
+        let r = std::panic::catch_unwind(|| {
+            guest.wait_vm_boot(None).unwrap();
+            let pid = child.id();
+            let taskset_q0 = exec_host_command_output(format!("taskset -pc $(ps -T -p {pid} | grep disk1_q0 | xargs | cut -f 2 -d \" \") | cut -f 6 -d \" \"").as_str());
+            assert_eq!(String::from_utf8_lossy(&taskset_q0.stdout).trim(), "0,2");
+            let taskset_q1 = exec_host_command_output(format!("taskset -pc $(ps -T -p {pid} | grep disk1_q1 | xargs | cut -f 2 -d \" \") | cut -f 6 -d \" \"").as_str());
+            assert_eq!(String::from_utf8_lossy(&taskset_q1.stdout).trim(), "1,3");
+            let taskset_q2 = exec_host_command_output(format!("taskset -pc $(ps -T -p {pid} | grep disk1_q2 | xargs | cut -f 2 -d \" \") | cut -f 6 -d \" \"").as_str());
+            assert_eq!(String::from_utf8_lossy(&taskset_q2.stdout).trim(), "1");
+            let taskset_q3 = exec_host_command_output(format!("taskset -pc $(ps -T -p {pid} | grep disk1_q3 | xargs | cut -f 2 -d \" \") | cut -f 6 -d \" \"").as_str());
+            assert_eq!(String::from_utf8_lossy(&taskset_q3.stdout).trim(), "3");
+        });
+        let _ = child.kill();
+        let output = child.wait_with_output().unwrap();
 
         handle_child_output(r, &output);
     }
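The test above asserts affinity from outside the process via `taskset` and the per-queue thread names. For reference, the same check can be made in-process with `sched_getaffinity`; the helper below is a hypothetical standalone sketch (not part of this change), assuming only the `libc` crate:

```rust
// Hypothetical helper: read back the calling thread's allowed CPUs,
// which is the same information `taskset -pc <tid>` reports.
fn allowed_cpus() -> Vec<usize> {
    // SAFETY: all zeros is a valid cpu_set_t bit pattern.
    let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
    // SAFETY: pid 0 queries the calling thread; the pointer and size are valid.
    let ret = unsafe {
        libc::sched_getaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &mut cpuset)
    };
    assert_eq!(ret, 0, "sched_getaffinity failed");
    // SAFETY: CPU_ISSET only reads the set we just filled in.
    (0..libc::CPU_SETSIZE as usize)
        .filter(|cpu| unsafe { libc::CPU_ISSET(*cpu, &cpuset) })
        .collect()
}

fn main() {
    // On an unpinned thread this prints every CPU the process may use.
    println!("{:?}", allowed_cpus());
}
```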
diff --git a/virtio-devices/src/block.rs b/virtio-devices/src/block.rs
index 9d4b6636d..816e0f294 100644
--- a/virtio-devices/src/block.rs
+++ b/virtio-devices/src/block.rs
@@ -25,6 +25,7 @@ use block::{
 use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
 use rate_limiter::TokenType;
 use seccompiler::SeccompAction;
+use std::collections::BTreeMap;
 use std::collections::VecDeque;
 use std::io;
 use std::num::Wrapping;
@@ -134,6 +135,7 @@ struct BlockEpollHandler {
     rate_limiter: Option<RateLimiterGroupHandle>,
     access_platform: Option<Arc<dyn AccessPlatform>>,
     read_only: bool,
+    host_cpus: Option<Vec<usize>>,
 }
 
 impl BlockEpollHandler {
@@ -408,6 +410,41 @@ impl BlockEpollHandler {
         })
     }
 
+    fn set_queue_thread_affinity(&self) {
+        // Prepare the CPU set the current queue thread is expected to run onto.
+        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
+            // SAFETY: all zeros is a valid pattern
+            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
+            // SAFETY: FFI call, trivially safe
+            unsafe { libc::CPU_ZERO(&mut cpuset) };
+            for host_cpu in host_cpus {
+                // SAFETY: FFI call, trivially safe
+                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
+            }
+            cpuset
+        });
+
+        // Schedule the thread to run on the expected CPU set
+        if let Some(cpuset) = cpuset.as_ref() {
+            // SAFETY: FFI call with correct arguments
+            let ret = unsafe {
+                libc::sched_setaffinity(
+                    0,
+                    std::mem::size_of::<libc::cpu_set_t>(),
+                    cpuset as *const libc::cpu_set_t,
+                )
+            };
+
+            if ret != 0 {
+                error!(
+                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
+                    self.queue_index,
+                    io::Error::last_os_error()
+                )
+            }
+        }
+    }
+
     fn run(
         &mut self,
         paused: Arc<AtomicBool>,
@@ -419,6 +456,7 @@ impl BlockEpollHandler {
         if let Some(rate_limiter) = &self.rate_limiter {
             helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
         }
+        self.set_queue_thread_affinity();
         helper.run(paused, paused_sync, self)?;
 
         Ok(())
@@ -511,6 +549,7 @@ pub struct Block {
     exit_evt: EventFd,
     read_only: bool,
     serial: Vec<u8>,
+    queue_affinity: BTreeMap<u16, Vec<usize>>,
 }
 
 #[derive(Versionize)]
@@ -540,6 +579,7 @@ impl Block {
         rate_limiter: Option<Arc<RateLimiterGroup>>,
         exit_evt: EventFd,
         state: Option<BlockState>,
+        queue_affinity: BTreeMap<u16, Vec<usize>>,
     ) -> io::Result<Self> {
         let (disk_nsectors, avail_features, acked_features, config, paused) =
             if let Some(state) = state {
@@ -643,6 +683,7 @@ impl Block {
             exit_evt,
             read_only,
             serial,
+            queue_affinity,
         })
     }
 
@@ -746,9 +787,10 @@ impl VirtioDevice for Block {
             let (_, queue, queue_evt) = queues.remove(0);
             let queue_size = queue.size();
             let (kill_evt, pause_evt) = self.common.dup_eventfds();
+            let queue_idx = i as u16;
 
             let mut handler = BlockEpollHandler {
-                queue_index: i as u16,
+                queue_index: queue_idx,
                 queue,
                 mem: mem.clone(),
                 disk_image: self
@@ -778,6 +820,7 @@ impl VirtioDevice for Block {
                     .unwrap(),
                 access_platform: self.common.access_platform.clone(),
                 read_only: self.read_only,
+                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
             };
 
             let paused = self.common.paused.clone();
diff --git a/virtio-devices/src/seccomp_filters.rs b/virtio-devices/src/seccomp_filters.rs
index 41585e778..b9e7e5f17 100644
--- a/virtio-devices/src/seccomp_filters.rs
+++ b/virtio-devices/src/seccomp_filters.rs
@@ -96,6 +96,7 @@ fn virtio_block_thread_rules() -> Vec<(i64, Vec<SeccompRule>)> {
         (libc::SYS_pwritev, vec![]),
         (libc::SYS_pwrite64, vec![]),
         (libc::SYS_sched_getaffinity, vec![]),
+        (libc::SYS_sched_setaffinity, vec![]),
         (libc::SYS_set_robust_list, vec![]),
         (libc::SYS_timerfd_settime, vec![]),
     ]
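The core of `set_queue_thread_affinity()` is the standard `cpu_set_t` plus `sched_setaffinity(0, ...)` pattern, where pid 0 targets the calling thread; that is also why `SYS_sched_setaffinity` must be added to the block thread's seccomp allow-list above. A minimal self-contained sketch of the same pattern, assuming only the `libc` crate (`pin_current_thread` is an illustrative name, not an API from this change):

```rust
// Pin the *calling* thread to the given host CPUs, mirroring what each
// virtqueue epoll thread does before entering its event loop.
fn pin_current_thread(host_cpus: &[usize]) -> std::io::Result<()> {
    // SAFETY: all zeros is a valid cpu_set_t bit pattern.
    let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
    // SAFETY: CPU_ZERO/CPU_SET only write into the set we own.
    unsafe { libc::CPU_ZERO(&mut cpuset) };
    for cpu in host_cpus {
        unsafe { libc::CPU_SET(*cpu, &mut cpuset) };
    }
    // SAFETY: pid 0 means "this thread"; the pointer and size are valid.
    let ret =
        unsafe { libc::sched_setaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &cpuset) };
    if ret != 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Assumes CPU 0 is online and allowed for this process.
    std::thread::spawn(|| pin_current_thread(&[0]).unwrap())
        .join()
        .unwrap();
    Ok(())
}
```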
diff --git a/vmm/src/api/openapi/cloud-hypervisor.yaml b/vmm/src/api/openapi/cloud-hypervisor.yaml
index 59db48a57..e82697713 100644
--- a/vmm/src/api/openapi/cloud-hypervisor.yaml
+++ b/vmm/src/api/openapi/cloud-hypervisor.yaml
@@ -828,6 +828,19 @@ components:
           type: string
         rate_limiter_config:
           $ref: "#/components/schemas/RateLimiterConfig"
+
+    VirtQueueAffinity:
+      required:
+        - queue_index
+        - host_cpus
+      type: object
+      properties:
+        queue_index:
+          type: integer
+        host_cpus:
+          type: array
+          items:
+            type: integer
 
     DiskConfig:
       required:
@@ -867,6 +880,10 @@ components:
           type: string
         rate_limit_group:
           type: string
+        queue_affinity:
+          type: array
+          items:
+            $ref: "#/components/schemas/VirtQueueAffinity"
 
     NetConfig:
       type: object
diff --git a/vmm/src/config.rs b/vmm/src/config.rs
index c3c6b4c33..4b4e618ae 100644
--- a/vmm/src/config.rs
+++ b/vmm/src/config.rs
@@ -1001,7 +1001,8 @@ impl DiskConfig {
         vhost_user=on|off,socket=<vhost_user_socket_path>,\
         bw_size=<bytes>,bw_one_time_burst=<bytes>,bw_refill_time=<ms>,\
         ops_size=<io_ops>,ops_one_time_burst=<io_ops>,ops_refill_time=<ms>,\
-        id=<device_id>,pci_segment=<segment_id>,rate_limit_group=<group_id>\"";
+        id=<device_id>,pci_segment=<segment_id>,rate_limit_group=<group_id>,\
+        queue_affinity=<list_of_queue_indices_with_their_associated_cpu_list>\"";
 
     pub fn parse(disk: &str) -> Result<Self, Error> {
         let mut parser = OptionParser::new();
@@ -1025,7 +1026,8 @@ impl DiskConfig {
             .add("_disable_aio")
             .add("pci_segment")
             .add("serial")
-            .add("rate_limit_group");
+            .add("rate_limit_group")
+            .add("queue_affinity");
         parser.parse(disk).map_err(Error::ParseDisk)?;
 
         let path = parser.get("path").map(PathBuf::from);
@@ -1099,6 +1101,17 @@ impl DiskConfig {
             .map_err(Error::ParseDisk)?
             .unwrap_or_default();
         let serial = parser.get("serial");
+        let queue_affinity = parser
+            .convert::<Tuple<u16, Vec<usize>>>("queue_affinity")
+            .map_err(Error::ParseDisk)?
+            .map(|v| {
+                v.0.iter()
+                    .map(|(e1, e2)| VirtQueueAffinity {
+                        queue_index: *e1,
+                        host_cpus: e2.clone(),
+                    })
+                    .collect()
+            });
         let bw_tb_config = if bw_size != 0 && bw_refill_time != 0 {
             Some(TokenBucketConfig {
                 size: bw_size,
@@ -1142,6 +1155,7 @@ impl DiskConfig {
             disable_aio,
             pci_segment,
             serial,
+            queue_affinity,
         })
     }
 
@@ -2922,6 +2936,7 @@ mod tests {
             rate_limiter_config: None,
             pci_segment: 0,
             serial: None,
+            queue_affinity: None,
         }
     }
 
@@ -2992,6 +3007,30 @@ mod tests {
                 ..disk_fixture()
             }
         );
+        assert_eq!(
+            DiskConfig::parse("path=/path/to_file,queue_affinity=[0@[1],1@[2],2@[3,4],3@[5-8]]")?,
+            DiskConfig {
+                queue_affinity: Some(vec![
+                    VirtQueueAffinity {
+                        queue_index: 0,
+                        host_cpus: vec![1],
+                    },
+                    VirtQueueAffinity {
+                        queue_index: 1,
+                        host_cpus: vec![2],
+                    },
+                    VirtQueueAffinity {
+                        queue_index: 2,
+                        host_cpus: vec![3, 4],
+                    },
+                    VirtQueueAffinity {
+                        queue_index: 3,
+                        host_cpus: vec![5, 6, 7, 8],
+                    }
+                ]),
+                ..disk_fixture()
+            }
+        );
 
         Ok(())
     }
diff --git a/vmm/src/device_manager.rs b/vmm/src/device_manager.rs
index 70a06217e..6c8e5bc94 100644
--- a/vmm/src/device_manager.rs
+++ b/vmm/src/device_manager.rs
@@ -64,7 +64,7 @@ use pci::{
 use rate_limiter::group::RateLimiterGroup;
 use seccompiler::SeccompAction;
 use serde::{Deserialize, Serialize};
-use std::collections::{BTreeSet, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::fs::{read_link, File, OpenOptions};
 use std::io::{self, stdout, Seek, SeekFrom};
 use std::mem::zeroed;
@@ -2512,6 +2512,15 @@ impl DeviceManager {
             None
         };
 
+        let queue_affinity = if let Some(queue_affinity) = disk_cfg.queue_affinity.as_ref() {
+            queue_affinity
+                .iter()
+                .map(|a| (a.queue_index, a.host_cpus.clone()))
+                .collect()
+        } else {
+            BTreeMap::new()
+        };
+
         let virtio_block = Arc::new(Mutex::new(
             virtio_devices::Block::new(
                 id.clone(),
@@ -2535,6 +2544,7 @@ impl DeviceManager {
                     .map(|s| s.to_versioned_state())
                     .transpose()
                     .map_err(DeviceManagerError::RestoreGetState)?,
+                queue_affinity,
             )
             .map_err(DeviceManagerError::CreateVirtioBlock)?,
         ));
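For context, `queue_affinity=[0@[1],1@[2],2@[3,4],3@[5-8]]` maps each queue index to a list of host CPUs, with `-` ranges expanded (the `5-8` case in the config test above). The real parsing goes through `OptionParser::convert` with the `Tuple` type; the sketch below is a hypothetical hand-rolled equivalent, for illustration only:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the OptionParser/Tuple machinery: parses
// "[0@[1],2@[3,4],3@[5-8]]" into a queue-index -> host-CPU-list map.
fn parse_queue_affinity(s: &str) -> Option<BTreeMap<u16, Vec<usize>>> {
    let inner = s.strip_prefix('[')?.strip_suffix(']')?;
    let mut map = BTreeMap::new();
    // Split after each closing ']' so the inner CPU lists stay intact;
    // entries after the first carry a leading separator comma.
    for entry in inner.split_inclusive(']') {
        let entry = entry.trim_start_matches(',');
        let (queue, cpus) = entry.split_once('@')?;
        let queue: u16 = queue.parse().ok()?;
        let cpus = cpus.strip_prefix('[')?.strip_suffix(']')?;
        let mut host_cpus = Vec::new();
        for item in cpus.split(',') {
            // "5-8" expands to 5,6,7,8 the way the config test expects.
            if let Some((lo, hi)) = item.split_once('-') {
                let (lo, hi): (usize, usize) = (lo.parse().ok()?, hi.parse().ok()?);
                host_cpus.extend(lo..=hi);
            } else {
                host_cpus.push(item.parse().ok()?);
            }
        }
        map.insert(queue, host_cpus);
    }
    Some(map)
}

fn main() {
    let map = parse_queue_affinity("[0@[1],1@[2],2@[3,4],3@[5-8]]").unwrap();
    assert_eq!(map.get(&3), Some(&vec![5, 6, 7, 8]));
    println!("{map:?}");
}
```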
diff --git a/vmm/src/vm_config.rs b/vmm/src/vm_config.rs
index 6e5ec9ad7..51e7847e9 100644
--- a/vmm/src/vm_config.rs
+++ b/vmm/src/vm_config.rs
@@ -187,6 +187,12 @@ pub struct RateLimiterGroupConfig {
     pub rate_limiter_config: RateLimiterConfig,
 }
 
+#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub struct VirtQueueAffinity {
+    pub queue_index: u16,
+    pub host_cpus: Vec<usize>,
+}
+
 #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
 pub struct DiskConfig {
     pub path: Option<PathBuf>,
@@ -219,6 +225,8 @@ pub struct DiskConfig {
     pub pci_segment: u16,
     #[serde(default)]
     pub serial: Option<String>,
+    #[serde(default)]
+    pub queue_affinity: Option<Vec<VirtQueueAffinity>>,
 }
 
 pub const DEFAULT_DISK_NUM_QUEUES: usize = 1;
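Since `VirtQueueAffinity` derives `Serialize`/`Deserialize` with no field renames, the REST payload uses the Rust field names as JSON keys, matching the `VirtQueueAffinity` schema added to the OpenAPI file above. A minimal sketch, assuming `serde` and `serde_json` as dependencies (the struct is copied here so the snippet compiles on its own):

```rust
use serde::{Deserialize, Serialize};

// Copied from vm_config.rs so this example is self-contained.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct VirtQueueAffinity {
    pub queue_index: u16,
    pub host_cpus: Vec<usize>,
}

fn main() {
    // Same mapping the integration test passes on the command line:
    // queue 0 -> CPUs {0,2}, queue 1 -> CPUs {1,3}.
    let affinity = vec![
        VirtQueueAffinity { queue_index: 0, host_cpus: vec![0, 2] },
        VirtQueueAffinity { queue_index: 1, host_cpus: vec![1, 3] },
    ];
    // Prints: [{"queue_index":0,"host_cpus":[0,2]},{"queue_index":1,"host_cpus":[1,3]}]
    println!("{}", serde_json::to_string(&affinity).unwrap());
}
```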