vmm: Add support for sending and receiving migration if VM is paused

This is tested by:

Source VMM:

target/debug/cloud-hypervisor --kernel ~/src/linux/vmlinux \
--pmem file=~/workloads/focal.raw --cpus boot=1 \
--memory size=2048M \
--cmdline"root=/dev/pmem0p1 console=ttyS0" --serial tty --console off \
--api-socket=/tmp/api1 -v

Destination VMM:

target/debug/cloud-hypervisor --api-socket=/tmp/api2 -v

And the following commands:

target/debug/ch-remote --api-socket=/tmp/api1 pause
target/debug/ch-remote --api-socket=/tmp/api2 receive-migration unix:/tmp/foo &
target/debug/ch-remote --api-socket=/tmp/api1 send-migration unix:/tmp/foo
target/debug/ch-remote --api-socket=/tmp/api2 resume

The VM is then responsive on the destination VMM.

Signed-off-by: Rob Bradford <robert.bradford@intel.com>
This commit is contained in:
Rob Bradford 2020-11-04 15:00:23 +00:00 committed by Samuel Ortiz
parent dfe2dadb3e
commit ca60adda70
2 changed files with 315 additions and 7 deletions

View File

@ -32,17 +32,22 @@ use crate::config::{
use crate::migration::{get_vm_snapshot, recv_vm_snapshot};
use crate::seccomp_filters::{get_seccomp_filter, Thread};
use crate::vm::{Error as VmError, Vm, VmState};
use anyhow::anyhow;
use libc::EFD_NONBLOCK;
use seccomp::{SeccompAction, SeccompFilter};
use serde::ser::{Serialize, SerializeStruct, Serializer};
use std::fs::File;
use std::io;
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::UnixListener;
use std::os::unix::net::UnixStream;
use std::sync::mpsc::{Receiver, RecvError, SendError, Sender};
use std::sync::{Arc, Mutex};
use std::{result, thread};
use thiserror::Error;
use vm_migration::{MigratableError, Pausable, Snapshottable, Transportable};
use vm_migration::protocol::*;
use vm_migration::{MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vmm_sys_util::eventfd::EventFd;
pub mod api;
@ -642,14 +647,172 @@ impl Vmm {
}
}
fn vm_receive_state<T>(
&mut self,
req: &Request,
socket: &mut T,
) -> std::result::Result<(), MigratableError>
where
T: Read + Write,
{
// Read in state data
let mut data = Vec::with_capacity(req.length() as usize);
unsafe {
data.set_len(req.length() as usize);
}
socket
.read_exact(&mut data)
.map_err(MigratableError::MigrateSocket)?;
let snapshot: Snapshot = serde_json::from_slice(&data).map_err(|e| {
MigratableError::MigrateReceive(anyhow!("Error deserialising snapshot: {}", e))
})?;
// Create VM
let vm_snapshot = get_vm_snapshot(&snapshot)?;
self.vm_config = Some(Arc::clone(&vm_snapshot.config));
let exit_evt = self.exit_evt.try_clone().map_err(|e| {
MigratableError::MigrateReceive(anyhow!("Error cloning exit EventFd: {}", e))
})?;
let reset_evt = self.reset_evt.try_clone().map_err(|e| {
MigratableError::MigrateReceive(anyhow!("Error cloning reset EventFd: {}", e))
})?;
let mut vm = Vm::new_from_snapshot(
&snapshot,
exit_evt,
reset_evt,
None,
false,
&self.seccomp_action,
self.hypervisor.clone(),
)
.map_err(|e| {
MigratableError::MigrateReceive(anyhow!("Error creating VM from snapshot: {:?}", e))
})?;
vm.restore(snapshot).map_err(|e| {
Response::error().write_to(socket).ok();
e
})?;
self.vm = Some(vm);
Response::ok().write_to(socket)?;
Ok(())
}
fn vm_receive_memory<T>(
&mut self,
req: &Request,
socket: &mut T,
) -> std::result::Result<(), MigratableError>
where
T: Read + Write,
{
// Read table
let table = MemoryRangeTable::read_from(socket, req.length())?;
// And then read the memory itself
self.vm
.as_mut()
.unwrap()
.receive_memory_regions(&table, socket)
.map_err(|e| {
Response::error().write_to(socket).ok();
e
})?;
Response::ok().write_to(socket)?;
Ok(())
}
fn vm_receive_migration(
&mut self,
receive_data_migration: VmReceiveMigrationData,
) -> result::Result<(), MigratableError> {
info!(
"Migration Receiver: {}",
"Receiving migration: receiver_url = {}",
receive_data_migration.receiver_url
);
let url = url::Url::parse(&receive_data_migration.receiver_url)
.map_err(|e| MigratableError::MigrateReceive(anyhow!("Error parsing URL: {}", e)))?;
let mut socket = match url.scheme() {
"unix" => {
let listener = UnixListener::bind(url.to_file_path().map_err(|_| {
MigratableError::MigrateReceive(anyhow!("Error extracting path from URL"))
})?)
.map_err(|e| {
MigratableError::MigrateReceive(anyhow!("Error binding to UNIX socket: {}", e))
})?;
let (socket, _addr) = listener.accept().map_err(|e| {
MigratableError::MigrateReceive(anyhow!(
"Error accepting on UNIX socket: {}",
e
))
})?;
socket
}
_ => {
return Err(MigratableError::MigrateReceive(anyhow!(
"Unsupported URL scheme"
)))
}
};
let mut started = false;
loop {
let req = Request::read_from(&mut socket)?;
match req.command() {
Command::Invalid => info!("Invalid Command Received"),
Command::Start => {
info!("Start Command Received");
started = true;
Response::ok().write_to(&mut socket)?;
}
Command::State => {
info!("State Command Received");
if !started {
warn!("Migration not started yet");
Response::error().write_to(&mut socket)?;
continue;
}
// TODO: Remove this when doing live migration
if self.vm.is_some() {
warn!("State already sent");
Response::error().write_to(&mut socket)?;
continue;
}
self.vm_receive_state(&req, &mut socket)?;
}
Command::Memory => {
info!("Memory Command Received");
if !started {
warn!("Migration not started yet");
Response::error().write_to(&mut socket)?;
continue;
}
self.vm_receive_memory(&req, &mut socket)?;
}
Command::Complete => {
info!("Complete Command Received");
Response::ok().write_to(&mut socket)?;
break;
}
Command::Abandon => {
info!("Abandon Command Received");
self.vm = None;
Response::ok().write_to(&mut socket).ok();
break;
}
}
}
Ok(())
}
@ -657,8 +820,90 @@ impl Vmm {
&mut self,
send_data_migration: VmSendMigrationData,
) -> result::Result<(), MigratableError> {
info!("Migration Sender: {}", send_data_migration.destination_url);
info!(
"Sending migration: destination_url = {}",
send_data_migration.destination_url
);
if let Some(ref mut vm) = self.vm {
let url = url::Url::parse(&send_data_migration.destination_url)
.map_err(|e| MigratableError::MigrateSend(anyhow!("Error parsing URL: {}", e)))?;
let mut socket = match url.scheme() {
"unix" => UnixStream::connect(url.to_file_path().map_err(|_| {
MigratableError::MigrateSend(anyhow!("Error extracting path from URL"))
})?)
.map_err(|e| {
MigratableError::MigrateSend(anyhow!("Error connecting to UNIX socket: {}", e))
})?,
_ => {
return Err(MigratableError::MigrateReceive(anyhow!(
"Unsupported URL scheme"
)))
}
};
// Start the migration
Request::start().write_to(&mut socket)?;
let res = Response::read_from(&mut socket)?;
if res.status() != Status::Ok {
warn!("Error starting migration");
Request::abandon().write_to(&mut socket)?;
Response::read_from(&mut socket).ok();
return Err(MigratableError::MigrateSend(anyhow!(
"Error starting migration"
)));
}
// Capture snapshot and send it
let vm_snapshot = vm.snapshot()?;
let snapshot_data = serde_json::to_vec(&vm_snapshot).unwrap();
Request::state(snapshot_data.len() as u64).write_to(&mut socket)?;
socket
.write_all(&snapshot_data)
.map_err(MigratableError::MigrateSocket)?;
let res = Response::read_from(&mut socket)?;
if res.status() != Status::Ok {
warn!("Error during state migration");
Request::abandon().write_to(&mut socket)?;
Response::read_from(&mut socket).ok();
return Err(MigratableError::MigrateSend(anyhow!(
"Error during state migration"
)));
}
// Send memory table
let table = vm.memory_range_table()?;
Request::memory(table.length())
.write_to(&mut socket)
.unwrap();
table.write_to(&mut socket)?;
// And then the memory itself
vm.send_memory_regions(&table, &mut socket)?;
if res.status() != Status::Ok {
warn!("Error during memory migration");
Request::abandon().write_to(&mut socket)?;
Response::read_from(&mut socket).ok();
return Err(MigratableError::MigrateSend(anyhow!(
"Error during memory migration"
)));
}
// Complete the migration
Request::complete().write_to(&mut socket)?;
let res = Response::read_from(&mut socket)?;
if res.status() != Status::Ok {
warn!("Error completing migration");
Request::abandon().write_to(&mut socket)?;
Response::read_from(&mut socket).ok();
return Err(MigratableError::MigrateSend(anyhow!(
"Error completing migration"
)));
}
info!("Migration complete");
Ok(())
} else {
Err(MigratableError::MigrateSend(anyhow!("VM is not running")))
}
}
fn control_loop(&mut self, api_receiver: Arc<Receiver<ApiRequest>>) -> Result<()> {

View File

@ -58,7 +58,7 @@ use std::ffi::CString;
#[cfg(target_arch = "x86_64")]
use std::fmt;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::io::{self, Read, Write};
use std::io::{Seek, SeekFrom};
use std::num::Wrapping;
use std::ops::Deref;
@ -67,10 +67,11 @@ use std::{result, str, thread};
use url::Url;
use vm_device::Bus;
use vm_memory::{
Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap,
GuestRegionMmap,
Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic,
GuestMemoryMmap, GuestMemoryRegion, GuestRegionMmap,
};
use vm_migration::{
protocol::{MemoryRange, MemoryRangeTable},
Migratable, MigratableError, Pausable, Snapshot, SnapshotDataSection, Snapshottable,
Transportable,
};
@ -1607,6 +1608,68 @@ impl Vm {
pub fn balloon_size(&self) -> u64 {
self.device_manager.lock().unwrap().balloon_size()
}
pub fn receive_memory_regions<F>(
&mut self,
ranges: &MemoryRangeTable,
fd: &mut F,
) -> std::result::Result<(), MigratableError>
where
F: Read,
{
let guest_memory = self.memory_manager.lock().as_ref().unwrap().guest_memory();
let mem = guest_memory.memory();
for range in ranges.regions() {
mem.read_exact_from(GuestAddress(range.gpa), fd, range.length as usize)
.map_err(|e| {
MigratableError::MigrateReceive(anyhow!(
"Error transferring memory to socket: {}",
e
))
})?;
}
Ok(())
}
pub fn send_memory_regions<F>(
&mut self,
ranges: &MemoryRangeTable,
fd: &mut F,
) -> std::result::Result<(), MigratableError>
where
F: Write,
{
let guest_memory = self.memory_manager.lock().as_ref().unwrap().guest_memory();
let mem = guest_memory.memory();
for range in ranges.regions() {
mem.write_all_to(GuestAddress(range.gpa), fd, range.length as usize)
.map_err(|e| {
MigratableError::MigrateSend(anyhow!(
"Error transferring memory to socket: {}",
e
))
})?;
}
Ok(())
}
pub fn memory_range_table(&self) -> std::result::Result<MemoryRangeTable, MigratableError> {
let mut table = MemoryRangeTable::default();
let guest_memory = self.memory_manager.lock().as_ref().unwrap().guest_memory();
guest_memory.memory().with_regions_mut(|_, region| {
table.push(MemoryRange {
gpa: region.start_addr().raw_value(),
length: region.len() as u64,
});
Ok(())
})?;
Ok(table)
}
}
impl Pausable for Vm {