event_monitor: refactor the implementation to support concurrent access

This patch modifies `event_monitor` to ensure that concurrent access to
`event_log` from multiple threads is safe. Previously, the `event_log`
function would acquire a reference to the event log file and write
to it without doing any synchronization, which made it prone to
data races. This issue likely went under the radar because the
relevant `SAFETY` comment on the unsafe block was incomplete.

The new implementation spawns a dedicated thread named `event-monitor`
solely for writing to the file. It uses the MPMC channel exposed by
`flume` to pass messages to the `event-monitor` thread. Since
`flume::Sender<T>` implements `Sync`, it is safe for multiple threads
to share it and send messages to the `event-monitor` thread.
This is not possible with `std::sync::mpsc::Sender<T>` since it's
`!Sync`, meaning it is not safe for it to be shared between different
threads.

The `event_monitor::set_monitor` function now only initializes
the required global state and returns an instance of the
`Monitor` struct. This decouples the actual logging logic from the
`event_monitor` crate. The `event-monitor` thread is then spawned by
the `vmm` crate.

Signed-off-by: Omer Faruk Bayram <omer.faruk@sartura.hr>
This commit is contained in:
Omer Faruk Bayram 2023-07-30 00:53:36 +03:00 committed by Rob Bradford
parent 3d82867fe2
commit 02e1c54426
5 changed files with 244 additions and 60 deletions

123
Cargo.lock generated
View File

@ -319,6 +319,12 @@ dependencies = [
"log",
]
[[package]]
name = "bumpalo"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -613,6 +619,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
name = "event_monitor"
version = "0.1.0"
dependencies = [
"flume",
"libc",
"serde",
"serde_json",
@ -633,6 +640,19 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784a4df722dc6267a04af36895398f59d21d07dce47232adf31ec0ff2fa45e67"
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -784,8 +804,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -928,6 +950,15 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "js-sys"
version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kvm-bindings"
version = "0.6.0"
@ -1091,6 +1122,15 @@ dependencies = [
"vmm-sys-util",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "net_gen"
version = "0.1.0"
@ -1304,6 +1344,26 @@ dependencies = [
"wait-timeout",
]
[[package]]
name = "pin-project"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.23",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@ -1756,6 +1816,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "ssh2"
version = "0.9.4"
@ -2319,6 +2388,60 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.23",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.23",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -5,6 +5,7 @@ authors = ["The Cloud Hypervisor Authors"]
edition = "2021"
[dependencies]
flume = "0.10.14"
libc = "0.2.139"
serde = { version = "1.0.164", features = ["rc", "derive"] }
serde_json = "1.0.96"

View File

@ -7,33 +7,11 @@ use serde::Serialize;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::io;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};
static mut MONITOR: Option<(File, Instant)> = None;
/// Installs `file` as the global event monitor sink.
///
/// The file descriptor is switched to non-blocking mode (`O_NONBLOCK`) and the
/// file is then stored in `MONITOR` together with the current `Instant`.
///
/// This function must only be called once from the main process before any threads
/// are created to avoid race conditions.
///
/// # Errors
///
/// Returns the last OS error if the `F_SETFL` `fcntl` call fails.
///
/// # Panics
///
/// Panics if `MONITOR` has already been set.
pub fn set_monitor(file: File) -> Result<(), std::io::Error> {
// SAFETY: there is only one caller of this function, so MONITOR is written to only once
assert!(unsafe { MONITOR.is_none() });
let fd = file.as_raw_fd();
// SAFETY: FFI call to configure the fd
// NOTE(review): the result of the `F_GETFL` call is not checked before
// `O_NONBLOCK` is OR-ed into it; only the `F_SETFL` result is inspected below.
let ret = unsafe {
let mut flags = libc::fcntl(fd, libc::F_GETFL);
flags |= libc::O_NONBLOCK;
libc::fcntl(fd, libc::F_SETFL, flags)
};
if ret < 0 {
return Err(std::io::Error::last_os_error());
}
// SAFETY: MONITOR is None. Nobody else can hold a reference to it.
unsafe {
MONITOR = Some((file, Instant::now()));
};
Ok(())
}
static mut MONITOR: Option<MonitorHandle> = None;
#[derive(Serialize)]
struct Event<'a> {
@ -43,19 +21,71 @@ struct Event<'a> {
properties: Option<&'a HashMap<Cow<'a, str>, Cow<'a, str>>>,
}
/// Receiving side of the event monitor, returned by `set_monitor`.
///
/// It bundles the channel receiver with the output file so that the caller
/// can decide where and how the received events are written.
pub struct Monitor {
/// Receiver paired with the sender stored in the global `MONITOR`; it yields
/// the serialized events sent by `event_log`.
pub rx: flume::Receiver<String>,
/// The file that was passed to `set_monitor`.
pub file: File,
}
/// Sending side of the event monitor, stored in the global `MONITOR`.
struct MonitorHandle {
/// Sender used by `event_log` to pass serialized events on.
tx: flume::Sender<String>,
/// Instant captured when `set_monitor` ran; event timestamps are the time
/// elapsed since this point.
start: Instant,
}
/// Switches the descriptor behind `file` to non-blocking mode by adding
/// `O_NONBLOCK` to its file status flags.
///
/// # Errors
///
/// Returns the last OS error if either the `F_GETFL` or the `F_SETFL`
/// `fcntl` call fails.
fn set_file_nonblocking(file: &File) -> io::Result<()> {
    let fd = file.as_raw_fd();

    // SAFETY: FFI call to query the status flags of the fd; `file` is borrowed
    // for the whole function, so the fd stays open during the call.
    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if flags < 0 {
        // Stop here: OR-ing `O_NONBLOCK` into the -1 error value would pass an
        // all-ones flag word to `F_SETFL` and hide the original error.
        return Err(io::Error::last_os_error());
    }

    // SAFETY: FFI call to configure the fd; `flags` was just returned by
    // `F_GETFL` for the same fd.
    let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) };
    if ret < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}
