mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-01-18 02:25:18 +00:00
b348f37445
It is possible to build OVMF for SEV with an embedded Grub that can fetch LUKS disk secrets. This adds support for injecting secrets in the required format. Reviewed-by: Cole Robinson <crobinso@redhat.com> Reviewed-by: Ján Tomko <jtomko@redhat.com> Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
1336 lines
45 KiB
Python
Executable File
1336 lines
45 KiB
Python
Executable File
#!/usr/bin/python3
|
||
#
|
||
# SPDX-License-Identifier: LGPL-2.1-or-later
|
||
#
|
||
# Validates a guest AMD SEV launch measurement
|
||
#
|
||
# A general principle in writing this tool is that it must calculate the
|
||
# expected measurement based entirely on information it receives on the CLI
|
||
# from the guest owner.
|
||
#
|
||
# It cannot generally trust information obtained from the guest XML or from the
|
||
# virtualization host OS. The main exceptions are:
|
||
#
|
||
# - The guest measurement
|
||
#
|
||
# This is a result of cryptographic operation using a shared secret known
|
||
# only to the guest owner and SEV platform, not the host OS.
|
||
#
|
||
# - The guest policy
|
||
#
|
||
# This is encoded in the launch session blob that is encrypted with a shared
|
||
# secret known only to the guest owner and SEV platform, not the host OS. It
|
||
# is impossible for the host OS to maliciously launch a guest with different
|
||
# policy and the user provided launch session blob.
|
||
#
|
||
# CAVEAT: the user must ALWAYS create a launch blob with freshly generated
|
||
# TIK/TEK for every new VM. Re-use of the same TIK/TEK for multiple VMs
|
||
# is insecure.
|
||
#
|
||
# - The SEV API version / build ID
|
||
#
|
||
# This does not have an impact on the security of the measurement, unless
|
||
# the guest owner needs a guarantee that the host is not using specific
|
||
# firmware versions with known flaws.
|
||
#
|
||
|
||
import abc
|
||
import argparse
|
||
from base64 import b64decode, b64encode
|
||
from hashlib import sha256
|
||
import hmac
|
||
import logging
|
||
import os
|
||
import re
|
||
import socket
|
||
from struct import pack
|
||
import sys
|
||
import traceback
|
||
from uuid import UUID
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
|
||
|
||
from lxml import etree
|
||
import libvirt
|
||
|
||
# Shared logger for this script; getLogger() without a name returns the
# root logger.
log = logging.getLogger()
|
||
|
||
|
||
class AttestationFailedException(Exception):
    """Raised when the reported launch measurement differs from the computed one."""
|
||
|
||
|
||
class UnsupportedUsageException(Exception):
    """Raised when supplied inputs are missing, malformed or cannot be used."""
|
||
|
||
|
||
class InsecureUsageException(Exception):
    """Raised when secure mode would have to rely on data from the host."""
|
||
|
||
|
||
class IncorrectConfigException(Exception):
    """Raised when the domain XML lacks configuration needed for validation."""
|
||
|
||
|
||
class InvalidStateException(Exception):
    """Raised when the domain is not in a state that allows validation."""
|
||
|
||
|
||
class Field:
    """A single named member of a binary structure.

    Holds the member's name, its width tag, its layout kind, its current
    value and the position at which it was registered.
    """

    # Width tags. They are identifiers to compare against, not byte
    # counts (note that U8 is 0).
    U8 = 0
    U16 = 2
    U32 = 4
    U64 = 8

    # Layout kinds.
    SCALAR = 0
    BITMASK = 1
    ARRAY = 2

    def __init__(self, name, size, fmt, value, order):
        """Record the field description.

        name  -- field identifier
        size  -- one of the width tags (U8, U16, U32, U64)
        fmt   -- one of the layout kinds (SCALAR, BITMASK, ARRAY)
        value -- current value of the field
        order -- registration position, used for sorting
        """
        self.name, self.order = name, order
        self.size, self.fmt = size, fmt
        self.value = value
|
||
|
||
|
||
class Struct(object):
    """Fixed-size, little-endian binary record made of registered fields.

    Fields are added with register_field() and serialised in registration
    order by pack(). Reading an attribute named after a field returns the
    Field object; assigning to it replaces the field's value.
    """

    def __init__(self, size):
        """Create an empty structure that packs to 'size' bytes."""
        self._fields = {}
        self.size = size

    def register_field(self, name, size, fmt=Field.SCALAR, defvalue=0):
        """Append a field to the layout.

        name     -- attribute name used to access the field
        size     -- Field width tag (Field.U8/U16/U32/U64)
        fmt      -- Field layout kind; Field.ARRAY is only honoured for U8
        defvalue -- initial value (a byte sequence for U8 arrays)
        """
        self._fields[name] = Field(name, size, fmt,
                                   defvalue, len(self._fields))

    @property
    def fields(self):
        """Registered Field objects, sorted by registration order."""
        return sorted(self._fields.values(), key=lambda f: f.order)

    def __getattr__(self, name):
        """Return the Field registered under 'name'.

        Raises AttributeError (not KeyError) for unknown names so that
        hasattr() and getattr() with a default behave normally.
        """
        # __getattr__ is only consulted after normal lookup failed; if
        # '_fields' itself is missing, looking it up below would recurse.
        if name == "_fields":
            raise AttributeError(name)
        try:
            return self._fields[name]
        except KeyError:
            raise AttributeError(
                "'%s' object has no field '%s'" %
                (type(self).__name__, name)) from None

    def __setattr__(self, name, value):
        """Set a real attribute ('_fields', 'size') or a field's value.

        Raises AttributeError when 'name' is not a registered field.
        """
        if name in ("_fields", "size"):
            super().__setattr__(name, value)
            return

        try:
            field = self._fields[name]
        except KeyError:
            raise AttributeError(
                "'%s' object has no field '%s'" %
                (type(self).__name__, name)) from None
        field.value = value

    def binary_format(self):
        """Build the struct-module format string for the current layout.

        Returns a tuple (format, pad) where 'pad' is the number of zero
        bytes needed after the fields to reach self.size.

        Raises ValueError if a field has an unknown width tag or if the
        fields need more than self.size bytes.
        """
        scalar_codes = {
            Field.U16: ("H", 2),
            Field.U32: ("L", 4),
            Field.U64: ("Q", 8),
        }

        fmt = ["<"]
        datalen = 0
        for field in self.fields:
            if field.size == Field.U8:
                if field.fmt == Field.ARRAY:
                    datalen += len(field.value)
                    fmt.append("%dB" % len(field.value))
                else:
                    datalen += 1
                    fmt.append("B")
            elif field.size in scalar_codes:
                code, width = scalar_codes[field.size]
                datalen += width
                fmt.append(code)
            else:
                # Previously skipped silently, which left pack() with more
                # values than format codes.
                raise ValueError(
                    "Field '%s' has unknown size tag %r" %
                    (field.name, field.size))

        pad = self.size - datalen
        # The original assertion only looked at self.size, so an over-full
        # layout produced a negative repeat count in the format string.
        if pad < 0:
            raise ValueError(
                "Fields need %d bytes but struct size is %d" %
                (datalen, self.size))
        fmt.append("%dB" % pad)

        return "".join(fmt), pad

    def pack(self):
        """Serialise all fields, plus trailing zero padding, to bytes."""
        fmt, pad = self.binary_format()

        values = []
        for field in self.fields:
            if field.size == Field.U8 and field.fmt == Field.ARRAY:
                # Byte arrays contribute one value per element
                values.extend(field.value)
            else:
                values.append(field.value)
        values.extend([0] * pad)

        # 'pack' here is the module-level struct.pack, not this method
        return pack(fmt, *values)
