vhost_user_backend: Move to a per-queue RwLock

Instead of locking every queues whenever something needs to be updated,
this patch modifies the code design to lock each Vring independently.
This allows for much finer granularity, and will allow multiple queues
to be handled at the same time.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
Sebastien Boeuf 2019-09-16 11:31:23 -07:00
parent 4ed81894aa
commit d4f7f73bc8

View File

@ -189,7 +189,7 @@ impl<S: VhostUserBackend> VhostUserDaemon<S> {
/// of `process_queue`. With this twisted trick, all common parts related
/// to the virtqueues can remain part of the library.
pub fn process_queue(&self, q_idx: u16) -> Result<()> {
self.vring_handler.read().unwrap().process_queue(q_idx)
self.vring_handler.write().unwrap().process_queue(q_idx)
}
}
@ -211,37 +211,6 @@ struct Vring {
enabled: bool,
}
impl Clone for Vring {
fn clone(&self) -> Self {
let kick = if let Some(c) = &self.kick {
Some(c.try_clone().unwrap())
} else {
None
};
let call = if let Some(c) = &self.call {
Some(c.try_clone().unwrap())
} else {
None
};
let err = if let Some(e) = &self.err {
Some(e.try_clone().unwrap())
} else {
None
};
Vring {
queue: self.queue.clone(),
kick,
call,
err,
started: self.started,
enabled: self.enabled,
}
}
}
impl Vring {
fn new(max_queue_size: u16) -> Self {
Vring {
@ -257,7 +226,7 @@ impl Vring {
struct VringEpollHandler<S: VhostUserBackend> {
backend: Arc<S>,
vrings: Arc<RwLock<Vec<Vring>>>,
vrings: Vec<Arc<RwLock<Vring>>>,
mem: Option<GuestMemoryMmap>,
epoll_fd: RawFd,
}
@ -267,8 +236,8 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
self.mem = mem;
}
fn process_queue(&self, q_idx: u16) -> Result<()> {
let vring = &mut self.vrings.write().unwrap()[q_idx as usize];
fn process_queue(&mut self, q_idx: u16) -> Result<()> {
let vring = &mut self.vrings[q_idx as usize].write().unwrap();
let mut used_desc_heads = vec![(0, 0); vring.queue.size as usize];
let mut used_count = 0;
if let Some(mem) = &self.mem {
@ -296,11 +265,11 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
Ok(())
}
fn handle_event(&self, device_event: u16, evset: epoll::Events) -> Result<bool> {
let num_queues = self.vrings.read().unwrap().len();
fn handle_event(&mut self, device_event: u16, evset: epoll::Events) -> Result<bool> {
let num_queues = self.vrings.len();
match device_event as usize {
x if x < num_queues => {
if let Some(kick) = &self.vrings.read().unwrap()[device_event as usize].kick {
if let Some(kick) = &self.vrings[device_event as usize].read().unwrap().kick {
kick.read().unwrap();
}
@ -313,7 +282,7 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
}
fn register_vring_listener(&self, q_idx: usize) -> result::Result<(), io::Error> {
if let Some(fd) = &self.vrings.read().unwrap()[q_idx].kick {
if let Some(fd) = &self.vrings[q_idx].read().unwrap().kick {
self.register_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, q_idx as u64)
} else {
Ok(())
@ -321,7 +290,7 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
}
fn unregister_vring_listener(&self, q_idx: usize) -> result::Result<(), io::Error> {
if let Some(fd) = &self.vrings.read().unwrap()[q_idx].kick {
if let Some(fd) = &self.vrings[q_idx].read().unwrap().kick {
self.unregister_listener(fd.as_raw_fd(), epoll::Events::EPOLLIN, q_idx as u64)
} else {
Ok(())
@ -396,7 +365,7 @@ impl<S: VhostUserBackend> VringWorker<S> {
let ev_type = event.data as u16;
if self.handler.read().unwrap().handle_event(ev_type, evset)? {
if self.handler.write().unwrap().handle_event(ev_type, evset)? {
break 'epoll;
}
}
@ -416,7 +385,7 @@ struct VhostUserHandler<S: VhostUserBackend> {
num_queues: usize,
max_queue_size: usize,
memory: Option<Memory>,
vrings: Arc<RwLock<Vec<Vring>>>,
vrings: Vec<Arc<RwLock<Vring>>>,
}
impl<S: VhostUserBackend> VhostUserHandler<S> {
@ -425,10 +394,7 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
let max_queue_size = backend.max_queue_size();
let arc_backend = Arc::new(backend);
let vrings = Arc::new(RwLock::new(vec![
Vring::new(max_queue_size as u16);
num_queues
]));
let vrings = vec![Arc::new(RwLock::new(Vring::new(max_queue_size as u16))); num_queues];
// Create the epoll file descriptor
let epoll_fd = epoll::create(true).unwrap();
@ -518,8 +484,8 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
// been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0.
let vring_enabled =
self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0;
for vring in self.vrings.write().unwrap().iter_mut() {
vring.enabled = vring_enabled;
for vring in self.vrings.iter_mut() {
vring.write().unwrap().enabled = vring_enabled;
}
Ok(())
@ -575,7 +541,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size {
return Err(VhostUserError::InvalidParam);
}
self.vrings.write().unwrap()[index as usize].queue.size = num as u16;
self.vrings[index as usize].write().unwrap().queue.size = num as u16;
Ok(())
}
@ -596,13 +562,17 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
let desc_table = self.vmm_va_to_gpa(descriptor).unwrap();
let avail_ring = self.vmm_va_to_gpa(available).unwrap();
let used_ring = self.vmm_va_to_gpa(used).unwrap();
self.vrings.write().unwrap()[index as usize]
self.vrings[index as usize]
.write()
.unwrap()
.queue
.desc_table = GuestAddress(desc_table);
self.vrings.write().unwrap()[index as usize]
self.vrings[index as usize]
.write()
.unwrap()
.queue
.avail_ring = GuestAddress(avail_ring);
self.vrings.write().unwrap()[index as usize].queue.used_ring = GuestAddress(used_ring);
self.vrings[index as usize].write().unwrap().queue.used_ring = GuestAddress(used_ring);
Ok(())
} else {
Err(VhostUserError::InvalidParam)
@ -610,10 +580,12 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
}
fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
self.vrings.write().unwrap()[index as usize]
self.vrings[index as usize]
.write()
.unwrap()
.queue
.next_avail = Wrapping(base as u16);
self.vrings.write().unwrap()[index as usize].queue.next_used = Wrapping(base as u16);
self.vrings[index as usize].write().unwrap().queue.next_used = Wrapping(base as u16);
Ok(())
}
@ -626,14 +598,16 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
// that file descriptor is readable) on the descriptor specified by
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
// VHOST_USER_GET_VRING_BASE.
self.vrings.write().unwrap()[index as usize].started = false;
self.vrings[index as usize].write().unwrap().started = false;
self.vring_handler
.read()
.unwrap()
.unregister_vring_listener(index as usize)
.unwrap();
let next_avail = self.vrings.read().unwrap()[index as usize]
let next_avail = self.vrings[index as usize]
.read()
.unwrap()
.queue
.next_avail
.0 as u16;
@ -646,11 +620,13 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
return Err(VhostUserError::InvalidParam);
}
if self.vrings.read().unwrap()[index as usize].kick.is_some() {
if self.vrings[index as usize].read().unwrap().kick.is_some() {
// Close file descriptor set by previous operations.
let _ = unsafe {
libc::close(
self.vrings.write().unwrap()[index as usize]
self.vrings[index as usize]
.write()
.unwrap()
.kick
.take()
.unwrap()
@ -658,7 +634,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
)
};
}
self.vrings.write().unwrap()[index as usize].kick =
self.vrings[index as usize].write().unwrap().kick =
Some(unsafe { EventFd::from_raw_fd(fd.unwrap()) });;
// Quotation from vhost-user spec:
@ -668,7 +644,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
// VHOST_USER_GET_VRING_BASE.
//
// So we should add fd to event monitor(select, poll, epoll) here.
self.vrings.write().unwrap()[index as usize].started = true;
self.vrings[index as usize].write().unwrap().started = true;
self.vring_handler
.read()
.unwrap()
@ -683,11 +659,13 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
return Err(VhostUserError::InvalidParam);
}
if self.vrings.write().unwrap()[index as usize].call.is_some() {
if self.vrings[index as usize].write().unwrap().call.is_some() {
// Close file descriptor set by previous operations.
let _ = unsafe {
libc::close(
self.vrings.write().unwrap()[index as usize]
self.vrings[index as usize]
.write()
.unwrap()
.call
.take()
.unwrap()
@ -695,7 +673,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
)
};
}
self.vrings.write().unwrap()[index as usize].call =
self.vrings[index as usize].write().unwrap().call =
Some(unsafe { EventFd::from_raw_fd(fd.unwrap()) });
Ok(())
@ -706,11 +684,13 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
return Err(VhostUserError::InvalidParam);
}
if self.vrings.read().unwrap()[index as usize].err.is_some() {
if self.vrings[index as usize].read().unwrap().err.is_some() {
// Close file descriptor set by previous operations.
let _ = unsafe {
libc::close(
self.vrings.write().unwrap()[index as usize]
self.vrings[index as usize]
.write()
.unwrap()
.err
.take()
.unwrap()
@ -718,7 +698,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
)
};
}
self.vrings.write().unwrap()[index as usize].err =
self.vrings[index as usize].write().unwrap().err =
Some(unsafe { EventFd::from_raw_fd(fd.unwrap()) });
Ok(())
@ -737,7 +717,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandler for VhostUserHandler<S> {
// enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1,
// or after it has been disabled by VHOST_USER_SET_VRING_ENABLE
// with parameter 0.
self.vrings.write().unwrap()[index as usize].enabled = enable;
self.vrings[index as usize].write().unwrap().enabled = enable;
Ok(())
}