mirror of
https://github.com/cloud-hypervisor/cloud-hypervisor.git
synced 2025-02-22 03:12:27 +00:00
virtio-devices: vhost_user: Factorize slave request handler
Since the slave request handler is common to all vhost-user devices, the same way the reconnection is, it makes sense to handle the requests from the backend through the same thread. The reconnection thread now handles both a reconnection as well as any request coming from the backend. Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
parent
deca570544
commit
acec7e34fc
@ -313,6 +313,7 @@ fn virtio_vhost_fs_thread_rules() -> Vec<SyscallRuleSet> {
|
||||
vec![
|
||||
allow_syscall(libc::SYS_brk),
|
||||
allow_syscall(libc::SYS_close),
|
||||
allow_syscall(libc::SYS_connect),
|
||||
allow_syscall(libc::SYS_dup),
|
||||
allow_syscall(libc::SYS_epoll_create1),
|
||||
allow_syscall(libc::SYS_epoll_ctl),
|
||||
@ -324,11 +325,14 @@ fn virtio_vhost_fs_thread_rules() -> Vec<SyscallRuleSet> {
|
||||
allow_syscall(libc::SYS_madvise),
|
||||
allow_syscall(libc::SYS_mmap),
|
||||
allow_syscall(libc::SYS_munmap),
|
||||
allow_syscall(libc::SYS_nanosleep),
|
||||
allow_syscall(libc::SYS_read),
|
||||
allow_syscall(libc::SYS_recvmsg),
|
||||
allow_syscall(libc::SYS_rt_sigprocmask),
|
||||
allow_syscall(libc::SYS_sendmsg),
|
||||
allow_syscall(libc::SYS_sendto),
|
||||
allow_syscall(libc::SYS_sigaltstack),
|
||||
allow_syscall(libc::SYS_socket),
|
||||
allow_syscall(libc::SYS_write),
|
||||
]
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ use super::vu_common_ctrl::{
|
||||
setup_vhost_user, update_mem_table, VhostUserConfig,
|
||||
};
|
||||
use super::{Error, Result, DEFAULT_VIRTIO_FEATURES};
|
||||
use crate::vhost_user::ReconnectEpollHandler;
|
||||
use crate::vhost_user::VhostUserEpollHandler;
|
||||
use crate::VirtioInterrupt;
|
||||
use crate::{GuestMemoryMmap, GuestRegionMmap};
|
||||
use block_util::VirtioBlockConfig;
|
||||
@ -235,7 +235,7 @@ impl VirtioDevice for Blk {
|
||||
// the backend.
|
||||
let (kill_evt, pause_evt) = self.common.dup_eventfds();
|
||||
|
||||
let mut reconnect_handler = ReconnectEpollHandler {
|
||||
let mut reconnect_handler: VhostUserEpollHandler<SlaveReqHandler> = VhostUserEpollHandler {
|
||||
vu: self.vhost_user_blk.clone(),
|
||||
mem,
|
||||
kill_evt,
|
||||
@ -247,13 +247,14 @@ impl VirtioDevice for Blk {
|
||||
acked_protocol_features: self.acked_protocol_features,
|
||||
socket_path: self.socket_path.clone(),
|
||||
server: false,
|
||||
slave_req_handler: None,
|
||||
};
|
||||
|
||||
let paused = self.common.paused.clone();
|
||||
let paused_sync = self.common.paused_sync.clone();
|
||||
|
||||
thread::Builder::new()
|
||||
.name(format!("{}_reconnect", self.id))
|
||||
.name(self.id.to_string())
|
||||
.spawn(move || {
|
||||
if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) {
|
||||
error!("Error running reconnection worker: {:?}", e);
|
||||
|
@ -7,8 +7,7 @@ use super::vu_common_ctrl::{
|
||||
};
|
||||
use super::{Error, Result, DEFAULT_VIRTIO_FEATURES};
|
||||
use crate::seccomp_filters::{get_seccomp_filter, Thread};
|
||||
use crate::vhost_user::handler::{VhostUserEpollConfig, VhostUserEpollHandler};
|
||||
use crate::vhost_user::ReconnectEpollHandler;
|
||||
use crate::vhost_user::VhostUserEpollHandler;
|
||||
use crate::{
|
||||
ActivateError, ActivateResult, Queue, UserspaceMapping, VirtioCommon, VirtioDevice,
|
||||
VirtioDeviceType, VirtioInterrupt, VirtioSharedMemoryList,
|
||||
@ -358,7 +357,7 @@ impl Fs {
|
||||
avail_features: acked_features,
|
||||
acked_features: 0,
|
||||
queue_sizes: vec![queue_size; num_queues],
|
||||
paused_sync: Some(Arc::new(Barrier::new(3))),
|
||||
paused_sync: Some(Arc::new(Barrier::new(2))),
|
||||
min_queues: DEFAULT_QUEUE_NUMBER as u16,
|
||||
..Default::default()
|
||||
},
|
||||
@ -414,7 +413,6 @@ impl VirtioDevice for Fs {
|
||||
queue_evts: Vec<EventFd>,
|
||||
) -> ActivateResult {
|
||||
self.common.activate(&queues, &queue_evts, &interrupt_cb)?;
|
||||
let (kill_evt, pause_evt) = self.common.dup_eventfds();
|
||||
self.guest_memory = Some(mem.clone());
|
||||
|
||||
// The backend acknowledged features must contain the protocol feature
|
||||
@ -463,39 +461,10 @@ impl VirtioDevice for Fs {
|
||||
None
|
||||
};
|
||||
|
||||
let mut handler = VhostUserEpollHandler::new(VhostUserEpollConfig {
|
||||
kill_evt,
|
||||
pause_evt,
|
||||
slave_req_handler,
|
||||
});
|
||||
|
||||
let paused = self.common.paused.clone();
|
||||
let paused_sync = self.common.paused_sync.clone();
|
||||
let mut epoll_threads = Vec::new();
|
||||
let virtio_vhost_fs_seccomp_filter =
|
||||
get_seccomp_filter(&self.seccomp_action, Thread::VirtioVhostFs)
|
||||
.map_err(ActivateError::CreateSeccompFilter)?;
|
||||
thread::Builder::new()
|
||||
.name(self.id.clone())
|
||||
.spawn(move || {
|
||||
if let Err(e) = SeccompFilter::apply(virtio_vhost_fs_seccomp_filter) {
|
||||
error!("Error applying seccomp filter: {:?}", e);
|
||||
} else if let Err(e) = handler.run(paused, paused_sync.unwrap()) {
|
||||
error!("Error running worker: {:?}", e);
|
||||
}
|
||||
})
|
||||
.map(|thread| epoll_threads.push(thread))
|
||||
.map_err(|e| {
|
||||
error!("failed to clone queue EventFd: {}", e);
|
||||
ActivateError::BadActivate
|
||||
})?;
|
||||
|
||||
self.common.epoll_threads = Some(epoll_threads);
|
||||
|
||||
// Run a dedicated thread for handling potential reconnections with
|
||||
// the backend.
|
||||
// the backend as well as requests initiated by the backend.
|
||||
let (kill_evt, pause_evt) = self.common.dup_eventfds();
|
||||
let mut reconnect_handler = ReconnectEpollHandler {
|
||||
let mut reconnect_handler: VhostUserEpollHandler<SlaveReqHandler> = VhostUserEpollHandler {
|
||||
vu: self.vu.clone(),
|
||||
mem,
|
||||
kill_evt,
|
||||
@ -507,15 +476,22 @@ impl VirtioDevice for Fs {
|
||||
acked_protocol_features: self.acked_protocol_features,
|
||||
socket_path: self.socket_path.clone(),
|
||||
server: false,
|
||||
slave_req_handler,
|
||||
};
|
||||
|
||||
let paused = self.common.paused.clone();
|
||||
let paused_sync = self.common.paused_sync.clone();
|
||||
|
||||
let virtio_vhost_fs_seccomp_filter =
|
||||
get_seccomp_filter(&self.seccomp_action, Thread::VirtioVhostFs)
|
||||
.map_err(ActivateError::CreateSeccompFilter)?;
|
||||
|
||||
thread::Builder::new()
|
||||
.name(format!("{}_reconnect", self.id))
|
||||
.name(self.id.to_string())
|
||||
.spawn(move || {
|
||||
if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) {
|
||||
if let Err(e) = SeccompFilter::apply(virtio_vhost_fs_seccomp_filter) {
|
||||
error!("Error applying seccomp filter: {:?}", e);
|
||||
} else if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) {
|
||||
error!("Error running reconnection worker: {:?}", e);
|
||||
}
|
||||
})
|
||||
|
@ -1,88 +0,0 @@
|
||||
// Copyright (c) 2019 Intel Corporation. All rights reserved.
|
||||
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// Copyright 2017 The Chromium OS Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE-BSD-3-Clause file.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
|
||||
|
||||
use super::super::{EpollHelper, EpollHelperError, EpollHelperHandler, EPOLL_HELPER_EVENT_LAST};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, Barrier};
|
||||
use vhost::vhost_user::{MasterReqHandler, VhostUserMasterReqHandler};
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
/// Collection of common parameters required by vhost-user devices while
|
||||
/// call Epoll handler.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `interrupt_cb` interrupt for virtqueue change.
|
||||
/// * `kill_evt` - EventFd used to kill the vhost-user device.
|
||||
/// * `vu_interrupt_list` - virtqueue and EventFd to signal when buffer used.
|
||||
pub struct VhostUserEpollConfig<S: VhostUserMasterReqHandler> {
|
||||
pub kill_evt: EventFd,
|
||||
pub pause_evt: EventFd,
|
||||
pub slave_req_handler: Option<MasterReqHandler<S>>,
|
||||
}
|
||||
|
||||
pub struct VhostUserEpollHandler<S: VhostUserMasterReqHandler> {
|
||||
vu_epoll_cfg: VhostUserEpollConfig<S>,
|
||||
slave_evt_idx: u16,
|
||||
}
|
||||
|
||||
impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
|
||||
/// Construct a new event handler for vhost-user based devices.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `vu_epoll_cfg` - collection of common parameters for vhost-user devices
|
||||
///
|
||||
/// # Return
|
||||
/// * `VhostUserEpollHandler` - epoll handler for vhost-user based devices
|
||||
pub fn new(vu_epoll_cfg: VhostUserEpollConfig<S>) -> VhostUserEpollHandler<S> {
|
||||
VhostUserEpollHandler {
|
||||
vu_epoll_cfg,
|
||||
slave_evt_idx: EPOLL_HELPER_EVENT_LAST + 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(
|
||||
&mut self,
|
||||
paused: Arc<AtomicBool>,
|
||||
paused_sync: Arc<Barrier>,
|
||||
) -> std::result::Result<(), EpollHelperError> {
|
||||
let mut helper =
|
||||
EpollHelper::new(&self.vu_epoll_cfg.kill_evt, &self.vu_epoll_cfg.pause_evt)?;
|
||||
|
||||
if let Some(self_req_handler) = &self.vu_epoll_cfg.slave_req_handler {
|
||||
helper.add_event(self_req_handler.as_raw_fd(), self.slave_evt_idx)?;
|
||||
}
|
||||
|
||||
helper.run(paused, paused_sync, self)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: VhostUserMasterReqHandler> EpollHelperHandler for VhostUserEpollHandler<S> {
|
||||
fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool {
|
||||
let ev_type = event.data as u16;
|
||||
match ev_type {
|
||||
x if x == self.slave_evt_idx => {
|
||||
if let Some(slave_req_handler) = self.vu_epoll_cfg.slave_req_handler.as_mut() {
|
||||
if let Err(e) = slave_req_handler.handle_request() {
|
||||
error!("Failed to handle vhost-user request: {:?}", e);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error!("Unknown event for vhost-user");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
@ -12,7 +12,7 @@ use std::ops::Deref;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::sync::{atomic::AtomicBool, Arc, Barrier, Mutex};
|
||||
use vhost::vhost_user::message::VhostUserVirtioFeatures;
|
||||
use vhost::vhost_user::Master;
|
||||
use vhost::vhost_user::{Master, MasterReqHandler, VhostUserMasterReqHandler};
|
||||
use vhost::Error as VhostError;
|
||||
use vm_memory::{Error as MmapError, GuestAddressSpace, GuestMemoryAtomic};
|
||||
use vm_virtio::Error as VirtioError;
|
||||
@ -21,7 +21,6 @@ use vu_common_ctrl::{connect_vhost_user, reinitialize_vhost_user};
|
||||
|
||||
pub mod blk;
|
||||
pub mod fs;
|
||||
mod handler;
|
||||
pub mod net;
|
||||
pub mod vu_common_ctrl;
|
||||
|
||||
@ -127,8 +126,9 @@ pub const DEFAULT_VIRTIO_FEATURES: u64 = 1 << VIRTIO_F_RING_INDIRECT_DESC
|
||||
| VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
|
||||
|
||||
const HUP_CONNECTION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
|
||||
const SLAVE_REQ_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
|
||||
|
||||
pub struct ReconnectEpollHandler {
|
||||
pub struct VhostUserEpollHandler<S: VhostUserMasterReqHandler> {
|
||||
pub vu: Arc<Mutex<Master>>,
|
||||
pub mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
pub kill_evt: EventFd,
|
||||
@ -140,9 +140,10 @@ pub struct ReconnectEpollHandler {
|
||||
pub acked_protocol_features: u64,
|
||||
pub socket_path: String,
|
||||
pub server: bool,
|
||||
pub slave_req_handler: Option<MasterReqHandler<S>>,
|
||||
}
|
||||
|
||||
impl ReconnectEpollHandler {
|
||||
impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
|
||||
pub fn run(
|
||||
&mut self,
|
||||
paused: Arc<AtomicBool>,
|
||||
@ -154,6 +155,11 @@ impl ReconnectEpollHandler {
|
||||
HUP_CONNECTION_EVENT,
|
||||
epoll::Events::EPOLLHUP,
|
||||
)?;
|
||||
|
||||
if let Some(slave_req_handler) = &self.slave_req_handler {
|
||||
helper.add_event(slave_req_handler.as_raw_fd(), SLAVE_REQ_EVENT)?;
|
||||
}
|
||||
|
||||
helper.run(paused, paused_sync, self)?;
|
||||
|
||||
Ok(())
|
||||
@ -213,7 +219,7 @@ impl ReconnectEpollHandler {
|
||||
}
|
||||
}
|
||||
|
||||
impl EpollHelperHandler for ReconnectEpollHandler {
|
||||
impl<S: VhostUserMasterReqHandler> EpollHelperHandler for VhostUserEpollHandler<S> {
|
||||
fn handle_event(&mut self, helper: &mut EpollHelper, event: &epoll::Event) -> bool {
|
||||
let ev_type = event.data as u16;
|
||||
match ev_type {
|
||||
@ -223,6 +229,14 @@ impl EpollHelperHandler for ReconnectEpollHandler {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
SLAVE_REQ_EVENT => {
|
||||
if let Some(slave_req_handler) = self.slave_req_handler.as_mut() {
|
||||
if let Err(e) = slave_req_handler.handle_request() {
|
||||
error!("Failed to handle request from vhost-user backend: {:?}", e);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error!("Unknown event for vhost-user reconnection thread");
|
||||
return true;
|
||||
|
@ -6,7 +6,7 @@ use crate::vhost_user::vu_common_ctrl::{
|
||||
add_memory_region, connect_vhost_user, negotiate_features_vhost_user, reset_vhost_user,
|
||||
setup_vhost_user, update_mem_table, VhostUserConfig,
|
||||
};
|
||||
use crate::vhost_user::{Error, ReconnectEpollHandler, Result};
|
||||
use crate::vhost_user::{Error, Result, VhostUserEpollHandler};
|
||||
use crate::{
|
||||
ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
|
||||
VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterrupt, EPOLL_HELPER_EVENT_LAST,
|
||||
@ -304,7 +304,7 @@ impl VirtioDevice for Net {
|
||||
// the backend.
|
||||
let (kill_evt, pause_evt) = self.common.dup_eventfds();
|
||||
|
||||
let mut reconnect_handler = ReconnectEpollHandler {
|
||||
let mut reconnect_handler: VhostUserEpollHandler<SlaveReqHandler> = VhostUserEpollHandler {
|
||||
vu: self.vhost_user_net.clone(),
|
||||
mem,
|
||||
kill_evt,
|
||||
@ -316,13 +316,14 @@ impl VirtioDevice for Net {
|
||||
acked_protocol_features: self.acked_protocol_features,
|
||||
socket_path: self.socket_path.clone(),
|
||||
server: self.server,
|
||||
slave_req_handler: None,
|
||||
};
|
||||
|
||||
let paused = self.common.paused.clone();
|
||||
let paused_sync = self.common.paused_sync.clone();
|
||||
|
||||
thread::Builder::new()
|
||||
.name(format!("{}_reconnect", self.id))
|
||||
.name(self.id.to_string())
|
||||
.spawn(move || {
|
||||
if let Err(e) = reconnect_handler.run(paused, paused_sync.unwrap()) {
|
||||
error!("Error running reconnection worker: {:?}", e);
|
||||
|
Loading…
x
Reference in New Issue
Block a user