diff --git a/qemu-display/src/usbredir.rs b/qemu-display/src/usbredir.rs index 81accd0..d900b66 100644 --- a/qemu-display/src/usbredir.rs +++ b/qemu-display/src/usbredir.rs @@ -25,6 +25,7 @@ struct InnerHandler { stream_thread: JoinHandle<()>, ctxt: rusb::Context, ctxt_thread: Option<JoinHandle<()>>, + event: (UnixStream, UnixStream), quit: bool, } @@ -36,7 +37,7 @@ struct Handler { impl DeviceHandler for Handler { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { let mut inner = self.inner.lock().unwrap(); - let read = match fd_poll_readable(inner.stream.as_raw_fd(), false) { + let read = match fd_poll_readable(inner.stream.as_raw_fd(), None) { Ok(true) => { let read = inner.stream.read(buf); if let Ok(0) = read { @@ -99,17 +100,17 @@ impl Handler { }; let (stream, peer) = UnixStream::pair()?; - chardev.proxy.register(dbg!(peer.as_raw_fd()).into()).await?; + chardev.proxy.register(peer.as_raw_fd().into()).await?; let c = ctxt.clone(); let stream_fd = stream.as_raw_fd(); - dbg!(stream_fd); // really annoying libusb/usbredir APIs... + let event = UnixStream::pair()?; + let event_fd = event.1.as_raw_fd(); let stream_thread = std::thread::spawn(move || loop { - let ret = fd_poll_readable(stream_fd, true); + let ret = fd_poll_readable(stream_fd, Some(event_fd)); c.interrupt_handle_events(); if ret.is_err() { - dbg!(); break; } }); @@ -119,6 +120,7 @@ impl Handler { device_fd, stream, stream_thread, + event, quit: false, ctxt: ctxt.clone(), ctxt_thread: Default::default(), @@ -130,10 +132,9 @@ impl Handler { let inner = handler.inner.clone(); let ctxt_thread = std::thread::spawn(move || loop { if inner.lock().unwrap().quit { - dbg!(); break; } - if let Ok(true) = fd_poll_readable(stream_fd, false) { + if let Ok(true) = fd_poll_readable(stream_fd, None) { redirdev.read_peer().unwrap(); } if redirdev.has_data_to_write() > 0 { @@ -152,18 +153,13 @@ impl Handler { } } -impl Drop for InnerHandler { - fn drop(&mut self) { - //FIXME: for some reason close stream doesn't HUP qemu ?? - dbg!() - } -} - impl Drop for Handler { fn drop(&mut self) { let mut inner = self.inner.lock().unwrap(); inner.quit = true; inner.ctxt.interrupt_handle_events(); + // stream will be dropped and stream_thread will kick context_thread + inner.event.0.write(&[0]).unwrap(); } } @@ -239,22 +235,30 @@ impl UsbRedir { } } -fn fd_poll_readable(fd: RawFd, wait: bool) -> std::io::Result<bool> { - let mut fds = [libc::pollfd { +fn fd_poll_readable(fd: RawFd, wait: Option<RawFd>) -> std::io::Result<bool> { + let mut fds = vec![libc::pollfd { fd, events: libc::POLLIN|libc::POLLHUP, revents: 0, }]; - let ret = unsafe { libc::poll(fds.as_mut_ptr(), 1, if wait { -1 } else { 0 }) }; - if ret > 0 { - if fds[0].revents & libc::POLLHUP != 0 { - Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup")) - } else { - Ok(fds[0].revents & libc::POLLIN != 0) - } + if let Some(wait) = wait { + fds.push(libc::pollfd { + fd: wait, + events: libc::POLLIN|libc::POLLHUP, + revents: 0, + }); + } + let ret = unsafe { libc::poll(fds.as_mut_ptr(), + fds.len() as _, + if wait.is_some() { -1 } else { 0 }) }; + if ret < 0 { + Err(std::io::Error::last_os_error()) } else if ret == 0 { Ok(false) + } else if fds[0].revents & libc::POLLHUP != 0 || + (wait.is_some() && fds[1].revents & libc::POLLIN != 0) { + Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup")) } else { - Err(std::io::Error::last_os_error()) + Ok(fds[0].revents & libc::POLLIN != 0) } }