/// Initializes the global event monitor state and returns the receiving half.
///
/// `file` is put into non-blocking mode, an unbounded channel is created, and
/// the sending end is stored in `MONITOR` together with the current `Instant`.
/// The receiving end and `file` are handed back to the caller as a [`Monitor`].
///
/// This function must only be called once from the main thread before any threads
/// are created to avoid race conditions.
///
/// # Errors
///
/// Propagates the error returned by `set_file_nonblocking`.
///
/// # Panics
///
/// Panics if `MONITOR` has already been set.
pub fn set_monitor(file: File) -> io::Result<Monitor> {
    // SAFETY: there is only one caller of this function, so MONITOR is written to only once
    assert!(unsafe { MONITOR.is_none() });

    set_file_nonblocking(&file)?;

    let (tx, rx) = flume::unbounded();
    let handle = MonitorHandle {
        tx,
        start: Instant::now(),
    };

    // SAFETY: MONITOR is None. Nobody else can hold a reference to it.
    unsafe {
        MONITOR = Some(handle);
    }

    Ok(Monitor { rx, file })
}
/// Logs an event identified by `source` and `event`, with optional `properties`.
///
/// The event is stamped with the time elapsed since the monitor was set up and
/// serialized as pretty-printed JSON. Nothing happens when `MONITOR` has not
/// been set. Serialization and delivery failures are ignored, so logging is
/// best-effort.
pub fn event_log(source: &str, event: &str, properties: Option<&HashMap<Cow<str>, Cow<str>>>) {
// SAFETY: MONITOR is always in a valid state (None or Some).
if let Some((file, start)) = unsafe { MONITOR.as_ref() } {
let e = Event {
timestamp: start.elapsed(),
// SAFETY: `MONITOR` is always in a valid state (None or Some), because it
// is set only once before any threads are spawned, and it's not mutated
// afterwards. This function only creates immutable references to `MONITOR`.
// Because `MONITOR.tx` is `Sync`, it's safe to share `MONITOR` across
// threads, making this function thread-safe.
if let Some(monitor_handle) = unsafe { MONITOR.as_ref() } {
let event = Event {
timestamp: monitor_handle.start.elapsed(),
source,
event,
properties,
};
serde_json::to_writer_pretty(file, &e).ok();
let mut file = file;
file.write_all(b"\n\n").ok();
if let Ok(event) = serde_json::to_string_pretty(&event) {
monitor_handle.tx.send(event).ok();
}
}
}
@ -79,5 +109,4 @@ macro_rules! event {
$crate::event_log($source, $event, Some(&properties))
}
};
}

