diff --git a/vhost_user_net/src/lib.rs b/vhost_user_net/src/lib.rs index 34983540c..53d6ced70 100644 --- a/vhost_user_net/src/lib.rs +++ b/vhost_user_net/src/lib.rs @@ -96,97 +96,64 @@ struct VhostUserNetThread { mem: Option, vring_worker: Option>, kill_evt: EventFd, - taps: Vec<(Tap, usize)>, - rxs: Vec, - txs: Vec, - rx_tap_listenings: Vec, - num_queues: usize, + tap: (Tap, usize), + rx: RxVirtio, + tx: TxVirtio, + rx_tap_listening: bool, } impl VhostUserNetThread { /// Create a new virtio network device with the given TAP interface. - fn new_with_tap(taps: Vec, num_queues: usize) -> Result { - let mut taps_v: Vec<(Tap, usize)> = Vec::new(); - for (i, tap) in taps.iter().enumerate() { - taps_v.push((tap.clone(), num_queues + i)); - } - - let mut rxs: Vec = Vec::new(); - let mut txs: Vec = Vec::new(); - let mut rx_tap_listenings: Vec = Vec::new(); - - for _ in 0..taps.len() { - let rx = RxVirtio::new(); - rxs.push(rx); - let tx = TxVirtio::new(); - txs.push(tx); - rx_tap_listenings.push(false); - } - + fn new(tap: Tap, tap_evt_index: usize) -> Result { Ok(VhostUserNetThread { mem: None, vring_worker: None, kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, - taps: taps_v, - rxs, - txs, - rx_tap_listenings, - num_queues, + tap: (tap, tap_evt_index), + rx: RxVirtio::new(), + tx: TxVirtio::new(), + rx_tap_listening: false, }) } - /// Create a new virtio network device with the given IP address and - /// netmask. - fn new( - ip_addr: Ipv4Addr, - netmask: Ipv4Addr, - num_queues: usize, - ifname: Option<&str>, - ) -> Result { - let taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2) - .map_err(Error::OpenTap)?; - - Self::new_with_tap(taps, num_queues) - } - // Copies a single frame from `self.rx.frame_buf` into the guest. Returns true // if a buffer was used, and false if the frame must be deferred until a buffer // is made available by the driver. - fn rx_single_frame(&mut self, mut queue: &mut Queue, index: usize) -> Result { + fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result { let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?; let next_desc = queue.iter(&mem).next(); if next_desc.is_none() { // Queue has no available descriptors - if self.rx_tap_listenings[index] { + if self.rx_tap_listening { self.vring_worker .as_ref() .unwrap() .unregister_listener( - self.taps[index].0.as_raw_fd(), + self.tap.0.as_raw_fd(), epoll::Events::EPOLLIN, - u64::try_from(self.taps[index].1).unwrap(), + u64::try_from(self.tap.1).unwrap(), ) .unwrap(); - self.rx_tap_listenings[index] = false; + self.rx_tap_listening = false; } return Ok(false); } - let write_complete = self.rxs[index].process_desc_chain(&mem, next_desc, &mut queue); + let write_complete = self.rx.process_desc_chain(&mem, next_desc, &mut queue); Ok(write_complete) } - fn process_rx(&mut self, vring: &mut Vring, index: usize) -> Result<()> { + fn process_rx(&mut self, vring: &mut Vring) -> Result<()> { // Read as many frames as possible. loop { - match self.read_tap(index) { + match self.read_tap() { Ok(count) => { - self.rxs[index].bytes_read = count; - if !self.rx_single_frame(&mut vring.mut_queue(), index)? { - self.rxs[index].deferred_frame = true; + self.rx.bytes_read = count; + if !self.rx_single_frame(&mut vring.mut_queue())? { + self.rx.deferred_frame = true; break; } } @@ -204,8 +171,8 @@ impl VhostUserNetThread { } } } - if self.rxs[index].deferred_irqs { - self.rxs[index].deferred_irqs = false; + if self.rx.deferred_irqs { + self.rx.deferred_irqs = false; vring.signal_used_queue().unwrap(); Ok(()) } else { @@ -213,15 +180,15 @@ impl VhostUserNetThread { } } - fn resume_rx(&mut self, vring: &mut Vring, index: usize) -> Result<()> { - if self.rxs[index].deferred_frame { - if self.rx_single_frame(&mut vring.mut_queue(), index)? { - self.rxs[index].deferred_frame = false; + fn resume_rx(&mut self, vring: &mut Vring) -> Result<()> { + if self.rx.deferred_frame { + if self.rx_single_frame(&mut vring.mut_queue())? { + self.rx.deferred_frame = false; // process_rx() was interrupted possibly before consuming all // packets in the tap; try continuing now. - self.process_rx(vring, index) - } else if self.rxs[index].deferred_irqs { - self.rxs[index].deferred_irqs = false; + self.process_rx(vring) + } else if self.rx.deferred_irqs { + self.rx.deferred_irqs = false; vring.signal_used_queue().unwrap(); Ok(()) } else { @@ -232,16 +199,17 @@ impl VhostUserNetThread { } } - fn process_tx(&mut self, mut queue: &mut Queue, index: usize) -> Result<()> { + fn process_tx(&mut self, mut queue: &mut Queue) -> Result<()> { let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?; - self.txs[index].process_desc_chain(&mem, &mut self.taps[index].0, &mut queue); + self.tx + .process_desc_chain(&mem, &mut self.tap.0, &mut queue); Ok(()) } - fn read_tap(&mut self, index: usize) -> io::Result { - self.taps[index].0.read(&mut self.rxs[index].frame_buf) + fn read_tap(&mut self) -> io::Result { + self.tap.0.read(&mut self.rx.frame_buf) } pub fn set_vring_worker(&mut self, vring_worker: Option>) { @@ -250,7 +218,7 @@ impl VhostUserNetThread { } pub struct VhostUserNetBackend { - thread: Mutex, + threads: Vec>, num_queues: usize, queue_size: u16, } @@ -263,12 +231,17 @@ impl VhostUserNetBackend { queue_size: u16, ifname: Option<&str>, ) -> Result { - let thread = Mutex::new(VhostUserNetThread::new( - ip_addr, netmask, num_queues, ifname, - )?); + let mut taps = open_tap(ifname, Some(ip_addr), Some(netmask), num_queues / 2) + .map_err(Error::OpenTap)?; + + let mut threads = Vec::new(); + for tap in taps.drain(..) { + let thread = Mutex::new(VhostUserNetThread::new(tap, num_queues)?); + threads.push(thread); + } Ok(VhostUserNetBackend { - thread, + threads, num_queues, queue_size, }) @@ -302,7 +275,9 @@ impl VhostUserBackend for VhostUserNetBackend { fn set_event_idx(&mut self, _enabled: bool) {} fn update_memory(&mut self, mem: GuestMemoryMmap) -> VhostUserBackendResult<()> { - self.thread.lock().unwrap().mem = Some(mem); + for thread in self.threads.iter() { + thread.lock().unwrap().mem = Some(mem.clone()); + } Ok(()) } @@ -319,43 +294,41 @@ impl VhostUserBackend for VhostUserNetBackend { let tap_start_index = self.num_queues as u16; let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16; - let mut thread = self.thread.lock().unwrap(); + let mut thread = self.threads[0].lock().unwrap(); match device_event { x if ((x < self.num_queues as u16) && (x % 2 == 0)) => { - let index = (x / 2) as usize; let mut vring = vrings[x as usize].write().unwrap(); - thread.resume_rx(&mut vring, index)?; + thread.resume_rx(&mut vring)?; - if !thread.rx_tap_listenings[index] { + if !thread.rx_tap_listening { thread.vring_worker.as_ref().unwrap().register_listener( - thread.taps[index].0.as_raw_fd(), + thread.tap.0.as_raw_fd(), epoll::Events::EPOLLIN, - u64::try_from(thread.taps[index].1).unwrap(), + u64::try_from(thread.tap.1).unwrap(), )?; - thread.rx_tap_listenings[index] = true; + thread.rx_tap_listening = true; } } - x if ((x < thread.num_queues as u16) && (x % 2 != 0)) => { - let index = ((x - 1) / 2) as usize; + x if ((x < self.num_queues as u16) && (x % 2 != 0)) => { let mut vring = vrings[x as usize].write().unwrap(); - thread.process_tx(&mut vring.mut_queue(), index)?; + thread.process_tx(&mut vring.mut_queue())?; } x if x >= tap_start_index && x <= tap_end_index => { let index = x as usize - self.num_queues; let mut vring = vrings[2 * index].write().unwrap(); - if thread.rxs[index].deferred_frame + if thread.rx.deferred_frame // Process a deferred frame first if available. Don't read from tap again // until we manage to receive this deferred frame. { - if thread.rx_single_frame(&mut vring.mut_queue(), index)? { - thread.rxs[index].deferred_frame = false; - thread.process_rx(&mut vring, index)?; - } else if thread.rxs[index].deferred_irqs { - thread.rxs[index].deferred_irqs = false; + if thread.rx_single_frame(&mut vring.mut_queue())? { + thread.rx.deferred_frame = false; + thread.process_rx(&mut vring)?; + } else if thread.rx.deferred_irqs { + thread.rx.deferred_irqs = false; vring.signal_used_queue()?; } } else { - thread.process_rx(&mut vring, index)?; + thread.process_rx(&mut vring)?; } } _ => return Err(Error::HandleEventUnknownEvent.into()), @@ -368,7 +341,12 @@ impl VhostUserBackend for VhostUserNetBackend { let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16; let kill_index = tap_end_index + 1; Some(( - self.thread.lock().unwrap().kill_evt.try_clone().unwrap(), + self.threads[0] + .lock() + .unwrap() + .kill_evt + .try_clone() + .unwrap(), Some(kill_index), )) } @@ -450,7 +428,7 @@ pub fn start_net_backend(backend_command: &str) { let backend_config = match VhostUserNetBackendConfig::parse(backend_command) { Ok(config) => config, Err(e) => { - println!("Failed parsing parameters {:?}", e); + eprintln!("Failed parsing parameters {:?}", e); process::exit(1); } }; @@ -473,18 +451,22 @@ pub fn start_net_backend(backend_command: &str) { ) .unwrap(); - let vring_worker = net_daemon.get_vring_workers(); + let mut vring_workers = net_daemon.get_vring_workers(); - net_backend - .write() - .unwrap() - .thread - .lock() - .unwrap() - .set_vring_worker(Some(vring_worker[0].clone())); + if vring_workers.len() != net_backend.read().unwrap().threads.len() { + error!("Number of vring workers must be identical to the number of backend threads"); + process::exit(1); + } + + for thread in net_backend.read().unwrap().threads.iter() { + thread + .lock() + .unwrap() + .set_vring_worker(Some(vring_workers.remove(0))); + } if let Err(e) = net_daemon.start() { - println!( + error!( "failed to start daemon for vhost-user-net with error: {:?}", e ); @@ -495,16 +477,9 @@ pub fn start_net_backend(backend_command: &str) { error!("Error from the main thread: {:?}", e); } - let kill_evt = net_backend - .write() - .unwrap() - .thread - .lock() - .unwrap() - .kill_evt - .try_clone() - .unwrap(); - if let Err(e) = kill_evt.write(1) { - error!("Error shutting down worker thread: {:?}", e) + for thread in net_backend.read().unwrap().threads.iter() { + if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { + error!("Error shutting down worker thread: {:?}", e) + } } }