|
||
|
||
|
||
class VMSA(Struct):
    """4096-byte virtual machine save area used to describe initial CPU state.

    The layout is registered in __init__; the *_cpu_init() methods then
    fill in the register values, cpu_sku() encodes the CPU signature into
    rdx, and reset_addr() points cs_base/rip at a reset address.
    """

    ATTR_G_SHIFT = 23
    ATTR_G_MASK = (1 << ATTR_G_SHIFT)
    ATTR_B_SHIFT = 22
    ATTR_B_MASK = (1 << ATTR_B_SHIFT)
    ATTR_L_SHIFT = 21
    ATTR_L_MASK = (1 << ATTR_L_SHIFT)
    ATTR_AVL_SHIFT = 20
    ATTR_AVL_MASK = (1 << ATTR_AVL_SHIFT)
    ATTR_P_SHIFT = 15
    ATTR_P_MASK = (1 << ATTR_P_SHIFT)
    ATTR_DPL_SHIFT = 13
    ATTR_DPL_MASK = (3 << ATTR_DPL_SHIFT)
    ATTR_S_SHIFT = 12
    ATTR_S_MASK = (1 << ATTR_S_SHIFT)
    ATTR_TYPE_SHIFT = 8
    ATTR_TYPE_MASK = (15 << ATTR_TYPE_SHIFT)
    ATTR_A_MASK = (1 << 8)

    ATTR_CS_MASK = (1 << 11)
    ATTR_C_MASK = (1 << 10)
    ATTR_R_MASK = (1 << 9)

    ATTR_E_MASK = (1 << 10)
    ATTR_W_MASK = (1 << 9)

    def __init__(self):
        """Register every field of the save area, in layout order."""
        super().__init__(4096)

        # From Linux arch/x86/include/asm/svm.h, we're unpacking the
        # struct vmcb_save_area

        def scalars(width, *names):
            # Register consecutive scalar fields sharing one width
            for name in names:
                self.register_field(name, width)

        def reserved(name, length):
            # Register a zero-filled byte array of 'length' bytes
            self.register_field(name, Field.U8, Field.ARRAY,
                                bytearray(length))

        # Each segment register is selector/attrib/limit/base
        for seg in ("es", "cs", "ss", "ds", "fs", "gs",
                    "gdtr", "ldtr", "idtr", "tr"):
            self.register_field(seg + "_selector", Field.U16)
            self.register_field(seg + "_attrib", Field.U16, Field.BITMASK)
            self.register_field(seg + "_limit", Field.U32)
            self.register_field(seg + "_base", Field.U64)

        reserved("reserved_1", 43)
        scalars(Field.U8, "cpl")
        reserved("reserved_2", 4)
        scalars(Field.U64, "efer")
        reserved("reserved_3", 104)

        scalars(Field.U64,
                "xss", "cr4", "cr3", "cr0",
                "dr7", "dr6", "rflags", "rip")
        reserved("reserved_4", 88)
        scalars(Field.U64, "rsp")
        reserved("reserved_5", 24)

        scalars(Field.U64,
                "rax", "star", "lstar", "cstar", "sfmask",
                "kernel_gs_base", "sysenter_cs", "sysenter_esp",
                "sysenter_eip", "cr2")
        reserved("reserved_6", 32)

        scalars(Field.U64,
                "g_pat", "dbgctl", "br_from", "br_to",
                "last_excp_from", "last_excp_to")
        reserved("reserved_7", 72)

        scalars(Field.U32, "spec_ctrl")
        reserved("reserved_7b", 4)
        scalars(Field.U32, "pkru")
        reserved("reserved_7a", 20)

        scalars(Field.U64, "reserved_8")  # rax duplicate
        scalars(Field.U64, "rcx")
        self.register_field("rdx", Field.U64, Field.BITMASK)
        scalars(Field.U64, "rbx")
        scalars(Field.U64, "reserved_9")  # rsp duplicate

        scalars(Field.U64,
                "rbp", "rsi", "rdi",
                "r8", "r9", "r10", "r11",
                "r12", "r13", "r14", "r15")
        reserved("reserved_10", 16)

        scalars(Field.U64,
                "sw_exit_code", "sw_exit_info_1",
                "sw_exit_info_2", "sw_scratch")
        reserved("reserved_11", 56)

        scalars(Field.U64, "xcr0")
        self.register_field("valid_bitmap", Field.U8, Field.ARRAY,
                            bytearray(16))
        scalars(Field.U64, "x87_state_gpa")

    def amd64_cpu_init(self):
        """Load the architectural register values after INIT."""
        # AMD64 Architecture Programmer's Manual
        # Volume 2: System Programming.
        #
        # 14.1.3 Processor Initialization State
        #
        # Values after INIT

        self.cr0 = (1 << 4)
        self.rip = 0xfff0

        self.cs_selector = 0xf000
        self.cs_base = 0xffff0000

        # Every segment and descriptor-table limit starts at 0xffff
        for seg in ("cs", "ds", "es", "fs", "gs", "ss",
                    "gdtr", "idtr", "ldtr", "tr"):
            setattr(self, seg + "_limit", 0xffff)

        self.dr6 = 0xffff0ff0
        self.dr7 = 0x0400
        self.rflags = 0x2
        self.xcr0 = 0x1

    def kvm_cpu_init(self):
        """Apply the register values attributed to KVM's vCPU setup."""
        # svm_set_cr4() sets guest X86_CR4_MCE bit if host
        # has X86_CR4_MCE enabled
        self.cr4 = 0x40

        # svm_set_efer sets guest EFER_SVME (Secure Virtual Machine enable)
        self.efer = 0x1000

        # init_vmcb + init_sys_seg() sets
        # SVM_SELECTOR_P_MASK | SEG_TYPE_LDT
        self.ldtr_attrib = 0x0082

        # init_vmcb + init_sys_seg() sets
        # SVM_SELECTOR_P_MASK | SEG_TYPE_BUSY_TSS16
        self.tr_attrib = 0x0083

        # kvm_arch_vcpu_create() in arch/x86/kvm/x86.c
        self.g_pat = 0x0007040600070406

    def qemu_cpu_init(self):
        """Apply the segment attributes attributed to QEMU's CPU reset."""
        # Based on logic in x86_cpu_reset()
        #
        # file target/i386/cpu.c

        shift = VMSA.ATTR_TYPE_SHIFT

        # Masks are built from the ATTR_* constants, then the low
        # ATTR_TYPE_SHIFT bits are shifted out before storing.
        self.ldtr_attrib = (VMSA.ATTR_P_MASK | (2 << shift)) >> shift
        self.tr_attrib = (VMSA.ATTR_P_MASK | (11 << shift)) >> shift

        code_seg = (VMSA.ATTR_P_MASK |
                    VMSA.ATTR_S_MASK |
                    VMSA.ATTR_CS_MASK |
                    VMSA.ATTR_R_MASK |
                    VMSA.ATTR_A_MASK)
        self.cs_attrib = code_seg >> shift

        data_seg = (VMSA.ATTR_P_MASK |
                    VMSA.ATTR_S_MASK |
                    VMSA.ATTR_W_MASK |
                    VMSA.ATTR_A_MASK)
        for seg in ("ds", "es", "ss", "fs", "gs"):
            setattr(self, seg + "_attrib", data_seg >> shift)

        self.g_pat = 0x0007040600070406

    def cpu_sku(self, family, model, stepping):
        """Encode family/model/stepping into the rdx field.

        Inputs are masked to 12, 8 and 4 bits respectively. Families
        above 0xf are split into a base value of 0xf in bits 8-11 and
        the remainder in bits 20 and up; the model's low nibble goes to
        bits 4-7 and its high nibble to bits 16-19.
        """
        stepping &= 0xf
        model &= 0xff
        family &= 0xfff

        signature = stepping
        if family > 0xf:
            signature |= 0xf00 | ((family - 0x0f) << 20)
        else:
            signature |= family << 8
        signature |= ((model & 0xf) << 4) | ((model >> 4) << 16)

        self.rdx = signature

    def reset_addr(self, reset_addr):
        """Split a reset address into cs_base (high 16 bits) and rip (low 16)."""
        self.rip = reset_addr & 0x0000ffff
        self.cs_base = reset_addr & 0xffff0000
