# SPDX-License-Identifier: BSD-2
from .internal.utils import _lib_version_atleast
if not _lib_version_atleast("tss2-fapi", "3.0.0"):
raise NotImplementedError("FAPI Not installed or version is not 3.0.0")
import contextlib
import json
import logging
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from ._libtpm2_pytss import ffi, lib
from .callbacks import Callback, CallbackType, get_callback, unlock_callback
from .fapi_info import FapiInfo
from .internal.utils import _chkrc, _check_bug_fixed, _get_dptr, _to_bytes_or_null
from .TCTI import TCTI
from .types import TPM2B_PUBLIC, TPM2B_PRIVATE
logger = logging.getLogger(__name__)
ffi_malloc = ffi.new_allocator(free=None)
FAPI_CONFIG_ENV = "TSS2_FAPICONF"
FAPI_CONFIG_PATHS = [
"/etc/tpm2-tss/fapi-config.json",
"/usr/local/etc/tpm2-tss/fapi-config.json",
]
class FAPIConfig(contextlib.ExitStack):
"""Context to create a temporary Fapi environment."""
def __init__(self, config: Optional[dict] = None, temp_dirs: bool = True, **kwargs):
f"""Create a temporary Fapi environment. Get the fapi_conf in this order:
* `config` if given
* File specified with environment variable `{FAPI_CONFIG_ENV}` if defined
* Installed config at `{FAPI_CONFIG_PATHS}`
Single entries are overridden if additional named arguments are given
and/or if `temp_dirs` is True.
Args:
config (dict, optional): Fapi configuration to use instead of the installed `fapi-config.json`. Defaults to None.
temp_dirs (bool, optional): Create temporary keystore and log directories and set the respective config entries. Defaults to True.
**kwargs: Single configuration entries which override those in `config` or `fapi-config.json`.
"""
super().__init__()
self.config_env_backup = None
self.config_tmp_path = None
self.config = config
# Return if no custom fapi config is used
if not (config is not None or temp_dirs or kwargs):
return
if self.config is None:
# Load the currently active fapi-config.json
config_path = os.environ.get(FAPI_CONFIG_ENV, None)
if config_path is None:
for p in FAPI_CONFIG_PATHS:
try:
with open(p) as file:
self.config = json.load(file)
break
except FileNotFoundError:
# keep trying
pass
if self.config is None:
raise RuntimeError(
f"Could not find fapi config at {FAPI_CONFIG_PATHS}, "
f"set env var {FAPI_CONFIG_ENV}"
)
else:
with open(config_path) as file:
self.config = json.load(file)
self.config = {**self.config, **kwargs}
if temp_dirs:
temp_dir_config = {
"user_dir": self.enter_context(tempfile.TemporaryDirectory()),
"system_dir": self.enter_context(tempfile.TemporaryDirectory()),
"log_dir": self.enter_context(tempfile.TemporaryDirectory()),
}
conflicting_keys = [k for k in temp_dir_config.keys() if k in kwargs]
if conflicting_keys:
raise ValueError(
f"Conflicting config entries from temp_dirs and **kwargs: {conflicting_keys}"
)
self.config = {**self.config, **temp_dir_config}
fapi_conf_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
self.config_tmp_path = fapi_conf_file.name
fapi_conf_file.write(json.dumps(self.config))
fapi_conf_file.close
logger.debug(
f"Creating FAPIConfig: {self.config_tmp_path}:\n{json.dumps(self.config, indent=4)}"
)
# Set fapi config env variable
self.config_env_backup = os.environ.get(FAPI_CONFIG_ENV, None)
os.environ[FAPI_CONFIG_ENV] = self.config_tmp_path
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
del os.environ[FAPI_CONFIG_ENV]
if self.config_tmp_path is not None:
os.unlink(self.config_tmp_path)
[docs]class FAPI:
"""The TPM2 Feature API. This class can be used as a python context or be closed manually via
:meth:`~tpm2_pytss.FAPI.close`.
"""
def __init__(self, uri: Optional[Union[bytes, str]] = None):
self.encoding = "utf-8"
self._ctx_pp = ffi.new("FAPI_CONTEXT **")
uri = _to_bytes_or_null(uri)
ret = lib.Fapi_Initialize(self._ctx_pp, uri)
_chkrc(ret)
# set callbacks
self.callbacks: Dict[CallbackType, Optional[Callback]] = {}
@property
def _ctx(self):
"""Get the Feature API C context used by the library to hold state.
Returns:
The Feature API C context.
"""
return self._ctx_pp[0]
def __enter__(self):
return self
def __exit__(self, _type, value, traceback):
self.close()
[docs] def close(self) -> None:
"""Finalize the Feature API. This frees allocated memory and invalidates the FAPI object."""
lib.Fapi_Finalize(self._ctx_pp)
for callback_type, callback in self.callbacks.items():
if callback is not None:
unlock_callback(callback_type, callback.name)
self.callbacks[callback_type] = None
# TODO flesh out info class
@property
def version(self):
"""
Get the tpm2-tss library version.
Returns:
str: The Feature API C context.
"""
info = json.loads(self.get_info())
return FapiInfo(info).version
@property
def config(self): # TODO doc, test
info = json.loads(self.get_info())
return FapiInfo(info).fapi_config
@property
def tcti(self): # TODO doc, test
tcti = ffi.new("TSS2_TCTI_CONTEXT **")
# returns the actual tcti context, not a copy (so no extra memory is allocated by the fapi)
ret = lib.Fapi_GetTcti(self._ctx, tcti)
_chkrc(ret)
return TCTI(tcti[0])
[docs] def provision(
self,
auth_value_eh: Optional[Union[bytes, str]] = None,
auth_value_sh: Optional[Union[bytes, str]] = None,
auth_value_lockout: Optional[Union[bytes, str]] = None,
is_provisioned_ok: bool = True,
) -> bool:
"""Provision the Feature API. Creates the keystore and creates some TPM
objects. See also config file `/etc/tpm2-tss/fapi-config.json`.
Args:
auth_value_eh (bytes or str, optional): Endorsement Hierarchy password. Defaults to None.
auth_value_sh (bytes or str, optional, optional): Storage/Owner Hierarchy password. Defaults to None.
auth_value_lockout (bytes or str, optional): Lockout Hierarchy password. Defaults to None.
is_provisioned_ok (bool, optional): Do not throw a TSS2_Exception if Fapi is already provisioned. Defaults to True.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bool: True if Fapi was provisioned, False otherwise.
"""
auth_value_eh = _to_bytes_or_null(auth_value_eh)
auth_value_sh = _to_bytes_or_null(auth_value_sh)
auth_value_lockout = _to_bytes_or_null(auth_value_lockout, allow_null=False)
ret = lib.Fapi_Provision(
self._ctx, auth_value_eh, auth_value_sh, auth_value_lockout
)
_chkrc(
ret,
acceptable=[lib.TSS2_FAPI_RC_ALREADY_PROVISIONED]
if is_provisioned_ok
else None,
)
return ret == lib.TPM2_RC_SUCCESS
[docs] def get_random(self, num_bytes: int) -> bytes:
"""Get true random bytes, generated by the TPM.
Args:
num_bytes (int): Number of bytes to generate.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bytes: The random bytes.
"""
if num_bytes > 1024:
logger.warning(
"Requesting a large number of bytes. This may take a while: {num_bytes}"
)
data = ffi.new("uint8_t **")
ret = lib.Fapi_GetRandom(self._ctx, num_bytes, data)
_chkrc(ret)
return bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), num_bytes))
[docs] def get_info(self) -> str:
"""Get Fapi information, containing library info, TPM capabilities and more.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
str: JSON-encoded info string.
"""
info = ffi.new("char **")
ret = lib.Fapi_GetInfo(self._ctx, info)
_chkrc(ret)
return ffi.string(_get_dptr(info, lib.Fapi_Free)).decode(self.encoding)
[docs] def list(self, search_path: Optional[Union[bytes, str]] = None) -> List[str]:
"""Get a list of all Fapi current object paths.
Args:
search_path (bytes or str, optional): If given, only list children of `search_path`. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
List[str]: List of all current Fapi object paths.
"""
search_path = _to_bytes_or_null(search_path, allow_null=False)
path_list = ffi.new("char **")
ret = lib.Fapi_List(self._ctx, search_path, path_list)
_chkrc(ret)
return (
ffi.string(_get_dptr(path_list, lib.Fapi_Free))
.decode(self.encoding)
.split(":")
)
[docs] def create_key(
self,
path: Union[bytes, str],
type_: Optional[Union[bytes, str]] = None, # TODO enum
policy_path: Optional[Union[bytes, str]] = None,
auth_value: Optional[Union[bytes, str]] = None,
exists_ok: bool = False,
) -> bool:
"""Create a cryptographic key inside the TPM.
Args:
path (bytes or str): Path to the new key object, e.g. `/HS/SRK/new_signing_key`.
type_ (bytes or str, optional): Comma separated list. Possible values: system, sign, decrypt, restricted, exportable, noda, 0x81000000. Defaults to None.
auth_value (bytes or str, optional): Password to key. Defaults to None.
exists_ok (bool, optional): Do not throw a TSS2_Exception if an object with the given path already exists. Defaults to False.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bool: True if the key was created. False otherwise.
"""
path = _to_bytes_or_null(path)
type_ = _to_bytes_or_null(type_)
policy_path = _to_bytes_or_null(policy_path)
auth_value = _to_bytes_or_null(auth_value)
ret = lib.Fapi_CreateKey(self._ctx, path, type_, policy_path, auth_value)
_chkrc(
ret, acceptable=lib.TSS2_FAPI_RC_PATH_ALREADY_EXISTS if exists_ok else None
)
return ret == lib.TPM2_RC_SUCCESS
[docs] def sign(
self,
path: Union[bytes, str],
digest: bytes,
padding: Optional[Union[bytes, str]] = None, # TODO enum
) -> Tuple[bytes, str, str]:
"""Create a signature over a given digest.
Args:
path (bytes or str): Path to the signing key.
digest (bytes): Digest to sign.
padding (bytes or str, optional): `"rsa_ssa"` or `"rsa_pss"`. Defaults to None (using the scheme specified in the crypto profile).
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[bytes, str, str]: (signature (DER), public key (PEM), certificate (PEM))
"""
path = _to_bytes_or_null(path)
padding = _to_bytes_or_null(padding) # enum
digest = _to_bytes_or_null(digest)
signature = ffi.new("uint8_t **")
signature_size = ffi.new("size_t *")
public_key = ffi.new("char **")
certificate = ffi.new("char **")
ret = lib.Fapi_Sign(
self._ctx,
path,
padding,
digest,
len(digest),
signature,
signature_size,
public_key,
certificate,
)
_chkrc(ret)
return (
bytes(ffi.unpack(_get_dptr(signature, lib.Fapi_Free), signature_size[0])),
ffi.string(_get_dptr(public_key, lib.Fapi_Free)),
ffi.string(_get_dptr(certificate, lib.Fapi_Free)),
)
[docs] def verify_signature(
self, path: Union[bytes, str], digest: bytes, signature: bytes
):
"""Verify a signature on a given digest.
Args:
path (bytes or str): Path to the signing key.
digest (bytes): Digest which was signed.
signature (bytes): Signature to be verified.
Raises:
TSS2_Exception: If Fapi returned an error code, e.g. if the signature cannot be verified successfully.
"""
path = _to_bytes_or_null(path)
ret = lib.Fapi_VerifySignature(
self._ctx, path, digest, len(digest), signature, len(signature)
)
_chkrc(ret)
[docs] def encrypt(
self, path: Union[bytes, str], plaintext: Union[bytes, str]
) -> bytes: # TODO difference seal/unseal
"""Encrypt the plaintext and return the ciphertext.
Args:
path (bytes or str): The decrypt key used for encryption.
plaintext (bytes or str): The data to be encrypted.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bytes: The ciphertext.
"""
_check_bug_fixed(
fixed_in="3.2",
backports=["2.4.7", "3.0.5", "3.1.1"],
details="Faulty free of FAPI Encrypt might lead to Segmentation Fault. See https://github.com/tpm2-software/tpm2-tss/issues/2092",
)
path = _to_bytes_or_null(path)
plaintext = _to_bytes_or_null(plaintext)
ciphertext = ffi.new("uint8_t **")
ciphertext_size = ffi.new("size_t *")
ret = lib.Fapi_Encrypt(
self._ctx, path, plaintext, len(plaintext), ciphertext, ciphertext_size
)
_chkrc(ret)
return bytes(
ffi.unpack(_get_dptr(ciphertext, lib.Fapi_Free), ciphertext_size[0])
)
[docs] def decrypt(self, path: Union[bytes, str], ciphertext: bytes) -> bytes:
"""Decrypt the ciphertext and return the plaintext.
Args:
path (bytes or str): The decrypt key used for decryption.
ciphertext (bytes or str): The data to be decrypted.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bytes: The plaintext.
"""
path = _to_bytes_or_null(path)
plaintext = ffi.new("uint8_t **")
plaintext_size = ffi.new("size_t *")
ret = lib.Fapi_Decrypt(
self._ctx, path, ciphertext, len(ciphertext), plaintext, plaintext_size
)
_chkrc(ret)
return bytes(ffi.unpack(plaintext[0], plaintext_size[0]))
[docs] def create_seal(
self,
path: Union[bytes, str],
data: Optional[Union[bytes, str]] = None,
type_: Optional[Union[bytes, str]] = None,
policy_path: Optional[Union[bytes, str]] = None,
auth_value: Optional[Union[bytes, str]] = None,
size: Optional[int] = None,
exists_ok: bool = False,
) -> bool:
"""Create a Fapi sealed (= encrypted) object, that is data sealed a Fapi parent key. Oftentimes, the data is a digest.
Args:
path (bytes or str): The path of the new sealed object.
data (bytes or str, optional): Data to be sealed (often a digest). If None, random data will be generated. Defaults to None.
type_ (bytes or str, optional): Comma separated list. Possible values: system, sign, decrypt, restricted, exportable, noda, 0x81000000. Defaults to None.
policy_path (bytes or str, optional): The path to the policy which will be associated with the sealed object. Defaults to None.
auth_value (bytes or str, optional): Password to protect the new sealed object. Defaults to None.
size (int, optional): If data is None, random bytes of length size are generated. Parameters data and size cannot be given at the same time. Defaults to None.
exists_ok (bool, optional): Do not throw a TSS2_Exception if an object with the given path already exists. Defaults to False.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bool: True if the sealed object was created. False otherwise.
"""
path = _to_bytes_or_null(path)
if data is not None and size is not None:
raise ValueError("Parameters data and size cannot be given at same time.")
if data is None and size is None:
raise ValueError("Either parameter data or parameter size must be given.")
if data is None:
data_len = size
else:
data_len = len(data)
data = _to_bytes_or_null(data)
type_ = _to_bytes_or_null(type_)
policy_path = _to_bytes_or_null(policy_path)
auth_value = _to_bytes_or_null(auth_value)
ret = lib.Fapi_CreateSeal(
self._ctx, path, type_, data_len, policy_path, auth_value, data
)
_chkrc(
ret, acceptable=lib.TSS2_FAPI_RC_PATH_ALREADY_EXISTS if exists_ok else None
)
return ret == lib.TPM2_RC_SUCCESS
[docs] def unseal(self, path: Union[bytes, str]) -> bytes:
"""Unseal a sealed (= encrypted) Fapi object and return the data in plaintext.
Args:
path (Union[bytes, str]): The path to the sealed object.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bytes: The unsealed data in plaintext.
"""
path = _to_bytes_or_null(path)
data = ffi.new("uint8_t **")
data_size = ffi.new("size_t *")
ret = lib.Fapi_Unseal(self._ctx, path, data, data_size)
_chkrc(ret)
return bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), data_size[0]))
[docs] def import_object(
self,
path: Union[bytes, str],
import_data: Union[bytes, str],
exists_ok: bool = False,
) -> bool:
"""Import policy, policy template or key into the keystore.
Args:
path (bytes or str): Path of the future Fapi object.
import_data (bytes or str): JSON-encoded data to import.
exists_ok (bool, optional): Do not throw a TSS2_Exception if an object with the given path already exists. Defaults to False.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bool: True if the object was imported. False otherwise.
"""
_check_bug_fixed(
fixed_in="3.2",
details="FAPI Import will overwrite existing objects with same path silently. See https://github.com/tpm2-software/tpm2-tss/issues/2028",
)
path = _to_bytes_or_null(path)
import_data = _to_bytes_or_null(import_data)
ret = lib.Fapi_Import(self._ctx, path, import_data)
_chkrc(
ret, acceptable=lib.TSS2_FAPI_RC_PATH_ALREADY_EXISTS if exists_ok else None
)
return ret == lib.TPM2_RC_SUCCESS
[docs] def delete(self, path: Union[bytes, str]) -> None:
"""Delete Fapi object.
Args:
path (bytes or str): Path to the Fapi object to delete.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
ret = lib.Fapi_Delete(self._ctx, path)
_chkrc(ret)
[docs] def change_auth(
self, path: Union[bytes, str], auth_value: Optional[Union[bytes, str]] = None
) -> None:
"""Change the password to a Fapi object.
Args:
path (bytes or str): Path to the Fapi object.
auth_value (bytes or str, optional): New password. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
auth_value = _to_bytes_or_null(auth_value)
ret = lib.Fapi_ChangeAuth(self._ctx, path, auth_value)
_chkrc(ret)
[docs] def export_key(
self, path: Union[bytes, str], new_path: Union[bytes, str] = None
) -> str:
"""Export a Fapi object as a JSON-encoded string.
Args:
path (bytes or str): Path to the existing Fapi object.
new_path (bytes or str, optional): New path to the Fapi object. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
str: The exported data.
"""
path = _to_bytes_or_null(path)
new_path = _to_bytes_or_null(new_path)
exported_data = ffi.new("char **")
ret = lib.Fapi_ExportKey(self._ctx, path, new_path, exported_data)
_chkrc(ret)
return ffi.string(_get_dptr(exported_data, lib.Fapi_Free)).decode(self.encoding)
[docs] def set_description(
self, path: Union[bytes, str], description: Optional[Union[bytes, str]] = None
) -> None:
"""Set the description of a Fapi object.
Args:
path (bytes or str): Path to the Fapi object.
description (bytes or str, optional): New description of the Fapi object. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
description = _to_bytes_or_null(description)
ret = lib.Fapi_SetDescription(self._ctx, path, description)
_chkrc(ret)
[docs] def get_description(self, path: Union[bytes, str] = None) -> str:
"""Get the description of a Fapi object.
Args:
path (bytes or str): Path to the Fapi object.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
str: The description of the Fapi object.
"""
path = _to_bytes_or_null(path)
description = ffi.new("char **")
ret = lib.Fapi_GetDescription(self._ctx, path, description)
_chkrc(ret)
# description is guaranteed to be a null-terminated string
return ffi.string(_get_dptr(description, lib.Fapi_Free)).decode()
[docs] def set_app_data(
self, path: Union[bytes, str], app_data: Optional[Union[bytes, str]] = None
) -> None:
"""Add custom application data to a Fapi object. This data is saved alongside the object and can be used by the application.
Args:
path (bytes or str): Path to the Fapi object.
app_data (bytes or str, optional): Custom application data to be associated with the Fapi object. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
if app_data is None:
app_data_len = 0
else:
app_data_len = len(app_data)
app_data = _to_bytes_or_null(app_data)
ret = lib.Fapi_SetAppData(self._ctx, path, app_data, app_data_len)
_chkrc(ret)
[docs] def get_app_data(self, path: Union[bytes, str]) -> Optional[bytes]:
"""Get the custom application data of a Fapi object.
Args:
path (bytes or str): Path to the Fapi object.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Optional[bytes]: The application data or None.
"""
path = _to_bytes_or_null(path)
app_data = ffi.new("uint8_t **")
app_data_size = ffi.new("size_t *")
ret = lib.Fapi_GetAppData(self._ctx, path, app_data, app_data_size)
_chkrc(ret)
if app_data[0] == ffi.NULL:
return None
return bytes(ffi.unpack(_get_dptr(app_data, lib.Fapi_Free), app_data_size[0]))
[docs] def set_certificate(
self, path: Union[bytes, str], certificate: Optional[Union[bytes, str]] = None
) -> None:
"""Add x509 certificate to a Fapi object. This data is saved alongside the object and can be used by the application.
Args:
path (bytes or str): Path to the Fapi object.
certificate (bytes or str, optional): x509 certificate to be associated with the Fapi object. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
certificate = _to_bytes_or_null(certificate)
ret = lib.Fapi_SetCertificate(self._ctx, path, certificate)
_chkrc(ret)
[docs] def get_certificate(self, path: Union[bytes, str]) -> str:
"""Get the custom application data of a Fapi object.
Args:
path (bytes or str): Path to the Fapi object.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
bytes: The application data.
"""
path = _to_bytes_or_null(path)
certificate = ffi.new("char **")
ret = lib.Fapi_GetCertificate(self._ctx, path, certificate)
_chkrc(ret)
# certificate is guaranteed to be a null-terminated string
return ffi.string(_get_dptr(certificate, lib.Fapi_Free)).decode()
[docs] def get_tpm_blobs(
self, path: Union[bytes, str]
) -> Tuple[TPM2B_PUBLIC, TPM2B_PRIVATE, str]:
"""Get the TPM data blobs and the policy associates with a Fapi object.
Args:
path (bytes or str): Path to the Fapi object.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[TPM2B_PUBLIC, TPM2B_PRIVATE, str]: (tpm_2b_public, tpm_2b_private, policy)
"""
path = _to_bytes_or_null(path)
tpm_2b_public = ffi.new("uint8_t **")
tpm_2b_public_size = ffi.new("size_t *")
tpm_2b_private = ffi.new("uint8_t **")
tpm_2b_private_size = ffi.new("size_t *")
policy = ffi.new("char **")
ret = lib.Fapi_GetTpmBlobs(
self._ctx,
path,
tpm_2b_public,
tpm_2b_public_size,
tpm_2b_private,
tpm_2b_private_size,
policy,
)
_chkrc(ret)
policy_str = ffi.string(policy[0]).decode(self.encoding)
tpm_2b_public_buffer = bytes(
ffi.buffer(tpm_2b_public[0], tpm_2b_public_size[0])
)
tpm_2b_public_unmarsh, _ = TPM2B_PUBLIC.unmarshal(tpm_2b_public_buffer)
tpm_2b_private_buffer = bytes(
ffi.buffer(tpm_2b_private[0], tpm_2b_private_size[0])
)
tpm_2b_private_unmarsh, _ = TPM2B_PRIVATE.unmarshal(tpm_2b_private_buffer)
return (
tpm_2b_public_unmarsh,
tpm_2b_private_unmarsh,
policy_str,
)
[docs] def get_esys_blob(self, path: Union[bytes, str]) -> Tuple[bytes, Any]:
"""Return the ESAPI binary blob associated with a Fapi object.
This blob can be easily loaded with :meth:`~tpm2_pytss.ESAPI.load_blob()`.
Args:
path (bytes or str): Path to the Fapi object.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[bytes, Any]: A tuple of the binary blob and its type (:const:`FAPI_ESYSBLOB.CONTEXTLOAD` or :const:`FAPI_ESYSBLOB.DESERIALIZE)`
"""
path = _to_bytes_or_null(path)
type_ = ffi.new("uint8_t *")
data = ffi.new("uint8_t **")
length = ffi.new("size_t *")
ret = lib.Fapi_GetEsysBlob(self._ctx, path, type_, data, length)
_chkrc(ret)
return bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), length[0])), type_[0]
[docs] def export_policy(self, path: Union[bytes, str]) -> str:
"""Export a policy from the key store as a JSON-encoded string.
Args:
path (bytes or str): Path to the FAPI policy.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
str: JSON-encoded policy.
"""
path = _to_bytes_or_null(path)
policy = ffi.new("char **")
ret = lib.Fapi_ExportPolicy(self._ctx, path, policy)
_chkrc(ret)
return ffi.string(_get_dptr(policy, lib.Fapi_Free)).decode()
[docs] def authorize_policy(
self,
policy_path: Union[bytes, str],
key_path: Union[bytes, str],
policy_ref: Optional[Union[bytes, str]] = None,
):
"""Specify the underlying policy/policies for a policy Authorize.
Args:
policy_path (bytes or str): Path to the underlying policy.
key_path (bytes or str): Path to the key associated with the policy Authorize.
policy_ref (bytes or str, optional): Additional application data (e.g. a reference to another policy). Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
policy_path = _to_bytes_or_null(policy_path)
key_path = _to_bytes_or_null(key_path)
if policy_ref is None:
policy_ref_len = 0
else:
policy_ref_len = len(policy_ref)
policy_ref = _to_bytes_or_null(policy_ref)
ret = lib.Fapi_AuthorizePolicy(
self._ctx, policy_path, key_path, policy_ref, policy_ref_len
)
_chkrc(ret)
[docs] def pcr_read(self, index: int) -> Tuple[bytes, str]:
"""Read the value of a TPM Platform Configuration Register (PCR) and its
associated event log.
Args:
index (int): Index of the PCR (in the range of 0-23 in most cases).
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[bytes, str]: (pcr_value, event_log)
"""
value = ffi.new("uint8_t **")
value_size = ffi.new("size_t *")
log = ffi.new("char **")
ret = lib.Fapi_PcrRead(self._ctx, index, value, value_size, log)
_chkrc(ret)
return (
bytes(ffi.unpack(_get_dptr(value, lib.Fapi_Free), value_size[0])),
ffi.string(_get_dptr(log, lib.Fapi_Free)).decode(),
)
[docs] def pcr_extend(
self,
index: int,
data: Union[bytes, str],
log: Optional[Union[bytes, str]] = None,
) -> None:
"""Extend the value of a TPM Platform Configuration Register (PCR).
The data given by the user and the previous PCR value are hashed
together. The resulting digest is stored as the new PCR value. As a
result, a PCR value depends on every piece of data given via the extend
command (until the PCR is reset).
Args:
index (int): Index of the PCR (in the range of 0-23 in most cases).
data (bytes or str): Input data to the extend operation.
log (bytes or str, optional): JSON-encoded event log data. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[bytes, str]: PCR value and its associated event log.
"""
# TODO "extend", formula in doc
log = _to_bytes_or_null(log)
data = _to_bytes_or_null(data)
ret = lib.Fapi_PcrExtend(self._ctx, index, data, len(data), log)
_chkrc(ret)
[docs] def quote(
self,
path: Union[bytes, str],
pcrs: List[int],
quote_type: Optional[Union[bytes, str]] = None,
qualifying_data: Optional[Union[bytes, str]] = None,
) -> Tuple[str, bytes, str, str]:
"""Create a TPM quote, that is a signed data structure of the TPM Platform Configuration Registers (PCRs), reset count, firmware version and more.
Args:
path (bytes or str): Path to the key used for signing.
pcrs (List[int]): List of PCR indices to be included in the quote.
quote_type (bytes or str, optional): Type of quote to create. The default "TPM-Quote" is used if None is given. Defaults to None.
qualifying_data (bytes or str, optional): Additional application-defined data. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[str, bytes, str, str]: info, signature, pcr_log, certificate
"""
_check_bug_fixed(
fixed_in="3.2",
backports=["2.4.7", "3.0.5", "3.1.1"],
details="Multiple calls of FAPI Quote might lead to TPM out of memory errors. See https://github.com/tpm2-software/tpm2-tss/issues/2084",
)
path = _to_bytes_or_null(path)
quote_type = _to_bytes_or_null(quote_type)
if qualifying_data is None:
qualifying_data_len = 0
else:
qualifying_data_len = len(qualifying_data)
qualifying_data = _to_bytes_or_null(qualifying_data)
quote_info = ffi.new("char **")
signature = ffi.new("uint8_t **")
signature_len = ffi.new("size_t *")
pcr_log = ffi.new("char **")
certificate = ffi.new("char **")
ret = lib.Fapi_Quote(
self._ctx,
pcrs,
len(pcrs),
path,
quote_type,
qualifying_data,
qualifying_data_len,
quote_info,
signature,
signature_len,
pcr_log,
certificate,
)
_chkrc(ret)
return (
ffi.string(_get_dptr(quote_info, lib.Fapi_Free)).decode(),
bytes(ffi.unpack(_get_dptr(signature, lib.Fapi_Free), signature_len[0])),
ffi.string(_get_dptr(pcr_log, lib.Fapi_Free)).decode(),
ffi.string(
_get_dptr(certificate, lib.Fapi_Free) or ffi.new("char *")
).decode(),
)
[docs] def verify_quote(
self,
path: Union[bytes, str],
signature: bytes,
quote_info: Union[bytes, str],
qualifying_data: Optional[Union[bytes, str]] = None,
pcr_log: Optional[Union[bytes, str]] = None,
):
"""Verify the signature to a TPM quote.
Args:
path (bytes or str): Path to the key used for verifying the signature.
signature (bytes): Signature to the quote.
quote_info (bytes or str, optional): Quote info structure.
qualifying_data (bytes or str, optional): Additional application-defined data. Defaults to None.
pcr_log (bytes or str, optional): JSON-encoded PCR log entry.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
signature = _to_bytes_or_null(signature)
if qualifying_data is None:
qualifying_data_len = 0
else:
qualifying_data_len = len(qualifying_data)
qualifying_data = _to_bytes_or_null(qualifying_data)
quote_info = _to_bytes_or_null(quote_info)
pcr_log = _to_bytes_or_null(pcr_log)
ret = lib.Fapi_VerifyQuote(
self._ctx,
path,
qualifying_data,
qualifying_data_len,
quote_info,
signature,
len(signature),
pcr_log,
)
_chkrc(ret)
[docs] def create_nv(
self,
path: Union[bytes, str],
size: int,
type_: Optional[Union[bytes, str]] = None,
policy_path: Optional[Union[bytes, str]] = None,
auth_value: Optional[Union[bytes, str]] = None,
) -> None:
"""Create non-volatile (NV) storage on the TPM.
Args:
path (bytes or str): Path to the NV storage area.
size (int): Size of the storage area in bytes.
type_ (bytes or str, optional): Type of the storage area. A combination of `bitfield`, `counter`, `pcr`, `system`, `noda`. Defaults to None.
policy_path (bytes or str, optional): The path to the policy which will be associated with the storage area. Defaults to None.
auth_value (bytes or str, optional): Password to protect the new storage area. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
type_ = _to_bytes_or_null(type_)
policy_path = _to_bytes_or_null(policy_path)
auth_value = _to_bytes_or_null(auth_value)
ret = lib.Fapi_CreateNv(self._ctx, path, type_, size, policy_path, auth_value)
_chkrc(ret)
[docs] def nv_read(self, path: Union[bytes, str]) -> Tuple[bytes, str]:
"""Read from non-volatile (NV) TPM storage.
Args:
path (bytes or str): Path to the NV storage area.
Raises:
TSS2_Exception: If Fapi returned an error code.
Returns:
Tuple[bytes, str]: Data stored in the NV storage area and its associated event log.
"""
path = _to_bytes_or_null(path)
data = ffi.new("uint8_t **")
data_size = ffi.new("size_t *")
log = ffi.new("char **")
ret = lib.Fapi_NvRead(self._ctx, path, data, data_size, log)
_chkrc(ret)
return (
bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), data_size[0])),
ffi.string(_get_dptr(log, lib.Fapi_Free)).decode(),
)
[docs] def nv_write(self, path: Union[bytes, str], data: Union[bytes, str]) -> None:
"""Write data to a non-volatile (NV) TPM storage and the associated event log.
Args:
path (bytes or str): Path to the NV storage area.
data (bytes or str): Data to write to the NV storage area.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
data = _to_bytes_or_null(data)
ret = lib.Fapi_NvWrite(self._ctx, path, data, len(data))
_chkrc(ret)
[docs] def nv_extend(
self,
path: Union[bytes, str],
data: Union[bytes, str],
log: Optional[Union[bytes, str]] = None,
) -> None:
"""Perform an extend operation on a non-volatile TPM storage area.
Requires an NV object of type `pcr`. For more information on the extend
operation, see :meth:`~tpm2_pytss.FAPI.pcr_extend`.
Args:
path (bytes or str): Path to the NV storage area.
data (bytes or str): Input data to the extend operation.
log (bytes or str, optional): JSON-encoded event to be associated with this change. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
data = _to_bytes_or_null(data)
log = _to_bytes_or_null(log)
ret = lib.Fapi_NvExtend(self._ctx, path, data, len(data), log)
_chkrc(ret)
[docs] def nv_increment(self, path: Union[bytes, str]) -> None:
"""Increment the counter value stored in non-volatile (NV) TPM storage.
Args:
path (bytes or str): Path to the NV storage area.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
ret = lib.Fapi_NvIncrement(self._ctx, path)
_chkrc(ret)
[docs] def nv_set_bits(self, path: Union[bytes, str], bitmap: int) -> None:
"""Set bits of bitfielad, stored in non-volatile (NV) TPM storage.
Args:
path (bytes or str): Path to the NV storage area.
bitmap (int): Bits to set in the NV storage area.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
path = _to_bytes_or_null(path)
ret = lib.Fapi_NvSetBits(self._ctx, path, bitmap)
_chkrc(ret)
[docs] def write_authorize_nv(
self, nv_path: Union[bytes, str], policy_path: Union[bytes, str]
) -> None:
"""Write a policy to non-volatile (NV) TPM storage.
Args:
nv_path (bytes or str): Path to the NV storage area.
policy_path (bytes or str): Path to the policy to be written.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
nv_path = _to_bytes_or_null(nv_path)
policy_path = _to_bytes_or_null(policy_path)
ret = lib.Fapi_WriteAuthorizeNv(self._ctx, nv_path, policy_path)
_chkrc(ret)
def _register_callback(
self,
callback_type: CallbackType,
callback_wrapper: Callable,
unlock: bool = False,
) -> Callable:
"""(Un)register a C callback and tie it to a python wrapper. Does not call Fapi API calls.
Args:
callback_type (CallbackType): Type of callback. Each type can have up to one callback.
callback_wrapper (Callable): Python wrapper which is called by the C callback and will call a user-defined function.
unlock (bool, optional): True if the callback is to be freed again. Defaults to False.
Returns:
str: CFFI binding function
"""
if unlock and self.callbacks[callback_type] is not None:
# unlock c callback
unlock_callback(CallbackType.FAPI_AUTH, self.callbacks[callback_type].name) # type: ignore
self.callbacks[callback_type] = None
c_callback = ffi.NULL
else:
if (
callback_type not in self.callbacks
or self.callbacks[callback_type] is None
):
# get c callback and lock it
self.callbacks[callback_type] = get_callback(callback_type)
# link callback wrapper to c function
callback_wrapper.__name__ = self.callbacks[callback_type].name # type: ignore
ffi.def_extern()(callback_wrapper)
c_callback = self.callbacks[callback_type].c_function # type: ignore
return c_callback
[docs] def set_auth_callback(
self,
callback: Optional[Callable[[str, str, Optional[bytes]], bytes]] = None,
user_data: Optional[Union[bytes, str]] = None,
) -> None:
"""Register a callback that provides the password for Fapi objects when
needed. Typically, this callback implements a password prompt. If `callback` is None, the callback function is reset.
Args:
callback (Callable[[str, str, Optional[bytes]], bytes], optional): A callback function `callback(path, description, user_data=None)` which returns the password (:class:`bytes`). Defaults to None.
user_data (byte, optional): Bytes that will be handed to the callback. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
if callback is None and user_data is not None:
raise RuntimeError("If callback is None, user_data must be None, too.")
if user_data is None:
user_data_len = 0
else:
user_data_len = len(user_data)
user_data = _to_bytes_or_null(user_data)
def callback_wrapper(path, description, auth, user_data):
path = ffi.string(path).decode()
description = ffi.string(description).decode()
if user_data == ffi.NULL:
user_data = None
else:
user_data = bytes(
ffi.unpack(ffi.cast("uint8_t *", user_data), user_data_len)
)
try:
auth_value = callback(path, description, user_data)
except Exception:
return lib.TSS2_FAPI_RC_CB_FAILURE
# auth value is cleaned up by the FAPI
auth[0] = ffi_malloc("char[]", auth_value)
return lib.TPM2_RC_SUCCESS
c_callback = self._register_callback(
CallbackType.FAPI_AUTH, callback_wrapper, unlock=callback is None
)
ret = lib.Fapi_SetAuthCB(self._ctx, c_callback, user_data)
_chkrc(ret)
[docs] def set_branch_callback(
self,
callback: Optional[
Callable[[str, str, List[str], Optional[bytes]], int]
] = None,
user_data: Optional[Union[bytes, str]] = None,
):
"""Set the Fapi policy branch callback, called to decide which policy path to take in a policy Or. If `callback` is None, the callback function is reset.
Args:
callback (Callable[[str, str, List[str], Optional[bytes]], int], optional): A callback function `callback(path, description, branch_names, user_data=None)` which returns the index (:class:`int`) of the selected branch in `branch_names`. Defaults to None.
user_data (bytes or str, optional): Custom data passed to the callback function. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
if callback is None and user_data is not None:
raise ValueError("If callback is None, user_data must be None, too.")
if user_data is None:
user_data_len = 0
else:
user_data_len = len(user_data)
user_data = _to_bytes_or_null(user_data)
def callback_wrapper(
path, description, branch_names, num_branches, selected_branch, user_data
):
path = ffi.string(path).decode()
description = ffi.string(description).decode()
branch_names = [
ffi.string(branch_names[i]).decode() for i in range(0, num_branches)
]
if user_data == ffi.NULL:
user_data = None
else:
user_data = bytes(
ffi.unpack(ffi.cast("uint8_t *", user_data), user_data_len)
)
try:
selected_branch[0] = callback(
path, description, branch_names, user_data
)
except Exception:
return lib.TSS2_FAPI_RC_GENERAL_FAILURE
return lib.TPM2_RC_SUCCESS
c_callback = self._register_callback(
CallbackType.FAPI_BRANCH, callback_wrapper, unlock=callback is None
)
ret = lib.Fapi_SetBranchCB(self._ctx, c_callback, user_data)
_chkrc(ret)
[docs] def set_sign_callback(
self,
callback: Optional[
Callable[[str, str, str, str, int, bytes, Optional[bytes]], bytes]
] = None,
user_data: Optional[Union[bytes, str]] = None,
):
"""Set the Fapi signing callback which is called to satisfy the policy Signed. If `callback` is None, the callback function is reset.
Args:
callback (Callable[[str, str, str, str, int, bytes, Optional[bytes]], bytes], optional): A callback function `callback(path, description, public_key, public_key_hint, hash_alg, data_to_sign, user_data=None)` which returns a signature (:class:`bytes`) of `data_to_sign`. Defaults to None.
user_data (bytes or str, optional): Custom data passed to the callback function. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
_check_bug_fixed(
fixed_in="3.2",
details="FAPI PolicySigned default nameAlg might be SHA1 unexpectedly. See https://github.com/tpm2-software/tpm2-tss/issues/2080. Fixed in https://github.com/tpm2-software/tpm2-tss/commit/b843960b6e601a786b469832392dc0a12e13cf34",
)
if callback is None and user_data is not None:
raise RuntimeError("If callback is None, user_data must be None, too.")
if user_data is None:
user_data_len = 0
else:
user_data_len = len(user_data)
user_data = _to_bytes_or_null(user_data)
def callback_wrapper(
path,
description,
public_key,
public_key_hint,
hash_alg,
data_to_sign,
data_to_sign_len,
signature,
signature_len,
user_data,
):
path = ffi.string(path).decode()
description = ffi.string(description).decode()
public_key = ffi.string(public_key).decode()
public_key_hint = ffi.string(public_key_hint).decode()
data_to_sign = bytes(ffi.unpack(data_to_sign, data_to_sign_len))
if user_data == ffi.NULL:
user_data = None
else:
user_data = bytes(
ffi.unpack(ffi.cast("uint8_t *", user_data), user_data_len,)
)
try:
signature_value = callback(
path,
description,
public_key,
public_key_hint,
hash_alg,
data_to_sign,
user_data,
)
except Exception:
return lib.TSS2_FAPI_RC_CB_FAILURE
# signature is cleaned up by the FAPI
signature[0] = ffi_malloc("char[]", signature_value)
signature_len[0] = len(signature_value)
return lib.TPM2_RC_SUCCESS
c_callback = self._register_callback(
CallbackType.FAPI_SIGN, callback_wrapper, unlock=callback is None
)
ret = lib.Fapi_SetSignCB(self._ctx, c_callback, user_data)
_chkrc(ret)
[docs] def set_policy_action_callback(
self,
callback: Optional[Callable[[str, str, Optional[bytes]], None]] = None,
user_data: Optional[Union[bytes, str]] = None,
):
"""Set the policy Action callback which is called to satisfy the policy Action. If `callback` is None, the callback function is reset.
Args:
callback (Callable[[str, str, Optional[bytes]], None], optional): A callback function `callback(path, action, user_data=None)`. Defaults to None.
user_data (bytes or str, optional): Custom data passed to the callback function. Defaults to None.
Raises:
TSS2_Exception: If Fapi returned an error code.
"""
if callback is None and user_data is not None:
raise ValueError("If callback is None, user_data must be None, too.")
_check_bug_fixed(
fixed_in="3.2",
backports=["2.4.7", "3.0.5", "3.1.1"],
details="FAPI Policy Action might lead to crashes. See https://github.com/tpm2-software/tpm2-tss/issues/2089",
)
if user_data is None:
user_data_len = 0
else:
user_data_len = len(user_data)
user_data = _to_bytes_or_null(user_data)
def callback_wrapper(path, action, user_data):
path = ffi.string(path).decode()
action = ffi.string(action).decode()
if user_data == ffi.NULL:
user_data = None
else:
user_data = bytes(
ffi.unpack(ffi.cast("uint8_t *", user_data), user_data_len)
)
try:
callback(path, action, user_data)
except Exception:
return lib.TSS2_FAPI_RC_GENERAL_FAILURE
return lib.TPM2_RC_SUCCESS
c_callback = self._register_callback(
CallbackType.FAPI_POLICYACTION, callback_wrapper, unlock=callback is None
)
ret = lib.Fapi_SetPolicyActionCB(self._ctx, c_callback, user_data)
_chkrc(ret)