Follow zbus API break, everything async /o\

for better or worse...

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
This commit is contained in:
Marc-André Lureau 2021-09-20 16:52:03 +04:00
parent 7242c46513
commit fabfa85adc
22 changed files with 769 additions and 894 deletions

View File

@ -11,18 +11,8 @@ members = [
default-members = ["qemu-rdw"]
[patch.crates-io]
vnc = { git = 'https://github.com/elmarco/rust-vnc', branch = 'server' }
usbredirhost = { path = "../usbredir-rs/usbredirhost" }
libusb1-sys = { path = "../rusb/libusb1-sys" }
rusb = { path = "../rusb" }
zbus = { path = "../zbus/zbus" }
zvariant = { path = "../zbus/zvariant" }
[patch."https://gitlab.gnome.org/malureau/rdw.git"]
rdw = { path = '../rdw/rdw' }
[patch."https://github.com/gtk-rs/gtk4-rs"]
gdk4-wayland = { path = '../gtk4-rs/gdk4-wayland' }
gdk4-x11 = { path = '../gtk4-rs/gdk4-x11' }
gtk4 = { path = '../gtk4-rs/gtk4' }
gtk4-sys = { path = '../gtk4-rs/gtk4/sys' }
vnc = { git = "https://github.com/elmarco/rust-vnc", branch = "server" }
zbus = { git = "https://gitlab.freedesktop.org/dbus/zbus.git" }
zvariant = { git = "https://gitlab.freedesktop.org/dbus/zbus.git" }
#zbus = { path = "../zbus/zbus" }
#zvariant = { path = "../zbus/zvariant" }

View File

@ -12,7 +12,6 @@ derivative = "2.2.0"
zbus = { version = "2.0.0-beta", features = ["xml"] }
zvariant = { version = "2.4.0", features = ["serde_bytes"] }
libc = "0.2.86"
glib = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true }
enumflags2 = { version = "0.6.4", features = ["serde"] }
serde = { version = "1.0.123", features = ["derive"] }
serde_repr = "0.1.6"
@ -22,3 +21,4 @@ once_cell = "1.5"
futures = "0.3.13"
usbredirhost = "0.0.1"
async-broadcast = "0.3.3"
async-trait = "0.1.48"

View File