|
||
|
||
|
||
class OVMF(object):
    """Parser for the GUID-keyed table stored at the end of an OVMF image.

    The table is located via a footer GUID that sits 48 bytes before the
    end of the image and is walked backwards; each entry ends with a
    2-byte little-endian length followed by a 16-byte GUID, and the
    length covers the entry data plus those 18 bytes.
    """

    OVMF_TABLE_FOOTER_GUID = UUID("96b582de-1fb2-45f7-baea-a366c55a082d")
    SEV_INFO_BLOCK_GUID = UUID("00f771de-1a7e-4fcb-890e-68c77e2fb44e")

    # Trailer of every entry (and of the table): 2-byte length + 16-byte GUID
    _TRAILER_LEN = 18
    # Number of bytes that follow the footer GUID at the end of the image
    _FOOTER_END_OFFSET = 32

    def __init__(self):
        # Maps str(UUID) -> raw entry data
        self.entries = {}

    def load(self, content):
        """Locate the table in the firmware image and parse its entries.

        Raises Exception if the footer GUID is absent or the recorded
        table length is zero or inconsistent with the image size.
        """
        expect = OVMF.OVMF_TABLE_FOOTER_GUID.bytes_le
        actual = content[-48:-32]
        if expect != actual:
            raise Exception("OVMF footer GUID not found")

        tablelen = int.from_bytes(content[-50:-48], byteorder='little')

        if tablelen == 0:
            raise Exception("OVMF tables zero length")

        # The length includes the footer's own length + GUID, and the
        # whole table must fit inside the image. Without this check the
        # slice below silently clamps and garbage is parsed as entries.
        if (tablelen < self._TRAILER_LEN or
                tablelen + self._FOOTER_END_OFFSET > len(content)):
            raise Exception("OVMF tables length %d is invalid" % tablelen)

        table = content[-(32 + tablelen):-50]

        self.parse_table(table)

    def parse_table(self, data):
        """Walk the table from its end, recording each entry's data.

        Raises Exception if an entry is truncated or declares a length
        that is smaller than its trailer or larger than the remaining
        table data.
        """
        while len(data) > 0:
            if len(data) < self._TRAILER_LEN:
                raise Exception("OVMF table entry is truncated")

            entryuuid = UUID(bytes_le=data[-16:])
            entrylen = int.from_bytes(data[-18:-16], byteorder='little')

            if entrylen < self._TRAILER_LEN or entrylen > len(data):
                raise Exception(
                    "OVMF table entry length %d is invalid" % entrylen)

            self.entries[str(entryuuid)] = data[-entrylen:-18]

            # Drop the entry just consumed and continue with what
            # precedes it
            data = data[0:-entrylen]

    def reset_addr(self):
        """Return the SEV info block contents as a little-endian integer.

        Raises Exception if the SEV info block entry was not found.
        """
        key = str(OVMF.SEV_INFO_BLOCK_GUID)
        if key not in self.entries:
            raise Exception("SEV info block GUID not found")

        return int.from_bytes(self.entries[key], "little")
|
||
|
||
|
||
class GUIDTable(abc.ABC):
    """Base class for tables framed as GUID + length + payload.

    Subclasses provide the concatenated payload via entries(); build()
    wraps it with the table GUID and a little-endian length field of
    'lenlen' bytes, then appends zero padding.
    """

    GUID_LEN = 16

    def __init__(self, guid, lenlen=2):
        """Remember the table GUID (bytes) and the length-field width."""
        self.guid = guid
        self.lenlen = lenlen

    @abc.abstractmethod
    def entries(self):
        """Return the already-framed entries of the table as bytes."""
        pass

    def _frame(self, guid, payload, lenlen):
        # Lay out guid + zeroed length + payload, then patch the length
        # field (at offset GUID_LEN) with the total size of the frame.
        framed = bytearray(guid + int(0).to_bytes(lenlen, 'little') + payload)
        start = self.GUID_LEN
        framed[start:(start + lenlen)] = len(framed).to_bytes(lenlen,
                                                              'little')
        return framed

    def build_entry(self, guid, payload, lenlen):
        """Return one framed entry: guid, total length, payload."""
        return bytes(self._frame(guid, payload, lenlen))

    def build(self):
        """Return the complete padded table, or empty bytes if no entries."""
        payload = self.entries()

        if len(payload) == 0:
            return bytes([])

        table = self._frame(self.guid, payload, self.lenlen)

        # Zero-fill up to the next multiple of 16 bytes.
        # NOTE(review): a table whose length is already a multiple of 16
        # still gains a full 16 bytes here - confirm that is intended.
        table += bytes(16 - (len(table) % 16))

        log.debug("Table(hex): %s", bytes(table).hex())
        return bytes(table)
