From 269d660b0a1d8cbb8e3a58d432b765abd2396f21 Mon Sep 17 00:00:00 2001 From: Liu Bo Date: Wed, 12 Feb 2020 15:02:01 -0800 Subject: [PATCH] vhost_user_fs: add SlaveFsCacheReq to handle map/unmap This introduces SlaveFsCacheReq which implements VhostUserMasterReqHandler to handle map/unmap requests. Signed-off-by: Liu Bo --- vhost_rs/src/vhost_user/mod.rs | 8 ++ vhost_rs/src/vhost_user/slave_fs_cache.rs | 94 +++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 vhost_rs/src/vhost_user/slave_fs_cache.rs diff --git a/vhost_rs/src/vhost_user/mod.rs b/vhost_rs/src/vhost_user/mod.rs index a03204d8e..b09a192cc 100644 --- a/vhost_rs/src/vhost_user/mod.rs +++ b/vhost_rs/src/vhost_user/mod.rs @@ -41,6 +41,9 @@ pub use self::slave::SlaveListener; mod slave_req_handler; #[cfg(feature = "vhost-user-slave")] pub use self::slave_req_handler::{SlaveReqHandler, VhostUserSlaveReqHandler}; +#[cfg(feature = "vhost-user-slave")] +mod slave_fs_cache; +pub use self::slave_fs_cache::SlaveFsCacheReq; pub mod sock_ctrl_msg; @@ -69,6 +72,8 @@ pub enum Error { SocketRetry(std::io::Error), /// Failure from the slave side. SlaveInternalError, + /// Failure from the master side. + MasterInternalError, /// Virtio/protocol features mismatch. FeatureMismatch, /// Error from request handler @@ -89,6 +94,7 @@ impl std::fmt::Display for Error { Error::SocketBroken(e) => write!(f, "socket is broken: {}", e), Error::SocketRetry(e) => write!(f, "temporary socket error: {}", e), Error::SlaveInternalError => write!(f, "slave internal error"), + Error::MasterInternalError => write!(f, "Master internal error"), Error::FeatureMismatch => write!(f, "virtio/protocol features mismatch"), Error::ReqHandlerError(e) => write!(f, "handler failed to handle request: {}", e), } @@ -105,6 +111,8 @@ impl Error { Error::SocketBroken(_) => true, // Slave internal error, hope it recovers on reconnect. Error::SlaveInternalError => true, + // Master internal error, hope it recovers on reconnect. + Error::MasterInternalError => true, // Should just retry the IO operation instead of rebuilding the underline connection. Error::SocketRetry(_) => false, Error::InvalidParam | Error::InvalidOperation => false, diff --git a/vhost_rs/src/vhost_user/slave_fs_cache.rs b/vhost_rs/src/vhost_user/slave_fs_cache.rs new file mode 100644 index 000000000..a4a1e5ee2 --- /dev/null +++ b/vhost_rs/src/vhost_user/slave_fs_cache.rs @@ -0,0 +1,94 @@ +// Copyright (C) 2020 Alibaba Cloud Computing. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::connection::Endpoint; +use super::message::*; +use super::{Error, HandlerResult, Result, VhostUserMasterReqHandler}; +use std::io; +use std::mem; +use std::os::unix::io::RawFd; +use std::os::unix::net::UnixStream; +use std::sync::{Arc, Mutex}; + +struct SlaveFsCacheReqInternal { + sock: Endpoint, +} + +/// A vhost-user slave endpoint which sends fs cache requests to the master +#[derive(Clone)] +pub struct SlaveFsCacheReq { + // underlying Unix domain socket for communication + node: Arc>, + + // whether the endpoint has encountered any failure + error: Option, +} + +impl SlaveFsCacheReq { + fn new(ep: Endpoint) -> Self { + SlaveFsCacheReq { + node: Arc::new(Mutex::new(SlaveFsCacheReqInternal { sock: ep })), + error: None, + } + } + + /// Create a new instance. + pub fn from_stream(sock: UnixStream) -> Self { + Self::new(Endpoint::::from_stream(sock)) + } + + fn send_message( + &mut self, + flags: SlaveReq, + fs: &VhostUserFSSlaveMsg, + fds: Option<&[RawFd]>, + ) -> Result<()> { + self.check_state()?; + + let len = mem::size_of::(); + let mut hdr = VhostUserMsgHeader::new(flags, 0, len as u32); + hdr.set_need_reply(true); + self.node.lock().unwrap().sock.send_message(&hdr, fs, fds)?; + + self.wait_for_ack(&hdr) + } + + fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader) -> Result<()> { + self.check_state()?; + let (reply, body, rfds) = self.node.lock().unwrap().sock.recv_body::()?; + if !reply.is_reply_for(&hdr) || rfds.is_some() || !body.is_valid() { + Endpoint::::close_rfds(rfds); + return Err(Error::InvalidMessage); + } + if body.value != 0 { + return Err(Error::MasterInternalError); + } + Ok(()) + } + + fn check_state(&self) -> Result<()> { + match self.error { + Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))), + None => Ok(()), + } + } + + /// Mark endpoint as failed with specified error code. + pub fn set_failed(&mut self, error: i32) { + self.error = Some(error); + } +} + +impl VhostUserMasterReqHandler for SlaveFsCacheReq { + /// Handle virtio-fs map file requests from the slave. + fn fs_slave_map(&mut self, fs: &VhostUserFSSlaveMsg, fd: RawFd) -> HandlerResult<()> { + self.send_message(SlaveReq::FS_MAP, fs, Some(&[fd])) + .or_else(|e| Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) + } + + /// Handle virtio-fs unmap file requests from the slave. + fn fs_slave_unmap(&mut self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<()> { + self.send_message(SlaveReq::FS_UNMAP, fs, None) + .or_else(|e| Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) + } +}