vm-virtio: Implement multiqueue/multithread support for virtio-blk

This commit improves the existing virtio-blk implementation to achieve
better I/O performance. The cost for the end user is that more vCPUs
must be allocated to the virtual machine so that multiple I/O threads
can run in parallel.

One thing to note: the number of vCPUs must be greater than or equal to
the number of queues dedicated to the virtio-blk device.
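
As an illustration of this constraint (a hedged sketch, not code from this
commit; the helper name is hypothetical), a VMM could guard against
over-committing queues like this:

    // Refuse a virtio-blk disk whose queue count exceeds the vCPU count,
    // since each queue gets its own I/O thread.
    fn validate_disk_queues(num_queues: usize, vcpus: usize) -> Result<(), String> {
        if num_queues > vcpus {
            return Err(format!(
                "virtio-blk requested {} queues but only {} vCPUs are available",
                num_queues, vcpus
            ));
        }
        Ok(())
    }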

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
Author: Sebastien Boeuf <sebastien.boeuf@intel.com>
Date:   2020-01-24 18:17:25 +01:00
parent 08e47ebd4b
commit f5b53ae4be
2 files changed, 129 insertions(+), 73 deletions(-)

vm-virtio/src/block.rs

@@ -22,25 +22,22 @@ use std::cmp;
 use std::convert::TryInto;
 use std::fs::{File, Metadata};
 use std::io::{self, Read, Seek, SeekFrom, Write};
+use std::ops::DerefMut;
 use std::os::linux::fs::MetadataExt;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::path::PathBuf;
 use std::result;
 use std::slice;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::thread;
 use virtio_bindings::bindings::virtio_blk::*;
 use vm_device::{Migratable, MigratableError, Pausable, Snapshotable};
-use vm_memory::{Bytes, GuestAddress, GuestMemory, GuestMemoryError, GuestMemoryMmap};
+use vm_memory::{ByteValued, Bytes, GuestAddress, GuestMemory, GuestMemoryError, GuestMemoryMmap};
 use vmm_sys_util::{eventfd::EventFd, seek_hole::SeekHole, write_zeroes::PunchHole};

-const CONFIG_SPACE_SIZE: usize = 8;
 const SECTOR_SHIFT: u8 = 9;
 pub const SECTOR_SIZE: u64 = (0x01 as u64) << SECTOR_SHIFT;
-const QUEUE_SIZE: u16 = 256;
-const NUM_QUEUES: usize = 1;
-const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE];

 // New descriptors are pending on the virtio queue.
 const QUEUE_AVAIL_EVENT: DeviceEventT = 0;
@@ -592,9 +589,9 @@ impl Request {
 }

 struct BlockEpollHandler<T: DiskFile> {
-    queues: Vec<Queue>,
+    queue: Queue,
     mem: Arc<ArcSwap<GuestMemoryMmap>>,
-    disk_image: T,
+    disk_image: Arc<Mutex<T>>,
     disk_nsectors: u64,
     interrupt_cb: Arc<dyn VirtioInterrupt>,
     disk_image_id: Vec<u8>,
@@ -603,18 +600,20 @@ struct BlockEpollHandler<T: DiskFile> {
 }

 impl<T: DiskFile> BlockEpollHandler<T> {
-    fn process_queue(&mut self, queue_index: usize) -> bool {
-        let queue = &mut self.queues[queue_index];
+    fn process_queue(&mut self) -> bool {
+        let queue = &mut self.queue;

-        let mut used_desc_heads = [(0, 0); QUEUE_SIZE as usize];
+        let mut used_desc_heads = Vec::new();
         let mut used_count = 0;
         let mem = self.mem.load();
         for avail_desc in queue.iter(&mem) {
             let len;
             match Request::parse(&avail_desc, &mem) {
                 Ok(request) => {
+                    let mut disk_image_locked = self.disk_image.lock().unwrap();
+                    let mut disk_image = disk_image_locked.deref_mut();
                     let status = match request.execute(
-                        &mut self.disk_image,
+                        &mut disk_image,
                         self.disk_nsectors,
                         &mem,
                         &self.disk_image_id,
@@ -638,19 +637,19 @@ impl<T: DiskFile> BlockEpollHandler<T> {
                     len = 0;
                 }
             }
-            used_desc_heads[used_count] = (avail_desc.index, len);
+            used_desc_heads.push((avail_desc.index, len));
             used_count += 1;
         }

-        for &(desc_index, len) in &used_desc_heads[..used_count] {
+        for &(desc_index, len) in used_desc_heads.iter() {
             queue.add_used(&mem, desc_index, len);
         }
         used_count > 0
     }

-    fn signal_used_queue(&self, queue_index: usize) -> result::Result<(), DeviceError> {
+    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
         self.interrupt_cb
-            .trigger(&VirtioInterruptType::Queue, Some(&self.queues[queue_index]))
+            .trigger(&VirtioInterruptType::Queue, Some(&self.queue))
             .map_err(|e| {
                 error!("Failed to signal used queue: {:?}", e);
                 DeviceError::FailedSignalingUsedQueue(e)
@@ -660,16 +659,15 @@ impl<T: DiskFile> BlockEpollHandler<T> {
     #[allow(dead_code)]
     fn update_disk_image(
         &mut self,
-        disk_image: T,
+        mut disk_image: T,
         disk_path: &PathBuf,
     ) -> result::Result<(), DeviceError> {
-        self.disk_image = disk_image;
-        self.disk_nsectors = self
-            .disk_image
+        self.disk_nsectors = disk_image
             .seek(SeekFrom::End(0))
             .map_err(DeviceError::IoError)?
             / SECTOR_SIZE;
         self.disk_image_id = build_disk_image_id(disk_path);
+        self.disk_image = Arc::new(Mutex::new(disk_image));
         Ok(())
     }
@@ -733,8 +731,8 @@ impl<T: DiskFile> BlockEpollHandler<T> {
                     if let Err(e) = queue_evt.read() {
                         error!("Failed to get queue event: {:?}", e);
                         break 'epoll;
-                    } else if self.process_queue(0) {
-                        if let Err(e) = self.signal_used_queue(0) {
+                    } else if self.process_queue() {
+                        if let Err(e) = self.signal_used_queue() {
                             error!("Failed to signal used queue: {:?}", e);
                             break 'epoll;
                         }
@@ -764,32 +762,57 @@ impl<T: DiskFile> BlockEpollHandler<T> {
     }
 }

+#[derive(Copy, Clone, Debug, Default)]
+#[repr(C, packed)]
+pub struct VirtioBlockGeometry {
+    pub cylinders: u16,
+    pub heads: u8,
+    pub sectors: u8,
+}
+
+unsafe impl ByteValued for VirtioBlockGeometry {}
+
+#[derive(Copy, Clone, Debug, Default)]
+#[repr(C, packed)]
+pub struct VirtioBlockConfig {
+    pub capacity: u64,
+    pub size_max: u32,
+    pub seg_max: u32,
+    pub geometry: VirtioBlockGeometry,
+    pub blk_size: u32,
+    pub physical_block_exp: u8,
+    pub alignment_offset: u8,
+    pub min_io_size: u16,
+    pub opt_io_size: u32,
+    pub wce: u8,
+    unused: u8,
+    pub num_queues: u16,
+    pub max_discard_sectors: u32,
+    pub max_discard_seg: u32,
+    pub discard_sector_alignment: u32,
+    pub max_write_zeroes_sectors: u32,
+    pub max_write_zeroes_seg: u32,
+    pub write_zeroes_may_unmap: u8,
+    unused1: [u8; 3],
+}
+
+unsafe impl ByteValued for VirtioBlockConfig {}
+
 /// Virtio device for exposing block level read/write operations on a host file.
 pub struct Block<T: DiskFile> {
     kill_evt: Option<EventFd>,
-    disk_image: Option<T>,
+    disk_image: Arc<Mutex<T>>,
     disk_path: PathBuf,
     disk_nsectors: u64,
     avail_features: u64,
     acked_features: u64,
-    config_space: Vec<u8>,
-    queue_evt: Option<EventFd>,
+    config: VirtioBlockConfig,
+    queue_evts: Option<Vec<EventFd>>,
     interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
     epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), DeviceError>>>>,
     pause_evt: Option<EventFd>,
     paused: Arc<AtomicBool>,
-}
-
-pub fn build_config_space(disk_size: u64) -> Vec<u8> {
-    // We only support disk size, which uses the first two words of the configuration space.
-    // If the image is not a multiple of the sector size, the tail bits are not exposed.
-    // The config space is little endian.
-    let mut config = Vec::with_capacity(CONFIG_SPACE_SIZE);
-    let num_sectors = disk_size >> SECTOR_SHIFT;
-    for i in 0..8 {
-        config.push((num_sectors >> (8 * i)) as u8);
-    }
-    config
+    queue_size: Vec<u16>,
 }

 impl<T: DiskFile> Block<T> {
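
The two structs above replace the old hand-rolled build_config_space() byte
vector: because they are #[repr(C, packed)] and implement ByteValued, the
device can expose the whole virtio_blk_config layout to the guest as a plain
byte slice. A minimal sketch of that property (illustrative, not part of the
diff; it assumes a little-endian host, since the packed struct is exposed in
host byte order):

    // capacity occupies the first 8 bytes of the virtio-blk config space.
    let config = VirtioBlockConfig {
        capacity: 2048,
        ..Default::default()
    };
    let bytes = config.as_slice(); // from vm_memory::ByteValued
    let mut capacity_bytes = [0u8; 8];
    capacity_bytes.copy_from_slice(&bytes[0..8]);
    assert_eq!(u64::from_le_bytes(capacity_bytes), 2048);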
@@ -801,6 +824,8 @@ impl<T: DiskFile> Block<T> {
         disk_path: PathBuf,
         is_disk_read_only: bool,
         iommu: bool,
+        num_queues: usize,
+        queue_size: u16,
     ) -> io::Result<Block<T>> {
         let disk_size = disk_image.seek(SeekFrom::End(0))? as u64;
         if disk_size % SECTOR_SIZE != 0 {
@@ -819,21 +844,33 @@ impl<T: DiskFile> Block<T> {
         if is_disk_read_only {
             avail_features |= 1u64 << VIRTIO_BLK_F_RO;
+        }
+
+        let disk_nsectors = disk_size / SECTOR_SIZE;
+        let mut config = VirtioBlockConfig {
+            capacity: disk_nsectors,
+            ..Default::default()
         };

+        if num_queues > 1 {
+            avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
+            config.num_queues = num_queues as u16;
+        }
+
         Ok(Block {
             kill_evt: None,
-            disk_image: Some(disk_image),
+            disk_image: Arc::new(Mutex::new(disk_image)),
             disk_path,
-            disk_nsectors: disk_size / SECTOR_SIZE,
+            disk_nsectors,
             avail_features,
             acked_features: 0u64,
-            config_space: build_config_space(disk_size),
-            queue_evt: None,
+            config,
+            queue_evts: None,
             interrupt_cb: None,
             epoll_threads: None,
             pause_evt: None,
             paused: Arc::new(AtomicBool::new(false)),
+            queue_size: vec![queue_size; num_queues],
         })
     }
 }
@@ -853,7 +890,7 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
     }

     fn queue_max_sizes(&self) -> &[u16] {
-        QUEUE_SIZES
+        self.queue_size.as_slice()
     }

     fn features(&self, page: u32) -> u32 {
@@ -891,26 +928,28 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
     }

     fn read_config(&self, offset: u64, mut data: &mut [u8]) {
-        let config_len = self.config_space.len() as u64;
+        let config_slice = self.config.as_slice();
+        let config_len = config_slice.len() as u64;
         if offset >= config_len {
             error!("Failed to read config space");
             return;
         }
         if let Some(end) = offset.checked_add(data.len() as u64) {
             // This write can't fail, offset and end are checked against config_len.
-            data.write_all(&self.config_space[offset as usize..cmp::min(end, config_len) as usize])
+            data.write_all(&config_slice[offset as usize..cmp::min(end, config_len) as usize])
                 .unwrap();
         }
     }

     fn write_config(&mut self, offset: u64, data: &[u8]) {
+        let config_slice = self.config.as_mut_slice();
         let data_len = data.len() as u64;
-        let config_len = self.config_space.len() as u64;
+        let config_len = config_slice.len() as u64;
         if offset + data_len > config_len {
             error!("Failed to write config space");
             return;
         }

-        let (_, right) = self.config_space.split_at_mut(offset as usize);
+        let (_, right) = config_slice.split_at_mut(offset as usize);
         right.copy_from_slice(&data[..]);
     }
@@ -918,13 +957,13 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
         &mut self,
         mem: Arc<ArcSwap<GuestMemoryMmap>>,
         interrupt_cb: Arc<dyn VirtioInterrupt>,
-        queues: Vec<Queue>,
+        mut queues: Vec<Queue>,
         mut queue_evts: Vec<EventFd>,
     ) -> ActivateResult {
-        if queues.len() != NUM_QUEUES || queue_evts.len() != NUM_QUEUES {
+        if queues.len() != self.queue_size.len() || queue_evts.len() != self.queue_size.len() {
             error!(
                 "Cannot perform activate. Expected {} queue(s), got {}",
-                NUM_QUEUES,
+                self.queue_size.len(),
                 queues.len()
             );
             return Err(ActivateError::BadActivate);
@@ -947,35 +986,45 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
         })?;
         self.pause_evt = Some(self_pause_evt);

-        if let Some(disk_image) = self.disk_image.clone() {
-            let disk_image_id = build_disk_image_id(&self.disk_path);
+        let disk_image_id = build_disk_image_id(&self.disk_path);

-            // Save the interrupt EventFD as we need to return it on reset
-            // but clone it to pass into the thread.
-            self.interrupt_cb = Some(interrupt_cb);
-            let interrupt_cb = self.interrupt_cb.as_ref().unwrap().clone();
-
+        let mut tmp_queue_evts: Vec<EventFd> = Vec::new();
+        for queue_evt in queue_evts.iter() {
             // Save the queue EventFD as we need to return it on reset
             // but clone it to pass into the thread.
-            self.queue_evt = Some(queue_evts.remove(0));
-            let queue_evt = self.queue_evt.as_ref().unwrap().try_clone().map_err(|e| {
+            tmp_queue_evts.push(queue_evt.try_clone().map_err(|e| {
                 error!("failed to clone queue EventFd: {}", e);
                 ActivateError::BadActivate
-            })?;
+            })?);
+        }
+        self.queue_evts = Some(tmp_queue_evts);
+
+        let mut tmp_queue_evts: Vec<EventFd> = Vec::new();
+        for queue_evt in queue_evts.iter() {
+            // Save the queue EventFD as we need to return it on reset
+            // but clone it to pass into the thread.
+            tmp_queue_evts.push(queue_evt.try_clone().map_err(|e| {
+                error!("failed to clone queue EventFd: {}", e);
+                ActivateError::BadActivate
+            })?);
+        }
+        self.queue_evts = Some(tmp_queue_evts);
+
+        let mut epoll_threads = Vec::new();
+        for _ in 0..self.queue_size.len() {
             let mut handler = BlockEpollHandler {
-                queues,
-                mem,
-                disk_image,
+                queue: queues.remove(0),
+                mem: mem.clone(),
+                disk_image: self.disk_image.clone(),
                 disk_nsectors: self.disk_nsectors,
-                interrupt_cb,
-                disk_image_id,
-                kill_evt,
-                pause_evt,
+                interrupt_cb: interrupt_cb.clone(),
+                disk_image_id: disk_image_id.clone(),
+                kill_evt: kill_evt.try_clone().unwrap(),
+                pause_evt: pause_evt.try_clone().unwrap(),
             };

+            let queue_evt = queue_evts.remove(0);
             let paused = self.paused.clone();
-            let mut epoll_threads = Vec::new();
             thread::Builder::new()
                 .name("virtio_blk".to_string())
                 .spawn(move || handler.run(queue_evt, paused))
@@ -984,12 +1033,15 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
                     error!("failed to clone the virtio-blk epoll thread: {}", e);
                     ActivateError::BadActivate
                 })?;
-
-            self.epoll_threads = Some(epoll_threads);
-
-            return Ok(());
         }

-        Err(ActivateError::BadActivate)
+        // Save the interrupt EventFD as we need to return it on reset
+        // but clone it to pass into the thread.
+        self.interrupt_cb = Some(interrupt_cb);
+
+        self.epoll_threads = Some(epoll_threads);
+
+        Ok(())
     }

     fn reset(&mut self) -> Option<(Arc<dyn VirtioInterrupt>, Vec<EventFd>)> {
@@ -1006,7 +1058,7 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
         // Return the interrupt and queue EventFDs
         Some((
             self.interrupt_cb.take().unwrap(),
-            vec![self.queue_evt.take().unwrap()],
+            self.queue_evts.take().unwrap(),
         ))
     }
 }
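
The net effect of the activate() changes is one epoll thread per virtio
queue, with every thread sharing the same disk file behind an Arc<Mutex<T>>:
queues are drained in parallel, while the actual file I/O serializes on the
lock. A standalone sketch of that pattern (illustrative names only, not code
from this commit):

    use std::fs::File;
    use std::sync::{Arc, Mutex};
    use std::thread;

    fn spawn_queue_threads(
        num_queues: usize,
        disk: Arc<Mutex<File>>,
    ) -> Vec<thread::JoinHandle<()>> {
        (0..num_queues)
            .map(|i| {
                let disk = disk.clone();
                thread::Builder::new()
                    .name(format!("virtio_blk_q{}", i))
                    .spawn(move || {
                        // A real handler blocks on its queue's EventFd and takes
                        // the lock only while executing a single request.
                        let _locked_disk = disk.lock().unwrap();
                    })
                    .expect("failed to spawn queue thread")
            })
            .collect()
    }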

vmm/src/device_manager.rs

@@ -991,6 +991,8 @@ impl DeviceManager {
                 disk_cfg.path.clone(),
                 disk_cfg.readonly,
                 disk_cfg.iommu,
+                disk_cfg.num_queues,
+                disk_cfg.queue_size,
             )
             .map_err(DeviceManagerError::CreateVirtioBlock)?;
@@ -1010,6 +1012,8 @@ impl DeviceManager {
                 disk_cfg.path.clone(),
                 disk_cfg.readonly,
                 disk_cfg.iommu,
+                disk_cfg.num_queues,
+                disk_cfg.queue_size,
             )
             .map_err(DeviceManagerError::CreateVirtioBlock)?;
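
Both call sites forward two new fields from the per-disk configuration. The
DiskConfig struct itself is not part of this diff; a plausible shape,
inferred purely from the arguments Block::new() now takes (hypothetical
reconstruction, field types matching the new constructor signature):

    pub struct DiskConfig {
        pub path: std::path::PathBuf,
        pub readonly: bool,
        pub iommu: bool,
        pub num_queues: usize, // number of virtio queues, one I/O thread each
        pub queue_size: u16,   // descriptor ring size for every queue
    }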