mirror of
https://github.com/cloud-hypervisor/cloud-hypervisor.git
synced 2025-02-22 11:22:26 +00:00
vm-virtio: vsock: Port submodule csm and packet from Firecracker
This code porting is based off of Firecracker commit 1e1cb6f8f8003e0bdce11d265f0feb23249a03f6 Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
parent
22f91ab3a2
commit
df61a8fea2
@ -41,11 +41,13 @@ unsafe impl ByteValued for Descriptor {}
|
||||
|
||||
/// A virtio descriptor chain.
|
||||
pub struct DescriptorChain<'a> {
|
||||
mem: &'a GuestMemoryMmap,
|
||||
desc_table: GuestAddress,
|
||||
queue_size: u16,
|
||||
ttl: u16, // used to prevent infinite chain cycles
|
||||
|
||||
/// Reference to guest memory
|
||||
pub mem: &'a GuestMemoryMmap,
|
||||
|
||||
/// Index into the descriptor table
|
||||
pub index: u16,
|
||||
|
||||
|
633
vm-virtio/src/vsock/csm/connection.rs
Normal file
633
vm-virtio/src/vsock/csm/connection.rs
Normal file
@ -0,0 +1,633 @@
|
||||
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
/// The main job of `VsockConnection` is to forward data traffic, back and forth, between a
|
||||
/// guest-side AF_VSOCK socket and a host-side generic `Read + Write + AsRawFd` stream, while
|
||||
/// also managing its internal state.
|
||||
/// To that end, `VsockConnection` implements:
|
||||
/// - `VsockChannel` for:
|
||||
/// - moving data from the host stream to a guest-provided RX buffer, via `recv_pkt()`; and
|
||||
/// - moving data from a guest-provided TX buffer to the host stream, via `send_pkt()`; and
|
||||
/// - updating its internal state, by absorbing control packets (anything other than
|
||||
/// VSOCK_OP_RW).
|
||||
/// - `VsockEpollListener` for getting notified about the availability of data or free buffer
|
||||
/// space at the host stream.
|
||||
///
|
||||
/// Note: there is a certain asymmetry to the RX and TX data flows:
|
||||
/// - RX transfers do not need any data buffering, since data is read straight from the
|
||||
/// host stream and into the guest-provided RX buffer;
|
||||
/// - TX transfers may require some data to be buffered by `VsockConnection`, if the host
|
||||
/// peer can't keep up with reading the data that we're writing. This is because, once
|
||||
/// the guest driver provides some data in a virtio TX buffer, the vsock device must
|
||||
/// consume it. If that data can't be forwarded straight to the host stream, we'll
|
||||
/// have to store it in a buffer (and flush it at a later time). Vsock flow control
|
||||
/// ensures that our TX buffer doesn't overflow.
|
||||
///
|
||||
// The code in this file is best read with a fresh memory of the vsock protocol inner-workings.
|
||||
// To help with that, here is a
|
||||
//
|
||||
// Short primer on the vsock protocol
|
||||
// ----------------------------------
|
||||
//
|
||||
// 1. Establishing a connection
|
||||
// A vsock connection is considered established after a two-way handshake:
|
||||
// - the initiating peer sends a connection request packet (`hdr.op` == VSOCK_OP_REQUEST);
|
||||
// then
|
||||
// - the listening peer sends back a connection response packet (`hdr.op` ==
|
||||
// VSOCK_OP_RESPONSE).
|
||||
//
|
||||
// 2. Terminating a connection
|
||||
// When a peer wants to shut down an established connection, it sends a VSOCK_OP_SHUTDOWN
|
||||
// packet. Two header flags are used with VSOCK_OP_SHUTDOWN, indicating the sender's
|
||||
// intention:
|
||||
// - VSOCK_FLAGS_SHUTDOWN_RCV: the sender will receive no more data for this connection; and
|
||||
// - VSOCK_FLAGS_SHUTDOWN_SEND: the sender will send no more data for this connection.
|
||||
// After a shutdown packet, the receiving peer will have some protocol-undefined time to
|
||||
// flush its buffers, and then forcefully terminate the connection by sending back an RST
|
||||
// packet. If the shutdown-initiating peer doesn't receive this RST packet during a timeout
|
||||
// period, it will send one itself, thus terminating the connection.
|
||||
// Note: a peer can send more than one VSOCK_OP_SHUTDOWN packets. However, read/write
|
||||
// indications cannot be undone. E.g. once a "no-more-sending" promise was made, it
|
||||
// cannot be taken back. That is, `hdr.flags` will be ORed between subsequent
|
||||
// VSOCK_OP_SHUTDOWN packets.
|
||||
//
|
||||
// 3. Flow control
|
||||
// Before sending a data packet (VSOCK_OP_RW), the sender must make sure that the receiver
|
||||
// has enough free buffer space to store that data. If this condition is not respected, the
|
||||
// receiving peer's behaviour is undefined. In this implementation, we forcefully terminate
|
||||
// the connection by sending back a VSOCK_OP_RST packet.
|
||||
// Note: all buffer space information is computed and stored on a per-connection basis.
|
||||
// Peers keep each other informed about the free buffer space they have by filling in two
|
||||
// packet header members with each packet they send:
|
||||
// - `hdr.buf_alloc`: the total buffer space the peer has allocated for receiving data; and
|
||||
// - `hdr.fwd_cnt`: the total number of bytes the peer has successfully flushed out of its
|
||||
// buffer.
|
||||
// One can figure out how much space its peer has available in its buffer by inspecting the
|
||||
// difference between how much it has sent to the peer and how much the peer has flushed out
|
||||
// (i.e. "forwarded", in the vsock spec terminology):
|
||||
// `peer_free = peer_buf_alloc - (total_bytes_sent_to_peer - peer_fwd_cnt)`.
|
||||
// Note: the above requires that peers constantly keep each other informed on their buffer
|
||||
// space situation. However, since there are no receipt acknowledgement packets
|
||||
// defined for the vsock protocol, packet flow can often be unidirectional (just one
|
||||
// peer sending data to another), so the sender's information about the receiver's
|
||||
// buffer space can get quickly outdated. The vsock protocol defines two solutions to
|
||||
// this problem:
|
||||
// 1. The sender can explicitly ask for a buffer space (i.e. "credit") update from its
|
||||
// peer, via a VSOCK_OP_CREDIT_REQUEST packet, to which it will get a
|
||||
// VSOCK_OP_CREDIT_UPDATE response (or any response will do, really, since credit
|
||||
// information must be included in any packet);
|
||||
// 2. The receiver can be proactive, and send VSOCK_OP_CREDIT_UPDATE packet, whenever
|
||||
// it thinks its peer's information is out of date.
|
||||
// Our implementation uses the proactive approach.
|
||||
//
|
||||
use std::io::{ErrorKind, Read, Write};
|
||||
use std::num::Wrapping;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::super::defs::uapi;
|
||||
use super::super::packet::VsockPacket;
|
||||
use super::super::{Result as VsockResult, VsockChannel, VsockEpollListener, VsockError};
|
||||
use super::defs;
|
||||
use super::txbuf::TxBuf;
|
||||
use super::{ConnState, Error, PendingRx, PendingRxSet, Result};
|
||||
|
||||
/// A self-managing connection object, that handles communication between a guest-side AF_VSOCK
|
||||
/// socket and a host-side `Read + Write + AsRawFd` stream.
|
||||
///
|
||||
pub struct VsockConnection<S: Read + Write + AsRawFd> {
|
||||
/// The current connection state.
|
||||
state: ConnState,
|
||||
/// The local CID. Most of the time this will be the constant `2` (the vsock host CID).
|
||||
local_cid: u64,
|
||||
/// The peer (guest) CID.
|
||||
peer_cid: u64,
|
||||
/// The local (host) port.
|
||||
local_port: u32,
|
||||
/// The peer (guest) port.
|
||||
peer_port: u32,
|
||||
/// The (connected) host-side stream.
|
||||
stream: S,
|
||||
/// The TX buffer for this connection.
|
||||
tx_buf: TxBuf,
|
||||
/// Total number of bytes that have been successfully written to `self.stream`, either
|
||||
/// directly, or flushed from `self.tx_buf`.
|
||||
fwd_cnt: Wrapping<u32>,
|
||||
/// The amount of buffer space that the peer (guest) has allocated for this connection.
|
||||
peer_buf_alloc: u32,
|
||||
/// The total number of bytes that the peer has forwarded away.
|
||||
peer_fwd_cnt: Wrapping<u32>,
|
||||
/// The total number of bytes sent to the peer (guest vsock driver)
|
||||
rx_cnt: Wrapping<u32>,
|
||||
/// Our `self.fwd_cnt`, as last sent to the peer. This is used to provide proactive credit
|
||||
/// updates, and let the peer know it's OK to send more data.
|
||||
last_fwd_cnt_to_peer: Wrapping<u32>,
|
||||
/// The set of pending RX packet indications that `recv_pkt()` will use to fill in a
|
||||
/// packet for the peer (guest).
|
||||
pending_rx: PendingRxSet,
|
||||
/// Instant when this connection should be scheduled for immediate termination, due to some
|
||||
/// timeout condition having been fulfilled.
|
||||
expiry: Option<Instant>,
|
||||
}
|
||||
|
||||
impl<S> VsockChannel for VsockConnection<S>
|
||||
where
|
||||
S: Read + Write + AsRawFd,
|
||||
{
|
||||
/// Fill in a vsock packet, to be delivered to our peer (the guest driver).
|
||||
///
|
||||
/// As per the `VsockChannel` trait, this should only be called when there is data to be
|
||||
/// fetched from the channel (i.e. `has_pending_rx()` is true). Otherwise, it will error
|
||||
/// out with `VsockError::NoData`.
|
||||
/// Pending RX indications are set by other mutable actions performed on the channel. For
|
||||
/// instance, `send_pkt()` could set an Rst indication, if called with a VSOCK_OP_SHUTDOWN
|
||||
/// packet, or `notify()` could set a Rw indication (a data packet can be fetched from the
|
||||
/// channel), if data was ready to be read from the host stream.
|
||||
///
|
||||
/// Returns:
|
||||
/// - `Ok(())`: the packet has been successfully filled in and is ready for delivery;
|
||||
/// - `Err(VsockError::NoData)`: there was no data available with which to fill in the
|
||||
/// packet;
|
||||
/// - `Err(VsockError::PktBufMissing)`: the packet would've been filled in with data, but
|
||||
/// it is missing the data buffer.
|
||||
///
|
||||
fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> VsockResult<()> {
|
||||
// Perform some generic initialization that is the same for any packet operation (e.g.
|
||||
// source, destination, credit, etc).
|
||||
self.init_pkt(pkt);
|
||||
|
||||
// If forceful termination is pending, there's no point in checking for anything else.
|
||||
// It's dead, Jim.
|
||||
if self.pending_rx.remove(PendingRx::Rst) {
|
||||
pkt.set_op(uapi::VSOCK_OP_RST);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Next up: if we're due a connection confirmation, that's all we need to know to fill
|
||||
// in this packet.
|
||||
if self.pending_rx.remove(PendingRx::Response) {
|
||||
self.state = ConnState::Established;
|
||||
pkt.set_op(uapi::VSOCK_OP_RESPONSE);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Same thing goes for locally-initiated connections that need to yield a connection
|
||||
// request.
|
||||
if self.pending_rx.remove(PendingRx::Request) {
|
||||
self.expiry =
|
||||
Some(Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS));
|
||||
pkt.set_op(uapi::VSOCK_OP_REQUEST);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// A credit update is basically a no-op, so we should only waste a perfectly fine RX
|
||||
// buffer on it if we really have nothing else to say.
|
||||
if self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx() {
|
||||
pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE);
|
||||
self.last_fwd_cnt_to_peer = self.fwd_cnt;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Alright, if we got to here, we need to cough up a data packet. We've already checked
|
||||
// for all other pending RX indications.
|
||||
if !self.pending_rx.remove(PendingRx::Rw) {
|
||||
return Err(VsockError::NoData);
|
||||
}
|
||||
|
||||
match self.state {
|
||||
// A data packet is only valid for established connections, and connections for
|
||||
// which our peer has initiated a graceful shutdown, but can still receive data.
|
||||
ConnState::Established | ConnState::PeerClosed(false, _) => (),
|
||||
_ => {
|
||||
// Any other connection state is invalid at this point, and we need to kill it
|
||||
// with fire.
|
||||
pkt.set_op(uapi::VSOCK_OP_RST);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Oh wait, before we start bringing in the big data, can our peer handle receiving so
|
||||
// much bytey goodness?
|
||||
if self.need_credit_update_from_peer() {
|
||||
self.last_fwd_cnt_to_peer = self.fwd_cnt;
|
||||
pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let buf = pkt.buf_mut().ok_or(VsockError::PktBufMissing)?;
|
||||
|
||||
// The maximum amount of data we can read in is limited by both the RX buffer size and
|
||||
// the peer available buffer space.
|
||||
let max_len = std::cmp::min(buf.len(), self.peer_avail_credit());
|
||||
|
||||
// Read data from the stream straight to the RX buffer, for maximum throughput.
|
||||
match self.stream.read(&mut buf[..max_len]) {
|
||||
Ok(read_cnt) => {
|
||||
if read_cnt == 0 {
|
||||
// A 0-length read means the host stream was closed down. In that case,
|
||||
// we'll ask our peer to shut down the connection. We can neither send nor
|
||||
// receive any more data.
|
||||
self.state = ConnState::LocalClosed;
|
||||
self.expiry = Some(
|
||||
Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
|
||||
);
|
||||
pkt.set_op(uapi::VSOCK_OP_SHUTDOWN)
|
||||
.set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV)
|
||||
.set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
|
||||
} else {
|
||||
// On a successful data read, we fill in the packet with the RW op, and
|
||||
// length of the read data.
|
||||
pkt.set_op(uapi::VSOCK_OP_RW).set_len(read_cnt as u32);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// We are not expecting any errors when reading from the underlying stream. If
|
||||
// any show up, we'll immediately kill this connection.
|
||||
error!(
|
||||
"vsock: error reading from backing stream: lp={}, pp={}, err={:?}",
|
||||
self.local_port, self.peer_port, err
|
||||
);
|
||||
pkt.set_op(uapi::VSOCK_OP_RST);
|
||||
}
|
||||
};
|
||||
|
||||
self.rx_cnt += Wrapping(pkt.len());
|
||||
self.last_fwd_cnt_to_peer = self.fwd_cnt;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deliver a guest-generated packet to this connection.
|
||||
///
|
||||
/// This forwards the data in RW packets to the host stream, and absorbs control packets,
|
||||
/// using them to manage the internal connection state.
|
||||
///
|
||||
/// Returns:
|
||||
/// always `Ok(())`: the packet has been consumed;
|
||||
///
|
||||
fn send_pkt(&mut self, pkt: &VsockPacket) -> VsockResult<()> {
|
||||
// Update the peer credit information.
|
||||
self.peer_buf_alloc = pkt.buf_alloc();
|
||||
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
|
||||
|
||||
match self.state {
|
||||
// Most frequent case: this is an established connection that needs to forward some
|
||||
// data to the host stream. Also works for a connection that has begun shutting
|
||||
// down, but the peer still has some data to send.
|
||||
ConnState::Established | ConnState::PeerClosed(_, false)
|
||||
if pkt.op() == uapi::VSOCK_OP_RW =>
|
||||
{
|
||||
if pkt.buf().is_none() {
|
||||
info!(
|
||||
"vsock: dropping empty data packet from guest (lp={}, pp={}",
|
||||
self.local_port, self.peer_port
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Unwrapping here is safe, since we just checked `pkt.buf()` above.
|
||||
let buf_slice = &pkt.buf().unwrap()[..(pkt.len() as usize)];
|
||||
if let Err(err) = self.send_bytes(buf_slice) {
|
||||
// If we can't write to the host stream, that's an unrecoverable error, so
|
||||
// we'll terminate this connection.
|
||||
warn!(
|
||||
"vsock: error writing to local stream (lp={}, pp={}): {:?}",
|
||||
self.local_port, self.peer_port, err
|
||||
);
|
||||
self.kill();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We might've just consumed some data. If that's the case, we might need to
|
||||
// update the peer on our buffer space situation, so that it can keep sending
|
||||
// data packets our way.
|
||||
if self.peer_needs_credit_update() {
|
||||
self.pending_rx.insert(PendingRx::CreditUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
// Next up: receiving a response / confirmation for a host-initiated connection.
|
||||
// We'll move to an Established state, and pass on the good news through the host
|
||||
// stream.
|
||||
ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => {
|
||||
self.expiry = None;
|
||||
self.state = ConnState::Established;
|
||||
}
|
||||
|
||||
// The peer wants to shut down an established connection. If they have nothing
|
||||
// more to send nor receive, and we don't have to wait to drain our TX buffer, we
|
||||
// can schedule an RST packet (to terminate the connection on the next recv call).
|
||||
// Otherwise, we'll arm the kill timer.
|
||||
ConnState::Established if pkt.op() == uapi::VSOCK_OP_SHUTDOWN => {
|
||||
let recv_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0;
|
||||
let send_off = pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0;
|
||||
self.state = ConnState::PeerClosed(recv_off, send_off);
|
||||
if recv_off && send_off {
|
||||
if self.tx_buf.is_empty() {
|
||||
self.pending_rx.insert(PendingRx::Rst);
|
||||
} else {
|
||||
self.expiry = Some(
|
||||
Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The peer wants to update a shutdown request, with more receive/send indications.
|
||||
// The same logic as above applies.
|
||||
ConnState::PeerClosed(ref mut recv_off, ref mut send_off)
|
||||
if pkt.op() == uapi::VSOCK_OP_SHUTDOWN =>
|
||||
{
|
||||
*recv_off = *recv_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV != 0);
|
||||
*send_off = *send_off || (pkt.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND != 0);
|
||||
if *recv_off && *send_off && self.tx_buf.is_empty() {
|
||||
self.pending_rx.insert(PendingRx::Rst);
|
||||
}
|
||||
}
|
||||
|
||||
// A credit update from our peer is valid only in a state which allows data
|
||||
// transfer towards the peer.
|
||||
ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(false, _)
|
||||
if pkt.op() == uapi::VSOCK_OP_CREDIT_UPDATE =>
|
||||
{
|
||||
// Nothing to do here; we've already updated peer credit.
|
||||
}
|
||||
|
||||
// A credit request from our peer is valid only in a state which allows data
|
||||
// transfer from the peer. We'll respond with a credit update packet.
|
||||
ConnState::Established | ConnState::PeerInit | ConnState::PeerClosed(_, false)
|
||||
if pkt.op() == uapi::VSOCK_OP_CREDIT_REQUEST =>
|
||||
{
|
||||
self.pending_rx.insert(PendingRx::CreditUpdate);
|
||||
}
|
||||
|
||||
_ => {
|
||||
debug!(
|
||||
"vsock: dropping invalid TX pkt for connection: state={:?}, pkt.hdr={:?}",
|
||||
self.state,
|
||||
pkt.hdr()
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the connection has any pending packet addressed to the peer.
|
||||
///
|
||||
fn has_pending_rx(&self) -> bool {
|
||||
!self.pending_rx.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> VsockEpollListener for VsockConnection<S>
|
||||
where
|
||||
S: Read + Write + AsRawFd,
|
||||
{
|
||||
/// Get the file descriptor that this connection wants polled.
|
||||
///
|
||||
/// The connection is interested in being notified about EPOLLIN / EPOLLOUT events on the
|
||||
/// host stream.
|
||||
///
|
||||
fn get_polled_fd(&self) -> RawFd {
|
||||
self.stream.as_raw_fd()
|
||||
}
|
||||
|
||||
/// Get the event set that this connection is interested in.
|
||||
///
|
||||
/// A connection will want to be notified when:
|
||||
/// - data is available to be read from the host stream, so that it can store an RW pending
|
||||
/// RX indication; and
|
||||
/// - data can be written to the host stream, and the TX buffer needs to be flushed.
|
||||
///
|
||||
fn get_polled_evset(&self) -> epoll::Events {
|
||||
let mut evset = epoll::Events::empty();
|
||||
if !self.tx_buf.is_empty() {
|
||||
// There's data waiting in the TX buffer, so we are interested in being notified
|
||||
// when writing to the host stream wouldn't block.
|
||||
evset.insert(epoll::Events::EPOLLOUT);
|
||||
}
|
||||
// We're generally interested in being notified when data can be read from the host
|
||||
// stream, unless we're in a state which doesn't allow moving data from host to guest.
|
||||
match self.state {
|
||||
ConnState::Killed | ConnState::LocalClosed | ConnState::PeerClosed(true, _) => (),
|
||||
_ if self.need_credit_update_from_peer() => (),
|
||||
_ => evset.insert(epoll::Events::EPOLLIN),
|
||||
}
|
||||
evset
|
||||
}
|
||||
|
||||
/// Notify the connection about an event (or set of events) that it was interested in.
|
||||
///
|
||||
fn notify(&mut self, evset: epoll::Events) {
|
||||
if evset.contains(epoll::Events::EPOLLIN) {
|
||||
// Data can be read from the host stream. Setting a Rw pending indication, so that
|
||||
// the muxer will know to call `recv_pkt()` later.
|
||||
self.pending_rx.insert(PendingRx::Rw);
|
||||
}
|
||||
|
||||
if evset.contains(epoll::Events::EPOLLOUT) {
|
||||
// Data can be written to the host stream. Time to flush out the TX buffer.
|
||||
//
|
||||
if self.tx_buf.is_empty() {
|
||||
info!("vsock: connection received unexpected EPOLLOUT event");
|
||||
return;
|
||||
}
|
||||
let flushed = self
|
||||
.tx_buf
|
||||
.flush_to(&mut self.stream)
|
||||
.unwrap_or_else(|err| {
|
||||
warn!(
|
||||
"vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
|
||||
self.local_port, self.peer_port, err
|
||||
);
|
||||
self.kill();
|
||||
0
|
||||
});
|
||||
self.fwd_cnt += Wrapping(flushed as u32);
|
||||
|
||||
// If this connection was shutting down, but is waiting to drain the TX buffer
|
||||
// before forceful termination, the wait might be over.
|
||||
if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() {
|
||||
self.pending_rx.insert(PendingRx::Rst);
|
||||
} else if self.peer_needs_credit_update() {
|
||||
// If we've freed up some more buffer space, we may need to let the peer know it
|
||||
// can safely send more data our way.
|
||||
self.pending_rx.insert(PendingRx::CreditUpdate);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> VsockConnection<S>
|
||||
where
|
||||
S: Read + Write + AsRawFd,
|
||||
{
|
||||
/// Create a new guest-initiated connection object.
|
||||
///
|
||||
pub fn new_peer_init(
|
||||
stream: S,
|
||||
local_cid: u64,
|
||||
peer_cid: u64,
|
||||
local_port: u32,
|
||||
peer_port: u32,
|
||||
peer_buf_alloc: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_cid,
|
||||
peer_cid,
|
||||
local_port,
|
||||
peer_port,
|
||||
stream,
|
||||
state: ConnState::PeerInit,
|
||||
tx_buf: TxBuf::new(),
|
||||
fwd_cnt: Wrapping(0),
|
||||
peer_buf_alloc,
|
||||
peer_fwd_cnt: Wrapping(0),
|
||||
rx_cnt: Wrapping(0),
|
||||
last_fwd_cnt_to_peer: Wrapping(0),
|
||||
pending_rx: PendingRxSet::from(PendingRx::Response),
|
||||
expiry: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new host-initiated connection object.
|
||||
///
|
||||
pub fn new_local_init(
|
||||
stream: S,
|
||||
local_cid: u64,
|
||||
peer_cid: u64,
|
||||
local_port: u32,
|
||||
peer_port: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_cid,
|
||||
peer_cid,
|
||||
local_port,
|
||||
peer_port,
|
||||
stream,
|
||||
state: ConnState::LocalInit,
|
||||
tx_buf: TxBuf::new(),
|
||||
fwd_cnt: Wrapping(0),
|
||||
peer_buf_alloc: 0,
|
||||
peer_fwd_cnt: Wrapping(0),
|
||||
rx_cnt: Wrapping(0),
|
||||
last_fwd_cnt_to_peer: Wrapping(0),
|
||||
pending_rx: PendingRxSet::from(PendingRx::Request),
|
||||
expiry: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if there is an expiry (kill) timer set for this connection, sometime in the
|
||||
/// future.
|
||||
///
|
||||
pub fn will_expire(&self) -> bool {
|
||||
match self.expiry {
|
||||
None => false,
|
||||
Some(t) => t > Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if this connection needs to be scheduled for forceful termination, due to its
|
||||
/// kill timer having expired.
|
||||
///
|
||||
pub fn has_expired(&self) -> bool {
|
||||
match self.expiry {
|
||||
None => false,
|
||||
Some(t) => t <= Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the kill timer value, if one is set.
|
||||
///
|
||||
pub fn expiry(&self) -> Option<Instant> {
|
||||
self.expiry
|
||||
}
|
||||
|
||||
/// Schedule the connection to be forcefully terminated ASAP (i.e. the next time the
|
||||
/// connection is asked to yield a packet, via `recv_pkt()`).
|
||||
///
|
||||
pub fn kill(&mut self) {
|
||||
self.state = ConnState::Killed;
|
||||
self.pending_rx.insert(PendingRx::Rst);
|
||||
}
|
||||
|
||||
/// Send some raw data (a byte-slice) to the host stream.
|
||||
///
|
||||
/// Raw data can either be sent straight to the host stream, or to our TX buffer, if the
|
||||
/// former fails.
|
||||
///
|
||||
fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
|
||||
// If there is data in the TX buffer, that means we're already registered for EPOLLOUT
|
||||
// events on the underlying stream. Therefore, there's no point in attempting a write
|
||||
// at this point. `self.notify()` will get called when EPOLLOUT arrives, and it will
|
||||
// attempt to drain the TX buffer then.
|
||||
if !self.tx_buf.is_empty() {
|
||||
return self.tx_buf.push(buf);
|
||||
}
|
||||
|
||||
// The TX buffer is empty, so we can try to write straight to the host stream.
|
||||
let written = match self.stream.write(buf) {
|
||||
Ok(cnt) => cnt,
|
||||
Err(e) => {
|
||||
// Absorb any would-block errors, since we can always try again later.
|
||||
if e.kind() == ErrorKind::WouldBlock {
|
||||
0
|
||||
} else {
|
||||
// We don't know how to handle any other write error, so we'll send it up
|
||||
// the call chain.
|
||||
return Err(Error::StreamWrite(e));
|
||||
}
|
||||
}
|
||||
};
|
||||
// Move the "forwarded bytes" counter ahead by how much we were able to send out.
|
||||
self.fwd_cnt += Wrapping(written as u32);
|
||||
|
||||
// If we couldn't write the whole slice, we'll need to push the remaining data to our
|
||||
// buffer.
|
||||
if written < buf.len() {
|
||||
self.tx_buf.push(&buf[written..])?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the credit information the peer has last received from us is outdated.
|
||||
///
|
||||
fn peer_needs_credit_update(&self) -> bool {
|
||||
(self.fwd_cnt - self.last_fwd_cnt_to_peer).0 as usize >= defs::CONN_CREDIT_UPDATE_THRESHOLD
|
||||
}
|
||||
|
||||
/// Check if we need to ask the peer for a credit update before sending any more data its
|
||||
/// way.
|
||||
///
|
||||
fn need_credit_update_from_peer(&self) -> bool {
|
||||
self.peer_avail_credit() == 0
|
||||
}
|
||||
|
||||
/// Get the maximum number of bytes that we can send to our peer, without overflowing its
|
||||
/// buffer.
|
||||
///
|
||||
fn peer_avail_credit(&self) -> usize {
|
||||
(Wrapping(self.peer_buf_alloc as u32) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
|
||||
}
|
||||
|
||||
/// Prepare a packet header for transmission to our peer.
|
||||
///
|
||||
fn init_pkt<'a>(&self, pkt: &'a mut VsockPacket) -> &'a mut VsockPacket {
|
||||
// Make sure the header is zeroed-out first.
|
||||
// This looks sub-optimal, but it is actually optimized-out in the compiled code to be
|
||||
// faster than a memset().
|
||||
for b in pkt.hdr_mut() {
|
||||
*b = 0;
|
||||
}
|
||||
|
||||
pkt.set_src_cid(self.local_cid)
|
||||
.set_dst_cid(self.peer_cid)
|
||||
.set_src_port(self.local_port)
|
||||
.set_dst_port(self.peer_port)
|
||||
.set_type(uapi::VSOCK_TYPE_STREAM)
|
||||
.set_buf_alloc(defs::CONN_TX_BUF_SIZE as u32)
|
||||
.set_fwd_cnt(self.fwd_cnt.0)
|
||||
}
|
||||
}
|
129
vm-virtio/src/vsock/csm/mod.rs
Normal file
129
vm-virtio/src/vsock/csm/mod.rs
Normal file
@ -0,0 +1,129 @@
|
||||
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
/// This module implements our vsock connection state machine. The heavy lifting is done by
|
||||
/// `connection::VsockConnection`, while this file only defines some constants and helper structs.
|
||||
///
|
||||
mod connection;
|
||||
mod txbuf;
|
||||
|
||||
pub use connection::VsockConnection;
|
||||
|
||||
pub mod defs {
|
||||
/// Vsock connection TX buffer capacity.
|
||||
pub const CONN_TX_BUF_SIZE: usize = 64 * 1024;
|
||||
|
||||
/// After the guest thinks it has filled our TX buffer up to this limit (in bytes), we'll send
|
||||
/// them a credit update packet, to let them know we can handle more.
|
||||
pub const CONN_CREDIT_UPDATE_THRESHOLD: usize = CONN_TX_BUF_SIZE - 4 * 4 * 1024;
|
||||
|
||||
/// Connection request timeout, in millis.
|
||||
pub const CONN_REQUEST_TIMEOUT_MS: u64 = 2000;
|
||||
|
||||
/// Connection graceful shutdown timeout, in millis.
|
||||
pub const CONN_SHUTDOWN_TIMEOUT_MS: u64 = 2000;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Attempted to push data to a full TX buffer.
|
||||
TxBufFull,
|
||||
/// An I/O error occurred, when attempting to flush the connection TX buffer.
|
||||
TxBufFlush(std::io::Error),
|
||||
/// An I/O error occurred, when attempting to write data to the host-side stream.
|
||||
StreamWrite(std::io::Error),
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
/// A vsock connection state.
|
||||
///
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ConnState {
|
||||
/// The connection has been initiated by the host end, but is yet to be confirmed by the guest.
|
||||
LocalInit,
|
||||
/// The connection has been initiated by the guest, but we are yet to confirm it, by sending
|
||||
/// a response packet (VSOCK_OP_RESPONSE).
|
||||
PeerInit,
|
||||
/// The connection handshake has been performed successfully, and data can now be exchanged.
|
||||
Established,
|
||||
/// The host (AF_UNIX) socket was closed.
|
||||
LocalClosed,
|
||||
/// A VSOCK_OP_SHUTDOWN packet was received from the guest. The tuple represents the guest R/W
|
||||
/// indication: (will_not_recv_anymore_data, will_not_send_anymore_data).
|
||||
PeerClosed(bool, bool),
|
||||
/// The connection is scheduled to be forcefully terminated as soon as possible.
|
||||
Killed,
|
||||
}
|
||||
|
||||
/// An RX indication, used by `VsockConnection` to schedule future `recv_pkt()` responses.
|
||||
/// For instance, after being notified that there is available data to be read from the host stream
|
||||
/// (via `notify()`), the connection will store a `PendingRx::Rw` to be later inspected by
|
||||
/// `recv_pkt()`.
|
||||
///
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
enum PendingRx {
|
||||
/// We need to yield a connection request packet (VSOCK_OP_REQUEST).
|
||||
Request = 0,
|
||||
/// We need to yield a connection response packet (VSOCK_OP_RESPONSE).
|
||||
Response = 1,
|
||||
/// We need to yield a forceful connection termination packet (VSOCK_OP_RST).
|
||||
Rst = 2,
|
||||
/// We need to yield a data packet (VSOCK_OP_RW), by reading from the AF_UNIX socket.
|
||||
Rw = 3,
|
||||
/// We need to yield a credit update packet (VSOCK_OP_CREDIT_UPDATE).
|
||||
CreditUpdate = 4,
|
||||
}
|
||||
impl PendingRx {
|
||||
/// Transform the enum value into a bitmask, that can be used for set operations.
|
||||
///
|
||||
fn into_mask(self) -> u16 {
|
||||
1u16 << (self as u16)
|
||||
}
|
||||
}
|
||||
|
||||
/// A set of RX indications (`PendingRx` items).
|
||||
///
|
||||
struct PendingRxSet {
|
||||
data: u16,
|
||||
}
|
||||
|
||||
impl PendingRxSet {
|
||||
/// Insert an item into the set.
|
||||
///
|
||||
fn insert(&mut self, it: PendingRx) {
|
||||
self.data |= it.into_mask();
|
||||
}
|
||||
|
||||
/// Remove an item from the set and return:
|
||||
/// - true, if the item was in the set; or
|
||||
/// - false, if the item wasn't in the set.
|
||||
///
|
||||
fn remove(&mut self, it: PendingRx) -> bool {
|
||||
let ret = self.contains(it);
|
||||
self.data &= !it.into_mask();
|
||||
ret
|
||||
}
|
||||
|
||||
/// Check if an item is present in this set.
|
||||
///
|
||||
fn contains(&self, it: PendingRx) -> bool {
|
||||
self.data & it.into_mask() != 0
|
||||
}
|
||||
|
||||
/// Check if the set is empty.
|
||||
///
|
||||
fn is_empty(&self) -> bool {
|
||||
self.data == 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a set containing only one item.
|
||||
///
|
||||
impl From<PendingRx> for PendingRxSet {
|
||||
fn from(it: PendingRx) -> Self {
|
||||
Self {
|
||||
data: it.into_mask(),
|
||||
}
|
||||
}
|
||||
}
|
279
vm-virtio/src/vsock/csm/txbuf.rs
Normal file
279
vm-virtio/src/vsock/csm/txbuf.rs
Normal file
@ -0,0 +1,279 @@
|
||||
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::io::Write;
|
||||
use std::mem;
|
||||
use std::num::Wrapping;
|
||||
|
||||
use super::defs;
|
||||
use super::{Error, Result};
|
||||
|
||||
/// A simple ring-buffer implementation, used by vsock connections to buffer TX (guest -> host)
|
||||
/// data. Memory for this buffer is allocated lazily, since buffering will only be needed when
|
||||
/// the host can't read fast enough.
|
||||
///
|
||||
pub struct TxBuf {
|
||||
/// The actual u8 buffer - only allocated after the first push.
|
||||
data: Option<Box<[u8; Self::SIZE]>>,
|
||||
/// Ring-buffer head offset - where new data is pushed to.
|
||||
head: Wrapping<u32>,
|
||||
/// Ring-buffer tail offset - where data is flushed from.
|
||||
tail: Wrapping<u32>,
|
||||
}
|
||||
|
||||
impl TxBuf {
|
||||
/// Total buffer size, in bytes.
|
||||
///
|
||||
const SIZE: usize = defs::CONN_TX_BUF_SIZE;
|
||||
|
||||
/// Ring-buffer constructor.
|
||||
///
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
data: None,
|
||||
head: Wrapping(0),
|
||||
tail: Wrapping(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the used length of this buffer - number of bytes that have been pushed in, but not
|
||||
/// yet flushed out.
|
||||
///
|
||||
pub fn len(&self) -> usize {
|
||||
(self.head - self.tail).0 as usize
|
||||
}
|
||||
|
||||
/// Push a byte slice onto the ring-buffer.
|
||||
///
|
||||
/// Either the entire source slice will be pushed to the ring-buffer, or none of it, if
|
||||
/// there isn't enough room, in which case `Err(Error::TxBufFull)` is returned.
|
||||
///
|
||||
pub fn push(&mut self, src: &[u8]) -> Result<()> {
|
||||
// Error out if there's no room to push the entire slice.
|
||||
if self.len() + src.len() > Self::SIZE {
|
||||
return Err(Error::TxBufFull);
|
||||
}
|
||||
|
||||
// We're using a closure here to return the boxed slice, instead of a value (i.e.
|
||||
// `get_or_insert_with()` instead of `get_or_insert()`), because we only want the box
|
||||
// created when `self.data` is None. If we were to use `get_or_insert(box)`, the box
|
||||
// argument would always get evaluated (which implies a heap allocation), even though
|
||||
// it would later be discarded (when `self.data.is_some()`). Apparently, clippy fails
|
||||
// to see this, and insists on issuing some warning.
|
||||
#[allow(clippy::redundant_closure)]
|
||||
let data = self.data.get_or_insert_with(||
|
||||
// Using uninitialized memory here is quite safe, since we never read from any
|
||||
// area of the buffer before writing to it. First we push, then we flush only
|
||||
// what had been prviously pushed.
|
||||
Box::new(unsafe {mem::uninitialized::<[u8; Self::SIZE]>()}));
|
||||
|
||||
// Buffer head, as an offset into the data slice.
|
||||
let head_ofs = self.head.0 as usize % Self::SIZE;
|
||||
|
||||
// Pushing a slice to this buffer can take either one or two slice copies: - one copy,
|
||||
// if the slice fits between `head_ofs` and `Self::SIZE`; or - two copies, if the
|
||||
// ring-buffer head wraps around.
|
||||
|
||||
// First copy length: we can only go from the head offset up to the total buffer size.
|
||||
let len = std::cmp::min(Self::SIZE - head_ofs, src.len());
|
||||
data[head_ofs..(head_ofs + len)].copy_from_slice(&src[..len]);
|
||||
|
||||
// If the slice didn't fit, the buffer head will wrap around, and pushing continues
|
||||
// from the start of the buffer (`&self.data[0]`).
|
||||
if len < src.len() {
|
||||
data[..(src.len() - len)].copy_from_slice(&src[len..]);
|
||||
}
|
||||
|
||||
// Either way, we've just pushed exactly `src.len()` bytes, so that's the amount by
|
||||
// which the (wrapping) buffer head needs to move forward.
|
||||
self.head += Wrapping(src.len() as u32);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush the contents of the ring-buffer to a writable stream.
|
||||
///
|
||||
/// Return the number of bytes that have been transferred out of the ring-buffer and into
|
||||
/// the writable stream.
|
||||
///
|
||||
pub fn flush_to<W>(&mut self, sink: &mut W) -> Result<usize>
|
||||
where
|
||||
W: Write,
|
||||
{
|
||||
// Nothing to do, if this buffer holds no data.
|
||||
if self.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Buffer tail, as an offset into the buffer data slice.
|
||||
let tail_ofs = self.tail.0 as usize % Self::SIZE;
|
||||
|
||||
// Flushing the buffer can take either one or two writes:
|
||||
// - one write, if the tail doesn't need to wrap around to reach the head; or
|
||||
// - two writes, if the tail would wrap around: tail to slice end, then slice end to
|
||||
// head.
|
||||
|
||||
// First write length: the lesser of tail to slice end, or tail to head.
|
||||
let len_to_write = std::cmp::min(Self::SIZE - tail_ofs, self.len());
|
||||
|
||||
// It's safe to unwrap here, since we've already checked if the buffer was empty.
|
||||
let data = self.data.as_ref().unwrap();
|
||||
|
||||
// Issue the first write and absorb any `WouldBlock` error (we can just try again
|
||||
// later).
|
||||
let written = sink
|
||||
.write(&data[tail_ofs..(tail_ofs + len_to_write)])
|
||||
.map_err(Error::TxBufFlush)?;
|
||||
|
||||
// Move the buffer tail ahead by the amount (of bytes) we were able to flush out.
|
||||
self.tail += Wrapping(written as u32);
|
||||
|
||||
// If we weren't able to flush out as much as we tried, there's no point in attempting
|
||||
// our second write.
|
||||
if written < len_to_write {
|
||||
return Ok(written);
|
||||
}
|
||||
|
||||
// Attempt our second write. This will return immediately if a second write isn't
|
||||
// needed, since checking for an empty buffer is the first thing we do in this
|
||||
// function.
|
||||
Ok(written + self.flush_to(sink)?)
|
||||
}
|
||||
|
||||
/// Check if the buffer holds any data that hasn't yet been flushed out.
|
||||
///
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::Error as IoError;
|
||||
use std::io::Result as IoResult;
|
||||
use std::io::{ErrorKind, Write};
|
||||
|
||||
struct TestSink {
|
||||
data: Vec<u8>,
|
||||
err: Option<IoError>,
|
||||
capacity: usize,
|
||||
}
|
||||
|
||||
impl TestSink {
|
||||
const DEFAULT_CAPACITY: usize = 2 * TxBuf::SIZE;
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
data: Vec::with_capacity(Self::DEFAULT_CAPACITY),
|
||||
err: None,
|
||||
capacity: Self::DEFAULT_CAPACITY,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for TestSink {
|
||||
fn write(&mut self, src: &[u8]) -> IoResult<usize> {
|
||||
if self.err.is_some() {
|
||||
return Err(self.err.take().unwrap());
|
||||
}
|
||||
let len_to_push = std::cmp::min(self.capacity - self.data.len(), src.len());
|
||||
self.data.extend_from_slice(&src[..len_to_push]);
|
||||
Ok(len_to_push)
|
||||
}
|
||||
fn flush(&mut self) -> IoResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TestSink {
|
||||
fn clear(&mut self) {
|
||||
self.data = Vec::with_capacity(self.capacity);
|
||||
self.err = None;
|
||||
}
|
||||
fn set_err(&mut self, err: IoError) {
|
||||
self.err = Some(err);
|
||||
}
|
||||
fn set_capacity(&mut self, capacity: usize) {
|
||||
self.capacity = capacity;
|
||||
if self.data.len() > self.capacity {
|
||||
self.data.resize(self.capacity, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_nowrap() {
|
||||
let mut txbuf = TxBuf::new();
|
||||
let mut sink = TestSink::new();
|
||||
assert!(txbuf.is_empty());
|
||||
|
||||
assert!(txbuf.data.is_none());
|
||||
txbuf.push(&[1, 2, 3, 4]).unwrap();
|
||||
txbuf.push(&[5, 6, 7, 8]).unwrap();
|
||||
txbuf.flush_to(&mut sink).unwrap();
|
||||
assert_eq!(sink.data, [1, 2, 3, 4, 5, 6, 7, 8]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_wrap() {
|
||||
let mut txbuf = TxBuf::new();
|
||||
let mut sink = TestSink::new();
|
||||
let mut tmp: Vec<u8> = Vec::new();
|
||||
|
||||
tmp.resize(TxBuf::SIZE - 2, 0);
|
||||
txbuf.push(tmp.as_slice()).unwrap();
|
||||
txbuf.flush_to(&mut sink).unwrap();
|
||||
sink.clear();
|
||||
|
||||
txbuf.push(&[1, 2, 3, 4]).unwrap();
|
||||
assert_eq!(txbuf.flush_to(&mut sink).unwrap(), 4);
|
||||
assert_eq!(sink.data, [1, 2, 3, 4]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_error() {
|
||||
let mut txbuf = TxBuf::new();
|
||||
let mut tmp = Vec::with_capacity(TxBuf::SIZE);
|
||||
|
||||
tmp.resize(TxBuf::SIZE - 1, 0);
|
||||
txbuf.push(tmp.as_slice()).unwrap();
|
||||
match txbuf.push(&[1, 2]) {
|
||||
Err(Error::TxBufFull) => (),
|
||||
other => panic!("Unexpected result: {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_incomplete_flush() {
|
||||
let mut txbuf = TxBuf::new();
|
||||
let mut sink = TestSink::new();
|
||||
|
||||
sink.set_capacity(2);
|
||||
txbuf.push(&[1, 2, 3, 4]).unwrap();
|
||||
assert_eq!(txbuf.flush_to(&mut sink).unwrap(), 2);
|
||||
assert_eq!(txbuf.len(), 2);
|
||||
assert_eq!(sink.data, [1, 2]);
|
||||
|
||||
sink.set_capacity(4);
|
||||
assert_eq!(txbuf.flush_to(&mut sink).unwrap(), 2);
|
||||
assert!(txbuf.is_empty());
|
||||
assert_eq!(sink.data, [1, 2, 3, 4]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_flush_error() {
|
||||
const EACCESS: i32 = 13;
|
||||
|
||||
let mut txbuf = TxBuf::new();
|
||||
let mut sink = TestSink::new();
|
||||
|
||||
txbuf.push(&[1, 2, 3, 4]).unwrap();
|
||||
let io_err = IoError::from_raw_os_error(EACCESS);
|
||||
sink.set_err(io_err);
|
||||
match txbuf.flush_to(&mut sink) {
|
||||
Err(Error::TxBufFlush(ref err)) if err.kind() == ErrorKind::PermissionDenied => (),
|
||||
other => panic!("Unexpected result: {:?}", other),
|
||||
}
|
||||
}
|
||||
}
|
@ -8,6 +8,123 @@
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the THIRD-PARTY file.
|
||||
|
||||
mod csm;
|
||||
mod device;
|
||||
mod packet;
|
||||
|
||||
pub use self::device::Vsock;
|
||||
|
||||
use std::os::unix::io::RawFd;
|
||||
|
||||
use packet::VsockPacket;
|
||||
|
||||
mod defs {
|
||||
|
||||
/// Max vsock packet data/buffer size.
|
||||
pub const MAX_PKT_BUF_SIZE: usize = 64 * 1024;
|
||||
|
||||
pub mod uapi {
|
||||
|
||||
/// Vsock packet operation IDs.
|
||||
/// Defined in `/include/uapi/linux/virtio_vsock.h`.
|
||||
///
|
||||
/// Connection request.
|
||||
pub const VSOCK_OP_REQUEST: u16 = 1;
|
||||
/// Connection response.
|
||||
pub const VSOCK_OP_RESPONSE: u16 = 2;
|
||||
/// Connection reset.
|
||||
pub const VSOCK_OP_RST: u16 = 3;
|
||||
/// Connection clean shutdown.
|
||||
pub const VSOCK_OP_SHUTDOWN: u16 = 4;
|
||||
/// Connection data (read/write).
|
||||
pub const VSOCK_OP_RW: u16 = 5;
|
||||
/// Flow control credit update.
|
||||
pub const VSOCK_OP_CREDIT_UPDATE: u16 = 6;
|
||||
/// Flow control credit update request.
|
||||
pub const VSOCK_OP_CREDIT_REQUEST: u16 = 7;
|
||||
|
||||
/// Vsock packet flags.
|
||||
/// Defined in `/include/uapi/linux/virtio_vsock.h`.
|
||||
///
|
||||
/// Valid with a VSOCK_OP_SHUTDOWN packet: the packet sender will receive no more data.
|
||||
pub const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1;
|
||||
/// Valid with a VSOCK_OP_SHUTDOWN packet: the packet sender will send no more data.
|
||||
pub const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2;
|
||||
|
||||
/// Vsock packet type.
|
||||
/// Defined in `/include/uapi/linux/virtio_vsock.h`.
|
||||
///
|
||||
/// Stream / connection-oriented packet (the only currently valid type).
|
||||
pub const VSOCK_TYPE_STREAM: u16 = 1;
|
||||
|
||||
pub const VSOCK_HOST_CID: u64 = 2;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum VsockError {
|
||||
/// The vsock data/buffer virtio descriptor length is smaller than expected.
|
||||
BufDescTooSmall,
|
||||
/// The vsock data/buffer virtio descriptor is expected, but missing.
|
||||
BufDescMissing,
|
||||
/// Chained GuestMemory error.
|
||||
GuestMemory,
|
||||
/// Bounds check failed on guest memory pointer.
|
||||
GuestMemoryBounds,
|
||||
/// The vsock header descriptor length is too small.
|
||||
HdrDescTooSmall(u32),
|
||||
/// The vsock header `len` field holds an invalid value.
|
||||
InvalidPktLen(u32),
|
||||
/// A data fetch was attempted when no data was available.
|
||||
NoData,
|
||||
/// A data buffer was expected for the provided packet, but it is missing.
|
||||
PktBufMissing,
|
||||
/// Encountered an unexpected write-only virtio descriptor.
|
||||
UnreadableDescriptor,
|
||||
/// Encountered an unexpected read-only virtio descriptor.
|
||||
UnwritableDescriptor,
|
||||
}
|
||||
type Result<T> = std::result::Result<T, VsockError>;
|
||||
|
||||
/// A passive, event-driven object, that needs to be notified whenever an epoll-able event occurs.
|
||||
/// An event-polling control loop will use `get_polled_fd()` and `get_polled_evset()` to query
|
||||
/// the listener for the file descriptor and the set of events it's interested in. When such an
|
||||
/// event occurs, the control loop will route the event to the listener via `notify()`.
|
||||
///
|
||||
pub trait VsockEpollListener {
|
||||
/// Get the file descriptor the listener needs polled.
|
||||
fn get_polled_fd(&self) -> RawFd;
|
||||
|
||||
/// Get the set of events for which the listener wants to be notified.
|
||||
fn get_polled_evset(&self) -> epoll::Events;
|
||||
|
||||
/// Notify the listener that one ore more events have occurred.
|
||||
fn notify(&mut self, evset: epoll::Events);
|
||||
}
|
||||
|
||||
/// Any channel that handles vsock packet traffic: sending and receiving packets. Since we're
|
||||
/// implementing the device model here, our responsibility is to always process the sending of
|
||||
/// packets (i.e. the TX queue). So, any locally generated data, addressed to the driver (e.g.
|
||||
/// a connection response or RST), will have to be queued, until we get to processing the RX queue.
|
||||
///
|
||||
/// Note: `recv_pkt()` and `send_pkt()` are named analogous to `Read::read()` and `Write::write()`,
|
||||
/// respectively. I.e.
|
||||
/// - `recv_pkt(&mut pkt)` will read data from the channel, and place it into `pkt`; and
|
||||
/// - `send_pkt(&pkt)` will fetch data from `pkt`, and place it into the channel.
|
||||
pub trait VsockChannel {
|
||||
/// Read/receive an incoming packet from the channel.
|
||||
fn recv_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()>;
|
||||
|
||||
/// Write/send a packet through the channel.
|
||||
fn send_pkt(&mut self, pkt: &VsockPacket) -> Result<()>;
|
||||
|
||||
/// Checks whether there is pending incoming data inside the channel, meaning that a subsequent
|
||||
/// call to `recv_pkt()` won't fail.
|
||||
fn has_pending_rx(&self) -> bool;
|
||||
}
|
||||
|
||||
/// The vsock backend, which is basically an epoll-event-driven vsock channel, that needs to be
|
||||
/// sendable through a mpsc channel (the latter due to how `vmm::EpollContext` works).
|
||||
/// Currently, the only implementation we have is `crate::virtio::unix::muxer::VsockMuxer`, which
|
||||
/// translates guest-side vsock connections to host-side Unix domain socket connections.
|
||||
pub trait VsockBackend: VsockChannel + VsockEpollListener + Send {}
|
||||
|
341
vm-virtio/src/vsock/packet.rs
Normal file
341
vm-virtio/src/vsock/packet.rs
Normal file
@ -0,0 +1,341 @@
|
||||
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
/// `VsockPacket` provides a thin wrapper over the buffers exchanged via virtio queues.
|
||||
/// There are two components to a vsock packet, each using its own descriptor in a
|
||||
/// virtio queue:
|
||||
/// - the packet header; and
|
||||
/// - the packet data/buffer.
|
||||
/// There is a 1:1 relation between descriptor chains and packets: the first (chain head) holds
|
||||
/// the header, and an optional second descriptor holds the data. The second descriptor is only
|
||||
/// present for data packets (VSOCK_OP_RW).
|
||||
///
|
||||
/// `VsockPacket` wraps these two buffers and provides direct access to the data stored
|
||||
/// in guest memory. This is done to avoid unnecessarily copying data from guest memory
|
||||
/// to temporary buffers, before passing it on to the vsock backend.
|
||||
///
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
|
||||
use super::super::DescriptorChain;
|
||||
use super::defs;
|
||||
use super::{Result, VsockError};
|
||||
|
||||
// The vsock packet header is defined by the C struct:
|
||||
//
|
||||
// ```C
|
||||
// struct virtio_vsock_hdr {
|
||||
// le64 src_cid;
|
||||
// le64 dst_cid;
|
||||
// le32 src_port;
|
||||
// le32 dst_port;
|
||||
// le32 len;
|
||||
// le16 type;
|
||||
// le16 op;
|
||||
// le32 flags;
|
||||
// le32 buf_alloc;
|
||||
// le32 fwd_cnt;
|
||||
// };
|
||||
// ```
|
||||
//
|
||||
// This structed will occupy the buffer pointed to by the head descriptor. We'll be accessing it
|
||||
// as a byte slice. To that end, we define below the offsets for each field struct, as well as the
|
||||
// packed struct size, as a bunch of `usize` consts.
|
||||
// Note that these offsets are only used privately by the `VsockPacket` struct, the public interface
|
||||
// consisting of getter and setter methods, for each struct field, that will also handle the correct
|
||||
// endianess.
|
||||
|
||||
/// The vsock packet header struct size (when packed).
|
||||
pub const VSOCK_PKT_HDR_SIZE: usize = 44;
|
||||
|
||||
// Source CID.
|
||||
const HDROFF_SRC_CID: usize = 0;
|
||||
|
||||
// Destination CID.
|
||||
const HDROFF_DST_CID: usize = 8;
|
||||
|
||||
// Source port.
|
||||
const HDROFF_SRC_PORT: usize = 16;
|
||||
|
||||
// Destination port.
|
||||
const HDROFF_DST_PORT: usize = 20;
|
||||
|
||||
// Data length (in bytes) - may be 0, if there is no data buffer.
|
||||
const HDROFF_LEN: usize = 24;
|
||||
|
||||
// Socket type. Currently, only connection-oriented streams are defined by the vsock protocol.
|
||||
const HDROFF_TYPE: usize = 28;
|
||||
|
||||
// Operation ID - one of the VSOCK_OP_* values; e.g.
|
||||
// - VSOCK_OP_RW: a data packet;
|
||||
// - VSOCK_OP_REQUEST: connection request;
|
||||
// - VSOCK_OP_RST: forcefull connection termination;
|
||||
// etc (see `super::defs::uapi` for the full list).
|
||||
const HDROFF_OP: usize = 30;
|
||||
|
||||
// Additional options (flags) associated with the current operation (`op`).
|
||||
// Currently, only used with shutdown requests (VSOCK_OP_SHUTDOWN).
|
||||
const HDROFF_FLAGS: usize = 32;
|
||||
|
||||
// Size (in bytes) of the packet sender receive buffer (for the connection to which this packet
|
||||
// belongs).
|
||||
const HDROFF_BUF_ALLOC: usize = 36;
|
||||
|
||||
// Number of bytes the sender has received and consumed (for the connection to which this packet
|
||||
// belongs). For instance, for our Unix backend, this counter would be the total number of bytes
|
||||
// we have successfully written to a backing Unix socket.
|
||||
const HDROFF_FWD_CNT: usize = 40;
|
||||
|
||||
/// The vsock packet, implemented as a wrapper over a virtq descriptor chain:
|
||||
/// - the chain head, holding the packet header; and
|
||||
/// - (an optional) data/buffer descriptor, only present for data packets (VSOCK_OP_RW).
|
||||
///
|
||||
pub struct VsockPacket {
|
||||
hdr: *mut u8,
|
||||
buf: Option<*mut u8>,
|
||||
buf_size: usize,
|
||||
}
|
||||
|
||||
impl VsockPacket {
|
||||
/// Create the packet wrapper from a TX virtq chain head.
|
||||
///
|
||||
/// The chain head is expected to hold valid packet header data. A following packet buffer
|
||||
/// descriptor can optionally end the chain. Bounds and pointer checks are performed when
|
||||
/// creating the wrapper.
|
||||
///
|
||||
pub fn from_tx_virtq_head(head: &DescriptorChain) -> Result<Self> {
|
||||
// All buffers in the TX queue must be readable.
|
||||
//
|
||||
if head.is_write_only() {
|
||||
return Err(VsockError::UnreadableDescriptor);
|
||||
}
|
||||
|
||||
// The packet header should fit inside the head descriptor.
|
||||
if head.len < VSOCK_PKT_HDR_SIZE as u32 {
|
||||
return Err(VsockError::HdrDescTooSmall(head.len));
|
||||
}
|
||||
|
||||
let mut pkt = Self {
|
||||
hdr: head
|
||||
.mem
|
||||
.get_host_address(head.addr)
|
||||
.ok_or_else(|| VsockError::GuestMemory)? as *mut u8,
|
||||
buf: None,
|
||||
buf_size: 0,
|
||||
};
|
||||
|
||||
// No point looking for a data/buffer descriptor, if the packet is zero-lengthed.
|
||||
if pkt.len() == 0 {
|
||||
return Ok(pkt);
|
||||
}
|
||||
|
||||
// Reject weirdly-sized packets.
|
||||
//
|
||||
if pkt.len() > defs::MAX_PKT_BUF_SIZE as u32 {
|
||||
return Err(VsockError::InvalidPktLen(pkt.len()));
|
||||
}
|
||||
|
||||
// If the packet header showed a non-zero length, there should be a data descriptor here.
|
||||
let buf_desc = head.next_descriptor().ok_or(VsockError::BufDescMissing)?;
|
||||
|
||||
// TX data should be read-only.
|
||||
if buf_desc.is_write_only() {
|
||||
return Err(VsockError::UnreadableDescriptor);
|
||||
}
|
||||
|
||||
// The data buffer should be large enough to fit the size of the data, as described by
|
||||
// the header descriptor.
|
||||
if buf_desc.len < pkt.len() {
|
||||
return Err(VsockError::BufDescTooSmall);
|
||||
}
|
||||
|
||||
pkt.buf_size = buf_desc.len as usize;
|
||||
pkt.buf = Some(
|
||||
buf_desc
|
||||
.mem
|
||||
.get_host_address(buf_desc.addr)
|
||||
.ok_or_else(|| VsockError::GuestMemory)? as *mut u8,
|
||||
);
|
||||
|
||||
Ok(pkt)
|
||||
}
|
||||
|
||||
/// Create the packet wrapper from an RX virtq chain head.
|
||||
///
|
||||
/// There must be two descriptors in the chain, both writable: a header descriptor and a data
|
||||
/// descriptor. Bounds and pointer checks are performed when creating the wrapper.
|
||||
///
|
||||
pub fn from_rx_virtq_head(head: &DescriptorChain) -> Result<Self> {
|
||||
// All RX buffers must be writable.
|
||||
//
|
||||
if !head.is_write_only() {
|
||||
return Err(VsockError::UnwritableDescriptor);
|
||||
}
|
||||
|
||||
// The packet header should fit inside the head descriptor.
|
||||
if head.len < VSOCK_PKT_HDR_SIZE as u32 {
|
||||
return Err(VsockError::HdrDescTooSmall(head.len));
|
||||
}
|
||||
|
||||
// All RX descriptor chains should have a header and a data descriptor.
|
||||
if !head.has_next() {
|
||||
return Err(VsockError::BufDescMissing);
|
||||
}
|
||||
let buf_desc = head.next_descriptor().ok_or(VsockError::BufDescMissing)?;
|
||||
|
||||
Ok(Self {
|
||||
hdr: head
|
||||
.mem
|
||||
.get_host_address(head.addr)
|
||||
.ok_or_else(|| VsockError::GuestMemory)? as *mut u8,
|
||||
buf: Some(
|
||||
buf_desc
|
||||
.mem
|
||||
.get_host_address(buf_desc.addr)
|
||||
.ok_or_else(|| VsockError::GuestMemory)? as *mut u8,
|
||||
),
|
||||
buf_size: buf_desc.len as usize,
|
||||
})
|
||||
}
|
||||
|
||||
/// Provides in-place, byte-slice, access to the vsock packet header.
|
||||
///
|
||||
pub fn hdr(&self) -> &[u8] {
|
||||
// This is safe since bound checks have already been performed when creating the packet
|
||||
// from the virtq descriptor.
|
||||
unsafe { std::slice::from_raw_parts(self.hdr as *const u8, VSOCK_PKT_HDR_SIZE) }
|
||||
}
|
||||
|
||||
/// Provides in-place, byte-slice, mutable access to the vsock packet header.
|
||||
///
|
||||
pub fn hdr_mut(&mut self) -> &mut [u8] {
|
||||
// This is safe since bound checks have already been performed when creating the packet
|
||||
// from the virtq descriptor.
|
||||
unsafe { std::slice::from_raw_parts_mut(self.hdr, VSOCK_PKT_HDR_SIZE) }
|
||||
}
|
||||
|
||||
/// Provides in-place, byte-slice access to the vsock packet data buffer.
|
||||
///
|
||||
/// Note: control packets (e.g. connection request or reset) have no data buffer associated.
|
||||
/// For those packets, this method will return `None`.
|
||||
/// Also note: calling `len()` on the returned slice will yield the buffer size, which may be
|
||||
/// (and often is) larger than the length of the packet data. The packet data length
|
||||
/// is stored in the packet header, and accessible via `VsockPacket::len()`.
|
||||
pub fn buf(&self) -> Option<&[u8]> {
|
||||
self.buf.map(|ptr| {
|
||||
// This is safe since bound checks have already been performed when creating the packet
|
||||
// from the virtq descriptor.
|
||||
unsafe { std::slice::from_raw_parts(ptr as *const u8, self.buf_size) }
|
||||
})
|
||||
}
|
||||
|
||||
/// Provides in-place, byte-slice, mutable access to the vsock packet data buffer.
|
||||
///
|
||||
/// Note: control packets (e.g. connection request or reset) have no data buffer associated.
|
||||
/// For those packets, this method will return `None`.
|
||||
/// Also note: calling `len()` on the returned slice will yield the buffer size, which may be
|
||||
/// (and often is) larger than the length of the packet data. The packet data length
|
||||
/// is stored in the packet header, and accessible via `VsockPacket::len()`.
|
||||
pub fn buf_mut(&mut self) -> Option<&mut [u8]> {
|
||||
self.buf.map(|ptr| {
|
||||
// This is safe since bound checks have already been performed when creating the packet
|
||||
// from the virtq descriptor.
|
||||
unsafe { std::slice::from_raw_parts_mut(ptr, self.buf_size) }
|
||||
})
|
||||
}
|
||||
|
||||
pub fn src_cid(&self) -> u64 {
|
||||
LittleEndian::read_u64(&self.hdr()[HDROFF_SRC_CID..])
|
||||
}
|
||||
|
||||
pub fn set_src_cid(&mut self, cid: u64) -> &mut Self {
|
||||
LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_SRC_CID..], cid);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn dst_cid(&self) -> u64 {
|
||||
LittleEndian::read_u64(&self.hdr()[HDROFF_DST_CID..])
|
||||
}
|
||||
|
||||
pub fn set_dst_cid(&mut self, cid: u64) -> &mut Self {
|
||||
LittleEndian::write_u64(&mut self.hdr_mut()[HDROFF_DST_CID..], cid);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn src_port(&self) -> u32 {
|
||||
LittleEndian::read_u32(&self.hdr()[HDROFF_SRC_PORT..])
|
||||
}
|
||||
|
||||
pub fn set_src_port(&mut self, port: u32) -> &mut Self {
|
||||
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_SRC_PORT..], port);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn dst_port(&self) -> u32 {
|
||||
LittleEndian::read_u32(&self.hdr()[HDROFF_DST_PORT..])
|
||||
}
|
||||
|
||||
pub fn set_dst_port(&mut self, port: u32) -> &mut Self {
|
||||
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_DST_PORT..], port);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn len(&self) -> u32 {
|
||||
LittleEndian::read_u32(&self.hdr()[HDROFF_LEN..])
|
||||
}
|
||||
|
||||
pub fn set_len(&mut self, len: u32) -> &mut Self {
|
||||
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_LEN..], len);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn type_(&self) -> u16 {
|
||||
LittleEndian::read_u16(&self.hdr()[HDROFF_TYPE..])
|
||||
}
|
||||
|
||||
pub fn set_type(&mut self, type_: u16) -> &mut Self {
|
||||
LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_TYPE..], type_);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn op(&self) -> u16 {
|
||||
LittleEndian::read_u16(&self.hdr()[HDROFF_OP..])
|
||||
}
|
||||
|
||||
pub fn set_op(&mut self, op: u16) -> &mut Self {
|
||||
LittleEndian::write_u16(&mut self.hdr_mut()[HDROFF_OP..], op);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn flags(&self) -> u32 {
|
||||
LittleEndian::read_u32(&self.hdr()[HDROFF_FLAGS..])
|
||||
}
|
||||
|
||||
pub fn set_flags(&mut self, flags: u32) -> &mut Self {
|
||||
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FLAGS..], flags);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn set_flag(&mut self, flag: u32) -> &mut Self {
|
||||
self.set_flags(self.flags() | flag);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn buf_alloc(&self) -> u32 {
|
||||
LittleEndian::read_u32(&self.hdr()[HDROFF_BUF_ALLOC..])
|
||||
}
|
||||
|
||||
pub fn set_buf_alloc(&mut self, buf_alloc: u32) -> &mut Self {
|
||||
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_BUF_ALLOC..], buf_alloc);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn fwd_cnt(&self) -> u32 {
|
||||
LittleEndian::read_u32(&self.hdr()[HDROFF_FWD_CNT..])
|
||||
}
|
||||
|
||||
pub fn set_fwd_cnt(&mut self, fwd_cnt: u32) -> &mut Self {
|
||||
LittleEndian::write_u32(&mut self.hdr_mut()[HDROFF_FWD_CNT..], fwd_cnt);
|
||||
self
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user