vhost_user_fs: add SlaveFsCacheReq to handle map/unmap

This introduces SlaveFsCacheReq which implements
VhostUserMasterReqHandler to handle map/unmap requests.

Signed-off-by: Liu Bo <bo.liu@linux.alibaba.com>
This commit is contained in:
Liu Bo 2020-02-12 15:02:01 -08:00 committed by Rob Bradford
parent be78c6da49
commit 269d660b0a
2 changed files with 102 additions and 0 deletions

View File

@ -41,6 +41,9 @@ pub use self::slave::SlaveListener;
mod slave_req_handler; mod slave_req_handler;
#[cfg(feature = "vhost-user-slave")] #[cfg(feature = "vhost-user-slave")]
pub use self::slave_req_handler::{SlaveReqHandler, VhostUserSlaveReqHandler}; pub use self::slave_req_handler::{SlaveReqHandler, VhostUserSlaveReqHandler};
#[cfg(feature = "vhost-user-slave")]
mod slave_fs_cache;
pub use self::slave_fs_cache::SlaveFsCacheReq;
pub mod sock_ctrl_msg; pub mod sock_ctrl_msg;
@ -69,6 +72,8 @@ pub enum Error {
SocketRetry(std::io::Error), SocketRetry(std::io::Error),
/// Failure from the slave side. /// Failure from the slave side.
SlaveInternalError, SlaveInternalError,
/// Failure from the master side.
MasterInternalError,
/// Virtio/protocol features mismatch. /// Virtio/protocol features mismatch.
FeatureMismatch, FeatureMismatch,
/// Error from request handler /// Error from request handler
@ -89,6 +94,7 @@ impl std::fmt::Display for Error {
Error::SocketBroken(e) => write!(f, "socket is broken: {}", e), Error::SocketBroken(e) => write!(f, "socket is broken: {}", e),
Error::SocketRetry(e) => write!(f, "temporary socket error: {}", e), Error::SocketRetry(e) => write!(f, "temporary socket error: {}", e),
Error::SlaveInternalError => write!(f, "slave internal error"), Error::SlaveInternalError => write!(f, "slave internal error"),
Error::MasterInternalError => write!(f, "Master internal error"),
Error::FeatureMismatch => write!(f, "virtio/protocol features mismatch"), Error::FeatureMismatch => write!(f, "virtio/protocol features mismatch"),
Error::ReqHandlerError(e) => write!(f, "handler failed to handle request: {}", e), Error::ReqHandlerError(e) => write!(f, "handler failed to handle request: {}", e),
} }
@ -105,6 +111,8 @@ impl Error {
Error::SocketBroken(_) => true, Error::SocketBroken(_) => true,
// Slave internal error, hope it recovers on reconnect. // Slave internal error, hope it recovers on reconnect.
Error::SlaveInternalError => true, Error::SlaveInternalError => true,
// Master internal error, hope it recovers on reconnect.
Error::MasterInternalError => true,
// Should just retry the IO operation instead of rebuilding the underline connection. // Should just retry the IO operation instead of rebuilding the underline connection.
Error::SocketRetry(_) => false, Error::SocketRetry(_) => false,
Error::InvalidParam | Error::InvalidOperation => false, Error::InvalidParam | Error::InvalidOperation => false,

View File

@ -0,0 +1,94 @@
// Copyright (C) 2020 Alibaba Cloud Computing. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use super::connection::Endpoint;
use super::message::*;
use super::{Error, HandlerResult, Result, VhostUserMasterReqHandler};
use std::io;
use std::mem;
use std::os::unix::io::RawFd;
use std::os::unix::net::UnixStream;
use std::sync::{Arc, Mutex};
struct SlaveFsCacheReqInternal {
sock: Endpoint<SlaveReq>,
}
/// A vhost-user slave endpoint which sends fs cache requests to the master
#[derive(Clone)]
pub struct SlaveFsCacheReq {
// underlying Unix domain socket for communication
node: Arc<Mutex<SlaveFsCacheReqInternal>>,
// whether the endpoint has encountered any failure
error: Option<i32>,
}
impl SlaveFsCacheReq {
fn new(ep: Endpoint<SlaveReq>) -> Self {
SlaveFsCacheReq {
node: Arc::new(Mutex::new(SlaveFsCacheReqInternal { sock: ep })),
error: None,
}
}
/// Create a new instance.
pub fn from_stream(sock: UnixStream) -> Self {
Self::new(Endpoint::<SlaveReq>::from_stream(sock))
}
fn send_message(
&mut self,
flags: SlaveReq,
fs: &VhostUserFSSlaveMsg,
fds: Option<&[RawFd]>,
) -> Result<()> {
self.check_state()?;
let len = mem::size_of::<VhostUserFSSlaveMsg>();
let mut hdr = VhostUserMsgHeader::new(flags, 0, len as u32);
hdr.set_need_reply(true);
self.node.lock().unwrap().sock.send_message(&hdr, fs, fds)?;
self.wait_for_ack(&hdr)
}
fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<SlaveReq>) -> Result<()> {
self.check_state()?;
let (reply, body, rfds) = self.node.lock().unwrap().sock.recv_body::<VhostUserU64>()?;
if !reply.is_reply_for(&hdr) || rfds.is_some() || !body.is_valid() {
Endpoint::<SlaveReq>::close_rfds(rfds);
return Err(Error::InvalidMessage);
}
if body.value != 0 {
return Err(Error::MasterInternalError);
}
Ok(())
}
fn check_state(&self) -> Result<()> {
match self.error {
Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))),
None => Ok(()),
}
}
/// Mark endpoint as failed with specified error code.
pub fn set_failed(&mut self, error: i32) {
self.error = Some(error);
}
}
impl VhostUserMasterReqHandler for SlaveFsCacheReq {
/// Handle virtio-fs map file requests from the slave.
fn fs_slave_map(&mut self, fs: &VhostUserFSSlaveMsg, fd: RawFd) -> HandlerResult<()> {
self.send_message(SlaveReq::FS_MAP, fs, Some(&[fd]))
.or_else(|e| Err(io::Error::new(io::ErrorKind::Other, format!("{}", e))))
}
/// Handle virtio-fs unmap file requests from the slave.
fn fs_slave_unmap(&mut self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<()> {
self.send_message(SlaveReq::FS_UNMAP, fs, None)
.or_else(|e| Err(io::Error::new(io::ErrorKind::Other, format!("{}", e))))
}
}