vhost_user_blk: Add multithreaded multiqueue support

After all the previous refactoring patches, we can finally create
multiple threads under the same backend. This is directly combined with
multiqueues so that it will create one thread per queue.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
Sebastien Boeuf 2020-04-16 18:50:12 +02:00
parent 13c8283fbe
commit a517be4eac

View File

@ -175,10 +175,11 @@ impl VhostUserBlkThread {
}
struct VhostUserBlkBackend {
thread: Mutex<VhostUserBlkThread>,
threads: Vec<Mutex<VhostUserBlkThread>>,
config: virtio_blk_config,
rdonly: bool,
poll_queue: bool,
queues_per_thread: Vec<u64>,
}
impl VhostUserBlkBackend {
@ -219,13 +220,24 @@ impl VhostUserBlkBackend {
config.num_queues = num_queues as u16;
config.wce = 1;
let thread = Mutex::new(VhostUserBlkThread::new(image, image_id, nsectors)?);
let mut queues_per_thread = Vec::new();
let mut threads = Vec::new();
for i in 0..num_queues {
let thread = Mutex::new(VhostUserBlkThread::new(
image.clone(),
image_id.clone(),
nsectors,
)?);
threads.push(thread);
queues_per_thread.push(0b1 << i);
}
Ok(VhostUserBlkBackend {
thread,
threads,
config,
rdonly,
poll_queue,
queues_per_thread,
})
}
}
@ -257,11 +269,15 @@ impl VhostUserBackend for VhostUserBlkBackend {
}
fn set_event_idx(&mut self, enabled: bool) {
self.thread.lock().unwrap().event_idx = enabled;
for thread in self.threads.iter() {
thread.lock().unwrap().event_idx = enabled;
}
}
fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> {
self.thread.lock().unwrap().mem = Some(mem);
for thread in self.threads.iter() {
thread.lock().unwrap().mem = Some(mem.clone());
}
Ok(())
}
@ -270,7 +286,7 @@ impl VhostUserBackend for VhostUserBlkBackend {
device_event: u16,
evset: epoll::Events,
vrings: &[Arc<RwLock<Vring>>],
_thread_id: usize,
thread_id: usize,
) -> VhostUserBackendResult<bool> {
if evset != epoll::Events::EPOLLIN {
return Err(Error::HandleEventNotEpollIn.into());
@ -278,10 +294,10 @@ impl VhostUserBackend for VhostUserBlkBackend {
debug!("event received: {:?}", device_event);
let mut thread = self.thread.lock().unwrap();
let mut thread = self.threads[thread_id].lock().unwrap();
match device_event {
q if device_event < self.config.num_queues => {
let mut vring = vrings[q as usize].write().unwrap();
0 => {
let mut vring = vrings[0].write().unwrap();
if self.poll_queue {
// Actively poll the queue until POLL_QUEUE_US has passed
@ -332,12 +348,22 @@ impl VhostUserBackend for VhostUserBlkBackend {
buf.to_vec()
}
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, Option<u16>)> {
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, Option<u16>)> {
// The exit event is placed after the queue, which is event index 1.
Some((
self.thread.lock().unwrap().kill_evt.try_clone().unwrap(),
None,
self.threads[thread_index]
.lock()
.unwrap()
.kill_evt
.try_clone()
.unwrap(),
Some(1),
))
}
fn queues_per_thread(&self) -> Vec<u64> {
self.queues_per_thread.clone()
}
}
struct VhostUserBlkBackendConfig<'a> {
@ -451,16 +477,9 @@ pub fn start_block_backend(backend_command: &str) {
error!("Error from the main thread: {:?}", e);
}
let kill_evt = blk_backend
.write()
.unwrap()
.thread
.lock()
.unwrap()
.kill_evt
.try_clone()
.unwrap();
if let Err(e) = kill_evt.write(1) {
error!("Error shutting down worker thread: {:?}", e)
for thread in blk_backend.read().unwrap().threads.iter() {
if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
error!("Error shutting down worker thread: {:?}", e)
}
}
}