Add a new qmp feature and Display::new_qmp

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
This commit is contained in:
Marc-André Lureau 2022-10-18 17:44:43 +04:00
parent 8f70feb2e5
commit 18f887f111
7 changed files with 224 additions and 174 deletions

View File

@ -6,6 +6,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
qmp = ["dep:qapi", "dep:base64"]
[dependencies]
cfg-if = "1.0"
log = "0.4"
@ -14,7 +17,7 @@ zbus = { version = "3.2", features = ["xml"] }
zvariant = { version = "3.0", features = ["serde_bytes"] }
libc = "0.2.86"
enumflags2 = { version = "0.7", features = ["serde"] }
serde = { version = "1.0.123", features = ["derive"] }
serde = { version = "1.0.27", features = ["derive"] }
serde_repr = "0.1.6"
serde_bytes = "0.11.5"
futures-util = { version = "0.3.8", features = ["async-await-macro"] }
@ -24,14 +27,17 @@ usbredirhost = "0.0.1"
async-broadcast = "0.3.3"
async-trait = "0.1.48"
async-lock = "2.3.0"
qapi = { version = "0.9.0", features = ["qmp"], optional = true }
base64 = { version = "0.13", optional = true }
[target.'cfg(windows)'.dependencies]
uds_windows = "1.0.1"
windows = { version = "0.39.0", features = ["Win32_Networking_WinSock", "Win32_Foundation", "Win32_System_IO", "Win32_System_Threading"] }
[target.'cfg(windows)'.dev-dependencies]
qapi = { version = "0.9.0", features = ["qmp"] }
serde = { version = "^1.0.27", features = ["derive"] }
base64 = "0.13"
async-std = { version = "1.12.0", features = ["attributes"] }
tracing-subscriber = { version = "0.3.11", features = ["env-filter" , "fmt"], default-features = false }
[[example]]
name = 'win32-test'
required-features = ["qmp"]

View File

@ -1,135 +1,18 @@
#![allow(non_snake_case, non_camel_case_types)]
use std::env::args;
use std::error::Error;
use std::os::windows::io::AsRawSocket;
use std::thread::sleep;
use std::time::Duration;
use qapi::{qmp, Qmp};
use serde::{Deserialize, Serialize};
use uds_windows::UnixStream;
use windows::Win32::Networking::WinSock::{WSADuplicateSocketW, SOCKET, WSAPROTOCOL_INFOW};
use qemu_display::Display;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct get_win32_socket {
#[serde(rename = "info")]
pub info: ::std::string::String,
#[serde(rename = "fdname")]
pub fdname: ::std::string::String,
}
impl qmp::QmpCommand for get_win32_socket {}
impl qapi::Command for get_win32_socket {
const NAME: &'static str = "get-win32-socket";
const ALLOW_OOB: bool = false;
type Ok = qapi::Empty;
}
fn wsa_last_err() -> std::io::Error {
use windows::Win32::Networking::WinSock::WSAGetLastError;
let err = unsafe { WSAGetLastError() };
std::io::Error::from_raw_os_error(err.0)
}
// Get the process ID of the connected peer
fn unix_stream_get_peer_pid(stream: &UnixStream) -> Result<u32, std::io::Error> {
use windows::Win32::Networking::WinSock::{WSAIoctl, IOC_OUT, IOC_VENDOR, SOCKET_ERROR};
macro_rules! _WSAIOR {
($x:expr, $y:expr) => {
IOC_OUT | $x | $y
};
}
let socket = stream.as_raw_socket();
const SIO_AF_UNIX_GETPEERPID: u32 = _WSAIOR!(IOC_VENDOR, 256);
let mut ret = 0 as u32;
let mut bytes = 0;
let r = unsafe {
WSAIoctl(
SOCKET(socket as _),
SIO_AF_UNIX_GETPEERPID,
0 as *mut _,
0,
&mut ret as *mut _ as *mut _,
std::mem::size_of_val(&ret) as u32,
&mut bytes,
0 as *mut _,
None,
)
};
if r == SOCKET_ERROR {
return Err(wsa_last_err());
}
Ok(ret)
}
fn duplicate_socket(pid: u32, sock: SOCKET) -> Result<Vec<u8>, std::io::Error> {
let mut info = unsafe { std::mem::zeroed() };
if unsafe { WSADuplicateSocketW(sock, pid, &mut info) } != 0 {
return Err(wsa_last_err());
}
let info = unsafe {
std::slice::from_raw_parts(
&info as *const _ as *const u8,
std::mem::size_of::<WSAPROTOCOL_INFOW>(),
)
};
Ok(info.to_vec())
}
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt::init();
let socket_addr = args().nth(1).expect("argument: QMP socket path");
let stream = UnixStream::connect(socket_addr).expect("failed to connect to socket");
let pid = unix_stream_get_peer_pid(&stream).expect("failed to get peer PID");
let qmp_path = args().nth(1).expect("argument: QMP socket path");
let display = Display::new_qmp(qmp_path).await?;
let mut qmp = Qmp::from_stream(&stream);
let info = qmp.handshake().expect("handshake failed");
println!("QMP info: {:#?}", info);
let (p0, p1) = UnixStream::pair().expect("failed to make a socketpair");
let info =
duplicate_socket(pid, SOCKET(p0.as_raw_socket() as _)).expect("Failed to pass socket");
let info = base64::encode(info);
qmp.execute(&get_win32_socket {
info,
fdname: "fdname".into(),
})
.unwrap();
qmp.execute(&qmp::add_client {
skipauth: None,
tls: None,
protocol: "@dbus-display".into(),
fdname: "fdname".into(),
})
.unwrap();
let conn = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
.await
.unwrap();
let display = Display::new(&conn, Option::<String>::None).await.unwrap();
loop {
qmp.nop().unwrap();
for event in qmp.events() {
println!("Got event: {:#?}", event);
}
sleep(Duration::from_secs(1));
}
}

