event_monitor: make it possible to subscribe to Monitor

Signed-off-by: Omer Faruk Bayram <omer.faruk@sartura.hr>
This commit is contained in:
Omer Faruk Bayram 2023-08-13 15:13:59 +00:00 committed by Bo Chen
parent 654ac49bd6
commit e02efe9ba0
2 changed files with 25 additions and 1 deletions

View File

@ -9,6 +9,7 @@ use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::io; use std::io;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
static mut MONITOR: Option<MonitorHandle> = None; static mut MONITOR: Option<MonitorHandle> = None;
@ -24,6 +25,23 @@ struct Event<'a> {
pub struct Monitor { pub struct Monitor {
pub rx: flume::Receiver<String>, pub rx: flume::Receiver<String>,
pub file: File, pub file: File,
pub broadcast: Vec<flume::Sender<Arc<String>>>,
}
impl Monitor {
pub fn new(rx: flume::Receiver<String>, file: File) -> Self {
Self {
rx,
file,
broadcast: vec![],
}
}
pub fn subscribe(&mut self) -> flume::Receiver<Arc<String>> {
let (tx, rx) = flume::unbounded();
self.broadcast.push(tx);
rx
}
} }
struct MonitorHandle { struct MonitorHandle {
@ -56,7 +74,7 @@ pub fn set_monitor(file: File) -> io::Result<Monitor> {
set_file_nonblocking(&file)?; set_file_nonblocking(&file)?;
let (tx, rx) = flume::unbounded(); let (tx, rx) = flume::unbounded();
let monitor = Monitor { rx, file }; let monitor = Monitor::new(rx, file);
// SAFETY: MONITOR is None. Nobody else can hold a reference to it. // SAFETY: MONITOR is None. Nobody else can hold a reference to it.
unsafe { unsafe {

View File

@ -319,8 +319,14 @@ pub fn start_event_monitor_thread(
std::panic::catch_unwind(AssertUnwindSafe(move || { std::panic::catch_unwind(AssertUnwindSafe(move || {
while let Ok(event) = monitor.rx.recv() { while let Ok(event) = monitor.rx.recv() {
let event = Arc::new(event);
monitor.file.write_all(event.as_bytes().as_ref()).ok(); monitor.file.write_all(event.as_bytes().as_ref()).ok();
monitor.file.write_all(b"\n\n").ok(); monitor.file.write_all(b"\n\n").ok();
for tx in monitor.broadcast.iter() {
tx.send(event.clone()).ok();
}
} }
})) }))
.map_err(|_| { .map_err(|_| {