performance-metrics: Move some helper functions to test_infra

The helper functions for measuring and parsing the performance of
virtio-net and virtio-block devices are moved to the `test_infra` crate
so that they be reused for integration tests of rate limiter.

Signed-off-by: Bo Chen <chen.bo@intel.com>
This commit is contained in:
Bo Chen 2022-08-24 11:44:30 -07:00 committed by Rob Bradford
parent ecaff8ff1b
commit d8a6725995
5 changed files with 302 additions and 304 deletions

2
Cargo.lock generated
View File

@ -1114,6 +1114,8 @@ dependencies = [
"epoll",
"libc",
"once_cell",
"serde",
"serde_json",
"ssh2",
"vmm-sys-util",
"wait-timeout",

View File

@ -20,6 +20,7 @@ use std::{
thread,
time::Duration,
};
use test_infra::FioOps;
use thiserror::Error;
#[derive(Error, Debug)]

View File

@ -6,40 +6,23 @@
// Performance tests
use crate::{mean, PerformanceTestControl};
use serde_json::Value;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::fs;
use std::path::PathBuf;
use std::string::String;
use std::thread;
use std::time::Duration;
use std::{fmt, fs};
use test_infra::Error as InfraError;
use test_infra::*;
use wait_timeout::ChildExt;
#[cfg(target_arch = "x86_64")]
pub const FOCAL_IMAGE_NAME: &str = "focal-server-cloudimg-amd64-custom-20210609-0.raw";
#[cfg(target_arch = "aarch64")]
pub const FOCAL_IMAGE_NAME: &str = "focal-server-cloudimg-arm64-custom-20210929-0-update-tool.raw";
#[derive(Debug)]
enum WaitTimeoutError {
Timedout,
ExitStatus,
General(std::io::Error),
}
#[derive(Debug)]
enum Error {
BootTimeParse,
EthrLogFile(std::io::Error),
EthrLogParse,
FioOutputParse,
Iperf3Parse,
Infra(InfraError),
Spawn(std::io::Error),
Scp(SshCommandError),
WaitTimeout(WaitTimeoutError),
}
impl From<InfraError> for Error {
@ -94,128 +77,6 @@ fn direct_kernel_boot_path() -> PathBuf {
kernel_path
}
// Wait the child process for a given timeout
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
match child.wait_timeout(Duration::from_secs(timeout)) {
Err(e) => {
return Err(WaitTimeoutError::General(e));
}
Ok(s) => match s {
None => {
return Err(WaitTimeoutError::Timedout);
}
Some(s) => {
if !s.success() {
return Err(WaitTimeoutError::ExitStatus);
}
}
},
}
Ok(())
}
fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> {
std::panic::catch_unwind(|| {
let s = String::from_utf8_lossy(output);
let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");
let bps: f64 = if sender {
v["end"]["sum_sent"]["bits_per_second"]
.as_f64()
.expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
} else {
v["end"]["sum_received"]["bits_per_second"]
.as_f64()
.expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'")
};
bps
})
.map_err(|_| {
eprintln!(
"=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n",
String::from_utf8_lossy(output)
);
Error::Iperf3Parse
})
}
fn measure_virtio_net_throughput(
test_timeout: u32,
queue_pairs: u32,
guest: &Guest,
receive: bool,
) -> Result<f64, Error> {
let default_port = 5201;
// 1. start the iperf3 server on the guest
for n in 0..queue_pairs {
guest
.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))
.map_err(InfraError::SshCommand)?;
}
thread::sleep(Duration::new(1, 0));
// 2. start the iperf3 client on host to measure RX through-put
let mut clients = Vec::new();
for n in 0..queue_pairs {
let mut cmd = Command::new("iperf3");
cmd.args(&[
"-J", // Output in JSON format
"-c",
&guest.network.guest_ip,
"-p",
&format!("{}", default_port + n),
"-t",
&format!("{}", test_timeout),
]);
// For measuring the guest transmit throughput (as a sender),
// use reverse mode of the iperf3 client on the host
if !receive {
cmd.args(&["-R"]);
}
let client = cmd
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(Error::Spawn)?;
clients.push(client);
}
let mut err: Option<Error> = None;
let mut bps = Vec::new();
let mut failed = false;
for c in clients {
let mut c = c;
if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) {
err = Some(Error::WaitTimeout(e));
failed = true;
}
if !failed {
// Safe to unwrap as we know the child has terminated succesffully
let output = c.wait_with_output().unwrap();
bps.push(parse_iperf3_output(&output.stdout, receive)?);
} else {
let _ = c.kill();
let output = c.wait_with_output().unwrap();
println!(
"=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
String::from_utf8_lossy(&output.stdout)
);
}
}
if let Some(e) = err {
Err(e)
} else {
Ok(bps.iter().sum())
}
}
pub fn performance_net_throughput(control: &PerformanceTestControl) -> f64 {
let test_timeout = control.test_timeout;
let rx = control.net_rx.unwrap();
@ -260,95 +121,6 @@ pub fn performance_net_throughput(control: &PerformanceTestControl) -> f64 {
}
}
fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
std::panic::catch_unwind(|| {
let s = String::from_utf8_lossy(output);
let mut latency = Vec::new();
for l in s.lines() {
let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
// Skip header/summary lines
if let Some(avg) = v["Avg"].as_str() {
// Assume the latency unit is always "us"
latency.push(
avg.split("us").collect::<Vec<&str>>()[0]
.parse::<f64>()
.expect("'ethr' parse error: invalid 'Avg' entry"),
);
}
}
assert!(
!latency.is_empty(),
"'ethr' parse error: no valid latency data found"
);
latency
})
.map_err(|_| {
eprintln!(
"=============== ethr output ===============\n\n{}\n\n===========end============\n\n",
String::from_utf8_lossy(output)
);
Error::EthrLogParse
})
}
fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
// copy the 'ethr' tool to the guest image
let ethr_path = "/usr/local/bin/ethr";
let ethr_remote_path = "/tmp/ethr";
scp_to_guest(
Path::new(ethr_path),
Path::new(ethr_remote_path),
&guest.network.guest_ip,
//DEFAULT_SSH_RETRIES,
1,
DEFAULT_SSH_TIMEOUT,
)
.map_err(Error::Scp)?;
// Start the ethr server on the guest
guest
.ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path))
.map_err(InfraError::SshCommand)?;
thread::sleep(Duration::new(10, 0));
// Start the ethr client on the host
let log_file = guest
.tmp_dir
.as_path()
.join("ethr.client.log")
.to_str()
.unwrap()
.to_string();
let mut c = Command::new(ethr_path)
.args(&[
"-c",
&guest.network.guest_ip,
"-t",
"l",
"-o",
&log_file, // file output is JSON format
"-d",
&format!("{}s", test_timeout),
])
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(Error::Spawn)?;
if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout)
{
let _ = c.kill();
return Err(e);
}
// Parse the ethr latency test output
let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
parse_ethr_latency_output(&content)
}
pub fn performance_net_latency(control: &PerformanceTestControl) -> f64 {
let focal = UbuntuDiskConfig::new(FOCAL_IMAGE_NAME.to_string());
let guest = performance_test_new_guest(Box::new(focal));
@ -561,78 +333,6 @@ pub fn performance_boot_time_pmem(control: &PerformanceTestControl) -> f64 {
}
}
pub enum FioOps {
Read,
RandomRead,
Write,
RandomWrite,
}
impl fmt::Display for FioOps {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
FioOps::Read => write!(f, "read"),
FioOps::RandomRead => write!(f, "randread"),
FioOps::Write => write!(f, "write"),
FioOps::RandomWrite => write!(f, "randwrite"),
}
}
}
fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
std::panic::catch_unwind(|| {
let v: Value =
serde_json::from_str(output).expect("'fio' parse error: invalid json output");
let jobs = v["jobs"]
.as_array()
.expect("'fio' parse error: missing entry 'jobs'");
assert_eq!(
jobs.len(),
num_jobs as usize,
"'fio' parse error: Unexpected number of 'fio' jobs."
);
let read = match fio_ops {
FioOps::Read | FioOps::RandomRead => true,
FioOps::Write | FioOps::RandomWrite => false,
};
let mut total_bps = 0_f64;
for j in jobs {
if read {
let bytes = j["read"]["io_bytes"]
.as_u64()
.expect("'fio' parse error: missing entry 'read.io_bytes'");
let runtime = j["read"]["runtime"]
.as_u64()
.expect("'fio' parse error: missing entry 'read.runtime'")
as f64
/ 1000_f64;
total_bps += bytes as f64 / runtime as f64;
} else {
let bytes = j["write"]["io_bytes"]
.as_u64()
.expect("'fio' parse error: missing entry 'write.io_bytes'");
let runtime = j["write"]["runtime"]
.as_u64()
.expect("'fio' parse error: missing entry 'write.runtime'")
as f64
/ 1000_f64;
total_bps += bytes as f64 / runtime as f64;
}
}
total_bps
})
.map_err(|_| {
eprintln!(
"=============== Fio output ===============\n\n{}\n\n===========end============\n\n",
output
);
Error::FioOutputParse
})
}
pub fn performance_block_io(control: &PerformanceTestControl) -> f64 {
let test_timeout = control.test_timeout;
let num_queues = control.num_queues.unwrap();

View File

@ -9,6 +9,8 @@ dirs = "4.0.0"
epoll = "4.3.1"
libc = "0.2.132"
once_cell = "1.14.0"
serde = { version = "1.0.144", features = ["rc", "derive"] }
serde_json = "1.0.85"
ssh2 = { version = "0.9.3", features = ["vendored-openssl"] }
vmm-sys-util = "0.10.0"
wait-timeout = "0.2.0"

View File

@ -4,11 +4,10 @@
//
use once_cell::sync::Lazy;
use serde_json::Value;
use ssh2::Session;
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs;
use std::io;
use std::io::{Read, Write};
use std::net::TcpListener;
@ -20,13 +19,29 @@ use std::process::{Child, Command, ExitStatus, Output, Stdio};
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use std::{fmt, fs};
use vmm_sys_util::tempdir::TempDir;
use wait_timeout::ChildExt;
#[derive(Debug)]
pub enum WaitTimeoutError {
Timedout,
ExitStatus,
General(std::io::Error),
}
#[derive(Debug)]
pub enum Error {
Parsing(std::num::ParseIntError),
SshCommand(SshCommandError),
WaitForBoot(WaitForBootError),
EthrLogFile(std::io::Error),
EthrLogParse,
FioOutputParse,
Iperf3Parse,
Spawn(std::io::Error),
WaitTimeout(WaitTimeoutError),
}
impl From<SshCommandError> for Error {
@ -1320,3 +1335,281 @@ pub fn clh_command(cmd: &str) -> String {
|target| format!("target/{}/release/{}", target, cmd),
)
}
pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result<f64, Error> {
std::panic::catch_unwind(|| {
let s = String::from_utf8_lossy(output);
let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output");
let bps: f64 = if sender {
v["end"]["sum_sent"]["bits_per_second"]
.as_f64()
.expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'")
} else {
v["end"]["sum_received"]["bits_per_second"]
.as_f64()
.expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'")
};
bps
})
.map_err(|_| {
eprintln!(
"=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n",
String::from_utf8_lossy(output)
);
Error::Iperf3Parse
})
}
pub enum FioOps {
Read,
RandomRead,
Write,
RandomWrite,
}
impl fmt::Display for FioOps {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
FioOps::Read => write!(f, "read"),
FioOps::RandomRead => write!(f, "randread"),
FioOps::Write => write!(f, "write"),
FioOps::RandomWrite => write!(f, "randwrite"),
}
}
}
pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result<f64, Error> {
std::panic::catch_unwind(|| {
let v: Value =
serde_json::from_str(output).expect("'fio' parse error: invalid json output");
let jobs = v["jobs"]
.as_array()
.expect("'fio' parse error: missing entry 'jobs'");
assert_eq!(
jobs.len(),
num_jobs as usize,
"'fio' parse error: Unexpected number of 'fio' jobs."
);
let read = match fio_ops {
FioOps::Read | FioOps::RandomRead => true,
FioOps::Write | FioOps::RandomWrite => false,
};
let mut total_bps = 0_f64;
for j in jobs {
if read {
let bytes = j["read"]["io_bytes"]
.as_u64()
.expect("'fio' parse error: missing entry 'read.io_bytes'");
let runtime = j["read"]["runtime"]
.as_u64()
.expect("'fio' parse error: missing entry 'read.runtime'")
as f64
/ 1000_f64;
total_bps += bytes as f64 / runtime as f64;
} else {
let bytes = j["write"]["io_bytes"]
.as_u64()
.expect("'fio' parse error: missing entry 'write.io_bytes'");
let runtime = j["write"]["runtime"]
.as_u64()
.expect("'fio' parse error: missing entry 'write.runtime'")
as f64
/ 1000_f64;
total_bps += bytes as f64 / runtime as f64;
}
}
total_bps
})
.map_err(|_| {
eprintln!(
"=============== Fio output ===============\n\n{}\n\n===========end============\n\n",
output
);
Error::FioOutputParse
})
}
// Wait the child process for a given timeout
fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> {
match child.wait_timeout(Duration::from_secs(timeout)) {
Err(e) => {
return Err(WaitTimeoutError::General(e));
}
Ok(s) => match s {
None => {
return Err(WaitTimeoutError::Timedout);
}
Some(s) => {
if !s.success() {
return Err(WaitTimeoutError::ExitStatus);
}
}
},
}
Ok(())
}
pub fn measure_virtio_net_throughput(
test_timeout: u32,
queue_pairs: u32,
guest: &Guest,
receive: bool,
) -> Result<f64, Error> {
let default_port = 5201;
// 1. start the iperf3 server on the guest
for n in 0..queue_pairs {
guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?;
}
thread::sleep(Duration::new(1, 0));
// 2. start the iperf3 client on host to measure RX through-put
let mut clients = Vec::new();
for n in 0..queue_pairs {
let mut cmd = Command::new("iperf3");
cmd.args(&[
"-J", // Output in JSON format
"-c",
&guest.network.guest_ip,
"-p",
&format!("{}", default_port + n),
"-t",
&format!("{}", test_timeout),
]);
// For measuring the guest transmit throughput (as a sender),
// use reverse mode of the iperf3 client on the host
if !receive {
cmd.args(&["-R"]);
}
let client = cmd
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(Error::Spawn)?;
clients.push(client);
}
let mut err: Option<Error> = None;
let mut bps = Vec::new();
let mut failed = false;
for c in clients {
let mut c = c;
if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) {
err = Some(Error::WaitTimeout(e));
failed = true;
}
if !failed {
// Safe to unwrap as we know the child has terminated succesffully
let output = c.wait_with_output().unwrap();
bps.push(parse_iperf3_output(&output.stdout, receive)?);
} else {
let _ = c.kill();
let output = c.wait_with_output().unwrap();
println!(
"=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n",
String::from_utf8_lossy(&output.stdout)
);
}
}
if let Some(e) = err {
Err(e)
} else {
Ok(bps.iter().sum())
}
}
pub fn parse_ethr_latency_output(output: &[u8]) -> Result<Vec<f64>, Error> {
std::panic::catch_unwind(|| {
let s = String::from_utf8_lossy(output);
let mut latency = Vec::new();
for l in s.lines() {
let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line");
// Skip header/summary lines
if let Some(avg) = v["Avg"].as_str() {
// Assume the latency unit is always "us"
latency.push(
avg.split("us").collect::<Vec<&str>>()[0]
.parse::<f64>()
.expect("'ethr' parse error: invalid 'Avg' entry"),
);
}
}
assert!(
!latency.is_empty(),
"'ethr' parse error: no valid latency data found"
);
latency
})
.map_err(|_| {
eprintln!(
"=============== ethr output ===============\n\n{}\n\n===========end============\n\n",
String::from_utf8_lossy(output)
);
Error::EthrLogParse
})
}
pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result<Vec<f64>, Error> {
// copy the 'ethr' tool to the guest image
let ethr_path = "/usr/local/bin/ethr";
let ethr_remote_path = "/tmp/ethr";
scp_to_guest(
Path::new(ethr_path),
Path::new(ethr_remote_path),
&guest.network.guest_ip,
//DEFAULT_SSH_RETRIES,
1,
DEFAULT_SSH_TIMEOUT,
)?;
// Start the ethr server on the guest
guest.ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path))?;
thread::sleep(Duration::new(10, 0));
// Start the ethr client on the host
let log_file = guest
.tmp_dir
.as_path()
.join("ethr.client.log")
.to_str()
.unwrap()
.to_string();
let mut c = Command::new(ethr_path)
.args(&[
"-c",
&guest.network.guest_ip,
"-t",
"l",
"-o",
&log_file, // file output is JSON format
"-d",
&format!("{}s", test_timeout),
])
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map_err(Error::Spawn)?;
if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout)
{
let _ = c.kill();
return Err(e);
}
// Parse the ethr latency test output
let content = fs::read(log_file).map_err(Error::EthrLogFile)?;
parse_ethr_latency_output(&content)
}