View File

@ -16,6 +16,11 @@ use zvariant::OwnedObjectPath;
use crate::UsbRedir;
use crate::{Audio, Chardev, Clipboard, Error, Result, VMProxy};
#[cfg(all(unix, feature = "qmp"))]
use std::os::unix::net::UnixStream;
#[cfg(all(windows, feature = "qmp"))]
use uds_windows::UnixStream;
struct Inner<'d> {
proxy: fdo::ObjectManagerProxy<'d>,
conn: Connection,
@ -104,6 +109,63 @@ impl<'d> Display<'d> {
})
}
pub fn connection(&self) -> &Connection {
&self.inner.conn
}
#[cfg(all(windows, feature = "qmp"))]
pub async fn new_qmp<P: AsRef<std::path::Path>>(path: P) -> Result<Display<'d>> {
#![allow(non_snake_case, non_camel_case_types)]
use crate::win32::{duplicate_socket, unix_stream_get_peer_pid};
use qapi::{qmp, Qmp};
use serde::{Deserialize, Serialize};
use std::os::windows::io::AsRawSocket;
use windows::Win32::Networking::WinSock::SOCKET;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct get_win32_socket {
#[serde(rename = "info")]
pub info: ::std::string::String,
#[serde(rename = "fdname")]
pub fdname: ::std::string::String,
}
impl qmp::QmpCommand for get_win32_socket {}
impl qapi::Command for get_win32_socket {
const NAME: &'static str = "get-win32-socket";
const ALLOW_OOB: bool = false;
type Ok = qapi::Empty;
}
let stream = UnixStream::connect(path)?;
let pid = unix_stream_get_peer_pid(&stream)?;
let mut qmp = Qmp::from_stream(&stream);
let _info = qmp.handshake()?;
let (p0, p1) = UnixStream::pair()?;
let info = duplicate_socket(pid, SOCKET(p0.as_raw_socket() as _))?;
let info = base64::encode(info);
qmp.execute(&get_win32_socket {
info,
fdname: "fdname".into(),
})?;
qmp.execute(&qmp::add_client {
skipauth: None,
tls: None,
protocol: "@dbus-display".into(),
fdname: "fdname".into(),
})?;
let conn = zbus::ConnectionBuilder::unix_stream(p1)
.p2p()
.build()
.await?;
Self::new(&conn, Option::<String>::None).await
}
pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> {
Ok(self.inner.proxy.receive_owner_changed().await?)
}

View File

