rdp: add smart/filtering display queue

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
This commit is contained in:
Marc-André Lureau 2024-08-20 11:12:33 +04:00
parent 33925f23cf
commit 8d267d73fd
7 changed files with 620 additions and 49 deletions

View File

@ -3,7 +3,7 @@ use qemu_display::zbus;
mod args;
mod server;
mod util;
mod utils;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {

View File

@ -0,0 +1,346 @@
#![allow(dead_code)]
// I wish I could reuse tokio::mpsc instead of creating a new queue from scratch
use ironrdp::server::{BitmapUpdate, DisplayUpdate, PixelOrder};
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::{Mutex, MutexGuard, Notify};
use DisplayUpdate::{Bitmap, ColorPointer, DefaultPointer, HidePointer, RGBAPointer, Resize};
mod rect;
use rect::Rect;
// The queue capacity is not fixed, as its size may vary depending on how pending updates are splitted
// however, if there are more than 30 updates pending, the producer will be blocked until the consumer.
const QUEUE_CAPACITY: usize = 30;
pub(crate) struct DisplayQueue {
queue: Mutex<VecDeque<DisplayUpdate>>,
notify_producer: Notify,
notify_consumer: Notify,
is_closed: AtomicBool,
}
pub(crate) struct Sender {
channel: Arc<DisplayQueue>,
}
impl Drop for Sender {
fn drop(&mut self) {
self.channel.close();
}
}
pub(crate) struct Receiver {
channel: Arc<DisplayQueue>,
}
impl Drop for Receiver {
fn drop(&mut self) {
self.channel.close();
}
}
impl DisplayQueue {
pub(crate) fn new() -> (Sender, Receiver) {
let channel = Arc::new(DisplayQueue {
queue: Mutex::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
notify_producer: Notify::new(),
notify_consumer: Notify::new(),
is_closed: AtomicBool::new(false),
});
(
Sender {
channel: Arc::clone(&channel),
},
Receiver { channel },
)
}
fn close(&self) {
self.is_closed.store(true, Ordering::SeqCst);
self.notify_consumer.notify_one();
self.notify_producer.notify_one();
}
fn closed(&self) -> bool {
self.is_closed.load(Ordering::SeqCst)
}
}
impl Sender {
pub(crate) async fn send(&self, item: DisplayUpdate) -> Result<(), ()> {
self.push_update(item, true).await
}
async fn push_update(&self, item: DisplayUpdate, update: bool) -> Result<(), ()> {
loop {
if self.channel.closed() {
return Err(());
}
let mut queue = self.channel.queue.lock().await;
if queue.len() < QUEUE_CAPACITY {
if update {
if push_update(&mut queue, item) {
self.channel.notify_consumer.notify_one();
}
} else {
queue.push_back(item);
self.channel.notify_consumer.notify_one();
}
return Ok(());
} else {
drop(queue);
self.channel.notify_producer.notified().await;
}
}
}
}
fn push_update(queue: &mut MutexGuard<VecDeque<DisplayUpdate>>, item: DisplayUpdate) -> bool {
// Try to optimize the pending updates, while not being too aggressive for a reasonable experience
match item {
DisplayUpdate::PointerPosition { .. } => {
// If there is already a pointer position update in the queue, replace it with the new one
if let Some(idx) = queue
.iter()
.position(|update| matches!(update, DisplayUpdate::PointerPosition { .. }))
{
queue[idx] = item;
return true;
}
}
ColorPointer(_) | RGBAPointer(_) | HidePointer | DefaultPointer => {
// If there is already a pointer position update in the queue, replace it with the new one
if let Some(idx) = queue.iter().position(|update| {
matches!(
update,
ColorPointer(_) | RGBAPointer(_) | HidePointer | DefaultPointer
)
}) {
queue[idx] = item;
return true;
}
}
Resize(_) => {
// drop all graphics operations and keep only the last resize operation
// Note: pointer updates are not considered graphics operations
queue.retain(|item| !matches!(item, Resize(_) | Bitmap(_)));
}
Bitmap(ref update @ BitmapUpdate { order, .. }) => {
// let's ignore weird order for now
if order == PixelOrder::TopToBottom {
// drop and split bitmap updates if they overlap
let mut i = 0;
while i < queue.len() {
let prev = match &queue[i] {
Bitmap(prev) => prev,
_ => {
i += 1;
continue;
}
};
if let Some(res) = mask_bitmap_update(update, prev) {
tracing::debug!(res = res.len(), "dropping");
queue.remove(i);
for item in res {
queue.insert(i, Bitmap(item));
i += 1;
}
}
i += 1;
}
}
}
}
queue.push_back(item);
true
}
impl From<&BitmapUpdate> for Rect {
fn from(update: &BitmapUpdate) -> Self {
Rect {
x: update.left as _,
y: update.top as _,
width: update.width.get() as _,
height: update.height.get() as _,
}
}
}
fn mask_bitmap_update(mask: &BitmapUpdate, prev: &BitmapUpdate) -> Option<Vec<BitmapUpdate>> {
let mask_rect = Rect::from(mask);
let prev_rect = Rect::from(prev);
if matches!(prev_rect.intersect(&mask_rect), Some(vec) if vec.is_empty()) {
// full overlap discards the prev update
return Some(vec![]);
}
// else TODO: split the updates.. sharing buffer is not supported yet
None
}
impl Receiver {
pub(crate) async fn recv(&self) -> Option<DisplayUpdate> {
loop {
let mut queue = self.channel.queue.lock().await;
if let Some(item) = queue.pop_front() {
self.channel.notify_producer.notify_one();
return Some(item);
} else if self.channel.closed() && queue.is_empty() {
return None;
} else {
drop(queue);
self.channel.notify_consumer.notified().await;
}
}
}
}
#[cfg(test)]
mod tests {
use std::num::NonZero;
use ironrdp::server::{DesktopSize, PixelFormat, PixelOrder};
use super::*;
#[test]
fn test_send_recv() {
let (tx, rx) = DisplayQueue::new();
crate::utils::block_on(async move {
let tx_task = tokio::spawn(async move {
let ops = vec![
// dummy
Resize(DesktopSize {
width: 1024,
height: 768,
}),
];
for op in ops {
tx.send(op).await.unwrap();
}
});
let rx_task = tokio::spawn(async move {
let _ = rx.recv().await;
});
let _ = tokio::join!(tx_task, rx_task);
});
}
#[test]
fn test_capacity() {
let (tx, _rx) = DisplayQueue::new();
crate::utils::block_on(async move {
let ops = vec![DefaultPointer; QUEUE_CAPACITY];
for op in ops {
tx.push_update(op, false).await.unwrap();
}
tokio::time::timeout(std::time::Duration::from_millis(200), async move {
tx.push_update(DefaultPointer, false).await.unwrap();
})
.await
.unwrap_err()
});
}
#[test]
fn test_close() {
let (tx, rx) = DisplayQueue::new();
crate::utils::block_on(async move {
let tx_task = tokio::spawn(async move {
drop(tx);
});
let rx_task = tokio::spawn(async move {
let res = rx.recv().await;
assert!(res.is_none());
});
let _ = tokio::join!(tx_task, rx_task);
});
}
#[test]
fn test_resize() {
let (tx, rx) = DisplayQueue::new();
crate::utils::block_on(async move {
let ops = vec![
Resize(DesktopSize {
width: 1024,
height: 768,
}),
Bitmap(BitmapUpdate {
top: 0,
left: 0,
width: NonZero::new(1024).unwrap(),
height: NonZero::new(768).unwrap(),
format: PixelFormat::ABgr32,
order: PixelOrder::TopToBottom,
data: vec![],
}),
Resize(DesktopSize {
width: 800,
height: 600,
}),
DefaultPointer,
Resize(DesktopSize {
width: 1080,
height: 1024,
}),
];
for op in ops {
tx.send(op).await.unwrap();
}
drop(tx);
let up = rx.recv().await;
assert!(matches!(up, Some(DefaultPointer)));
let up = rx.recv().await;
assert!(matches!(
up,
Some(Resize(DesktopSize {
width: 1080,
height: 1024
}))
));
assert!(rx.recv().await.is_none());
});
}
#[test]
fn test_bitmap() {
let (tx, rx) = DisplayQueue::new();
crate::utils::block_on(async move {
let ops = vec![
Bitmap(BitmapUpdate {
top: 0,
left: 0,
width: NonZero::new(1024).unwrap(),
height: NonZero::new(768).unwrap(),
format: PixelFormat::ABgr32,
order: PixelOrder::TopToBottom,
data: vec![],
}),
Bitmap(BitmapUpdate {
top: 0,
left: 0,
width: NonZero::new(1024).unwrap(),
height: NonZero::new(768).unwrap(),
format: PixelFormat::ABgr32,
order: PixelOrder::TopToBottom,
data: vec![],
}),
];
for op in ops {
tx.send(op).await.unwrap();
}
drop(tx);
let up = rx.recv().await;
assert!(matches!(up, Some(Bitmap(_))));
assert!(rx.recv().await.is_none());
});
}
}

View File

@ -0,0 +1,183 @@
use std::cmp::{max, min};
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct Rect {
pub(crate) x: u32,
pub(crate) y: u32,
pub(crate) width: u32,
pub(crate) height: u32,
}
impl Rect {
pub(crate) fn intersect(&self, other: &Rect) -> Option<Vec<Rect>> {
let mut result = Vec::new();
// Check if there's no overlap
if self.x + self.width <= other.x
|| other.x + other.width <= self.x
|| self.y + self.height <= other.y
|| other.y + other.height <= self.y
{
return None;
}
// Top
if self.y < other.y {
result.push(Rect {
x: self.x,
y: self.y,
width: self.width,
height: other.y - self.y,
});
}
// Bottom
if self.y + self.height > other.y + other.height {
result.push(Rect {
x: self.x,
y: other.y + other.height,
width: self.width,
height: (self.y + self.height) - (other.y + other.height),
});
}
// Left
if self.x < other.x {
result.push(Rect {
x: self.x,
y: max(self.y, other.y),
width: other.x - self.x,
height: min(self.y + self.height, other.y + other.height) - max(self.y, other.y),
});
}
// Right
if self.x + self.width > other.x + other.width {
result.push(Rect {
x: other.x + other.width,
y: max(self.y, other.y),
width: (self.x + self.width) - (other.x + other.width),
height: min(self.y + self.height, other.y + other.height) - max(self.y, other.y),
});
}
Some(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test() {
// no overlap
let rect = Rect {
x: 0,
y: 0,
width: 100,
height: 100,
};
let other = Rect {
x: 100,
y: 100,
width: 100,
height: 100,
};
let unmasked = rect.intersect(&other);
assert!(unmasked.is_none());
// full overlap
let rect = Rect {
x: 0,
y: 0,
width: 100,
height: 100,
};
let other = Rect {
x: 0,
y: 0,
width: 100,
height: 100,
};
let unmasked = rect.intersect(&other);
assert_eq!(unmasked, Some(vec![]));
// overlap center
let rect = Rect {
x: 0,
y: 0,
width: 100,
height: 100,
};
let other = Rect {
x: 25,
y: 25,
width: 50,
height: 50,
};
let unmasked = rect.intersect(&other);
assert_eq!(
unmasked,
Some(vec![
Rect {
x: 0,
y: 0,
width: 100,
height: 25
},
Rect {
x: 0,
y: 75,
width: 100,
height: 25
},
Rect {
x: 0,
y: 25,
width: 25,
height: 50
},
Rect {
x: 75,
y: 25,
width: 25,
height: 50
}
])
);
// overlap bottom right
let rect = Rect {
x: 0,
y: 0,
width: 100,
height: 100,
};
let other = Rect {
x: 90,
y: 90,
width: 100,
height: 100,
};
let unmasked = rect.intersect(&other);
assert_eq!(
unmasked,
Some(vec![
Rect {
x: 0,
y: 0,
width: 100,
height: 90
},
Rect {
x: 0,
y: 90,
width: 90,
height: 10
},
])
);
}
}

View File

@ -28,7 +28,7 @@ impl RdpServerInputHandler for InputHandler {
}
fn mouse(&mut self, event: MouseEvent) {
tracing::debug!(?event);
tracing::trace!(?event);
if let Err(e) = self.tx.try_send(InputEvent::Mouse(event)) {
eprintln!("mouse error: {:?}", e);
}
@ -41,6 +41,11 @@ async fn input_receive_task(mut rx: Receiver<InputEvent>, console: Console) {
Some(InputEvent::Keyboard(ev)) => match ev {
KeyboardEvent::Pressed { code, .. } => console.keyboard.press(code as u32).await,
KeyboardEvent::Released { code, .. } => console.keyboard.release(code as u32).await,
KeyboardEvent::Synchronize(flags) => {
tracing::debug!(?flags, "lock keys sync not supported yet");
// console.keyboard.set_modifiers(0).await
Ok(())
}
other => {
eprintln!("unhandled keyboard event: {:?}", other);
Ok(())
@ -48,7 +53,7 @@ async fn input_receive_task(mut rx: Receiver<InputEvent>, console: Console) {
},
Some(InputEvent::Mouse(ev)) => match ev {
MouseEvent::Move { x, y } => {
tracing::debug!(?x, ?y);
tracing::trace!(?x, ?y);
console.mouse.set_abs_position(cast!(x), cast!(y)).await
}
MouseEvent::RightPressed => console.mouse.press(MouseButton::Right).await,

View File

@ -109,6 +109,7 @@ impl AudioOutHandler for DBusHandler {
return;
}
// TODO: handle format conversion
if let Some(sender) = inner.ev_sender.as_ref() {
let ts = inner.start_time.elapsed().as_millis() as _;
let _ = sender.send(ServerEvent::Rdpsnd(RdpsndServerMessage::Wave(data, ts)));

View File

@ -1,46 +0,0 @@
use ironrdp::server::PixelFormat;
#[macro_export]
macro_rules! cast {
($value:expr) => {
match $value.try_into() {
Ok(val) => val,
Err(err) => {
eprintln!("Error casting value: {}", err);
return;
}
}
};
}
pub(crate) struct PixmanFormat(pub u32);
#[cfg(target_endian = "little")]
impl TryFrom<PixmanFormat> for PixelFormat {
type Error = ();
fn try_from(value: PixmanFormat) -> Result<Self, Self::Error> {
use pixman_sys::*;
#[allow(non_upper_case_globals)]
match value.0 {
pixman_format_code_t_PIXMAN_x8r8g8b8 => Ok(PixelFormat::BgrX32),
_ => Err(()),
}
}
}
#[cfg(target_endian = "big")]
impl TryFrom<PixmanFormat> for PixelFormat {
type Error = ();
fn try_from(value: PixmanFormat) -> Result<Self, Self::Error> {
use pixman_sys::*;
#[allow(non_upper_case_globals)]
match value.0 {
pixman_format_code_t_PIXMAN_x8r8g8b8 => Ok(PixelFormat::XRgb32),
_ => Err(()),
}
}
}

82
qemu-rdp/src/utils.rs Normal file
View File

@ -0,0 +1,82 @@
use ironrdp::server::PixelFormat;
use std::sync::OnceLock;
use tokio::runtime::{self, Runtime};
#[macro_export]
macro_rules! cast {
($value:expr) => {
match $value.try_into() {
Ok(val) => val,
Err(err) => {
eprintln!("Error casting value: {}", err);
return;
}
}
};
}
pub(crate) struct PixmanFormat(pub u32);
#[cfg(target_endian = "little")]
impl TryFrom<PixmanFormat> for PixelFormat {
type Error = ();
fn try_from(value: PixmanFormat) -> Result<Self, Self::Error> {
use pixman_sys::*;
#[allow(non_upper_case_globals)]
match value.0 {
pixman_format_code_t_PIXMAN_x8r8g8b8 => Ok(PixelFormat::BgrX32),
_ => Err(()),
}
}
}
#[cfg(target_endian = "big")]
impl TryFrom<PixmanFormat> for PixelFormat {
type Error = ();
fn try_from(value: PixmanFormat) -> Result<Self, Self::Error> {
use pixman_sys::*;
#[allow(non_upper_case_globals)]
match value.0 {
pixman_format_code_t_PIXMAN_x8r8g8b8 => Ok(PixelFormat::XRgb32),
_ => Err(()),
}
}
}
/// Blocks on a given future using a single-threaded Tokio runtime.
///
/// This function ensures that a Tokio runtime is initialized only once and is reused for subsequent calls.
/// It will block the current thread until the given future resolves.
///
/// # Arguments
///
/// * `future` - A future to run to completion.
///
/// # Returns
///
/// The output of the future.
///
/// # Panics
///
/// Panics if the runtime fails to initialize.
#[allow(unused)]
pub(crate) fn block_on<F: std::future::Future>(future: F) -> F::Output {
// A static OnceLock to ensure the runtime is initialized only once
static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
// Initialize the runtime if it hasn't been initialized yet
let runtime = TOKIO_RT.get_or_init(|| {
runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.expect("Failed to initialize single-threaded Tokio runtime")
});
// Block on the provided future using the initialized runtime
runtime.block_on(future)
}