|
||
|
||
|
||
class KernelTable(GUIDTable):
    """GUID table carrying SHA-256 digests of kernel, initrd and cmdline.

    The table is empty unless a kernel has been loaded. Digests that were
    not loaded fall back to the digest of empty data (initrd) or of a
    single NUL byte (cmdline) when the entries are built.
    """

    TABLE_GUID = UUID('{9438d606-4f22-4cc9-b479-a793-d411fd21}').bytes_le
    KERNEL_GUID = UUID('{4de79437-abd2-427f-b835-d5b1-72d2045b}').bytes_le
    INITRD_GUID = UUID('{44baf731-3a2f-4bd7-9af1-41e2-9169781d}').bytes_le
    CMDLINE_GUID = UUID('{97d02dd8-bd20-4c94-aa78-e771-4d36ab2a}').bytes_le

    def __init__(self):
        super().__init__(guid=self.TABLE_GUID,
                         lenlen=2)

        # SHA-256 digests (bytes), or None while not loaded
        self.kernel = None
        self.initrd = None
        self.cmdline = None

    def load_kernel(self, path):
        """Store the SHA-256 digest of the kernel file at 'path'."""
        with open(path, "rb") as fh:
            self.kernel = sha256(fh.read()).digest()

    def load_initrd(self, path):
        """Store the SHA-256 digest of the initrd file at 'path'."""
        with open(path, "rb") as fh:
            self.initrd = sha256(fh.read()).digest()

    def load_cmdline(self, val):
        """Store the SHA-256 digest of the UTF-8 cmdline plus a NUL byte."""
        self.cmdline = sha256(val.encode("utf8") + bytes([0])).digest()

    def entries(self):
        """Return the framed cmdline, initrd and kernel entries, in that order.

        Returns empty bytes when no kernel has been loaded. Defaults for
        a missing initrd/cmdline are computed locally so that this method
        does not overwrite self.initrd / self.cmdline; callers that test
        those attributes against None keep seeing whether a value was
        really loaded.
        """
        if self.kernel is None:
            return bytes([])

        initrd = self.initrd
        if initrd is None:
            initrd = sha256(bytes([])).digest()

        cmdline = self.cmdline
        if cmdline is None:
            cmdline = sha256(bytes([0])).digest()

        log.debug("Kernel(sha256): %s", self.kernel.hex())
        log.debug("Initrd(sha256): %s", initrd.hex())
        log.debug("Cmdline(sha256): %s", cmdline.hex())

        return b"".join([
            self.build_entry(self.CMDLINE_GUID, cmdline, 2),
            self.build_entry(self.INITRD_GUID, initrd, 2),
            self.build_entry(self.KERNEL_GUID, self.kernel, 2),
        ])
|
||
|
||
|
||
class SecretsTable(GUIDTable):
    """GUID table holding secret payloads keyed by GUID.

    Entries and the table itself use a 4-byte length field.
    """

    TABLE_GUID = UUID('{1e74f542-71dd-4d66-963e-ef4287ff173b}').bytes_le

    # Friendly names accepted in place of a GUID
    GUID_ALIASES = {
        "luks-key": UUID('{736869e5-84f0-4973-92ec-06879ce3da0b}')
    }

    def __init__(self):
        super().__init__(guid=self.TABLE_GUID,
                         lenlen=4)
        # Maps UUID -> secret bytes
        self.secrets = {}

    def load_secret(self, alias_or_guid, path):
        """Read the secret file at 'path' and file it under a GUID.

        'alias_or_guid' is either a GUID in canonical 8-4-4-4-12 hex form
        or a key of GUID_ALIASES. Raises UnsupportedUsageException for an
        unknown alias or when the GUID already has a secret.
        """
        looks_like_guid = re.match(
            r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
            alias_or_guid)

        if looks_like_guid:
            key = UUID(alias_or_guid)
        elif alias_or_guid in self.GUID_ALIASES:
            key = self.GUID_ALIASES[alias_or_guid]
        else:
            raise UnsupportedUsageException(
                "Secret alias '%s' is not known" % alias_or_guid)

        if key in self.secrets:
            raise UnsupportedUsageException(
                "Secret for GUID %s already loaded" % key)

        with open(path, 'rb') as fh:
            self.secrets[key] = fh.read()

    def entries(self):
        """Return every loaded secret as a framed entry, concatenated."""
        return b"".join(self.build_entry(key.bytes_le, secret, 4)
                        for key, secret in self.secrets.items())
