diff --git a/vm-virtio/src/queue.rs b/vm-virtio/src/queue.rs index 3325e4c62..7037ebfda 100644 --- a/vm-virtio/src/queue.rs +++ b/vm-virtio/src/queue.rs @@ -41,11 +41,13 @@ unsafe impl ByteValued for Descriptor {} /// A virtio descriptor chain. pub struct DescriptorChain<'a> { - mem: &'a GuestMemoryMmap, desc_table: GuestAddress, queue_size: u16, ttl: u16, // used to prevent infinite chain cycles + /// Reference to guest memory + pub mem: &'a GuestMemoryMmap, + /// Index into the descriptor table pub index: u16, diff --git a/vm-virtio/src/vsock/csm/connection.rs b/vm-virtio/src/vsock/csm/connection.rs new file mode 100644 index 000000000..a226fd1a9 --- /dev/null +++ b/vm-virtio/src/vsock/csm/connection.rs @@ -0,0 +1,633 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +// +/// The main job of `VsockConnection` is to forward data traffic, back and forth, between a +/// guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while +/// also managing its internal state. +/// To that end, `VsockConnection` implements: +/// - `VsockChannel` for: +/// - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`; and +/// - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and +/// - updating its internal state, by absorbing control packets (anything other than +/// VSOCK_OP_RW). +/// - `VsockEpollListener` for getting notified about the availability of data or free buffer +/// space at the host stream. +/// +/// Note: there is a certain asymmetry to the RX and TX data flows: +/// - RX transfers do not need any data buffering, since data is read straight from the +/// host stream and into the guest-provided RX buffer; +/// - TX transfers may require some data to be buffered by `VsockConnection`, if the host +/// peer can't keep up with reading the data that we're writing. This is because, once +/// the guest driver provides some data in a virtio TX buffer, the vsock device must +/// consume it. If that data can't be forwarded straight to the host stream, we'll +/// have to store it in a buffer (and flush it at a later time). Vsock flow control +/// ensures that our TX buffer doesn't overflow. +/// +// The code in this file is best read with a fresh memory of the vsock protocol inner-workings. +// To help with that, here is a +// +// Short primer on the vsock protocol +// ---------------------------------- +// +// 1. Establishing a connection +// A vsock connection is considered established after a two-way handshake: +// - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST); +// then +// - the listening peer sends back a connection response packet (`hdr.op` == +// VSOCK_OP_RESPONSE). +// +// 2. Terminating a connection +// When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN +// packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's +// intention: +// - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and +// - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection. +// After a shutdown packet, the receiving peer will have some protocol-undefined time to +// flush its buffers, and then forcefully terminate the connection by sending back an RST +// packet. If the shutdown-initiating peer doesn't receive this RST packet during a timeout +// period, it will send one itself, thus terminating the connection. +// Note: a peer can send more than one VSOCK_OP_SHUTDOWN packets. However, read/write +// indications cannot be undone. E.g. once a "no-more-sending" promise was made, it +// cannot be taken back. That is, `hdr.flags` will be ORed between subsequent +// VSOCK_OP_SHUTDOWN packets. +// +// 3. Flow control +// Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver +// has enough free buffer space to store that data. If this condition is not respected, the +// receiving peer's behaviour is undefined. In this implementation, we forcefully terminate +// the connection by sending back a VSOCK_OP_RST packet. +// Note: all buffer space information is computed and stored on a per-connection basis. +// Peers keep each other informed about the free buffer space they have by filling in two +// packet header members with each packet they send: +// - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and +// - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its +// buffer. +// One can figure out how much space its peer has available in its buffer by inspecting the +// difference between how much it has sent to the peer and how much the peer has flushed out +// (i.e. "forwarded", in the vsock spec terminology): +// `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`. +// Note: the above requires that peers constantly keep each other informed on their buffer +// space situation. However, since there are no receipt acknowledgement packets +// defined for the vsock protocol, packet flow can often be unidirectional (just one +// peer sending data to another), so the sender's information about the receiver's +// buffer space can get quickly outdated. The vsock protocol defines two solutions to +// this problem: +// 1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its +// peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a +// VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit +// information must be included in any packet); +// 2. The receiver can be proactive, and send VSOCK_OP_CREDIT_UPDATE packet, whenever +// it thinks its peer's information is out of date. +// Our implementation uses the proactive approach. +// +use std::io::{ErrorKind, Read, Write}; +use std::num::Wrapping; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::time::{Duration, Instant}; + +use super::super::defs::uapi; +use super::super::packet::VsockPacket; +use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError}; +use super::defs; +use super::txbuf::TxBuf; +use super::{ConnState, Error, PendingRx, PendingRxSet, Result}; + +/// A self-managing connection object, that handles communication between a guest-side AF_VSOCK +/// socket and a host-side `Read + Write + AsRawFd` stream. +/// +pub struct VsockConnection { + /// The current connection state. + state: ConnState, + /// The local CID. Most of the time this will be the constant `2` (the vsock host CID). + local_cid: u64, + /// The peer (guest) CID. + peer_cid: u64, + /// The local (host) port. + local_port: u32, + /// The peer (guest) port. + peer_port: u32, + /// The (connected) host-side stream. + stream: S, + /// The TX buffer for this connection. + tx_buf: TxBuf, + /// Total number of bytes that have been successfully written to `self.stream`, either + /// directly, or flushed from `self.tx_buf`. + fwd_cnt: Wrapping, + /// The amount of buffer space that the peer (guest) has allocated for this connection. + peer_buf_alloc: u32, + /// The total number of bytes that the peer has forwarded away. + peer_fwd_cnt: Wrapping, + /// The total number of bytes sent to the peer (guest vsock driver) + rx_cnt: Wrapping, + /// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit + /// updates, and let the peer know it's OK to send more data. + last_fwd_cnt_to_peer: Wrapping, + /// The set of pending RX packet indications that `recv_pkt()` will use to fill in a + /// packet for the peer (guest). + pending_rx: PendingRxSet, + /// Instant when this connection should be scheduled for immediate termination, due to some + /// timeout condition having been fulfilled. + expiry: Option, +} + +impl VsockChannel for VsockConnection +where + S: Read + Write + AsRawFd, +{ + /// Fill in a vsock packet, to be delivered to our peer (the guest driver). + /// + /// As per the `VsockChannel` trait, this should only be called when there is data to be + /// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error + /// out with `VsockError::NoData`. + /// Pending RX indications are set by other mutable actions performed on the channel. For + /// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN + /// packet, or `notify()` could set a Rw indication (a data packet can be fetched from the + /// channel), if data was ready to be read from the host stream. + /// + /// Returns: + /// - `Ok(())`: the packet has been successfully filled in and is ready for delivery; + /// - `Err(VsockError::NoData)`: there was no data available with which to fill in the + /// packet; + /// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but + /// it is missing the data buffer. + /// + fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> { + // Perform some generic initialization that is the same for any packet operation (e.g. + // source, destination, credit, etc). + self.init_pkt(pkt); + + // If forceful termination is pending, there's no point in checking for anything else. + // It's dead, Jim. + if self.pending_rx.remove(PendingRx::Rst) { + pkt.set_op(uapi::VSOCK_OP_RST); + return Ok(()); + } + + // Next up: if we're due a connection confirmation, that's all we need to know to fill + // in this packet. + if self.pending_rx.remove(PendingRx::Response) { + self.state = ConnState::Established; + pkt.set_op(uapi::VSOCK_OP_RESPONSE); + return Ok(()); + } + + // Same thing goes for locally-initiated connections that need to yield a connection + // request. + if self.pending_rx.remove(PendingRx::Request) { + self.expiry = + Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS)); + pkt.set_op(uapi::VSOCK_OP_REQUEST); + return Ok(()); + } + + // A credit update is basically a no-op, so we should only waste a perfectly fine RX + // buffer on it if we really have nothing else to say. + if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() { + pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE); + self.last_fwd_cnt_to_peer = self.fwd_cnt; + return Ok(()); + } + + // Alright, if we got to here, we need to cough up a data packet. We've already checked + // for all other pending RX indications. + if !self.pending_rx.remove(PendingRx::Rw) { + return Err(VsockError::NoData); + } + + match self.state { + // A data packet is only valid for established connections, and connections for + // which our peer has initiated a graceful shutdown, but can still receive data. + ConnState::Established | ConnState::PeerClosed(false, _) => (), + _ => { + // Any other connection state is invalid at this point, and we need to kill it + // with fire. + pkt.set_op(uapi::VSOCK_OP_RST); + return Ok(()); + } + } + + // Oh wait, before we start bringing in the big data, can our peer handle receiving so + // much bytey goodness? + if self.need_credit_update_from_peer() { + self.last_fwd_cnt_to_peer = self.fwd_cnt; + pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST); + return Ok(()); + } + + let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?; + + // The maximum amount of data we can read in is limited by both the RX buffer size and + // the peer available buffer space. + let max_len = std::cmp::min(buf.len(), self.peer_avail_credit()); + + // Read data from the stream straight to the RX buffer, for maximum throughput. + match self.stream.read(&mut buf[..max_len]) { + Ok(read_cnt) => { + if read_cnt == 0 { + // A 0-length read means the host stream was closed down. In that case, + // we'll ask our peer to shut down the connection. We can neither send nor + // receive any more data. + self.state = ConnState::LocalClosed; + self.expiry = Some( + Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS), + ); + pkt.set_op(uapi::VSOCK_OP_SHUTDOWN) + .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV) + .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND); + } else { + // On a successful data read, we fill in the packet with the RW op, and + // length of the read data. + pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32); + } + } + Err(err) => { + // We are not expecting any errors when reading from the underlying stream. If + // any show up, we'll immediately kill this connection. + error!( + "vsock: error reading from backing stream: lp={}, pp={}, err={:?}", + self.local_port, self.peer_port, err + ); + pkt.set_op(uapi::VSOCK_OP_RST); + } + }; + + self.rx_cnt += Wrapping(pkt.len()); + self.last_fwd_cnt_to_peer = self.fwd_cnt; + + Ok(()) + } + + /// Deliver a guest-generated packet to this connection. + /// + /// This forwards the data in RW packets to the host stream, and absorbs control packets, + /// using them to manage the internal connection state. + /// + /// Returns: + /// always `Ok(())`: the packet has been consumed; + /// + fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> { + // Update the peer credit information. + self.peer_buf_alloc = pkt.buf_alloc(); + self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt()); + + match self.state { + // Most frequent case: this is an established connection that needs to forward some + // data to the host stream. Also works for a connection that has begun shutting + // down, but the peer still has some data to send. + ConnState::Established | ConnState::PeerClosed(_, false) + if pkt.op() == uapi::VSOCK_OP_RW => + { + if pkt.buf().is_none() { + info!( + "vsock: dropping empty data packet from guest (lp={}, pp={}", + self.local_port, self.peer_port + ); + return Ok(()); + } + + // Unwrapping here is safe, since we just checked `pkt.buf()` above. + let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)]; + if let Err(err) = self.send_bytes(buf_slice) { + // If we can't write to the host stream, that's an unrecoverable error, so + // we'll terminate this connection. + warn!( + "vsock: error writing to local stream (lp={}, pp={}): {:?}", + self.local_port, self.peer_port, err + ); + self.kill(); + return Ok(()); + } + + // We might've just consumed some data. If that's the case, we might need to + // update the peer on our buffer space situation, so that it can keep sending + // data packets our way. + if self.peer_needs_credit_update() { + self.pending_rx.insert(PendingRx::CreditUpdate); + } + } + + // Next up: receiving a response / confirmation for a host-initiated connection. + // We'll move to an Established state, and pass on the good news through the host + // stream. + ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => { + self.expiry = None; + self.state = ConnState::Established; + } + + // The peer wants to shut down an established connection. If they have nothing + // more to send nor receive, and we don't have to wait to drain our TX buffer, we + // can schedule an RST packet (to terminate the connection on the next recv call). + // Otherwise, we'll arm the kill timer. + ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => { + let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0; + let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0; + self.state = ConnState::PeerClosed(recv_off, send_off); + if recv_off && send_off { + if self.tx_buf.is_empty() { + self.pending_rx.insert(PendingRx::Rst); + } else { + self.expiry = Some( + Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS), + ); + } + } + } + + // The peer wants to update a shutdown request, with more receive/send indications. + // The same logic as above applies. + ConnState::PeerClosed(ref mut recv_off, ref mut send_off) + if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => + { + *recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0); + *send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0); + if *recv_off && *send_off && self.tx_buf.is_empty() { + self.pending_rx.insert(PendingRx::Rst); + } + } + + // A credit update from our peer is valid only in a state which allows data + // transfer towards the peer. + ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _) + if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE => + { + // Nothing to do here; we've already updated peer credit. + } + + // A credit request from our peer is valid only in a state which allows data + // transfer from the peer. We'll respond with a credit update packet. + ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false) + if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST => + { + self.pending_rx.insert(PendingRx::CreditUpdate); + } + + _ => { + debug!( + "vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}", + self.state, + pkt.hdr() + ); + } + }; + + Ok(()) + } + + /// Check if the connection has any pending packet addressed to the peer. + /// + fn has_pending_rx(&self) -> bool { + !self.pending_rx.is_empty() + } +} + +impl VsockEpollListener for VsockConnection +where + S: Read + Write + AsRawFd, +{ + /// Get the file descriptor that this connection wants polled. + /// + /// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the + /// host stream. + /// + fn get_polled_fd(&self) -> RawFd { + self.stream.as_raw_fd() + } + + /// Get the event set that this connection is interested in. + /// + /// A connection will want to be notified when: + /// - data is available to be read from the host stream, so that it can store an RW pending + /// RX indication; and + /// - data can be written to the host stream, and the TX buffer needs to be flushed. + /// + fn get_polled_evset(&self) -> epoll::Events { + let mut evset = epoll::Events::empty(); + if !self.tx_buf.is_empty() { + // There's data waiting in the TX buffer, so we are interested in being notified + // when writing to the host stream wouldn't block. + evset.insert(epoll::Events::EPOLLOUT); + } + // We're generally interested in being notified when data can be read from the host + // stream, unless we're in a state which doesn't allow moving data from host to guest. + match self.state { + ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (), + _ if self.need_credit_update_from_peer() => (), + _ => evset.insert(epoll::Events::EPOLLIN), + } + evset + } + + /// Notify the connection about an event (or set of events) that it was interested in. + /// + fn notify(&mut self, evset: epoll::Events) { + if evset.contains(epoll::Events::EPOLLIN) { + // Data can be read from the host stream. Setting a Rw pending indication, so that + // the muxer will know to call `recv_pkt()` later. + self.pending_rx.insert(PendingRx::Rw); + } + + if evset.contains(epoll::Events::EPOLLOUT) { + // Data can be written to the host stream. Time to flush out the TX buffer. + // + if self.tx_buf.is_empty() { + info!("vsock: connection received unexpected EPOLLOUT event"); + return; + } + let flushed = self + .tx_buf + .flush_to(&mut self.stream) + .unwrap_or_else(|err| { + warn!( + "vsock: error flushing TX buf for (lp={}, pp={}): {:?}", + self.local_port, self.peer_port, err + ); + self.kill(); + 0 + }); + self.fwd_cnt += Wrapping(flushed as u32); + + // If this connection was shutting down, but is waiting to drain the TX buffer + // before forceful termination, the wait might be over. + if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() { + self.pending_rx.insert(PendingRx::Rst); + } else if self.peer_needs_credit_update() { + // If we've freed up some more buffer space, we may need to let the peer know it + // can safely send more data our way. + self.pending_rx.insert(PendingRx::CreditUpdate); + } + } + } +} + +impl VsockConnection +where + S: Read + Write + AsRawFd, +{ + /// Create a new guest-initiated connection object. + /// + pub fn new_peer_init( + stream: S, + local_cid: u64, + peer_cid: u64, + local_port: u32, + peer_port: u32, + peer_buf_alloc: u32, + ) -> Self { + Self { + local_cid, + peer_cid, + local_port, + peer_port, + stream, + state: ConnState::PeerInit, + tx_buf: TxBuf::new(), + fwd_cnt: Wrapping(0), + peer_buf_alloc, + peer_fwd_cnt: Wrapping(0), + rx_cnt: Wrapping(0), + last_fwd_cnt_to_peer: Wrapping(0), + pending_rx: PendingRxSet::from(PendingRx::Response), + expiry: None, + } + } + + /// Create a new host-initiated connection object. + /// + pub fn new_local_init( + stream: S, + local_cid: u64, + peer_cid: u64, + local_port: u32, + peer_port: u32, + ) -> Self { + Self { + local_cid, + peer_cid, + local_port, + peer_port, + stream, + state: ConnState::LocalInit, + tx_buf: TxBuf::new(), + fwd_cnt: Wrapping(0), + peer_buf_alloc: 0, + peer_fwd_cnt: Wrapping(0), + rx_cnt: Wrapping(0), + last_fwd_cnt_to_peer: Wrapping(0), + pending_rx: PendingRxSet::from(PendingRx::Request), + expiry: None, + } + } + + /// Check if there is an expiry (kill) timer set for this connection, sometime in the + /// future. + /// + pub fn will_expire(&self) -> bool { + match self.expiry { + None => false, + Some(t) => t > Instant::now(), + } + } + + /// Check if this connection needs to be scheduled for forceful termination, due to its + /// kill timer having expired. + /// + pub fn has_expired(&self) -> bool { + match self.expiry { + None => false, + Some(t) => t <= Instant::now(), + } + } + + /// Get the kill timer value, if one is set. + /// + pub fn expiry(&self) -> Option { + self.expiry + } + + /// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the + /// connection is asked to yield a packet, via `recv_pkt()`). + /// + pub fn kill(&mut self) { + self.state = ConnState::Killed; + self.pending_rx.insert(PendingRx::Rst); + } + + /// Send some raw data (a byte-slice) to the host stream. + /// + /// Raw data can either be sent straight to the host stream, or to our TX buffer, if the + /// former fails. + /// + fn send_bytes(&mut self, buf: &[u8]) -> Result<()> { + // If there is data in the TX buffer, that means we're already registered for EPOLLOUT + // events on the underlying stream. Therefore, there's no point in attempting a write + // at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will + // attempt to drain the TX buffer then. + if !self.tx_buf.is_empty() { + return self.tx_buf.push(buf); + } + + // The TX buffer is empty, so we can try to write straight to the host stream. + let written = match self.stream.write(buf) { + Ok(cnt) => cnt, + Err(e) => { + // Absorb any would-block errors, since we can always try again later. + if e.kind() == ErrorKind::WouldBlock { + 0 + } else { + // We don't know how to handle any other write error, so we'll send it up + // the call chain. + return Err(Error::StreamWrite(e)); + } + } + }; + // Move the "forwarded bytes" counter ahead by how much we were able to send out. + self.fwd_cnt += Wrapping(written as u32); + + // If we couldn't write the whole slice, we'll need to push the remaining data to our + // buffer. + if written < buf.len() { + self.tx_buf.push(&buf[written..])?; + } + + Ok(()) + } + + /// Check if the credit information the peer has last received from us is outdated. + /// + fn peer_needs_credit_update(&self) -> bool { + (self.fwd_cnt - self.last_fwd_cnt_to_peer).0 as usize >= defs::CONN_CREDIT_UPDATE_THRESHOLD + } + + /// Check if we need to ask the peer for a credit update before sending any more data its + /// way. + /// + fn need_credit_update_from_peer(&self) -> bool { + self.peer_avail_credit() == 0 + } + + /// Get the maximum number of bytes that we can send to our peer, without overflowing its + /// buffer. + /// + fn peer_avail_credit(&self) -> usize { + (Wrapping(self.peer_buf_alloc as u32) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize + } + + /// Prepare a packet header for transmission to our peer. + /// + fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket { + // Make sure the header is zeroed-out first. + // This looks sub-optimal, but it is actually optimized-out in the compiled code to be + // faster than a memset(). + for b in pkt.hdr_mut() { + *b = 0; + } + + pkt.set_src_cid(self.local_cid) + .set_dst_cid(self.peer_cid) + .set_src_port(self.local_port) + .set_dst_port(self.peer_port) + .set_type(uapi::VSOCK_TYPE_STREAM) + .set_buf_alloc(defs::CONN_TX_BUF_SIZE as u32) + .set_fwd_cnt(self.fwd_cnt.0) + } +} diff --git a/vm-virtio/src/vsock/csm/mod.rs b/vm-virtio/src/vsock/csm/mod.rs new file mode 100644 index 000000000..a6bd7071a --- /dev/null +++ b/vm-virtio/src/vsock/csm/mod.rs @@ -0,0 +1,129 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +// +/// This module implements our vsock connection state machine. The heavy lifting is done by +/// `connection::VsockConnection`, while this file only defines some constants and helper structs. +/// +mod connection; +mod txbuf; + +pub use connection::VsockConnection; + +pub mod defs { + /// Vsock connection TX buffer capacity. + pub const CONN_TX_BUF_SIZE: usize = 64 * 1024; + + /// After the guest thinks it has filled our TX buffer up to this limit (in bytes), we'll send + /// them a credit update packet, to let them know we can handle more. + pub const CONN_CREDIT_UPDATE_THRESHOLD: usize = CONN_TX_BUF_SIZE - 4 * 4 * 1024; + + /// Connection request timeout, in millis. + pub const CONN_REQUEST_TIMEOUT_MS: u64 = 2000; + + /// Connection graceful shutdown timeout, in millis. + pub const CONN_SHUTDOWN_TIMEOUT_MS: u64 = 2000; +} + +#[derive(Debug)] +pub enum Error { + /// Attempted to push data to a full TX buffer. + TxBufFull, + /// An I/O error occurred, when attempting to flush the connection TX buffer. + TxBufFlush(std::io::Error), + /// An I/O error occurred, when attempting to write data to the host-side stream. + StreamWrite(std::io::Error), +} + +type Result = std::result::Result; + +/// A vsock connection state. +/// +#[derive(Debug, PartialEq)] +pub enum ConnState { + /// The connection has been initiated by the host end, but is yet to be confirmed by the guest. + LocalInit, + /// The connection has been initiated by the guest, but we are yet to confirm it, by sending + /// a response packet (VSOCK_OP_RESPONSE). + PeerInit, + /// The connection handshake has been performed successfully, and data can now be exchanged. + Established, + /// The host (AF_UNIX) socket was closed. + LocalClosed, + /// A VSOCK_OP_SHUTDOWN packet was received from the guest. The tuple represents the guest R/W + /// indication: (will_not_recv_anymore_data, will_not_send_anymore_data). + PeerClosed(bool, bool), + /// The connection is scheduled to be forcefully terminated as soon as possible. + Killed, +} + +/// An RX indication, used by `VsockConnection` to schedule future `recv_pkt()` responses. +/// For instance, after being notified that there is available data to be read from the host stream +/// (via `notify()`), the connection will store a `PendingRx::Rw` to be later inspected by +/// `recv_pkt()`. +/// +#[derive(Clone, Copy, PartialEq)] +enum PendingRx { + /// We need to yield a connection request packet (VSOCK_OP_REQUEST). + Request = 0, + /// We need to yield a connection response packet (VSOCK_OP_RESPONSE). + Response = 1, + /// We need to yield a forceful connection termination packet (VSOCK_OP_RST). + Rst = 2, + /// We need to yield a data packet (VSOCK_OP_RW), by reading from the AF_UNIX socket. + Rw = 3, + /// We need to yield a credit update packet (VSOCK_OP_CREDIT_UPDATE). + CreditUpdate = 4, +} +impl PendingRx { + /// Transform the enum value into a bitmask, that can be used for set operations. + /// + fn into_mask(self) -> u16 { + 1u16 << (self as u16) + } +} + +/// A set of RX indications (`PendingRx` items). +/// +struct PendingRxSet { + data: u16, +} + +impl PendingRxSet { + /// Insert an item into the set. + /// + fn insert(&mut self, it: PendingRx) { + self.data |= it.into_mask(); + } + + /// Remove an item from the set and return: + /// - true, if the item was in the set; or + /// - false, if the item wasn't in the set. + /// + fn remove(&mut self, it: PendingRx) -> bool { + let ret = self.contains(it); + self.data &= !it.into_mask(); + ret + } + + /// Check if an item is present in this set. + /// + fn contains(&self, it: PendingRx) -> bool { + self.data & it.into_mask() != 0 + } + + /// Check if the set is empty. + /// + fn is_empty(&self) -> bool { + self.data == 0 + } +} + +/// Create a set containing only one item. +/// +impl From for PendingRxSet { + fn from(it: PendingRx) -> Self { + Self { + data: it.into_mask(), + } + } +} diff --git a/vm-virtio/src/vsock/csm/txbuf.rs b/vm-virtio/src/vsock/csm/txbuf.rs new file mode 100644 index 000000000..080788c34 --- /dev/null +++ b/vm-virtio/src/vsock/csm/txbuf.rs @@ -0,0 +1,279 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +// + +use std::io::Write; +use std::mem; +use std::num::Wrapping; + +use super::defs; +use super::{Error, Result}; + +/// A simple ring-buffer implementation, used by vsock connections to buffer TX (guest -> host) +/// data. Memory for this buffer is allocated lazily, since buffering will only be needed when +/// the host can't read fast enough. +/// +pub struct TxBuf { + /// The actual u8 buffer - only allocated after the first push. + data: Option>, + /// Ring-buffer head offset - where new data is pushed to. + head: Wrapping, + /// Ring-buffer tail offset - where data is flushed from. + tail: Wrapping, +} + +impl TxBuf { + /// Total buffer size, in bytes. + /// + const SIZE: usize = defs::CONN_TX_BUF_SIZE; + + /// Ring-buffer constructor. + /// + pub fn new() -> Self { + Self { + data: None, + head: Wrapping(0), + tail: Wrapping(0), + } + } + + /// Get the used length of this buffer - number of bytes that have been pushed in, but not + /// yet flushed out. + /// + pub fn len(&self) -> usize { + (self.head - self.tail).0 as usize + } + + /// Push a byte slice onto the ring-buffer. + /// + /// Either the entire source slice will be pushed to the ring-buffer, or none of it, if + /// there isn't enough room, in which case `Err(Error::TxBufFull)` is returned. + /// + pub fn push(&mut self, src: &[u8]) -> Result<()> { + // Error out if there's no room to push the entire slice. + if self.len() + src.len() > Self::SIZE { + return Err(Error::TxBufFull); + } + + // We're using a closure here to return the boxed slice, instead of a value (i.e. + // `get_or_insert_with()` instead of `get_or_insert()`), because we only want the box + // created when `self.data` is None. If we were to use `get_or_insert(box)`, the box + // argument would always get evaluated (which implies a heap allocation), even though + // it would later be discarded (when `self.data.is_some()`). Apparently, clippy fails + // to see this, and insists on issuing some warning. + #[allow(clippy::redundant_closure)] + let data = self.data.get_or_insert_with(|| + // Using uninitialized memory here is quite safe, since we never read from any + // area of the buffer before writing to it. First we push, then we flush only + // what had been prviously pushed. + Box::new(unsafe {mem::uninitialized::<[u8; Self::SIZE]>()})); + + // Buffer head, as an offset into the data slice. + let head_ofs = self.head.0 as usize % Self::SIZE; + + // Pushing a slice to this buffer can take either one or two slice copies: - one copy, + // if the slice fits between `head_ofs` and `Self::SIZE`; or - two copies, if the + // ring-buffer head wraps around. + + // First copy length: we can only go from the head offset up to the total buffer size. + let len = std::cmp::min(Self::SIZE - head_ofs, src.len()); + data[head_ofs..(head_ofs + len)].copy_from_slice(&src[..len]); + + // If the slice didn't fit, the buffer head will wrap around, and pushing continues + // from the start of the buffer (`&self.data[0]`). + if len < src.len() { + data[..(src.len() - len)].copy_from_slice(&src[len..]); + } + + // Either way, we've just pushed exactly `src.len()` bytes, so that's the amount by + // which the (wrapping) buffer head needs to move forward. + self.head += Wrapping(src.len() as u32); + + Ok(()) + } + + /// Flush the contents of the ring-buffer to a writable stream. + /// + /// Return the number of bytes that have been transferred out of the ring-buffer and into + /// the writable stream. + /// + pub fn flush_to(&mut self, sink: &mut W) -> Result + where + W: Write, + { + // Nothing to do, if this buffer holds no data. + if self.is_empty() { + return Ok(0); + } + + // Buffer tail, as an offset into the buffer data slice. + let tail_ofs = self.tail.0 as usize % Self::SIZE; + + // Flushing the buffer can take either one or two writes: + // - one write, if the tail doesn't need to wrap around to reach the head; or + // - two writes, if the tail would wrap around: tail to slice end, then slice end to + // head. + + // First write length: the lesser of tail to slice end, or tail to head. + let len_to_write = std::cmp::min(Self::SIZE - tail_ofs, self.len()); + + // It's safe to unwrap here, since we've already checked if the buffer was empty. + let data = self.data.as_ref().unwrap(); + + // Issue the first write and absorb any `WouldBlock` error (we can just try again + // later). + let written = sink + .write(&data[tail_ofs..(tail_ofs + len_to_write)]) + .map_err(Error::TxBufFlush)?; + + // Move the buffer tail ahead by the amount (of bytes) we were able to flush out. + self.tail += Wrapping(written as u32); + + // If we weren't able to flush out as much as we tried, there's no point in attempting + // our second write. + if written < len_to_write { + return Ok(written); + } + + // Attempt our second write. This will return immediately if a second write isn't + // needed, since checking for an empty buffer is the first thing we do in this + // function. + Ok(written + self.flush_to(sink)?) + } + + /// Check if the buffer holds any data that hasn't yet been flushed out. + /// + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Error as IoError; + use std::io::Result as IoResult; + use std::io::{ErrorKind, Write}; + + struct TestSink { + data: Vec, + err: Option, + capacity: usize, + } + + impl TestSink { + const DEFAULT_CAPACITY: usize = 2 * TxBuf::SIZE; + fn new() -> Self { + Self { + data: Vec::with_capacity(Self::DEFAULT_CAPACITY), + err: None, + capacity: Self::DEFAULT_CAPACITY, + } + } + } + + impl Write for TestSink { + fn write(&mut self, src: &[u8]) -> IoResult { + if self.err.is_some() { + return Err(self.err.take().unwrap()); + } + let len_to_push = std::cmp::min(self.capacity - self.data.len(), src.len()); + self.data.extend_from_slice(&src[..len_to_push]); + Ok(len_to_push) + } + fn flush(&mut self) -> IoResult<()> { + Ok(()) + } + } + + impl TestSink { + fn clear(&mut self) { + self.data = Vec::with_capacity(self.capacity); + self.err = None; + } + fn set_err(&mut self, err: IoError) { + self.err = Some(err); + } + fn set_capacity(&mut self, capacity: usize) { + self.capacity = capacity; + if self.data.len() > self.capacity { + self.data.resize(self.capacity, 0); + } + } + } + + #[test] + fn test_push_nowrap() { + let mut txbuf = TxBuf::new(); + let mut sink = TestSink::new(); + assert!(txbuf.is_empty()); + + assert!(txbuf.data.is_none()); + txbuf.push(&[1, 2, 3, 4]).unwrap(); + txbuf.push(&[5, 6, 7, 8]).unwrap(); + txbuf.flush_to(&mut sink).unwrap(); + assert_eq!(sink.data, [1, 2, 3, 4, 5, 6, 7, 8]); + } + + #[test] + fn test_push_wrap() { + let mut txbuf = TxBuf::new(); + let mut sink = TestSink::new(); + let mut tmp: Vec = Vec::new(); + + tmp.resize(TxBuf::SIZE - 2, 0); + txbuf.push(tmp.as_slice()).unwrap(); + txbuf.flush_to(&mut sink).unwrap(); + sink.clear(); + + txbuf.push(&[1, 2, 3, 4]).unwrap(); + assert_eq!(txbuf.flush_to(&mut sink).unwrap(), 4); + assert_eq!(sink.data, [1, 2, 3, 4]); + } + + #[test] + fn test_push_error() { + let mut txbuf = TxBuf::new(); + let mut tmp = Vec::with_capacity(TxBuf::SIZE); + + tmp.resize(TxBuf::SIZE - 1, 0); + txbuf.push(tmp.as_slice()).unwrap(); + match txbuf.push(&[1, 2]) { + Err(Error::TxBufFull) => (), + other => panic!("Unexpected result: {:?}", other), + } + } + + #[test] + fn test_incomplete_flush() { + let mut txbuf = TxBuf::new(); + let mut sink = TestSink::new(); + + sink.set_capacity(2); + txbuf.push(&[1, 2, 3, 4]).unwrap(); + assert_eq!(txbuf.flush_to(&mut sink).unwrap(), 2); + assert_eq!(txbuf.len(), 2); + assert_eq!(sink.data, [1, 2]); + + sink.set_capacity(4); + assert_eq!(txbuf.flush_to(&mut sink).unwrap(), 2); + assert!(txbuf.is_empty()); + assert_eq!(sink.data, [1, 2, 3, 4]); + } + + #[test] + fn test_flush_error() { + const EACCESS: i32 = 13; + + let mut txbuf = TxBuf::new(); + let mut sink = TestSink::new(); + + txbuf.push(&[1, 2, 3, 4]).unwrap(); + let io_err = IoError::from_raw_os_error(EACCESS); + sink.set_err(io_err); + match txbuf.flush_to(&mut sink) { + Err(Error::TxBufFlush(ref err)) if err.kind() == ErrorKind::PermissionDenied => (), + other => panic!("Unexpected result: {:?}", other), + } + } +} diff --git a/vm-virtio/src/vsock/mod.rs b/vm-virtio/src/vsock/mod.rs index 4e8e24a05..0cabc1116 100644 --- a/vm-virtio/src/vsock/mod.rs +++ b/vm-virtio/src/vsock/mod.rs @@ -8,6 +8,123 @@ // Use of this source code is governed by a BSD-style license that can be // found in the THIRD-PARTY file. +mod csm; mod device; +mod packet; pub use self::device::Vsock; + +use std::os::unix::io::RawFd; + +use packet::VsockPacket; + +mod defs { + + /// Max vsock packet data/buffer size. + pub const MAX_PKT_BUF_SIZE: usize = 64 * 1024; + + pub mod uapi { + + /// Vsock packet operation IDs. + /// Defined in `/include/uapi/linux/virtio_vsock.h`. + /// + /// Connection request. + pub const VSOCK_OP_REQUEST: u16 = 1; + /// Connection response. + pub const VSOCK_OP_RESPONSE: u16 = 2; + /// Connection reset. + pub const VSOCK_OP_RST: u16 = 3; + /// Connection clean shutdown. + pub const VSOCK_OP_SHUTDOWN: u16 = 4; + /// Connection data (read/write). + pub const VSOCK_OP_RW: u16 = 5; + /// Flow control credit update. + pub const VSOCK_OP_CREDIT_UPDATE: u16 = 6; + /// Flow control credit update request. + pub const VSOCK_OP_CREDIT_REQUEST: u16 = 7; + + /// Vsock packet flags. + /// Defined in `/include/uapi/linux/virtio_vsock.h`. + /// + /// Valid with a VSOCK_OP_SHUTDOWN packet: the packet sender will receive no more data. + pub const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1; + /// Valid with a VSOCK_OP_SHUTDOWN packet: the packet sender will send no more data. + pub const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2; + + /// Vsock packet type. + /// Defined in `/include/uapi/linux/virtio_vsock.h`. + /// + /// Stream / connection-oriented packet (the only currently valid type). + pub const VSOCK_TYPE_STREAM: u16 = 1; + + pub const VSOCK_HOST_CID: u64 = 2; + } +} + +#[derive(Debug)] +pub enum VsockError { + /// The vsock data/buffer virtio descriptor length is smaller than expected. + BufDescTooSmall, + /// The vsock data/buffer virtio descriptor is expected, but missing. + BufDescMissing, + /// Chained GuestMemory error. + GuestMemory, + /// Bounds check failed on guest memory pointer. + GuestMemoryBounds, + /// The vsock header descriptor length is too small. + HdrDescTooSmall(u32), + /// The vsock header `len` field holds an invalid value. + InvalidPktLen(u32), + /// A data fetch was attempted when no data was available. + NoData, + /// A data buffer was expected for the provided packet, but it is missing. + PktBufMissing, + /// Encountered an unexpected write-only virtio descriptor. + UnreadableDescriptor, + /// Encountered an unexpected read-only virtio descriptor. + UnwritableDescriptor, +} +type Result = std::result::Result; + +/// A passive, event-driven object, that needs to be notified whenever an epoll-able event occurs. +/// An event-polling control loop will use `get_polled_fd()` and `get_polled_evset()` to query +/// the listener for the file descriptor and the set of events it's interested in. When such an +/// event occurs, the control loop will route the event to the listener via `notify()`. +/// +pub trait VsockEpollListener { + /// Get the file descriptor the listener needs polled. + fn get_polled_fd(&self) -> RawFd; + + /// Get the set of events for which the listener wants to be notified. + fn get_polled_evset(&self) -> epoll::Events; + + /// Notify the listener that one ore more events have occurred. + fn notify(&mut self, evset: epoll::Events); +} + +/// Any channel that handles vsock packet traffic: sending and receiving packets. Since we're +/// implementing the device model here, our responsibility is to always process the sending of +/// packets (i.e. the TX queue). So, any locally generated data, addressed to the driver (e.g. +/// a connection response or RST), will have to be queued, until we get to processing the RX queue. +/// +/// Note: `recv_pkt()` and `send_pkt()` are named analogous to `Read::read()` and `Write::write()`, +/// respectively. I.e. +/// - `recv_pkt(&mut pkt)` will read data from the channel, and place it into `pkt`; and +/// - `send_pkt(&pkt)` will fetch data from `pkt`, and place it into the channel. +pub trait VsockChannel { + /// Read/receive an incoming packet from the channel. + fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()>; + + /// Write/send a packet through the channel. + fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()>; + + /// Checks whether there is pending incoming data inside the channel, meaning that a subsequent + /// call to `recv_pkt()` won't fail. + fn has_pending_rx(&self) -> bool; +} + +/// The vsock backend, which is basically an epoll-event-driven vsock channel, that needs to be +/// sendable through a mpsc channel (the latter due to how `vmm::EpollContext` works). +/// Currently, the only implementation we have is `crate::virtio::unix::muxer::VsockMuxer`, which +/// translates guest-side vsock connections to host-side Unix domain socket connections. +pub trait VsockBackend: VsockChannel + VsockEpollListener + Send {} diff --git a/vm-virtio/src/vsock/packet.rs b/vm-virtio/src/vsock/packet.rs new file mode 100644 index 000000000..cd28e3533 --- /dev/null +++ b/vm-virtio/src/vsock/packet.rs @@ -0,0 +1,341 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +// + +/// `VsockPacket` provides a thin wrapper over the buffers exchanged via virtio queues. +/// There are two components to a vsock packet, each using its own descriptor in a +/// virtio queue: +/// - the packet header; and +/// - the packet data/buffer. +/// There is a 1:1 relation between descriptor chains and packets: the first (chain head) holds +/// the header, and an optional second descriptor holds the data. The second descriptor is only +/// present for data packets (VSOCK_OP_RW). +/// +/// `VsockPacket` wraps these two buffers and provides direct access to the data stored +/// in guest memory. This is done to avoid unnecessarily copying data from guest memory +/// to temporary buffers, before passing it on to the vsock backend. +/// +use byteorder::{ByteOrder, LittleEndian}; + +use super::super::DescriptorChain; +use super::defs; +use super::{Result, VsockError}; + +// The vsock packet header is defined by the C struct: +// +// ```C +// struct virtio_vsock_hdr { +// le64 src_cid; +// le64 dst_cid; +// le32 src_port; +// le32 dst_port; +// le32 len; +// le16 type; +// le16 op; +// le32 flags; +// le32 buf_alloc; +// le32 fwd_cnt; +// }; +// ``` +// +// This structed will occupy the buffer pointed to by the head descriptor. We'll be accessing it +// as a byte slice. To that end, we define below the offsets for each field struct, as well as the +// packed struct size, as a bunch of `usize` consts. +// Note that these offsets are only used privately by the `VsockPacket` struct, the public interface +// consisting of getter and setter methods, for each struct field, that will also handle the correct +// endianess. + +/// The vsock packet header struct size (when packed). +pub const VSOCK_PKT_HDR_SIZE: usize = 44; + +// Source CID. +const HDROFF_SRC_CID: usize = 0; + +// Destination CID. +const HDROFF_DST_CID: usize = 8; + +// Source port. +const HDROFF_SRC_PORT: usize = 16; + +// Destination port. +const HDROFF_DST_PORT: usize = 20; + +// Data length (in bytes) - may be 0, if there is no data buffer. +const HDROFF_LEN: usize = 24; + +// Socket type. Currently, only connection-oriented streams are defined by the vsock protocol. +const HDROFF_TYPE: usize = 28; + +// Operation ID - one of the VSOCK_OP_* values; e.g. +// - VSOCK_OP_RW: a data packet; +// - VSOCK_OP_REQUEST: connection request; +// - VSOCK_OP_RST: forcefull connection termination; +// etc (see `super::defs::uapi` for the full list). +const HDROFF_OP: usize = 30; + +// Additional options (flags) associated with the current operation (`op`). +// Currently, only used with shutdown requests (VSOCK_OP_SHUTDOWN). +const HDROFF_FLAGS: usize = 32; + +// Size (in bytes) of the packet sender receive buffer (for the connection to which this packet +// belongs). +const HDROFF_BUF_ALLOC: usize = 36; + +// Number of bytes the sender has received and consumed (for the connection to which this packet +// belongs). For instance, for our Unix backend, this counter would be the total number of bytes +// we have successfully written to a backing Unix socket. +const HDROFF_FWD_CNT: usize = 40; + +/// The vsock packet, implemented as a wrapper over a virtq descriptor chain: +/// - the chain head, holding the packet header; and +/// - (an optional) data/buffer descriptor, only present for data packets (VSOCK_OP_RW). +/// +pub struct VsockPacket { + hdr: *mut u8, + buf: Option<*mut u8>, + buf_size: usize, +} + +impl VsockPacket { + /// Create the packet wrapper from a TX virtq chain head. + /// + /// The chain head is expected to hold valid packet header data. A following packet buffer + /// descriptor can optionally end the chain. Bounds and pointer checks are performed when + /// creating the wrapper. + /// + pub fn from_tx_virtq_head(head: &DescriptorChain) -> Result { + // All buffers in the TX queue must be readable. + // + if head.is_write_only() { + return Err(VsockError::UnreadableDescriptor); + } + + // The packet header should fit inside the head descriptor. + if head.len < VSOCK_PKT_HDR_SIZE as u32 { + return Err(VsockError::HdrDescTooSmall(head.len)); + } + + let mut pkt = Self { + hdr: head + .mem + .get_host_address(head.addr) + .ok_or_else(|| VsockError::GuestMemory)? as *mut u8, + buf: None, + buf_size: 0, + }; + + // No point looking for a data/buffer descriptor, if the packet is zero-lengthed. + if pkt.len() == 0 { + return Ok(pkt); + } + + // Reject weirdly-sized packets. + // + if pkt.len() > defs::MAX_PKT_BUF_SIZE as u32 { + return Err(VsockError::InvalidPktLen(pkt.len())); + } + + // If the packet header showed a non-zero length, there should be a data descriptor here. + let buf_desc = head.next_descriptor().ok_or(VsockError::BufDescMissing)?; + + // TX data should be read-only. + if buf_desc.is_write_only() { + return Err(VsockError::UnreadableDescriptor); + } + + // The data buffer should be large enough to fit the size of the data, as described by + // the header descriptor. + if buf_desc.len < pkt.len() { + return Err(VsockError::BufDescTooSmall); + } + + pkt.buf_size = buf_desc.len as usize; + pkt.buf = Some( + buf_desc + .mem + .get_host_address(buf_desc.addr) + .ok_or_else(|| VsockError::GuestMemory)? as *mut u8, + ); + + Ok(pkt) + } + + /// Create the packet wrapper from an RX virtq chain head. + /// + /// There must be two descriptors in the chain, both writable: a header descriptor and a data + /// descriptor. Bounds and pointer checks are performed when creating the wrapper. + /// + pub fn from_rx_virtq_head(head: &DescriptorChain) -> Result { + // All RX buffers must be writable. + // + if !head.is_write_only() { + return Err(VsockError::UnwritableDescriptor); + } + + // The packet header should fit inside the head descriptor. + if head.len < VSOCK_PKT_HDR_SIZE as u32 { + return Err(VsockError::HdrDescTooSmall(head.len)); + } + + // All RX descriptor chains should have a header and a data descriptor. + if !head.has_next() { + return Err(VsockError::BufDescMissing); + } + let buf_desc = head.next_descriptor().ok_or(VsockError::BufDescMissing)?; + + Ok(Self { + hdr: head + .mem + .get_host_address(head.addr) + .ok_or_else(|| VsockError::GuestMemory)? as *mut u8, + buf: Some( + buf_desc + .mem + .get_host_address(buf_desc.addr) + .ok_or_else(|| VsockError::GuestMemory)? as *mut u8, + ), + buf_size: buf_desc.len as usize, + }) + } + + /// Provides in-place, byte-slice, access to the vsock packet header. + /// + pub fn hdr(&self) -> &[u8] { + // This is safe since bound checks have already been performed when creating the packet + // from the virtq descriptor. + unsafe { std::slice::from_raw_parts(self.hdr as *const u8, VSOCK_PKT_HDR_SIZE) } + } + + /// Provides in-place, byte-slice, mutable access to the vsock packet header. + /// + pub fn hdr_mut(&mut self) -> &mut [u8] { + // This is safe since bound checks have already been performed when creating the packet + // from the virtq descriptor. + unsafe { std::slice::from_raw_parts_mut(self.hdr, VSOCK_PKT_HDR_SIZE) } + } + + /// Provides in-place, byte-slice access to the vsock packet data buffer. + /// + /// Note: control packets (e.g. connection request or reset) have no data buffer associated. + /// For those packets, this method will return `None`. + /// Also note: calling `len()` on the returned slice will yield the buffer size, which may be + /// (and often is) larger than the length of the packet data. The packet data length + /// is stored in the packet header, and accessible via `VsockPacket::len()`. + pub fn buf(&self) -> Option<&[u8]> { + self.buf.map(|ptr| { + // This is safe since bound checks have already been performed when creating the packet + // from the virtq descriptor. + unsafe { std::slice::from_raw_parts(ptr as *const u8, self.buf_size) } + }) + } + + /// Provides in-place, byte-slice, mutable access to the vsock packet data buffer. + /// + /// Note: control packets (e.g. connection request or reset) have no data buffer associated. + /// For those packets, this method will return `None`. + /// Also note: calling `len()` on the returned slice will yield the buffer size, which may be + /// (and often is) larger than the length of the packet data. The packet data length + /// is stored in the packet header, and accessible via `VsockPacket::len()`. + pub fn buf_mut(&mut self) -> Option<&mut [u8]> { + self.buf.map(|ptr| { + // This is safe since bound checks have already been performed when creating the packet + // from the virtq descriptor. + unsafe { std::slice::from_raw_parts_mut(ptr, self.buf_size) } + }) + } + + pub fn src_cid(&self) -> u64 { + LittleEndian::read_u64(&self.hdr()[HDROFF_SRC_CID..]) + } + + pub fn set_src_cid(&mut self, cid: u64) -> &mut Self { + LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_SRC_CID..], cid); + self + } + + pub fn dst_cid(&self) -> u64 { + LittleEndian::read_u64(&self.hdr()[HDROFF_DST_CID..]) + } + + pub fn set_dst_cid(&mut self, cid: u64) -> &mut Self { + LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_DST_CID..], cid); + self + } + + pub fn src_port(&self) -> u32 { + LittleEndian::read_u32(&self.hdr()[HDROFF_SRC_PORT..]) + } + + pub fn set_src_port(&mut self, port: u32) -> &mut Self { + LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_SRC_PORT..], port); + self + } + + pub fn dst_port(&self) -> u32 { + LittleEndian::read_u32(&self.hdr()[HDROFF_DST_PORT..]) + } + + pub fn set_dst_port(&mut self, port: u32) -> &mut Self { + LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_DST_PORT..], port); + self + } + + pub fn len(&self) -> u32 { + LittleEndian::read_u32(&self.hdr()[HDROFF_LEN..]) + } + + pub fn set_len(&mut self, len: u32) -> &mut Self { + LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_LEN..], len); + self + } + + pub fn type_(&self) -> u16 { + LittleEndian::read_u16(&self.hdr()[HDROFF_TYPE..]) + } + + pub fn set_type(&mut self, type_: u16) -> &mut Self { + LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_TYPE..], type_); + self + } + + pub fn op(&self) -> u16 { + LittleEndian::read_u16(&self.hdr()[HDROFF_OP..]) + } + + pub fn set_op(&mut self, op: u16) -> &mut Self { + LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_OP..], op); + self + } + + pub fn flags(&self) -> u32 { + LittleEndian::read_u32(&self.hdr()[HDROFF_FLAGS..]) + } + + pub fn set_flags(&mut self, flags: u32) -> &mut Self { + LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FLAGS..], flags); + self + } + + pub fn set_flag(&mut self, flag: u32) -> &mut Self { + self.set_flags(self.flags() | flag); + self + } + + pub fn buf_alloc(&self) -> u32 { + LittleEndian::read_u32(&self.hdr()[HDROFF_BUF_ALLOC..]) + } + + pub fn set_buf_alloc(&mut self, buf_alloc: u32) -> &mut Self { + LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_BUF_ALLOC..], buf_alloc); + self + } + + pub fn fwd_cnt(&self) -> u32 { + LittleEndian::read_u32(&self.hdr()[HDROFF_FWD_CNT..]) + } + + pub fn set_fwd_cnt(&mut self, fwd_cnt: u32) -> &mut Self { + LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FWD_CNT..], fwd_cnt); + self + } +}