// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
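
//! Virtio block device: exposes a host-side `DiskFile` to the guest as a
//! virtio-blk device. Each virtqueue is served by a dedicated epoll worker
//! thread that submits I/O asynchronously, with optional rate limiting and
//! snapshot/restore support.
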
use super::Error as DeviceError;
use super::{
ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::VirtioInterrupt;
use block_util::{
async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request,
RequestType, VirtioBlockConfig,
};
use rate_limiter::{RateLimiter, TokenType};
use seccomp::{SeccompAction, SeccompFilter};
use serde::{Deserialize, Serialize};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::{collections::HashMap, convert::TryInto};
use virtio_bindings::bindings::virtio_blk::*;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vmm_sys_util::eventfd::EventFd;
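
// Virtio block devices address the disk in 512-byte sectors, independent of
// the host device's physical block size.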
const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
#[derive(Debug)]
pub enum Error {
/// Failed to parse the request.
RequestParsing(block_util::Error),
/// Failed to execute the request.
RequestExecuting(block_util::ExecuteError),
/// Missing the expected entry in the list of requests.
MissingEntryRequestList,
/// The asynchronous request returned with failure.
AsyncRequestFailure,
/// Failed synchronizing the file
Fsync(AsyncIoError),
}
pub type Result<T> = result::Result<T, Error>;
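
// Cumulative I/O statistics shared between the device and its worker threads.
// Atomics allow lock-free updates from the epoll handlers while the control
// plane reads them through `counters()`.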
#[derive(Default, Clone)]
pub struct BlockCounters {
read_bytes: Arc<AtomicU64>,
read_ops: Arc<AtomicU64>,
write_bytes: Arc<AtomicU64>,
write_ops: Arc<AtomicU64>,
}
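
// Per-queue worker state: each virtqueue gets its own handler running on a
// dedicated epoll thread, with its own asynchronous I/O context into the
// shared disk image.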
struct BlockEpollHandler {
queue: Queue,
mem: GuestMemoryAtomic<GuestMemoryMmap>,
disk_image: Box<dyn AsyncIo>,
disk_nsectors: u64,
interrupt_cb: Arc<dyn VirtioInterrupt>,
disk_image_id: Vec<u8>,
kill_evt: EventFd,
pause_evt: EventFd,
writeback: Arc<AtomicBool>,
counters: BlockCounters,
queue_evt: EventFd,
request_list: HashMap<u16, Request>,
rate_limiter: Option<RateLimiter>,
}
impl BlockEpollHandler {
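    // Pop available descriptors, apply rate limiting, and submit the parsed
    // requests to the asynchronous I/O engine. Requests that complete without
    // an asynchronous operation (such as returning the device ID) are put on
    // the used ring immediately. Returns true if the guest needs notifying.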
fn process_queue_submit(&mut self) -> Result<bool> {
let queue = &mut self.queue;
let mem = self.mem.memory();
let mut used_desc_heads = Vec::new();
let mut used_count = 0;
for avail_desc in queue.iter(&mem) {
let mut request = Request::parse(&avail_desc, &mem).map_err(Error::RequestParsing)?;
if let Some(rate_limiter) = &mut self.rate_limiter {
// If limiter.consume() fails it means there is no more TokenType::Ops
// budget and rate limiting is in effect.
if !rate_limiter.consume(1, TokenType::Ops) {
// Stop processing the queue and return this descriptor chain to the
// avail ring, for later processing.
queue.go_to_previous_position();
break;
}
// Exercise the rate limiter only if this request is of data transfer type.
if request.request_type == RequestType::In
|| request.request_type == RequestType::Out
{
let mut bytes = Wrapping(0);
for (_, data_len) in &request.data_descriptors {
bytes += Wrapping(*data_len as u64);
}
// If limiter.consume() fails it means there is no more TokenType::Bytes
// budget and rate limiting is in effect.
if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
// Revert the OPS consume().
rate_limiter.manual_replenish(1, TokenType::Ops);
// Stop processing the queue and return this descriptor chain to the
// avail ring, for later processing.
queue.go_to_previous_position();
break;
}
                }
}
request.set_writeback(self.writeback.load(Ordering::Acquire));
if request
.execute_async(
&mem,
self.disk_nsectors,
self.disk_image.as_mut(),
&self.disk_image_id,
avail_desc.index as u64,
)
.map_err(Error::RequestExecuting)?
{
self.request_list.insert(avail_desc.index, request);
} else {
// We use unwrap because the request parsing process already
// checked that the status_addr was valid.
                // The virtio-blk status field is a single byte, so write only
                // one byte to avoid clobbering adjacent guest memory.
                mem.write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr).unwrap();
// If no asynchronous operation has been submitted, we can
// simply return the used descriptor.
used_desc_heads.push((avail_desc.index, 0));
used_count += 1;
}
}
for &(desc_index, len) in used_desc_heads.iter() {
queue.add_used(&mem, desc_index, len);
}
Ok(used_count > 0)
}
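
    // Drain the completion ring: write the status byte for each finished
    // request, return its descriptor chain to the used ring, and account the
    // bytes and operations transferred. Returns true if the guest needs
    // notifying.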
fn process_queue_complete(&mut self) -> Result<bool> {
let queue = &mut self.queue;
let mut used_desc_heads = Vec::new();
let mut used_count = 0;
let mem = self.mem.memory();
let mut read_bytes = Wrapping(0);
let mut write_bytes = Wrapping(0);
let mut read_ops = Wrapping(0);
let mut write_ops = Wrapping(0);
let completion_list = self.disk_image.complete();
for (user_data, result) in completion_list {
let desc_index = user_data as u16;
let request = self
.request_list
.remove(&desc_index)
.ok_or(Error::MissingEntryRequestList)?;
let (status, len) = if result >= 0 {
match request.request_type {
RequestType::In => {
for (_, data_len) in &request.data_descriptors {
read_bytes += Wrapping(*data_len as u64);
}
read_ops += Wrapping(1);
}
RequestType::Out => {
if !request.writeback {
self.disk_image.fsync(None).map_err(Error::Fsync)?;
}
for (_, data_len) in &request.data_descriptors {
write_bytes += Wrapping(*data_len as u64);
}
write_ops += Wrapping(1);
}
_ => {}
}
                (VIRTIO_BLK_S_OK as u8, result as u32)
} else {
error!(
"Request failed: {:?}",
io::Error::from_raw_os_error(-result)
);
return Err(Error::AsyncRequestFailure);
};
// We use unwrap because the request parsing process already
// checked that the status_addr was valid.
mem.write_obj(status, request.status_addr).unwrap();
            used_desc_heads.push((desc_index, len));
used_count += 1;
}
for &(desc_index, len) in used_desc_heads.iter() {
queue.add_used(&mem, desc_index, len);
}
self.counters
.write_bytes
.fetch_add(write_bytes.0, Ordering::AcqRel);
self.counters
.write_ops
.fetch_add(write_ops.0, Ordering::AcqRel);
self.counters
.read_bytes
.fetch_add(read_bytes.0, Ordering::AcqRel);
self.counters
.read_ops
.fetch_add(read_ops.0, Ordering::AcqRel);
Ok(used_count > 0)
}
fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
self.interrupt_cb
.trigger(&VirtioInterruptType::Queue, Some(&self.queue))
.map_err(|e| {
error!("Failed to signal used queue: {:?}", e);
DeviceError::FailedSignalingUsedQueue(e)
})
}
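
    // Worker thread main loop: waits on the queue eventfd, the disk image's
    // completion notifier and, when configured, the rate limiter timer.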
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
if let Some(rate_limiter) = &self.rate_limiter {
helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
}
helper.run(paused, paused_sync, self)?;
Ok(())
}
}
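
// Event dispatch for the worker thread. Returning true from handle_event()
// tells the epoll helper to stop the thread.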
impl EpollHelperHandler for BlockEpollHandler {
fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool {
let ev_type = event.data as u16;
match ev_type {
QUEUE_AVAIL_EVENT => {
if let Err(e) = self.queue_evt.read() {
error!("Failed to get queue event: {:?}", e);
return true;
}
let rate_limit_reached =
self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());
// Process the queue only when the rate limit is not reached
if !rate_limit_reached {
match self.process_queue_submit() {
Ok(needs_notification) => {
if needs_notification {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
return true;
}
}
}
Err(e) => {
error!("Failed to process queue (submit): {:?}", e);
return true;
}
}
}
}
COMPLETION_EVENT => {
if let Err(e) = self.disk_image.notifier().read() {
error!("Failed to get queue event: {:?}", e);
return true;
}
match self.process_queue_complete() {
Ok(needs_notification) => {
if needs_notification {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
return true;
}
}
}
Err(e) => {
error!("Failed to process queue (complete): {:?}", e);
return true;
}
}
}
RATE_LIMITER_EVENT => {
if let Some(rate_limiter) = &mut self.rate_limiter {
// Upon rate limiter event, call the rate limiter handler
// and restart processing the queue.
if rate_limiter.event_handler().is_ok() {
match self.process_queue_submit() {
Ok(needs_notification) => {
if needs_notification {
if let Err(e) = self.signal_used_queue() {
error!("Failed to signal used queue: {:?}", e);
return true;
}
}
}
Err(e) => {
error!("Failed to process queue (submit): {:?}", e);
return true;
}
}
}
} else {
error!("Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled.");
return true;
}
}
_ => {
error!("Unexpected event: {}", ev_type);
return true;
}
}
false
}
}
/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
common: VirtioCommon,
id: String,
disk_image: Box<dyn DiskFile>,
disk_path: PathBuf,
disk_nsectors: u64,
config: VirtioBlockConfig,
writeback: Arc<AtomicBool>,
counters: BlockCounters,
seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>,
}
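
// Serializable device state captured across snapshot/restore. Kept to simple,
// primitive types so the structure can be versioned with Versionize.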
#[derive(Serialize, Deserialize)]
pub struct BlockState {
pub disk_path: String,
pub disk_nsectors: u64,
pub avail_features: u64,
pub acked_features: u64,
pub config: VirtioBlockConfig,
}
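
// A hedged construction sketch; `open_disk_file` is a hypothetical helper
// standing in for the concrete `DiskFile` implementations provided by the
// `block_util` crate:
//
//     let disk: Box<dyn DiskFile> = open_disk_file(&disk_path)?; // hypothetical
//     let block = Block::new(
//         "disk0".to_string(),
//         disk,
//         disk_path,
//         false,                // is_disk_read_only
//         false,                // iommu
//         1,                    // num_queues
//         256,                  // queue_size
//         SeccompAction::Allow,
//         None,                 // no rate limiting
//     )?;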
impl Block {
/// Create a new virtio block device that operates on the given file.
#[allow(clippy::too_many_arguments)]
pub fn new(
id: String,
mut disk_image: Box<dyn DiskFile>,
disk_path: PathBuf,
is_disk_read_only: bool,
iommu: bool,
num_queues: usize,
queue_size: u16,
seccomp_action: SeccompAction,
rate_limiter_config: Option<RateLimiterConfig>,
) -> io::Result<Self> {
let disk_size = disk_image.size().map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed getting disk size: {}", e),
)
})?;
if disk_size % SECTOR_SIZE != 0 {
warn!(
"Disk size {} is not a multiple of sector size {}; \
the remainder will not be visible to the guest.",
disk_size, SECTOR_SIZE
);
}
let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
| (1u64 << VIRTIO_BLK_F_FLUSH)
| (1u64 << VIRTIO_BLK_F_CONFIG_WCE);
if iommu {
avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
}
if is_disk_read_only {
avail_features |= 1u64 << VIRTIO_BLK_F_RO;
}
let disk_nsectors = disk_size / SECTOR_SIZE;
let mut config = VirtioBlockConfig {
capacity: disk_nsectors,
writeback: 1,
..Default::default()
};
if num_queues > 1 {
avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
config.num_queues = num_queues as u16;
}
Ok(Block {
common: VirtioCommon {
device_type: VirtioDeviceType::Block as u32,
avail_features,
paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
queue_sizes: vec![queue_size; num_queues],
min_queues: 1,
..Default::default()
},
id,
disk_image,
disk_path,
disk_nsectors,
config,
writeback: Arc::new(AtomicBool::new(true)),
counters: BlockCounters::default(),
seccomp_action,
rate_limiter_config,
})
}
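
    // Capture and reapply the serializable portion of the device for
    // snapshot/restore.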
fn state(&self) -> BlockState {
BlockState {
disk_path: self.disk_path.to_str().unwrap().to_owned(),
disk_nsectors: self.disk_nsectors,
avail_features: self.common.avail_features,
acked_features: self.common.acked_features,
config: self.config,
}
}
fn set_state(&mut self, state: &BlockState) {
self.disk_path = state.disk_path.clone().into();
self.disk_nsectors = state.disk_nsectors;
self.common.avail_features = state.avail_features;
self.common.acked_features = state.acked_features;
self.config = state.config;
}
fn update_writeback(&mut self) {
// Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
self.config.writeback == 1
} else {
// Else check if VIRTIO_BLK_F_FLUSH negotiated
self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
};
info!(
"Changing cache mode to {}",
if writeback {
"writeback"
} else {
"writethrough"
}
);
self.writeback.store(writeback, Ordering::Release);
}
}
impl Drop for Block {
fn drop(&mut self) {
if let Some(kill_evt) = self.common.kill_evt.take() {
// Ignore the result because there is nothing we can do about it.
let _ = kill_evt.write(1);
}
}
}
impl VirtioDevice for Block {
fn device_type(&self) -> u32 {
self.common.device_type
}
fn queue_max_sizes(&self) -> &[u16] {
&self.common.queue_sizes
}
fn features(&self) -> u64 {
self.common.avail_features
}
fn ack_features(&mut self, value: u64) {
self.common.ack_features(value)
}
fn read_config(&self, offset: u64, data: &mut [u8]) {
self.read_config_from_slice(self.config.as_slice(), offset, data);
}
fn write_config(&mut self, offset: u64, data: &[u8]) {
// The "writeback" field is the only mutable field
let writeback_offset =
(&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
{
error!(
"Attempt to write to read-only field: offset {:x} length {}",
offset,
data.len()
);
return;
}
self.config.writeback = data[0];
self.update_writeback();
}
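
    // Device activation: spawn one epoll worker thread per virtqueue, each
    // with its own queue eventfd, asynchronous I/O context, optional rate
    // limiter and seccomp filter.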
fn activate(
&mut self,
mem: GuestMemoryAtomic<GuestMemoryMmap>,
interrupt_cb: Arc<dyn VirtioInterrupt>,
mut queues: Vec<Queue>,
mut queue_evts: Vec<EventFd>,
) -> ActivateResult {
self.common.activate(&queues, &queue_evts, &interrupt_cb)?;
let disk_image_id = build_disk_image_id(&self.disk_path);
self.update_writeback();
let mut epoll_threads = Vec::new();
for i in 0..queues.len() {
let queue_evt = queue_evts.remove(0);
let queue = queues.remove(0);
let queue_size = queue.size;
let kill_evt = self
.common
.kill_evt
.as_ref()
.unwrap()
.try_clone()
.map_err(|e| {
error!("failed to clone kill_evt eventfd: {}", e);
ActivateError::BadActivate
})?;
let pause_evt = self
.common
.pause_evt
.as_ref()
.unwrap()
.try_clone()
.map_err(|e| {
error!("failed to clone pause_evt eventfd: {}", e);
ActivateError::BadActivate
})?;
let rate_limiter: Option<RateLimiter> = self
.rate_limiter_config
.map(RateLimiterConfig::try_into)
.transpose()
.map_err(ActivateError::CreateRateLimiter)?;
let mut handler = BlockEpollHandler {
queue,
mem: mem.clone(),
disk_image: self
.disk_image
.new_async_io(queue_size as u32)
.map_err(|e| {
error!("failed to create new AsyncIo: {}", e);
ActivateError::BadActivate
})?,
disk_nsectors: self.disk_nsectors,
interrupt_cb: interrupt_cb.clone(),
disk_image_id: disk_image_id.clone(),
kill_evt,
pause_evt,
writeback: self.writeback.clone(),
counters: self.counters.clone(),
queue_evt,
request_list: HashMap::with_capacity(queue_size.into()),
rate_limiter,
};
let paused = self.common.paused.clone();
let paused_sync = self.common.paused_sync.clone();
// Retrieve seccomp filter for virtio_block thread
let virtio_block_seccomp_filter =
get_seccomp_filter(&self.seccomp_action, Thread::VirtioBlock)
.map_err(ActivateError::CreateSeccompFilter)?;
thread::Builder::new()
                .name(format!("{}_q{}", self.id, i))
.spawn(move || {
if let Err(e) = SeccompFilter::apply(virtio_block_seccomp_filter) {
error!("Error applying seccomp filter: {:?}", e);
} else if let Err(e) = handler.run(paused, paused_sync.unwrap()) {
error!("Error running worker: {:?}", e);
}
})
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone the virtio-block epoll thread: {}", e);
ActivateError::BadActivate
})?;
}
self.common.epoll_threads = Some(epoll_threads);
event!("virtio-device", "activated", "id", &self.id);
Ok(())
}
fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
let result = self.common.reset();
event!("virtio-device", "reset", "id", &self.id);
result
}
fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
let mut counters = HashMap::new();
counters.insert(
"read_bytes",
Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
);
counters.insert(
"write_bytes",
Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
);
counters.insert(
"read_ops",
Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
);
counters.insert(
"write_ops",
Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
);
Some(counters)
}
}
impl Pausable for Block {
fn pause(&mut self) -> result::Result<(), MigratableError> {
self.common.pause()
}
fn resume(&mut self) -> result::Result<(), MigratableError> {
self.common.resume()
}
}
impl Snapshottable for Block {
fn id(&self) -> String {
self.id.clone()
}
fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
Snapshot::new_from_state(&self.id(), &self.state())
}
fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
self.set_state(&snapshot.to_state(&self.id)?);
Ok(())
}
}
impl Transportable for Block {}
impl Migratable for Block {}