|
||
|
||
|
||
class ConfidentialVM(abc.ABC):
    """State and calculations needed to validate an SEV launch measurement.

    Holds the values reported for the guest (measurement, API version,
    build ID, policy), the owner-supplied inputs (firmware, TIK/TEK,
    optional VMSAs, kernel digests, secrets) and computes the expected
    measurement and the launch secret packet from them. Subclasses
    decide how the secret packet is delivered via inject_secrets().
    """

    POLICY_BIT_SEV_ES = 2
    POLICY_VAL_SEV_ES = (1 << POLICY_BIT_SEV_ES)

    # get_measurements() reads a 32-byte HMAC followed by a 16-byte nonce
    MEASUREMENT_LEN = 48

    def __init__(self,
                 measurement=None,
                 api_major=None,
                 api_minor=None,
                 build_id=None,
                 policy=None,
                 num_cpus=None):
        self.measurement = measurement
        self.api_major = api_major
        self.api_minor = api_minor
        self.build_id = build_id
        self.policy = policy

        self.firmware = None
        self.tik = None
        self.tek = None

        self.num_cpus = num_cpus
        self.vmsa_cpu0 = None
        self.vmsa_cpu1 = None

        self.kernel_table = KernelTable()
        self.secrets_table = SecretsTable()

    def is_sev_es(self):
        """Return a truthy value when the policy has the SEV-ES bit set."""
        return self.policy & self.POLICY_VAL_SEV_ES

    def load_tik_tek(self, tik_path, tek_path):
        """Load the TIK and TEK from two separate 16-byte files.

        Raises UnsupportedUsageException if either file is not 16 bytes.
        """
        with open(tik_path, 'rb') as fh:
            self.tik = fh.read()
            log.debug("TIK(hex): %s", self.tik.hex())

        if len(self.tik) != 16:
            raise UnsupportedUsageException(
                "Expected 16 bytes in TIK file, but got %d" % len(self.tik))

        with open(tek_path, 'rb') as fh:
            self.tek = fh.read()
            log.debug("TEK(hex): %s", self.tek.hex())

        if len(self.tek) != 16:
            raise UnsupportedUsageException(
                "Expected 16 bytes in TEK file, but got %d" % len(self.tek))

    def load_tk(self, tk_path):
        """Load TEK (first 16 bytes) and TIK (last 16) from one 32-byte file.

        Raises UnsupportedUsageException if the file is not 32 bytes.
        """
        with open(tk_path, 'rb') as fh:
            tk = fh.read()

        if len(tk) != 32:
            raise UnsupportedUsageException(
                "Expected 32 bytes in TIK/TEK file, but got %d" % len(tk))

        self.tek = tk[0:16]
        self.tik = tk[16:32]
        log.debug("TIK(hex): %s", self.tik.hex())
        log.debug("TEK(hex): %s", self.tek.hex())

    def load_firmware(self, firmware_path):
        """Read the whole firmware image into memory."""
        with open(firmware_path, 'rb') as fh:
            self.firmware = fh.read()
        log.debug("Firmware(sha256): %s", sha256(self.firmware).hexdigest())

    @staticmethod
    def _load_vmsa(path):
        """Read a VMSA file, which must be exactly 4096 bytes long.

        Raises UnsupportedUsageException on any other length.
        """
        with open(path, 'rb') as fh:
            vmsa = fh.read()

        if len(vmsa) != 4096:
            raise UnsupportedUsageException(
                "VMSA must be 4096 bytes in length")
        return vmsa

    def load_vmsa_cpu0(self, path):
        """Load the VMSA used for the boot CPU."""
        self.vmsa_cpu0 = self._load_vmsa(path)
        log.debug("VMSA CPU 0(sha256): %s",
                  sha256(self.vmsa_cpu0).hexdigest())

    def load_vmsa_cpu1(self, path):
        """Load the VMSA used for every additional CPU."""
        self.vmsa_cpu1 = self._load_vmsa(path)
        log.debug("VMSA CPU 1(sha256): %s",
                  sha256(self.vmsa_cpu1).hexdigest())

    def build_vmsas(self, family, model, stepping):
        """Synthesise both VMSAs from the firmware and a CPU signature.

        The boot CPU VMSA is packed first; the same structure is then
        pointed at the firmware's reset address and packed again for the
        additional CPUs. Requires the firmware to be loaded.
        """
        ovmf = OVMF()
        ovmf.load(self.firmware)

        vmsa = VMSA()
        vmsa.amd64_cpu_init()
        vmsa.kvm_cpu_init()
        vmsa.qemu_cpu_init()

        vmsa.cpu_sku(family, model, stepping)

        self.vmsa_cpu0 = vmsa.pack()
        log.debug("VMSA CPU 0(sha256): %s",
                  sha256(self.vmsa_cpu0).hexdigest())

        vmsa.reset_addr(ovmf.reset_addr())
        self.vmsa_cpu1 = vmsa.pack()
        log.debug("VMSA CPU 1(sha256): %s",
                  sha256(self.vmsa_cpu1).hexdigest())

    def get_cpu_state(self):
        """Return the concatenated VMSAs for all virtual CPUs.

        One boot-CPU VMSA is followed by (num_cpus - 1) copies of the
        additional-CPU VMSA. Raises UnsupportedUsageException if the CPU
        count or a required VMSA is missing.
        """
        if self.num_cpus is None:
            raise UnsupportedUsageException(
                "Number of virtual CPUs must be specified for SEV-ES domain")

        if self.num_cpus < 1:
            raise UnsupportedUsageException(
                "Number of virtual CPUs must be at least 1")

        if self.vmsa_cpu0 is None:
            raise UnsupportedUsageException(
                "VMSA for boot CPU required for SEV-ES domain")

        if self.num_cpus > 1 and self.vmsa_cpu1 is None:
            raise UnsupportedUsageException(
                "VMSA for additional CPUs required for SEV-ES domain with SMP")

        # Only touch vmsa_cpu1 when there are additional CPUs: it may
        # legitimately be None for a single-CPU guest, and None * 0
        # raises TypeError.
        vmsa = self.vmsa_cpu0
        if self.num_cpus > 1:
            vmsa = vmsa + (self.vmsa_cpu1 * (self.num_cpus - 1))

        log.debug("VMSA(sha256): %s", sha256(vmsa).hexdigest())
        return vmsa

    # Get the full set of measured launch data for the domain
    #
    # The measured data that the guest is initialized with is the concatenation
    # of the following:
    #
    #  - The firmware blob
    #  - The kernel GUID table
    #  - The VMSAs for all CPUs, when the policy enables SEV-ES
    def get_measured_data(self):
        measured_data = (self.firmware +
                         self.kernel_table.build())
        if self.is_sev_es():
            measured_data += self.get_cpu_state()
        log.debug("Measured-data(sha256): %s",
                  sha256(measured_data).hexdigest())
        return measured_data

    # Get the reported and computed launch measurements for the domain
    #
    # AMD Secure Encrypted Virtualization API, section 6.5:
    #
    #     measurement = HMAC(0x04 || API_MAJOR || API_MINOR || BUILD ||
    #                        GCTX.POLICY || GCTX.LD || MNONCE; GCTX.TIK)
    #
    # Where GCTX.LD covers all the measured data the guest is initialized with
    # per get_measured_data().
    #
    # Raises UnsupportedUsageException if the decoded measurement is too
    # short to contain both the HMAC and the nonce.
    def get_measurements(self):
        measurement = b64decode(self.measurement)
        if len(measurement) < self.MEASUREMENT_LEN:
            # A short blob would otherwise yield a truncated nonce and a
            # misleading "not trustworthy" verdict.
            raise UnsupportedUsageException(
                "Expected at least %d bytes in measurement, but got %d" %
                (self.MEASUREMENT_LEN, len(measurement)))

        reported = measurement[0:32]
        nonce = measurement[32:48]

        measured_data = self.get_measured_data()
        msg = (
            bytes([0x4]) +
            self.api_major.to_bytes(1, 'little') +
            self.api_minor.to_bytes(1, 'little') +
            self.build_id.to_bytes(1, 'little') +
            self.policy.to_bytes(4, 'little') +
            sha256(measured_data).digest() +
            nonce
        )
        log.debug("Measured-msg(hex): %s", msg.hex())

        computed = hmac.new(self.tik, msg, 'sha256').digest()

        log.debug("Measurement reported(hex): %s", reported.hex())
        log.debug("Measurement computed(hex): %s", computed.hex())

        return reported, computed

    def attest(self):
        """Compare reported and computed measurements.

        Raises AttestationFailedException when they differ.
        """
        reported, computed = self.get_measurements()

        # Constant-time comparison, as recommended for MAC verification
        if not hmac.compare_digest(reported, computed):
            raise AttestationFailedException(
                "Measurement does not match, VM is not trustworthy")

    def build_secrets(self):
        """Build the launch secret packet.

        Returns a tuple (header64, secret64) of base64 strings: the
        packet header (flags, IV, HMAC) and the AES-CTR encrypted secret
        table.
        """
        measurement, _ = self.get_measurements()

        iv = os.urandom(16)

        secret_table = self.secrets_table.build()

        cipher = Cipher(algorithms.AES(self.tek), modes.CTR(iv))
        enc = cipher.encryptor()
        secret_table_ciphertext = (enc.update(secret_table) +
                                   enc.finalize())

        flags = 0

        # AMD Secure Encrypted Virtualization API, section 6.6
        #
        #     hdrmac = HMAC(0x01 || FLAGS || IV || GUEST_LENGTH ||
        #                   TRANS_LENGTH || DATA ||
        #                   MEASURE; GCTX.TIK)
        #
        msg = (
            bytes([0x01]) +
            flags.to_bytes(4, byteorder='little') +
            iv +
            len(secret_table).to_bytes(4, byteorder='little') +
            len(secret_table).to_bytes(4, byteorder='little') +
            secret_table_ciphertext +
            measurement
        )

        h = hmac.new(self.tik, msg, 'sha256')

        ##
        # Table 55. LAUNCH_SECRET Packet Header Buffer
        ##
        header = (
            flags.to_bytes(4, byteorder='little') +
            iv +
            h.digest()
        )

        header64 = b64encode(header).decode('utf8')
        secret64 = b64encode(secret_table_ciphertext).decode('utf8')
        log.debug("Header: %s (%d bytes)", header64, len(header))
        log.debug("Secret: %s (%d bytes)",
                  secret64, len(secret_table_ciphertext))

        return header64, secret64

    @abc.abstractmethod
    def inject_secrets(self):
        """Deliver the packet from build_secrets(); defined by subclasses."""
        pass
