vm-virtio: Implement multiple queue support for net devices

Update the common code in net_util.rs under vm-virtio to add multiple
queue (mq) support, and enable mq for the virtio-net device, the
vhost-user-net device and the vhost-user-net backend. Multiple threads
are created, each responsible for handling one queue pair. To get the
best performance, the number of vCPUs should match the number of queue
pairs defined for the net device, because of CPU affinity.

The vhost-user-net backend itself does not get multiple thread support
yet; it will be added in the future.
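
As an illustration only (simplified, hypothetical names, not the actual
device code), the per-queue-pair threading model looks roughly like this:

    use std::thread;

    fn main() {
        // A device configured with num_queues=4 has 2 RX/TX queue pairs.
        let num_queues = 4;
        let mut workers = Vec::new();
        for pair in 0..num_queues / 2 {
            workers.push(thread::spawn(move || {
                // In the real device, each worker runs an epoll loop over one
                // RX queue, one TX queue and one tap fd for its queue pair.
                println!("queue pair {} served by its own thread", pair);
            }));
        }
        for w in workers {
            w.join().unwrap();
        }
    }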

Signed-off-by: Cathy Zhang <cathy.zhang@intel.com>
Author: Cathy Zhang <cathy.zhang@intel.com>
Date:   2020-01-10 01:29:00 +08:00
Committed by: Sebastien Boeuf
Commit: 652e7b9b8a (parent: 404316eea1)
7 changed files with 353 additions and 169 deletions


@ -19,6 +19,7 @@ use epoll;
use libc::{self, EAGAIN, EFD_NONBLOCK};
use log::*;
use net_util::Tap;
use std::convert::TryFrom;
use std::fmt;
use std::io::Read;
use std::io::{self};
@ -32,15 +33,10 @@ use vhost_rs::vhost_user::Error as VhostUserError;
use vhost_user_backend::{VhostUserBackend, VhostUserDaemon, Vring, VringWorker};
use virtio_bindings::bindings::virtio_net::*;
use vm_memory::GuestMemoryMmap;
use vm_virtio::net_util::{
open_tap, RxVirtio, TxVirtio, KILL_EVENT, RX_QUEUE_EVENT, RX_TAP_EVENT, TX_QUEUE_EVENT,
};
use vm_virtio::net_util::{open_tap, RxVirtio, TxVirtio};
use vm_virtio::Queue;
use vmm_sys_util::eventfd::EventFd;
const QUEUE_SIZE: usize = 1024;
const NUM_QUEUES: usize = 2;
pub type VhostUserResult<T> = std::result::Result<T, VhostUserError>;
pub type Result<T> = std::result::Result<T, Error>;
pub type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;
@ -77,6 +73,10 @@ pub enum Error {
ParseIpParam,
/// Failed to parse mask parameter.
ParseMaskParam,
/// Failed to parse queue number.
ParseQueueNumParam,
/// Failed to parse queue size.
ParseQueueSizeParam,
/// Open tap device failed.
OpenTap(vm_virtio::net_util::Error),
}
@ -99,10 +99,12 @@ struct VhostUserNetBackend {
mem: Option<GuestMemoryMmap>,
vring_worker: Option<Arc<VringWorker>>,
kill_evt: EventFd,
tap: Tap,
rx: RxVirtio,
tx: TxVirtio,
rx_tap_listening: bool,
taps: Vec<(Tap, usize)>,
rxs: Vec<RxVirtio>,
txs: Vec<TxVirtio>,
rx_tap_listenings: Vec<bool>,
num_queues: usize,
queue_size: u16,
}
impl std::clone::Clone for VhostUserNetBackend {
@ -111,76 +113,101 @@ impl std::clone::Clone for VhostUserNetBackend {
mem: self.mem.clone(),
vring_worker: self.vring_worker.clone(),
kill_evt: self.kill_evt.try_clone().unwrap(),
tap: self.tap.clone(),
rx: self.rx.clone(),
tx: self.tx.clone(),
rx_tap_listening: self.rx_tap_listening,
taps: self.taps.clone(),
rxs: self.rxs.clone(),
txs: self.txs.clone(),
rx_tap_listenings: self.rx_tap_listenings.clone(),
num_queues: self.num_queues,
queue_size: self.queue_size,
}
}
}
impl VhostUserNetBackend {
/// Create a new virtio network device with the given TAP interface.
pub fn new_with_tap(tap: Tap) -> Result<Self> {
let rx = RxVirtio::new();
let tx = TxVirtio::new();
pub fn new_with_tap(taps: Vec<Tap>, num_queues: usize, queue_size: u16) -> Result<Self> {
let mut taps_v: Vec<(Tap, usize)> = Vec::new();
for (i, tap) in taps.iter().enumerate() {
taps_v.push((tap.clone(), num_queues + i));
}
let mut rxs: Vec<RxVirtio> = Vec::new();
let mut txs: Vec<TxVirtio> = Vec::new();
let mut rx_tap_listenings: Vec<bool> = Vec::new();
for _ in 0..taps.len() {
let rx = RxVirtio::new();
rxs.push(rx);
let tx = TxVirtio::new();
txs.push(tx);
rx_tap_listenings.push(false);
}
Ok(VhostUserNetBackend {
mem: None,
vring_worker: None,
kill_evt: EventFd::new(EFD_NONBLOCK).map_err(|_| Error::CreateKillEventFd)?,
tap,
rx,
tx,
rx_tap_listening: false,
taps: taps_v,
rxs,
txs,
rx_tap_listenings,
num_queues,
queue_size,
})
}
/// Create a new virtio network device with the given IP address and
/// netmask.
pub fn new(ip_addr: Ipv4Addr, netmask: Ipv4Addr) -> Result<Self> {
let tap = open_tap(ip_addr, netmask).map_err(Error::OpenTap)?;
pub fn new(
ip_addr: Ipv4Addr,
netmask: Ipv4Addr,
num_queues: usize,
queue_size: u16,
) -> Result<Self> {
let taps =
open_tap(None, Some(ip_addr), Some(netmask), num_queues / 2).map_err(Error::OpenTap)?;
Self::new_with_tap(tap)
Self::new_with_tap(taps, num_queues, queue_size)
}
// Copies a single frame from `self.rx.frame_buf` into the guest. Returns true
// if a buffer was used, and false if the frame must be deferred until a buffer
// is made available by the driver.
fn rx_single_frame(&mut self, mut queue: &mut Queue) -> Result<bool> {
fn rx_single_frame(&mut self, mut queue: &mut Queue, index: usize) -> Result<bool> {
let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
let next_desc = queue.iter(&mem).next();
if next_desc.is_none() {
// Queue has no available descriptors
if self.rx_tap_listening {
if self.rx_tap_listenings[index] {
self.vring_worker
.as_ref()
.unwrap()
.unregister_listener(
self.tap.as_raw_fd(),
self.taps[index].0.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(RX_TAP_EVENT),
u64::try_from(self.taps[index].1).unwrap(),
)
.unwrap();
self.rx_tap_listening = false;
self.rx_tap_listenings[index] = false;
}
return Ok(false);
}
let write_complete = self.rx.process_desc_chain(&mem, next_desc, &mut queue);
let write_complete = self.rxs[index].process_desc_chain(&mem, next_desc, &mut queue);
Ok(write_complete)
}
fn process_rx(&mut self, vring: &mut Vring) -> Result<()> {
fn process_rx(&mut self, vring: &mut Vring, index: usize) -> Result<()> {
// Read as many frames as possible.
loop {
match self.read_tap() {
match self.read_tap(index) {
Ok(count) => {
self.rx.bytes_read = count;
if !self.rx_single_frame(&mut vring.mut_queue())? {
self.rx.deferred_frame = true;
self.rxs[index].bytes_read = count;
if !self.rx_single_frame(&mut vring.mut_queue(), index)? {
self.rxs[index].deferred_frame = true;
break;
}
}
@ -198,8 +225,8 @@ impl VhostUserNetBackend {
}
}
}
if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
if self.rxs[index].deferred_irqs {
self.rxs[index].deferred_irqs = false;
vring.signal_used_queue().unwrap();
Ok(())
} else {
@ -207,15 +234,15 @@ impl VhostUserNetBackend {
}
}
fn resume_rx(&mut self, vring: &mut Vring) -> Result<()> {
if self.rx.deferred_frame {
if self.rx_single_frame(&mut vring.mut_queue())? {
self.rx.deferred_frame = false;
fn resume_rx(&mut self, vring: &mut Vring, index: usize) -> Result<()> {
if self.rxs[index].deferred_frame {
if self.rx_single_frame(&mut vring.mut_queue(), index)? {
self.rxs[index].deferred_frame = false;
// process_rx() was interrupted possibly before consuming all
// packets in the tap; try continuing now.
self.process_rx(vring)
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
self.process_rx(vring, index)
} else if self.rxs[index].deferred_irqs {
self.rxs[index].deferred_irqs = false;
vring.signal_used_queue().unwrap();
Ok(())
} else {
@ -226,26 +253,26 @@ impl VhostUserNetBackend {
}
}
fn process_tx(&mut self, mut queue: &mut Queue) -> Result<()> {
fn process_tx(&mut self, mut queue: &mut Queue, index: usize) -> Result<()> {
let mem = self.mem.as_ref().ok_or(Error::NoMemoryConfigured)?;
self.tx.process_desc_chain(&mem, &mut self.tap, &mut queue);
self.txs[index].process_desc_chain(&mem, &mut self.taps[index].0, &mut queue);
Ok(())
}
fn read_tap(&mut self) -> io::Result<usize> {
self.tap.read(&mut self.rx.frame_buf)
fn read_tap(&mut self, index: usize) -> io::Result<usize> {
self.taps[index].0.read(&mut self.rxs[index].frame_buf)
}
}
impl VhostUserBackend for VhostUserNetBackend {
fn num_queues(&self) -> usize {
NUM_QUEUES
self.num_queues
}
fn max_queue_size(&self) -> usize {
QUEUE_SIZE
self.queue_size as usize
}
fn features(&self) -> u64 {
@ -278,42 +305,49 @@ impl VhostUserBackend for VhostUserNetBackend {
return Err(Error::HandleEventNotEpollIn.into());
}
match device_event {
RX_QUEUE_EVENT => {
let mut vring = vrings[0].write().unwrap();
self.resume_rx(&mut vring)?;
let tap_start_index = self.num_queues as u16;
let tap_end_index = (self.num_queues + self.num_queues / 2 - 1) as u16;
let kill_index = tap_end_index + 1;
if !self.rx_tap_listening {
match device_event {
x if ((x < self.num_queues as u16) && (x % 2 == 0)) => {
let index = (x / 2) as usize;
let mut vring = vrings[x as usize].write().unwrap();
self.resume_rx(&mut vring, index)?;
if !self.rx_tap_listenings[index] {
self.vring_worker.as_ref().unwrap().register_listener(
self.tap.as_raw_fd(),
self.taps[index].0.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(RX_TAP_EVENT),
u64::try_from(self.taps[index].1).unwrap(),
)?;
self.rx_tap_listening = true;
self.rx_tap_listenings[index] = true;
}
}
TX_QUEUE_EVENT => {
let mut vring = vrings[1].write().unwrap();
self.process_tx(&mut vring.mut_queue())?;
x if ((x < self.num_queues as u16) && (x % 2 != 0)) => {
let index = ((x - 1) / 2) as usize;
let mut vring = vrings[x as usize].write().unwrap();
self.process_tx(&mut vring.mut_queue(), index)?;
}
RX_TAP_EVENT => {
let mut vring = vrings[0].write().unwrap();
if self.rx.deferred_frame
x if x >= tap_start_index && x <= tap_end_index => {
let index = x as usize - self.num_queues;
let mut vring = vrings[2 * index].write().unwrap();
if self.rxs[index].deferred_frame
// Process a deferred frame first if available. Don't read from tap again
// until we manage to receive this deferred frame.
{
if self.rx_single_frame(&mut vring.mut_queue())? {
self.rx.deferred_frame = false;
self.process_rx(&mut vring)?;
} else if self.rx.deferred_irqs {
self.rx.deferred_irqs = false;
if self.rx_single_frame(&mut vring.mut_queue(), index)? {
self.rxs[index].deferred_frame = false;
self.process_rx(&mut vring, index)?;
} else if self.rxs[index].deferred_irqs {
self.rxs[index].deferred_irqs = false;
vring.signal_used_queue()?;
}
} else {
self.process_rx(&mut vring)?;
self.process_rx(&mut vring, index)?;
}
}
KILL_EVENT => {
x if x == kill_index => {
self.kill_evt.read().unwrap();
return Ok(true);
}
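
Side note (not part of the patch): a self-contained sketch of the event
index layout the dispatch above relies on: even indexes below num_queues
are RX queues, odd ones are TX queues, the tap fds come next, and the
kill event is last.

    // Illustrative helper mirroring the match arms above.
    fn classify_event(event: u16, num_queues: u16) -> &'static str {
        let tap_start = num_queues;
        let tap_end = num_queues + num_queues / 2 - 1;
        match event {
            x if x < num_queues && x % 2 == 0 => "RX queue",
            x if x < num_queues && x % 2 != 0 => "TX queue",
            x if x >= tap_start && x <= tap_end => "tap readable",
            x if x == tap_end + 1 => "kill",
            _ => "unexpected",
        }
    }

    fn main() {
        // With num_queues = 4: 0 and 2 are RX, 1 and 3 are TX,
        // 4 and 5 are taps, 6 is the kill event.
        for ev in 0u16..=6 {
            println!("event {} -> {}", ev, classify_event(ev, 4));
        }
    }
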
@ -328,6 +362,8 @@ pub struct VhostUserNetBackendConfig<'a> {
pub ip: Ipv4Addr,
pub mask: Ipv4Addr,
pub sock: &'a str,
pub num_queues: usize,
pub queue_size: u16,
}
impl<'a> VhostUserNetBackendConfig<'a> {
@ -337,6 +373,8 @@ impl<'a> VhostUserNetBackendConfig<'a> {
let mut ip_str: &str = "";
let mut mask_str: &str = "";
let mut sock: &str = "";
let mut num_queues_str: &str = "";
let mut queue_size_str: &str = "";
for param in params_list.iter() {
if param.starts_with("ip=") {
@ -345,11 +383,17 @@ impl<'a> VhostUserNetBackendConfig<'a> {
mask_str = &param[5..];
} else if param.starts_with("sock=") {
sock = &param[5..];
} else if param.starts_with("num_queues=") {
num_queues_str = &param[11..];
} else if param.starts_with("queue_size=") {
queue_size_str = &param[11..];
}
}
let mut ip: Ipv4Addr = Ipv4Addr::new(192, 168, 100, 1);
let mut mask: Ipv4Addr = Ipv4Addr::new(255, 255, 255, 0);
let mut num_queues: usize = 2;
let mut queue_size: u16 = 256;
if sock.is_empty() {
return Err(Error::ParseSockParam);
@ -360,8 +404,24 @@ impl<'a> VhostUserNetBackendConfig<'a> {
if !mask_str.is_empty() {
mask = mask_str.parse().map_err(|_| Error::ParseMaskParam)?;
}
if !num_queues_str.is_empty() {
num_queues = num_queues_str
.parse()
.map_err(|_| Error::ParseQueueNumParam)?;
}
if !queue_size_str.is_empty() {
queue_size = queue_size_str
.parse()
.map_err(|_| Error::ParseQueueSizeParam)?;
}
Ok(VhostUserNetBackendConfig { ip, mask, sock })
Ok(VhostUserNetBackendConfig {
ip,
mask,
sock,
num_queues,
queue_size,
})
}
}
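
For reference, a hedged usage sketch of the extended backend parameter
string (values and socket path are purely illustrative, and it relies on
the parse() method defined above rather than being standalone):

    // The backend string now also accepts num_queues and queue_size.
    let params = "ip=192.168.100.1,mask=255.255.255.0,sock=/tmp/vunet.sock,num_queues=4,queue_size=1024";
    let cfg = VhostUserNetBackendConfig::parse(params).unwrap();
    assert_eq!(cfg.num_queues, 4);
    assert_eq!(cfg.queue_size, 1024);
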
@ -375,7 +435,9 @@ fn main() {
.long("backend")
.help(
"Backend parameters \"ip=<ip_addr>,\
mask=<net_mask>,sock=<socket_path>\"",
mask=<net_mask>,sock=<socket_path>,\
num_queues=<number_of_queues>,\
queue_size=<size_of_each_queue>\"",
)
.takes_value(true)
.min_values(1),
@ -393,7 +455,13 @@ fn main() {
};
let net_backend = Arc::new(RwLock::new(
VhostUserNetBackend::new(backend_config.ip, backend_config.mask).unwrap(),
VhostUserNetBackend::new(
backend_config.ip,
backend_config.mask,
backend_config.num_queues,
backend_config.queue_size,
)
.unwrap(),
));
let name = "vhost-user-net-backend";
let mut net_daemon = VhostUserDaemon::new(
@ -404,10 +472,12 @@ fn main() {
.unwrap();
let vring_worker = net_daemon.get_vring_worker();
let kill_index = (net_backend.read().unwrap().num_queues
+ net_backend.read().unwrap().num_queues / 2) as u16;
if let Err(e) = vring_worker.register_listener(
net_backend.read().unwrap().kill_evt.as_raw_fd(),
epoll::Events::EPOLLIN,
u64::from(KILL_EVENT),
u64::from(kill_index),
) {
println!("failed to register listener for kill event: {:?}", e);
process::exit(1);


@ -134,7 +134,7 @@ macro_rules! virtio_pausable_inner {
Ok(())
}
};
($ctrl_q:expr) => {
($ctrl_q:expr, $mq:expr) => {
fn pause(&mut self) -> result::Result<(), MigratableError> {
debug!(
"Pausing virtio-{}",
@ -158,7 +158,9 @@ macro_rules! virtio_pausable_inner {
self.paused.store(false, Ordering::SeqCst);
if let Some(epoll_thread) = &self.epoll_thread {
epoll_thread.thread().unpark();
for i in 0..epoll_thread.len() {
epoll_thread[i].thread().unpark();
}
}
if let Some(ctrl_queue_epoll_thread) = &self.ctrl_queue_epoll_thread {
@ -177,9 +179,9 @@ macro_rules! virtio_pausable {
virtio_pausable_inner!();
}
};
($name:ident, $ctrl_q:expr) => {
($name:ident, $ctrl_q:expr, $mq:expr) => {
impl Pausable for $name {
virtio_pausable_inner!($ctrl_q);
virtio_pausable_inner!($ctrl_q, $mq);
}
};
}


@ -62,7 +62,7 @@ const VIRTIO_F_IOMMU_PLATFORM: u32 = 33;
const VIRTIO_F_IN_ORDER: u32 = 35;
// Types taken from linux/virtio_ids.h
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone)]
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[repr(C)]


@ -6,9 +6,9 @@
// found in the THIRD-PARTY file.
use super::net_util::{
build_net_config_space, open_tap, register_listener, unregister_listener, CtrlVirtio,
NetCtrlEpollHandler, RxVirtio, TxVirtio, KILL_EVENT, NET_EVENTS_COUNT, PAUSE_EVENT,
RX_QUEUE_EVENT, RX_TAP_EVENT, TX_QUEUE_EVENT,
build_net_config_space, build_net_config_space_with_mq, open_tap, register_listener,
unregister_listener, CtrlVirtio, NetCtrlEpollHandler, RxVirtio, TxVirtio, KILL_EVENT,
NET_EVENTS_COUNT, PAUSE_EVENT, RX_QUEUE_EVENT, RX_TAP_EVENT, TX_QUEUE_EVENT,
};
use super::Error as DeviceError;
use super::{
@ -35,10 +35,6 @@ use vm_device::{Migratable, MigratableError, Pausable, Snapshotable};
use vm_memory::GuestMemoryMmap;
use vmm_sys_util::eventfd::EventFd;
const QUEUE_SIZE: u16 = 256;
const NUM_QUEUES: usize = 3;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES];
#[derive(Debug)]
pub enum Error {
/// Failed to open taps.
@ -296,7 +292,7 @@ impl NetEpollHandler {
pub struct Net {
kill_evt: Option<EventFd>,
pause_evt: Option<EventFd>,
tap: Option<Tap>,
taps: Option<Vec<Tap>>,
avail_features: u64,
acked_features: u64,
// The config space will only consist of the MAC address specified by the user,
@ -304,14 +300,21 @@ pub struct Net {
config_space: Vec<u8>,
queue_evts: Option<Vec<EventFd>>,
interrupt_cb: Option<Arc<VirtioInterrupt>>,
epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
epoll_thread: Option<Vec<thread::JoinHandle<result::Result<(), DeviceError>>>>,
ctrl_queue_epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
paused: Arc<AtomicBool>,
queue_size: Vec<u16>,
}
impl Net {
/// Create a new virtio network device with the given TAP interface.
pub fn new_with_tap(tap: Tap, guest_mac: Option<MacAddr>, iommu: bool) -> Result<Self> {
pub fn new_with_tap(
taps: Vec<Tap>,
guest_mac: Option<MacAddr>,
iommu: bool,
num_queues: usize,
queue_size: u16,
) -> Result<Self> {
let mut avail_features = 1 << VIRTIO_NET_F_GUEST_CSUM
| 1 << VIRTIO_NET_F_CSUM
| 1 << VIRTIO_NET_F_GUEST_TSO4
@ -325,18 +328,20 @@ impl Net {
}
avail_features |= 1 << VIRTIO_NET_F_CTRL_VQ;
let queue_num = num_queues + 1;
let config_space;
let mut config_space;
if let Some(mac) = guest_mac {
config_space = build_net_config_space(mac, &mut avail_features);
config_space = build_net_config_space(mac, num_queues, &mut avail_features);
} else {
config_space = Vec::new();
build_net_config_space_with_mq(num_queues, &mut config_space, &mut avail_features);
}
Ok(Net {
kill_evt: None,
pause_evt: None,
tap: Some(tap),
taps: Some(taps),
avail_features,
acked_features: 0u64,
config_space,
@ -345,20 +350,24 @@ impl Net {
epoll_thread: None,
ctrl_queue_epoll_thread: None,
paused: Arc::new(AtomicBool::new(false)),
queue_size: vec![queue_size; queue_num],
})
}
/// Create a new virtio network device with the given IP address and
/// netmask.
pub fn new(
ip_addr: Ipv4Addr,
netmask: Ipv4Addr,
if_name: Option<&str>,
ip_addr: Option<Ipv4Addr>,
netmask: Option<Ipv4Addr>,
guest_mac: Option<MacAddr>,
iommu: bool,
num_queues: usize,
queue_size: u16,
) -> Result<Self> {
let tap = open_tap(ip_addr, netmask).map_err(Error::OpenTap)?;
let taps = open_tap(if_name, ip_addr, netmask, num_queues / 2).map_err(Error::OpenTap)?;
Self::new_with_tap(tap, guest_mac, iommu)
Self::new_with_tap(taps, guest_mac, iommu, num_queues, queue_size)
}
}
@ -377,7 +386,7 @@ impl VirtioDevice for Net {
}
fn queue_max_sizes(&self) -> &[u16] {
QUEUE_SIZES
&self.queue_size.as_slice()
}
fn features(&self, page: u32) -> u32 {
@ -444,10 +453,10 @@ impl VirtioDevice for Net {
mut queues: Vec<Queue>,
mut queue_evts: Vec<EventFd>,
) -> ActivateResult {
if queues.len() != NUM_QUEUES || queue_evts.len() != NUM_QUEUES {
if queues.len() != self.queue_size.len() || queue_evts.len() != self.queue_size.len() {
error!(
"Cannot perform activate. Expected {} queue(s), got {}",
NUM_QUEUES,
self.queue_size.len(),
queues.len()
);
return Err(ActivateError::BadActivate);
@ -469,7 +478,7 @@ impl VirtioDevice for Net {
})?;
self.pause_evt = Some(self_pause_evt);
if let Some(tap) = self.tap.clone() {
if let Some(mut taps) = self.taps.clone() {
// Save the interrupt EventFD as we need to return it on reset
// but clone it to pass into the thread.
self.interrupt_cb = Some(interrupt_cb.clone());
@ -509,33 +518,44 @@ impl VirtioDevice for Net {
})?;
}
let mut queues_v = Vec::new();
let mut queue_evts_v = Vec::new();
queues_v.push(queues.remove(0));
queues_v.push(queues.remove(0));
queue_evts_v.push(queue_evts.remove(0));
queue_evts_v.push(queue_evts.remove(0));
let mut handler = NetEpollHandler {
mem: mem.clone(),
tap,
rx: RxVirtio::new(),
tx: TxVirtio::new(),
interrupt_cb,
kill_evt,
pause_evt,
epoll_fd: 0,
rx_tap_listening: false,
};
let mut epoll_thread = Vec::new();
for _ in 0..taps.len() {
let rx = RxVirtio::new();
let tx = TxVirtio::new();
let rx_tap_listening = false;
let paused = self.paused.clone();
thread::Builder::new()
.name("virtio_net".to_string())
.spawn(move || handler.run(paused, queues_v, queue_evts_v))
.map(|thread| self.epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone the virtio-net epoll thread: {}", e);
ActivateError::BadActivate
})?;
let mut queue_pair = Vec::new();
queue_pair.push(queues.remove(0));
queue_pair.push(queues.remove(0));
let mut queue_evt_pair = Vec::new();
queue_evt_pair.push(queue_evts.remove(0));
queue_evt_pair.push(queue_evts.remove(0));
let mut handler = NetEpollHandler {
mem: mem.clone(),
tap: taps.remove(0),
rx,
tx,
interrupt_cb: interrupt_cb.clone(),
kill_evt: kill_evt.try_clone().unwrap(),
pause_evt: pause_evt.try_clone().unwrap(),
epoll_fd: 0,
rx_tap_listening,
};
let paused = self.paused.clone();
thread::Builder::new()
.name("virtio_net".to_string())
.spawn(move || handler.run(paused, queue_pair, queue_evt_pair))
.map(|thread| epoll_thread.push(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
}
self.epoll_thread = Some(epoll_thread);
return Ok(());
}
@ -561,6 +581,6 @@ impl VirtioDevice for Net {
}
}
virtio_pausable!(Net, true);
virtio_pausable!(Net, true, true);
impl Snapshotable for Net {}
impl Migratable for Net {}


@ -25,6 +25,11 @@ type Result<T> = std::result::Result<T, Error>;
const MAX_BUFFER_SIZE: usize = 65562;
const QUEUE_SIZE: usize = 256;
const CONFIG_SPACE_MAC: usize = MAC_ADDR_LEN;
const CONFIG_SPACE_STATUS: usize = 2;
const CONFIG_SPACE_QUEUE_PAIRS: usize = 2;
const CONFIG_SPACE_NET: usize = CONFIG_SPACE_MAC + CONFIG_SPACE_STATUS + CONFIG_SPACE_QUEUE_PAIRS;
// The guest has made a buffer available to receive a frame into.
pub const RX_QUEUE_EVENT: DeviceEventT = 0;
// The transmit queue has a frame that is ready to send from the guest.
@ -408,32 +413,83 @@ impl RxVirtio {
}
}
pub fn build_net_config_space(mac: MacAddr, avail_features: &mut u64) -> Vec<u8> {
pub fn build_net_config_space(
mac: MacAddr,
num_queues: usize,
mut avail_features: &mut u64,
) -> Vec<u8> {
let mut config_space = Vec::with_capacity(MAC_ADDR_LEN);
unsafe { config_space.set_len(MAC_ADDR_LEN) }
config_space[..].copy_from_slice(mac.get_bytes());
*avail_features |= 1 << VIRTIO_NET_F_MAC;
build_net_config_space_with_mq(num_queues, &mut config_space, &mut avail_features);
config_space
}
pub fn build_net_config_space_with_mq(
num_queues: usize,
config_space: &mut Vec<u8>,
avail_features: &mut u64,
) {
let num_queue_pairs = (num_queues / 2) as u16;
if (num_queue_pairs >= VIRTIO_NET_CTRL_MQ_VQ_PAIRS_MIN as u16)
&& (num_queue_pairs <= VIRTIO_NET_CTRL_MQ_VQ_PAIRS_MAX as u16)
{
config_space.resize(CONFIG_SPACE_NET, 0);
let max_queue_pairs = num_queue_pairs.to_le_bytes();
config_space[CONFIG_SPACE_MAC + CONFIG_SPACE_STATUS..CONFIG_SPACE_NET]
.copy_from_slice(&max_queue_pairs);
*avail_features |= 1 << VIRTIO_NET_F_MQ;
}
}
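
A self-contained sketch of the resulting config space layout (the offsets
mirror CONFIG_SPACE_MAC, CONFIG_SPACE_STATUS and CONFIG_SPACE_QUEUE_PAIRS
above; the MAC bytes are made up):

    // Layout: 6-byte MAC, 2-byte status, 2-byte max_virtqueue_pairs, little-endian.
    fn sketch_config_space(mac: [u8; 6], num_queue_pairs: u16) -> Vec<u8> {
        let mut space = Vec::with_capacity(10);
        space.extend_from_slice(&mac); // bytes 0..6
        space.extend_from_slice(&0u16.to_le_bytes()); // bytes 6..8 (status)
        space.extend_from_slice(&num_queue_pairs.to_le_bytes()); // bytes 8..10
        space
    }

    fn main() {
        let space = sketch_config_space([0x52, 0x54, 0x00, 0x11, 0x22, 0x33], 2);
        assert_eq!(space.len(), 10);
        assert_eq!(u16::from_le_bytes([space[8], space[9]]), 2);
        println!("{:02x?}", space);
    }
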
fn vnet_hdr_len() -> usize {
mem::size_of::<virtio_net_hdr_v1>()
}
/// Create a new virtio network device with the given IP address and
/// netmask.
pub fn open_tap(ip_addr: Ipv4Addr, netmask: Ipv4Addr) -> Result<Tap> {
pub fn open_tap(
if_name: Option<&str>,
ip_addr: Option<Ipv4Addr>,
netmask: Option<Ipv4Addr>,
num_rx_q: usize,
) -> Result<Vec<Tap>> {
let mut taps: Vec<Tap> = Vec::new();
let mut ifname: String = String::new();
let vnet_hdr_size = vnet_hdr_len() as i32;
let flag = net_gen::TUN_F_CSUM | net_gen::TUN_F_UFO | net_gen::TUN_F_TSO4 | net_gen::TUN_F_TSO6;
let tap = Tap::new(1).map_err(Error::TapOpen)?;
tap.set_ip_addr(ip_addr).map_err(Error::TapSetIp)?;
tap.set_netmask(netmask).map_err(Error::TapSetNetmask)?;
tap.enable().map_err(Error::TapEnable)?;
tap.set_offload(flag).map_err(Error::TapSetOffload)?;
tap.set_vnet_hdr_size(vnet_hdr_size)
.map_err(Error::TapSetVnetHdrSize)?;
for i in 0..num_rx_q {
let tap: Tap;
if i == 0 {
tap = match if_name {
Some(name) => Tap::open_named(name, num_rx_q).map_err(Error::TapOpen)?,
None => Tap::new(num_rx_q).map_err(Error::TapOpen)?,
};
if let Some(ip) = ip_addr {
tap.set_ip_addr(ip).map_err(Error::TapSetIp)?;
}
if let Some(mask) = netmask {
tap.set_netmask(mask).map_err(Error::TapSetNetmask)?;
}
tap.enable().map_err(Error::TapEnable)?;
tap.set_offload(flag).map_err(Error::TapSetOffload)?;
Ok(tap)
tap.set_vnet_hdr_size(vnet_hdr_size)
.map_err(Error::TapSetVnetHdrSize)?;
ifname = String::from_utf8(tap.get_if_name()).unwrap();
} else {
tap = Tap::open_named(ifname.as_str(), num_rx_q).map_err(Error::TapOpen)?;
tap.set_offload(flag).map_err(Error::TapSetOffload)?;
tap.set_vnet_hdr_size(vnet_hdr_size)
.map_err(Error::TapSetVnetHdrSize)?;
}
taps.push(tap);
}
Ok(taps)
}


@ -29,6 +29,8 @@ use vm_device::{Migratable, MigratableError, Pausable, Snapshotable};
use vm_memory::GuestMemoryMmap;
use vmm_sys_util::eventfd::EventFd;
const DEFAULT_QUEUE_NUMBER: usize = 2;
struct SlaveReqHandler {}
impl VhostUserMasterReqHandler for SlaveReqHandler {}
@ -43,12 +45,13 @@ pub struct Net {
queue_sizes: Vec<u16>,
queue_evts: Option<Vec<EventFd>>,
interrupt_cb: Option<Arc<VirtioInterrupt>>,
epoll_thread: Option<thread::JoinHandle<result::Result<(), DeviceError>>>,
epoll_thread: Option<Vec<thread::JoinHandle<result::Result<(), DeviceError>>>>,
ctrl_queue_epoll_thread: Option<thread::JoinHandle<result::Result<(), CtrlError>>>,
paused: Arc<AtomicBool>,
}
impl Net {
/// Create a new vhost-user-net device
/// Create a new vhost-user-net device
pub fn new(mac_addr: MacAddr, vu_cfg: VhostUserConfig) -> Result<Net> {
let mut vhost_user_net = Master::connect(&vu_cfg.sock, vu_cfg.num_queues as u64)
@ -87,24 +90,39 @@ impl Net {
.set_features(avail_features)
.map_err(Error::VhostUserSetFeatures)?;
let protocol_features;
let mut acked_features = 0;
if avail_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() != 0 {
acked_features |= VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
let mut protocol_features = vhost_user_net
protocol_features = vhost_user_net
.get_protocol_features()
.map_err(Error::VhostUserGetProtocolFeatures)?;
protocol_features &= VhostUserProtocolFeatures::MQ;
vhost_user_net
.set_protocol_features(protocol_features)
.map_err(Error::VhostUserSetProtocolFeatures)?;
} else {
return Err(Error::VhostUserProtocolNotSupport);
}
let max_queue_number =
if protocol_features.bits() & VhostUserProtocolFeatures::MQ.bits() != 0 {
vhost_user_net
.set_protocol_features(protocol_features & VhostUserProtocolFeatures::MQ)
.map_err(Error::VhostUserSetProtocolFeatures)?;
match vhost_user_net.get_queue_num() {
Ok(qn) => qn,
Err(_) => DEFAULT_QUEUE_NUMBER as u64,
}
} else {
DEFAULT_QUEUE_NUMBER as u64
};
if vu_cfg.num_queues > max_queue_number as usize {
error!("vhost-user-net has queue number: {} larger than the max queue number: {} backend allowed\n",
vu_cfg.num_queues, max_queue_number);
return Err(Error::BadQueueNum);
}
avail_features |= 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ;
let queue_num = vu_cfg.num_queues + 1;
let config_space = build_net_config_space(mac_addr, &mut avail_features);
let config_space = build_net_config_space(mac_addr, vu_cfg.num_queues, &mut avail_features);
// Send set_vring_base here, since it could tell backends, like OVS + DPDK,
// how many virt queues to be handled, which backend required to know at early stage.
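
A minimal, self-contained sketch of the queue-count check above, with
DEFAULT_QUEUE_NUMBER as the fallback when the backend does not advertise
the MQ protocol feature:

    const DEFAULT_QUEUE_NUMBER: usize = 2;

    // Error out when the frontend asks for more queues than the backend allows.
    fn check_queue_num(requested: usize, backend_max: Option<u64>) -> Result<(), String> {
        let max = backend_max.unwrap_or(DEFAULT_QUEUE_NUMBER as u64) as usize;
        if requested > max {
            return Err(format!("requested {} queues, backend allows {}", requested, max));
        }
        Ok(())
    }

    fn main() {
        assert!(check_queue_num(4, Some(8)).is_ok());
        assert!(check_queue_num(4, None).is_err()); // falls back to 2
    }
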
@ -279,7 +297,7 @@ impl VirtioDevice for Net {
})?;
}
let vu_interrupt_list = setup_vhost_user(
let mut vu_interrupt_list = setup_vhost_user(
&mut self.vhost_user_net,
mem.load().as_ref(),
queues,
@ -288,23 +306,32 @@ impl VirtioDevice for Net {
)
.map_err(ActivateError::VhostUserNetSetup)?;
let mut handler = VhostUserEpollHandler::<SlaveReqHandler>::new(VhostUserEpollConfig {
interrupt_cb,
kill_evt,
pause_evt,
vu_interrupt_list,
slave_req_handler: None,
});
let mut epoll_thread = Vec::new();
for _ in 0..vu_interrupt_list.len() / 2 {
let mut interrupt_list_sub: Vec<(EventFd, Queue)> = Vec::with_capacity(2);
interrupt_list_sub.push(vu_interrupt_list.remove(0));
interrupt_list_sub.push(vu_interrupt_list.remove(0));
let paused = self.paused.clone();
thread::Builder::new()
.name("vhost_user_net".to_string())
.spawn(move || handler.run(paused))
.map(|thread| self.epoll_thread = Some(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
let mut handler = VhostUserEpollHandler::<SlaveReqHandler>::new(VhostUserEpollConfig {
interrupt_cb: interrupt_cb.clone(),
kill_evt: kill_evt.try_clone().unwrap(),
pause_evt: pause_evt.try_clone().unwrap(),
vu_interrupt_list: interrupt_list_sub,
slave_req_handler: None,
});
let paused = self.paused.clone();
thread::Builder::new()
.name("vhost_user_net".to_string())
.spawn(move || handler.run(paused))
.map(|thread| epoll_thread.push(thread))
.map_err(|e| {
error!("failed to clone queue EventFd: {}", e);
ActivateError::BadActivate
})?;
}
self.epoll_thread = Some(epoll_thread);
Ok(())
}
@ -333,6 +360,6 @@ impl VirtioDevice for Net {
}
}
virtio_pausable!(Net, true);
virtio_pausable!(Net, true, true);
impl Snapshotable for Net {}
impl Migratable for Net {}


@ -24,7 +24,6 @@ use kvm_bindings::kvm_irq_routing_entry;
use kvm_ioctls::*;
use libc::O_TMPFILE;
use libc::TIOCGWINSZ;
use net_util::Tap;
#[cfg(feature = "pci_support")]
use pci::{
DeviceRelocation, InterruptDelivery, InterruptParameters, PciBarRegionType, PciBus,
@ -991,18 +990,28 @@ impl DeviceManager {
if let Some(net_list_cfg) = &vm_info.vm_cfg.lock().unwrap().net {
for net_cfg in net_list_cfg.iter() {
let virtio_net_device = if let Some(ref tap_if_name) = net_cfg.tap {
let tap = Tap::open_named(tap_if_name, 1).map_err(DeviceManagerError::OpenTap)?;
Arc::new(Mutex::new(
vm_virtio::Net::new_with_tap(tap, Some(net_cfg.mac), net_cfg.iommu)
.map_err(DeviceManagerError::CreateVirtioNet)?,
vm_virtio::Net::new(
Some(tap_if_name),
None,
None,
Some(net_cfg.mac),
net_cfg.iommu,
net_cfg.num_queues,
net_cfg.queue_size,
)
.map_err(DeviceManagerError::CreateVirtioNet)?,
))
} else {
Arc::new(Mutex::new(
vm_virtio::Net::new(
net_cfg.ip,
net_cfg.mask,
None,
Some(net_cfg.ip),
Some(net_cfg.mask),
Some(net_cfg.mac),
net_cfg.iommu,
net_cfg.num_queues,
net_cfg.queue_size,
)
.map_err(DeviceManagerError::CreateVirtioNet)?,
))