@ -1,6 +1,9 @@
use std::{convert::Infallible, error, fmt, io};
use usbredirhost::rusb;
use std::{convert::Infallible, error, fmt, io};
#[cfg(feature = "qmp")]
use qapi::ExecuteError;
#[derive(Debug)]
pub enum Error {
@ -9,6 +12,8 @@ pub enum Error {
Rusb(rusb::Error),
Usbredir(usbredirhost::Error),
Failed(String),
#[cfg(feature = "qmp")]
Qmp(ExecuteError),
}
impl fmt::Display for Error {
@ -19,6 +24,8 @@ impl fmt::Display for Error {
Error::Rusb(e) => write!(f, "rusb error: {}", e),
Error::Usbredir(e) => write!(f, "usbredir error: {}", e),
Error::Failed(e) => write!(f, "{}", e),
#[cfg(feature = "qmp")]
Error::Qmp(e) => write!(f, "qmp error: {}", e),
}
}
}
@ -31,6 +38,8 @@ impl error::Error for Error {
Error::Rusb(e) => Some(e),
Error::Usbredir(e) => Some(e),
Error::Failed(_) => None,
#[cfg(feature = "qmp")]
Error::Qmp(e) => Some(e),
}
}
}
@ -77,6 +86,13 @@ impl From<usbredirhost::Error> for Error {
}
}
#[cfg(feature = "qmp")]
impl From<ExecuteError> for Error {
fn from(e: ExecuteError) -> Self {
Error::Qmp(e)
}
}
impl From<Infallible> for Error {
fn from(_: Infallible) -> Self {
unreachable!()

View File

@ -1,8 +1,12 @@
use std::io;
use std::os::windows::io::AsRawSocket;
use windows::Win32::Foundation::{CloseHandle, HANDLE};
use windows::Win32::Networking::WinSock::{WSADuplicateSocketW, SOCKET, WSAPROTOCOL_INFOW};
use windows::Win32::System::Threading::PROCESS_ACCESS_RIGHTS;
#[cfg(feature = "qmp")]
use uds_windows::UnixStream;
pub type Fd = Vec<u8>;
// A process handle
@ -14,15 +18,30 @@ impl Drop for ProcessHandle {
}
}
pub(crate) fn duplicate_socket(pid: u32, sock: SOCKET) -> crate::Result<Vec<u8>> {
let mut info = unsafe { std::mem::zeroed() };
if unsafe { WSADuplicateSocketW(sock, pid, &mut info) } != 0 {
return Err(wsa_last_err().into());
}
let info = unsafe {
std::slice::from_raw_parts(
&info as *const _ as *const u8,
std::mem::size_of::<WSAPROTOCOL_INFOW>(),
)
};
Ok(info.to_vec())
}
impl ProcessHandle {
// Open the process associated with the process_id (if None, the current process)
pub fn open(
process_id: Option<u32>,
desired_access: PROCESS_ACCESS_RIGHTS,
) -> Result<Self, io::Error> {
use windows::Win32::System::Threading::{GetCurrentProcess, OpenProcess};
use windows::Win32::System::Threading::{GetCurrentProcess, OpenProcess, PROCESS_QUERY_INFORMATION};
let process = if let Some(process_id) = process_id {
let desired_access = desired_access | PROCESS_QUERY_INFORMATION;
unsafe { OpenProcess(desired_access, false, process_id)? }
} else {
unsafe { GetCurrentProcess() }
@ -31,24 +50,22 @@ impl ProcessHandle {
Ok(Self(process))
}
pub fn process_id(&self) -> u32 {
pub fn process_id(&self) -> crate::Result<u32> {
use windows::Win32::System::Threading::GetProcessId;
use windows::Win32::Foundation::GetLastError;
unsafe { GetProcessId(self.0) }
unsafe {
let pid = GetProcessId(self.0);
if pid == 0 {
Err(io::Error::from_raw_os_error(GetLastError().0 as _).into())
} else {
Ok(pid)
}
}
}
pub fn duplicate_socket(&self, sock: SOCKET) -> crate::Result<Fd> {
let mut info = unsafe { std::mem::zeroed() };
if unsafe { WSADuplicateSocketW(sock, self.process_id(), &mut info) } != 0 {
return Err(crate::Error::Io(wsa_last_err()));
}
let info = unsafe {
std::slice::from_raw_parts(
&info as *const _ as *const u8,
std::mem::size_of::<WSAPROTOCOL_INFOW>(),
)
};
Ok(info.to_vec())
duplicate_socket(self.process_id()?, sock)
}
}
@ -58,3 +75,40 @@ pub(crate) fn wsa_last_err() -> io::Error {
let err = unsafe { WSAGetLastError() };
io::Error::from_raw_os_error(err.0)
}
// Get the process ID of the connected peer
#[cfg(feature = "qmp")]
pub(crate) fn unix_stream_get_peer_pid(stream: &UnixStream) -> Result<u32, std::io::Error> {
use windows::Win32::Networking::WinSock::{WSAIoctl, IOC_OUT, IOC_VENDOR, SOCKET_ERROR};
macro_rules! _WSAIOR {
($x:expr, $y:expr) => {
IOC_OUT | $x | $y
};
}
let socket = stream.as_raw_socket();
const SIO_AF_UNIX_GETPEERPID: u32 = _WSAIOR!(IOC_VENDOR, 256);
let mut ret = 0 as u32;
let mut bytes = 0;
let r = unsafe {
WSAIoctl(
SOCKET(socket as _),
SIO_AF_UNIX_GETPEERPID,
0 as *mut _,
0,
&mut ret as *mut _ as *mut _,
std::mem::size_of_val(&ret) as u32,
&mut bytes,
0 as *mut _,
None,
)
};
if r == SOCKET_ERROR {
return Err(wsa_last_err());
}
Ok(ret)
}

View File

@ -5,6 +5,8 @@ authors = ["Marc-André Lureau <marcandre.lureau@redhat.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
qmp = ["qemu-display/qmp"]
[dependencies]
log = "0.4"

View File

@ -29,10 +29,53 @@ struct App {
struct AppOptions {
vm_name: Option<String>,
address: Option<String>,
#[cfg(feature = "qmp")]
qmp: Option<String>,
list: bool,
wait: bool,
}
async fn display_from_opt(opt: Arc<RefCell<AppOptions>>) -> Option<Display<'static>> {
#[cfg(feature = "qmp")]
if let Some(qmp_addr) = &opt.borrow().qmp {
return Some(Display::new_qmp(qmp_addr).await.unwrap());
}
let builder = if let Some(addr) = &opt.borrow().address {
zbus::ConnectionBuilder::address(addr.as_str())
} else {
zbus::ConnectionBuilder::session()
};
let conn = builder
.unwrap()
.internal_executor(false)
.build()
.await
.expect("Failed to connect to DBus");
let conn_clone = conn.clone();
MainContext::default().spawn_local(async move {
loop {
conn_clone.executor().tick().await;
}
});
if opt.borrow().list {
let list = Display::by_name(&conn).await.unwrap();
for (name, dest) in list {
println!("{} (at {})", name, dest);
}
return None;
}
let dest = {
let name = opt.borrow().vm_name.clone();
let wait = opt.borrow().wait;
Display::lookup(&conn, wait, name.as_deref()).await.unwrap()
};
Some(Display::new(&conn, dest).await.unwrap())
}
impl App {
fn new() -> Self {
let app = gtk::Application::new(Some("org.qemu.rdw.demo"), ApplicationFlags::NON_UNIQUE);
@ -52,6 +95,15 @@ impl App {
"D-Bus bus address",
None,
);
#[cfg(feature = "qmp")]
app.add_main_option(
"qmp",
glib::Char(b'q' as _),
glib::OptionFlags::NONE,
glib::OptionArg::String,
"QMP monitor address",
None,
);
app.add_main_option(
"list",
glib::Char(0),
@ -88,6 +140,10 @@ impl App {
if let Some(arg) = opt.lookup_value("address", None) {
app_opt.address = arg.get::<String>();
}
#[cfg(feature = "qmp")]
if let Some(arg) = opt.lookup_value("qmp", None) {
app_opt.qmp = arg.get::<String>();
}
if opt.lookup_value("list", None).is_some() {
app_opt.list = true;
}
@ -124,42 +180,13 @@ impl App {
let app_clone = app_clone.clone();
let opt_clone = opt.clone();
MainContext::default().spawn_local(async move {
let builder = if let Some(addr) = &opt_clone.borrow().address {
zbus::ConnectionBuilder::address(addr.as_str())
} else {
zbus::ConnectionBuilder::session()
};
let conn = builder
.unwrap()
.internal_executor(false)
.build()
.await
.expect("Failed to connect to DBus");
let conn_clone = conn.clone();
MainContext::default().spawn_local(async move {
loop {
conn_clone.executor().tick().await;
let display = match display_from_opt(opt_clone).await {
Some(d) => d,
None => {
app_clone.inner.app.quit();
return;
}
});
if opt_clone.borrow().list {
let list = Display::by_name(&conn).await.unwrap();
for (name, dest) in list {
println!("{} (at {})", name, dest);
}
app_clone.inner.app.quit();
return;
}
let dest = {
let name = opt_clone.borrow().vm_name.clone();
let wait = opt_clone.borrow().wait;
Display::lookup(&conn, wait, name.as_deref()).await.unwrap()
};
let display = Display::new(&conn, dest).await.unwrap();
let disp = display.clone();
MainContext::default().spawn_local(async move {
let mut changed = disp.receive_owner_changed().await.unwrap();
@ -168,7 +195,7 @@ impl App {
}
});
let console = Console::new(&conn, 0)
let console = Console::new(display.connection(), 0)
.await
.expect("Failed to get the QEMU console");
let rdw = display::Display::new(console);
@ -200,7 +227,7 @@ impl App {
}
}
if let Ok(c) = Chardev::new(&conn, "qmp").await {
if let Ok(c) = Chardev::new(display.connection(), "qmp").await {
use std::io::{prelude::*, BufReader};
#[cfg(unix)]
use std::os::unix::net::UnixStream;