|
||
|
||
|
||
class OfflineConfidentialVM(ConfidentialVM):
    """Confidential VM whose secret packet is written to local files."""

    def __init__(self,
                 secret_header=None,
                 secret_payload=None,
                 **kwargs):
        """Store the two output paths; other arguments go to the base class."""
        super().__init__(**kwargs)

        self.secret_header = secret_header
        self.secret_payload = secret_payload

    def inject_secrets(self):
        """Write the base64 header and payload to their respective files."""
        header64, secret64 = self.build_secrets()

        # Header file first, then payload file
        outputs = (
            (self.secret_header, header64),
            (self.secret_payload, secret64),
        )
        for target, text in outputs:
            with open(target, "wb") as fh:
                fh.write(text.encode('utf8'))
|
||
|
||
|
||
class LibvirtConfidentialVM(ConfidentialVM):
|
||
def __init__(self, **kwargs):
|
||
super().__init__(**kwargs)
|
||
|
||
self.conn = None
|
||
self.dom = None
|
||
|
||
def check_domain(self, doc, secure):
|
||
ls = doc.xpath("/domain/launchSecurity[@type='sev']")
|
||
if len(ls) != 1:
|
||
raise IncorrectConfigException(
|
||
"Domain is not configured with SEV launch security")
|
||
|
||
dh = doc.xpath("/domain/launchSecurity[@type='sev']/dhCert")
|
||
if len(dh) != 1:
|
||
raise IncorrectConfigException(
|
||
"Domain must have SEV owner cert to validate measurement")
|
||
|
||
session = doc.xpath("/domain/launchSecurity[@type='sev']/session")
|
||
if len(session) != 1:
|
||
raise IncorrectConfigException(
|
||
"Domain must have SEV session data to validate measurement")
|
||
|
||
nvramnodes = doc.xpath("/domain/os/nvram")
|
||
if len(nvramnodes) != 0 and secure:
|
||
raise InsecureUsageException(
|
||
"Domain firmware with NVRAM cannot be securely measured")
|
||
|
||
loadernodes = doc.xpath("/domain/os/loader")
|
||
if len(loadernodes) != 1:
|
||
raise IncorrectConfigException(
|
||
"Domain must have one firmware path")
|
||
|
||
measure_kernel_nodes = doc.xpath(
|
||
"/domain/launchSecurity[@type='sev']/@kernelHashes")
|
||
measure_kernel = False
|
||
if len(measure_kernel_nodes) == 1:
|
||
if measure_kernel_nodes[0] == "yes":
|
||
measure_kernel = True
|
||
|
||
xp_kernel = "/domain/os/kernel"
|
||
xp_initrd = "/domain/os/initrd"
|
||
xp_cmdline = "/domain/os/cmdline"
|
||
kern_nodes = (doc.xpath(xp_kernel) +
|
||
doc.xpath(xp_initrd) +
|
||
doc.xpath(xp_cmdline))
|
||
if not measure_kernel:
|
||
if len(self.kernel_table.entries()) != 0:
|
||
raise UnsupportedUsageException(
|
||
"kernel/initrd/cmdline provided but kernel "
|
||
"measurement not enabled")
|
||
|
||
# Check for an insecure scenario
|
||
if len(kern_nodes) != 0 and secure:
|
||
raise InsecureUsageException(
|
||
"direct kernel boot present without measurement")
|
||
else:
|
||
if len(kern_nodes) == 0:
|
||
raise IncorrectConfigException(
|
||
"kernel/initrd/cmdline not provided but kernel "
|
||
"measurement is enabled")
|
||
|
||
def load_domain(self, uri, id_name_uuid, secure, ignore_config):
|
||
self.conn = libvirt.open(uri)
|
||
|
||
remote = socket.gethostname() != self.conn.getHostname()
|
||
if not remote and secure:
|
||
raise InsecureUsageException(
|
||
"running locally on the hypervisor host is not secure")
|
||
|
||
if re.match(r'^\d+$', id_name_uuid):
|
||
self.dom = self.conn.lookupByID(int(id_name_uuid))
|
||
elif re.match(r'^[-a-f0-9]+$', id_name_uuid):
|
||
self.dom = self.conn.lookupByUUIDString(id_name_uuid)
|
||
else:
|
||
self.dom = self.conn.lookupByName(id_name_uuid)
|
||
|
||
log.debug("VM: id=%d name=%s uuid=%s",
|
||
self.dom.ID(), self.dom.name(), self.dom.UUIDString())
|
||
|
||
if not self.dom.isActive():
|
||
raise InvalidStateException(
|
||
"Domain must be running to validate measurement")
|
||
|
||
state = self.dom.info()[0]
|
||
if state != libvirt.VIR_DOMAIN_PAUSED and secure:
|
||
raise InvalidStateException(
|
||
"Domain must be paused to validate measurement")
|
||
|
||
xml = self.dom.XMLDesc()
|
||
|
||
doc = etree.fromstring(xml)
|
||
if not ignore_config:
|
||
self.check_domain(doc, secure)
|
||
|
||
# See comments at top of file wrt why we are OK to trust the
|
||
# sev-api-major, sev-api-minor, sev-build-id and sev-policy data
|
||
# reported by the host
|
||
sevinfo = self.dom.launchSecurityInfo()
|
||
|
||
if "sev-api-major" not in sevinfo:
|
||
raise UnsupportedUsageException(
|
||
"'api-major' not reported in domain launch security info")
|
||
|
||
if self.measurement is None:
|
||
self.measurement = sevinfo["sev-measurement"]
|
||
if self.api_major is None:
|
||
self.api_major = sevinfo["sev-api-major"]
|
||
if self.api_minor is None:
|
||
self.api_minor = sevinfo["sev-api-minor"]
|
||
if self.build_id is None:
|
||
self.build_id = sevinfo["sev-build-id"]
|
||
if self.policy is None:
|
||
self.policy = sevinfo["sev-policy"]
|
||
|
||
if self.is_sev_es() and self.num_cpus is None:
|
||
if secure:
|
||
raise InsecureUsageException(
|
||
"Using CPU count from guest is not secure")
|
||
|
||
info = self.dom.info()
|
||
self.num_cpus = info[3]
|
||
|
||
if self.firmware is None:
|
||
if remote:
|
||
raise UnsupportedUsageException(
|
||
"Cannot access firmware path remotely")
|
||
if secure:
|
||
raise InsecureUsageException(
|
||
"Using firmware path from XML is not secure")
|
||
|
||
loadernodes = doc.xpath("/domain/os/loader")
|
||
if len(loadernodes) == 0:
|
||
raise UnsupportedUsageException(
|
||
"--firmware not specified and no firmware path found")
|
||
|
||
self.load_firmware(loadernodes[0].text)
|
||
|
||
if self.kernel_table.kernel is None:
|
||
kernelnodes = doc.xpath("/domain/os/kernel")
|
||
if len(kernelnodes) != 0:
|
||
if remote:
|
||
raise UnsupportedUsageException(
|
||
"Cannot access kernel path remotely")
|
||
if secure:
|
||
raise InsecureUsageException(
|
||
"Using kernel path from XML is not secure")
|
||
self.kernel_table.load_kernel(kernelnodes[0].text)
|
||
|
||
if self.kernel_table.initrd is None:
|
||
initrdnodes = doc.xpath("/domain/os/initrd")
|
||
if len(initrdnodes) != 0:
|
||
if remote:
|
||
raise UnsupportedUsageException(
|
||
"Cannot access initrd path remotely")
|
||
if secure:
|
||
raise InsecureUsageException(
|
||
"Using initrd path from XML is not secure")
|
||
self.kernel_table.load_initrd(initrdnodes[0].text)
|
||
|
||
if self.kernel_table.cmdline is None:
|
||
cmdlinenodes = doc.xpath("/domain/os/cmdline")
|
||
if len(cmdlinenodes) != 0:
|
||
if secure:
|
||
raise InsecureUsageException(
|
||
"Using cmdline string from XML is not secure")
|
||
self.kernel_table.load_cmdline(cmdlinenodes[0].text)
|
||
|
||
capsxml = self.conn.getCapabilities()
|
||
capsdoc = etree.fromstring(capsxml)
|
||
|
||
if self.is_sev_es() and self.vmsa_cpu0 is None:
|
||
if secure:
|
||
raise InsecureUsageException(
|
||
"Using CPU SKU from capabilities is not secure")
|
||
|
||
sig = capsdoc.xpath("/capabilities/host/cpu/signature")
|
||
if len(sig) != 1:
|
||
raise UnsupportedUsageException(
|
||
"Libvirt is too old to report host CPU signature")
|
||
|
||
cpu_family = int(sig[0].get("family"))
|
||
cpu_model = int(sig[0].get("model"))
|
||
cpu_stepping = int(sig[0].get("stepping"))
|
||
self.build_vmsas(cpu_family, cpu_model, cpu_stepping)
|
||
|
||
def inject_secrets(self):
    """Hand the built secret table to the guest and let it run.

    The header/payload pair returned by build_secrets() is passed to
    libvirt through setLaunchSecurityState() on the domain, after which
    the domain is resumed.
    """
    # NOTE(review): the values are presumably base64 text, as the
    # libvirt API expects strings - confirm against build_secrets()
    header, payload = self.build_secrets()

    launch_params = {}
    launch_params["sev-secret"] = payload
    launch_params["sev-secret-header"] = header

    self.dom.setLaunchSecurityState(launch_params, 0)
    self.dom.resume()
