plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM:

GPGME is used as the GPG provider.

rel 374

syndace 25/09/2022, 13:48

Hi!

This is my current work-in-progress state of XEP-0373. It's not linted/type checked/tested yet. I have to read through the XEP carefully again and make sure I have all uses covered, then fix remaining TODOs and test everything.

However, I'll be on vacation for a few days starting tomorrow, so I wanted to get this out to you now. It's likely enough for you to start working on pubsub encryption, even if the code doesn't run yet.

syndace 07/10/2022, 14:26

Just updated: the offline functionality (i.e. interaction with gpg as well as building, serialization, parsing and validation of the XML) is now tested and working. I'll continue to test/fix the online parts now, i.e. pubsub interactions to publish public keys etc.

goffi 08/10/2022, 15:33

Hi, thanks for your MR.

  1. In

        # TODO: GPG home directory? Maybe a per-profile setting?
        import os
        os.makedirs("/tmp/ox-gpg-home/", mode=0o700, exist_ok=True)
        gpg_provider = GPGME_GPGProvider("/tmp/ox-gpg-home/")

    If you need a temporary directory, you should use the standard tempfile module: it's more secure and works across platforms. If you need to keep the data, use host.memory.getCachePath.

  2. I don't see any dependency on gpgme. Shouldn't it be added somewhere?

  3. Why replace ValueError with ParsingError in sat.tools.xmpp_datetime? I'm not against it, I just wonder whether there is a specific reason. ValueError (well, sometimes exceptions.DataError) is usually used in Libervia when a value is not valid, as it makes error catching simpler with a single exception raised.
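
The advice in point 1 can be sketched as follows. This is a minimal, hedged example: `make_temp_gpg_home` and the `ox-gpg-home-` prefix are hypothetical names chosen for illustration, and the commented-out `GPGME_GPGProvider` call only mirrors the snippet under review. It relies solely on the standard `tempfile` module, which creates the directory with owner-only permissions on POSIX systems.

```python
import os
import stat
import tempfile


def make_temp_gpg_home() -> tempfile.TemporaryDirectory:
    """Create a throwaway directory that could serve as a GPG home in tests.

    Hypothetical helper: the name and the prefix are illustrative only.
    """
    # tempfile picks a unique, unpredictable name under the platform's
    # temporary directory, instead of a fixed path such as /tmp/ox-gpg-home/.
    return tempfile.TemporaryDirectory(prefix="ox-gpg-home-")


if __name__ == "__main__":
    with make_temp_gpg_home() as home_dir:
        # Show the generated path and its permission bits.
        mode = stat.S_IMODE(os.stat(home_dir).st_mode)
        print(home_dir, oct(mode))
        # gpg_provider = GPGME_GPGProvider(home_dir)  # as in the reviewed code
    # The directory and its contents are deleted when the block exits.
```

The directory is removed when the `with` block exits, so this only suits tests; data that must persist needs a non-temporary location.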

syndace 09/10/2022, 11:38

  1. This is just for testing, the GPG home is obviously not supposed to be temporary.
  2. The recommended installation method for GPGME is the system's package manager. See the explanation in the exceptions.MissingModule exception I'm raising. This has to be documented somewhere though I guess.
  3. Well it is specifically a parsing error in that case, so ParsingError feels more fitting to me. Also ParsingError is a subclass of ValueError IIRC, so you can still just catch that.
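
Point 3 relies on exception subclassing. A minimal sketch, assuming a `ParsingError` that is defined as a subclass of `ValueError`: the class below is a hypothetical stand-in, not the one from the library under discussion, and `parse_date` and `describe` are illustrative helpers.

```python
from datetime import date


class ParsingError(ValueError):
    """Hypothetical stand-in for a parsing-specific error type."""


def parse_date(text: str) -> date:
    """Parse an ISO 8601 calendar date, raising ParsingError on bad input."""
    try:
        return date.fromisoformat(text)
    except ValueError as e:
        # Re-raise with the more specific type, keeping the original cause.
        raise ParsingError(f"Not a valid date: {text!r}") from e


def describe(text: str) -> str:
    """Caller that only knows about ValueError still catches ParsingError."""
    try:
        return f"ok: {parse_date(text).isoformat()}"
    except ValueError as e:
        return f"invalid: {e}"


if __name__ == "__main__":
    print(describe("2022-10-09"))
    print(describe("not-a-date"))
```

Existing callers that catch `ValueError` keep working, while new callers can catch the narrower type when they care about the distinction.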

goffi 09/10/2022, 12:03

Sure, makes sense, thanks!

syndace 10/10/2022, 09:29

I've added key management (trust) UI. Since 373 isn't an encryption plugin, the UI can't currently be opened. Opening the UI will be part of the 374 implementation that I'm working on next.

I think this is ready-to-merge now.

syndace 11/10/2022, 12:18

Updated again. I have added the 374 implementation and key list management/PEP node handling etc.

The implementation is interoperating with Gajim too!

Now it really is ready-to-merge :)

goffi 11/10/2022, 13:33

Merged, thanks!

id: 9
author: Tim
created: 2022-09-25T13:43:24Z
updated: 2022-10-11T13:32:43Z
labels: core
status: closed

plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM: GPGME is used as the GPG provider. rel 374

diff --git a/sat/plugins/plugin_xep_0060.py b/sat/plugins/plugin_xep_0060.py
--- a/sat/plugins/plugin_xep_0060.py
+++ b/sat/plugins/plugin_xep_0060.py
@@ -893,14 +893,20 @@
             client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
         )
 
-    def createNode(self, client, service, nodeIdentifier=None, options=None):
+    def createNode(
+        self,
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None
+    ) -> str:
         """Create a new node
 
-        @param service(jid.JID): PubSub service,
-        @param NodeIdentifier(unicode, None): node name
-           use None to create instant node (identifier will be returned by this method)
-        @param option(dict[unicode, unicode], None): node configuration options
-        @return (unicode): identifier of the created node (may be different from requested name)
+        @param service: PubSub service
+        @param nodeIdentifier: node name; use None to create an instant node (the
+            identifier will be returned by this method)
+        @param options: node configuration options
+        @return: identifier of the created node (may be different from requested name)
         """
         # TODO: if pubsub service doesn't handle publish-options, configure it in a second time
         return client.pubsub_client.createNode(service, nodeIdentifier, options)
diff --git a/sat/plugins/plugin_xep_0373.py b/sat/plugins/plugin_xep_0373.py
new file mode 100644
--- /dev/null
+++ b/sat/plugins/plugin_xep_0373.py
@@ -0,0 +1,2085 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from abc import ABC, abstractmethod
+import base64
+from datetime import datetime, timezone
+import enum
+import secrets
+import string
+from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
+from xml.sax.saxutils import quoteattr
+
+from typing_extensions import Final, NamedTuple, Never, assert_never
+from wokkel import muc, pubsub
+from wokkel.disco import DiscoFeature, DiscoInfo
+import xmlschema
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.memory import persistent
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0060 import XEP_0060
+from sat.plugins.plugin_xep_0163 import XEP_0163
+from sat.tools.xmpp_datetime import format_datetime, parse_datetime
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+try:
+    import gpg
+except ImportError as import_error:
+    raise exceptions.MissingModule(
+        "You are missing the 'gpg' package required by the OX plugin. The recommended"
+        " installation method is via your operating system's package manager, since the"
+        " version of the library has to match the version of your GnuPG installation. See"
+        " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
+    ) from import_error
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "NS_OX",
+    "XEP_0373",
+    "VerificationError",
+    "XMPPInteractionFailed",
+    "InvalidPacket",
+    "DecryptionFailed",
+    "VerificationFailed",
+    "UnknownKey",
+    "GPGProviderError",
+    "GPGPublicKey",
+    "GPGSecretKey",
+    "GPGProvider",
+    "PublicKeyMetadata",
+    "gpg_provider",
+    "TrustLevel"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "XEP-0373",
+    C.PI_IMPORT_NAME: "XEP-0373",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0373" ],
+    C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "XEP_0373",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
+}
+
+
+NS_OX: Final = "urn:xmpp:openpgp:0"
+
+
+PARAM_CATEGORY = "Security"
+PARAM_NAME = "ox_policy"
+
+
+class VerificationError(Exception):
+    """
+    Raised by verifying methods of :class:`XEP_0373` on semantical verification errors.
+    """
+
+
+class XMPPInteractionFailed(Exception):
+    """
+    Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this
+    exception exists is that the exceptions raised by XMPP interactions are not properly
+    documented for the most part, thus all exceptions are caught and wrapped in instances
+    of this class.
+    """
+
+
+class InvalidPacket(ValueError):
+    """
+    Raised by methods of :class:`GPGProvider` when an invalid packet is encountered.
+    """
+
+
+class DecryptionFailed(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on decryption failures.
+    """
+
+
+class VerificationFailed(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on verification failures.
+    """
+
+
+class UnknownKey(ValueError):
+    """
+    Raised by methods of :class:`GPGProvider` when an unknown key is referenced.
+    """
+
+
+class GPGProviderError(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on internal errors.
+    """
+
+
+class GPGPublicKey(ABC):
+    """
+    Interface describing a GPG public key.
+    """
+
+    @property
+    @abstractmethod
+    def fingerprint(self) -> str:
+        """
+        @return: The OpenPGP v4 fingerprint string of this public key.
+        """
+
+
+class GPGSecretKey(ABC):
+    """
+    Interface describing a GPG secret key.
+    """
+
+    @property
+    @abstractmethod
+    def public_key(self) -> GPGPublicKey:
+        """
+        @return: The public key corresponding to this secret key.
+        """
+
+
+class GPGProvider(ABC):
+    """
+    Interface describing a GPG provider, i.e. a library or framework providing GPG
+    encryption, signing and key management.
+
+    All methods may raise :class:`GPGProviderError` in addition to those exception types
+    listed explicitly.
+
+    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
+    """
+
+    @abstractmethod
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        """Export a public key in a key material packet according to RFC 4880 §5.5.
+
+        Do not use OpenPGP's ASCII Armor.
+
+        @param public_key: The public key to export.
+        @return: The packet containing the exported public key.
+        @raise UnknownKey: if the public key is not available.
+        """
+
+    @abstractmethod
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        """Import a public key from a key material packet according to RFC 4880 §5.5.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param packet: A packet containing an exported public key.
+        @return: The public key imported from the packet.
+        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
+            invalid.
+
+        @warning: Only packets of version 4 or higher may be accepted, packets below
+            version 4 MUST be rejected.
+        """
+
+    @abstractmethod
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        """Export a secret key for transfer according to RFC 4880 §11.1.
+
+        Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage
+        conventions to zero in the corresponding secret-key packet according to RFC 4880
+        §5.5.3. Do not use OpenPGP's ASCII Armor.
+
+        @param secret_key: The secret key to export.
+        @return: The binary blob containing the exported secret key.
+        @raise UnknownKey: if the secret key is not available.
+        """
+
+    @abstractmethod
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        """Restore secret keys exported for transfer according to RFC 4880 §11.1.
+
+        The secret data is not encrypted, i.e. the octet indicating string-to-key usage
+        conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3
+        are set to zero. OpenPGP's ASCII Armor is not used.
+
+        @param data: Concatenation of one or more secret keys exported for transfer.
+        @return: The secret keys imported from the data.
+        @raise InvalidPacket: if the data or one of the packets included in the data is
+            either syntactically or semantically deemed invalid.
+
+        @warning: Only packets of version 4 or higher may be accepted, packets below
+            version 4 MUST be rejected.
+        """
+
+    @abstractmethod
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        """Encrypt data symmetrically according to RFC 4880 §5.3.
+
+        The password is used to build a Symmetric-Key Encrypted Session Key packet which
+        precedes the Symmetrically Encrypted Data packet that holds the encrypted data.
+
+        @param plaintext: The data to encrypt.
+        @param password: The password to encrypt the data with.
+        @return: The encrypted data.
+        """
+
+    @abstractmethod
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        """Decrypt data symmetrically according to RFC 4880 §5.3.
+
+        The ciphertext consists of a Symmetrically Encrypted Data packet that holds the
+        encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the
+        password.
+
+        @param ciphertext: The ciphertext.
+        @param password: The password to decrypt the data with.
+        @return: The plaintext.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+    @abstractmethod
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the signed data.
+        """
+
+    @abstractmethod
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign some data. Create the signature detached from the data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the detached signature.
+        """
+
+    @abstractmethod
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        """Verify signed data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param signed_data: The signed data as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @return: The verified and unpacked data.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that a valid signature by one
+            of the public keys is available.
+        """
+
+    @abstractmethod
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        """Verify signed data, where the signature was created detached from the data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data.
+        @param signature: The signature as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that a valid signature by one
+            of the public keys is available.
+        """
+
+    @abstractmethod
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        """Encrypt and optionally sign some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param plaintext: The data to encrypt and optionally sign.
+        @param public_keys: The public keys to encrypt the data for.
+        @param signing_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the encrypted and optionally signed data.
+        """
+
+    @abstractmethod
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        """Decrypt and optionally verify some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param ciphertext: The encrypted and optionally signed data as an OpenPGP message.
+        @param secret_keys: The secret keys to attempt decryption with.
+        @param public_keys: The public keys to verify the optional signature with.
+        @return: The decrypted, optionally verified and unpacked data.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that the data was decrypted
+            using one of the secret keys and that a valid signature by one of the public
+            keys is available in case the data is signed.
+        """
+
+    @abstractmethod
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        """List public keys.
+
+        @param user_id: The user id.
+        @return: The set of public keys available for this user id.
+        """
+
+    @abstractmethod
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        """List secret keys.
+
+        @param user_id: The user id.
+        @return: The set of secret keys available for this user id.
+        """
+
+    @abstractmethod
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: Whether the public key belongs to a key pair capable of signing.
+        """
+
+    @abstractmethod
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: Whether the public key belongs to a key pair capable of encryption.
+        """
+
+    @abstractmethod
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption.
+
+        The key is generated without password protection and without expiration. If a key
+        with the same user id already exists, a new key is created anyway.
+
+        @param user_id: The user id to assign to the new key.
+        @return: The new key.
+        """
+
+
+class GPGME_GPGPublicKey(GPGPublicKey):
+    """
+    GPG public key implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, key_obj: Any) -> None:
+        """
+        @param key_obj: The GPGME key object.
+        """
+
+        self.__key_obj = key_obj
+
+    @property
+    def fingerprint(self) -> str:
+        return self.__key_obj.fpr
+
+    @property
+    def key_obj(self) -> Any:
+        return self.__key_obj
+
+
+class GPGME_GPGSecretKey(GPGSecretKey):
+    """
+    GPG secret key implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
+        """
+        @param public_key: The public key corresponding to this secret key.
+        """
+
+        self.__public_key = public_key
+
+    @property
+    def public_key(self) -> GPGME_GPGPublicKey:
+        return self.__public_key
+
+
+class GPGME_GPGProvider(GPGProvider):
+    """
+    GPG provider implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, home_dir: Optional[str] = None) -> None:
+        """
+        @param home_dir: Optional GPG home directory path to use for all operations.
+        """
+
+        self.__home_dir = home_dir
+
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        pattern = public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_export_minimal(pattern)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if result is None:
+                raise UnknownKey(f"Public key {pattern} not found.")
+
+            return result
+
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually a public key (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(packet)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for public key import: {result}"
+                )
+
+            if len(result.imports) != 1:
+                raise InvalidPacket(
+                    "Public key packet does not contain exactly one public key (not"
+                    " counting subkeys)."
+                )
+
+            try:
+                key_obj = c.get_key(result.imports[0].fpr, secret=False)
+            except gpg.errors.KeyNotFound as e:
+                raise GPGProviderError("Newly imported public key not found") from e
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return GPGME_GPGPublicKey(key_obj)
+
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        assert isinstance(secret_key, GPGME_GPGSecretKey)
+        # TODO
+        # - Handle password protection/pinentry
+        # - Make sure the key is exported unencrypted
+
+        pattern = secret_key.public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_export_secret(pattern)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if result is None:
+                raise UnknownKey(f"Secret key {pattern} not found.")
+
+            return result
+
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually secret keys (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(data)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for secret key import: {result}"
+                )
+
+            if len(result.imports) == 0:
+                raise InvalidPacket("Secret key packet does not contain a secret key.")
+
+            secret_keys = set()
+            for import_status in result.imports:
+                try:
+                    key_obj = c.get_key(import_status.fpr, secret=True)
+                except gpg.errors.KeyNotFound as e:
+                    raise GPGProviderError("Newly imported secret key not found") from e
+                except gpg.errors.GPGMEError as e:
+                    raise GPGProviderError("Internal GPGME error") from e
+
+                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))
+
+            return secret_keys
+
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return ciphertext
+
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, __, __ = c.decrypt(
+                    ciphertext,
+                    passphrase=password,
+                    verify=False
+                )
+            except gpg.errors.GPGMEError as e:
+                # TODO: Find out what kind of error is raised if the password is wrong and
+                # re-raise it as DecryptionFailed instead.
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            return plaintext
+
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signed_data, __ = c.sign(data)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signed_data
+
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signature
+
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                data, result = c.verify(signed_data)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.BadSignatures as e:
+                raise VerificationFailed("Bad signatures on signed data") from e
+
+            valid_signature_found = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                for subkey in public_key.key_obj.subkeys:
+                    for sig in result.signatures:
+                        if subkey.can_sign and subkey.fpr == sig.fpr:
+                            valid_signature_found = True
+
+            if not valid_signature_found:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+            return data
+
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                __, result = c.verify(data, signature=signature)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.BadSignatures as e:
+                raise VerificationFailed("Bad signatures on signed data") from e
+
+            valid_signature_found = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                for subkey in public_key.key_obj.subkeys:
+                    for sig in result.signatures:
+                        if subkey.can_sign and subkey.fpr == sig.fpr:
+                            valid_signature_found = True
+
+            if not valid_signature_found:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        recipients = []
+        for public_key in public_keys:
+            assert isinstance(public_key, GPGME_GPGPublicKey)
+
+            recipients.append(public_key.key_obj)
+
+        signers = []
+        if signing_keys is not None:
+            for secret_key in signing_keys:
+                assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+                signers.append(secret_key.public_key.key_obj)
+
+        sign = signing_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                ciphertext, __, __ = c.encrypt(
+                    plaintext,
+                    recipients=recipients,
+                    sign=sign,
+                    always_trust=True,
+                    add_encrypt_to=True
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidRecipients as e:
+                raise GPGProviderError(
+                    "At least one of the public keys is invalid for encryption"
+                ) from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the signing keys is invalid for signing"
+                ) from e
+
+            return ciphertext
+
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        verify = public_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, result, verify_result = c.decrypt(
+                    ciphertext,
+                    verify=verify
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            # TODO: Check whether the data was decrypted using one of the expected secret
+            # keys
+
+            if public_keys is not None:
+                valid_signature_found = False
+                for public_key in public_keys:
+                    assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                    for subkey in public_key.key_obj.subkeys:
+                        for sig in verify_result.signatures:
+                            if subkey.can_sign and subkey.fpr == sig.fpr:
+                                valid_signature_found = True
+
+                if not valid_signature_found:
+                    raise VerificationFailed(
+                        "Data not signed by one of the expected public keys"
+                    )
+
+            return plaintext
+
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                return {
+                    GPGME_GPGPublicKey(key)
+                    for key
+                    in c.keylist(pattern=user_id, secret=False)
+                }
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                return {
+                    GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
+                    for key
+                    in c.keylist(pattern=user_id, secret=True)
+                }
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)
+
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)
+
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.create_key(
+                    user_id,
+                    expires=False,
+                    sign=True,
+                    encrypt=True,
+                    certify=False,
+                    authenticate=False,
+                    force=True
+                )
+
+                key_obj = c.get_key(result.fpr, secret=True)
+            # KeyNotFound derives from GPGMEError, so it has to be handled first.
+            except gpg.errors.KeyNotFound as e:
+                raise GPGProviderError("Newly created key not found") from e
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))
+
+
+class PublicKeyMetadata(NamedTuple):
+    """
+    Metadata about a published public key.
+    """
+
+    fingerprint: str
+    timestamp: datetime
+
+
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    The trust levels required for BTBV and manual trust.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+
+OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="openpgp" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+# The following schema needs version 1.1 of XML Schema, which is not supported by lxml.
+# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
+# implementation of XML Schema, including version 1.1.
+CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="signcrypt">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="sign">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="crypt">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="to">
+        <xs:complexType>
+            <xs:attribute name="jid" type="xs:string"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="time">
+        <xs:complexType>
+            <xs:attribute name="stamp" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="rpad" type="xs:string"/>
+
+    <xs:element name="payload">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
+""")
+
+
+PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
+PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="public-keys-list">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="pubkey-metadata">
+        <xs:complexType>
+            <xs:attribute name="v4-fingerprint" type="xs:string"/>
+            <xs:attribute name="date" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
+""")
+
+
+PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="pubkey">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="data"/>
+            </xs:all>
+            <xs:anyAttribute processContents="skip"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="data" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="secretkey" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+    <param name="{PARAM_NAME}"
+        label={quoteattr(D_('OX default trust policy'))}
+        type="list" security="3">
+        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+        <option value="btbv"
+            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+            selected="true" />
+    </param>
+</category>
+</individual>
+</params>
+"""
+
+
+def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider:
+    """Get the GPG provider for a client.
+
+    @param sat: The SAT instance.
+    @param client: The client.
+    @return: The GPG provider specifically for that client.
+    """
+
+    return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home")))
+
+
+def generate_passphrase() -> str:
+    """Generate a secure passphrase for symmetric encryption.
+
+    @return: The passphrase.
+    """
+
+    return "-".join("".join(
+        secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)
+    ) for __ in range(6))
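
A note on the passphrase format: the function above yields six dash-separated groups of four characters, drawn from digits and upper-case letters with `0` and `O` left out. The standalone sketch below reproduces that shape and adds a shape check, which could be handy in a unit test. `ALPHABET`, `BACKUP_CODE_RE` and `looks_like_backup_code` are hypothetical names introduced for illustration and are not part of the patch.

```python
import re
import secrets

# Same alphabet as in the patch: digits and upper-case letters without "0" and "O".
ALPHABET = "123456789ABCDEFGHIJKLMNPQRSTUVWXYZ"

# Hypothetical helper pattern: six groups of four characters, separated by dashes.
BACKUP_CODE_RE = re.compile(r"[1-9A-NP-Z]{4}(?:-[1-9A-NP-Z]{4}){5}")


def generate_passphrase() -> str:
    """Generate a passphrase shaped like XXXX-XXXX-XXXX-XXXX-XXXX-XXXX."""
    return "-".join(
        "".join(secrets.choice(ALPHABET) for _ in range(4))
        for _ in range(6)
    )


def looks_like_backup_code(candidate: str) -> bool:
    """Check only the shape of a passphrase, not its secrecy or origin."""
    return BACKUP_CODE_RE.fullmatch(candidate) is not None
```

The check is purely syntactic. It rejects passphrases containing `0` or `O`, or with the wrong grouping, and makes no statement about how the passphrase was generated.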
+
+
+# TODO: Handle the user id mess
+class XEP_0373:
+    """
+    Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
+    """
+
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Add configuration option to choose between manual trust and BTBV as the trust
+        # model
+        sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM)
+
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+        self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}
+
+        xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
+        xep_0163.addPEPEvent(
+            "OX_PUBLIC_KEYS_LIST",
+            PUBLIC_KEYS_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_public_keys_list_update(items_event, profile)
+            )
+        )
+
+    async def profileConnected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """
+        @param client: The client.
+        """
+
+        profile = cast(str, client.profile)
+
+        if profile not in self.__storage:
+            self.__storage[profile] = \
+                persistent.LazyPersistentBinaryDict("XEP-0373", client.profile)
+
+        if len(self.list_secret_keys(client)) == 0:
+            log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
+            await self.create_key(client)
+
+    async def __on_public_keys_list_update(
+        self,
+        items_event: pubsub.ItemsEvent,
+        profile: str
+    ) -> None:
+        """Handle public keys list updates fired by PEP.
+
+        @param items_event: The event.
+        @param profile: The profile this event belongs to.
+        """
+
+        client = self.__sat.getClient(profile)
+
+        sender = cast(jid.JID, items_event.sender)
+        items = cast(List[domish.Element], items_event.items)
+
+        if len(items) > 1:
+            log.warning("Ignoring public keys list update with more than one element.")
+            return
+
+        item_elt = next(iter(items), None)
+        if item_elt is None:
+            log.debug("Ignoring empty public keys list update.")
+            return
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        pubkey_metadata_elts: Optional[List[domish.Element]] = None
+
+        if public_keys_list_elt is not None:
+            try:
+                PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+            except xmlschema.XMLSchemaValidationError:
+                pass
+            else:
+                pubkey_metadata_elts = \
+                    list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))
+
+        if pubkey_metadata_elts is None:
+            log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
+            return
+
+        new_public_keys_metadata = { PublicKeyMetadata(
+            fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
+            timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
+        ) for pubkey_metadata_elt in pubkey_metadata_elts }
+
+        storage_key = f"/public-keys-metadata/{sender.userhost()}"
+
+        local_public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[profile].get(storage_key, set())
+        )
+
+        unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
+        changed_or_new_keys = new_public_keys_metadata - unchanged_keys
+        available_keys = self.list_public_keys(client, sender)
+
+        for key_metadata in changed_or_new_keys:
+            # Check whether the changed or new key has been imported before
+            if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
+                try:
+                    # If it has been imported before, try to update it
+                    await self.import_public_key(client, sender, key_metadata.fingerprint)
+                except Exception as e:
+                    log.warning(f"Public key import failed: {e}")
+
+                    # If the update fails, remove the key from the local metadata list
+                    # such that the update is attempted again next time
+                    new_public_keys_metadata.remove(key_metadata)
+
+        # Check whether this update was for our account and make sure all of our keys are
+        # included in the update
+        if sender.userhost() == client.jid.userhost():
+            secret_keys = self.list_secret_keys(client)
+            missing_keys = set(filter(lambda secret_key: all(
+                key_metadata.fingerprint != secret_key.public_key.fingerprint
+                for key_metadata
+                in new_public_keys_metadata
+            ), secret_keys))
+
+            if len(missing_keys) > 0:
+                log.warning(
+                    "Public keys list update did not contain at least one of our keys."
+                    f" {new_public_keys_metadata}"
+                )
+
+                for missing_key in missing_keys:
+                    log.warning(missing_key.public_key.fingerprint)
+                    new_public_keys_metadata.add(PublicKeyMetadata(
+                        fingerprint=missing_key.public_key.fingerprint,
+                        timestamp=datetime.now(timezone.utc)
+                    ))
+
+                await self.publish_public_keys_list(client, new_public_keys_metadata)
+
+        await self.__storage[profile].force(storage_key, new_public_keys_metadata)
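
A note on the set arithmetic in the handler above: because `PublicKeyMetadata` is a `NamedTuple`, two entries are equal only if both the fingerprint and the timestamp match, so a key whose date was bumped ends up in `changed_or_new_keys`. The sketch below isolates that logic. `split_update` is a hypothetical helper and the fingerprints are made-up placeholders.

```python
from datetime import datetime, timezone
from typing import NamedTuple, Set, Tuple


class PublicKeyMetadata(NamedTuple):
    fingerprint: str
    timestamp: datetime


def split_update(
    new: Set[PublicKeyMetadata],
    local: Set[PublicKeyMetadata]
) -> Tuple[Set[PublicKeyMetadata], Set[PublicKeyMetadata]]:
    """Return (unchanged, changed_or_new) for an incoming public keys list."""
    unchanged = new & local
    changed_or_new = new - unchanged
    return unchanged, changed_or_new


# Placeholder data: "AAAA" was republished with a newer date, "BBBB" is untouched.
old = PublicKeyMetadata("AAAA", datetime(2022, 1, 1, tzinfo=timezone.utc))
bumped = PublicKeyMetadata("AAAA", datetime(2022, 10, 1, tzinfo=timezone.utc))
other = PublicKeyMetadata("BBBB", datetime(2022, 5, 1, tzinfo=timezone.utc))

unchanged, changed_or_new = split_update({bumped, other}, {old, other})
```

Here `unchanged` contains only the `BBBB` entry and `changed_or_new` contains only the bumped `AAAA` entry, which is the case where the handler tries to re-import the key.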
+
+    def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
+        """List GPG public keys available for a JID.
+
+        @param client: The client to perform this operation with.
+        @param jid: The JID. Can be a bare JID.
+        @return: The set of public keys available for this JID.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}")
+
+    def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
+        """List GPG secret keys available for the client's own JID.
+
+        @param client: The client to perform this operation with.
+        @return: The set of secret keys available for the client's bare JID.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}")
+
+    async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption.
+
+        The key is generated without password protection and without expiration.
+
+        @param client: The client to perform this operation with.
+        @return: The new key.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}")
+
+        await self.publish_public_key(client, secret_key.public_key)
+
+        storage_key = f"/public-keys-metadata/{client.jid.userhost()}"
+
+        public_keys_list = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        public_keys_list.add(PublicKeyMetadata(
+            fingerprint=secret_key.public_key.fingerprint,
+            timestamp=datetime.now(timezone.utc)
+        ))
+
+        await self.publish_public_keys_list(client, public_keys_list)
+
+        await self.__storage[client.profile].force(storage_key, public_keys_list)
+
+        return secret_key
+
+    @staticmethod
+    def __build_content_element(
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a content element.
+
+        @param element_name: The name of the content element.
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding.
+        @return: The content element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        content_elt = domish.Element((NS_OX, element_name))
+
+        for recipient_jid in recipient_jids:
+            content_elt.addElement("to")["jid"] = recipient_jid.userhost()
+
+        content_elt.addElement("time")["stamp"] = format_datetime()
+
+        if include_rpad:
+            # XEP-0373 doesn't specify bounds for the length of the random padding. This
+            # uses the bounds specified in XEP-0420 for the closely related rpad affix.
+            rpad_length = secrets.randbelow(201)
+            rpad_content = "".join(
+                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
+                for __
+                in range(rpad_length)
+            )
+            content_elt.addElement("rpad", content=rpad_content)
+
+        payload_elt = content_elt.addElement("payload")
+
+        return content_elt, payload_elt
+
+    @staticmethod
+    def build_signcrypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<signcrypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
+            stanza extension elements to.
+        """
+
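+        # Materialize the iterable: a generic Iterable doesn't support len().
+        recipient_jids = list(recipient_jids)
+        if len(recipient_jids) == 0:
+            raise ValueError("Recipient JIDs must be provided.")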
+
+        return XEP_0373.__build_content_element("signcrypt", recipient_jids, True)
+
+    @staticmethod
+    def build_sign_element(
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<sign/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding,
+            which is OPTIONAL for the ``<sign/>`` content element.
+        @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
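+        # Materialize the iterable: a generic Iterable doesn't support len().
+        recipient_jids = list(recipient_jids)
+        if len(recipient_jids) == 0:
+            raise ValueError("Recipient JIDs must be provided.")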
+
+        return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad)
+
+    @staticmethod
+    def build_crypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<crypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Specifying
+            the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can
+            be bare JIDs.
+        @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        return XEP_0373.__build_content_element("crypt", recipient_jids, True)
+
+    async def build_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        content_elt: domish.Element,
+        recipient_jids: Set[jid.JID]
+    ) -> domish.Element:
+        """Build an ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param content_elt: The content element to contain in the ``<openpgp/>`` element.
+        @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
+        @return: The ``<openpgp/>`` element.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        # TODO: I'm not sure whether we want to sign with all keys by default or choose
+        # just one key/a subset of keys to sign with.
+        signing_keys = set(filter(
+            lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
+            self.list_secret_keys(client)
+        ))
+
+        encryption_keys: Set[GPGPublicKey] = set()
+
+        for recipient_jid in recipient_jids:
+            # Import all keys of the recipient
+            all_public_keys = await self.import_all_public_keys(client, recipient_jid)
+
+            # Filter for keys that can encrypt
+            encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys))
+
+        # TODO: Handle trust
+
+        content = content_elt.toXml().encode("utf-8")
+        data: bytes
+
+        if content_elt.name == "signcrypt":
+            data = gpg_provider.encrypt(content, encryption_keys, signing_keys)
+        elif content_elt.name == "sign":
+            data = gpg_provider.sign(content, signing_keys)
+        elif content_elt.name == "crypt":
+            data = gpg_provider.encrypt(content, encryption_keys)
+        else:
+            raise ValueError(f"Unknown content element <{content_elt.name}/>")
+
+        openpgp_elt = domish.Element((NS_OX, "openpgp"))
+        openpgp_elt.addContent(base64.b64encode(data).decode("ASCII"))
+        return openpgp_elt
+
+    async def unpack_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        openpgp_elt: domish.Element,
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        sender_jid: jid.JID
+    ) -> Tuple[domish.Element, datetime]:
+        """Verify, decrypt and unpack an ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param openpgp_elt: The ``<openpgp/>`` element.
+        @param element_name: The name of the content element.
+        @param sender_jid: The sender's JID. Can be a bare JID.
+        @return: The ``<payload/>`` element containing the decrypted/verified stanza
+            extension elements carried by this ``<openpgp/>`` element, and the timestamp
+            contained in the content element.
+        @raise exceptions.ParsingError: on syntactical verification errors.
+        @raise VerificationError: on semantic verification errors according to XEP-0373.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: The timestamp is not verified for plausibility; this SHOULD be done by
+            the calling code.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        decryption_keys = set(filter(
+            lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
+            self.list_secret_keys(client)
+        ))
+
+        # Import all keys of the sender
+        all_public_keys = await self.import_all_public_keys(client, sender_jid)
+
+        # Filter for keys that can sign
+        verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))
+
+        # TODO: Handle trust
+
+        try:
+            OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "<openpgp/> element doesn't pass schema validation."
+            ) from e
+
+        openpgp_message = base64.b64decode(str(openpgp_elt))
+        content: bytes
+
+        if element_name == "signcrypt":
+            content = gpg_provider.decrypt(
+                openpgp_message,
+                decryption_keys,
+                public_keys=verification_keys
+            )
+        elif element_name == "sign":
+            content = gpg_provider.verify(openpgp_message, verification_keys)
+        elif element_name == "crypt":
+            content = gpg_provider.decrypt(openpgp_message, decryption_keys)
+        else:
+            assert_never(element_name)
+
+        try:
+            content_elt = cast(
+                domish.Element,
+                xml_tools.ElementParser()(content.decode("utf-8"))
+            )
+        except UnicodeDecodeError as e:
+            raise exceptions.ParsingError("UTF-8 decoding error") from e
+
+        try:
+            CONTENT_SCHEMA.validate(content_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"<{element_name}/> element doesn't pass schema validation."
+            ) from e
+
+        if content_elt.name != element_name:
+            raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
+
+        recipient_jids = \
+            { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }
+
+        if (
+            client.jid.userhostJID() not in {
+                recipient_jid.userhostJID() for recipient_jid in recipient_jids
+            }
+            and element_name != "crypt"
+        ):
+            raise VerificationError(
+                f"Recipient list in <{element_name}/> element does not list our (bare)"
+                f" JID."
+            )
+
+        time_elt = next(content_elt.elements(NS_OX, "time"))
+
+        timestamp = parse_datetime(time_elt["stamp"])
+
+        payload_elt = next(content_elt.elements(NS_OX, "payload"))
+
+        return payload_elt, timestamp
+
+    async def publish_public_key(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey
+    ) -> None:
+        """Publish a public key.
+
+        @param client: The client.
+        @param public_key: The public key to publish.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        packet = gpg_provider.export_public_key(public_key)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"
+
+        pubkey_elt = domish.Element((NS_OX, "pubkey"))
+
+        pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                pubkey_elt,
+                format_datetime(),
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public key failed.") from e
+
+    async def import_all_public_keys(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Set[GPGPublicKey]:
+        """Import all public keys of a JID that have not been imported before.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys.
+        @note: Failure to import a key simply results in the key not being included in the
+            result.
+        """
+
+        available_public_keys = self.list_public_keys(client, jid)
+
+        storage_key = f"/public-keys-metadata/{jid.userhost()}"
+
+        public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        missing_keys = set(filter(lambda public_key_metadata: all(
+            public_key_metadata.fingerprint != public_key.fingerprint
+            for public_key
+            in available_public_keys
+        ), public_keys_metadata))
+
+        for missing_key in missing_keys:
+            try:
+                available_public_keys.add(
+                    await self.import_public_key(client, jid, missing_key.fingerprint)
+                )
+            except Exception as e:
+                log.warning(
+                    f"Import of public key {missing_key.fingerprint} owned by"
+                    f" {jid.userhost()} failed, ignoring: {e}"
+                )
+
+        return available_public_keys
+
+    async def import_public_key(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID,
+        fingerprint: str
+    ) -> GPGPublicKey:
+        """Import a public key.
+
+        @param client: The client.
+        @param jid: The JID owning the public key. Can be a bare JID.
+        @param fingerprint: The fingerprint of the public key.
+        @return: The public key.
+        @raise exceptions.NotFound: if the public key was not found.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
+            invalid.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound as e:
+            raise exceptions.NotFound(
+                f"No public key with fingerprint {fingerprint} published by JID"
+                f" {jid.userhost()}."
+            ) from e
+        except Exception as e:
+            raise XMPPInteractionFailed("Fetching the public key failed.") from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError as e:
+            raise exceptions.NotFound(
+                f"No public key with fingerprint {fingerprint} published by JID"
+                f" {jid.userhost()}."
+            ) from e
+
+        pubkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "pubkey"), None)
+        )
+
+        if pubkey_elt is None:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
+                f" element."
+            )
+
+        try:
+            PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
+                f" schema validation."
+            ) from e
+
+        public_key = gpg_provider.import_public_key(base64.b64decode(str(
+            next(pubkey_elt.elements(NS_OX, "data"))
+        )))
+
+        return public_key
+
+    async def publish_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        public_keys_list: Iterable[PublicKeyMetadata]
+    ) -> None:
+        """Publish/update the own public keys list.
+
+        @param client: The client.
+        @param public_keys_list: The public keys list.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+
+        @warning: All public keys referenced in the public keys list MUST be published
+            beforehand.
+        """
+
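+        # Materialize the iterable: a generic Iterable doesn't support len().
+        public_keys_list = list(public_keys_list)
+
+        if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list):
+            raise ValueError("Public keys list contains duplicate fingerprints.")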
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
+
+        for public_key_metadata in public_keys_list:
+            pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
+            pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
+            pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                public_keys_list_elt,
+                item_id=XEP_0060.ID_SINGLETON,
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
+
+    async def download_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Optional[Set[PublicKeyMetadata]]:
+        """Download the public keys list of a JID.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys list or ``None`` if the JID hasn't published a public
+            keys list. An empty set means the JID has published an empty list.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            return None
+        except Exception as e:
+            raise XMPPInteractionFailed("Fetching the public keys list failed.") from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError:
+            return None
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        if public_keys_list_elt is None:
+            return None
+
+        try:
+            PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
+                f" list schema validation."
+            ) from e
+
+        return {
+            PublicKeyMetadata(
+                fingerprint=pubkey_metadata_elt["v4-fingerprint"],
+                timestamp=parse_datetime(pubkey_metadata_elt["date"])
+            )
+            for pubkey_metadata_elt
+            in public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
+        }
+
+    async def __prepare_secret_key_synchronization(
+        self,
+        client: SatXMPPClient
+    ) -> Optional[domish.Element]:
+        """Prepare for secret key synchronization.
+
+        Makes sure the relevant protocols and protocol extensions are supported by the
+        server and that the PEP node for secret key synchronization exists and is
+        configured correctly. The node is created if necessary.
+
+        @param client: The client.
+        @return: As part of the preparations, the secret key synchronization PEP node is
+            fetched. The result of that fetch is returned here.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        try:
+            infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos(
+                client,
+                client.jid.userhostJID()
+            ))
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error performing service discovery on the own bare JID."
+            ) from e
+
+        identities = cast(Dict[Tuple[str, str], str], infos.identities)
+        features = cast(Set[DiscoFeature], infos.features)
+
+        if ("pubsub", "pep") not in identities:
+            raise exceptions.FeatureNotFound("Server doesn't support PEP.")
+
+        if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
+            raise exceptions.FeatureNotFound(
+                "Server doesn't support the whitelist access model."
+            )
+
+        persistent_items_supported = \
+            "http://jabber.org/protocol/pubsub#persistent-items" in features
+
+        # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+        items: Any = []  # Stays empty if the node has to be created first
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                client.jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            try:
+                await self.__xep_0060.createNode(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "whitelist",
+                        XEP_0060.OPT_MAX_ITEMS: "1"
+                    }
+                )
+            except Exception as e:
+                raise XMPPInteractionFailed(
+                    "Error creating the secret key synchronization node."
+                ) from e
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error fetching the secret key synchronization node."
+            ) from e
+
+        try:
+            return cast(domish.Element, items[0])
+        except IndexError:
+            return None
+
+    async def export_secret_keys(
+        self,
+        client: SatXMPPClient,
+        secret_keys: Iterable[GPGSecretKey]
+    ) -> str:
+        """Export secret keys to synchronize them with other devices.
+
+        @param client: The client.
+        @param secret_keys: The secret keys to export.
+        @return: The backup code needed to decrypt the exported secret keys.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        await self.__prepare_secret_key_synchronization(client)
+
+        backup_code = generate_passphrase()
+
+        plaintext = b"".join(
+            gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys
+        )
+
+        ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code)
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+
+        secretkey_elt = domish.Element((NS_OX, "secretkey"))
+        secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                secretkey_elt
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
+
+        return backup_code
+
+    async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
+        """Download previously exported secret keys to import them in a second step.
+
+        The downloading and importing steps are separate since a backup code is required
+        for the import and it should be possible to try multiple backup codes without
+        redownloading the data every time. The second half of the import procedure is
+        provided by :meth:`import_secret_keys`.
+
+        @param client: The client.
+        @return: The encrypted secret keys previously exported, if any.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        item_elt = await self.__prepare_secret_key_synchronization(client)
+        if item_elt is None:
+            return None
+
+        secretkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "secretkey"), None)
+        )
+
+        if secretkey_elt is None:
+            return None
+
+        try:
+            SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "Publish-Subscribe item doesn't pass secretkey schema validation."
+            ) from e
+
+        return base64.b64decode(str(secretkey_elt))
+
+    def import_secret_keys(
+        self,
+        client: SatXMPPClient,
+        ciphertext: bytes,
+        backup_code: str
+    ) -> Set[GPGSecretKey]:
+        """Import previously downloaded secret keys.
+
+        The downloading and importing steps are separate since a backup code is required
+        for the import and it should be possible to try multiple backup codes without
+        redownloading the data every time. The first half of the import procedure is
+        provided by :meth:`download_secret_keys`.
+
+        @param client: The client to perform this operation with.
+        @param ciphertext: The ciphertext, i.e. the data returned by
+            :meth:`download_secret_keys`.
+        @param backup_code: The backup code needed to decrypt the data.
+        @raise InvalidPacket: if one of the GPG packets building the secret key data is
+            either syntactically or semantically deemed invalid.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically(
+            ciphertext,
+            backup_code
+        ))
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+        # TODO: This should probably be a global helper somewhere
+
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def get_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID
+    ) -> TrustLevel:
+        """Query the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @return: The trust level.
+        """
+
+        key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        try:
+            return TrustLevel(await self.__storage[client.profile][key])
+        except KeyError:
+            return TrustLevel.UNDECIDED
+
+    async def set_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID,
+        trust_level: TrustLevel
+    ) -> None:
+        """Set the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @param trust_level: The trust level.
+        """
+
+        key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        await self.__storage[client.profile].force(key, trust_level.name)
+
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
+
+        bare_jids: Set[jid.JID]
+        if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhostJID() }
+
+        all_public_keys = list({
+            bare_jid: list(self.list_public_keys(client, bare_jid))
+            for bare_jid
+            in bare_jids
+        }.items())
+
+        async def callback(
+            data: Any,
+            profile: str  # pylint: disable=unused-argument
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produced by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known nor needed here.
+            """
+
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.XMLUIResult2DataFormResult(data)
+            )
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                outer_index, inner_index = key.split("_")[1:]
+
+                owner, public_keys = all_public_keys[int(outer_index)]
+                public_key = public_keys[int(inner_index)]
+                trust = TrustLevel(value)
+
+                if (await self.get_trust(client, public_key, owner)) is not trust:
+                    await self.set_trust(client, public_key, owner, trust)
+
+            return {}
+
+        submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True)
+
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OX trust management"),
+            submit_id=submit_id
+        )
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
+            "This is the OX trust system. Below you'll see the GPG keys of your "
+            "contacts, and a list selection to trust them or not. A trusted key "
+            "can read your messages in plain text, so be sure to only validate "
+            "keys that you are sure belong to your contact. It's better "
+            "to do this when you are next to your contact, so "
+            "you can check the \"fingerprint\" of the key "
+            "yourself. Do *not* validate a key if the fingerprint is wrong!"
+        ))
+
+        own_secret_keys = self.list_secret_keys(client)
+
+        trust_ui.changeContainer("label")
+        for index, secret_key in enumerate(own_secret_keys):
+            trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
+            trust_ui.addText(secret_key.public_key.fingerprint)
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
+            for inner_index, public_key in enumerate(public_keys):
+                trust_ui.addLabel(D_("Contact"))
+                trust_ui.addJid(owner)
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(public_key.fingerprint)
+                trust_ui.addLabel(D_("Trust this device?"))
+
+                current_trust_level = await self.get_trust(client, public_key, owner)
+                available_trust_levels = \
+                    { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
+
+                trust_ui.addList(
+                    f"trust_{outer_index}_{inner_index}",
+                    options=[ level.name for level in available_trust_levels ],
+                    selected=current_trust_level.name,
+                    styles=[ "inline" ]
+                )
+
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+        return result
diff --git a/sat/plugins/plugin_xep_0374.py b/sat/plugins/plugin_xep_0374.py
new file mode 100644
--- /dev/null
+++ b/sat/plugins/plugin_xep_0374.py
@@ -0,0 +1,421 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP Instant Messaging
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Dict, Optional, Set, cast
+
+from typing_extensions import Final
+from wokkel import muc  # type: ignore[import]
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0334 import XEP_0334
+from sat.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "XEP_0374",
+    "NS_OXIM"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "OXIM",
+    C.PI_IMPORT_NAME: "XEP-0374",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0374" ],
+    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
+    C.PI_MAIN: "XEP_0374",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
+}
+
+
+# The disco feature
+NS_OXIM: Final = "urn:xmpp:openpgp:im:0"
+
+
+class XEP_0374:
+    """
+    Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
+    namespace. MUC messages are supported in addition to one-to-one messages. For
+    trust management, the two trust models "BTBV" and "manual" are supported.
+    """
+
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Plugins
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])
+
+        # Triggers
+        sat.trigger.add(
+            "messageReceived",
+            self.__message_received_trigger,
+            priority=100050
+        )
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+
+        # Register the encryption plugin
+        sat.registerEncryptionPlugin(self, "OXIM", NS_OX, 102)
+
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        return await self.__xep_0373.getTrustUI(client, entity)
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def __message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", "unknown")
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle a MUC message without XEP-0045, let the flow continue
+                # normally
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
+
+            try:
+                room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found: {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+        else:
+            # I'm not sure why this check is required, this code is copied from XEP-0384
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
+                # like "to" isn't always set.
+                feedback_jid = jid.JID(message_elt["to"])
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        openpgp_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(NS_OX, "openpgp"),
+            None
+        ))
+
+        if openpgp_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        try:
+            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
+                client,
+                openpgp_elt,
+                "signcrypt",
+                jid.JID(sender_bare_jid)
+            )
+        except Exception as e:
+            # TODO: More specific exception handling
+            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                reason=e,
+                xml=message_elt.toXml()
+            ))
+            client.feedback(
+                feedback_jid,
+                D_(
+                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
+                    f" {e}"
+                ),
+                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+            )
+            # No point in further processing this message
+            return False
+
+        message_elt.children.remove(openpgp_elt)
+
+        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")
+
+        # Remove all body elements from the original element, since those act as
+        # fallbacks in case the encryption protocol is not supported
+        for child in list(message_elt.elements()):
+            if child.name == "body":
+                message_elt.children.remove(child)
+
+        # Move all extension elements from the payload to the stanza root
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(payload_elt.elements()):
+            # Remove the child from the content element
+            payload_elt.children.remove(child)
+
+            # Add the child to the stanza
+            message_elt.addChild(child)
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = TrustLevel.UNDECIDED  # TODO: Load the actual trust level
+        if trust_level is TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.markAsTrusted)
+        else:
+            post_treat.addCallback(client.encryption.markAsUntrusted)
+
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.markAsEncrypted,
+            namespace=NS_OX
+        )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
+        """
+        # OXIM only handles message stanzas
+        if stanza.name != "message":
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            raise exceptions.InternalError(
+                f"Message without recipient encountered. Blocking further processing to"
+                f" avoid leaking plaintext data: {stanza.toXml()}"
+            )
+
+        # Parse the JID
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+
+        # Check whether encryption with OXIM is requested
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != NS_OX:
+            # Encryption is requested for this recipient, but not with OXIM
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.__encrypt(
+            client,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", "unknown") == C.MESS_TYPE_GROUPCHAT
+        )
+
+        # Add a store hint if this is a message stanza
+        self.__xep_0334.addHintElements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __encrypt(
+        self,
+        client: SatXMPPClient,
+        stanza: domish.Element,
+        recipient_jid: jid.JID,
+        is_muc_message: bool
+    ) -> None:
+        """
+        @param client: The client.
+        @param stanza: The stanza, which is modified by this call.
+        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
+            but doesn't have to.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+
+        recipient_bare_jids: Set[jid.JID]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+        else:
+            recipient_bare_jids = { recipient_jid.userhostJID() }
+            feedback_jid = recipient_jid.userhostJID()
+
+        log.debug(
+            f"Intercepting message that is to be encrypted by {NS_OX} for"
+            f" {recipient_bare_jids}"
+        )
+
+        signcrypt_elt, payload_elt = \
+            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)
+
+        # Move elements from the stanza to the content element.
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(stanza.elements()):
+            # Remove the child from the stanza
+            stanza.children.remove(child)
+
+            # A namespace of ``None`` can be used on domish elements to inherit the
+            # namespace from the parent. When moving elements from the stanza root to
+            # the content element, however, we don't want elements to inherit the
+            # namespace of the content element. Thus, check for elements with ``None``
+            # for their namespace and set the namespace to jabber:client, which is the
+            # namespace of the parent element.
+            if child.uri is None:
+                child.uri = C.NS_CLIENT
+                child.defaultUri = C.NS_CLIENT
+
+            # Add the child with corrected namespaces to the content element
+            payload_elt.addChild(child)
+
+        try:
+            openpgp_elt = await self.__xep_0373.build_openpgp_element(
+                client,
+                signcrypt_elt,
+                recipient_bare_jids
+            )
+        except Exception as e:
+            # pylint: disable=consider-using-f-string
+            msg = _("Can't encrypt message for {entities}: {reason}").format(
+                entities=', '.join(
+                    recipient.userhost() for recipient in recipient_bare_jids
+                ),
+                reason=e
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise
+
+        stanza.addChild(openpgp_elt)
diff --git a/sat/plugins/plugin_xep_0384.py b/sat/plugins/plugin_xep_0384.py
--- a/sat/plugins/plugin_xep_0384.py
+++ b/sat/plugins/plugin_xep_0384.py
@@ -479,10 +479,10 @@
                         xml_tools.et_elt_2_domish_elt(element),
                         item_id=str(bundle.device_id),
                         extra={
-                            xep_0060.EXTRA_PUBLISH_OPTIONS: {
-                                xep_0060.OPT_MAX_ITEMS: "max"
+                            XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                                XEP_0060.OPT_MAX_ITEMS: "max"
                             },
-                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
                         }
                     )
                 except (error.StanzaError, Exception) as e:
@@ -519,8 +519,8 @@
                         xml_tools.et_elt_2_domish_elt(element),
                         item_id=xep_0060.ID_SINGLETON,
                         extra={
-                            xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 },
-                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                            XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 },
+                            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
                         }
                     )
                 except Exception as e:
@@ -546,7 +546,6 @@
                         client,
                         jid.JID(bare_jid),
                         node,
-                        max_items=None,
                         item_ids=[ str(device_id) ]
                     )
                 except Exception as e:
@@ -653,11 +652,11 @@
                     xml_tools.et_elt_2_domish_elt(element),
                     item_id=xep_0060.ID_SINGLETON,
                     extra={
-                        xep_0060.EXTRA_PUBLISH_OPTIONS: {
-                            xep_0060.OPT_MAX_ITEMS: 1,
-                            xep_0060.OPT_ACCESS_MODEL: "open"
+                        XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                            XEP_0060.OPT_MAX_ITEMS: 1,
+                            XEP_0060.OPT_ACCESS_MODEL: "open"
                         },
-                        xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                        XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
                     }
                 )
             except (error.StanzaError, Exception) as e:
diff --git a/sat/plugins/plugin_xep_0420.py b/sat/plugins/plugin_xep_0420.py
--- a/sat/plugins/plugin_xep_0420.py
+++ b/sat/plugins/plugin_xep_0420.py
@@ -22,8 +22,10 @@
 import secrets
 import string
 from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
+from typing_extensions import Final
 
 from lxml import etree
+from sat.core import exceptions
 
 from sat.core.constants import Const as C
 from sat.core.i18n import D_
@@ -68,7 +70,7 @@
 }
 
 
-NS_SCE = "urn:xmpp:sce:1"
+NS_SCE: Final = "urn:xmpp:sce:1"
 
 
 class ProfileRequirementsNotMet(Exception):
@@ -114,7 +116,8 @@
             remain. Do not modify.
         @return: An affix element to include in the envelope. The element must have the
             name :attr:`element_name` and must validate using :attr:`element_schema`.
-        @raise ValueError: if the affix couldn't be built.
+        @raise ValueError: if the affix couldn't be built due to missing information on
+            the stanza.
         """
 
     @abstractmethod
@@ -384,7 +387,7 @@
             by the decryption scheme utilizing SCE.
         @return: The parsed and processed values of all affixes that were present on the
             envelope, notably including the timestamp.
-        @raise ValueError: if the serialized envelope element is malformed.
+        @raise exceptions.ParsingError: if the serialized envelope element is malformed.
         @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
             are missing from the envelope.
         @raise AffixVerificationFailed: if an affix included in the envelope fails to
@@ -399,7 +402,9 @@
         try:
             envelope_serialized_string = envelope_serialized.decode("utf-8")
         except UnicodeError as e:
-            raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e
+            raise exceptions.ParsingError(
+                "Serialized envelope can't be parsed as utf-8."
+            ) from e
 
         custom_affixes = set(profile.custom_policies.keys())
 
@@ -420,7 +425,9 @@
         try:
             etree.fromstring(envelope_serialized_string, parser)
         except etree.XMLSyntaxError as e:
-            raise ValueError("Serialized envelope doesn't pass schema validation.") from e
+            raise exceptions.ParsingError(
+                "Serialized envelope doesn't pass schema validation."
+            ) from e
 
         # Prepare the envelope and content elements
         envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
@@ -452,7 +459,7 @@
             timestamp_value = None if time_element is None else \
                 XEP_0082.parse_datetime(time_element["stamp"])
         except ValueError as e:
-            raise AffixVerificationFailed("Malformed time affix") from e
+            raise AffixVerificationFailed("Malformed time affix.") from e
 
         # The to affix is verified by comparing the to attribute of the stanza with the
         # JID referenced by the affix. Note that only bare JIDs are compared as per the
diff --git a/sat/tools/xmpp_datetime.py b/sat/tools/xmpp_datetime.py
--- a/sat/tools/xmpp_datetime.py
+++ b/sat/tools/xmpp_datetime.py
@@ -80,12 +80,15 @@
     @param value: A string containing date information formatted according to the Date
         profile specified in XEP-0082.
     @return: The date parsed from the input string.
-    @raise ValueError: if the input string is not correctly formatted.
+    @raise exceptions.ParsingError: if the input string is not correctly formatted.
     """
     # CCYY-MM-DD
 
     # The Date profile of XEP-0082 is equal to the ISO 8601 format.
-    return date.fromisoformat(value)
+    try:
+        return date.fromisoformat(value)
+    except ValueError as e:
+        raise exceptions.ParsingError() from e
 
 
 def format_datetime(
@@ -125,13 +128,16 @@
     @param value: A string containing datetime information formatted according to the
         DateTime profile specified in XEP-0082.
     @return: The datetime parsed from the input string.
-    @raise ValueError: if the input string is not correctly formatted.
+    @raise exceptions.ParsingError: if the input string is not correctly formatted.
     """
     # CCYY-MM-DDThh:mm:ss[.sss]TZD
 
     value, microsecond = __parse_fraction_of_a_second(value)
 
-    result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+    try:
+        result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+    except ValueError as e:
+        raise exceptions.ParsingError() from e
 
     if microsecond is not None:
         result = result.replace(microsecond=microsecond)
@@ -167,7 +173,7 @@
     @param value: A string containing time information formatted according to the Time
         profile specified in XEP-0082.
     @return: The time parsed from the input string.
-    @raise ValueError: if the input string is not correctly formatted.
+    @raise exceptions.ParsingError: if the input string is not correctly formatted.
     """
     # hh:mm:ss[.sss][TZD]
 
@@ -177,7 +183,10 @@
     # profile, except that it doesn't handle the letter Z as time zone information for
     # UTC. This can be fixed with a simple string replacement of 'Z' with "+00:00", which
     # is another way to represent UTC.
-    result = time.fromisoformat(value.replace('Z', "+00:00"))
+    try:
+        result = time.fromisoformat(value.replace('Z', "+00:00"))
+    except ValueError as e:
+        raise exceptions.ParsingError() from e
 
     if microsecond is not None:
         result = result.replace(microsecond=microsecond)
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -52,13 +52,14 @@
     'urwid-satext == 0.9.*',
     'wokkel >= 18.0.0, < 19.0.0',
     'omemo >= 1.0.0, < 2',
-    'twomemo >= 1.0.0, < 2',
-    'oldmemo >= 1.0.0, < 2',
+    'twomemo[xml] >= 1.0.0, < 2',
+    'oldmemo[xml] >= 1.0.0, < 2',
     'pyyaml < 7.0.0',
     'sqlalchemy >= 1.4',
     'alembic',
     'aiosqlite',
     'txdbus',
+    'xmlschema',
 ]
 
 extras_require = {
diff --git a/tests/unit/test_plugin_xep_0082.py b/tests/unit/test_plugin_xep_0082.py
--- a/tests/unit/test_plugin_xep_0082.py
+++ b/tests/unit/test_plugin_xep_0082.py
@@ -19,6 +19,7 @@
 from datetime import date, datetime, time, timezone
 
 import pytest
+from sat.core import exceptions
 
 from sat.plugins.plugin_xep_0082 import XEP_0082
 
@@ -104,7 +105,7 @@
     assert XEP_0082.parse_datetime("1969-07-20T21:56:15-05:00") == value
 
     # Without timezone, without a fraction of a second
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0082.parse_datetime("1969-07-21T02:56:15")
 
     # With timezone 'Z', with a fraction of a second consisting of two digits
@@ -123,7 +124,7 @@
     assert XEP_0082.parse_datetime("1969-07-20T21:56:15.-05:00") == value
 
     # Without timezone, with a fraction of a second consisting of six digits
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0082.parse_datetime("1969-07-21T02:56:15.123456")
 
 
diff --git a/tests/unit/test_plugin_xep_0373.py b/tests/unit/test_plugin_xep_0373.py
new file mode 100644
--- /dev/null
+++ b/tests/unit/test_plugin_xep_0373.py
@@ -0,0 +1,146 @@
+from datetime import datetime, timedelta, timezone
+from sat.plugins.plugin_xep_0373 import XEP_0373, NS_OX
+from sat.tools.xmpp_datetime import parse_datetime
+
+import pytest
+from twisted.words.protocols.jabber import jid
+
+
+a = jid.JID("foo@example.com")
+b = jid.JID("bar@example.com")
+
+
+def test_signcrypt_element_args() -> None:
+    with pytest.raises(ValueError):
+        XEP_0373.build_signcrypt_element([])
+
+
+def test_signcrypt_element() -> None:
+    signcrypt_elt, payload_elt = XEP_0373.build_signcrypt_element([ a, b ])
+    payload_elt.addElement("signcrypt-test-content", content="signcrypt test content")
+
+    rpad_elt = next(signcrypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(signcrypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    signcrypt_elt.children.remove(rpad_elt)
+    signcrypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert signcrypt_elt.toXml() == (
+        "<signcrypt xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<signcrypt-test-content>signcrypt test content</signcrypt-test-content>"
+            "</payload>"
+        "</signcrypt>"
+    )
+
+
+def test_sign_element_args() -> None:
+    with pytest.raises(ValueError):
+        XEP_0373.build_sign_element([], True)
+
+
+def test_sign_element_with_rpad() -> None:
+    sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], True)
+    payload_elt.addElement("sign-test-content", content="sign test content")
+
+    rpad_elt = next(sign_elt.elements(NS_OX, "rpad"))
+    time_elt = next(sign_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    sign_elt.children.remove(rpad_elt)
+    sign_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert sign_elt.toXml() == (
+        "<sign xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<sign-test-content>sign test content</sign-test-content>"
+            "</payload>"
+        "</sign>"
+    )
+
+
+def test_sign_element_without_rpad() -> None:
+    sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], False)
+    payload_elt.addElement("sign-test-content", content="sign test content")
+
+    rpad_elt = next(sign_elt.elements(NS_OX, "rpad"), None)
+    time_elt = next(sign_elt.elements(NS_OX, "time"))
+
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    sign_elt.children.remove(time_elt)
+
+    assert rpad_elt is None
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert sign_elt.toXml() == (
+        "<sign xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<sign-test-content>sign test content</sign-test-content>"
+            "</payload>"
+        "</sign>"
+    )
+
+
+def test_crypt_element_with_recipients() -> None:
+    crypt_elt, payload_elt = XEP_0373.build_crypt_element([ a, b ])
+    payload_elt.addElement("crypt-test-content", content="crypt test content")
+
+    rpad_elt = next(crypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(crypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    crypt_elt.children.remove(rpad_elt)
+    crypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert crypt_elt.toXml() == (
+        "<crypt xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<crypt-test-content>crypt test content</crypt-test-content>"
+            "</payload>"
+        "</crypt>"
+    )
+
+
+def test_crypt_element_without_recipients() -> None:
+    crypt_elt, payload_elt = XEP_0373.build_crypt_element([])
+    payload_elt.addElement("crypt-test-content", content="crypt test content")
+
+    rpad_elt = next(crypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(crypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    crypt_elt.children.remove(rpad_elt)
+    crypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert crypt_elt.toXml() == (
+        "<crypt xmlns='urn:xmpp:openpgp:0'>"
+            "<payload>"
+                "<crypt-test-content>crypt test content</crypt-test-content>"
+            "</payload>"
+        "</crypt>"
+    )
diff --git a/tests/unit/test_plugin_xep_0420.py b/tests/unit/test_plugin_xep_0420.py
--- a/tests/unit/test_plugin_xep_0420.py
+++ b/tests/unit/test_plugin_xep_0420.py
@@ -20,6 +20,7 @@
 from typing import Callable, cast
 
 import pytest
+from sat.core import exceptions
 
 from sat.plugins.plugin_xep_0334 import NS_HINTS
 from sat.plugins.plugin_xep_0420 import (
@@ -435,7 +436,7 @@
         </body>
     </message>""")
 
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0420.unpack_stanza(
             unpacking_profile,
             stanza,
@@ -558,5 +559,5 @@
         <unknown-affix unknown-attr="unknown"/>
     </envelope>""".encode("utf-8")
 
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)