virtio-devices: Acknowledge a device being paused

Using the Rust Barrier mechanism, this patch forces each virtio device
to acknowledge they've been correctly paused before going further.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
Sebastien Boeuf 2020-08-11 16:05:06 +02:00
parent fd48779a0d
commit aa57762c4f
17 changed files with 162 additions and 54 deletions

View File

@ -25,7 +25,7 @@ use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use vm_memory::{
Address, ByteValued, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic,
@ -229,12 +229,16 @@ impl BalloonEpollHandler {
Ok(())
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.resize_receiver.evt.as_raw_fd(), RESIZE_EVENT)?;
helper.add_event(self.inflate_queue_evt.as_raw_fd(), INFLATE_QUEUE_EVENT)?;
helper.add_event(self.deflate_queue_evt.as_raw_fd(), DEFLATE_QUEUE_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -313,6 +317,7 @@ pub struct Balloon {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
}
impl Balloon {
@ -335,6 +340,7 @@ impl Balloon {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
})
}
@ -443,10 +449,11 @@ impl VirtioDevice for Balloon {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
thread::Builder::new()
.name("virtio_balloon".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone virtio-balloon epoll thread: {}", e);

View File

@ -27,7 +27,7 @@ use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use virtio_bindings::bindings::virtio_blk::*;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
@ -198,10 +198,14 @@ impl<T: DiskFile> BlockEpollHandler<T> {
Ok(())
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -268,6 +272,7 @@ pub struct Block<T: DiskFile> {
epoll_threads: Option<Vec<thread::JoinHandle<()>>>,
pause_evt: Option<EventFd>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
queue_size: Vec<u16>,
writeback: Arc<AtomicBool>,
counters: BlockCounters,
@ -346,6 +351,7 @@ impl<T: DiskFile> Block<T> {
epoll_threads: None,
pause_evt: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(num_queues + 1)),
queue_size: vec![queue_size; num_queues],
writeback: Arc::new(AtomicBool::new(true)),
counters: BlockCounters::default(),
@ -534,6 +540,7 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
handler.queue.set_event_idx(event_idx);
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
// Retrieve seccomp filter for virtio_blk thread
let virtio_blk_seccomp_filter =
@ -545,7 +552,7 @@ impl<T: 'static + DiskFile + Send> VirtioDevice for Block<T> {
.spawn(move || {
if let Err(e) = SeccompFilter::apply(virtio_blk_seccomp_filter) {
error!("Error applying seccomp filter: {:?}", e);
} else if let Err(e) = handler.run(paused) {
} else if let Err(e) = handler.run(paused, paused_sync) {
error!("Error running worker: {:?}", e);
}
})

View File

@ -26,7 +26,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use virtio_bindings::bindings::virtio_blk::*;
use vm_memory::{
@ -229,11 +229,15 @@ impl BlockIoUringEpollHandler {
})
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
helper.add_event(self.io_uring_evt.as_raw_fd(), IO_URING_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -309,6 +313,7 @@ pub struct BlockIoUring {
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
pause_evt: Option<EventFd>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
queue_size: Vec<u16>,
writeback: Arc<AtomicBool>,
counters: BlockCounters,
@ -381,6 +386,7 @@ impl BlockIoUring {
epoll_threads: None,
pause_evt: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(num_queues + 1)),
queue_size: vec![queue_size; num_queues],
writeback: Arc::new(AtomicBool::new(true)),
counters: BlockCounters::default(),
@ -580,6 +586,7 @@ impl VirtioDevice for BlockIoUring {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
// Register the io_uring eventfd that will notify the epoll loop
// when something in the completion queue is ready.
@ -594,7 +601,7 @@ impl VirtioDevice for BlockIoUring {
thread::Builder::new()
.name("virtio_blk".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone the virtio-blk epoll thread: {}", e);

View File

@ -21,7 +21,7 @@ use std::ops::DerefMut;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vm_migration::{
@ -180,13 +180,17 @@ impl ConsoleEpollHandler {
})
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.input_queue_evt.as_raw_fd(), INPUT_QUEUE_EVENT)?;
helper.add_event(self.output_queue_evt.as_raw_fd(), OUTPUT_QUEUE_EVENT)?;
helper.add_event(self.input_evt.as_raw_fd(), INPUT_EVENT)?;
helper.add_event(self.config_evt.as_raw_fd(), CONFIG_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -306,6 +310,7 @@ pub struct Console {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<()>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
seccomp_action: SeccompAction,
}
@ -358,6 +363,7 @@ impl Console {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
seccomp_action,
},
console_input,
@ -495,6 +501,7 @@ impl VirtioDevice for Console {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
// Retrieve seccomp filter for virtio_console thread
let virtio_console_seccomp_filter =
@ -505,7 +512,7 @@ impl VirtioDevice for Console {
.spawn(move || {
if let Err(e) = SeccompFilter::apply(virtio_console_seccomp_filter) {
error!("Error applying seccomp filter: {:?}", e);
} else if let Err(e) = handler.run(paused) {
} else if let Err(e) = handler.run(paused, paused_sync) {
error!("Error running worker: {:?}", e);
}
})

View File

@ -205,6 +205,13 @@ macro_rules! virtio_pausable_trait_inner {
pause_evt
.write(1)
.map_err(|e| MigratableError::Pause(e.into()))?;
// Wait for all threads to acknowledge the pause before going
// any further. This is exclusively performed when pause_evt
// eventfd is Some(), as this means the virtio device has been
// activated. One specific case where the device can be paused
// while it hasn't been yet activated is snapshot/restore.
self.paused_sync.wait();
}
Ok(())

View File

@ -11,7 +11,7 @@
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use vmm_sys_util::eventfd::EventFd;
@ -69,6 +69,7 @@ impl EpollHelper {
pub fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
handler: &mut dyn EpollHelperHandler,
) -> std::result::Result<(), EpollHelperError> {
const EPOLL_EVENTS_LEN: usize = 100;
@ -110,6 +111,11 @@ impl EpollHelper {
}
EPOLL_HELPER_EVENT_PAUSE => {
debug!("PAUSE_EVENT received, pausing epoll loop");
// Acknowledge the pause is effective by using the
// paused_sync barrier.
paused_sync.wait();
// We loop here to handle spurious park() returns.
// Until we have not resumed, the paused boolean will
// be true.

View File

@ -19,7 +19,7 @@ use std::ops::Bound::Included;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Barrier, RwLock};
use std::thread;
use vfio_ioctls::ExternalDmaMapping;
use vm_memory::{
@ -646,11 +646,15 @@ impl IommuEpollHandler {
})
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evts[0].as_raw_fd(), REQUEST_Q_EVENT)?;
helper.add_event(self.queue_evts[1].as_raw_fd(), EVENT_Q_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -743,6 +747,7 @@ pub struct Iommu {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
}
#[derive(Serialize, Deserialize)]
@ -783,6 +788,7 @@ impl Iommu {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
},
mapping,
))
@ -955,10 +961,11 @@ impl VirtioDevice for Iommu {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
thread::Builder::new()
.name("virtio_iommu".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone the virtio-iommu epoll thread: {}", e);

View File

@ -28,7 +28,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use vm_memory::{
Address, ByteValued, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic,
@ -589,11 +589,15 @@ impl MemEpollHandler {
used_count > 0
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.resize.evt.as_raw_fd(), RESIZE_EVENT)?;
helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -687,6 +691,7 @@ pub struct Mem {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
}
impl Mem {
@ -737,6 +742,7 @@ impl Mem {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
})
}
}
@ -844,10 +850,11 @@ impl VirtioDevice for Mem {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
thread::Builder::new()
.name("virtio_mem".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone virtio-mem epoll thread: {}", e);

View File

@ -28,7 +28,7 @@ use std::num::Wrapping;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::vec::Vec;
use virtio_bindings::bindings::virtio_net::*;
@ -134,7 +134,11 @@ impl NetEpollHandler {
Ok(())
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt_pair[0].as_raw_fd(), RX_QUEUE_EVENT)?;
helper.add_event(self.queue_evt_pair[1].as_raw_fd(), TX_QUEUE_EVENT)?;
@ -153,7 +157,7 @@ impl NetEpollHandler {
// The NetQueuePair needs the epoll fd.
self.net.epoll_fd = Some(helper.as_raw_fd());
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -205,6 +209,7 @@ pub struct Net {
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
ctrl_queue_epoll_thread: Option<thread::JoinHandle<()>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
queue_size: Vec<u16>,
counters: NetCounters,
seccomp_action: SeccompAction,
@ -265,6 +270,7 @@ impl Net {
epoll_threads: None,
ctrl_queue_epoll_thread: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new((num_queues / 2) + 1)),
queue_size: vec![queue_size; queue_num],
counters: NetCounters::default(),
seccomp_action,
@ -419,6 +425,12 @@ impl VirtioDevice for Net {
};
let paused = self.paused.clone();
// Let's update the barrier as we need 1 for each RX/TX pair +
// 1 for the control queue + 1 for the main thread signalling
// the pause.
self.paused_sync = Arc::new(Barrier::new(taps.len() + 2));
let paused_sync = self.paused_sync.clone();
// Retrieve seccomp filter for virtio_net thread
let virtio_net_seccomp_filter =
get_seccomp_filter(&self.seccomp_action, Thread::VirtioNet)
@ -428,7 +440,7 @@ impl VirtioDevice for Net {
.spawn(move || {
if let Err(e) = SeccompFilter::apply(virtio_net_seccomp_filter) {
error!("Error applying seccomp filter: {:?}", e);
} else if let Err(e) = ctrl_handler.run_ctrl(paused) {
} else if let Err(e) = ctrl_handler.run_ctrl(paused, paused_sync) {
error!("Error running worker: {:?}", e);
}
})
@ -477,9 +489,10 @@ impl VirtioDevice for Net {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
thread::Builder::new()
.name("virtio_net".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);

View File

@ -10,7 +10,7 @@ use net_util::MacAddr;
use serde::ser::{Serialize, SerializeStruct, Serializer};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use virtio_bindings::bindings::virtio_net::*;
use vm_memory::{
ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError, GuestMemoryMmap,
@ -177,10 +177,11 @@ impl NetCtrlEpollHandler {
pub fn run_ctrl(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> std::result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.ctrl_q.queue_evt.as_raw_fd(), CTRL_QUEUE_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}

View File

@ -25,7 +25,7 @@ use std::mem::size_of;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use vm_memory::{
Address, ByteValued, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic,
@ -243,10 +243,14 @@ impl PmemEpollHandler {
})
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -288,6 +292,7 @@ pub struct Pmem {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<()>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
mapping: UserspaceMapping,
seccomp_action: SeccompAction,
@ -336,6 +341,7 @@ impl Pmem {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
mapping,
seccomp_action,
_region,
@ -461,6 +467,7 @@ impl VirtioDevice for Pmem {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
// Retrieve seccomp filter for virtio_pmem thread
let virtio_pmem_seccomp_filter =
@ -471,7 +478,7 @@ impl VirtioDevice for Pmem {
.spawn(move || {
if let Err(e) = SeccompFilter::apply(virtio_pmem_seccomp_filter) {
error!("Error applying seccomp filter: {:?}", e);
} else if let Err(e) = handler.run(paused) {
} else if let Err(e) = handler.run(paused, paused_sync) {
error!("Error running worker: {:?}", e);
}
})

View File

@ -18,7 +18,7 @@ use std::io;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use vm_memory::{Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vm_migration::{
@ -88,10 +88,14 @@ impl RngEpollHandler {
})
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -133,6 +137,7 @@ pub struct Rng {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<()>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
seccomp_action: SeccompAction,
}
@ -169,6 +174,7 @@ impl Rng {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
seccomp_action,
})
}
@ -288,6 +294,7 @@ impl VirtioDevice for Rng {
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
// Retrieve seccomp filter for virtio_rng thread
let virtio_rng_seccomp_filter =
@ -298,7 +305,7 @@ impl VirtioDevice for Rng {
.spawn(move || {
if let Err(e) = SeccompFilter::apply(virtio_rng_seccomp_filter) {
error!("Error applying seccomp filter: {:?}", e);
} else if let Err(e) = handler.run(paused) {
} else if let Err(e) = handler.run(paused, paused_sync) {
error!("Error running worker: {:?}", e);
}
})

View File

@ -14,7 +14,7 @@ use std::mem;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::vec::Vec;
use vhost_rs::vhost_user::message::VhostUserConfigFlags;
@ -44,6 +44,7 @@ pub struct Blk {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
}
impl Blk {
@ -148,6 +149,7 @@ impl Blk {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(vu_cfg.num_queues + 1)),
})
}
}
@ -273,9 +275,10 @@ impl VirtioDevice for Blk {
});
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
thread::Builder::new()
.name("vhost_user_blk".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone virtio epoll thread: {}", e);

View File

@ -13,7 +13,7 @@ use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use vhost_rs::vhost_user::message::{
VhostUserFSSlaveMsg, VhostUserFSSlaveMsgFlags, VhostUserProtocolFeatures,
@ -280,6 +280,7 @@ pub struct Fs {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
}
impl Fs {
@ -365,6 +366,7 @@ impl Fs {
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
})
}
}
@ -500,10 +502,11 @@ impl VirtioDevice for Fs {
});
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
thread::Builder::new()
.name("virtio_fs".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);

View File

@ -17,7 +17,7 @@ use vmm_sys_util::eventfd::EventFd;
use crate::VirtioInterrupt;
use std::os::unix::io::AsRawFd;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use vhost_rs::vhost_user::{MasterReqHandler, VhostUserMasterReqHandler};
/// Collection of common parameters required by vhost-user devices while
@ -67,7 +67,11 @@ impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
.map_err(Error::FailedSignalingUsedQueue)
}
pub fn run(&mut self, paused: Arc<AtomicBool>) -> std::result::Result<(), EpollHelperError> {
pub fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> std::result::Result<(), EpollHelperError> {
let mut helper =
EpollHelper::new(&self.vu_epoll_cfg.kill_evt, &self.vu_epoll_cfg.pause_evt)?;
@ -81,7 +85,7 @@ impl<S: VhostUserMasterReqHandler> VhostUserEpollHandler<S> {
helper.add_event(self_req_handler.as_raw_fd(), self.slave_evt_idx)?;
}
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}

View File

@ -16,7 +16,7 @@ use net_util::MacAddr;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::vec::Vec;
use vhost_rs::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
@ -48,6 +48,7 @@ pub struct Net {
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
ctrl_queue_epoll_thread: Option<thread::JoinHandle<result::Result<(), EpollHelperError>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
}
impl Net {
@ -153,6 +154,7 @@ impl Net {
epoll_threads: None,
ctrl_queue_epoll_thread: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new((vu_cfg.num_queues / 2) + 1)),
})
}
}
@ -259,9 +261,14 @@ impl VirtioDevice for Net {
};
let paused = self.paused.clone();
// Let's update the barrier as we need 1 for each RX/TX pair +
// 1 for the control queue + 1 for the main thread signalling
// the pause.
self.paused_sync = Arc::new(Barrier::new((queue_num / 2) + 2));
let paused_sync = self.paused_sync.clone();
thread::Builder::new()
.name("virtio_net".to_string())
.spawn(move || ctrl_handler.run_ctrl(paused))
.spawn(move || ctrl_handler.run_ctrl(paused, paused_sync))
.map(|thread| self.ctrl_queue_epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
@ -294,9 +301,10 @@ impl VirtioDevice for Net {
});
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
thread::Builder::new()
.name("vhost_user_net".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);

View File

@ -43,7 +43,7 @@ use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Barrier, RwLock};
use std::thread;
use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vm_migration::{
@ -193,13 +193,17 @@ where
}
}
fn run(&mut self, paused: Arc<AtomicBool>) -> result::Result<(), EpollHelperError> {
fn run(
&mut self,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
) -> result::Result<(), EpollHelperError> {
let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
helper.add_event(self.queue_evts[0].as_raw_fd(), RX_QUEUE_EVENT)?;
helper.add_event(self.queue_evts[1].as_raw_fd(), TX_QUEUE_EVENT)?;
helper.add_event(self.queue_evts[2].as_raw_fd(), EVT_QUEUE_EVENT)?;
helper.add_event(self.backend.read().unwrap().get_polled_fd(), BACKEND_EVENT)?;
helper.run(paused, self)?;
helper.run(paused, paused_sync, self)?;
Ok(())
}
@ -303,6 +307,7 @@ pub struct Vsock<B: VsockBackend> {
interrupt_cb: Option<Arc<dyn VirtioInterrupt>>,
epoll_threads: Option<Vec<thread::JoinHandle<result::Result<(), EpollHelperError>>>>,
paused: Arc<AtomicBool>,
paused_sync: Arc<Barrier>,
path: PathBuf,
}
@ -343,6 +348,7 @@ where
interrupt_cb: None,
epoll_threads: None,
paused: Arc::new(AtomicBool::new(false)),
paused_sync: Arc::new(Barrier::new(2)),
path,
})
}
@ -476,10 +482,11 @@ where
};
let paused = self.paused.clone();
let paused_sync = self.paused_sync.clone();
let mut epoll_threads = Vec::new();
thread::Builder::new()
.name("virtio_vsock".to_string())
.spawn(move || handler.run(paused))
.spawn(move || handler.run(paused, paused_sync))
.map(|thread| epoll_threads.push(thread))
.map_err(|e| {
error!("failed to clone the vsock epoll thread: {}", e);