diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index 98a39a736..399c862ce 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -32,17 +32,22 @@ use crate::config::{ use crate::migration::{get_vm_snapshot, recv_vm_snapshot}; use crate::seccomp_filters::{get_seccomp_filter, Thread}; use crate::vm::{Error as VmError, Vm, VmState}; +use anyhow::anyhow; use libc::EFD_NONBLOCK; use seccomp::{SeccompAction, SeccompFilter}; use serde::ser::{Serialize, SerializeStruct, Serializer}; use std::fs::File; use std::io; +use std::io::{Read, Write}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::os::unix::net::UnixListener; +use std::os::unix::net::UnixStream; use std::sync::mpsc::{Receiver, RecvError, SendError, Sender}; use std::sync::{Arc, Mutex}; use std::{result, thread}; use thiserror::Error; -use vm_migration::{MigratableError, Pausable, Snapshottable, Transportable}; +use vm_migration::protocol::*; +use vm_migration::{MigratableError, Pausable, Snapshot, Snapshottable, Transportable}; use vmm_sys_util::eventfd::EventFd; pub mod api; @@ -642,14 +647,172 @@ impl Vmm { } } + fn vm_receive_state( + &mut self, + req: &Request, + socket: &mut T, + ) -> std::result::Result<(), MigratableError> + where + T: Read + Write, + { + // Read in state data + let mut data = Vec::with_capacity(req.length() as usize); + unsafe { + data.set_len(req.length() as usize); + } + socket + .read_exact(&mut data) + .map_err(MigratableError::MigrateSocket)?; + let snapshot: Snapshot = serde_json::from_slice(&data).map_err(|e| { + MigratableError::MigrateReceive(anyhow!("Error deserialising snapshot: {}", e)) + })?; + + // Create VM + let vm_snapshot = get_vm_snapshot(&snapshot)?; + self.vm_config = Some(Arc::clone(&vm_snapshot.config)); + let exit_evt = self.exit_evt.try_clone().map_err(|e| { + MigratableError::MigrateReceive(anyhow!("Error cloning exit EventFd: {}", e)) + })?; + let reset_evt = self.reset_evt.try_clone().map_err(|e| { + MigratableError::MigrateReceive(anyhow!("Error cloning reset EventFd: {}", e)) + })?; + let mut vm = Vm::new_from_snapshot( + &snapshot, + exit_evt, + reset_evt, + None, + false, + &self.seccomp_action, + self.hypervisor.clone(), + ) + .map_err(|e| { + MigratableError::MigrateReceive(anyhow!("Error creating VM from snapshot: {:?}", e)) + })?; + vm.restore(snapshot).map_err(|e| { + Response::error().write_to(socket).ok(); + e + })?; + self.vm = Some(vm); + + Response::ok().write_to(socket)?; + + Ok(()) + } + + fn vm_receive_memory( + &mut self, + req: &Request, + socket: &mut T, + ) -> std::result::Result<(), MigratableError> + where + T: Read + Write, + { + // Read table + let table = MemoryRangeTable::read_from(socket, req.length())?; + + // And then read the memory itself + self.vm + .as_mut() + .unwrap() + .receive_memory_regions(&table, socket) + .map_err(|e| { + Response::error().write_to(socket).ok(); + e + })?; + Response::ok().write_to(socket)?; + Ok(()) + } + fn vm_receive_migration( &mut self, receive_data_migration: VmReceiveMigrationData, ) -> result::Result<(), MigratableError> { info!( - "Migration Receiver: {}", + "Receiving migration: receiver_url = {}", receive_data_migration.receiver_url ); + + let url = url::Url::parse(&receive_data_migration.receiver_url) + .map_err(|e| MigratableError::MigrateReceive(anyhow!("Error parsing URL: {}", e)))?; + + let mut socket = match url.scheme() { + "unix" => { + let listener = UnixListener::bind(url.to_file_path().map_err(|_| { + MigratableError::MigrateReceive(anyhow!("Error extracting path from URL")) + })?) + .map_err(|e| { + MigratableError::MigrateReceive(anyhow!("Error binding to UNIX socket: {}", e)) + })?; + let (socket, _addr) = listener.accept().map_err(|e| { + MigratableError::MigrateReceive(anyhow!( + "Error accepting on UNIX socket: {}", + e + )) + })?; + socket + } + _ => { + return Err(MigratableError::MigrateReceive(anyhow!( + "Unsupported URL scheme" + ))) + } + }; + + let mut started = false; + + loop { + let req = Request::read_from(&mut socket)?; + match req.command() { + Command::Invalid => info!("Invalid Command Received"), + Command::Start => { + info!("Start Command Received"); + started = true; + + Response::ok().write_to(&mut socket)?; + } + Command::State => { + info!("State Command Received"); + + if !started { + warn!("Migration not started yet"); + Response::error().write_to(&mut socket)?; + continue; + } + + // TODO: Remove this when doing live migration + if self.vm.is_some() { + warn!("State already sent"); + Response::error().write_to(&mut socket)?; + continue; + } + + self.vm_receive_state(&req, &mut socket)?; + } + Command::Memory => { + info!("Memory Command Received"); + + if !started { + warn!("Migration not started yet"); + Response::error().write_to(&mut socket)?; + continue; + } + + self.vm_receive_memory(&req, &mut socket)?; + } + Command::Complete => { + info!("Complete Command Received"); + Response::ok().write_to(&mut socket)?; + break; + } + Command::Abandon => { + info!("Abandon Command Received"); + self.vm = None; + Response::ok().write_to(&mut socket).ok(); + break; + } + } + } + Ok(()) } @@ -657,8 +820,90 @@ impl Vmm { &mut self, send_data_migration: VmSendMigrationData, ) -> result::Result<(), MigratableError> { - info!("Migration Sender: {}", send_data_migration.destination_url); - Ok(()) + info!( + "Sending migration: destination_url = {}", + send_data_migration.destination_url + ); + if let Some(ref mut vm) = self.vm { + let url = url::Url::parse(&send_data_migration.destination_url) + .map_err(|e| MigratableError::MigrateSend(anyhow!("Error parsing URL: {}", e)))?; + let mut socket = match url.scheme() { + "unix" => UnixStream::connect(url.to_file_path().map_err(|_| { + MigratableError::MigrateSend(anyhow!("Error extracting path from URL")) + })?) + .map_err(|e| { + MigratableError::MigrateSend(anyhow!("Error connecting to UNIX socket: {}", e)) + })?, + _ => { + return Err(MigratableError::MigrateReceive(anyhow!( + "Unsupported URL scheme" + ))) + } + }; + + // Start the migration + Request::start().write_to(&mut socket)?; + let res = Response::read_from(&mut socket)?; + if res.status() != Status::Ok { + warn!("Error starting migration"); + Request::abandon().write_to(&mut socket)?; + Response::read_from(&mut socket).ok(); + return Err(MigratableError::MigrateSend(anyhow!( + "Error starting migration" + ))); + } + + // Capture snapshot and send it + let vm_snapshot = vm.snapshot()?; + let snapshot_data = serde_json::to_vec(&vm_snapshot).unwrap(); + Request::state(snapshot_data.len() as u64).write_to(&mut socket)?; + socket + .write_all(&snapshot_data) + .map_err(MigratableError::MigrateSocket)?; + let res = Response::read_from(&mut socket)?; + if res.status() != Status::Ok { + warn!("Error during state migration"); + Request::abandon().write_to(&mut socket)?; + Response::read_from(&mut socket).ok(); + return Err(MigratableError::MigrateSend(anyhow!( + "Error during state migration" + ))); + } + + // Send memory table + let table = vm.memory_range_table()?; + Request::memory(table.length()) + .write_to(&mut socket) + .unwrap(); + table.write_to(&mut socket)?; + + // And then the memory itself + vm.send_memory_regions(&table, &mut socket)?; + if res.status() != Status::Ok { + warn!("Error during memory migration"); + Request::abandon().write_to(&mut socket)?; + Response::read_from(&mut socket).ok(); + return Err(MigratableError::MigrateSend(anyhow!( + "Error during memory migration" + ))); + } + + // Complete the migration + Request::complete().write_to(&mut socket)?; + let res = Response::read_from(&mut socket)?; + if res.status() != Status::Ok { + warn!("Error completing migration"); + Request::abandon().write_to(&mut socket)?; + Response::read_from(&mut socket).ok(); + return Err(MigratableError::MigrateSend(anyhow!( + "Error completing migration" + ))); + } + info!("Migration complete"); + Ok(()) + } else { + Err(MigratableError::MigrateSend(anyhow!("VM is not running"))) + } } fn control_loop(&mut self, api_receiver: Arc>) -> Result<()> { diff --git a/vmm/src/vm.rs b/vmm/src/vm.rs index 864da7dac..2d45d95cf 100644 --- a/vmm/src/vm.rs +++ b/vmm/src/vm.rs @@ -58,7 +58,7 @@ use std::ffi::CString; #[cfg(target_arch = "x86_64")] use std::fmt; use std::fs::{File, OpenOptions}; -use std::io::{self, Write}; +use std::io::{self, Read, Write}; use std::io::{Seek, SeekFrom}; use std::num::Wrapping; use std::ops::Deref; @@ -67,10 +67,11 @@ use std::{result, str, thread}; use url::Url; use vm_device::Bus; use vm_memory::{ - Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap, - GuestRegionMmap, + Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, + GuestMemoryMmap, GuestMemoryRegion, GuestRegionMmap, }; use vm_migration::{ + protocol::{MemoryRange, MemoryRangeTable}, Migratable, MigratableError, Pausable, Snapshot, SnapshotDataSection, Snapshottable, Transportable, }; @@ -1607,6 +1608,68 @@ impl Vm { pub fn balloon_size(&self) -> u64 { self.device_manager.lock().unwrap().balloon_size() } + + pub fn receive_memory_regions( + &mut self, + ranges: &MemoryRangeTable, + fd: &mut F, + ) -> std::result::Result<(), MigratableError> + where + F: Read, + { + let guest_memory = self.memory_manager.lock().as_ref().unwrap().guest_memory(); + let mem = guest_memory.memory(); + + for range in ranges.regions() { + mem.read_exact_from(GuestAddress(range.gpa), fd, range.length as usize) + .map_err(|e| { + MigratableError::MigrateReceive(anyhow!( + "Error transferring memory to socket: {}", + e + )) + })?; + } + Ok(()) + } + + pub fn send_memory_regions( + &mut self, + ranges: &MemoryRangeTable, + fd: &mut F, + ) -> std::result::Result<(), MigratableError> + where + F: Write, + { + let guest_memory = self.memory_manager.lock().as_ref().unwrap().guest_memory(); + let mem = guest_memory.memory(); + + for range in ranges.regions() { + mem.write_all_to(GuestAddress(range.gpa), fd, range.length as usize) + .map_err(|e| { + MigratableError::MigrateSend(anyhow!( + "Error transferring memory to socket: {}", + e + )) + })?; + } + + Ok(()) + } + + pub fn memory_range_table(&self) -> std::result::Result { + let mut table = MemoryRangeTable::default(); + let guest_memory = self.memory_manager.lock().as_ref().unwrap().guest_memory(); + + guest_memory.memory().with_regions_mut(|_, region| { + table.push(MemoryRange { + gpa: region.start_addr().raw_value(), + length: region.len() as u64, + }); + Ok(()) + })?; + + Ok(table) + } } impl Pausable for Vm {