@ -1,13 +1,7 @@
use once_cell::sync::OnceCell;
use std::default::Default;
use std::os::unix::net::UnixStream;
use std::sync::mpsc::{self, Receiver, SendError};
use std::sync::{Arc, Mutex};
use std::{os::unix::io::AsRawFd, thread};
use std::os::unix::{io::AsRawFd, net::UnixStream};
use zbus::{dbus_interface, dbus_proxy, zvariant::Fd, Connection};
use zbus::{dbus_interface, dbus_proxy, zvariant::Fd};
use crate::{EventSender, Result};
use crate::Result;
#[derive(Debug)]
pub struct PCMInfo {
@ -50,24 +44,6 @@ pub struct Volume {
pub volume: Vec<u8>,
}
#[derive(Debug)]
pub enum AudioOutEvent {
Init { id: u64, info: PCMInfo },
Fini { id: u64 },
SetEnabled { id: u64, enabled: bool },
SetVolume { id: u64, volume: Volume },
Write { id: u64, data: Vec<u8> },
}
#[derive(Debug)]
pub enum AudioInEvent {
Init { id: u64, info: PCMInfo },
Fini { id: u64 },
SetEnabled { id: u64, enabled: bool },
SetVolume { id: u64, volume: Volume },
Read { id: u64 },
}
#[dbus_proxy(
default_service = "org.qemu",
default_path = "/org/qemu/Display1/Audio",
@ -86,37 +62,31 @@ trait Audio {
pub struct Audio {
#[derivative(Debug = "ignore")]
pub proxy: AsyncAudioProxy<'static>,
out_listener: Option<Connection>,
in_listener: Option<Connection>,
}
#[derive(Debug)]
pub(crate) struct AudioOutListener<E: EventSender<Event = AudioOutEvent>> {
tx: E,
err: Arc<OnceCell<SendError<AudioOutEvent>>>,
#[async_trait::async_trait]
pub trait AudioOutHandler: 'static + Send + Sync {
async fn init(&mut self, id: u64, info: PCMInfo);
async fn fini(&mut self, id: u64);
async fn set_enabled(&mut self, id: u64, enabled: bool);
async fn set_volume(&mut self, id: u64, volume: Volume);
async fn write(&mut self, id: u64, data: Vec<u8>);
}
impl<E: EventSender<Event = AudioOutEvent>> AudioOutListener<E> {
pub(crate) fn new(tx: E) -> Self {
AudioOutListener {
tx,
err: Default::default(),
}
}
fn send(&mut self, event: AudioOutEvent) {
if let Err(e) = self.tx.send_event(event) {
let _ = self.err.set(e);
}
}
pub fn err(&self) -> Arc<OnceCell<SendError<AudioOutEvent>>> {
self.err.clone()
}
struct AudioOutListener<H: AudioOutHandler> {
handler: H,
}
#[dbus_interface(name = "org.qemu.Display1.AudioOutListener")]
impl<E: 'static + EventSender<Event = AudioOutEvent>> AudioOutListener<E> {
impl<H: AudioOutHandler> AudioOutListener<H> {
/// Init method
fn init(
async fn init(
&mut self,
id: u64,
bits: u8,
@ -128,80 +98,73 @@ impl<E: 'static + EventSender<Event = AudioOutEvent>> AudioOutListener<E> {
bytes_per_second: u32,
be: bool,
) {
self.send(AudioOutEvent::Init {
id,
info: PCMInfo {
bits,
is_signed,
is_float,
freq,
nchannels,
bytes_per_frame,
bytes_per_second,
be,
},
})
self.handler
.init(
id,
PCMInfo {
bits,
is_signed,
is_float,
freq,
nchannels,
bytes_per_frame,
bytes_per_second,
be,
},
)
.await
}
/// Fini method
fn fini(&mut self, id: u64) {
self.send(AudioOutEvent::Fini { id })
async fn fini(&mut self, id: u64) {
self.handler.fini(id).await
}
/// SetEnabled method
fn set_enabled(&mut self, id: u64, enabled: bool) {
self.send(AudioOutEvent::SetEnabled { id, enabled })
async fn set_enabled(&mut self, id: u64, enabled: bool) {
self.handler.set_enabled(id, enabled).await
}
/// SetVolume method
fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) {
self.send(AudioOutEvent::SetVolume {
id,
volume: Volume {
mute,
volume: volume.into_vec(),
},
});
async fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) {
self.handler
.set_volume(
id,
Volume {
mute,
volume: volume.into_vec(),
},
)
.await
}
/// Write method
fn write(&mut self, id: u64, data: serde_bytes::ByteBuf) {
self.send(AudioOutEvent::Write {
id,
data: data.into_vec(),
})
async fn write(&mut self, id: u64, data: serde_bytes::ByteBuf) {
self.handler.write(id, data.into_vec()).await
}
}
#[derive(Debug)]
pub(crate) struct AudioInListener<E: EventSender<Event = AudioInEvent>> {
tx: E,
err: Arc<OnceCell<SendError<AudioInEvent>>>,
#[async_trait::async_trait]
pub trait AudioInHandler: 'static + Send + Sync {
async fn init(&mut self, id: u64, info: PCMInfo);
async fn fini(&mut self, id: u64);
async fn set_enabled(&mut self, id: u64, enabled: bool);
async fn set_volume(&mut self, id: u64, volume: Volume);
async fn read(&mut self, id: u64, size: u64) -> Vec<u8>;
}
impl<E: EventSender<Event = AudioInEvent>> AudioInListener<E> {
pub(crate) fn new(tx: E) -> Self {
AudioInListener {
tx,
err: Default::default(),
}
}
fn send(&mut self, event: AudioInEvent) {
if let Err(e) = self.tx.send_event(event) {
let _ = self.err.set(e);
}
}
pub fn err(&self) -> Arc<OnceCell<SendError<AudioInEvent>>> {
self.err.clone()
}
struct AudioInListener<H: AudioInHandler> {
handler: H,
}
#[dbus_interface(name = "org.qemu.Display1.AudioInListener")]
impl<E: 'static + EventSender<Event = AudioInEvent>> AudioInListener<E> {
impl<H: AudioInHandler> AudioInListener<H> {
/// Init method
fn init(
async fn init(
&mut self,
id: u64,
bits: u8,
@ -213,116 +176,84 @@ impl<E: 'static + EventSender<Event = AudioInEvent>> AudioInListener<E> {
bytes_per_second: u32,
be: bool,
) {
self.send(AudioInEvent::Init {
id,
info: PCMInfo {
bits,
is_signed,
is_float,
freq,
nchannels,
bytes_per_frame,
bytes_per_second,
be,
},
})
self.handler
.init(
id,
PCMInfo {
bits,
is_signed,
is_float,
freq,
nchannels,
bytes_per_frame,
bytes_per_second,
be,
},
)
.await
}
/// Fini method
fn fini(&mut self, id: u64) {
self.send(AudioInEvent::Fini { id })
async fn fini(&mut self, id: u64) {
self.handler.fini(id).await
}
/// SetEnabled method
fn set_enabled(&mut self, id: u64, enabled: bool) {
self.send(AudioInEvent::SetEnabled { id, enabled })
async fn set_enabled(&mut self, id: u64, enabled: bool) {
self.handler.set_enabled(id, enabled).await
}
/// SetVolume method
fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) {
self.send(AudioInEvent::SetVolume {
id,
volume: Volume {
mute,
volume: volume.into_vec(),
},
});
async fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) {
self.handler
.set_volume(
id,
Volume {
mute,
volume: volume.into_vec(),
},
)
.await
}
/// Read method
fn read(&mut self, id: u64, size: u64) -> Vec<u8> {
dbg!((id, size));
vec![0; size as usize]
async fn read(&mut self, id: u64, size: u64) -> Vec<u8> {
self.handler.read(id, size).await
// dbg!((id, size));
// vec![0; size as usize]
}
}
impl Audio {
pub async fn new(conn: &zbus::azync::Connection) -> Result<Self> {
pub async fn new(conn: &zbus::Connection) -> Result<Self> {
let proxy = AsyncAudioProxy::new(conn).await?;
Ok(Self { proxy })
Ok(Self {
proxy,
in_listener: None,
out_listener: None,
})
}
pub async fn listen_out(&self) -> Result<Receiver<AudioOutEvent>> {
pub async fn register_out_listener<H: AudioOutHandler>(&mut self, handler: H) -> Result<()> {
let (p0, p1) = UnixStream::pair()?;
let (tx, rx) = mpsc::channel();
self.proxy
.register_out_listener(p0.as_raw_fd().into())
.await?;
let _thread = thread::spawn(move || {
let c = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
.unwrap();
let mut s = zbus::ObjectServer::new(&c);
let listener = AudioOutListener::new(Mutex::new(tx));
let err = listener.err();
s.at("/org/qemu/Display1/AudioOutListener", listener)
.unwrap();
loop {
if let Err(e) = s.try_handle_next() {
eprintln!("Listener DBus error: {}", e);
return;
}
if let Some(e) = err.get() {
eprintln!("Listener channel error: {}", e);
return;
}
}
});
Ok(rx)
}
pub async fn listen_in(&self) -> Result<Receiver<AudioInEvent>> {
let (p0, p1) = UnixStream::pair()?;
let (tx, rx) = mpsc::channel();
self.proxy
.register_in_listener(p0.as_raw_fd().into())
let c = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
.await?;
let _thread = thread::spawn(move || {
let c = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
{
let mut server = c.object_server_mut().await;
server
.at(
"/org/qemu/Display1/AudioOutListener",
AudioOutListener { handler },
)
.unwrap();
let mut s = zbus::ObjectServer::new(&c);
let listener = AudioInListener::new(Mutex::new(tx));
let err = listener.err();
s.at("/org/qemu/Display1/AudioInListener", listener)
.unwrap();
loop {
if let Err(e) = s.try_handle_next() {
eprintln!("Listener DBus error: {}", e);
return;
}
if let Some(e) = err.get() {
eprintln!("Listener channel error: {}", e);
return;
}
}
});
Ok(rx)
server.start_dispatch();
}
self.out_listener.replace(c);
Ok(())
}
}

View File

@ -1,6 +1,8 @@
use std::convert::TryFrom;
use zbus::dbus_proxy;
use zbus::zvariant::{Fd, ObjectPath};
use zbus::{
dbus_proxy,
zvariant::{Fd, ObjectPath},
};
use crate::Result;
@ -36,7 +38,7 @@ pub struct Chardev {
}
impl Chardev {
pub async fn new(conn: &zbus::azync::Connection, id: &str) -> Result<Self> {
pub async fn new(conn: &zbus::Connection, id: &str) -> Result<Self> {
let obj_path = ObjectPath::try_from(format!("/org/qemu/Display1/Chardev_{}", id))?;
let proxy = AsyncChardevProxy::builder(conn)
.path(&obj_path)?

View File

@ -1,12 +1,9 @@
use once_cell::sync::OnceCell;
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::convert::TryFrom;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use zbus::{dbus_interface, dbus_proxy, zvariant::ObjectPath};
use zvariant::derive::Type;
use crate::{EventSender, Result};
use crate::Result;
#[repr(u32)]
#[derive(Deserialize_repr, Serialize_repr, Type, Debug, Hash, PartialEq, Eq, Clone, Copy)]
@ -37,141 +34,88 @@ pub trait Clipboard {
) -> zbus::Result<(String, Vec<u8>)>;
}
pub type ClipboardReplyTx = Sender<Result<(String, Vec<u8>)>>;
#[async_trait::async_trait]
pub trait ClipboardHandler: 'static + Send + Sync {
async fn register(&mut self);
// TODO: replace events mpsc with async traits
#[derive(Debug)]
pub enum ClipboardEvent {
Register,
Unregister,
Grab {
selection: ClipboardSelection,
serial: u32,
mimes: Vec<String>,
},
Release {
selection: ClipboardSelection,
},
Request {
async fn unregister(&mut self);
async fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec<String>);
async fn release(&mut self, selection: ClipboardSelection);
async fn request(
&mut self,
selection: ClipboardSelection,
mimes: Vec<String>,
tx: Mutex<ClipboardReplyTx>,
},
) -> Result<(String, Vec<u8>)>;
}
#[derive(Debug)]
pub(crate) struct ClipboardListener<E: EventSender<Event = ClipboardEvent>> {
tx: E,
err: Arc<OnceCell<String>>,
pub(crate) struct ClipboardListener<H: ClipboardHandler> {
handler: H,
}
#[dbus_interface(name = "org.qemu.Display1.Clipboard")]
impl<E: 'static + EventSender<Event = ClipboardEvent>> ClipboardListener<E> {
fn register(&mut self) {
self.send(ClipboardEvent::Register)
impl<H: ClipboardHandler> ClipboardListener<H> {
async fn register(&mut self) {
self.handler.register().await;
}
fn unregister(&mut self) {
self.send(ClipboardEvent::Unregister)
async fn unregister(&mut self) {
self.handler.unregister().await;
}
fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec<String>) {
self.send(ClipboardEvent::Grab {
selection,
serial,
mimes,
})
async fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec<String>) {
self.handler.grab(selection, serial, mimes).await;
}
fn release(&mut self, selection: ClipboardSelection) {
self.send(ClipboardEvent::Release { selection })
async fn release(&mut self, selection: ClipboardSelection) {
self.handler.release(selection).await;
}
fn request(
async fn request(
&mut self,
selection: ClipboardSelection,
mimes: Vec<String>,
) -> zbus::fdo::Result<(String, Vec<u8>)> {
let (tx, rx) = channel();
self.send(ClipboardEvent::Request {
selection,
mimes,
tx: Mutex::new(tx),
});
rx.recv()
.map_err(|e| zbus::fdo::Error::Failed(format!("Request recv failed: {}", e)))?
self.handler
.request(selection, mimes)
.await
.map_err(|e| zbus::fdo::Error::Failed(format!("Request failed: {}", e)))
}
}
impl<E: 'static + EventSender<Event = ClipboardEvent>> ClipboardListener<E> {
pub fn new(tx: E) -> Self {
Self {
tx,
err: Default::default(),
}
}
fn send(&mut self, event: ClipboardEvent) {
if let Err(e) = self.tx.send_event(event) {
let _ = self.err.set(e.to_string());
}
}
pub fn err(&self) -> Arc<OnceCell<String>> {
self.err.clone()
}
}
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct Clipboard {
conn: zbus::azync::Connection,
#[derivative(Debug = "ignore")]
pub proxy: AsyncClipboardProxy<'static>,
conn: zbus::Connection,
}
impl Clipboard {
pub async fn new(conn: &zbus::azync::Connection) -> Result<Self> {
let obj_path = ObjectPath::try_from("/org/qemu/Display1/Clipboard")?;
pub async fn new(conn: &zbus::Connection) -> Result<Self> {
let obj_path = ObjectPath::try_from("/org/qemu/Display1/Clipboard").unwrap();
let proxy = AsyncClipboardProxy::builder(conn)
.path(&obj_path)?
.build()
.await?;
Ok(Self {
conn: conn.clone(),
proxy,
conn: conn.clone(),
})
}
pub async fn register(&self) -> Result<()> {
self.proxy.register().await?;
Ok(())
}
}
#[cfg(feature = "glib")]
impl Clipboard {
pub async fn glib_listen(&self) -> Result<glib::Receiver<ClipboardEvent>> {
let (tx, rx) = glib::MainContext::channel(glib::source::Priority::default());
let c = self.conn.clone().into();
let _thread = std::thread::spawn(move || {
let mut s = zbus::ObjectServer::new(&c);
let listener = ClipboardListener::new(tx);
let err = listener.err();
s.at("/org/qemu/Display1/Clipboard", listener).unwrap();
loop {
if let Err(e) = s.try_handle_next() {
eprintln!("Listener DBus error: {}", e);
break;
}
if let Some(e) = err.get() {
eprintln!("Listener channel error: {}", e);
break;
}
}
});
Ok(rx)
pub async fn register<H: ClipboardHandler>(&self, handler: H) -> Result<()> {
self.conn
.object_server_mut()
.await
.at(
"/org/qemu/Display1/Clipboard",
ClipboardListener { handler },
)
.unwrap();
Ok(self.proxy.register().await?)
}
}

View File

@ -1,16 +1,15 @@
use std::convert::TryFrom;
use std::os::unix::net::UnixStream;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;
use std::{os::unix::io::AsRawFd, thread};
use std::{
cell::RefCell,
convert::TryFrom,
os::unix::{io::AsRawFd, net::UnixStream},
};
use zbus::{
dbus_proxy,
zvariant::{Fd, ObjectPath},
Connection,
};
use crate::Result;
use crate::{AsyncKeyboardProxy, AsyncMouseProxy, ConsoleEvent, ConsoleListener};
use crate::{AsyncKeyboardProxy, AsyncMouseProxy, ConsoleListener, ConsoleListenerHandler, Result};
#[dbus_proxy(default_service = "org.qemu", interface = "org.qemu.Display1.Console")]
pub trait Console {
@ -54,10 +53,11 @@ pub struct Console {
pub keyboard: AsyncKeyboardProxy<'static>,
#[derivative(Debug = "ignore")]
pub mouse: AsyncMouseProxy<'static>,
listener: RefCell<Option<Connection>>,
}
impl Console {
pub async fn new(conn: &zbus::azync::Connection, idx: u32) -> Result<Self> {
pub async fn new(conn: &Connection, idx: u32) -> Result<Self> {
let obj_path = ObjectPath::try_from(format!("/org/qemu/Display1/Console_{}", idx))?;
let proxy = AsyncConsoleProxy::builder(conn)
.path(&obj_path)?
@ -75,30 +75,10 @@ impl Console {
proxy,
keyboard,
mouse,
listener: RefCell::new(None),
})
}
pub async fn dispatch_signals(&self) -> Result<()> {
use futures_util::{future::FutureExt, select};
if let Some(msg) = select!(
msg = self.proxy.next_signal().fuse() => {
msg?
},
msg = self.keyboard.next_signal().fuse() => {
msg?
},
msg = self.mouse.next_signal().fuse() => {
msg?
}
) {
if msg.primary_header().msg_type() == zbus::MessageType::Signal {
log::debug!("Ignoring {:?}", msg);
}
}
Ok(())
}
pub async fn label(&self) -> Result<String> {
Ok(self.proxy.label().await?)
}
@ -111,66 +91,25 @@ impl Console {
Ok(self.proxy.height().await?)
}
pub async fn listen(&self) -> Result<(Receiver<ConsoleEvent>, Sender<()>)> {
pub async fn register_listener<H: ConsoleListenerHandler>(&self, handler: H) -> Result<()> {
let (p0, p1) = UnixStream::pair()?;
let (tx, rx) = mpsc::channel();
self.proxy.register_listener(p0.as_raw_fd().into()).await?;
let (wait_tx, wait_rx) = mpsc::channel();
let _thread = thread::spawn(move || {
let c = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
let c = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
.await?;
{
let mut server = c.object_server_mut().await;
server
.at("/org/qemu/Display1/Listener", ConsoleListener::new(handler))
.unwrap();
let mut s = zbus::ObjectServer::new(&c);
let listener = ConsoleListener::new(Mutex::new(tx), wait_rx);
let err = listener.err();
s.at("/org/qemu/Display1/Listener", listener).unwrap();
loop {
if let Err(e) = s.try_handle_next() {
eprintln!("Listener DBus error: {}", e);
return;
}
if let Some(e) = err.get() {
eprintln!("Listener channel error: {}", e);
return;
}
}
});
server.start_dispatch();
}
self.listener.replace(Some(c));
Ok(())
}
Ok((rx, wait_tx))
}
}
#[cfg(feature = "glib")]
impl Console {
pub async fn glib_listen(&self) -> Result<(glib::Receiver<ConsoleEvent>, Sender<()>)> {
let (p0, p1) = UnixStream::pair()?;
let (tx, rx) = glib::MainContext::channel(glib::source::Priority::default());
self.proxy.register_listener(p0.as_raw_fd().into()).await?;
let (wait_tx, wait_rx) = mpsc::channel();
let _thread = thread::spawn(move || {
let c = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
.unwrap();
let mut s = zbus::ObjectServer::new(&c);
let listener = ConsoleListener::new(tx, wait_rx);
let err = listener.err();
s.at("/org/qemu/Display1/Listener", listener).unwrap();
loop {
if let Err(e) = s.try_handle_next() {
eprintln!("Listener DBus error: {}", e);
break;
}
if let Some(e) = err.get() {
eprintln!("Listener channel error: {}", e);
break;
}
}
});
Ok((rx, wait_tx))
pub fn unregister_listener(&mut self) {
self.listener.replace(None);
}
}

View File

@ -1,15 +1,10 @@
use once_cell::sync::OnceCell;
use std::ops::Drop;
use std::os::unix::io::IntoRawFd;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::mpsc::{Receiver, RecvError, SendError};
use std::sync::{Arc, Mutex};
use derivative::Derivative;
use std::{
ops::Drop,
os::unix::io::{AsRawFd, IntoRawFd, RawFd},
};
use zbus::{dbus_interface, zvariant::Fd};
use crate::EventSender;
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Scanout {
@ -45,6 +40,17 @@ pub struct ScanoutDMABUF {
pub y0_top: bool,
}
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Cursor {
pub width: i32,
pub height: i32,
pub hot_x: i32,
pub hot_y: i32,
#[derivative(Debug = "ignore")]
pub data: Vec<u8>,
}
impl Drop for ScanoutDMABUF {
fn drop(&mut self) {
if self.fd >= 0 {
@ -68,39 +74,39 @@ pub struct MouseSet {
pub on: i32,
}
// TODO: replace events mpsc with async traits
#[derive(Debug)]
pub enum ConsoleEvent {
Scanout(Scanout),
Update(Update),
ScanoutDMABUF(ScanoutDMABUF),
UpdateDMABUF {
x: i32,
y: i32,
w: i32,
h: i32,
},
MouseSet(MouseSet),
CursorDefine {
width: i32,
height: i32,
hot_x: i32,
hot_y: i32,
data: Vec<u8>,
},
Disconnected,
#[derive(Debug, Copy, Clone)]
pub struct UpdateDMABUF {
pub x: i32,
pub y: i32,
pub w: i32,
pub h: i32,
}
#[async_trait::async_trait]
pub trait ConsoleListenerHandler: 'static + Send + Sync {
async fn scanout(&mut self, scanout: Scanout);
async fn update(&mut self, update: Update);
async fn scanout_dmabuf(&mut self, scanout: ScanoutDMABUF);
async fn update_dmabuf(&mut self, update: UpdateDMABUF);
async fn mouse_set(&mut self, set: MouseSet);
async fn cursor_define(&mut self, cursor: Cursor);
fn disconnected(&mut self);
}
#[derive(Debug)]
pub(crate) struct ConsoleListener<E: EventSender<Event = ConsoleEvent>> {
tx: E,
wait_rx: Mutex<Receiver<()>>,
err: Arc<OnceCell<SendError<ConsoleEvent>>>,
pub(crate) struct ConsoleListener<H: ConsoleListenerHandler> {
handler: H,
}
#[dbus_interface(name = "org.qemu.Display1.Listener")]
impl<E: 'static + EventSender<Event = ConsoleEvent>> ConsoleListener<E> {
fn scanout(
impl<H: ConsoleListenerHandler> ConsoleListener<H> {
async fn scanout(
&mut self,
width: u32,
height: u32,
@ -108,16 +114,18 @@ impl<E: 'static + EventSender<Event = ConsoleEvent>> ConsoleListener<E> {
format: u32,
data: serde_bytes::ByteBuf,
) {
self.send(ConsoleEvent::Scanout(Scanout {
width,
height,
stride,
format,
data: data.into_vec(),
}))
self.handler
.scanout(Scanout {
width,
height,
stride,
format,
data: data.into_vec(),
})
.await;
}
fn update(
async fn update(
&mut self,
x: i32,
y: i32,
@ -127,19 +135,21 @@ impl<E: 'static + EventSender<Event = ConsoleEvent>> ConsoleListener<E> {
format: u32,
data: serde_bytes::ByteBuf,
) {
self.send(ConsoleEvent::Update(Update {
x,
y,
w,
h,
stride,
format,
data: data.into_vec(),
}))
self.handler
.update(Update {
x,
y,
w,
h,
stride,
format,
data: data.into_vec(),
})
.await;
}
#[dbus_interface(name = "ScanoutDMABUF")]
fn scanout_dmabuf(
async fn scanout_dmabuf(
&mut self,
fd: Fd,
width: u32,
@ -150,66 +160,58 @@ impl<E: 'static + EventSender<Event = ConsoleEvent>> ConsoleListener<E> {
y0_top: bool,
) {
let fd = unsafe { libc::dup(fd.as_raw_fd()) };
self.send(ConsoleEvent::ScanoutDMABUF(ScanoutDMABUF {
fd,
width,
height,
stride,
fourcc,
modifier,
y0_top,
}))
self.handler
.scanout_dmabuf(ScanoutDMABUF {
fd,
width,
height,
stride,
fourcc,
modifier,
y0_top,
})
.await;
}
#[dbus_interface(name = "UpdateDMABUF")]
fn update_dmabuf(&mut self, x: i32, y: i32, w: i32, h: i32) {
self.send(ConsoleEvent::UpdateDMABUF { x, y, w, h });
if let Err(e) = self.wait() {
eprintln!("update returned error: {}", e)
}
async fn update_dmabuf(&mut self, x: i32, y: i32, w: i32, h: i32) {
self.handler
.update_dmabuf(UpdateDMABUF { x, y, w, h })
.await;
}
fn mouse_set(&mut self, x: i32, y: i32, on: i32) {
self.send(ConsoleEvent::MouseSet(MouseSet { x, y, on }))
async fn mouse_set(&mut self, x: i32, y: i32, on: i32) {
self.handler.mouse_set(MouseSet { x, y, on }).await;
}
fn cursor_define(&mut self, width: i32, height: i32, hot_x: i32, hot_y: i32, data: Vec<u8>) {
self.send(ConsoleEvent::CursorDefine {
width,
height,
hot_x,
hot_y,
data,
})
async fn cursor_define(
&mut self,
width: i32,
height: i32,
hot_x: i32,
hot_y: i32,
data: Vec<u8>,
) {
self.handler
.cursor_define(Cursor {
width,
height,
hot_x,
hot_y,
data,
})
.await;
}
}
impl<E: EventSender<Event = ConsoleEvent>> ConsoleListener<E> {
pub(crate) fn new(tx: E, wait_rx: Receiver<()>) -> Self {
ConsoleListener {
tx,
wait_rx: Mutex::new(wait_rx),
err: Default::default(),
}
}
fn send(&mut self, event: ConsoleEvent) {
if let Err(e) = self.tx.send_event(event) {
let _ = self.err.set(e);
}
}
fn wait(&mut self) -> Result<(), RecvError> {
self.wait_rx.lock().unwrap().recv()
}
pub fn err(&self) -> Arc<OnceCell<SendError<ConsoleEvent>>> {
self.err.clone()
impl<H: ConsoleListenerHandler> ConsoleListener<H> {
pub(crate) fn new(handler: H) -> Self {
Self { handler }
}
}
impl<E: EventSender<Event = ConsoleEvent>> Drop for ConsoleListener<E> {
impl<H: ConsoleListenerHandler> Drop for ConsoleListener<H> {
fn drop(&mut self) {
self.send(ConsoleEvent::Disconnected)
self.handler.disconnected();
}
}

View File

@ -1,14 +1,14 @@
use futures::stream::{self, StreamExt};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::convert::TryInto;
use zbus::azync::Connection;
use zbus::fdo;
use zbus::fdo::ManagedObjects;
use zbus::names::BusName;
use zbus::names::OwnedUniqueName;
use zbus::names::UniqueName;
use zbus::names::WellKnownName;
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
};
use zbus::{
fdo,
fdo::ManagedObjects,
names::{BusName, OwnedUniqueName, UniqueName, WellKnownName},
Connection,
};
use zvariant::OwnedObjectPath;
use crate::{AsyncVMProxy, Audio, Chardev, Clipboard, Error, Result, UsbRedir};
@ -21,10 +21,15 @@ pub struct Display {
impl Display {
pub async fn by_name(conn: &Connection) -> Result<HashMap<String, OwnedUniqueName>> {
let mut hm = HashMap::new();
let list = fdo::AsyncDBusProxy::new(conn)
let list = match fdo::AsyncDBusProxy::new(conn)
.await?
.list_queued_owners(WellKnownName::from_str_unchecked("org.qemu"))
.await?;
.await
{
Ok(list) => list,
Err(zbus::fdo::Error::NameHasNoOwner(_)) => vec![],
Err(e) => return Err(e.into()),
};
for dest in list.into_iter() {
let name = AsyncVMProxy::builder(conn)
.destination(UniqueName::from(&dest))?
@ -47,7 +52,7 @@ impl Display {
} else {
"org.qemu".try_into().unwrap()
};
let objects = fdo::AsyncObjectManagerProxy::builder(&conn)
let objects = fdo::AsyncObjectManagerProxy::builder(conn)
.destination(dest)?
.path("/org/qemu/Display1")?
.build()
@ -107,7 +112,6 @@ impl Display {
.collect()
.await;
let redir = UsbRedir::new(chardevs);
redir
UsbRedir::new(chardevs)
}
}

View File

@ -1,9 +1,6 @@
use usbredirhost::rusb;
use std::convert::Infallible;
use std::error;
use std::fmt;
use std::io;
use std::{convert::Infallible, error, fmt, io};
#[derive(Debug)]
pub enum Error {

View File

@ -1,25 +0,0 @@
use std::sync::mpsc::{SendError, Sender};
use std::sync::Mutex;
pub(crate) trait EventSender: Send + Sync {
type Event;
fn send_event(&self, t: Self::Event) -> Result<(), SendError<Self::Event>>;
}
impl<T: Send + Sync> EventSender for Mutex<Sender<T>> {
type Event = T;
fn send_event(&self, t: Self::Event) -> Result<(), SendError<Self::Event>> {
self.lock().unwrap().send(t)
}
}
#[cfg(feature = "glib")]
impl<T: Send + Sync> EventSender for glib::Sender<T> {
type Event = T;
fn send_event(&self, t: Self::Event) -> Result<(), SendError<Self::Event>> {
self.send(t)
}
}

View File

@ -3,9 +3,6 @@
mod error;
pub use error::*;
mod event_sender;
use event_sender::*;
mod vm;
pub use vm::*;

View File

@ -23,9 +23,9 @@ use crate::{Chardev, Error, Result};
#[derive(Debug)]
struct InnerHandler {
#[allow(unused)] // keep the device opened, as rusb doesn't take it
device_fd: Option<zvariant::OwnedFd>,
stream: UnixStream,
stream_thread: JoinHandle<()>,
ctxt: rusb::Context,
ctxt_thread: Option<JoinHandle<()>>,
event: (UnixStream, UnixStream),
@ -90,7 +90,7 @@ impl Handler {
Ok(it) => (it, None),
Err(rusb::Error::Access) => {
let (bus, dev) = (device.bus_number(), device.address());
let sysbus = zbus::azync::Connection::system().await?;
let sysbus = zbus::Connection::system().await?;
let fd = AsyncSystemHelperProxy::new(&sysbus)
.await?
.open_bus_dev(bus, dev)
@ -110,7 +110,7 @@ impl Handler {
// really annoying libusb/usbredir APIs...
let event = UnixStream::pair()?;
let event_fd = event.1.as_raw_fd();
let stream_thread = std::thread::spawn(move || loop {
std::thread::spawn(move || loop {
let ret = fd_poll_readable(stream_fd, Some(event_fd));
c.interrupt_handle_events();
if ret.is_err() {
@ -122,7 +122,6 @@ impl Handler {
inner: Arc::new(Mutex::new(InnerHandler {
device_fd,
stream,
stream_thread,
event,
quit: false,
ctxt: ctxt.clone(),
@ -162,7 +161,7 @@ impl Drop for Handler {
inner.quit = true;
inner.ctxt.interrupt_handle_events();
// stream will be dropped and stream_thread will kick context_thread
inner.event.0.write(&[0]).unwrap();
inner.event.0.write_all(&[0]).unwrap();
}
}
@ -234,24 +233,30 @@ impl UsbRedir {
) -> Result<bool> {
let mut inner = self.inner.borrow_mut();
let key = Key::from_device(device);
let handled = inner.handlers.contains_key(&key);
// We should do better and watch for owner properties changes, but this would require tasks
// anticipate result
let mut nfree = inner.n_available_chardev().await as _;
if state {
if !inner.handlers.contains_key(&key) {
match (state, handled) {
(true, false) => {
let chardev = inner
.first_available_chardev()
.await
.ok_or_else(|| Error::Failed("There are no free USB channels".into()))?;
let handler = Handler::new(device, chardev).await?;
inner.handlers.insert(key, handler);
nfree -= 1;
}
(false, true) => {
inner.handlers.remove(&key);
nfree += 1;
}
_ => {
return Ok(state);
}
nfree -= 1;
} else {
inner.handlers.remove(&key);
nfree += 1;
}
// We should do better and watch for owner properties changes, but this would require tasks
let _ = inner.channel.0.broadcast(Event::NFreeChannels(nfree)).await;
Ok(state)

View File

@ -11,9 +11,9 @@ log = "0.4"
pretty_env_logger = "0.4"
once_cell = "1.5"
zbus = { version = "2.0.0-beta" }
qemu-display = { path = "../qemu-display", features = ["glib"] }
qemu-display = { path = "../qemu-display" }
keycodemap = { path = "../keycodemap" }
gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs" }
rdw = { git = "https://gitlab.gnome.org/malureau/rdw.git" }
rdw = { package = "rdw4", git = "https://gitlab.gnome.org/malureau/rdw.git" }
futures-util = "0.3.13"
futures = "0.3.13"
async-trait = "0.1.48"

View File

@ -1,60 +1,57 @@
use std::error::Error;
use std::result::Result;
use std::thread;
use std::{error::Error, result::Result};
use qemu_display::Audio;
use qemu_display::{Audio, AudioOutHandler};
#[derive(Debug)]
pub struct Handler {
#[allow(unused)]
audio: Audio,
}
#[derive(Debug, Default)]
pub struct Handler {
thread: Option<thread::JoinHandle<()>>,
pub struct OutListener {
gst: rdw::GstAudio,
}
#[async_trait::async_trait]
impl AudioOutHandler for OutListener {
async fn init(&mut self, id: u64, info: qemu_display::PCMInfo) {
if let Err(e) = self.gst.init_out(id, &info.gst_caps()) {
log::warn!("Failed to initialize audio stream: {}", e);
}
}
async fn fini(&mut self, id: u64) {
self.gst.fini_out(id);
}
async fn set_enabled(&mut self, id: u64, enabled: bool) {
if let Err(e) = self.gst.set_enabled_out(id, enabled) {
log::warn!("Failed to set enabled audio stream: {}", e);
}
}
async fn set_volume(&mut self, id: u64, volume: qemu_display::Volume) {
if let Err(e) = self.gst.set_volume_out(
id,
volume.mute,
volume.volume.first().map(|v| *v as f64 / 255f64),
) {
log::warn!("Failed to set volume: {}", e);
}
}
async fn write(&mut self, id: u64, data: Vec<u8>) {
if let Err(e) = self.gst.write_out(id, data) {
log::warn!("Failed to output stream: {}", e);
}
}
}
impl Handler {
pub async fn new(audio: Audio) -> Result<Self, Box<dyn Error>> {
let rx = audio.listen_out().await?;
let mut gst = rdw::GstAudio::new()?;
let thread = thread::spawn(move || loop {
match rx.recv() {
Ok(event) => {
use qemu_display::AudioOutEvent::*;
match event {
Init { id, info } => {
if let Err(e) = gst.init_out(id, &info.gst_caps()) {
log::warn!("Failed to initialize audio stream: {}", e);
}
}
Fini { id } => {
gst.fini_out(id);
}
SetEnabled { id, enabled } => {
if let Err(e) = gst.set_enabled_out(id, enabled) {
log::warn!("Failed to set enabled audio stream: {}", e);
}
}
SetVolume { id, volume } => {
if let Err(e) = gst.set_volume_out(
id,
volume.mute,
volume.volume.first().map(|v| *v as f64 / 255f64),
) {
log::warn!("Failed to set volume: {}", e);
}
}
Write { id, data } => {
if let Err(e) = gst.write_out(id, data) {
log::warn!("Failed to output stream: {}", e);
}
}
}
}
Err(e) => log::warn!("Audio thread error: {}", e),
}
});
Ok(Self {
thread: Some(thread),
})
pub async fn new(mut audio: Audio) -> Result<Handler, Box<dyn Error>> {
let gst = rdw::GstAudio::new()?;
audio.register_out_listener(OutListener { gst }).await?;
Ok(Handler { audio })
}
}

View File

@ -1,145 +1,195 @@
use std::cell::Cell;
use std::error::Error;
use std::rc::Rc;
use std::result::Result;
use crate::glib::{self, clone, prelude::*, SignalHandlerId, SourceId};
use gtk::{gdk, gio, prelude::DisplayExt, prelude::*};
use qemu_display::{
self as qdl, AsyncClipboardProxy, Clipboard, ClipboardEvent, ClipboardSelection,
use std::{
error::Error,
result::Result,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use glib::{clone, SignalHandlerId};
use gtk::{
gdk, gio, glib,
prelude::{DisplayExt, *},
};
use qemu_display::{AsyncClipboardProxy, Clipboard, ClipboardHandler, ClipboardSelection};
use rdw::gtk;
#[derive(Debug)]
pub struct Handler {
rx: SourceId,
#[allow(unused)]
clipboard: Clipboard,
cb_handler: Option<SignalHandlerId>,
cb_primary_handler: Option<SignalHandlerId>,
}
impl Handler {
pub async fn new(ctxt: Clipboard) -> Result<Self, Box<dyn Error>> {
let rx = ctxt
.glib_listen()
.await
.expect("Failed to listen to the clipboard");
let proxy = ctxt.proxy.clone();
let serials = Rc::new([Cell::new(0), Cell::new(0)]);
let current_serials = serials.clone();
let rx = rx.attach(None, move |evt| {
use ClipboardEvent::*;
#[derive(Debug)]
struct InnerHandler {
proxy: AsyncClipboardProxy<'static>,
serials: Arc<[AtomicU32; 2]>,
}
log::debug!("Clipboard event: {:?}", evt);
match evt {
Register | Unregister => {
current_serials[0].set(0);
current_serials[1].set(0);
}
Grab {
selection,
serial,
mimes,
} => {
if let Some((clipboard, idx)) = clipboard_from_selection(selection) {
if serial < current_serials[idx].get() {
log::debug!("Ignored peer grab: {} < {}", serial, current_serials[idx].get());
return Continue(true);
}
impl InnerHandler {
fn reset_serials(&mut self) {
self.serials[0].store(0, Ordering::SeqCst);
self.serials[1].store(0, Ordering::SeqCst);
}
}
current_serials[idx].set(serial);
let m: Vec<_> = mimes.iter().map(|s|s.as_str()).collect();
let p = proxy.clone();
let content = rdw::ContentProvider::new(&m, move |mime, stream, prio| {
log::debug!("content-provider-write: {:?}", (mime, stream));
#[async_trait::async_trait]
impl ClipboardHandler for InnerHandler {
async fn register(&mut self) {
self.reset_serials();
}
let p = p.clone();
let mime = mime.to_string();
Some(Box::pin(clone!(@strong stream => @default-return panic!(), async move {
match p.request(selection, &[&mime]).await {
Ok((_, data)) => {
let bytes = glib::Bytes::from(&data);
stream.write_bytes_async_future(&bytes, prio).await.map(|_| ())
}
Err(e) => {
let err = format!("failed to request clipboard data: {}", e);
log::warn!("{}", err);
Err(glib::Error::new(gio::IOErrorEnum::Failed, &err))
}
}
})))
});
async fn unregister(&mut self) {
self.reset_serials();
}
if let Err(e) = clipboard.set_content(Some(&content)) {
log::warn!("Failed to set clipboard grab: {}", e);
}
}
}
Release { selection } => {
if let Some((clipboard, _)) = clipboard_from_selection(selection) {
// TODO: track if the outside/app changed the clipboard
if let Err(e) = clipboard.set_content(gdk::NONE_CONTENT_PROVIDER) {
log::warn!("Failed to release clipboard: {}", e);
}
}
}
Request { selection, mimes, tx } => {
if let Some((clipboard, _)) = clipboard_from_selection(selection) {
glib::MainContext::default().spawn_local(async move {
let m: Vec<_> = mimes.iter().map(|s|s.as_str()).collect();
let res = clipboard.read_async_future(&m, glib::Priority::default()).await;
log::debug!("clipboard-read: {}", res.is_ok());
let reply = match res {
Ok((stream, mime)) => {
let out = gio::MemoryOutputStream::new_resizable();
let res = out.splice_async_future(
&stream,
gio::OutputStreamSpliceFlags::CLOSE_SOURCE | gio::OutputStreamSpliceFlags::CLOSE_TARGET,
glib::Priority::default()).await;
match res {
Ok(_) => {
let data = out.steal_as_bytes();
Ok((mime.to_string(), data.as_ref().to_vec()))
}
Err(e) => {
Err(qdl::Error::Failed(format!("{}", e)))
}
}
}
Err(e) => {
Err(qdl::Error::Failed(format!("{}", e)))
}
};
let _ = tx.lock().unwrap().send(reply);
});
}
}
async fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec<String>) {
if let Some((clipboard, idx)) = clipboard_from_selection(selection) {
let cur_serial = self.serials[idx].load(Ordering::SeqCst);
if serial < cur_serial {
log::debug!("Ignored peer grab: {} < {}", serial, cur_serial);
return;
}
Continue(true)
});
self.serials[idx].store(serial, Ordering::SeqCst);
let m: Vec<_> = mimes.iter().map(|s| s.as_str()).collect();
let p = self.proxy.clone();
let content = rdw::ContentProvider::new(&m, move |mime, stream, prio| {
log::debug!("content-provider-write: {:?}", (mime, stream));
let p = p.clone();
let mime = mime.to_string();
Some(Box::pin(
clone!(@strong stream => @default-return panic!(), async move {
match p.request(selection, &[&mime]).await {
Ok((_, data)) => {
let bytes = glib::Bytes::from(&data);
stream.write_bytes_async_future(&bytes, prio).await.map(|_| ())
}
Err(e) => {
let err = format!("failed to request clipboard data: {}", e);
log::warn!("{}", err);
Err(glib::Error::new(gio::IOErrorEnum::Failed, &err))
}
}
}),
))
});
if let Err(e) = clipboard.set_content(Some(&content)) {
log::warn!("Failed to set clipboard grab: {}", e);
}
}
}
async fn release(&mut self, selection: ClipboardSelection) {
if let Some((clipboard, _)) = clipboard_from_selection(selection) {
// TODO: track if the outside/app changed the clipboard
if let Err(e) = clipboard.set_content(gdk::NONE_CONTENT_PROVIDER) {
log::warn!("Failed to release clipboard: {}", e);
}
}
}
async fn request(
&mut self,
selection: ClipboardSelection,
mimes: Vec<String>,
) -> qemu_display::Result<(String, Vec<u8>)> {
// we have to spawn a local future, because clipboard is not Send
let (sender, receiver) = futures::channel::oneshot::channel();
glib::MainContext::default().spawn_local(async move {
let res = if let Some((clipboard, _)) = clipboard_from_selection(selection) {
let m: Vec<_> = mimes.iter().map(|s| s.as_str()).collect();
let res = clipboard
.read_async_future(&m, glib::Priority::default())
.await;
log::debug!("clipboard-read: {}", res.is_ok());
match res {
Ok((stream, mime)) => {
let out = gio::MemoryOutputStream::new_resizable();
let res = out
.splice_async_future(
&stream,
gio::OutputStreamSpliceFlags::CLOSE_SOURCE
| gio::OutputStreamSpliceFlags::CLOSE_TARGET,
glib::Priority::default(),
)
.await;
match res {
Ok(_) => {
let data = out.steal_as_bytes();
Ok((mime.to_string(), data.as_ref().to_vec()))
}
Err(e) => Err(qemu_display::Error::Failed(format!("{}", e))),
}
}
Err(e) => Err(qemu_display::Error::Failed(format!("{}", e))),
}
} else {
Err(qemu_display::Error::Failed(
"Clipboard request failed".into(),
))
};
sender.send(res).unwrap()
});
match receiver.await {
Ok(res) => res,
Err(e) => Err(qemu_display::Error::Failed(format!(
"Clipboard request failed: {}",
e
))),
}
}
}
impl Handler {
pub async fn new(clipboard: Clipboard) -> Result<Handler, Box<dyn Error>> {
let proxy = clipboard.proxy.clone();
let serials = Arc::new([AtomicU32::new(0), AtomicU32::new(0)]);
let cb_handler = watch_clipboard(
ctxt.proxy.clone(),
clipboard.proxy.clone(),
ClipboardSelection::Clipboard,
serials.clone(),
);
let cb_primary_handler = watch_clipboard(
ctxt.proxy.clone(),
clipboard.proxy.clone(),
ClipboardSelection::Primary,
serials.clone(),
);
ctxt.register().await?;
Ok(Self {
rx,
clipboard.register(InnerHandler { proxy, serials }).await?;
Ok(Handler {
clipboard,
cb_handler,
cb_primary_handler,
})
}
}
impl Drop for Handler {
fn drop(&mut self) {
if let Some(id) = self.cb_primary_handler.take() {
clipboard_from_selection(ClipboardSelection::Primary)
.unwrap()
.0
.disconnect(id);
}
if let Some(id) = self.cb_handler.take() {
clipboard_from_selection(ClipboardSelection::Clipboard)
.unwrap()
.0
.disconnect(id);
}
}
}
fn watch_clipboard(
proxy: AsyncClipboardProxy<'static>,
selection: ClipboardSelection,
serials: Rc<[Cell<u32>; 2]>,
serials: Arc<[AtomicU32; 2]>,
) -> Option<SignalHandlerId> {
let (clipboard, idx) = match clipboard_from_selection(selection) {
Some(it) => it,
@ -161,9 +211,9 @@ fn watch_clipboard(
let _ = proxy.release(selection).await;
} else {
let mimes: Vec<_> = types.iter().map(|s| s.as_str()).collect();
let ser = serials[idx].get();
let ser = serials[idx].load(Ordering::SeqCst);
let _ = proxy.grab(selection, ser, &mimes).await;
serials[idx].set(ser + 1);
serials[idx].store(ser + 1, Ordering::SeqCst);
}
});
}

View File

@ -1,15 +1,15 @@
use futures_util::StreamExt;
use glib::{clone, subclass::prelude::*, MainContext};
use gtk::{glib, prelude::*};
use once_cell::sync::OnceCell;
use gtk::glib;
use keycodemap::KEYMAP_XORGEVDEV2QNUM;
use qemu_display::Console;
use rdw::DisplayExt;
use once_cell::sync::OnceCell;
use qemu_display::{Console, ConsoleListenerHandler};
use rdw::{gtk, DisplayExt};
use std::os::unix::io::IntoRawFd;
mod imp {
use super::*;
use gtk::subclass::prelude::*;
use std::{convert::TryInto, os::unix::io::IntoRawFd};
#[repr(C)]
pub struct RdwDisplayQemuClass {
@ -136,22 +136,17 @@ mod imp {
MainContext::default().spawn_local(clone!(@weak widget => async move {
let self_ = Self::from_instance(&widget);
let console = self_.console.get().unwrap();
let (rx, wait_tx) = console
.glib_listen()
.await
.expect("Failed to listen to the console");
rx.attach(
None,
clone!(@weak widget => @default-panic, move |evt| {
use qemu_display::ConsoleEvent::*;
log::debug!("Console event: {:?}", evt);
match evt {
// we have to use a channel, because widget is not Send..
let (sender, mut receiver) = futures::channel::mpsc::unbounded();
console.register_listener(ConsoleHandler { sender }).await.unwrap();
MainContext::default().spawn_local(clone!(@weak widget => async move {
while let Some(e) = receiver.next().await {
use ConsoleEvent::*;
match e {
Scanout(s) => {
if s.format != 0x20020888 {
log::warn!("Format not yet supported: {:X}", s.format);
return Continue(true);
continue;
}
widget.set_display_size(Some((s.width as _, s.height as _)));
widget.update_area(0, 0, s.width as _, s.height as _, s.stride as _, &s.data);
@ -159,7 +154,7 @@ mod imp {
Update(u) => {
if u.format != 0x20020888 {
log::warn!("Format not yet supported: {:X}", u.format);
return Continue(true);
continue;
}
widget.update_area(u.x as _, u.y as _, u.w as _, u.h as _, u.stride as _, &u.data);
}
@ -175,19 +170,20 @@ mod imp {
fd: s.into_raw_fd(),
});
}
UpdateDMABUF { .. } => {
UpdateDMABUF { wait_tx, .. } => {
widget.render();
let _ = wait_tx.send(());
}
Disconnected => {
log::warn!("Console disconnected");
}
CursorDefine { width, height, hot_x, hot_y, data }=> {
CursorDefine(c) => {
let cursor = rdw::Display::make_cursor(
&data,
width,
height,
hot_x,
hot_y,
&c.data,
c.width,
c.height,
c.hot_x,
c.hot_y,
1,
);
widget.define_cursor(Some(cursor));
@ -200,41 +196,21 @@ mod imp {
}
}
}
Continue(true)
})
);
let mut abs_changed = console.mouse.receive_is_absolute_changed().await;
MainContext::default().spawn_local(clone!(@weak widget => async move {
use futures_util::StreamExt;
while let Some(abs) = abs_changed.next().await {
let abs = if let Some(abs) = abs {
abs.try_into().unwrap_or(false)
} else {
continue;
};
widget.set_mouse_absolute(abs);
}
}));
loop {
if let Err(e) = console.dispatch_signals().await {
log::warn!("Console dispatching error: {}", e);
break;
let mut abs_changed = console.mouse.receive_is_absolute_changed().await;
MainContext::default().spawn_local(clone!(@weak widget => async move {
while let Some(abs) = abs_changed.next().await {
if let Some(abs) = abs {
widget.set_mouse_absolute(abs);
}
}
}
}));
}));
}
}
impl rdw::DisplayImpl for Display {}
impl Display {
pub(crate) fn set_console(&self, console: Console) {
self.console.set(console).unwrap();
}
}
}
glib::wrapper! {
@ -245,7 +221,7 @@ impl Display {
pub fn new(console: Console) -> Self {
let obj = glib::Object::new::<Self>(&[]).unwrap();
let self_ = imp::Display::from_instance(&obj);
self_.set_console(console);
self_.console.set(console).unwrap();
obj
}
@ -255,6 +231,67 @@ impl Display {
}
}
#[derive(Debug)]
enum ConsoleEvent {
Scanout(qemu_display::Scanout),
Update(qemu_display::Update),
ScanoutDMABUF(qemu_display::ScanoutDMABUF),
UpdateDMABUF {
_update: qemu_display::UpdateDMABUF,
wait_tx: futures::channel::oneshot::Sender<()>,
},
MouseSet(qemu_display::MouseSet),
CursorDefine(qemu_display::Cursor),
Disconnected,
}
struct ConsoleHandler {
sender: futures::channel::mpsc::UnboundedSender<ConsoleEvent>,
}
impl ConsoleHandler {
fn send(&self, event: ConsoleEvent) {
if let Err(e) = self.sender.unbounded_send(event) {
log::warn!("failed to send console event: {}", e);
}
}
}
#[async_trait::async_trait]
impl ConsoleListenerHandler for ConsoleHandler {
async fn scanout(&mut self, scanout: qemu_display::Scanout) {
self.send(ConsoleEvent::Scanout(scanout));
}
async fn update(&mut self, update: qemu_display::Update) {
self.send(ConsoleEvent::Update(update));
}
async fn scanout_dmabuf(&mut self, scanout: qemu_display::ScanoutDMABUF) {
self.send(ConsoleEvent::ScanoutDMABUF(scanout));
}
async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) {
let (wait_tx, wait_rx) = futures::channel::oneshot::channel();
self.send(ConsoleEvent::UpdateDMABUF { _update, wait_tx });
if let Err(e) = wait_rx.await {
log::warn!("wait update dmabuf failed: {}", e);
}
}
async fn mouse_set(&mut self, set: qemu_display::MouseSet) {
self.send(ConsoleEvent::MouseSet(set));
}
async fn cursor_define(&mut self, cursor: qemu_display::Cursor) {
self.send(ConsoleEvent::CursorDefine(cursor));
}
fn disconnected(&mut self) {
self.send(ConsoleEvent::Disconnected);
}
}
fn from_gdk_button(button: u32) -> qemu_display::MouseButton {
use qemu_display::MouseButton::*;

View File

@ -2,9 +2,8 @@ use gio::ApplicationFlags;
use glib::MainContext;
use gtk::{gio, glib, prelude::*};
use qemu_display::{Chardev, Console, Display};
use std::cell::RefCell;
use std::sync::Arc;
use zbus::Connection;
use rdw::gtk;
use std::{cell::RefCell, sync::Arc};
mod audio;
mod clipboard;
@ -13,7 +12,6 @@ mod usbredir;
struct Inner {
app: gtk::Application,
conn: zbus::azync::Connection,
usbredir: RefCell<Option<usbredir::Handler>>,
audio: RefCell<Option<audio::Handler>>,
clipboard: RefCell<Option<clipboard::Handler>>,
@ -69,20 +67,15 @@ impl App {
if opt.lookup_value("list", None).is_some() {
app_opt.list = true;
}
app_opt.vm_name =
opt.lookup_value(&glib::OPTION_REMAINING, None)
.and_then(|args| args.child_value(0).get::<String>());
app_opt.vm_name = opt
.lookup_value(&glib::OPTION_REMAINING, None)
.and_then(|args| args.child_value(0).get::<String>());
-1
});
let conn = Connection::session()
.expect("Failed to connect to DBus")
.into();
let app = App {
inner: Arc::new(Inner {
app,
conn,
usbredir: Default::default(),
audio: Default::default(),
clipboard: Default::default(),
@ -90,7 +83,6 @@ impl App {
};
let app_clone = app.clone();
let opt_clone = opt.clone();
app.inner.app.connect_activate(move |app| {
let ui_src = include_str!("main.ui");
let builder = gtk::Builder::new();
@ -102,11 +94,24 @@ impl App {
window.set_application(Some(app));
let app_clone = app_clone.clone();
let opt_clone = opt_clone.clone();
let opt_clone = opt.clone();
MainContext::default().spawn_local(async move {
// let opt = opt_clone.borrow();
let conn = zbus::ConnectionBuilder::session()
.unwrap()
.internal_executor(false)
.build()
.await
.expect("Failed to connect to DBus");
let conn_clone = conn.clone();
MainContext::default().spawn_local(async move {
loop {
conn_clone.executor().tick().await;
}
});
if opt_clone.borrow().list {
let list = Display::by_name(app_clone.connection()).await.unwrap();
let list = Display::by_name(&conn).await.unwrap();
for (name, dest) in list {
println!("{} (at {})", name, dest);
}
@ -114,20 +119,18 @@ impl App {
return;
}
let dest = if let Some(name) = opt_clone.borrow().vm_name.as_ref() {
let list = Display::by_name(app_clone.connection()).await.unwrap();
let list = Display::by_name(&conn).await.unwrap();
Some(
list.get(name)
.expect(&format!("Can't find VM name: {}", name))
.unwrap_or_else(|| panic!("Can't find VM name: {}", name))
.clone(),
)
} else {
None
};
let display = Display::new(app_clone.connection(), dest.as_ref())
.await
.unwrap();
let display = Display::new(&conn, dest.as_ref()).await.unwrap();
let console = Console::new(app_clone.connection(), 0)
let console = Console::new(&conn, 0)
.await
.expect("Failed to get the QEMU console");
let rdw = display::Display::new(console);
@ -143,22 +146,26 @@ impl App {
if let Ok(Some(audio)) = display.audio().await {
match audio::Handler::new(audio).await {
Ok(handler) => app_clone.set_audio(handler),
Err(e) => log::warn!("Failed to setup audio: {}", e),
Err(e) => {
log::warn!("Failed to setup audio handler: {}", e);
}
}
}
if let Ok(Some(clipboard)) = display.clipboard().await {
match clipboard::Handler::new(clipboard).await {
Ok(handler) => app_clone.set_clipboard(handler),
Err(e) => log::warn!("Failed to setup clipboard: {}", e),
Err(e) => {
log::warn!("Failed to setup clipboard handler: {}", e);
}
}
}
if let Ok(c) = Chardev::new(app_clone.connection(), "qmp").await {
use std::io::prelude::*;
use std::io::BufReader;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixStream;
if let Ok(c) = Chardev::new(&conn, "qmp").await {
use std::{
io::{prelude::*, BufReader},
os::unix::{io::AsRawFd, net::UnixStream},
};
let (p0, p1) = UnixStream::pair().unwrap();
if c.proxy.register(p1.as_raw_fd().into()).await.is_ok() {
@ -192,10 +199,6 @@ impl App {
app
}
fn connection(&self) -> &zbus::azync::Connection {
&self.inner.conn
}
fn set_usbredir(&self, usbredir: usbredir::Handler) {
self.inner.usbredir.replace(Some(usbredir));
}
@ -204,8 +207,8 @@ impl App {
self.inner.audio.replace(Some(audio));
}
fn set_clipboard(&self, clipboard: clipboard::Handler) {
self.inner.clipboard.replace(Some(clipboard));
fn set_clipboard(&self, cb: clipboard::Handler) {
self.inner.clipboard.replace(Some(cb));
}
fn run(&self) -> i32 {

View File

@ -1,6 +1,7 @@
use glib::{clone, MainContext};
use gtk::{glib, prelude::*};
use qemu_display::UsbRedir;
use rdw::gtk;
#[derive(Clone, Debug)]
pub struct Handler {
@ -30,7 +31,7 @@ impl Handler {
let usbredir = self.usbredir.clone();
widget.connect_device_state_set(move |widget, item, state| {
let device = match item.device() {
Some(it) => it.clone(),
Some(it) => it,
_ => return,
};

View File

@ -16,3 +16,4 @@ libc = "0.2.86"
image = "0.23.14"
derivative = "2.2.0"
async-io = "1.3.1"
async-trait = "0.1.48"

View File

@ -1,20 +1,22 @@
use std::iter::FromIterator;
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::{collections::HashSet, convert::TryInto};
use std::{error::Error, thread::JoinHandle};
use std::{io, thread, time};
use std::{
borrow::Borrow,
collections::HashSet,
error::Error,
io,
iter::FromIterator,
net::{TcpListener, TcpStream},
sync::{mpsc, Arc, Mutex},
thread, time,
};
use clap::Clap;
use image::GenericImage;
use keycodemap::*;
use qemu_display::{Console, ConsoleEvent, MouseButton, VMProxy};
use qemu_display::{AsyncVMProxy, Console, ConsoleListenerHandler, MouseButton};
use vnc::{
server::Event as VncEvent, server::FramebufferUpdate, Encoding, Error as VncError, PixelFormat,
Rect, Screen, Server as VncServer,
server::{Event as VncEvent, FramebufferUpdate},
Encoding, Error as VncError, PixelFormat, Rect, Screen, Server as VncServer,
};
use zbus::Connection;
#[derive(Clap, Debug)]
pub struct SocketAddrArgs {
@ -233,10 +235,59 @@ impl Client {
}
}
#[derive(Debug)]
struct ConsoleListener {
server: Server,
}
#[async_trait::async_trait]
impl ConsoleListenerHandler for ConsoleListener {
async fn scanout(&mut self, s: qemu_display::Scanout) {
let mut inner = self.server.inner.lock().unwrap();
inner.image = image_from_vec(s.format, s.width, s.height, s.stride, s.data);
}
async fn update(&mut self, u: qemu_display::Update) {
let mut inner = self.server.inner.lock().unwrap();
let update = image_from_vec(u.format, u.w as _, u.h as _, u.stride, u.data);
if (u.x, u.y) == (0, 0) && update.dimensions() == inner.image.dimensions() {
inner.image = update;
} else {
inner.image.copy_from(&update, u.x as _, u.y as _).unwrap();
}
let rect = Rect {
left: u.x as _,
top: u.y as _,
width: u.w as _,
height: u.h as _,
};
inner.tx.send(Event::ConsoleUpdate(rect)).unwrap();
}
async fn scanout_dmabuf(&mut self, _scanout: qemu_display::ScanoutDMABUF) {
unimplemented!()
}
async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) {
unimplemented!()
}
async fn mouse_set(&mut self, set: qemu_display::MouseSet) {
dbg!(set);
}
async fn cursor_define(&mut self, cursor: qemu_display::Cursor) {
dbg!(cursor);
}
fn disconnected(&mut self) {
dbg!();
}
}
#[derive(Debug)]
struct ServerInner {
console: Console,
console_thread: Option<JoinHandle<()>>,
image: BgraImage,
tx: mpsc::Sender<Event>,
}
@ -257,76 +308,24 @@ impl Server {
Ok(Self {
vm_name,
rx: Arc::new(Mutex::new(rx)),
inner: Arc::new(Mutex::new(ServerInner {
console,
console_thread: None,
image,
tx,
})),
inner: Arc::new(Mutex::new(ServerInner { console, image, tx })),
})
}
fn stop_console(&self) -> Result<(), Box<dyn Error>> {
let mut inner = self.inner.lock().unwrap();
if let Some(_thread) = inner.console_thread.take() {
todo!("join console thread");
//thread.join().unwrap();
}
inner.console.unregister_listener();
Ok(())
}
async fn run_console(&self) -> Result<(), Box<dyn Error>> {
let mut inner = self.inner.lock().unwrap();
if inner.console_thread.is_some() {
return Ok(());
}
let server = self.clone();
let (console_rx, _ack) = inner.console.listen().await?;
let thread = thread::spawn(move || loop {
match console_rx.recv().unwrap() {
ConsoleEvent::ScanoutDMABUF(_) | ConsoleEvent::UpdateDMABUF { .. } => {
unimplemented!();
}
ConsoleEvent::Scanout(s) => {
let mut inner = server.inner.lock().unwrap();
inner.image = image_from_vec(s.format, s.width, s.height, s.stride, s.data);
}
ConsoleEvent::Update(u) => {
let mut inner = server.inner.lock().unwrap();
let update = image_from_vec(
u.format,
u.w.try_into().unwrap(),
u.h.try_into().unwrap(),
u.stride,
u.data,
);
if (u.x, u.y) == (0, 0) && update.dimensions() == inner.image.dimensions() {
inner.image = update;
} else {
inner
.image
.copy_from(&update, u.x.try_into().unwrap(), u.y.try_into().unwrap())
.unwrap();
}
let rect = Rect {
left: u.x.try_into().unwrap(),
top: u.y.try_into().unwrap(),
width: u.w.try_into().unwrap(),
height: u.h.try_into().unwrap(),
};
inner.tx.send(Event::ConsoleUpdate(rect)).unwrap();
}
ConsoleEvent::CursorDefine { .. } => {}
ConsoleEvent::MouseSet(_) => {}
e => {
dbg!(e);
}
}
});
inner.console_thread = Some(thread);
inner
.console
.register_listener(ConsoleListener {
server: self.clone(),
})
.await?;
Ok(())
}
@ -471,13 +470,15 @@ async fn run() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind::<std::net::SocketAddr>(args.address.into()).unwrap();
let dbus = if let Some(addr) = args.dbus_address {
Connection::new_for_address(&addr, true)
zbus::ConnectionBuilder::address(addr.borrow())?
.build()
.await
} else {
Connection::new_session()
zbus::Connection::session().await
}
.expect("Failed to connect to DBus");
let vm_name = VMProxy::new(&dbus)?.name()?;
let vm_name = AsyncVMProxy::new(&dbus).await?.name().await?;
let console = Console::new(&dbus.into(), 0)
.await

View File

@ -8,9 +8,9 @@ log = "0.4"
pretty_env_logger = "0.4"
once_cell = "1.5"
zbus = { version = "2.0.0-beta" }
qemu-display = { path = "../qemu-display", features = ["glib"] }
qemu-display = { path = "../qemu-display" }
futures = "0.3.13"
[dependencies.vte]
package = "vte4"
git = "https://gitlab.gnome.org/malureau/vte4-rs"
version = "0.0.1"

View File

@ -1,11 +1,10 @@
use futures::prelude::*;
use glib::{clone, MainContext};
use gtk::{gio, glib};
use qemu_display::Chardev;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixStream;
use vte::prelude::*;
use vte::{gio, glib, gtk};
use zbus::azync::Connection;
use std::os::unix::{io::AsRawFd, net::UnixStream};
use vte::{gtk, prelude::*};
use zbus::Connection;
fn main() {
pretty_env_logger::init();