|
||
|
||
|
||
def parse_command_line(argv=None):
    """Define the command line interface and parse the arguments.

    :param argv: list of argument strings to parse; when None (the
                 default) argparse falls back to sys.argv[1:], which is
                 what the previous zero-argument signature always did
    :returns: argparse.Namespace with one attribute per option
    """
    parser = argparse.ArgumentParser(
        description='Validate guest AMD SEV launch measurement')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Show debug information')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Do not display status')

    # Arguments related to the state of the launched guest
    vmstate = parser.add_argument_group("Virtual machine launch state")
    vmstate.add_argument('--measurement', '-m',
                         help='Measurement for the running domain')
    vmstate.add_argument('--api-major', type=int,
                         help='SEV API major version for the running domain')
    vmstate.add_argument('--api-minor', type=int,
                         help='SEV API minor version for the running domain')
    vmstate.add_argument('--build-id', type=int,
                         help='SEV build ID for the running domain')
    vmstate.add_argument('--policy', type=int,
                         help='SEV policy for the running domain')

    # Arguments related to calculation of the expected launch measurement
    vmconfig = parser.add_argument_group("Virtual machine config")
    vmconfig.add_argument('--firmware', '-f',
                          help='Path to the firmware binary')
    vmconfig.add_argument('--kernel', '-k',
                          help='Path to the kernel binary')
    vmconfig.add_argument('--initrd', '-r',
                          help='Path to the initrd binary')
    vmconfig.add_argument('--cmdline', '-e',
                          help='Cmdline string booted with')
    vmconfig.add_argument('--num-cpus', '-n', type=int,
                          help='Number of virtual CPUs')
    vmconfig.add_argument('--vmsa-cpu0', '-0',
                          help='VMSA state for the boot CPU')
    vmconfig.add_argument('--vmsa-cpu1', '-1',
                          help='VMSA state for the additional CPUs')
    vmconfig.add_argument('--cpu-family', type=int,
                          help='Hypervisor host CPU family number')
    vmconfig.add_argument('--cpu-model', type=int,
                          help='Hypervisor host CPU model number')
    vmconfig.add_argument('--cpu-stepping', type=int,
                          help='Hypervisor host CPU stepping number')
    vmconfig.add_argument('--tik',
                          help='TIK file for domain')
    vmconfig.add_argument('--tek',
                          help='TEK file for domain')
    vmconfig.add_argument('--tk',
                          help='TEK/TIK combined file for domain')

    # Arguments related to the connection to libvirt
    vmconn = parser.add_argument_group("Libvirt guest connection")
    vmconn.add_argument('--connect', '-c', default="qemu:///system",
                        help='libvirt connection URI')
    vmconn.add_argument('--domain', '-o',
                        help='domain ID / Name / UUID')
    vmconn.add_argument('--insecure', '-i', action='store_true',
                        help='Proceed even if usage scenario is insecure')
    vmconn.add_argument('--ignore-config', '-g', action='store_true',
                        help='Do not attempt to sanity check the guest config')

    # Arguments related to secret injection
    inject = parser.add_argument_group("Secret injection parameters")
    inject.add_argument('--inject-secret', '-s', action='append', default=[],
                        help='ALIAS-OR-GUID:PATH file containing secret to inject')
    inject.add_argument('--secret-payload',
                        help='Path to file to write secret data payload to')
    inject.add_argument('--secret-header',
                        help='Path to file to write secret data header to')

    # argv=None makes argparse read sys.argv[1:], so existing callers
    # that pass nothing are unaffected
    return parser.parse_args(argv)