View File

@ -69,6 +69,8 @@ enum Error {
BareEventMonitor,
#[error("Error doing event monitor I/O: {0}")]
EventMonitorIo(std::io::Error),
#[error("Event monitor thread failed: {0}")]
EventMonitorThread(#[source] vmm::Error),
#[cfg(feature = "guest_debug")]
#[error("Error parsing --gdb: {0}")]
ParsingGdb(option_parser::OptionParserError),
@ -459,32 +461,6 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
(None, None) => Ok(None),
}?;
if let Some(ref monitor_config) = toplevel.event_monitor {
let mut parser = OptionParser::new();
parser.add("path").add("fd");
parser
.parse(monitor_config)
.map_err(Error::ParsingEventMonitor)?;
let file = if parser.is_set("fd") {
let fd = parser
.convert("fd")
.map_err(Error::ParsingEventMonitor)?
.unwrap();
// SAFETY: fd is valid
unsafe { File::from_raw_fd(fd) }
} else if parser.is_set("path") {
std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(parser.get("path").unwrap())
.map_err(Error::EventMonitorIo)?
} else {
return Err(Error::BareEventMonitor);
};
event_monitor::set_monitor(file).map_err(Error::EventMonitorIo)?;
}
let (api_request_sender, api_request_receiver) = channel();
let api_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateApiEventFd)?;
@ -531,8 +507,6 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
}
}
event!("vmm", "starting");
let hypervisor = hypervisor::new().map_err(Error::CreateHypervisor)?;
#[cfg(feature = "guest_debug")]
@ -556,6 +530,37 @@ fn start_vmm(toplevel: TopLevel) -> Result<Option<String>, Error> {
let exit_evt = EventFd::new(EFD_NONBLOCK).map_err(Error::CreateExitEventFd)?;
if let Some(ref monitor_config) = toplevel.event_monitor {
let mut parser = OptionParser::new();
parser.add("path").add("fd");
parser
.parse(monitor_config)
.map_err(Error::ParsingEventMonitor)?;
let file = if parser.is_set("fd") {
let fd = parser
.convert("fd")
.map_err(Error::ParsingEventMonitor)?
.unwrap();
// SAFETY: fd is valid
unsafe { File::from_raw_fd(fd) }
} else if parser.is_set("path") {
std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(parser.get("path").unwrap())
.map_err(Error::EventMonitorIo)?
} else {
return Err(Error::BareEventMonitor);
};
let monitor = event_monitor::set_monitor(file).map_err(Error::EventMonitorIo)?;
vmm::start_event_monitor_thread(monitor, exit_evt.try_clone().unwrap())
.map_err(Error::EventMonitorThread)?;
}
event!("vmm", "starting");
let vmm_thread_handle = vmm::start_vmm_thread(
vmm::VmmVersionInfo::new(env!("BUILD_VERSION"), env!("CARGO_PKG_VERSION")),
&api_socket_path,

View File

@ -126,6 +126,10 @@ pub enum Error {
#[error("Error starting D-Bus session: {0}")]
CreateDBusSession(#[source] zbus::Error),
/// Cannot create `event-monitor` thread
#[error("Error spawning `event-monitor` thread: {0}")]
EventMonitorThreadSpawn(#[source] io::Error),
/// Cannot handle the VM STDIN stream
#[error("Error handling VM stdin: {0:?}")]
Stdin(VmError),
@ -289,6 +293,28 @@ impl Serialize for PciDeviceInfo {
}
}
pub fn start_event_monitor_thread(
mut monitor: event_monitor::Monitor,
exit_event: EventFd,
) -> Result<()> {
thread::Builder::new()
.name("event-monitor".to_owned())
.spawn(move || {
std::panic::catch_unwind(AssertUnwindSafe(move || {
while let Ok(event) = monitor.rx.recv() {
monitor.file.write_all(event.as_bytes().as_ref()).ok();
monitor.file.write_all(b"\n\n").ok();
}
}))
.map_err(|_| {
error!("`event-monitor` thread panicked");
exit_event.write(1).ok();
})
})
.map(|_| ())
.map_err(Error::EventMonitorThreadSpawn)
}
#[allow(unused_variables)]
#[allow(clippy::too_many_arguments)]
pub fn start_vmm_thread(