#!/usr/bin/python3 # # SPDX-License-Identifier: LGPL-2.1-or-later # # Validates a guest AMD SEV launch measurement # # A general principle in writing this tool is that it must calculate the # expected measurement based entirely on information it receives on the CLI # from the guest owner. # # It cannot generally trust information obtained from the guest XML or from the # virtualization host OS. The main exceptions are: # # - The guest measurement # # This is a result of cryptographic operation using a shared secret known # only to the guest owner and SEV platform, not the host OS. # # - The guest policy # # This is encoded in the launch session blob that is encrypted with a shared # secret known only to the guest owner and SEV platform, not the host OS. It # is impossible for the host OS to maliciously launch a guest with different # policy and the user provided launch session blob. # # CAVEAT: the user must ALWAYS create a launch blob with freshly generated # TIK/TEK for every new VM. Re-use of the same TIK/TEK for multiple VMs # is insecure. # # - The SEV API version / build ID # # This does not have an impact on the security of the measurement, unless # the guest owner needs a guarantee that the host is not using specific # firmware versions with known flaws. # import argparse from base64 import b64decode from hashlib import sha256 import hmac import logging import re import socket import sys import traceback from lxml import etree import libvirt log = logging.getLogger() class AttestationFailedException(Exception): pass class UnsupportedUsageException(Exception): pass class InsecureUsageException(Exception): pass class IncorrectConfigException(Exception): pass class InvalidStateException(Exception): pass class ConfidentialVM(object): def __init__(self, measurement=None, api_major=None, api_minor=None, build_id=None, policy=None): self.measurement = measurement self.api_major = api_major self.api_minor = api_minor self.build_id = build_id self.policy = policy self.firmware = None self.tik = None self.tek = None def load_tik_tek(self, tik_path, tek_path): with open(tik_path, 'rb') as fh: self.tik = fh.read() log.debug("TIK(hex): %s", self.tik.hex()) if len(self.tik) != 16: raise UnsupportedUsageException( "Expected 16 bytes in TIK file, but got %d" % len(self.tik)) with open(tek_path, 'rb') as fh: self.tek = fh.read() log.debug("TEK(hex): %s", self.tek.hex()) if len(self.tek) != 16: raise UnsupportedUsageException( "Expected 16 bytes in TEK file, but got %d" % len(self.tek)) def load_tk(self, tk_path): with open(tk_path, 'rb') as fh: tk = fh.read() if len(tk) != 32: raise UnsupportedUsageException( "Expected 32 bytes in TIK/TEK file, but got %d" % len(tk)) self.tek = tk[0:16] self.tik = tk[16:32] log.debug("TIK(hex): %s", self.tik.hex()) log.debug("TEK(hex): %s", self.tek.hex()) def load_firmware(self, firmware_path): with open(firmware_path, 'rb') as fh: self.firmware = fh.read() log.debug("Firmware(sha256): %s", sha256(self.firmware).hexdigest()) # Get the full set of measured launch data for the domain # # The measured data that the guest is initialized with is the concatenation # of the following: # # - The firmware blob def get_measured_data(self): measured_data = self.firmware log.debug("Measured-data(sha256): %s", sha256(measured_data).hexdigest()) return measured_data # Get the reported and computed launch measurements for the domain # # AMD Secure Encrypted Virtualization API , section 6.5: # # measurement = HMAC(0x04 || API_MAJOR || API_MINOR || BUILD || # GCTX.POLICY || GCTX.LD || MNONCE; GCTX.TIK) # # Where GCTX.LD covers all the measured data the guest is initialized with # per get_measured_data(). def get_measurements(self): measurement = b64decode(self.measurement) reported = measurement[0:32] nonce = measurement[32:48] measured_data = self.get_measured_data() msg = ( bytes([0x4]) + self.api_major.to_bytes(1, 'little') + self.api_minor.to_bytes(1, 'little') + self.build_id.to_bytes(1, 'little') + self.policy.to_bytes(4, 'little') + sha256(measured_data).digest() + nonce ) log.debug("Measured-msg(hex): %s", msg.hex()) computed = hmac.new(self.tik, msg, 'sha256').digest() log.debug("Measurement reported(hex): %s", reported.hex()) log.debug("Measurement computed(hex): %s", computed.hex()) return reported, computed def attest(self): reported, computed = self.get_measurements() if reported != computed: raise AttestationFailedException( "Measurement does not match, VM is not trustworthy") class LibvirtConfidentialVM(ConfidentialVM): def __init__(self, **kwargs): super().__init__(**kwargs) self.conn = None self.dom = None def check_domain(self, doc, secure): ls = doc.xpath("/domain/launchSecurity[@type='sev']") if len(ls) != 1: raise IncorrectConfigException( "Domain is not configured with SEV launch security") dh = doc.xpath("/domain/launchSecurity[@type='sev']/dhCert") if len(dh) != 1: raise IncorrectConfigException( "Domain must have SEV owner cert to validate measurement") session = doc.xpath("/domain/launchSecurity[@type='sev']/session") if len(session) != 1: raise IncorrectConfigException( "Domain must have SEV session data to validate measurement") nvramnodes = doc.xpath("/domain/os/nvram") if len(nvramnodes) != 0 and secure: raise InsecureUsageException( "Domain firmware with NVRAM cannot be securely measured") loadernodes = doc.xpath("/domain/os/loader") if len(loadernodes) != 1: raise IncorrectConfigException( "Domain must have one firmware path") def load_domain(self, uri, id_name_uuid, secure, ignore_config): self.conn = libvirt.open(uri) remote = socket.gethostname() != self.conn.getHostname() if not remote and secure: raise InsecureUsageException( "running locally on the hypervisor host is not secure") if re.match(r'^\d+$', id_name_uuid): self.dom = self.conn.lookupByID(int(id_name_uuid)) elif re.match(r'^[-a-f0-9]+$', id_name_uuid): self.dom = self.conn.lookupByUUIDString(id_name_uuid) else: self.dom = self.conn.lookupByName(id_name_uuid) log.debug("VM: id=%d name=%s uuid=%s", self.dom.ID(), self.dom.name(), self.dom.UUIDString()) if not self.dom.isActive(): raise InvalidStateException( "Domain must be running to validate measurement") state = self.dom.info()[0] if state != libvirt.VIR_DOMAIN_PAUSED and secure: raise InvalidStateException( "Domain must be paused to validate measurement") xml = self.dom.XMLDesc() doc = etree.fromstring(xml) if not ignore_config: self.check_domain(doc, secure) # See comments at top of file wrt why we are OK to trust the # sev-api-major, sev-api-minor, sev-build-id and sev-policy data # reported by the host sevinfo = self.dom.launchSecurityInfo() if "sev-api-major" not in sevinfo: raise UnsupportedUsageException( "'api-major' not reported in domain launch security info") if self.measurement is None: self.measurement = sevinfo["sev-measurement"] if self.api_major is None: self.api_major = sevinfo["sev-api-major"] if self.api_minor is None: self.api_minor = sevinfo["sev-api-minor"] if self.build_id is None: self.build_id = sevinfo["sev-build-id"] if self.policy is None: self.policy = sevinfo["sev-policy"] if self.firmware is None: if remote: raise UnsupportedUsageException( "Cannot access firmware path remotely") if secure: raise InsecureUsageException( "Using firmware path from XML is not secure") loadernodes = doc.xpath("/domain/os/loader") if len(loadernodes) == 0: raise UnsupportedUsageException( "--firmware not specified and no firmware path found") self.load_firmware(loadernodes[0].text) def parse_command_line(): parser = argparse.ArgumentParser( description='Validate guest AMD SEV launch measurement') parser.add_argument('--debug', '-d', action='store_true', help='Show debug information') parser.add_argument('--quiet', '-q', action='store_true', help='Do not display status') # Arguments related to the state of the launched guest vmstate = parser.add_argument_group("Virtual machine launch state") vmstate.add_argument('--measurement', '-m', help='Measurement for the running domain') vmstate.add_argument('--api-major', type=int, help='SEV API major version for the running domain') vmstate.add_argument('--api-minor', type=int, help='SEV API minor version for the running domain') vmstate.add_argument('--build-id', type=int, help='SEV build ID for the running domain') vmstate.add_argument('--policy', type=int, help='SEV policy for the running domain') # Arguments related to calculation of the expected launch measurement vmconfig = parser.add_argument_group("Virtual machine config") vmconfig.add_argument('--firmware', '-f', help='Path to the firmware binary') vmconfig.add_argument('--tik', help='TIK file for domain') vmconfig.add_argument('--tek', help='TEK file for domain') vmconfig.add_argument('--tk', help='TEK/TIK combined file for domain') # Arguments related to the connection to libvirt vmconn = parser.add_argument_group("Libvirt guest connection") vmconn.add_argument('--connect', '-c', default="qemu:///system", help='libvirt connection URI') vmconn.add_argument('--domain', '-o', help='domain ID / Name / UUID') vmconn.add_argument('--insecure', '-i', action='store_true', help='Proceed even if usage scenario is insecure') vmconn.add_argument('--ignore-config', '-g', action='store_true', help='Do not attempt to sanity check the guest config') return parser.parse_args() # Sanity check the set of CLI args specified provide enough info for us to do # the job def check_usage(args): if args.tk is not None: if args.tik is not None or args.tek is not None: raise UnsupportedUsageException( "--tk is mutually exclusive with --tek/--tik") else: if args.tik is None or args.tek is None: raise UnsupportedUsageException( "Either --tk or both of --tek/--tik are required") if args.domain is None: if args.measurement is None: raise UnsupportedUsageException( "Either --measurement or --domain is required") if args.api_major is None: raise UnsupportedUsageException( "Either --api-major or --domain is required") if args.api_minor is None: raise UnsupportedUsageException( "Either --api-minor or --domain is required") if args.build_id is None: raise UnsupportedUsageException( "Either --build-id or --domain is required") if args.policy is None: raise UnsupportedUsageException( "Either --policy or --domain is required") if args.firmware is None: raise UnsupportedUsageException( "Either --firmware or --domain is required") def attest(args): if args.domain is None: cvm = ConfidentialVM(measurement=args.measurement, api_major=args.api_major, api_minor=args.api_minor, build_id=args.build_id, policy=args.policy) else: cvm = LibvirtConfidentialVM(measurement=args.measurement, api_major=args.api_major, api_minor=args.api_minor, build_id=args.build_id, policy=args.policy) if args.firmware is not None: cvm.load_firmware(args.firmware) if args.tk is not None: cvm.load_tk(args.tk) else: cvm.load_tik_tek(args.tik, args.tek) if args.domain is not None: cvm.load_domain(args.connect, args.domain, not args.insecure, args.ignore_config) cvm.attest() if not args.quiet: print("OK: Looks good to me") def main(): args = parse_command_line() if args.debug: logging.basicConfig(level="DEBUG") formatter = logging.Formatter("[%(levelname)s]: %(message)s") handler = log.handlers[0] handler.setFormatter(formatter) try: check_usage(args) attest(args) sys.exit(0) except AttestationFailedException as e: if args.debug: traceback.print_tb(e.__traceback__) if not args.quiet: print("ERROR: %s" % e, file=sys.stderr) sys.exit(1) except UnsupportedUsageException as e: if args.debug: traceback.print_tb(e.__traceback__) if not args.quiet: print("ERROR: %s" % e, file=sys.stderr) sys.exit(2) except InsecureUsageException as e: if not args.quiet: print("ERROR: %s" % e, file=sys.stderr) sys.exit(3) except IncorrectConfigException as e: if not args.quiet: print("ERROR: %s" % e, file=sys.stderr) sys.exit(4) except InvalidStateException as e: if not args.quiet: print("ERROR: %s" % e, file=sys.stderr) sys.exit(5) except Exception as e: if args.debug: traceback.print_tb(e.__traceback__) if not args.quiet: print("ERROR: %s" % e, file=sys.stderr) sys.exit(6) if __name__ == "__main__": main()