|
||
|
||
|
||
# Sanity check that the set of CLI args specified provides enough info for us
# to do the job
|
||
def check_usage(args):
    """Reject option combinations that give too little or conflicting info.

    Checks are made in a fixed order and the first violation found is
    reported; nothing is returned when the arguments are acceptable.

    :param args: parsed options, as produced by parse_command_line()
    :raises UnsupportedUsageException: on the first invalid combination
    """
    # Launch keys: either one combined TK file, or separate TIK + TEK
    if args.tk is None:
        if args.tik is None or args.tek is None:
            raise UnsupportedUsageException(
                "Either --tk or both of --tek/--tik are required")
    elif args.tik is not None or args.tek is not None:
        raise UnsupportedUsageException(
            "--tk is mutually exclusive with --tek/--tik")

    # With no --domain given, the launch state and firmware must all be
    # supplied explicitly on the command line
    if args.domain is None:
        launch_state = (
            ("measurement", "Either --measurement or --domain is required"),
            ("api_major", "Either --api-major or --domain is required"),
            ("api_minor", "Either --api-minor or --domain is required"),
            ("build_id", "Either --build-id or --domain is required"),
            ("policy", "Either --policy or --domain is required"),
            ("firmware", "Either --firmware or --domain is required"),
        )
        for attr, message in launch_state:
            if getattr(args, attr) is None:
                raise UnsupportedUsageException(message)

        # Secrets requested without a domain need both output files
        if len(args.inject_secret) > 0:
            secret_outputs = (
                ("secret_header",
                 "Either --secret-header or --domain is required"),
                ("secret_payload",
                 "Either --secret-payload or --domain is required"),
            )
            for attr, message in secret_outputs:
                if getattr(args, attr) is None:
                    raise UnsupportedUsageException(message)

    # initrd / cmdline are only accepted together with a kernel
    if args.kernel is None and not (args.initrd is None and
                                    args.cmdline is None):
        raise UnsupportedUsageException(
            "--initrd/--cmdline require --kernel")

    # VMSA state comes either from files or from a complete CPU SKU
    sku = (args.cpu_family, args.cpu_model, args.cpu_stepping)
    missing = sku.count(None)
    if missing == len(sku):
        if args.vmsa_cpu0 is None and args.vmsa_cpu1 is not None:
            raise UnsupportedUsageException(
                "VMSA for additional CPU also requires VMSA for boot CPU")
    else:
        if not (args.vmsa_cpu0 is None and args.vmsa_cpu1 is None):
            raise UnsupportedUsageException(
                "VMSA files are mutually exclusive with CPU SKU")

        if missing != 0:
            raise UnsupportedUsageException(
                "CPU SKU needs family, model and stepping for SEV-ES domain")

    # The secret output files must be given as a pair, or not at all
    have_payload = args.secret_payload is not None
    have_header = args.secret_header is not None
    if have_payload != have_header:
        raise UnsupportedUsageException(
            "Both --secret-payload and --secret-header are required")
|
||
|
||
|
||
def attest(args):
    """Validate the launch measurement and optionally inject secrets.

    An OfflineConfidentialVM is used when no --domain was given, and a
    LibvirtConfidentialVM otherwise. Whatever firmware, keys, kernel
    table entries and VMSA inputs were supplied on the command line are
    loaded into it, the libvirt domain is loaded when one was named, and
    then cvm.attest() is run. Secrets requested with --inject-secret are
    only loaded and injected after cvm.attest() has returned.

    :param args: parsed options, already vetted by check_usage()
    :raises UnsupportedUsageException: if an --inject-secret value is
        not of the form ALIAS-OR-GUID:PATH
    """
    if args.domain is None:
        cvm = OfflineConfidentialVM(measurement=args.measurement,
                                    api_major=args.api_major,
                                    api_minor=args.api_minor,
                                    build_id=args.build_id,
                                    policy=args.policy,
                                    num_cpus=args.num_cpus,
                                    secret_header=args.secret_header,
                                    secret_payload=args.secret_payload)
    else:
        cvm = LibvirtConfidentialVM(measurement=args.measurement,
                                    api_major=args.api_major,
                                    api_minor=args.api_minor,
                                    build_id=args.build_id,
                                    policy=args.policy,
                                    num_cpus=args.num_cpus)

    if args.firmware is not None:
        cvm.load_firmware(args.firmware)

    if args.tk is not None:
        cvm.load_tk(args.tk)
    else:
        cvm.load_tik_tek(args.tik, args.tek)

    if args.kernel is not None:
        cvm.kernel_table.load_kernel(args.kernel)

    if args.initrd is not None:
        cvm.kernel_table.load_initrd(args.initrd)

    if args.cmdline is not None:
        cvm.kernel_table.load_cmdline(args.cmdline)

    if args.vmsa_cpu0 is not None:
        cvm.load_vmsa_cpu0(args.vmsa_cpu0)

    if args.vmsa_cpu1 is not None:
        cvm.load_vmsa_cpu1(args.vmsa_cpu1)

    if args.cpu_family is not None:
        cvm.build_vmsas(args.cpu_family,
                        args.cpu_model,
                        args.cpu_stepping)

    if args.domain is not None:
        cvm.load_domain(args.connect,
                        args.domain,
                        not args.insecure,
                        args.ignore_config)

    cvm.attest()
    if not args.quiet:
        print("OK: Looks good to me")

    for secret in args.inject_secret:
        # Split on the first ':' only, so that a PATH which itself
        # contains ':' is accepted instead of being rejected as
        # malformed.
        # NOTE(review): assumes an alias / GUID never contains ':' -
        # confirm against load_secret()
        bits = secret.split(":", 1)
        if len(bits) != 2:
            raise UnsupportedUsageException(
                "Expecting ALIAS-OR-GUID:PATH for injected secret")

        cvm.secrets_table.load_secret(bits[0], bits[1])

    if len(args.inject_secret) > 0:
        cvm.inject_secrets()
        if not args.quiet:
            print("OK: Injected %d secrets" % len(args.inject_secret))
|
||
|
||
|
||
def main():
    """Command line entry point; always terminates via sys.exit().

    Exit status:
      0 - usage checks and attestation completed without raising
      1 - AttestationFailedException
      2 - UnsupportedUsageException
      3 - InsecureUsageException
      4 - IncorrectConfigException
      5 - InvalidStateException
      6 - any other Exception

    The error message is written to stderr unless --quiet was given.
    With --debug a traceback is also printed for statuses 1, 2 and 6.
    """
    args = parse_command_line()
    if args.debug:
        logging.basicConfig(level="DEBUG")
        log.handlers[0].setFormatter(
            logging.Formatter("[%(levelname)s]: %(message)s"))

    def fail(err, status, show_trace):
        # Shared reporting for every failure path below
        if show_trace and args.debug:
            traceback.print_tb(err.__traceback__)
        if not args.quiet:
            print("ERROR: %s" % err, file=sys.stderr)
        sys.exit(status)

    try:
        check_usage(args)

        attest(args)
    except AttestationFailedException as err:
        fail(err, 1, True)
    except UnsupportedUsageException as err:
        fail(err, 2, True)
    except InsecureUsageException as err:
        fail(err, 3, False)
    except IncorrectConfigException as err:
        fail(err, 4, False)
    except InvalidStateException as err:
        fail(err, 5, False)
    except Exception as err:
        fail(err, 6, True)
    else:
        sys.exit(0)
|
||
|
||
# Only run the tool when executed as a script, not when imported
if __name__ == "__main__":
    main()
|