vhost_user_net: Prepare for multithreaded support

In order to prepare for multithreaded multiqueue support, the
VhostUserNetThread structure is simplified to hold only one RX queue,
one TX queue, and one TAP interface.
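
For reference, the simplified per-thread structure, abridged from the
diff below, now carries exactly one of each resource:

    struct VhostUserNetThread {
        mem: Option<GuestMemoryMmap>,
        vring_worker: Option<Arc<VringWorker>>,
        kill_evt: EventFd,
        tap: (Tap, usize),      // one TAP interface and its event index
        rx: RxVirtio,           // one RX queue
        tx: TxVirtio,           // one TX queue
        rx_tap_listening: bool,
    }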

Following this change, VhostUserNetBackend now holds a list of
threads, one per queue pair, instead of a single thread multiplexing
all the queues.
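
Concretely, the backend constructor now opens one TAP interface per
queue pair and wraps each TAP into its own thread (condensed from the
diff below):

    let mut taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2)
        .map_err(Error::OpenTap)?;

    let mut threads = Vec::new();
    for tap in taps.drain(..) {
        threads.push(Mutex::new(VhostUserNetThread::new(tap, num_queues)?));
    }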

These changes neatly decouple the backend from each thread. This
allows for a lot of simplification: since all threads are now known to
be identical, the handling of events becomes very straightforward.

One important point is that each thread can be locked while in use
without causing any contention with the other threads, since the
backend itself no longer needs to be locked.
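
This is visible in handle_event(), which now takes the lock of a
single thread and never a backend-wide one. Note that at this stage
every event is still dispatched to threads[0]; routing each event to
its owning thread is left to the actual multithreaded support:

    // Lock only the one thread concerned by this event; the backend
    // itself holds no lock.
    let mut thread = self.threads[0].lock().unwrap();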

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
Author: Sebastien Boeuf <sebastien.boeuf@intel.com>
Date:   2020-04-09 16:35:56 +02:00
Parent: cd2b03f6ed
Commit: b927dceed8


@@ -96,97 +96,64 @@ struct VhostUserNetThread {
     mem: Option<GuestMemoryMmap>,
     vring_worker: Option<Arc<VringWorker>>,
     kill_evt: EventFd,
-    taps: Vec<(Tap, usize)>,
-    rxs: Vec<RxVirtio>,
-    txs: Vec<TxVirtio>,
-    rx_tap_listenings: Vec<bool>,
-    num_queues: usize,
+    tap: (Tap, usize),
+    rx: RxVirtio,
+    tx: TxVirtio,
+    rx_tap_listening: bool,
 }
 impl VhostUserNetThread {
     /// Create a new virtio network device with the given TAP interface.
-    fn new_with_tap(taps: Vec<Tap>, num_queues: usize) -> Result<Self> {
-        let mut taps_v: Vec<(Tap, usize)> = Vec::new();
-        for (i, tap) in taps.iter().enumerate() {
-            taps_v.push((tap.clone(), num_queues + i));
-        }
-        let mut rxs: Vec<RxVirtio> = Vec::new();
-        let mut txs: Vec<TxVirtio> = Vec::new();
-        let mut rx_tap_listenings: Vec<bool> = Vec::new();
-        for _ in 0..taps.len() {
-            let rx = RxVirtio::new();
-            rxs.push(rx);
-            let tx = TxVirtio::new();
-            txs.push(tx);
-            rx_tap_listenings.push(false);
-        }
+    fn new(tap: Tap, tap_evt_index: usize) -> Result<Self> {
         Ok(VhostUserNetThread {
             mem: None,
             vring_worker: None,
             kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
-            taps: taps_v,
-            rxs,
-            txs,
-            rx_tap_listenings,
-            num_queues,
+            tap: (tap, tap_evt_index),
+            rx: RxVirtio::new(),
+            tx: TxVirtio::new(),
+            rx_tap_listening: false,
         })
     }
-    /// Create a new virtio network device with the given IP address and
-    /// netmask.
-    fn new(
-        ip_addr: Ipv4Addr,
-        netmask: Ipv4Addr,
-        num_queues: usize,
-        ifname: Option<&str>,
-    ) -> Result<Self> {
-        let taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2)
-            .map_err(Error::OpenTap)?;
-        Self::new_with_tap(taps, num_queues)
-    }
     // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
     // if a buffer was used, and false if the frame must be deferred until a buffer
     // is made available by the driver.
-    fn rx_single_frame(&mut self, mut queue: &mut Queue, index: usize) -> Result<bool> {
+    fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result<bool> {
         let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
         let next_desc = queue.iter(&mem).next();
         if next_desc.is_none() {
             // Queue has no available descriptors
-            if self.rx_tap_listenings[index] {
+            if self.rx_tap_listening {
                 self.vring_worker
                     .as_ref()
                     .unwrap()
                     .unregister_listener(
-                        self.taps[index].0.as_raw_fd(),
+                        self.tap.0.as_raw_fd(),
                         epoll::Events::EPOLLIN,
-                        u64::try_from(self.taps[index].1).unwrap(),
+                        u64::try_from(self.tap.1).unwrap(),
                     )
                     .unwrap();
-                self.rx_tap_listenings[index] = false;
+                self.rx_tap_listening = false;
             }
             return Ok(false);
         }
-        let write_complete = self.rxs[index].process_desc_chain(&mem, next_desc, &mut queue);
+        let write_complete = self.rx.process_desc_chain(&mem, next_desc, &mut queue);
         Ok(write_complete)
     }
-    fn process_rx(&mut self, vring: &mut Vring, index: usize) -> Result<()> {
+    fn process_rx(&mut self, vring: &mut Vring) -> Result<()> {
         // Read as many frames as possible.
         loop {
-            match self.read_tap(index) {
+            match self.read_tap() {
                 Ok(count) => {
-                    self.rxs[index].bytes_read = count;
-                    if !self.rx_single_frame(&mut vring.mut_queue(), index)? {
-                        self.rxs[index].deferred_frame = true;
+                    self.rx.bytes_read = count;
+                    if !self.rx_single_frame(&mut vring.mut_queue())? {
+                        self.rx.deferred_frame = true;
                         break;
                     }
                 }
@@ -204,8 +171,8 @@ impl VhostUserNetThread {
                 }
             }
         }
-        if self.rxs[index].deferred_irqs {
-            self.rxs[index].deferred_irqs = false;
+        if self.rx.deferred_irqs {
+            self.rx.deferred_irqs = false;
             vring.signal_used_queue().unwrap();
             Ok(())
         } else {
@@ -213,15 +180,15 @@ impl VhostUserNetThread {
         }
     }
-    fn resume_rx(&mut self, vring: &mut Vring, index: usize) -> Result<()> {
-        if self.rxs[index].deferred_frame {
-            if self.rx_single_frame(&mut vring.mut_queue(), index)? {
-                self.rxs[index].deferred_frame = false;
+    fn resume_rx(&mut self, vring: &mut Vring) -> Result<()> {
+        if self.rx.deferred_frame {
+            if self.rx_single_frame(&mut vring.mut_queue())? {
+                self.rx.deferred_frame = false;
                 // process_rx() was interrupted possibly before consuming all
                 // packets in the tap; try continuing now.
-                self.process_rx(vring, index)
-            } else if self.rxs[index].deferred_irqs {
-                self.rxs[index].deferred_irqs = false;
+                self.process_rx(vring)
+            } else if self.rx.deferred_irqs {
+                self.rx.deferred_irqs = false;
                 vring.signal_used_queue().unwrap();
                 Ok(())
             } else {
@@ -232,16 +199,17 @@ impl VhostUserNetThread {
         }
     }
-    fn process_tx(&mut self, mut queue: &mut Queue, index: usize) -> Result<()> {
+    fn process_tx(&mut self, mut queue: &mut Queue) -> Result<()> {
         let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
-        self.txs[index].process_desc_chain(&mem, &mut self.taps[index].0, &mut queue);
+        self.tx
+            .process_desc_chain(&mem, &mut self.tap.0, &mut queue);
         Ok(())
     }
-    fn read_tap(&mut self, index: usize) -> io::Result<usize> {
-        self.taps[index].0.read(&mut self.rxs[index].frame_buf)
+    fn read_tap(&mut self) -> io::Result<usize> {
+        self.tap.0.read(&mut self.rx.frame_buf)
     }
     pub fn set_vring_worker(&mut self, vring_worker: Option<Arc<VringWorker>>) {
@@ -250,7 +218,7 @@ impl VhostUserNetThread {
 }
 pub struct VhostUserNetBackend {
-    thread: Mutex<VhostUserNetThread>,
+    threads: Vec<Mutex<VhostUserNetThread>>,
     num_queues: usize,
     queue_size: u16,
 }
@@ -263,12 +231,17 @@ impl VhostUserNetBackend {
         queue_size: u16,
         ifname: Option<&str>,
     ) -> Result<Self> {
-        let thread = Mutex::new(VhostUserNetThread::new(
-            ip_addr, netmask, num_queues, ifname,
-        )?);
+        let mut taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2)
+            .map_err(Error::OpenTap)?;
+        let mut threads = Vec::new();
+        for tap in taps.drain(..) {
+            let thread = Mutex::new(VhostUserNetThread::new(tap, num_queues)?);
+            threads.push(thread);
+        }
         Ok(VhostUserNetBackend {
-            thread,
+            threads,
             num_queues,
             queue_size,
         })
@@ -302,7 +275,9 @@ impl VhostUserBackend for VhostUserNetBackend {
     fn set_event_idx(&mut self, _enabled: bool) {}
     fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> {
-        self.thread.lock().unwrap().mem = Some(mem);
+        for thread in self.threads.iter() {
+            thread.lock().unwrap().mem = Some(mem.clone());
+        }
         Ok(())
     }
@@ -319,43 +294,41 @@ impl VhostUserBackend for VhostUserNetBackend {
         let tap_start_index = self.num_queues as u16;
         let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16;
-        let mut thread = self.thread.lock().unwrap();
+        let mut thread = self.threads[0].lock().unwrap();
         match device_event {
             x if ((x < self.num_queues as u16) && (x % 2 == 0)) => {
-                let index = (x / 2) as usize;
                 let mut vring = vrings[x as usize].write().unwrap();
-                thread.resume_rx(&mut vring, index)?;
+                thread.resume_rx(&mut vring)?;
-                if !thread.rx_tap_listenings[index] {
+                if !thread.rx_tap_listening {
                     thread.vring_worker.as_ref().unwrap().register_listener(
-                        thread.taps[index].0.as_raw_fd(),
+                        thread.tap.0.as_raw_fd(),
                         epoll::Events::EPOLLIN,
-                        u64::try_from(thread.taps[index].1).unwrap(),
+                        u64::try_from(thread.tap.1).unwrap(),
                     )?;
-                    thread.rx_tap_listenings[index] = true;
+                    thread.rx_tap_listening = true;
                 }
             }
-            x if ((x < thread.num_queues as u16) && (x % 2 != 0)) => {
-                let index = ((x - 1) / 2) as usize;
+            x if ((x < self.num_queues as u16) && (x % 2 != 0)) => {
                 let mut vring = vrings[x as usize].write().unwrap();
-                thread.process_tx(&mut vring.mut_queue(), index)?;
+                thread.process_tx(&mut vring.mut_queue())?;
             }
             x if x >= tap_start_index && x <= tap_end_index => {
                 let index = x as usize - self.num_queues;
                 let mut vring = vrings[2 * index].write().unwrap();
-                if thread.rxs[index].deferred_frame
+                if thread.rx.deferred_frame
                 // Process a deferred frame first if available. Don't read from tap again
                 // until we manage to receive this deferred frame.
                 {
-                    if thread.rx_single_frame(&mut vring.mut_queue(), index)? {
-                        thread.rxs[index].deferred_frame = false;
-                        thread.process_rx(&mut vring, index)?;
-                    } else if thread.rxs[index].deferred_irqs {
-                        thread.rxs[index].deferred_irqs = false;
+                    if thread.rx_single_frame(&mut vring.mut_queue())? {
+                        thread.rx.deferred_frame = false;
+                        thread.process_rx(&mut vring)?;
+                    } else if thread.rx.deferred_irqs {
+                        thread.rx.deferred_irqs = false;
                         vring.signal_used_queue()?;
                     }
                 } else {
-                    thread.process_rx(&mut vring, index)?;
+                    thread.process_rx(&mut vring)?;
                 }
             }
             _ => return Err(Error::HandleEventUnknownEvent.into()),
@@ -368,7 +341,12 @@ impl VhostUserBackend for VhostUserNetBackend {
         let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16;
         let kill_index = tap_end_index + 1;
         Some((
-            self.thread.lock().unwrap().kill_evt.try_clone().unwrap(),
+            self.threads[0]
+                .lock()
+                .unwrap()
+                .kill_evt
+                .try_clone()
+                .unwrap(),
             Some(kill_index),
         ))
     }
@@ -450,7 +428,7 @@ pub fn start_net_backend(backend_command: &str) {
     let backend_config = match VhostUserNetBackendConfig::parse(backend_command) {
         Ok(config) => config,
         Err(e) => {
-            println!("Failed parsing parameters {:?}", e);
+            eprintln!("Failed parsing parameters {:?}", e);
             process::exit(1);
         }
     };
@@ -473,18 +451,22 @@ pub fn start_net_backend(backend_command: &str) {
     )
     .unwrap();
-    let vring_worker = net_daemon.get_vring_workers();
+    let mut vring_workers = net_daemon.get_vring_workers();
-    net_backend
-        .write()
-        .unwrap()
-        .thread
-        .lock()
-        .unwrap()
-        .set_vring_worker(Some(vring_worker[0].clone()));
+    if vring_workers.len() != net_backend.read().unwrap().threads.len() {
+        error!("Number of vring workers must be identical to the number of backend threads");
+        process::exit(1);
+    }
+    for thread in net_backend.read().unwrap().threads.iter() {
+        thread
+            .lock()
+            .unwrap()
+            .set_vring_worker(Some(vring_workers.remove(0)));
+    }
     if let Err(e) = net_daemon.start() {
-        println!(
+        error!(
             "failed to start daemon for vhost-user-net with error: {:?}",
             e
         );
@@ -495,16 +477,9 @@ pub fn start_net_backend(backend_command: &str) {
         error!("Error from the main thread: {:?}", e);
     }
-    let kill_evt = net_backend
-        .write()
-        .unwrap()
-        .thread
-        .lock()
-        .unwrap()
-        .kill_evt
-        .try_clone()
-        .unwrap();
-    if let Err(e) = kill_evt.write(1) {
-        error!("Error shutting down worker thread: {:?}", e)
+    for thread in net_backend.read().unwrap().threads.iter() {
+        if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
+            error!("Error shutting down worker thread: {:?}", e)
+        }
     }
 }