diff --git a/sat/plugins/plugin_xep_0060.py b/sat/plugins/plugin_xep_0060.py
--- a/sat/plugins/plugin_xep_0060.py
+++ b/sat/plugins/plugin_xep_0060.py
@@ -893,14 +893,20 @@
client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
)
- def createNode(self, client, service, nodeIdentifier=None, options=None):
+ def createNode(
+ self,
+ client: SatXMPPClient,
+ service: jid.JID,
+ nodeIdentifier: Optional[str] = None,
+ options: Optional[Dict[str, str]] = None
+ ) -> str:
"""Create a new node
- @param service(jid.JID): PubSub service,
- @param NodeIdentifier(unicode, None): node name
- use None to create instant node (identifier will be returned by this method)
- @param option(dict[unicode, unicode], None): node configuration options
- @return (unicode): identifier of the created node (may be different from requested name)
+ @param service: PubSub service
+ @param nodeIdentifier: node name; use None to create an instant node (identifier
+ will be returned by this method)
+ @param options: node configuration options
+ @return: identifier of the created node (may be different from requested name)
"""
# TODO: if pubsub service doesn't hande publish-options, configure it in a second time
return client.pubsub_client.createNode(service, nodeIdentifier, options)
diff --git a/sat/plugins/plugin_xep_0373.py b/sat/plugins/plugin_xep_0373.py
new file mode 100644
--- /dev/null
+++ b/sat/plugins/plugin_xep_0373.py
@@ -0,0 +1,2085 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from abc import ABC, abstractmethod
+import base64
+from datetime import datetime, timezone
+import enum
+import secrets
+import string
+from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
+from xml.sax.saxutils import quoteattr
+
+from typing_extensions import Final, NamedTuple, Never, assert_never
+from wokkel import muc, pubsub
+from wokkel.disco import DiscoFeature, DiscoInfo
+import xmlschema
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.memory import persistent
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0060 import XEP_0060
+from sat.plugins.plugin_xep_0163 import XEP_0163
+from sat.tools.xmpp_datetime import format_datetime, parse_datetime
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+try:
+ import gpg
+except ImportError as import_error:
+ raise exceptions.MissingModule(
+ "You are missing the 'gpg' package required by the OX plugin. The recommended"
+ " installation method is via your operating system's package manager, since the"
+ " version of the library has to match the version of your GnuPG installation. See"
+ " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
+ ) from import_error
+
+
+# Names exported by `from ... import *`.
+# NOTE(review): "gpg_provider" is listed below, but the only similarly named definition
+# visible in this part of the file is `get_gpg_provider` - confirm that a module-level
+# `gpg_provider` exists further down, otherwise a star import will fail.
+__all__ = [ # pylint: disable=unused-variable
+ "PLUGIN_INFO",
+ "NS_OX",
+ "XEP_0373",
+ "VerificationError",
+ "XMPPInteractionFailed",
+ "InvalidPacket",
+ "DecryptionFailed",
+ "VerificationFailed",
+ "UnknownKey",
+ "GPGProviderError",
+ "GPGPublicKey",
+ "GPGSecretKey",
+ "GPGProvider",
+ "PublicKeyMetadata",
+ "gpg_provider",
+ "TrustLevel"
+]
+
+
+log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
+
+
+# Plugin metadata for this module.
+# NOTE(review): `XEP_0373.__init__` looks up "XEP-0045" optionally via
+# `sat.plugins.get`, but XEP-0045 is listed neither as a dependency nor as a
+# recommendation here - confirm that it is guaranteed to be loaded first when installed.
+PLUGIN_INFO = {
+ C.PI_NAME: "XEP-0373",
+ C.PI_IMPORT_NAME: "XEP-0373",
+ C.PI_TYPE: "SEC",
+ C.PI_PROTOCOLS: [ "XEP-0373" ],
+ C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
+ C.PI_RECOMMENDATIONS: [],
+ C.PI_MAIN: "XEP_0373",
+ C.PI_HANDLER: "no",
+ C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
+}
+
+
+NS_OX: Final = "urn:xmpp:openpgp:0"
+
+
+PARAM_CATEGORY = "Security"
+PARAM_NAME = "ox_policy"
+
+
+class VerificationError(Exception):
+    """
+    Signals a semantic verification error detected by one of the verifying methods of
+    :class:`XEP_0373`.
+    """
+
+
+class XMPPInteractionFailed(Exception):
+    """
+    Signals that an XMPP interaction performed by a method of :class:`XEP_0373` failed.
+    Most exceptions raised by XMPP interactions are not properly documented, which is
+    why all of them are caught and wrapped in instances of this class.
+    """
+
+
+class InvalidPacket(ValueError):
+    """
+    Signals that a method of :class:`GPGProvider` encountered an invalid packet.
+    """
+
+
+class DecryptionFailed(Exception):
+    """
+    Signals that a method of :class:`GPGProvider` failed to decrypt data.
+    """
+
+
+class VerificationFailed(Exception):
+    """
+    Signals that a method of :class:`GPGProvider` failed to verify data.
+    """
+
+
+class UnknownKey(ValueError):
+    """
+    Signals that a method of :class:`GPGProvider` was handed a reference to a key it
+    does not know.
+    """
+
+
+class GPGProviderError(Exception):
+    """
+    Signals an internal error in a method of :class:`GPGProvider`.
+    """
+
+
+class GPGPublicKey(ABC):
+    """
+    Abstract description of a GPG public key.
+    """
+
+    @property
+    @abstractmethod
+    def fingerprint(self) -> str:
+        """
+        @return: This public key's fingerprint string in OpenPGP v4 format.
+        """
+
+
+class GPGSecretKey(ABC):
+    """
+    Abstract description of a GPG secret key.
+    """
+
+    @property
+    @abstractmethod
+    def public_key(self) -> GPGPublicKey:
+        """
+        @return: The public key that belongs to this secret key.
+        """
+
+
+class GPGProvider(ABC):
+    """
+    Abstraction of a GPG backend, i.e. a library or framework that offers GPG
+    encryption, signing and key management.
+
+    Besides the exception types documented per method, every method is allowed to raise
+    :class:`GPGProviderError`.
+
+    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
+    """
+
+    @abstractmethod
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        """Serialize a public key as a key material packet (RFC 4880 §5.5).
+
+        The result must not be wrapped in OpenPGP's ASCII Armor.
+
+        @param public_key: The public key to export.
+        @return: The packet holding the exported public key.
+        @raise UnknownKey: if the public key is not available.
+        """
+
+    @abstractmethod
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        """Read a public key from a key material packet (RFC 4880 §5.5).
+
+        The packet is not wrapped in OpenPGP's ASCII Armor.
+
+        @param packet: A packet holding an exported public key.
+        @return: The public key that was imported from the packet.
+        @raise InvalidPacket: if the packet is deemed invalid, be it syntactically or
+            semantically.
+
+        @warning: Packets below version 4 MUST be rejected; only version 4 or higher
+            may be accepted.
+        """
+
+    @abstractmethod
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        """Serialize a secret key for transfer (RFC 4880 §11.1).
+
+        The secret data must not be encrypted, i.e. the octet indicating string-to-key
+        usage conventions in the corresponding secret-key packet (RFC 4880 §5.5.3) has
+        to be set to zero. The result must not be wrapped in OpenPGP's ASCII Armor.
+
+        @param secret_key: The secret key to export.
+        @return: The binary blob holding the exported secret key.
+        @raise UnknownKey: if the secret key is not available.
+        """
+
+    @abstractmethod
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        """Read secret keys that were serialized for transfer (RFC 4880 §11.1).
+
+        The secret data is unencrypted, i.e. the octets indicating string-to-key usage
+        conventions in the corresponding secret-key packets (RFC 4880 §5.5.3) are set
+        to zero. The data is not wrapped in OpenPGP's ASCII Armor.
+
+        @param data: One or more secret keys exported for transfer, concatenated.
+        @return: The secret keys that were imported from the data.
+        @raise InvalidPacket: if the data, or one of the packets it includes, is deemed
+            invalid, be it syntactically or semantically.
+
+        @warning: Packets below version 4 MUST be rejected; only version 4 or higher
+            may be accepted.
+        """
+
+    @abstractmethod
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        """Symmetrically encrypt data (RFC 4880 §5.3).
+
+        A Symmetric-Key Encrypted Session Key packet is built from the password; it
+        precedes the Symmetrically Encrypted Data packet holding the encrypted data.
+
+        @param plaintext: The data to encrypt.
+        @param password: The password to encrypt the data with.
+        @return: The encrypted data.
+        """
+
+    @abstractmethod
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        """Symmetrically decrypt data (RFC 4880 §5.3).
+
+        The ciphertext is made up of a Symmetric-Key Encrypted Session Key packet using
+        the password, followed by the Symmetrically Encrypted Data packet holding the
+        encrypted data.
+
+        @param ciphertext: The ciphertext.
+        @param password: The password to decrypt the data with.
+        @return: The plaintext.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+    @abstractmethod
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Create a signed OpenPGP message from some data.
+
+        The result is not wrapped in OpenPGP's ASCII Armor.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the signed data.
+        """
+
+    @abstractmethod
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Create a signature for some data, detached from the data itself.
+
+        The result is not wrapped in OpenPGP's ASCII Armor.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the detached signature.
+        """
+
+    @abstractmethod
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        """Check the signature on signed data and unpack the data.
+
+        The input is not wrapped in OpenPGP's ASCII Armor.
+
+        @param signed_data: The signed data as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @return: The verified and unpacked data.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: the presence of a valid signature by one of the
+            public keys has to be confirmed.
+        """
+
+    @abstractmethod
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        """Check a signature that was created detached from the data it signs.
+
+        The input is not wrapped in OpenPGP's ASCII Armor.
+
+        @param data: The data.
+        @param signature: The signature as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: the presence of a valid signature by one of the
+            public keys has to be confirmed.
+        """
+
+    @abstractmethod
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        """Encrypt some data, optionally signing it as well.
+
+        The result is not wrapped in OpenPGP's ASCII Armor.
+
+        @param plaintext: The data to encrypt and optionally sign.
+        @param public_keys: The public keys to encrypt the data for.
+        @param signing_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the encrypted and optionally signed data.
+        """
+
+    @abstractmethod
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        """Decrypt some data, optionally verifying it as well.
+
+        The input is not wrapped in OpenPGP's ASCII Armor.
+
+        @param ciphertext: The encrypted and optionally signed data as an OpenPGP
+            message.
+        @param secret_keys: The secret keys to attempt decryption with.
+        @param public_keys: The public keys to verify the optional signature with.
+        @return: The decrypted, optionally verified and unpacked data.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that one of the secret keys
+            was used to decrypt the data and, in case the data is signed, that a valid
+            signature by one of the public keys is available.
+        """
+
+    @abstractmethod
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        """Look up the public keys of a user id.
+
+        @param user_id: The user id.
+        @return: The set of public keys available for this user id.
+        """
+
+    @abstractmethod
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        """Look up the secret keys of a user id.
+
+        @param user_id: The user id.
+        @return: The set of secret keys available for this user id.
+        """
+
+    @abstractmethod
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: True if the key pair this public key belongs to is capable of signing,
+            False otherwise.
+        """
+
+    @abstractmethod
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: True if the key pair this public key belongs to is capable of
+            encryption, False otherwise.
+        """
+
+    @abstractmethod
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        """Generate a fresh GPG key that is capable of signing and encryption.
+
+        The new key has neither password protection nor an expiration date. A key is
+        created even if another key with the same user id exists already.
+
+        @param user_id: The user id to assign to the new key.
+        @return: The new key.
+        """
+
+
+class GPGME_GPGPublicKey(GPGPublicKey):
+    """
+    :class:`GPGPublicKey` backed by a key object of GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, key_obj: Any) -> None:
+        """
+        @param key_obj: The GPGME key object to wrap.
+        """
+
+        self.__key_obj = key_obj
+
+    @property
+    def key_obj(self) -> Any:
+        """
+        @return: The wrapped GPGME key object.
+        """
+
+        return self.__key_obj
+
+    @property
+    def fingerprint(self) -> str:
+        """
+        @return: The ``fpr`` attribute of the wrapped GPGME key object.
+        """
+
+        return self.__key_obj.fpr
+
+
+class GPGME_GPGSecretKey(GPGSecretKey):
+    """
+    :class:`GPGSecretKey` for GnuPG Made Easy (GPGME); it is represented by the
+    :class:`GPGME_GPGPublicKey` it belongs to.
+    """
+
+    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
+        """
+        @param public_key: The public key that belongs to this secret key.
+        """
+
+        self.__public_key = public_key
+
+    @property
+    def public_key(self) -> GPGME_GPGPublicKey:
+        """
+        @return: The public key handed to the constructor.
+        """
+
+        return self.__public_key
+
+
+class GPGME_GPGProvider(GPGProvider):
+ """
+ GPG provider implementation based on GnuPG Made Easy (GPGME).
+ """
+
+    def __init__(self, home_dir: Optional[str] = None) -> None:
+        """
+        @param home_dir: Path of the GPG home directory that is passed to every GPGME
+            context created by this provider. Optional.
+        """
+
+        self.__home_dir = home_dir
+
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        """Export a public key via GPGME; see :meth:`GPGProvider.export_public_key`."""
+        # This provider can only handle its own key type.
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        fingerprint = public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                exported = context.key_export_minimal(fingerprint)
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+
+            # A ``None`` result is treated as "no key matches the fingerprint".
+            if exported is None:
+                raise UnknownKey(f"Public key {fingerprint} not found.")
+
+            return exported
+
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        """Import exactly one public key via GPGME.
+
+        See :meth:`GPGProvider.import_public_key` for the interface contract.
+
+        @param packet: A packet containing an exported public key.
+        @return: The public key imported from the packet.
+        @raise InvalidPacket: if GPGME did not consider the data for import, or if the
+            import did not yield exactly one key.
+        @raise GPGProviderError: on GPGME errors, or if the key can not be found after
+            it was imported.
+        """
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually a public key (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(packet)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            # Only a proper import result carries the `considered` attribute.
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for public key import: {result}"
+                )
+
+            if len(result.imports) != 1:
+                raise InvalidPacket(
+                    "Public key packet does not contain exactly one public key (not"
+                    " counting subkeys)."
+                )
+
+            try:
+                key_obj = c.get_key(result.imports[0].fpr, secret=False)
+            except gpg.errors.KeyNotFound as e:
+                # `KeyNotFound` derives from `GPGMEError`, thus it has to be handled
+                # first to not be reported as a generic internal error.
+                raise GPGProviderError("Newly imported public key not found") from e
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return GPGME_GPGPublicKey(key_obj)
+
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        """Export a secret key via GPGME; see :meth:`GPGProvider.backup_secret_key`."""
+        # This provider can only handle its own key type.
+        assert isinstance(secret_key, GPGME_GPGSecretKey)
+        # TODO
+        # - Handle password protection/pinentry
+        # - Make sure the key is exported unencrypted
+
+        # The secret key is looked up by the fingerprint of its public key.
+        fingerprint = secret_key.public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                exported = context.key_export_secret(fingerprint)
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+
+            # A ``None`` result is treated as "no key matches the fingerprint".
+            if exported is None:
+                raise UnknownKey(f"Secret key {fingerprint} not found.")
+
+            return exported
+
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        """Import one or more secret keys via GPGME.
+
+        See :meth:`GPGProvider.restore_secret_keys` for the interface contract.
+
+        @param data: Concatenation of one or more secret keys exported for transfer.
+        @return: The secret keys imported from the data.
+        @raise InvalidPacket: if GPGME did not consider the data for import, or if the
+            import did not yield any key.
+        @raise GPGProviderError: on GPGME errors, or if one of the keys can not be found
+            after it was imported.
+        """
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually secret keys (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(data)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            # Only a proper import result carries the `considered` attribute.
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for secret key import: {result}"
+                )
+
+            if len(result.imports) == 0:
+                raise InvalidPacket("Secret key packet does not contain a secret key.")
+
+            secret_keys = set()
+            for import_status in result.imports:
+                try:
+                    key_obj = c.get_key(import_status.fpr, secret=True)
+                except gpg.errors.KeyNotFound as e:
+                    # `KeyNotFound` derives from `GPGMEError`, thus it has to be handled
+                    # first to not be reported as a generic internal error.
+                    raise GPGProviderError("Newly imported secret key not found") from e
+                except gpg.errors.GPGMEError as e:
+                    raise GPGProviderError("Internal GPGME error") from e
+
+                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))
+
+            return secret_keys
+
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        """Encrypt with a password; see :meth:`GPGProvider.encrypt_symmetrically`."""
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                # No recipients are passed, only the passphrase; signing is disabled.
+                encrypted, _result, _sign_result = context.encrypt(
+                    plaintext,
+                    passphrase=password,
+                    sign=False
+                )
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+
+            return encrypted
+
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        """Decrypt with a password via GPGME.
+
+        See :meth:`GPGProvider.decrypt_symmetrically` for the interface contract.
+
+        @param ciphertext: The ciphertext.
+        @param password: The password to decrypt the data with.
+        @return: The plaintext.
+        @raise DecryptionFailed: if GPGME reports an unsupported algorithm.
+        @raise GPGProviderError: on any other GPGME error.
+        """
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, __, __ = c.decrypt(
+                    ciphertext,
+                    passphrase=password,
+                    verify=False
+                )
+            except gpg.errors.GPGMEError as e:
+                # TODO: Find out what kind of error is raised if the password is wrong and
+                # re-raise it as DecryptionFailed instead.
+                raise GPGProviderError("Internal GPGME error") from e
+            # The exception class lives in `gpg.errors`; it is not re-exported by the
+            # top-level `gpg` package.
+            except gpg.errors.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            return plaintext
+
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign data via GPGME; see :meth:`GPGProvider.sign` for the contract.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with. All of them have to
+            be instances of :class:`GPGME_GPGSecretKey`.
+        @return: The signed data as produced by GPGME.
+        @raise GPGProviderError: on GPGME errors, or if GPGME rejects one of the signers.
+        """
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            # GPGME takes its own key objects as signers.
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signed_data, __ = c.sign(data)
+            # Note the module name: the exception classes live in `gpg.errors`. A wrong
+            # module name here would only surface as an AttributeError once an exception
+            # is actually raised, masking the original error.
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signed_data
+
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Create a detached signature via GPGME.
+
+        See :meth:`GPGProvider.sign_detached` for the interface contract.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with. All of them have to
+            be instances of :class:`GPGME_GPGSecretKey`.
+        @return: The detached signature as produced by GPGME.
+        @raise GPGProviderError: on GPGME errors, or if GPGME rejects one of the signers.
+        """
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            # GPGME takes its own key objects as signers.
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
+            # Note the module name: the exception classes live in `gpg.errors`. A wrong
+            # module name here would only surface as an AttributeError once an exception
+            # is actually raised, masking the original error.
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signature
+
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        """Verify and unpack signed data via GPGME; see :meth:`GPGProvider.verify`."""
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                unpacked, verify_result = context.verify(signed_data)
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+            except gpg.errors.BadSignatures as error:
+                raise VerificationFailed("Bad signatures on signed data") from error
+
+            # Fingerprints of the keys that produced the signatures found on the data.
+            signature_fprs = [sig.fpr for sig in verify_result.signatures]
+
+            # The data counts as verified if any signing-capable subkey of any expected
+            # public key produced one of the signatures.
+            signed_by_expected_key = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                if any(
+                    subkey.can_sign and subkey.fpr in signature_fprs
+                    for subkey in public_key.key_obj.subkeys
+                ):
+                    signed_by_expected_key = True
+
+            if not signed_by_expected_key:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+            return unpacked
+
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        """Verify a detached signature; see :meth:`GPGProvider.verify_detached`."""
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                _data, verify_result = context.verify(data, signature=signature)
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+            except gpg.errors.BadSignatures as error:
+                raise VerificationFailed("Bad signatures on signed data") from error
+
+            # Fingerprints of the keys that produced the signatures found.
+            signature_fprs = [sig.fpr for sig in verify_result.signatures]
+
+            # The data counts as verified if any signing-capable subkey of any expected
+            # public key produced one of the signatures.
+            signed_by_expected_key = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                if any(
+                    subkey.can_sign and subkey.fpr in signature_fprs
+                    for subkey in public_key.key_obj.subkeys
+                ):
+                    signed_by_expected_key = True
+
+            if not signed_by_expected_key:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        """Encrypt and optionally sign via GPGME; see :meth:`GPGProvider.encrypt`."""
+        # Translate the provider-level key wrappers into the GPGME key objects.
+        recipient_key_objs = []
+        for public_key in public_keys:
+            assert isinstance(public_key, GPGME_GPGPublicKey)
+
+            recipient_key_objs.append(public_key.key_obj)
+
+        # Signing is requested by passing signing keys at all, even an empty set.
+        sign_requested = signing_keys is not None
+
+        signer_key_objs = []
+        if sign_requested:
+            for secret_key in signing_keys:
+                assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+                signer_key_objs.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signer_key_objs) as context:
+            try:
+                # NOTE(review): `always_trust=True` presumably disables GnuPG's own
+                # trust checks for the recipients - confirm that trust is enforced by
+                # the caller instead.
+                encrypted, _result, _sign_result = context.encrypt(
+                    plaintext,
+                    recipients=recipient_key_objs,
+                    sign=sign_requested,
+                    always_trust=True,
+                    add_encrypt_to=True
+                )
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+            except gpg.errors.InvalidRecipients as error:
+                raise GPGProviderError(
+                    "At least one of the public keys is invalid for encryption"
+                ) from error
+            except gpg.errors.InvalidSigners as error:
+                raise GPGProviderError(
+                    "At least one of the signing keys is invalid for signing"
+                ) from error
+
+            return encrypted
+
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        """Decrypt and optionally verify data via GPGME.
+
+        See :meth:`GPGProvider.decrypt` for the interface contract.
+
+        @param ciphertext: The encrypted and optionally signed data as an OpenPGP
+            message.
+        @param secret_keys: The secret keys to attempt decryption with. Currently not
+            evaluated, see the TODO below.
+        @param public_keys: The public keys to verify the signature with. Verification
+            is skipped if this is None.
+        @return: The decrypted data.
+        @raise DecryptionFailed: if GPGME reports an unsupported algorithm.
+        @raise VerificationFailed: if verification was requested and none of the
+            signatures was made by a signing-capable subkey of the public keys.
+        @raise GPGProviderError: on any other GPGME error.
+        """
+        verify = public_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, result, verify_result = c.decrypt(
+                    ciphertext,
+                    verify=verify
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            # The exception class lives in `gpg.errors`; it is not re-exported by the
+            # top-level `gpg` package.
+            except gpg.errors.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            # TODO: Check whether the data was decrypted using one of the expected secret
+            # keys
+
+            if public_keys is not None:
+                valid_signature_found = False
+                for public_key in public_keys:
+                    assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                    # A signature is attributed to a key by comparing the fingerprints
+                    # of the key's signing-capable subkeys with the signature's.
+                    for subkey in public_key.key_obj.subkeys:
+                        for sig in verify_result.signatures:
+                            if subkey.can_sign and subkey.fpr == sig.fpr:
+                                valid_signature_found = True
+
+                if not valid_signature_found:
+                    raise VerificationFailed(
+                        "Data not signed by one of the expected public keys"
+                    )
+
+            return plaintext
+
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        """List public keys via GPGME; see :meth:`GPGProvider.list_public_keys`."""
+        public_keys: Set[GPGPublicKey] = set()
+
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                # The listing is consumed inside the `try` so that errors raised while
+                # iterating it are wrapped as well.
+                for key_obj in context.keylist(pattern=user_id, secret=False):
+                    public_keys.add(GPGME_GPGPublicKey(key_obj))
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+
+        return public_keys
+
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        """List secret keys via GPGME; see :meth:`GPGProvider.list_secret_keys`."""
+        secret_keys: Set[GPGSecretKey] = set()
+
+        with gpg.Context(home_dir=self.__home_dir) as context:
+            try:
+                # The listing is consumed inside the `try` so that errors raised while
+                # iterating it are wrapped as well.
+                for key_obj in context.keylist(pattern=user_id, secret=True):
+                    secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))
+            except gpg.errors.GPGMEError as error:
+                raise GPGProviderError("Internal GPGME error") from error
+
+        return secret_keys
+
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: True if at least one subkey of the GPGME key object has its
+            ``can_sign`` flag set, False otherwise.
+        """
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        for subkey in public_key.key_obj.subkeys:
+            if subkey.can_sign:
+                return True
+
+        return False
+
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: True if at least one subkey of the GPGME key object has its
+            ``can_encrypt`` flag set, False otherwise.
+        """
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        for subkey in public_key.key_obj.subkeys:
+            if subkey.can_encrypt:
+                return True
+
+        return False
+
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        """Create a new key via GPGME; see :meth:`GPGProvider.create_key`.
+
+        @param user_id: The user id to assign to the new key.
+        @return: The new key.
+        @raise GPGProviderError: on GPGME errors, or if the key can not be found after
+            it was created.
+        """
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                # `force=True` is passed so that an existing key with the same user id
+                # does not prevent the creation, as demanded by the interface.
+                result = c.create_key(
+                    user_id,
+                    expires=False,
+                    sign=True,
+                    encrypt=True,
+                    certify=False,
+                    authenticate=False,
+                    force=True
+                )
+
+                key_obj = c.get_key(result.fpr, secret=True)
+            except gpg.errors.KeyNotFound as e:
+                # `KeyNotFound` derives from `GPGMEError`, thus it has to be handled
+                # first to not be reported as a generic internal error.
+                raise GPGProviderError("Newly created key not found") from e
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))
+
+
+class PublicKeyMetadata(NamedTuple):
+    """
+    Describes a published public key by its fingerprint and a timestamp.
+    """
+
+    # Fingerprint of the published key.
+    fingerprint: str
+    # Timestamp published alongside the fingerprint.
+    timestamp: datetime
+
+
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    Enumeration of the trust levels needed for manual trust and for BTBV. Each member's
+    value equals its name.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+
+OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="urn:xmpp:openpgp:0"
+ xmlns="urn:xmpp:openpgp:0">
+
+ <xs:element name="openpgp" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+# The following schema needs version 1.1 of XML Schema, which is not supported by lxml.
+# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
+# implementation of XML Schema, including version 1.1.
+CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="urn:xmpp:openpgp:0"
+ xmlns="urn:xmpp:openpgp:0">
+
+ <xs:element name="signcrypt">
+ <xs:complexType>
+ <xs:all>
+ <xs:element ref="to" maxOccurs="unbounded"/>
+ <xs:element ref="time"/>
+ <xs:element ref="rpad" minOccurs="0"/>
+ <xs:element ref="payload"/>
+ </xs:all>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="sign">
+ <xs:complexType>
+ <xs:all>
+ <xs:element ref="to" maxOccurs="unbounded"/>
+ <xs:element ref="time"/>
+ <xs:element ref="rpad" minOccurs="0"/>
+ <xs:element ref="payload"/>
+ </xs:all>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="crypt">
+ <xs:complexType>
+ <xs:all>
+ <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element ref="time"/>
+ <xs:element ref="rpad" minOccurs="0"/>
+ <xs:element ref="payload"/>
+ </xs:all>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="to">
+ <xs:complexType>
+ <xs:attribute name="jid" type="xs:string"/>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="time">
+ <xs:complexType>
+ <xs:attribute name="stamp" type="xs:dateTime"/>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="rpad" type="xs:string"/>
+
+ <xs:element name="payload">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+</xs:schema>
+""")
+
+
+PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
+# Schema used to validate incoming <public-keys-list/> elements. Both attributes of
+# <pubkey-metadata/> are declared required: the public keys list update handler reads
+# "v4-fingerprint" and "date" from every validated element without checking for their
+# presence, so an element lacking either attribute has to fail validation.
+PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="urn:xmpp:openpgp:0"
+ xmlns="urn:xmpp:openpgp:0">
+
+ <xs:element name="public-keys-list">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="pubkey-metadata">
+ <xs:complexType>
+ <xs:attribute name="v4-fingerprint" type="xs:string" use="required"/>
+ <xs:attribute name="date" type="xs:dateTime" use="required"/>
+ </xs:complexType>
+ </xs:element>
+</xs:schema>
+""")
+
+
+PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="urn:xmpp:openpgp:0"
+ xmlns="urn:xmpp:openpgp:0">
+
+ <xs:element name="pubkey">
+ <xs:complexType>
+ <xs:all>
+ <xs:element ref="data"/>
+ </xs:all>
+ <xs:anyAttribute processContents="skip"/>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="data" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="urn:xmpp:openpgp:0"
+ xmlns="urn:xmpp:openpgp:0">
+
+ <xs:element name="secretkey" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+# Parameter definition (XML) offering the choice between manual trust and BTBV as the
+# trust model. The label names OX, since the parameter configured here is the OX policy
+# (`PARAM_NAME`), not the OMEMO one.
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+ <param name="{PARAM_NAME}"
+ label={quoteattr(D_('OX default trust policy'))}
+ type="list" security="3">
+ <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+ <option value="btbv"
+ label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+ selected="true" />
+ </param>
+</category>
+</individual>
+</params>
+"""
+
+
+def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider:
+    """Build the GPG provider that belongs to a client.
+
+    The provider is a :class:`GPGME_GPGProvider` whose home directory is the client's
+    local path named "gnupg-home".
+
+    @param sat: The SAT instance.
+    @param client: The client.
+    @return: The GPG provider specifically for that client.
+    """
+
+    gnupg_home = sat.get_local_path(client, "gnupg-home")
+
+    return GPGME_GPGProvider(str(gnupg_home))
+
+
+def generate_passphrase() -> str:
+    """Create a random passphrase to be used for symmetric encryption.
+
+    The passphrase consists of six groups of four characters each, joined by dashes. The
+    characters are drawn using the ``secrets`` module.
+
+    @return: The passphrase.
+    """
+
+    # Digits and upper case letters; the characters "0" and "O" are not included.
+    alphabet = "123456789ABCDEFGHIJKLMNPQRSTUVWXYZ"
+
+    groups = []
+    for __ in range(6):
+        groups.append("".join(secrets.choice(alphabet) for __ in range(4)))
+
+    return "-".join(groups)
+
+
+# TODO: Handle the user id mess
+class XEP_0373:
+ """
+ Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
+ """
+
+    def __init__(self, sat: SAT) -> None:
+        """Register the trust model parameter and the PEP event of the plugin.
+
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Register the parameter that lets the user pick the trust model, either manual
+        # trust or BTBV
+        sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM)
+
+        # XEP-0045 is looked up with `get`, i.e. it may be missing; XEP-0060 is not
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+        # Storage objects are created per profile in `profileConnected`
+        self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}
+
+        def on_public_keys_list_update(items_event, profile):  # type: ignore
+            # The handler is a coroutine function, it is wrapped into a Deferred here
+            return defer.ensureDeferred(
+                self.__on_public_keys_list_update(items_event, profile)
+            )
+
+        xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
+        xep_0163.addPEPEvent(
+            "OX_PUBLIC_KEYS_LIST",
+            PUBLIC_KEYS_LIST_NODE,
+            on_public_keys_list_update
+        )
+
+    async def profileConnected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """Prepare the per-profile state for a client.
+
+        Creates the persistent storage of the profile unless it exists already, and
+        generates a GPG key if :meth:`list_secret_keys` reports none for the client.
+
+        @param client: The client.
+        """
+
+        profile = cast(str, client.profile)
+
+        if profile not in self.__storage:
+            self.__storage[profile] = persistent.LazyPersistentBinaryDict(
+                "XEP-0373",
+                client.profile
+            )
+
+        secret_keys = self.list_secret_keys(client)
+        if len(secret_keys) == 0:
+            log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
+            await self.create_key(client)
+
+ async def __on_public_keys_list_update(
+ self,
+ items_event: pubsub.ItemsEvent,
+ profile: str
+ ) -> None:
+ """Handle public keys list updates fired by PEP.
+
+ The update is validated and compared with the metadata stored locally for the
+ sender. Keys with new or changed metadata that have been imported before are
+ imported again. If the update was sent by the own account and lacks one of the
+ own secret keys, the missing keys are added and the list is published again.
+ Finally, the resulting metadata set is stored.
+
+ @param items_event: The event.
+ @param profile: The profile this event belongs to.
+ """
+
+ client = self.__sat.getClient(profile)
+
+ sender = cast(jid.JID, items_event.sender)
+ items = cast(List[domish.Element], items_event.items)
+
+ # Only updates carrying exactly one item are processed
+ if len(items) > 1:
+ log.warning("Ignoring public keys list update with more than one element.")
+ return
+
+ item_elt = next(iter(items), None)
+ if item_elt is None:
+ log.debug("Ignoring empty public keys list update.")
+ return
+
+ public_keys_list_elt = cast(
+ Optional[domish.Element],
+ next(item_elt.elements(NS_OX, "public-keys-list"), None)
+ )
+
+ # Stays None if the element is missing or doesn't pass schema validation
+ pubkey_metadata_elts: Optional[List[domish.Element]] = None
+
+ if public_keys_list_elt is not None:
+ try:
+ PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+ except xmlschema.XMLSchemaValidationError:
+ # Deliberately ignored here, the None check below reports the problem
+ pass
+ else:
+ pubkey_metadata_elts = \
+ list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))
+
+ if pubkey_metadata_elts is None:
+ log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
+ return
+
+ new_public_keys_metadata = { PublicKeyMetadata(
+ fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
+ timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
+ ) for pubkey_metadata_elt in pubkey_metadata_elts }
+
+ # The metadata is stored per bare JID of the sender
+ storage_key = f"/public-keys-metadata/{sender.userhost()}"
+
+ local_public_keys_metadata = cast(
+ Set[PublicKeyMetadata],
+ await self.__storage[profile].get(storage_key, set())
+ )
+
+ # Entries that are equal in the update and in the stored set are unchanged,
+ # everything else in the update is either new or carries changed metadata
+ unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
+ changed_or_new_keys = new_public_keys_metadata - unchanged_keys
+ available_keys = self.list_public_keys(client, sender)
+
+ # changed_or_new_keys is a separate set, so removing entries from
+ # new_public_keys_metadata inside the loop is safe
+ for key_metadata in changed_or_new_keys:
+ # Check whether the changed or new key has been imported before
+ if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
+ try:
+ # If it has been imported before, try to update it
+ await self.import_public_key(client, sender, key_metadata.fingerprint)
+ except Exception as e:
+ log.warning(f"Public key import failed: {e}")
+
+ # If the update fails, remove the key from the local metadata list
+ # such that the update is attempted again next time
+ new_public_keys_metadata.remove(key_metadata)
+
+ # Check whether this update was for our account and make sure all of our keys are
+ # included in the update
+ if sender.userhost() == client.jid.userhost():
+ secret_keys = self.list_secret_keys(client)
+ # Secret keys whose fingerprint is not listed in the update
+ missing_keys = set(filter(lambda secret_key: all(
+ key_metadata.fingerprint != secret_key.public_key.fingerprint
+ for key_metadata
+ in new_public_keys_metadata
+ ), secret_keys))
+
+ if len(missing_keys) > 0:
+ log.warning(
+ "Public keys list update did not contain at least one of our keys."
+ f" {new_public_keys_metadata}"
+ )
+
+ # Missing keys are added with the current time as their timestamp
+ for missing_key in missing_keys:
+ log.warning(missing_key.public_key.fingerprint)
+ new_public_keys_metadata.add(PublicKeyMetadata(
+ fingerprint=missing_key.public_key.fingerprint,
+ timestamp=datetime.now(timezone.utc)
+ ))
+
+ await self.publish_public_keys_list(client, new_public_keys_metadata)
+
+ # Persist the (possibly reduced or extended) metadata set for the sender
+ await self.__storage[profile].force(storage_key, new_public_keys_metadata)
+
+    def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
+        """Look up the GPG public keys available for a JID.
+
+        @param client: The client to perform this operation with.
+        @param jid: The JID. Can be a bare JID.
+        @return: The set of public keys the GPG provider of the client lists for the
+            user id ``xmpp:<bare JID>``.
+        """
+
+        provider = get_gpg_provider(self.__sat, client)
+        user_id = f"xmpp:{jid.userhost()}"
+
+        return provider.list_public_keys(user_id)
+
+    def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
+        """Look up the GPG secret keys available for the JID of the client.
+
+        @param client: The client to perform this operation with.
+        @return: The set of secret keys the GPG provider of the client lists for the
+            user id ``xmpp:<own bare JID>``.
+        """
+
+        provider = get_gpg_provider(self.__sat, client)
+        own_user_id = f"xmpp:{client.jid.userhost()}"
+
+        return provider.list_secret_keys(own_user_id)
+
+    async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption, and announce it.
+
+        The key is generated without password protection and without expiration.
+        After generation, the public key is published, added to the stored public
+        keys list of the own account, and that list is published and stored again.
+
+        @param client: The client to perform this operation with.
+        @return: The new key.
+        """
+
+        provider = get_gpg_provider(self.__sat, client)
+
+        new_secret_key = provider.create_key(f"xmpp:{client.jid.userhost()}")
+        await self.publish_public_key(client, new_secret_key.public_key)
+
+        storage_key = f"/public-keys-metadata/{client.jid.userhost()}"
+
+        own_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        # The new key is listed with the current time as its timestamp
+        new_entry = PublicKeyMetadata(
+            fingerprint=new_secret_key.public_key.fingerprint,
+            timestamp=datetime.now(timezone.utc)
+        )
+        own_metadata.add(new_entry)
+
+        # Publish first, store afterwards
+        await self.publish_public_keys_list(client, own_metadata)
+        await self.__storage[client.profile].force(storage_key, own_metadata)
+
+        return new_secret_key
+
+    @staticmethod
+    def __build_content_element(
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Assemble a content element with its affix elements.
+
+        The element receives one ``<to/>`` child per recipient, a ``<time/>`` child,
+        optionally an ``<rpad/>`` child, and an empty ``<payload/>`` child.
+
+        @param element_name: The name of the content element.
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding.
+        @return: The content element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        content_elt = domish.Element((NS_OX, element_name))
+
+        # Recipients are always listed by their bare JID
+        for recipient_jid in recipient_jids:
+            bare_jid = recipient_jid.userhost()
+            content_elt.addElement("to")["jid"] = bare_jid
+
+        stamp = format_datetime()
+        content_elt.addElement("time")["stamp"] = stamp
+
+        if include_rpad:
+            # XEP-0373 doesn't specify bounds for the length of the random padding.
+            # This uses the bounds specified in XEP-0420 for the closely related rpad
+            # affix, i.e. between 0 and 200 characters.
+            padding_length = secrets.randbelow(201)
+            padding_chars = [
+                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
+                for __ in range(padding_length)
+            ]
+            content_elt.addElement("rpad", content="".join(padding_chars))
+
+        payload_elt = content_elt.addElement("payload")
+
+        return content_elt, payload_elt
+
+    @staticmethod
+    def build_signcrypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<signcrypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs. Any iterable is accepted, including one-shot iterators.
+        @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
+            stanza extension elements to.
+        @raise ValueError: if no recipient JID is given.
+        """
+
+        # An arbitrary iterable neither supports len() nor a second traversal, thus
+        # materialize it before checking for emptiness
+        recipients = list(recipient_jids)
+        if not recipients:
+            raise ValueError("Recipient JIDs must be provided.")
+
+        return XEP_0373.__build_content_element("signcrypt", recipients, True)
+
+    @staticmethod
+    def build_sign_element(
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<sign/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs. Any iterable is accepted, including one-shot iterators.
+        @param include_rpad: Whether to include random-length random-content padding,
+            which is OPTIONAL for the ``<sign/>`` content element.
+        @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        @raise ValueError: if no recipient JID is given.
+        """
+
+        # An arbitrary iterable neither supports len() nor a second traversal, thus
+        # materialize it before checking for emptiness
+        recipients = list(recipient_jids)
+        if not recipients:
+            raise ValueError("Recipient JIDs must be provided.")
+
+        return XEP_0373.__build_content_element("sign", recipients, include_rpad)
+
+    @staticmethod
+    def build_crypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<crypt/>`` content element.
+
+        Random padding is always included; an empty recipient list is accepted.
+
+        @param recipient_jids: The intended recipients of this content element.
+            Specifying the intended recipients is OPTIONAL for the ``<crypt/>`` content
+            element. Can be bare JIDs.
+        @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the
+            stanza extension elements to.
+        """
+
+        content_elt, payload_elt = \
+            XEP_0373.__build_content_element("crypt", recipient_jids, True)
+
+        return content_elt, payload_elt
+
+    async def build_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        content_elt: domish.Element,
+        recipient_jids: Set[jid.JID]
+    ) -> domish.Element:
+        """Build an ``<openpgp/>`` element.
+
+        Depending on the name of the content element, its serialization is signed
+        and encrypted, only signed, or only encrypted. The result is Base64-encoded
+        and put into the ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param content_elt: The content element to contain in the ``<openpgp/>``
+            element.
+        @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
+        @return: The ``<openpgp/>`` element.
+        @raise ValueError: if the content element is none of ``<signcrypt/>``,
+            ``<sign/>`` and ``<crypt/>``.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        # TODO: I'm not sure whether we want to sign with all keys by default or choose
+        # just one key/a subset of keys to sign with.
+        signing_keys = {
+            secret_key
+            for secret_key in self.list_secret_keys(client)
+            if gpg_provider.can_sign(secret_key.public_key)
+        }
+
+        encryption_keys: Set[GPGPublicKey] = set()
+
+        for recipient_jid in recipient_jids:
+            # Import all keys of the recipient and keep those that can encrypt
+            recipient_keys = await self.import_all_public_keys(client, recipient_jid)
+            encryption_keys.update(
+                public_key
+                for public_key in recipient_keys
+                if gpg_provider.can_encrypt(public_key)
+            )
+
+        # TODO: Handle trust
+
+        content = content_elt.toXml().encode("utf-8")
+
+        if content_elt.name == "signcrypt":
+            data: bytes = gpg_provider.encrypt(content, encryption_keys, signing_keys)
+        elif content_elt.name == "sign":
+            data = gpg_provider.sign(content, signing_keys)
+        elif content_elt.name == "crypt":
+            data = gpg_provider.encrypt(content, encryption_keys)
+        else:
+            raise ValueError(f"Unknown content element <{content_elt.name}/>")
+
+        encoded = base64.b64encode(data).decode("ASCII")
+
+        openpgp_elt = domish.Element((NS_OX, "openpgp"))
+        openpgp_elt.addContent(encoded)
+        return openpgp_elt
+
+ async def unpack_openpgp_element(
+ self,
+ client: SatXMPPClient,
+ openpgp_elt: domish.Element,
+ element_name: Literal["signcrypt", "sign", "crypt"],
+ sender_jid: jid.JID
+ ) -> Tuple[domish.Element, datetime]:
+ """Verify, decrypt and unpack an ``<openpgp/>`` element.
+
+ @param client: The client to perform this operation with.
+ @param openpgp_elt: The ``<openpgp/>`` element.
+ @param element_name: The name of the content element.
+ @param sender_jid: The sender's JID. Can be a bare JID.
+ @return: The ``<payload/>`` element containing the decrypted/verified stanza
+ extension elements carried by this ``<openpgp/>`` element, and the timestamp
+ contained in the content element.
+ @raise exceptions.ParsingError: on syntactical verification errors.
+ @raise VerificationError: on semantical verification errors according to XEP-0373.
+ @raise DecryptionFailed: on decryption failure.
+ @raise VerificationFailed: if the data could not be verified.
+
+ @warning: The timestamp is not verified for plausibility; this SHOULD be done by
+ the calling code.
+ """
+
+ gpg_provider = get_gpg_provider(self.__sat, client)
+
+ # Own secret keys whose public part can encrypt are offered for decryption
+ decryption_keys = set(filter(
+ lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
+ self.list_secret_keys(client)
+ ))
+
+ # Import all keys of the sender
+ all_public_keys = await self.import_all_public_keys(client, sender_jid)
+
+ # Filter for keys that can sign
+ verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))
+
+ # TODO: Handle trust
+
+ # NOTE(review): the schema is only checked after the sender's keys were
+ # imported above - confirm that this order is intended
+ try:
+ OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
+ except xmlschema.XMLSchemaValidationError as e:
+ raise exceptions.ParsingError(
+ "<openpgp/> element doesn't pass schema validation."
+ ) from e
+
+ # The text content of the element is the Base64-encoded OpenPGP message
+ openpgp_message = base64.b64decode(str(openpgp_elt))
+ content: bytes
+
+ # signcrypt: decrypt and verify, sign: verify only, crypt: decrypt only
+ if element_name == "signcrypt":
+ content = gpg_provider.decrypt(
+ openpgp_message,
+ decryption_keys,
+ public_keys=verification_keys
+ )
+ elif element_name == "sign":
+ content = gpg_provider.verify(openpgp_message, verification_keys)
+ elif element_name == "crypt":
+ content = gpg_provider.decrypt(openpgp_message, decryption_keys)
+ else:
+ assert_never(element_name)
+
+ # Only UTF-8 decoding errors are converted to ParsingError here
+ try:
+ content_elt = cast(
+ domish.Element,
+ xml_tools.ElementParser()(content.decode("utf-8"))
+ )
+ except UnicodeDecodeError as e:
+ raise exceptions.ParsingError("UTF-8 decoding error") from e
+
+ try:
+ CONTENT_SCHEMA.validate(content_elt.toXml())
+ except xmlschema.XMLSchemaValidationError as e:
+ raise exceptions.ParsingError(
+ f"<{element_name}/> element doesn't pass schema validation."
+ ) from e
+
+ # The decrypted/verified content must be of the kind the caller expects
+ if content_elt.name != element_name:
+ raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
+
+ recipient_jids = \
+ { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }
+
+ # The own bare JID must be listed as a recipient, except for <crypt/> elements.
+ # NOTE(review): the comprehension variable below is named like the ``jid``
+ # module; it shadows the module only inside the comprehension.
+ if (
+ client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids }
+ and element_name != "crypt"
+ ):
+ raise VerificationError(
+ f"Recipient list in <{element_name}/> element does not list our (bare)"
+ f" JID."
+ )
+
+ # presumably guaranteed to exist by CONTENT_SCHEMA - TODO confirm
+ time_elt = next(content_elt.elements(NS_OX, "time"))
+
+ timestamp = parse_datetime(time_elt["stamp"])
+
+ # presumably guaranteed to exist by CONTENT_SCHEMA - TODO confirm
+ payload_elt = next(content_elt.elements(NS_OX, "payload"))
+
+ return payload_elt, timestamp
+
+    async def publish_public_key(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey
+    ) -> None:
+        """Publish a public key on its per-fingerprint node of the own account.
+
+        @param client: The client.
+        @param public_key: The public key to publish.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        provider = get_gpg_provider(self.__sat, client)
+        exported_key = provider.export_public_key(public_key)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"
+
+        # <pubkey><data>Base64-encoded key</data></pubkey>
+        encoded_key = base64.b64encode(exported_key).decode("ASCII")
+        pubkey_elt = domish.Element((NS_OX, "pubkey"))
+        pubkey_elt.addElement("data", content=encoded_key)
+
+        try:
+            publish_options = {
+                XEP_0060.OPT_PERSIST_ITEMS: "true",
+                XEP_0060.OPT_ACCESS_MODEL: "open",
+                XEP_0060.OPT_MAX_ITEMS: 1
+            }
+
+            # The current timestamp is passed as the fifth positional argument of
+            # sendItem
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                pubkey_elt,
+                format_datetime(),
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: publish_options,
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public key failed.") from e
+
+    async def import_all_public_keys(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Set[GPGPublicKey]:
+        """Import all public keys of a JID that have not been imported before.
+
+        The keys to import are taken from the public keys metadata stored locally
+        for the JID.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys: those available before plus those imported now.
+        @note: Failure to import a key simply results in the key not being included
+            in the result.
+        """
+
+        available_public_keys = self.list_public_keys(client, jid)
+
+        storage_key = f"/public-keys-metadata/{jid.userhost()}"
+
+        public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        # Listed keys whose fingerprint matches none of the available keys
+        known_fingerprints = { public_key.fingerprint for public_key in available_public_keys }
+        missing_keys = {
+            metadata
+            for metadata in public_keys_metadata
+            if metadata.fingerprint not in known_fingerprints
+        }
+
+        for missing_key in missing_keys:
+            try:
+                imported_key = \
+                    await self.import_public_key(client, jid, missing_key.fingerprint)
+                available_public_keys.add(imported_key)
+            except Exception as e:
+                log.warning(
+                    f"Import of public key {missing_key.fingerprint} owned by"
+                    f" {jid.userhost()} failed, ignoring: {e}"
+                )
+
+        return available_public_keys
+
+    async def import_public_key(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID,
+        fingerprint: str
+    ) -> GPGPublicKey:
+        """Fetch a public key from its per-fingerprint node and import it.
+
+        @param client: The client.
+        @param jid: The JID owning the public key. Can be a bare JID.
+        @param fingerprint: The fingerprint of the public key.
+        @return: The public key.
+        @raise exceptions.NotFound: if the public key was not found.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise InvalidPacket: if the packet is either syntactically or semantically
+            deemed invalid.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
+
+        # Used both when the node doesn't exist and when it holds no item
+        not_found_message = (
+            f"No public key with fingerprint {fingerprint} published by JID"
+            f" {jid.userhost()}."
+        )
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound as e:
+            raise exceptions.NotFound(not_found_message) from e
+        except Exception as e:
+            # This request fetches a single public key, not the public keys list
+            raise XMPPInteractionFailed("Fetching the public key failed.") from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError as e:
+            raise exceptions.NotFound(not_found_message) from e
+
+        pubkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "pubkey"), None)
+        )
+
+        if pubkey_elt is None:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
+                f" element."
+            )
+
+        try:
+            PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
+                f" schema validation."
+            ) from e
+
+        # The text content of <data/> is the Base64-encoded key packet.
+        # NOTE(review): the fingerprint of the imported key is not compared with the
+        # requested fingerprint here - confirm whether the provider or a caller does.
+        data_elt = next(pubkey_elt.elements(NS_OX, "data"))
+
+        return gpg_provider.import_public_key(base64.b64decode(str(data_elt)))
+
+    async def publish_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        public_keys_list: Iterable[PublicKeyMetadata]
+    ) -> None:
+        """Publish/update the own public keys list.
+
+        @param client: The client.
+        @param public_keys_list: The public keys list. Any iterable is accepted,
+            including one-shot iterators.
+        @raise ValueError: if the list contains the same fingerprint more than once.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+
+        @warning: All public keys referenced in the public keys list MUST be published
+            beforehand.
+        """
+
+        # The argument is traversed twice and its length is needed, neither of which
+        # an arbitrary iterable supports, thus materialize it first
+        entries = list(public_keys_list)
+
+        if len({ entry.fingerprint for entry in entries }) != len(entries):
+            raise ValueError("Public keys list contains duplicate fingerprints.")
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
+
+        for public_key_metadata in entries:
+            pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
+            pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
+            pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                public_keys_list_elt,
+                # The list is always published under the same singleton item id
+                item_id=XEP_0060.ID_SINGLETON,
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
+
+    async def download_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Optional[Set[PublicKeyMetadata]]:
+        """Download the public keys list of a JID.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys list or ``None`` if the JID hasn't published a public
+            keys list. An empty set means the JID has published an empty list.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            # A missing node means that no list has been published
+            return None
+        except Exception as e:
+            # Carry a message like all other XMPPInteractionFailed raised in this
+            # class
+            raise XMPPInteractionFailed("Fetching the public keys list failed.") from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError:
+            # The node exists but holds no item
+            return None
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        if public_keys_list_elt is None:
+            return None
+
+        try:
+            PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
+                f" list schema validation."
+            ) from e
+
+        return {
+            PublicKeyMetadata(
+                fingerprint=pubkey_metadata_elt["v4-fingerprint"],
+                timestamp=parse_datetime(pubkey_metadata_elt["date"])
+            )
+            for pubkey_metadata_elt
+            in public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
+        }
+
+    async def __prepare_secret_key_synchronization(
+        self,
+        client: SatXMPPClient
+    ) -> Optional[domish.Element]:
+        """Prepare for secret key synchronization.
+
+        Makes sure the relevant protocols and protocol extensions are supported by
+        the server and makes sure that the PEP node for secret key synchronization
+        exists. The node is created with the whitelist access model if necessary.
+
+        @param client: The client.
+        @return: As part of the preparations, the secret key synchronization PEP node
+            is fetched. The item found on the node is returned here, ``None`` if the
+            node holds no item or had to be created first.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the
+            required protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        try:
+            infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos(
+                client,
+                client.jid.userhostJID()
+            ))
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error performing service discovery on the own bare JID."
+            ) from e
+
+        identities = cast(Dict[Tuple[str, str], str], infos.identities)
+        features = cast(Set[DiscoFeature], infos.features)
+
+        if ("pubsub", "pep") not in identities:
+            raise exceptions.FeatureNotFound("Server doesn't support PEP.")
+
+        if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
+            raise exceptions.FeatureNotFound(
+                "Server doesn't support the whitelist access model."
+            )
+
+        # TODO: persistent-items ("http://jabber.org/protocol/pubsub#persistent-items")
+        # is a SHOULD, how do we handle the feature missing?
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                client.jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            try:
+                await self.__xep_0060.createNode(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "whitelist",
+                        XEP_0060.OPT_MAX_ITEMS: "1"
+                    }
+                )
+            except Exception as e:
+                raise XMPPInteractionFailed(
+                    "Error creating the secret key synchronization node."
+                ) from e
+
+            # The fetch failed, so there is no item to return. Without this return
+            # the code below would access the unbound name ``items``.
+            return None
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error fetching the secret key synchronization node."
+            ) from e
+
+        try:
+            return cast(domish.Element, items[0])
+        except IndexError:
+            # The node exists but holds no item
+            return None
+
+    async def export_secret_keys(
+        self,
+        client: SatXMPPClient,
+        secret_keys: Iterable[GPGSecretKey]
+    ) -> str:
+        """Export secret keys to synchronize them with other devices.
+
+        The backups of all given keys are concatenated, encrypted symmetrically with
+        a freshly generated backup code, and published on the secret key node.
+
+        @param client: The client.
+        @param secret_keys: The secret keys to export.
+        @return: The backup code needed to decrypt the exported secret keys.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the
+            required protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        provider = get_gpg_provider(self.__sat, client)
+
+        # Checks server support and makes sure the node exists; the fetched item is
+        # not needed here
+        await self.__prepare_secret_key_synchronization(client)
+
+        backup_code = generate_passphrase()
+
+        key_backups = []
+        for secret_key in secret_keys:
+            key_backups.append(provider.backup_secret_key(secret_key))
+
+        encrypted_backup = \
+            provider.encrypt_symmetrically(b"".join(key_backups), backup_code)
+
+        secretkey_elt = domish.Element((NS_OX, "secretkey"))
+        secretkey_elt.addContent(base64.b64encode(encrypted_backup).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                "urn:xmpp:openpgp:0:secret-key",
+                secretkey_elt
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
+
+        return backup_code
+
+    async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
+        """Download previously exported secret keys to import them in a second step.
+
+        Downloading and importing are separate steps: the import requires a backup
+        code, and it should be possible to try multiple backup codes without
+        downloading the data again every time. The second half of the import
+        procedure is provided by :meth:`import_secret_keys`.
+
+        @param client: The client.
+        @return: The encrypted secret keys previously exported, if any.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the
+            required protocols or protocol extensions.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        item_elt = await self.__prepare_secret_key_synchronization(client)
+
+        # Neither a missing item nor a missing <secretkey/> element is an error,
+        # both simply mean there is nothing to download
+        secretkey_elt: Optional[domish.Element] = None
+        if item_elt is not None:
+            secretkey_elt = cast(
+                Optional[domish.Element],
+                next(item_elt.elements(NS_OX, "secretkey"), None)
+            )
+
+        if secretkey_elt is None:
+            return None
+
+        try:
+            SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "Publish-Subscribe item doesn't pass secretkey schema validation."
+            ) from e
+
+        # The text content of the element is the Base64-encoded ciphertext
+        encoded_ciphertext = str(secretkey_elt)
+        return base64.b64decode(encoded_ciphertext)
+
+    def import_secret_keys(
+        self,
+        client: SatXMPPClient,
+        ciphertext: bytes,
+        backup_code: str
+    ) -> Set[GPGSecretKey]:
+        """Import previously downloaded secret keys.
+
+        Downloading and importing are separate steps: the import requires a backup
+        code, and it should be possible to try multiple backup codes without
+        downloading the data again every time. The first half of the import
+        procedure is provided by :meth:`download_secret_keys`.
+
+        @param client: The client to perform this operation with.
+        @param ciphertext: The ciphertext, i.e. the data returned by
+            :meth:`download_secret_keys`.
+        @param backup_code: The backup code needed to decrypt the data.
+        @return: The secret keys restored from the decrypted data.
+        @raise InvalidPacket: if one of the GPG packets building the secret key data
+            is either syntactically or semantically deemed invalid.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+        provider = get_gpg_provider(self.__sat, client)
+
+        # Decrypt with the backup code first, then restore the keys from the result
+        key_backups = provider.decrypt_symmetrically(ciphertext, backup_code)
+
+        return provider.restore_secret_keys(key_backups)
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """Collect the bare JIDs of all participants of a joined MUC.
+
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+        # TODO: This should probably be a global helper somewhere
+
+        participants: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        roster = cast(Dict[str, muc.User], room.roster)
+
+        for user in roster.values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+
+            if entity is not None:
+                participants.add(entity.jid.userhostJID())
+                continue
+
+            # The bare JID of a participant is only known through its entity
+            raise exceptions.InternalError(
+                f"Participant list of MUC requested, but the entity information of"
+                f" the participant {user} is not available."
+            )
+
+        return participants
+
+    async def get_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID
+    ) -> TrustLevel:
+        """Query the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @return: The stored trust level, or ``TrustLevel.UNDECIDED`` if the lookup
+            raises a ``KeyError``.
+        """
+
+        storage_key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        try:
+            stored_value = await self.__storage[client.profile][storage_key]
+            return TrustLevel(stored_value)
+        except KeyError:
+            return TrustLevel.UNDECIDED
+
+    async def set_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID,
+        trust_level: TrustLevel
+    ) -> None:
+        """Store the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @param trust_level: The trust level. Its ``name`` is what gets stored.
+        """
+
+        storage = self.__storage[client.profile]
+        storage_key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        await storage.force(storage_key, trust_level.name)
+
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """Build the form used to manage the trust levels of an entity's public keys.
+
+        If the entity is a joined MUC, the public keys of all of its participants are
+        listed, otherwise only those of the entity itself.
+
+        @param client: The client.
+        @param entity: The entity whose public keys' trust levels to manage. Must be
+            a bare JID.
+        @return: An XMLUI instance which opens a form to manage the trust level of
+            all public keys available for the entity.
+        @raise ValueError: if ``entity`` is not a bare JID.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
+
+        bare_jids: Set[jid.JID]
+        if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhostJID() }
+
+        # Fixed order of owners and keys; the form field names refer to the keys by
+        # their indices in this structure
+        all_public_keys = [
+            (bare_jid, list(self.list_public_keys(client, bare_jid)))
+            for bare_jid
+            in bare_jids
+        ]
+
+        async def callback(
+            data: Any,
+            profile: str # pylint: disable=unused-argument
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produced by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known nor needed
+                here.
+            """
+
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.XMLUIResult2DataFormResult(data)
+            )
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                # Field names have the form "trust_<outer index>_<inner index>"
+                outer_index, inner_index = key.split("_")[1:]
+
+                owner, public_keys = all_public_keys[int(outer_index)]
+                public_key = public_keys[int(inner_index)]
+                trust = TrustLevel(value)
+
+                if (await self.get_trust(client, public_key, owner)) is not trust:
+                    # set_trust reads ``.name`` of its argument, so it must be given
+                    # the TrustLevel and not the raw form value
+                    await self.set_trust(client, public_key, owner, trust)
+
+            return {}
+
+        submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True)
+
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OX trust management"),
+            submit_id=submit_id
+        )
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
+            "This is OX trusting system. You'll see below the GPG keys of your "
+            "contacts, and a list selection to trust them or not. A trusted key "
+            "can read your messages in plain text, so be sure to only validate "
+            "keys that you are sure are belonging to your contact. It's better "
+            "to do this when you are next to your contact, so "
+            "you can check the \"fingerprint\" of the key "
+            "yourself. Do *not* validate a key if the fingerprint is wrong!"
+        ))
+
+        own_secret_keys = self.list_secret_keys(client)
+
+        trust_ui.changeContainer("label")
+        for index, secret_key in enumerate(own_secret_keys):
+            # NOTE(review): an f-string inside D_() yields a different message per
+            # index - confirm whether this label is meant to be translatable
+            trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
+            trust_ui.addText(secret_key.public_key.fingerprint)
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
+            for inner_index, public_key in enumerate(public_keys):
+                trust_ui.addLabel(D_("Contact"))
+                # owner already is a jid.JID, no need to wrap it again
+                trust_ui.addJid(owner)
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(public_key.fingerprint)
+                trust_ui.addLabel(D_("Trust this device?"))
+
+                current_trust_level = await self.get_trust(client, public_key, owner)
+                available_trust_levels = \
+                    { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
+
+                # Sorted by name so that the options appear in a stable order
+                option_names = sorted(
+                    trust_level.name for trust_level in available_trust_levels
+                )
+
+                trust_ui.addList(
+                    f"trust_{outer_index}_{inner_index}",
+                    options=option_names,
+                    selected=current_trust_level.name,
+                    styles=[ "inline" ]
+                )
+
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+        return result
diff --git a/sat/plugins/plugin_xep_0374.py b/sat/plugins/plugin_xep_0374.py
new file mode 100644
--- /dev/null
+++ b/sat/plugins/plugin_xep_0374.py
@@ -0,0 +1,421 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP Instant Messaging
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Dict, Optional, Set, cast
+
+from typing_extensions import Final
+from wokkel import muc # type: ignore[import]
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0334 import XEP_0334
+from sat.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+
+__all__ = [ # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "XEP_0374",
+    "NS_OXIM"
+]
+
+
+log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
+
+# Plugin metadata. XEP-0045 is only recommended, since only MUC messages need it.
+
+PLUGIN_INFO = {
+    C.PI_NAME: "OXIM",
+    C.PI_IMPORT_NAME: "XEP-0374",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0374" ],
+    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
+    C.PI_MAIN: "XEP_0374",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
+}
+
+
+# The namespace of the OXIM disco feature
+NS_OXIM: Final = "urn:xmpp:openpgp:im:0"
+
+
+class XEP_0374:
+ """
+ Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
+ namespace. MUC messages are supported next to one to one messages. For trust
+ management, the two trust models "BTBV" and "manual" are supported.
+ """
+
+    def __init__(self, sat: SAT) -> None:
+        """Set up plugin references, message triggers and the encryption plugin.
+
+        @param sat: The SAT instance.
+        """
+        self.__sat = sat
+
+        # Plugins: XEP-0045 (MUC) is only recommended and thus may be ``None``
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])
+
+        # Triggers: incoming messages are decrypted, outgoing stanzas are encrypted
+        sat.trigger.add(
+            "messageReceived",
+            self.__message_received_trigger,
+            priority=100050
+        )
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+
+        # Register the encryption plugin under the XEP-0373 namespace (NS_OX)
+        sat.registerEncryptionPlugin(self, "OXIM", NS_OX, 102)
+
+    async def getTrustUI( # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """Return the trust management UI by delegating to the XEP-0373 plugin.
+
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+        return await self.__xep_0373.getTrustUI(client, entity)
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """Collect the bare JIDs of all users in the roster of a joined MUC room.
+
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+            # NOTE(review): elsewhere ``user.entity`` is cast to jid.JID - confirm its type
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def __message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """Decrypt an incoming OXIM message in place, if the stanza contains one.
+
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", "unknown")
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle MUC messages without XEP-0045, let the flow continue
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
+
+            try:
+                room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+        else:
+            # I'm not sure why this check is required, this code is copied from XEP-0384
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
+                # like "to" isn't always set.
+                feedback_jid = jid.JID(message_elt["to"])
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        openpgp_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(NS_OX, "openpgp"),
+            None
+        ))
+
+        if openpgp_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        try:
+            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
+                client,
+                openpgp_elt,
+                "signcrypt",
+                jid.JID(sender_bare_jid)
+            )
+        except Exception as e:
+            # TODO: More specific exception handling
+            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                reason=e,
+                xml=message_elt.toXml()
+            ))
+            client.feedback(
+                feedback_jid,
+                D_(
+                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
+                    f" {e}"
+                ),
+                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+            )
+            # No point in further processing this message
+            return False
+
+        message_elt.children.remove(openpgp_elt)
+
+        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")
+
+        # Remove all body elements, they are mere fallbacks for non-supporting clients.
+        # Iterate over a snapshot: removing children during iteration skips siblings.
+        for child in list(message_elt.elements()):
+            if child.name == "body":
+                message_elt.children.remove(child)
+
+        # Move all extension elements from the payload to the stanza root
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(payload_elt.elements()):
+            # Remove the child from the content element
+            payload_elt.children.remove(child)
+
+            # Add the child to the stanza
+            message_elt.addChild(child)
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = TrustLevel.UNDECIDED # TODO: Load the actual trust level
+        if trust_level is TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.markAsTrusted)
+        else:
+            post_treat.addCallback(client.encryption.markAsUntrusted)
+
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.markAsEncrypted,
+            namespace=NS_OX
+        )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """Encrypt an outgoing message if an OXIM session exists for its recipient.
+
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
+        @raise InternalError: if a message stanza without recipient is encountered.
+        """
+        # OXIM only handles message stanzas
+        if stanza.name != "message":
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            raise exceptions.InternalError(
+                f"Message without recipient encountered. Blocking further processing to"
+                f" avoid leaking plaintext data: {stanza.toXml()}"
+            )
+
+        # Check whether encryption is requested for the bare JID of the recipient
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != NS_OX:
+            # Encryption is requested for this recipient, but not with OXIM
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.__encrypt(
+            client,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", "unknown") == C.MESS_TYPE_GROUPCHAT
+        )
+
+        # Add the store hint, which __encrypt leaves to the calling code
+        self.__xep_0334.addHintElements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __encrypt(
+        self,
+        client: SatXMPPClient,
+        stanza: domish.Element,
+        recipient_jid: jid.JID,
+        is_muc_message: bool
+    ) -> None:
+        """Move the children of the stanza into an encrypted OpenPGP element.
+
+        @param client: The client.
+        @param stanza: The stanza, which is modified by this call.
+        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
+            but doesn't have to.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+        recipient_bare_jids: Set[jid.JID]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+        else:
+            recipient_bare_jids = { recipient_jid.userhostJID() }
+            feedback_jid = recipient_jid.userhostJID()
+
+        log.debug(
+            f"Intercepting message that is to be encrypted by {NS_OX} for"
+            f" {recipient_bare_jids}"
+        )
+
+        signcrypt_elt, payload_elt = \
+            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)
+
+        # Move elements from the stanza to the content element.
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(stanza.elements()):
+            # Remove the child from the stanza
+            stanza.children.remove(child)
+
+            # A namespace of ``None`` can be used on domish elements to inherit the
+            # namespace from the parent. When moving elements from the stanza root to
+            # the content element, however, we don't want elements to inherit the
+            # namespace of the content element. Thus, check for elements with ``None``
+            # for their namespace and set the namespace to jabber:client, which is the
+            # namespace of the parent element.
+            if child.uri is None:
+                child.uri = C.NS_CLIENT
+                child.defaultUri = C.NS_CLIENT
+
+            # Add the child with corrected namespaces to the content element
+            payload_elt.addChild(child)
+
+        try:
+            openpgp_elt = await self.__xep_0373.build_openpgp_element(
+                client,
+                signcrypt_elt,
+                recipient_bare_jids
+            )
+        except Exception as e:
+            # Translate the constant template first, then fill in the values
+            msg = _("Can't encrypt message for {entities}: {reason}").format(
+                entities=', '.join(
+                    bare_jid.userhost() for bare_jid in recipient_bare_jids
+                ),
+                reason=e
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise
+
+        stanza.addChild(openpgp_elt)
diff --git a/sat/plugins/plugin_xep_0384.py b/sat/plugins/plugin_xep_0384.py
--- a/sat/plugins/plugin_xep_0384.py
+++ b/sat/plugins/plugin_xep_0384.py
@@ -479,10 +479,10 @@
xml_tools.et_elt_2_domish_elt(element),
item_id=str(bundle.device_id),
extra={
- xep_0060.EXTRA_PUBLISH_OPTIONS: {
- xep_0060.OPT_MAX_ITEMS: "max"
+ XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+ XEP_0060.OPT_MAX_ITEMS: "max"
},
- xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+ XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
}
)
except (error.StanzaError, Exception) as e:
@@ -519,8 +519,8 @@
xml_tools.et_elt_2_domish_elt(element),
item_id=xep_0060.ID_SINGLETON,
extra={
- xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 },
- xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 },
+ XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
}
)
except Exception as e:
@@ -546,7 +546,6 @@
client,
jid.JID(bare_jid),
node,
- max_items=None,
item_ids=[ str(device_id) ]
)
except Exception as e:
@@ -653,11 +652,11 @@
xml_tools.et_elt_2_domish_elt(element),
item_id=xep_0060.ID_SINGLETON,
extra={
- xep_0060.EXTRA_PUBLISH_OPTIONS: {
- xep_0060.OPT_MAX_ITEMS: 1,
- xep_0060.OPT_ACCESS_MODEL: "open"
+ XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+ XEP_0060.OPT_MAX_ITEMS: 1,
+ XEP_0060.OPT_ACCESS_MODEL: "open"
},
- xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+ XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
}
)
except (error.StanzaError, Exception) as e:
diff --git a/sat/plugins/plugin_xep_0420.py b/sat/plugins/plugin_xep_0420.py
--- a/sat/plugins/plugin_xep_0420.py
+++ b/sat/plugins/plugin_xep_0420.py
@@ -22,8 +22,10 @@
import secrets
import string
from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
+from typing_extensions import Final
from lxml import etree
+from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import D_
@@ -68,7 +70,7 @@
}
-NS_SCE = "urn:xmpp:sce:1"
+NS_SCE: Final = "urn:xmpp:sce:1"
class ProfileRequirementsNotMet(Exception):
@@ -114,7 +116,8 @@
remain. Do not modify.
@return: An affix element to include in the envelope. The element must have the
name :attr:`element_name` and must validate using :attr:`element_schema`.
- @raise ValueError: if the affix couldn't be built.
+ @raise ValueError: if the affix couldn't be built due to missing information on
+ the stanza.
"""
@abstractmethod
@@ -384,7 +387,7 @@
by the decryption scheme utilizing SCE.
@return: The parsed and processed values of all affixes that were present on the
envelope, notably including the timestamp.
- @raise ValueError: if the serialized envelope element is malformed.
+ @raise exceptions.ParsingError: if the serialized envelope element is malformed.
@raise ProfileRequirementsNotMet: if one or more affixes required by the profile
are missing from the envelope.
@raise AffixVerificationFailed: if an affix included in the envelope fails to
@@ -399,7 +402,9 @@
try:
envelope_serialized_string = envelope_serialized.decode("utf-8")
except UnicodeError as e:
- raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e
+ raise exceptions.ParsingError(
+ "Serialized envelope can't bare parsed as utf-8."
+ ) from e
custom_affixes = set(profile.custom_policies.keys())
@@ -420,7 +425,9 @@
try:
etree.fromstring(envelope_serialized_string, parser)
except etree.XMLSyntaxError as e:
- raise ValueError("Serialized envelope doesn't pass schema validation.") from e
+ raise exceptions.ParsingError(
+ "Serialized envelope doesn't pass schema validation."
+ ) from e
# Prepare the envelope and content elements
envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
@@ -452,7 +459,7 @@
timestamp_value = None if time_element is None else \
XEP_0082.parse_datetime(time_element["stamp"])
except ValueError as e:
- raise AffixVerificationFailed("Malformed time affix") from e
+ raise AffixVerificationFailed("Malformed time affix.") from e
# The to affix is verified by comparing the to attribute of the stanza with the
# JID referenced by the affix. Note that only bare JIDs are compared as per the
diff --git a/sat/tools/xmpp_datetime.py b/sat/tools/xmpp_datetime.py
--- a/sat/tools/xmpp_datetime.py
+++ b/sat/tools/xmpp_datetime.py
@@ -80,12 +80,15 @@
@param value: A string containing date information formatted according to the Date
profile specified in XEP-0082.
@return: The date parsed from the input string.
- @raise ValueError: if the input string is not correctly formatted.
+ @raise exceptions.ParsingError: if the input string is not correctly formatted.
"""
# CCYY-MM-DD
# The Date profile of XEP-0082 is equal to the ISO 8601 format.
- return date.fromisoformat(value)
+ try:
+ return date.fromisoformat(value)
+ except ValueError as e:
+ raise exceptions.ParsingError() from e
def format_datetime(
@@ -125,13 +128,16 @@
@param value: A string containing datetime information formatted according to the
DateTime profile specified in XEP-0082.
@return: The datetime parsed from the input string.
- @raise ValueError: if the input string is not correctly formatted.
+ @raise exceptions.ParsingError: if the input string is not correctly formatted.
"""
# CCYY-MM-DDThh:mm:ss[.sss]TZD
value, microsecond = __parse_fraction_of_a_second(value)
- result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+ try:
+ result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+ except ValueError as e:
+ raise exceptions.ParsingError() from e
if microsecond is not None:
result = result.replace(microsecond=microsecond)
@@ -167,7 +173,7 @@
@param value: A string containing time information formatted according to the Time
profile specified in XEP-0082.
@return: The time parsed from the input string.
- @raise ValueError: if the input string is not correctly formatted.
+ @raise exceptions.ParsingError: if the input string is not correctly formatted.
"""
# hh:mm:ss[.sss][TZD]
@@ -177,7 +183,10 @@
# profile, except that it doesn't handle the letter Z as time zone information for
# UTC. This can be fixed with a simple string replacement of 'Z' with "+00:00", which
# is another way to represent UTC.
- result = time.fromisoformat(value.replace('Z', "+00:00"))
+ try:
+ result = time.fromisoformat(value.replace('Z', "+00:00"))
+ except ValueError as e:
+ raise exceptions.ParsingError() from e
if microsecond is not None:
result = result.replace(microsecond=microsecond)
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -52,13 +52,14 @@
'urwid-satext == 0.9.*',
'wokkel >= 18.0.0, < 19.0.0',
'omemo >= 1.0.0, < 2',
- 'twomemo >= 1.0.0, < 2',
- 'oldmemo >= 1.0.0, < 2',
+ 'twomemo[xml] >= 1.0.0, < 2',
+ 'oldmemo[xml] >= 1.0.0, < 2',
'pyyaml < 7.0.0',
'sqlalchemy >= 1.4',
'alembic',
'aiosqlite',
'txdbus',
+ 'xmlschema',
]
extras_require = {
diff --git a/tests/unit/test_plugin_xep_0082.py b/tests/unit/test_plugin_xep_0082.py
--- a/tests/unit/test_plugin_xep_0082.py
+++ b/tests/unit/test_plugin_xep_0082.py
@@ -19,6 +19,7 @@
from datetime import date, datetime, time, timezone
import pytest
+from sat.core import exceptions
from sat.plugins.plugin_xep_0082 import XEP_0082
@@ -104,7 +105,7 @@
assert XEP_0082.parse_datetime("1969-07-20T21:56:15-05:00") == value
# Without timezone, without a fraction of a second
- with pytest.raises(ValueError):
+ with pytest.raises(exceptions.ParsingError):
XEP_0082.parse_datetime("1969-07-21T02:56:15")
# With timezone 'Z', with a fraction of a second consisting of two digits
@@ -123,7 +124,7 @@
assert XEP_0082.parse_datetime("1969-07-20T21:56:15.-05:00") == value
# Without timezone, with a fraction of a second consisting of six digits
- with pytest.raises(ValueError):
+ with pytest.raises(exceptions.ParsingError):
XEP_0082.parse_datetime("1969-07-21T02:56:15.123456")
diff --git a/tests/unit/test_plugin_xep_0373.py b/tests/unit/test_plugin_xep_0373.py
new file mode 100644
--- /dev/null
+++ b/tests/unit/test_plugin_xep_0373.py
@@ -0,0 +1,146 @@
+from datetime import datetime, timedelta, timezone
+from sat.plugins.plugin_xep_0373 import XEP_0373, NS_OX
+from sat.tools.xmpp_datetime import parse_datetime
+
+import pytest
+from twisted.words.protocols.jabber import jid
+
+
+a = jid.JID("foo@example.com")  # recipient JIDs used by the tests below
+b = jid.JID("bar@example.com")
+
+
+def test_signcrypt_element_args() -> None:
+    with pytest.raises(ValueError):  # an empty recipient list must be rejected
+        XEP_0373.build_signcrypt_element([])
+
+
+def test_signcrypt_element() -> None:
+    signcrypt_elt, payload_elt = XEP_0373.build_signcrypt_element([ a, b ])
+    payload_elt.addElement("signcrypt-test-content", content="signcrypt test content")
+    # rpad and time must both be present, next() without a default raises otherwise
+    rpad_elt = next(signcrypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(signcrypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+    # Strip both so that the remaining XML can be compared against a fixed string
+    signcrypt_elt.children.remove(rpad_elt)
+    signcrypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert signcrypt_elt.toXml() == (
+        "<signcrypt xmlns='urn:xmpp:openpgp:0'>"
+        "<to jid='foo@example.com'/>"
+        "<to jid='bar@example.com'/>"
+        "<payload>"
+        "<signcrypt-test-content>signcrypt test content</signcrypt-test-content>"
+        "</payload>"
+        "</signcrypt>"
+    )
+
+
+def test_sign_element_args() -> None:
+    with pytest.raises(ValueError):  # an empty recipient list must be rejected
+        XEP_0373.build_sign_element([], True)
+
+
+def test_sign_element_with_rpad() -> None:
+    sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], True)
+    payload_elt.addElement("sign-test-content", content="sign test content")
+    # rpad and time must both be present, next() without a default raises otherwise
+    rpad_elt = next(sign_elt.elements(NS_OX, "rpad"))
+    time_elt = next(sign_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+    # Strip both so that the remaining XML can be compared against a fixed string
+    sign_elt.children.remove(rpad_elt)
+    sign_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert sign_elt.toXml() == (
+        "<sign xmlns='urn:xmpp:openpgp:0'>"
+        "<to jid='foo@example.com'/>"
+        "<to jid='bar@example.com'/>"
+        "<payload>"
+        "<sign-test-content>sign test content</sign-test-content>"
+        "</payload>"
+        "</sign>"
+    )
+
+
+def test_sign_element_without_rpad() -> None:
+    sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], False)
+    payload_elt.addElement("sign-test-content", content="sign test content")
+    # No rpad is expected here, so it is looked up with a default instead of raising
+    rpad_elt = next(sign_elt.elements(NS_OX, "rpad"), None)
+    time_elt = next(sign_elt.elements(NS_OX, "time"))
+
+    timestamp = parse_datetime(time_elt["stamp"])
+    # Strip the time element so that the remaining XML can be compared verbatim
+    sign_elt.children.remove(time_elt)
+
+    assert rpad_elt is None
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert sign_elt.toXml() == (
+        "<sign xmlns='urn:xmpp:openpgp:0'>"
+        "<to jid='foo@example.com'/>"
+        "<to jid='bar@example.com'/>"
+        "<payload>"
+        "<sign-test-content>sign test content</sign-test-content>"
+        "</payload>"
+        "</sign>"
+    )
+
+
+def test_crypt_element_with_recipients() -> None:
+    crypt_elt, payload_elt = XEP_0373.build_crypt_element([ a, b ])
+    payload_elt.addElement("crypt-test-content", content="crypt test content")
+    # rpad and time must both be present, next() without a default raises otherwise
+    rpad_elt = next(crypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(crypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+    # Strip both so that the remaining XML can be compared against a fixed string
+    crypt_elt.children.remove(rpad_elt)
+    crypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert crypt_elt.toXml() == (
+        "<crypt xmlns='urn:xmpp:openpgp:0'>"
+        "<to jid='foo@example.com'/>"
+        "<to jid='bar@example.com'/>"
+        "<payload>"
+        "<crypt-test-content>crypt test content</crypt-test-content>"
+        "</payload>"
+        "</crypt>"
+    )
+
+
+def test_crypt_element_without_recipients() -> None:
+    crypt_elt, payload_elt = XEP_0373.build_crypt_element([])
+    payload_elt.addElement("crypt-test-content", content="crypt test content")
+    # An empty recipient list is expected to work here and to yield no <to/> elements
+    rpad_elt = next(crypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(crypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+    # Strip both so that the remaining XML can be compared against a fixed string
+    crypt_elt.children.remove(rpad_elt)
+    crypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert crypt_elt.toXml() == (
+        "<crypt xmlns='urn:xmpp:openpgp:0'>"
+        "<payload>"
+        "<crypt-test-content>crypt test content</crypt-test-content>"
+        "</payload>"
+        "</crypt>"
+    )
diff --git a/tests/unit/test_plugin_xep_0420.py b/tests/unit/test_plugin_xep_0420.py
--- a/tests/unit/test_plugin_xep_0420.py
+++ b/tests/unit/test_plugin_xep_0420.py
@@ -20,6 +20,7 @@
from typing import Callable, cast
import pytest
+from sat.core import exceptions
from sat.plugins.plugin_xep_0334 import NS_HINTS
from sat.plugins.plugin_xep_0420 import (
@@ -435,7 +436,7 @@
</body>
</message>""")
- with pytest.raises(ValueError):
+ with pytest.raises(exceptions.ParsingError):
XEP_0420.unpack_stanza(
unpacking_profile,
stanza,
@@ -558,5 +559,5 @@
<unknown-affix unknown-attr="unknown"/>
</envelope>""".encode("utf-8")
- with pytest.raises(ValueError):
+ with pytest.raises(exceptions.ParsingError):
XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)