diff --git a/.hgignore b/.hgignore
--- a/.hgignore
+++ b/.hgignore
@@ -18,3 +18,8 @@
Session.vim
.build
.pytest_cache
+env/
+.eggs/
+libervia_backend.egg-info/
+.mypy_cache/
+twisted/plugins/dropin.cache
diff --git a/dev-requirements.txt b/dev-requirements.txt
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,4 +1,8 @@
-r requirements.txt
+lxml-stubs
+
+mypy
+pylint
pytest
pytest_twisted
diff --git a/pylintrc b/pylintrc
new file mode 100644
--- /dev/null
+++ b/pylintrc
@@ -0,0 +1,498 @@
+[MASTER]
+
+# A comma-separated list of package or module names from where C extensions may
+# be loaded. Extensions are loading into the active Python interpreter and may
+# run arbitrary code.
+extension-pkg-allow-list=lxml.etree
+
+# Add files or directories to the blacklist. They should be base names, not
+# paths.
+ignore=CVS
+
+# Add files or directories matching the regex patterns to the blacklist. The
+# regex matches against base names, not paths.
+ignore-patterns=
+
+# Python code to execute, usually for sys.path manipulation such as
+# pygtk.require().
+#init-hook=
+
+# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
+# number of processors available to use.
+jobs=1
+
+# Control the amount of potential inferred values when inferring a single
+# object. This can help the performance when dealing with large functions or
+# complex, nested conditions.
+limit-inference-results=100
+
+# List of plugins (as comma separated values of python module names) to load,
+# usually to register additional checkers.
+load-plugins=
+
+# Pickle collected data for later comparisons.
+persistent=yes
+
+# Specify a configuration file.
+#rcfile=
+
+# When enabled, pylint would attempt to guess common misconfiguration and emit
+# user-friendly hints instead of false-positive error messages.
+suggestion-mode=yes
+
+# Allow loading of arbitrary C extensions. Extensions are imported into the
+# active Python interpreter and may run arbitrary code.
+unsafe-load-any-extension=no
+
+
+[MESSAGES CONTROL]
+
+# Only show warnings with the listed confidence levels. Leave empty to show
+# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED.
+confidence=
+
+# Disable the message, report, category or checker with the given id(s). You
+# can either give multiple identifiers separated by comma (,) or put this
+# option multiple times (only on the command line, not in the configuration
+# file where it should appear only once). You can also use "--disable=all" to
+# disable everything first and then reenable specific checks. For example, if
+# you want to run only the similarities checker, you can use "--disable=all
+# --enable=similarities". If you want to run only the classes checker, but have
+# no Warning level messages displayed, use "--disable=all --enable=classes
+# --disable=W".
+disable=missing-module-docstring,
+ duplicate-code,
+ fixme,
+ logging-fstring-interpolation
+
+# Enable the message, report, category or checker with the given id(s). You can
+# either give multiple identifier separated by comma (,) or put this option
+# multiple time (only on the command line, not in the configuration file where
+# it should appear only once). See also the "--disable" option for examples.
+enable=useless-suppression
+
+
+[REPORTS]
+
+# Python expression which should return a score less than or equal to 10. You
+# have access to the variables 'error', 'warning', 'refactor', and 'convention'
+# which contain the number of messages in each category, as well as 'statement'
+# which is the total number of statements analyzed. This score is used by the
+# global evaluation report (RP0004).
+evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
+
+# Template used to display messages. This is a python new-style format string
+# used to format the message information. See doc for all details.
+#msg-template=
+
+# Set the output format. Available formats are text, parseable, colorized, json
+# and msvs (visual studio). You can also give a reporter class, e.g.
+# mypackage.mymodule.MyReporterClass.
+output-format=text
+
+# Tells whether to display a full report or only the messages.
+reports=no
+
+# Activate the evaluation score.
+score=yes
+
+
+[REFACTORING]
+
+# Maximum number of nested blocks for function / method body
+max-nested-blocks=5
+
+# Complete name of functions that never returns. When checking for
+# inconsistent-return-statements if a never returning function is called then
+# it will be considered as an explicit return statement and no message will be
+# printed.
+never-returning-functions=sys.exit
+
+
+[SPELLING]
+
+# Limits count of emitted suggestions for spelling mistakes.
+max-spelling-suggestions=4
+
+# Spelling dictionary name. Available dictionaries: none. To make it work,
+# install the python-enchant package.
+spelling-dict=
+
+# List of comma separated words that should not be checked.
+spelling-ignore-words=
+
+# A path to a file that contains the private dictionary; one word per line.
+spelling-private-dict-file=
+
+# Tells whether to store unknown words to the private dictionary (see the
+# --spelling-private-dict-file option) instead of raising a message.
+spelling-store-unknown-words=no
+
+
+[VARIABLES]
+
+# List of additional names supposed to be defined in builtins. Remember that
+# you should avoid defining new builtins when possible.
+additional-builtins=
+
+# Tells whether unused global variables should be treated as a violation.
+allow-global-unused-variables=no
+
+# List of strings which can identify a callback function by name. A callback
+# name must start or end with one of those strings.
+callbacks=cb_,
+ _cb
+
+# A regular expression matching the name of dummy variables (i.e. expected to
+# not be used).
+dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
+
+# Argument names that match this expression will be ignored. Default to name
+# with leading underscore.
+ignored-argument-names=_.*|^ignored_|^unused_
+
+# Tells whether we should check for unused import in __init__ files.
+init-import=no
+
+# List of qualified module names which can have objects that can redefine
+# builtins.
+redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
+
+
+[LOGGING]
+
+# Format style used to check logging format string. `old` means using %
+# formatting, `new` is for `{}` formatting,and `fstr` is for f-strings.
+logging-format-style=old
+
+# Logging modules to check that the string format arguments are in logging
+# function parameter format.
+logging-modules=logging
+
+
+[FORMAT]
+
+# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
+expected-line-ending-format=
+
+# Regexp for a line that is allowed to be longer than the limit.
+ignore-long-lines=^\s*(# )?<?https?://\S+>?$
+
+# Number of spaces of indent required inside a hanging or continued line.
+indent-after-paren=4
+
+# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
+# tab).
+indent-string=' '
+
+# Maximum number of characters on a single line.
+max-line-length=90
+
+# Maximum number of lines in a module.
+max-module-lines=10000
+
+# Allow the body of a class to be on the same line as the declaration if body
+# contains single statement.
+single-line-class-stmt=no
+
+# Allow the body of an if to be on the same line as the test if there is no
+# else.
+single-line-if-stmt=yes
+
+
+[TYPECHECK]
+
+# List of decorators that produce context managers, such as
+# contextlib.contextmanager. Add to this list to register other decorators that
+# produce valid context managers.
+contextmanager-decorators=contextlib.contextmanager
+
+# List of members which are set dynamically and missed by pylint inference
+# system, and so shouldn't trigger E1101 when accessed. Python regular
+# expressions are accepted.
+generated-members=
+
+# Tells whether missing members accessed in mixin class should be ignored. A
+# mixin class is detected if its name ends with "mixin" (case insensitive).
+ignore-mixin-members=yes
+
+# Tells whether to warn about missing members when the owner of the attribute
+# is inferred to be None.
+ignore-none=no
+
+# This flag controls whether pylint should warn about no-member and similar
+# checks whenever an opaque object is returned when inferring. The inference
+# can return multiple potential results while evaluating a Python object, but
+# some branches might not be evaluated, which results in partial inference. In
+# that case, it might be useful to still emit no-member and other checks for
+# the rest of the inferred objects.
+ignore-on-opaque-inference=no
+
+# List of class names for which member attributes should not be checked (useful
+# for classes with dynamically set attributes). This supports the use of
+# qualified names.
+ignored-classes=optparse.Values,thread._local,_thread._local
+
+# List of module names for which member attributes should not be checked
+# (useful for modules/projects where namespaces are manipulated during runtime
+# and thus existing member attributes cannot be deduced by static analysis). It
+# supports qualified module names, as well as Unix pattern matching.
+ignored-modules=
+
+# Show a hint with possible names when a member name was not found. The aspect
+# of finding the hint is based on edit distance.
+missing-member-hint=yes
+
+# The minimum edit distance a name should have in order to be considered a
+# similar match for a missing member name.
+missing-member-hint-distance=1
+
+# The total number of similar names that should be taken in consideration when
+# showing a hint for a missing member.
+missing-member-max-choices=1
+
+# List of decorators that change the signature of a decorated function.
+signature-mutators=
+
+
+[STRING]
+
+# This flag controls whether the implicit-str-concat-in-sequence should
+# generate a warning on implicit string concatenation in sequences defined over
+# several lines.
+check-str-concat-over-line-jumps=no
+
+
+[SIMILARITIES]
+
+# Ignore comments when computing similarities.
+ignore-comments=yes
+
+# Ignore docstrings when computing similarities.
+ignore-docstrings=yes
+
+# Ignore imports when computing similarities.
+ignore-imports=no
+
+# Minimum lines number of a similarity.
+min-similarity-lines=4
+
+
+[BASIC]
+
+# Naming style matching correct argument names.
+argument-naming-style=snake_case
+
+# Regular expression matching correct argument names. Overrides argument-
+# naming-style.
+#argument-rgx=
+
+# Naming style matching correct attribute names.
+attr-naming-style=snake_case
+
+# Regular expression matching correct attribute names. Overrides attr-naming-
+# style.
+#attr-rgx=
+
+# Bad variable names which should always be refused, separated by a comma.
+bad-names=foo,
+ bar,
+ baz,
+ toto,
+ tutu,
+ tata
+
+# Naming style matching correct class attribute names.
+class-attribute-naming-style=UPPER_CASE
+
+# Regular expression matching correct class attribute names. Overrides class-
+# attribute-naming-style.
+#class-attribute-rgx=
+
+# Naming style matching correct class names.
+class-naming-style=PascalCase
+
+# Regular expression matching correct class names. Overrides class-naming-
+# style.
+#class-rgx=
+
+# Naming style matching correct constant names.
+const-naming-style=any
+
+# Regular expression matching correct constant names. Overrides const-naming-
+# style.
+#const-rgx=
+
+# Minimum line length for functions/classes that require docstrings, shorter
+# ones are exempt.
+docstring-min-length=-1
+
+# Naming style matching correct function names.
+function-naming-style=snake_case
+
+# Regular expression matching correct function names. Overrides function-
+# naming-style.
+#function-rgx=
+
+# Good variable names which should always be accepted, separated by a comma.
+good-names=i,
+ j,
+ e, # exceptions in except blocks
+ _
+
+# Include a hint for the correct naming format with invalid-name.
+include-naming-hint=no
+
+# Naming style matching correct inline iteration names.
+inlinevar-naming-style=any
+
+# Regular expression matching correct inline iteration names. Overrides
+# inlinevar-naming-style.
+#inlinevar-rgx=
+
+# Naming style matching correct method names.
+method-naming-style=snake_case
+
+# Regular expression matching correct method names. Overrides method-naming-
+# style.
+#method-rgx=
+
+# Naming style matching correct module names.
+module-naming-style=snake_case
+
+# Regular expression matching correct module names. Overrides module-naming-
+# style.
+#module-rgx=
+
+# Colon-delimited sets of names that determine each other's naming style when
+# the name regexes allow several styles.
+name-group=
+
+# Regular expression which should only match function or class names that do
+# not require a docstring.
+no-docstring-rgx=^_
+
+# List of decorators that produce properties, such as abc.abstractproperty. Add
+# to this list to register other decorators that produce valid properties.
+# These decorators are taken in consideration only for invalid-name.
+property-classes=abc.abstractproperty
+
+# Naming style matching correct variable names.
+variable-naming-style=snake_case
+
+# Regular expression matching correct variable names. Overrides variable-
+# naming-style.
+#variable-rgx=
+
+
+[MISCELLANEOUS]
+
+# List of note tags to take in consideration, separated by a comma.
+notes=FIXME,
+ XXX,
+ TODO
+
+
+[IMPORTS]
+
+# List of modules that can be imported at any level, not just the top level
+# one.
+allow-any-import-level=
+
+# Allow wildcard imports from modules that define __all__.
+allow-wildcard-with-all=no
+
+# Analyse import fallback blocks. This can be used to support both Python 2 and
+# 3 compatible code, which means that the block might have code that exists
+# only in one or another interpreter, leading to false positives when analysed.
+analyse-fallback-blocks=yes
+
+# Deprecated modules which should not be used, separated by a comma.
+deprecated-modules=optparse,tkinter.tix
+
+# Create a graph of external dependencies in the given file (report RP0402 must
+# not be disabled).
+ext-import-graph=
+
+# Create a graph of every (i.e. internal and external) dependencies in the
+# given file (report RP0402 must not be disabled).
+import-graph=
+
+# Create a graph of internal dependencies in the given file (report RP0402 must
+# not be disabled).
+int-import-graph=
+
+# Force import order to recognize a module as part of the standard
+# compatibility libraries.
+known-standard-library=
+
+# Force import order to recognize a module as part of a third party library.
+known-third-party=enchant
+
+# Couples of modules and preferred modules, separated by a comma.
+preferred-modules=
+
+
+[CLASSES]
+
+# List of method names used to declare (i.e. assign) instance attributes.
+defining-attr-methods=__init__,
+ __new__,
+ setUp,
+ __post_init__,
+ create
+
+# List of member names, which should be excluded from the protected access
+# warning.
+exclude-protected=_asdict,
+ _fields,
+ _replace,
+ _source,
+ _make
+
+# List of valid names for the first argument in a class method.
+valid-classmethod-first-arg=cls
+
+# List of valid names for the first argument in a metaclass class method.
+valid-metaclass-classmethod-first-arg=cls
+
+
+[DESIGN]
+
+# Maximum number of arguments for function / method.
+max-args=100
+
+# Maximum number of attributes for a class (see R0902).
+max-attributes=100
+
+# Maximum number of boolean expressions in an if statement (see R0916).
+max-bool-expr=10
+
+# Maximum number of branch for function / method body.
+max-branches=100
+
+# Maximum number of locals for function / method body.
+max-locals=100
+
+# Maximum number of parents for a class (see R0901).
+max-parents=10
+
+# Maximum number of public methods for a class (see R0904).
+max-public-methods=100
+
+# Maximum number of return / yield for function / method body.
+max-returns=100
+
+# Maximum number of statements in function / method body.
+max-statements=1000
+
+# Minimum number of public methods for a class (see R0903).
+min-public-methods=0
+
+
+[EXCEPTIONS]
+
+# Exceptions that will emit a warning when being caught. Defaults to
+# "BaseException, Exception".
+overgeneral-exceptions=BaseException,
+ Exception
diff --git a/sat/core/i18n.py b/sat/core/i18n.py
--- a/sat/core/i18n.py
+++ b/sat/core/i18n.py
@@ -17,6 +17,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from typing import Callable, cast
from sat.core.log import getLogger
@@ -40,10 +41,10 @@
except ImportError:
log.warning("gettext support disabled")
- _ = lambda msg: msg # Libervia doesn't support gettext
+ _ = cast(Callable[[str], str], lambda msg: msg) # Libervia doesn't support gettext
def languageSwitch(lang=None):
pass
-D_ = lambda msg: msg # used for deferred translations
+D_ = cast(Callable[[str], str], lambda msg: msg) # used for deferred translations
diff --git a/sat/plugins/plugin_xep_0082.py b/sat/plugins/plugin_xep_0082.py
new file mode 100644
--- /dev/null
+++ b/sat/plugins/plugin_xep_0082.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for XMPP Date and Time Profile formatting and parsing with Python's
+# datetime package
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Type-check with `mypy --strict`
+# Lint with `pylint`
+
+from sat.core.constants import Const as C
+from sat.core.i18n import D_
+from sat.core.sat_main import SAT
+from sat.tools import datetime
+
+
+__all__ = [ # pylint: disable=unused-variable
+ "PLUGIN_INFO",
+ "XEP_0082"
+]
+
+
+PLUGIN_INFO = {
+ C.PI_NAME: "XMPP Date and Time Profiles",
+ C.PI_IMPORT_NAME: "XEP-0082",
+ C.PI_TYPE: C.PLUG_TYPE_MISC,
+ C.PI_PROTOCOLS: [ "XEP-0082" ],
+ C.PI_DEPENDENCIES: [],
+ C.PI_RECOMMENDATIONS: [],
+ C.PI_MAIN: "XEP_0082",
+ C.PI_HANDLER: "no",
+ C.PI_DESCRIPTION: D_("Date and Time Profiles for XMPP"),
+}
+
+
+class XEP_0082: # pylint: disable=invalid-name
+ """
+ Implementation of the date and time profiles specified in XEP-0082 using Python's
+ datetime module. The legacy format described in XEP-0082 section "4. Migration" is not
+ supported. Reexports of the functions in :mod:`sat.tools.datetime`.
+
+ This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
+ actively, but offers API for other plugins to use.
+ """
+
+ def __init__(self, sat: SAT) -> None:
+ """
+ @param sat: The SAT instance.
+ """
+
+ format_date = staticmethod(datetime.format_date)
+ parse_date = staticmethod(datetime.parse_date)
+ format_datetime = staticmethod(datetime.format_datetime)
+ parse_datetime = staticmethod(datetime.parse_datetime)
+ format_time = staticmethod(datetime.format_time)
+ parse_time = staticmethod(datetime.parse_time)
diff --git a/sat/plugins/plugin_xep_0420.py b/sat/plugins/plugin_xep_0420.py
new file mode 100644
--- /dev/null
+++ b/sat/plugins/plugin_xep_0420.py
@@ -0,0 +1,582 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Stanza Content Encryption
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Type-check with `mypy --strict --disable-error-code no-untyped-call`
+# Lint with `pylint`
+
+from abc import ABC, abstractmethod
+from datetime import datetime
+import enum
+import secrets
+import string
+from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast
+
+from lxml import etree
+
+from sat.core.constants import Const as C
+from sat.core.i18n import D_
+from sat.core.log import Logger, getLogger
+from sat.core.sat_main import SAT
+from sat.tools.xml_tools import ElementParser
+from sat.plugins.plugin_xep_0033 import NS_ADDRESS
+from sat.plugins.plugin_xep_0082 import XEP_0082
+from sat.plugins.plugin_xep_0334 import NS_HINTS
+from sat.plugins.plugin_xep_0359 import NS_SID
+from sat.plugins.plugin_xep_0380 import NS_EME
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+
+__all__ = [ # pylint: disable=unused-variable
+ "PLUGIN_INFO",
+ "NS_SCE",
+ "XEP_0420",
+ "ProfileRequirementsNotMet",
+ "AffixVerificationFailed",
+ "SCECustomAffix",
+ "SCEAffixPolicy",
+ "SCEProfile",
+ "SCEAffixValues"
+]
+
+
+log = cast(Logger, getLogger(__name__))
+
+
+PLUGIN_INFO = {
+ C.PI_NAME: "SCE",
+ C.PI_IMPORT_NAME: "XEP-0420",
+ C.PI_TYPE: "SEC",
+ C.PI_PROTOCOLS: [ "XEP-0420" ],
+ C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ],
+ C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ],
+ C.PI_MAIN: "XEP_0420",
+ C.PI_HANDLER: "no",
+ C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
+}
+
+
+NS_SCE = "urn:xmpp:sce:1"
+
+
+class ProfileRequirementsNotMet(Exception):
+ """
+ Raised by :meth:`XEP_0420.unpack_stanza` in case the requirements formulated by the
+ profile are not met.
+ """
+
+
+class AffixVerificationFailed(Exception):
+ """
+ Raised by :meth:`XEP_0420.unpack_stanza` in case of affix verification failure.
+ """
+
+
+class SCECustomAffix(ABC):
+ """
+ Interface for custom affixes of SCE profiles.
+ """
+
+ @property
+ @abstractmethod
+ def element_name(self) -> str:
+ """
+ @return: The name of the affix's XML element.
+ """
+
+ @property
+ @abstractmethod
+ def element_schema(self) -> str:
+ """
+ @return: The XML schema definition of the affix element's XML structure, i.e. the
+ ``<xs:element/>`` schema element. This element will be referenced using
+ ``<xs:element ref="{element_name}"/>``.
+ """
+
+ @abstractmethod
+ def create(self, stanza: domish.Element) -> domish.Element:
+ """
+ @param stanza: The stanza element which has been processed by
+ :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed
+ and only the root ``<message/>`` or ``<iq/>`` and unencryptable children
+ remain. Do not modify.
+ @return: An affix element to include in the envelope. The element must have the
+ name :attr:`element_name` and must validate using :attr:`element_schema`.
+ @raise ValueError: if the affix couldn't be built.
+ """
+
+ @abstractmethod
+ def verify(self, stanza: domish.Element, element: domish.Element) -> None:
+ """
+ @param stanza: The stanza element before being processed by
+ :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been
+ removed and only the root ``<message/>`` or ``<iq/>`` and unencryptable
+ children remain. Do not modify.
+ @param element: The affix element to verify.
+ @raise AffixVerificationFailed: on verification failure.
+ """
+
+
+@enum.unique
+class SCEAffixPolicy(enum.Enum):
+ """
+ Policy for the presence of an affix in an SCE envelope.
+ """
+
+ REQUIRED: str = "REQUIRED"
+ OPTIONAL: str = "OPTIONAL"
+ NOT_NEEDED: str = "NOT_NEEDED"
+
+
+class SCEProfile(NamedTuple):
+ # pylint: disable=invalid-name
+ """
+ An SCE profile, i.e. the definition which affixes are required, optional or not needed
+ at all by an SCE-enabled encryption protocol.
+ """
+
+ rpad_policy: SCEAffixPolicy
+ time_policy: SCEAffixPolicy
+ to_policy: SCEAffixPolicy
+ from_policy: SCEAffixPolicy
+ custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]
+
+
+class SCEAffixValues(NamedTuple):
+ # pylint: disable=invalid-name
+ """
+ Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processes values
+ of all affixes included in the envelope. For custom affixes, the whole affix element
+ is returned.
+ """
+
+ rpad: Optional[str]
+ timestamp: Optional[datetime]
+ recipient: Optional[jid.JID]
+ sender: Optional[jid.JID]
+ custom: Dict[SCECustomAffix, domish.Element]
+
+
+ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="urn:xmpp:sce:1"
+ xmlns="urn:xmpp:sce:1">
+
+ <xs:element name="envelope">
+ <xs:complexType>
+ <xs:all>
+ <xs:element ref="content"/>
+ <xs:element ref="rpad" minOccurs="0"/>
+ <xs:element ref="time" minOccurs="0"/>
+ <xs:element ref="to" minOccurs="0"/>
+ <xs:element ref="from" minOccurs="0"/>
+ {custom_affix_references}
+ </xs:all>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="content">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="rpad" type="xs:string"/>
+
+ <xs:element name="time">
+ <xs:complexType>
+ <xs:attribute name="stamp" type="xs:dateTime"/>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="to">
+ <xs:complexType>
+ <xs:attribute name="jid" type="xs:string"/>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="from">
+ <xs:complexType>
+ <xs:attribute name="jid" type="xs:string"/>
+ </xs:complexType>
+ </xs:element>
+
+ {custom_affix_definitions}
+</xs:schema>
+"""
+
+
+class XEP_0420: # pylint: disable=invalid-name
+ """
+ Implementation of XEP-0420: Stanza Content Encryption under namespace
+ ``urn:xmpp:sce:1``.
+
+ This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
+ actively, but offers API for other plugins to use.
+ """
+
+ # Set of namespaces whose elements are never allowed to be transferred in an encrypted
+ # envelope.
+ MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = {
+ NS_HINTS,
+ NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id
+ NS_ADDRESS,
+ # Not part of the specification (yet), but just doesn't make sense in an encrypted
+ # envelope:
+ NS_EME
+ }
+
+ # Set of (namespace, element name) tuples that define elements which are never allowed
+ # to be transferred in an encrypted envelope. If all elements under a certain
+ # namespace are forbidden, the namespace can be added to
+ # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
+ # Note: only full namespaces are forbidden by the spec for now, the following is for
+ # potential future use.
+ MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()
+
+ def __init__(self, sat: SAT) -> None:
+ """
+ @param sat: The SAT instance.
+ """
+
+ @staticmethod
+ def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
+ """Pack a stanza according to Stanza Content Encryption.
+
+ Removes all elements from the stanza except for a few exceptions that explicitly
+ need to be transferred in plaintext, e.g. because they contain hints/instructions
+ for the server on how to process the stanza. Together with the affix elements as
+ requested by the profile, the removed elements are added to an envelope XML
+ structure that builds the plaintext to be encrypted by the SCE-enabled encryption
+ scheme. Optional affixes are always added to the structure, i.e. they are treated
+ by the packing code as if they were required.
+
+ Once built, the envelope structure is serialized to a byte string and returned for
+ the encryption scheme to encrypt and add to the stanza.
+
+ @param profile: The SCE profile, i.e. the definition of affixes to include in the
+ envelope.
+ @param stanza: The stanza to process. Will be modified by the call.
+ @return: The serialized envelope structure that builds the plaintext for the
+ encryption scheme to process.
+ @raise ValueError: if the <to/> or <from/> affixes are requested but the stanza
+ doesn't have the "to"/"from" attribute set to extract the value from. Can also
+ be raised by custom affixes.
+
+ @warning: It is up to the calling code to add a <store/> message processing hint
+ if applicable.
+ """
+
+ # Prepare the envelope and content elements
+ envelope = domish.Element((NS_SCE, "envelope"))
+ content = envelope.addElement((NS_SCE, "content"))
+
+ # Note the serialized byte size of the content element before adding any children
+ empty_content_byte_size = len(content.toXml().encode("utf-8"))
+
+ # Just for type safety
+ stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
+ content_children = cast(List[Union[domish.Element, str]], content.children)
+
+ # Move elements that are not explicitly forbidden from being encrypted from the
+ # stanza to the content element.
+ for child in list(cast(Iterator[domish.Element], stanza.elements())):
+ if (
+ child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
+ and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
+ ):
+ # Remove the child from the stanza
+ stanza_children.remove(child)
+
+ # A namespace of ``None`` can be used on domish elements to inherit the
+ # namespace from the parent. When moving elements from the stanza root to
+ # the content element, however, we don't want elements to inherit the
+ # namespace of the content element. Thus, check for elements with ``None``
+ # for their namespace and set the namespace to jabber:client, which is the
+ # namespace of the parent element.
+ if child.uri is None:
+ child.uri = C.NS_CLIENT
+ child.defaultUri = C.NS_CLIENT
+
+ # Add the child with corrected namespaces to the content element
+ content_children.append(child)
+
+ # Add the affixes requested by the profile
+ if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
+ # The specification defines the rpad affix to contain "[...] a randomly
+ # generated sequence of random length between 0 and 200 characters." This
+ # implementation differs a bit from the specification in that a minimum size
+ # other than 0 is chosen depending on the serialized size of the content
+ # element. This is to prevent the scenario where the encrypted content is
+ # short and the rpad is also randomly chosen to be short, which could allow
+ # guessing the content of a short message. To do so, the rpad length is first
+ # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
+ # 200 bytes are added. Note that single-byte characters are used by this
+ # implementation, thus the number of characters equals the number of bytes.
+ content_byte_size = len(content.toXml().encode("utf-8"))
+ content_byte_size_diff = content_byte_size - empty_content_byte_size
+ rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
+ rpad_content = "".join(
+ secrets.choice(string.digits + string.ascii_letters + string.punctuation)
+ for __
+ in range(rpad_length)
+ )
+ envelope.addElement((NS_SCE, "rpad"), content=rpad_content)
+
+ if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
+ time_element = envelope.addElement((NS_SCE, "time"))
+ time_element["stamp"] = XEP_0082.format_datetime()
+
+ if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
+ recipient = cast(Optional[str], stanza.getAttribute("to", None))
+ if recipient is None:
+ raise ValueError(
+ "<to/> affix requested, but stanza doesn't have the 'to' attribute"
+ " set."
+ )
+
+ to_element = envelope.addElement((NS_SCE, "to"))
+ to_element["jid"] = jid.JID(recipient).userhost()
+
+ if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
+ sender = cast(Optional[str], stanza.getAttribute("from", None))
+ if sender is None:
+ raise ValueError(
+ "<from/> affix requested, but stanza doesn't have the 'from'"
+ " attribute set."
+ )
+
+ from_element = envelope.addElement((NS_SCE, "from"))
+ from_element["jid"] = jid.JID(sender).userhost()
+
+ for affix, policy in profile.custom_policies.items():
+ if policy is not SCEAffixPolicy.NOT_NEEDED:
+ envelope.addChild(affix.create(stanza))
+
+ return cast(str, envelope.toXml()).encode("utf-8")
+
+ @staticmethod
+ def unpack_stanza(
+ profile: SCEProfile,
+ stanza: domish.Element,
+ envelope_serialized: bytes
+ ) -> SCEAffixValues:
+ """Unpack a stanza packed according to Stanza Content Encryption.
+
+ Parses the serialized envelope as XML, verifies included affixes and makes sure
+ the requirements of the profile are met, and restores the stanza by moving
+ decrypted elements from the envelope back to the stanza top level.
+
+ @param profile: The SCE profile, i.e. the definition of affixes that have to/may
+ be included in the envelope.
+ @param stanza: The stanza to process. Will be modified by the call.
+ @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
+ by the decryption scheme utilizing SCE.
+ @return: The parsed and processed values of all affixes that were present on the
+ envelope, notably including the timestamp.
+ @raise ValueError: if the serialized envelope element is malformed.
+ @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
+ are missing from the envelope.
+ @raise AffixVerificationFailed: if an affix included in the envelope fails to
+ validate. It doesn't matter whether the affix is required by the profile or
+ not, all affixes included in the envelope are validated and cause this
+ exception to be raised on failure.
+
+ @warning: It is up to the calling code to verify the timestamp, if returned, since
+ the requirements on the timestamp may vary between SCE-enabled protocols.
+ """
+
+ try:
+ envelope_serialized_string = envelope_serialized.decode("utf-8")
+ except UnicodeError as e:
+ raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e
+
+ custom_affixes = set(profile.custom_policies.keys())
+
+ # Make sure the envelope adheres to the schema
+ parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format(
+ custom_affix_references="".join(
+ f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
+ for custom_affix
+ in custom_affixes
+ ),
+ custom_affix_definitions="".join(
+ custom_affix.element_schema
+ for custom_affix
+ in custom_affixes
+ )
+ ).encode("utf-8"))))
+
+ try:
+ etree.fromstring(envelope_serialized_string, parser)
+ except etree.XMLSyntaxError as e:
+ raise ValueError("Serialized envelope doesn't pass schema validation.") from e
+
+ # Prepare the envelope and content elements
+ envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
+ content = cast(domish.Element, next(envelope.elements(NS_SCE, "content")))
+
+ # Verify the affixes
+ rpad_element = cast(
+ Optional[domish.Element],
+ next(envelope.elements(NS_SCE, "rpad"), None)
+ )
+ time_element = cast(
+ Optional[domish.Element],
+ next(envelope.elements(NS_SCE, "time"), None)
+ )
+ to_element = cast(
+ Optional[domish.Element],
+ next(envelope.elements(NS_SCE, "to"), None)
+ )
+ from_element = cast(
+ Optional[domish.Element],
+ next(envelope.elements(NS_SCE, "from"), None)
+ )
+
+ # The rpad doesn't need verification.
+ rpad_value = None if rpad_element is None else str(rpad_element)
+
+ # The time affix isn't verified other than that the timestamp is parseable.
+ try:
+ timestamp_value = None if time_element is None else \
+ XEP_0082.parse_datetime(time_element["stamp"])
+ except ValueError as e:
+ raise AffixVerificationFailed("Malformed time affix") from e
+
+ # The to affix is verified by comparing the to attribute of the stanza with the
+ # JID referenced by the affix. Note that only bare JIDs are compared as per the
+ # specification.
+ recipient_value: Optional[jid.JID] = None
+ if to_element is not None:
+ recipient_value = jid.JID(to_element["jid"])
+
+ recipient_actual = cast(Optional[str], stanza.getAttribute("to", None))
+ if recipient_actual is None:
+ raise AffixVerificationFailed(
+ "'To' affix is included in the envelope, but the stanza is lacking a"
+ " 'to' attribute to compare the value to."
+ )
+
+ recipient_actual_bare_jid = jid.JID(recipient_actual).userhost()
+ recipient_target_bare_jid = recipient_value.userhost()
+
+ if recipient_actual_bare_jid != recipient_target_bare_jid:
+ raise AffixVerificationFailed(
+ f"Mismatch between actual and target recipient bare JIDs:"
+ f" {recipient_actual_bare_jid} vs {recipient_target_bare_jid}."
+ )
+
+ # The from affix is verified by comparing the from attribute of the stanza with
+ # the JID referenced by the affix. Note that only bare JIDs are compared as per
+ # the specification.
+ sender_value: Optional[jid.JID] = None
+ if from_element is not None:
+ sender_value = jid.JID(from_element["jid"])
+
+ sender_actual = cast(Optional[str], stanza.getAttribute("from", None))
+ if sender_actual is None:
+ raise AffixVerificationFailed(
+ "'From' affix is included in the envelope, but the stanza is lacking"
+ " a 'from' attribute to compare the value to."
+ )
+
+ sender_actual_bare_jid = jid.JID(sender_actual).userhost()
+ sender_target_bare_jid = sender_value.userhost()
+
+ if sender_actual_bare_jid != sender_target_bare_jid:
+ raise AffixVerificationFailed(
+ f"Mismatch between actual and target sender bare JIDs:"
+ f" {sender_actual_bare_jid} vs {sender_target_bare_jid}."
+ )
+
+ # Find and verify custom affixes
+ custom_values: Dict[SCECustomAffix, domish.Element] = {}
+ for affix in custom_affixes:
+ element_name = affix.element_name
+ element = cast(
+ Optional[domish.Element],
+ next(envelope.elements(NS_SCE, element_name), None)
+ )
+ if element is not None:
+ affix.verify(stanza, element)
+ custom_values[affix] = element
+
+ # Check whether all affixes required by the profile are present
+ rpad_missing = \
+ profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
+ time_missing = \
+ profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
+ to_missing = \
+ profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
+ from_missing = \
+ profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
+ custom_missing = any(
+ affix not in custom_values
+ for affix, policy
+ in profile.custom_policies.items()
+ if policy is SCEAffixPolicy.REQUIRED
+ )
+
+ if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
+ custom_missing_string = ""
+ for custom_affix in custom_affixes:
+ value = "present" if custom_affix in custom_values else "missing"
+ custom_missing_string += f", [custom]{custom_affix.element_name}={value}"
+
+ raise ProfileRequirementsNotMet(
+ f"SCE envelope is missing affixes required by the profile {profile}."
+ f" Affix presence:"
+ f" rpad={'missing' if rpad_missing else 'present'}"
+ f", time={'missing' if time_missing else 'present'}"
+ f", to={'missing' if to_missing else 'present'}"
+ f", from={'missing' if from_missing else 'present'}"
+ + custom_missing_string
+ )
+
+ # Just for type safety
+ content_children = cast(List[Union[domish.Element, str]], content.children)
+ stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
+
+ # Move elements that are not explicitly forbidden from being encrypted from the
+ # content element to the stanza.
+ for child in list(cast(Iterator[domish.Element], content.elements())):
+ if (
+ child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
+ or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
+ ):
+ log.warning(
+ f"An element that MUST be transferred in plaintext was found in an"
+ f" SCE envelope: {child.toXml()}"
+ )
+ else:
+ # Remove the child from the content element
+ content_children.remove(child)
+
+ # Add the child to the stanza
+ stanza_children.append(child)
+
+ return SCEAffixValues(
+ rpad_value,
+ timestamp_value,
+ recipient_value,
+ sender_value,
+ custom_values
+ )
diff --git a/sat/tools/datetime.py b/sat/tools/datetime.py
new file mode 100644
--- /dev/null
+++ b/sat/tools/datetime.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+
+# Libervia: XMPP Date and Time profiles as per XEP-0082
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Type-check with `mypy --strict`
+# Lint with `pylint`
+
+from datetime import date, datetime, time, timezone
+import re
+from typing import Optional, Tuple
+
+
+__all__ = [ # pylint: disable=unused-variable
+ "format_date",
+ "parse_date",
+ "format_datetime",
+ "parse_datetime",
+ "format_time",
+ "parse_time"
+]
+
+
+def __parse_fraction_of_a_second(value: str) -> Tuple[str, Optional[int]]:
+ """
+ datetime's strptime only supports up to six digits of the fraction of a seconds, while
+ the XEP-0082 specification allows for any number of digits. This function parses and
+ removes the optional fraction of a second from the input string.
+
+ @param value: The input string, containing a section of the format [.sss].
+ @return: The input string with the fraction of a second removed, and the fraction of a
+ second parsed with microsecond resolution. Returns the unaltered input string and
+ ``None`` if no fraction of a second was found in the input string.
+ """
+
+ # The following regex matches the optional fraction of a seconds for manual
+ # processing.
+ match = re.search(r"\.(\d*)", value)
+ microsecond: Optional[int] = None
+ if match is not None:
+ # Remove the fraction of a second from the input string
+ value = value[:match.start()] + value[match.end():]
+
+ # datetime supports microsecond resolution for the fraction of a second, thus
+ # limit/pad the parsed fraction of a second to six digits
+ microsecond = int(match.group(1)[:6].ljust(6, '0'))
+
+ return value, microsecond
+
+
+def format_date(value: Optional[date] = None) -> str:
+ """
+ @param value: The date for format. Defaults to the current date in the UTC timezone.
+ @return: The date formatted according to the Date profile specified in XEP-0082.
+
+ @warning: Formatting of the current date in the local timezone may leak geographical
+ information of the sender. Thus, it is advised to only format the current date in
+ UTC.
+ """
+ # CCYY-MM-DD
+
+ # The Date profile of XEP-0082 is equal to the ISO 8601 format.
+ return (datetime.now(timezone.utc).date() if value is None else value).isoformat()
+
+
+def parse_date(value: str) -> date:
+ """
+ @param value: A string containing date information formatted according to the Date
+ profile specified in XEP-0082.
+ @return: The date parsed from the input string.
+ @raise ValueError: if the input string is not correctly formatted.
+ """
+ # CCYY-MM-DD
+
+ # The Date profile of XEP-0082 is equal to the ISO 8601 format.
+ return date.fromisoformat(value)
+
+
+def format_datetime(
+ value: Optional[datetime] = None,
+ include_microsecond: bool = False
+) -> str:
+ """
+ @param value: The datetime to format. Defaults to the current datetime.
+ @param include_microsecond: Include the microsecond of the datetime in the output.
+ @return: The datetime formatted according to the DateTime profile specified in
+ XEP-0082. The datetime is always converted to UTC before formatting to avoid
+ leaking geographical information of the sender.
+ """
+ # CCYY-MM-DDThh:mm:ss[.sss]TZD
+
+ # We format the time in UTC, since the %z formatter of strftime doesn't include colons
+ # to separate hours and minutes which is required by XEP-0082. UTC allows us to put a
+ # simple letter 'Z' as the time zone definition.
+ value = (
+ datetime.now(timezone.utc)
+ if value is None
+ else value.astimezone(timezone.utc) # pylint: disable=no-member
+ )
+
+ if include_microsecond:
+ return value.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+
+ return value.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+
+def parse_datetime(value: str) -> datetime:
+ """
+ @param value: A string containing datetime information formatted according to the
+ DateTime profile specified in XEP-0082.
+ @return: The datetime parsed from the input string.
+ @raise ValueError: if the input string is not correctly formatted.
+ """
+ # CCYY-MM-DDThh:mm:ss[.sss]TZD
+
+ value, microsecond = __parse_fraction_of_a_second(value)
+
+ result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+
+ if microsecond is not None:
+ result = result.replace(microsecond=microsecond)
+
+ return result
+
+
+def format_time(value: Optional[time] = None, include_microsecond: bool = False) -> str:
+ """
+ @param value: The time to format. Defaults to the current time in the UTC timezone.
+ @param include_microsecond: Include the microsecond of the time in the output.
+ @return: The time formatted according to the Time profile specified in XEP-0082.
+
+ @warning: Since accurate timezone conversion requires the date to be known, this
+ function cannot convert input times to UTC before formatting. This means that
+ geographical information of the sender may be leaked if a time in local timezone
+ is formatted. Thus, when passing a time to format, it is advised to pass the time
+ in UTC if possible.
+ """
+ # hh:mm:ss[.sss][TZD]
+
+ if value is None:
+ # There is no time.now() method as one might expect, but the current time can be
+ # extracted from a datetime object including time zone information.
+ value = datetime.now(timezone.utc).timetz()
+
+ # The format created by time.isoformat complies with the XEP-0082 Time profile.
+ return value.isoformat("auto" if include_microsecond else "seconds")
+
+
+def parse_time(value: str) -> time:
+ """
+ @param value: A string containing time information formatted according to the Time
+ profile specified in XEP-0082.
+ @return: The time parsed from the input string.
+ @raise ValueError: if the input string is not correctly formatted.
+ """
+ # hh:mm:ss[.sss][TZD]
+
+ value, microsecond = __parse_fraction_of_a_second(value)
+
+ # The format parsed by time.fromisoformat mostly complies with the XEP-0082 Time
+ # profile, except that it doesn't handle the letter Z as time zone information for
+ # UTC. This can be fixed with a simple string replacement of 'Z' with "+00:00", which
+ # is another way to represent UTC.
+ result = time.fromisoformat(value.replace('Z', "+00:00"))
+
+ if microsecond is not None:
+ result = result.replace(microsecond=microsecond)
+
+ return result
diff --git a/sat/tools/utils.py b/sat/tools/utils.py
--- a/sat/tools/utils.py
+++ b/sat/tools/utils.py
@@ -34,6 +34,7 @@
from twisted.internet import defer
from sat.core.constants import Const as C
from sat.core.log import getLogger
+from sat.tools.datetime import format_date, format_datetime
log = getLogger(__name__)
@@ -146,18 +147,16 @@
to avoid reveling the timezone, we always return UTC dates
the string returned by this method is valid with RFC 3339
+ this function redirects to the functions in the :mod:`sat.tools.datetime` module
@param timestamp(None, float): posix timestamp. If None current time will be used
@param with_time(bool): if True include the time
@return(unicode): XEP-0082 formatted date and time
"""
- template_date = "%Y-%m-%d"
- template_time = "%H:%M:%SZ"
- template = (
- "{}T{}".format(template_date, template_time) if with_time else template_date
+ dtime = datetime.datetime.utcfromtimestamp(
+ time.time() if timestamp is None else timestamp
)
- return datetime.datetime.utcfromtimestamp(
- time.time() if timestamp is None else timestamp
- ).strftime(template)
+
+ return format_datetime(dtime) if with_time else format_date(dtime.date())
def generatePassword(vocabulary=None, size=20):
diff --git a/tests/unit/test_plugin_xep_0082.py b/tests/unit/test_plugin_xep_0082.py
new file mode 100644
--- /dev/null
+++ b/tests/unit/test_plugin_xep_0082.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3
+
+# Tests for Libervia's XMPP Date and Time Profile formatting and parsing plugin
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Type-check with `mypy --strict`
+# Lint with `pylint`
+
+from datetime import date, datetime, time, timezone
+
+import pytest
+
+from sat.plugins.plugin_xep_0082 import XEP_0082
+
+
+__all__ = [ # pylint: disable=unused-variable
+ "test_date_formatting",
+ "test_date_parsing",
+ "test_datetime_formatting",
+ "test_datetime_parsing",
+ "test_time_formatting",
+ "test_time_parsing"
+]
+
+
+def test_date_formatting() -> None: # pylint: disable=missing-function-docstring
+ # The following tests don't check the output of format_date directly; instead, they
+ # assume that parse_date rejects incorrect formatting.
+
+ # The date version 0.1 of XEP-0082 was published
+ value = date(2003, 4, 21)
+
+ # Check that the parsed date equals the formatted date
+ assert XEP_0082.parse_date(XEP_0082.format_date(value)) == value
+
+ # Check that a date instance is returned when format_date is called without an
+ # explicit input date
+ assert isinstance(XEP_0082.parse_date(XEP_0082.format_date()), date)
+
+
+def test_date_parsing() -> None: # pylint: disable=missing-function-docstring
+ # There isn't really a point in testing much more than this
+ assert XEP_0082.parse_date("2003-04-21") == date(2003, 4, 21)
+
+
+def test_datetime_formatting() -> None: # pylint: disable=missing-function-docstring
+ # The following tests don't check the output of format_datetime directly; instead,
+ # they assume that parse_datetime rejects incorrect formatting.
+
+ # A datetime in UTC
+ value = datetime(1234, 5, 6, 7, 8, 9, 123456, timezone.utc)
+
+ # Check that the parsed datetime equals the formatted datetime except for the
+ # microseconds
+ parsed = XEP_0082.parse_datetime(XEP_0082.format_datetime(value))
+ assert parsed != value
+ assert parsed.replace(microsecond=123456) == value
+
+ # Check that the parsed datetime equals the formatted datetime including microseconds
+ assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value, True)) == value
+
+ # A datetime in some other timezone, from the Python docs of the datetime module
+ value = datetime.fromisoformat("2011-11-04T00:05:23+04:00")
+
+ # Check that the parsed datetime equals the formatted datetime with or without
+ # microseconds
+ assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value)) == value
+ assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value, True)) == value
+
+ # Check that the datetime was converted to UTC while formatting
+ assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value)).tzinfo == timezone.utc
+
+ # Check that a datetime instance is returned when format_datetime is called without an
+ # explicit input
+ # datetime
+ assert isinstance(XEP_0082.parse_datetime(XEP_0082.format_datetime()), datetime)
+ assert isinstance(XEP_0082.parse_datetime(XEP_0082.format_datetime(
+ include_microsecond=True
+ )), datetime)
+ assert XEP_0082.parse_datetime(XEP_0082.format_datetime()).tzinfo == timezone.utc
+
+
+def test_datetime_parsing() -> None: # pylint: disable=missing-function-docstring
+ # Datetime of the first human steps on the Moon (UTC)
+ value = datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc)
+
+ # With timezone 'Z', without a fraction of a second
+ assert XEP_0082.parse_datetime("1969-07-21T02:56:15Z") == value
+
+ # With timezone '+04:00', without a fraction of a second
+ assert XEP_0082.parse_datetime("1969-07-21T06:56:15+04:00") == value
+
+ # With timezone '-05:00', without a fraction of a second
+ assert XEP_0082.parse_datetime("1969-07-20T21:56:15-05:00") == value
+
+ # Without timezone, without a fraction of a second
+ with pytest.raises(ValueError):
+ XEP_0082.parse_datetime("1969-07-21T02:56:15")
+
+ # With timezone 'Z', with a fraction of a second consisting of two digits
+ assert XEP_0082.parse_datetime("1969-07-21T02:56:15.12Z") == \
+ value.replace(microsecond=120000)
+
+ # With timezone 'Z', with a fraction of a second consisting of nine digits
+ assert XEP_0082.parse_datetime("1969-07-21T02:56:15.123456789Z") == \
+ value.replace(microsecond=123456)
+
+ # With timezone '+04:00', with a fraction of a second consisting of six digits
+ assert XEP_0082.parse_datetime("1969-07-21T06:56:15.123456+04:00") == \
+ value.replace(microsecond=123456)
+
+ # With timezone '-05:00', with a fraction of a second consisting of zero digits
+ assert XEP_0082.parse_datetime("1969-07-20T21:56:15.-05:00") == value
+
+ # Without timezone, with a fraction of a second consisting of six digits
+ with pytest.raises(ValueError):
+ XEP_0082.parse_datetime("1969-07-21T02:56:15.123456")
+
+
+def test_time_formatting() -> None: # pylint: disable=missing-function-docstring
+ # The following tests don't check the output of format_time directly; instead, they
+ # assume that parse_time rejects incorrect formatting.
+
+ # A time in UTC
+ value = time(12, 34, 56, 789012, timezone.utc)
+
+ # Check that the parsed time equals the formatted time except for the microseconds
+ parsed = XEP_0082.parse_time(XEP_0082.format_time(value))
+ assert parsed != value
+ assert parsed.replace(microsecond=789012) == value
+
+ # Check that the parsed time equals the formatted time including microseconds
+ assert XEP_0082.parse_time(XEP_0082.format_time(value, True)) == value
+
+ # A time in some other timezone, from the Python docs of the datetime module
+ value = time.fromisoformat("04:23:01+04:00")
+
+ # Check that the parsed time equals the formatted time with or without microseconds
+ assert XEP_0082.parse_time(XEP_0082.format_time(value)) == value
+ assert XEP_0082.parse_time(XEP_0082.format_time(value, True)) == value
+
+ # Check that the time has retained its timezone
+ assert XEP_0082.parse_time(XEP_0082.format_time(value)).tzinfo == value.tzinfo
+
+ # A time without timezone information, from the Python docs of the datetime module
+ value = time.fromisoformat("04:23:01")
+
+ # Check that the parsed time doesn't have timezone information either
+ assert XEP_0082.parse_time(XEP_0082.format_time(value)).tzinfo is None
+
+ # Check that a time instance is returned when format_time is called without an
+ # explicit input date
+ assert isinstance(XEP_0082.parse_time(XEP_0082.format_time()), time)
+ assert isinstance(XEP_0082.parse_time(XEP_0082.format_time(
+ include_microsecond=True
+ )), time)
+ assert XEP_0082.parse_time(XEP_0082.format_time()).tzinfo == timezone.utc
+
+
+def test_time_parsing() -> None: # pylint: disable=missing-function-docstring
+ # Time for tea
+ value = time(16, 0, 0, tzinfo=timezone.utc)
+
+ # With timezone 'Z', without a fraction of a second
+ assert XEP_0082.parse_time("16:00:00Z") == value
+
+ # With timezone '+04:00', without a fraction of a second
+ assert XEP_0082.parse_time("20:00:00+04:00") == value
+
+ # With timezone '-05:00', without a fraction of a second
+ assert XEP_0082.parse_time("11:00:00-05:00") == value
+
+ # Without a timezone, without a fraction of a second
+ assert XEP_0082.parse_time("16:00:00") == value.replace(tzinfo=None)
+
+ # With timezone 'Z', with a fraction of a second consisting of two digits
+ assert XEP_0082.parse_time("16:00:00.12Z") == value.replace(microsecond=120000)
+
+ # With timezone 'Z', with a fraction of a second consisting of nine digits
+ assert XEP_0082.parse_time("16:00:00.123456789Z") == value.replace(microsecond=123456)
+
+ # With timezone '+04:00', with a fraction of a second consisting of six digits
+ assert XEP_0082.parse_time("20:00:00.123456+04:00") == \
+ value.replace(microsecond=123456)
+
+ # With timezone '-05:00', with a fraction of a second consisting of zero digits
+ assert XEP_0082.parse_time("11:00:00.-05:00") == value
+
+ # Without a timezone, with a fraction of a second consisting of six digits
+ assert XEP_0082.parse_time("16:00:00.123456") == \
+ value.replace(microsecond=123456, tzinfo=None)
diff --git a/tests/unit/test_plugin_xep_0420.py b/tests/unit/test_plugin_xep_0420.py
new file mode 100644
--- /dev/null
+++ b/tests/unit/test_plugin_xep_0420.py
@@ -0,0 +1,581 @@
+#!/usr/bin/env python3
+
+# Tests for Libervia's Stanza Content Encryption plugin
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Type-check with `mypy --strict --disable-error-code no-untyped-call`
+# Lint with `pylint`
+
+from datetime import datetime, timezone
+from typing import Callable, Iterator, Optional, cast
+
+import pytest
+
+from sat.plugins.plugin_xep_0334 import NS_HINTS
+from sat.plugins.plugin_xep_0420 import (
+ NS_SCE, XEP_0420, AffixVerificationFailed, ProfileRequirementsNotMet, SCEAffixPolicy,
+ SCECustomAffix, SCEProfile
+)
+from sat.tools.xml_tools import ElementParser
+from twisted.words.xish import domish
+
+
+__all__ = [ # pylint: disable=unused-variable
+ "test_unpack_matches_original",
+ "test_affixes_included",
+ "test_all_affixes_verified",
+ "test_incomplete_affixes",
+ "test_rpad_affix",
+ "test_time_affix",
+ "test_to_affix",
+ "test_from_affix",
+ "test_custom_affixes",
+ "test_namespace_conversion",
+ "test_non_encryptable_elements",
+ "test_schema_validation"
+]
+
+
+string_to_domish = cast(Callable[[str], domish.Element], ElementParser())
+
+
+class CustomAffixImpl(SCECustomAffix):
+ """
+ A simple custom affix implementation for testing purposes. Verifies the full JIDs of
+ both sender and recipient.
+
+ @warning: This is just an example, an affix element like this might not make sense due
+ to potentially allowed modifications of recipient/sender full JIDs (I don't know
+ enough about XMPP routing to know whether full JIDs are always left untouched by
+ the server).
+ """
+
+ @property
+ def element_name(self) -> str:
+ return "full-jids"
+
+ @property
+ def element_schema(self) -> str:
+ return """<xs:element name="full-jids">
+ <xs:complexType>
+ <xs:attribute name="recipient" type="xs:string"/>
+ <xs:attribute name="sender" type="xs:string"/>
+ </xs:complexType>
+ </xs:element>"""
+
+ def create(self, stanza: domish.Element) -> domish.Element:
+ recipient = cast(Optional[str], stanza.getAttribute("to", None))
+ sender = cast(Optional[str], stanza.getAttribute("from", None))
+
+ if recipient is None or sender is None:
+ raise ValueError(
+ "Stanza doesn't have ``to`` and ``from`` attributes required by the"
+ " full-jids custom affix."
+ )
+
+ element = domish.Element((NS_SCE, "full-jids"))
+ element["recipient"] = recipient
+ element["sender"] = sender
+ return element
+
+ def verify(self, stanza: domish.Element, element: domish.Element) -> None:
+ recipient_target = element["recipient"]
+ recipient_actual = stanza.getAttribute("to")
+
+ sender_target = element["sender"]
+ sender_actual = stanza.getAttribute("from")
+
+ if recipient_actual != recipient_target or sender_actual != sender_target:
+ raise AffixVerificationFailed(
+ f"Full JIDs differ. Recipient: actual={recipient_actual} vs."
+ f" target={recipient_target}; Sender: actual={sender_actual} vs."
+ f" target={sender_target}"
+ )
+
+
+def test_unpack_matches_original() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.OPTIONAL,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.OPTIONAL,
+ custom_policies={ CustomAffixImpl(): SCEAffixPolicy.NOT_NEEDED }
+ )
+
+ stanza_string = (
+ '<message from="foo@example.com" to="bar@example.com"><body>Test with both a body'
+ ' and some other custom element.</body><custom xmlns="urn:xmpp:example:0"'
+ ' test="matches-original">some more content</custom></message>'
+ )
+
+ stanza = string_to_domish(stanza_string)
+
+ envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
+
+ # The stanza should not have child elements any more
+ assert len(list(stanza.elements())) == 0
+
+ XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+
+ # domish.Element doesn't override __eq__, thus we compare the .toXml() strings here in
+ # the hope that serialization for an example as small as this is unique enough to be
+ # compared that way.
+ assert stanza.toXml() == string_to_domish(stanza_string).toXml()
+
+
+def test_affixes_included() -> None: # pylint: disable=missing-function-docstring
+ custom_affix = CustomAffixImpl()
+
+ profile = SCEProfile(
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.OPTIONAL,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.OPTIONAL,
+ custom_policies={ custom_affix: SCEAffixPolicy.OPTIONAL }
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>
+ Make sure that both the REQUIRED and the OPTIONAL affixes are included.
+ </body>
+ </message>""")
+
+ affix_values = XEP_0420.unpack_stanza(
+ profile,
+ stanza,
+ XEP_0420.pack_stanza(profile, stanza)
+ )
+
+ assert affix_values.rpad is not None
+ assert affix_values.timestamp is not None
+ assert affix_values.recipient is None
+ assert affix_values.sender is not None
+ assert custom_affix in affix_values.custom
+
+
+def test_all_affixes_verified() -> None: # pylint: disable=missing-function-docstring
+ packing_profile = SCEProfile(
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.REQUIRED,
+ custom_policies={}
+ )
+
+ unpacking_profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>
+ When unpacking, all affixes are loaded, even those marked as NOT_NEEDED.
+ </body>
+ </message>""")
+
+ envelope_serialized = XEP_0420.pack_stanza(packing_profile, stanza)
+
+ affix_values = XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)
+
+ assert affix_values.rpad is not None
+ assert affix_values.timestamp is not None
+ assert affix_values.recipient is not None
+ assert affix_values.sender is not None
+
+ # When unpacking, all affixes are verified, even if they are NOT_NEEDED by the profile
+ stanza = string_to_domish(
+ """<message from="fooo@example.com" to="baz@example.com"></message>"""
+ )
+
+ with pytest.raises(AffixVerificationFailed):
+ XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)
+
+
+def test_incomplete_affixes() -> None: # pylint: disable=missing-function-docstring
+ packing_profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ unpacking_profile = SCEProfile(
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.REQUIRED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>Check that all affixes REQUIRED by the profile are present.</body>
+ </message>""")
+
+ with pytest.raises(ProfileRequirementsNotMet):
+ XEP_0420.unpack_stanza(
+ unpacking_profile,
+ stanza,
+ XEP_0420.pack_stanza(packing_profile, stanza)
+ )
+
+ # Do the same but with a custom affix missing
+ custom_affix = CustomAffixImpl()
+
+ packing_profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={ custom_affix: SCEAffixPolicy.NOT_NEEDED }
+ )
+
+ unpacking_profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED }
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>
+ Check that all affixes REQUIRED by the profile are present, including custom
+ affixes.
+ </body>
+ </message>""")
+
+ with pytest.raises(ProfileRequirementsNotMet):
+ XEP_0420.unpack_stanza(
+ unpacking_profile,
+ stanza,
+ XEP_0420.pack_stanza(packing_profile, stanza)
+ )
+
+
+def test_rpad_affix() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ for _ in range(100):
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>OK</body>
+ </message>""")
+
+ affix_values = XEP_0420.unpack_stanza(
+ profile,
+ stanza,
+ XEP_0420.pack_stanza(profile, stanza)
+ )
+
+ # Test that the rpad exists and that the content elements are always padded to at
+ # least 53 characters
+ assert affix_values.rpad is not None
+ assert len(affix_values.rpad) >= 53 - len("<body xmlns='jabber:client'>OK</body>")
+
+
+def test_time_affix() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish(
+ """<message from="foo@example.com" to="bar@example.com"></message>"""
+ )
+
+ envelope_serialized = f"""<envelope xmlns="{NS_SCE}">
+ <content>
+ <body xmlns="jabber:client">
+ The time affix is only parsed and not otherwise verified. Not much to test
+ here.
+ </body>
+ </content>
+ <time stamp="1969-07-21T02:56:15Z"/>
+ </envelope>""".encode("utf-8")
+
+ affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+ assert affix_values.timestamp == datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc)
+
+
+def test_to_affix() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.REQUIRED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>Check that the ``to`` affix is correctly added.</body>
+ </message>""")
+
+ envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
+ affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+ assert affix_values.recipient is not None
+ assert affix_values.recipient.userhost() == "bar@example.com"
+
+ # Check that a mismatch in recipient bare JID causes an exception to be raised
+ stanza = string_to_domish(
+ """<message from="foo@example.com" to="baz@example.com"></message>"""
+ )
+
+ with pytest.raises(AffixVerificationFailed):
+ XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+
+ # Check that only the bare JID matters
+ stanza = string_to_domish(
+ """<message from="foo@example.com" to="bar@example.com/device"></message>"""
+ )
+
+ affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+ assert affix_values.recipient is not None
+ assert affix_values.recipient.userhost() == "bar@example.com"
+
+ stanza = string_to_domish("""<message from="foo@example.com">
+ <body>
+ Check that a missing "to" attribute on the stanza fails stanza packing.
+ </body>
+ </message>""")
+
+ with pytest.raises(ValueError):
+ XEP_0420.pack_stanza(profile, stanza)
+
+
+def test_from_affix() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.REQUIRED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>Check that the ``from`` affix is correctly added.</body>
+ </message>""")
+
+ envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
+ affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+ assert affix_values.sender is not None
+ assert affix_values.sender.userhost() == "foo@example.com"
+
+ # Check that a mismatch in sender bare JID causes an exception to be raised
+ stanza = string_to_domish(
+ """<message from="fooo@example.com" to="bar@example.com"></message>"""
+ )
+
+ with pytest.raises(AffixVerificationFailed):
+ XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+
+ # Check that only the bare JID matters
+ stanza = string_to_domish(
+ """<message from="foo@example.com/device" to="bar@example.com"></message>"""
+ )
+
+ affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+ assert affix_values.sender is not None
+ assert affix_values.sender.userhost() == "foo@example.com"
+
+ stanza = string_to_domish("""<message to="bar@example.com">
+ <body>
+ Check that a missing "from" attribute on the stanza fails stanza packing.
+ </body>
+ </message>""")
+
+ with pytest.raises(ValueError):
+ XEP_0420.pack_stanza(profile, stanza)
+
+
+def test_custom_affixes() -> None: # pylint: disable=missing-function-docstring
+ custom_affix = CustomAffixImpl()
+
+ packing_profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED }
+ )
+
+ unpacking_profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>
+ If a custom affix is included in the envelope, but not excpected by the
+ recipient, the schema validation should fail.
+ </body>
+ </message>""")
+
+ with pytest.raises(ValueError):
+ XEP_0420.unpack_stanza(
+ unpacking_profile,
+ stanza,
+ XEP_0420.pack_stanza(packing_profile, stanza)
+ )
+
+ profile = packing_profile
+
+ stanza = string_to_domish("""<message from="foo@example.com/device0"
+ to="bar@example.com/Libervia.123">
+ <body>The affix element should be returned as part of the affix values.</body>
+ </message>""")
+
+ affix_values = XEP_0420.unpack_stanza(
+ profile,
+ stanza,
+ XEP_0420.pack_stanza(profile, stanza)
+ )
+
+ assert custom_affix in affix_values.custom
+ assert affix_values.custom[custom_affix].getAttribute("recipient") == \
+ "bar@example.com/Libervia.123"
+ assert affix_values.custom[custom_affix].getAttribute("sender") == \
+ "foo@example.com/device0"
+
+
+def test_namespace_conversion() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = domish.Element((None, "message"))
+ stanza["from"] = "foo@example.com"
+ stanza["to"] = "bar@example.com"
+ stanza.addElement(
+ "body",
+ content=(
+ "This body element has namespace ``None``, which has to be replaced with"
+ " jabber:client."
+ )
+ )
+
+ envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
+ envelope = string_to_domish(envelope_serialized.decode("utf-8"))
+ content = next(cast(Iterator[domish.Element], envelope.elements(NS_SCE, "content")))
+
+ # The body should have been assigned ``jabber:client`` as its namespace
+ assert next(
+ cast(Iterator[domish.Element], content.elements("jabber:client", "body")),
+ None
+ ) is not None
+
+ XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+
+ # The body should still have ``jabber:client`` after unpacking
+ assert next(
+ cast(Iterator[domish.Element], stanza.elements("jabber:client", "body")),
+ None
+ ) is not None
+
+
+def test_non_encryptable_elements() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
+ <body>This stanza includes a store hint which must not be encrypted.</body>
+ <store xmlns="urn:xmpp:hints"/>
+ </message>""")
+
+ envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
+ envelope = string_to_domish(envelope_serialized.decode("utf-8"))
+ content = next(cast(Iterator[domish.Element], envelope.elements(NS_SCE, "content")))
+
+ # The store hint must not have been moved to the content element
+ assert next(
+ cast(Iterator[domish.Element], stanza.elements(NS_HINTS, "store")),
+ None
+ ) is not None
+
+ assert next(
+ cast(Iterator[domish.Element], content.elements(NS_HINTS, "store")),
+ None
+ ) is None
+
+ stanza = string_to_domish(
+ """<message from="foo@example.com" to="bar@example.com"></message>"""
+ )
+
+ envelope_serialized = f"""<envelope xmlns="{NS_SCE}">
+ <content>
+ <body xmlns="jabber:client">
+ The store hint must not be moved to the stanza.
+ </body>
+ <store xmlns="urn:xmpp:hints"/>
+ </content>
+ </envelope>""".encode("utf-8")
+
+ XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
+
+ assert next(
+ cast(Iterator[domish.Element], stanza.elements(NS_HINTS, "store")),
+ None
+ ) is None
+
+
+def test_schema_validation() -> None: # pylint: disable=missing-function-docstring
+ profile = SCEProfile(
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ SCEAffixPolicy.NOT_NEEDED,
+ custom_policies={}
+ )
+
+ stanza = string_to_domish(
+ """<message from="foo@example.com" to="bar@example.com"></message>"""
+ )
+
+ envelope_serialized = f"""<envelope xmlns="{NS_SCE}">
+ <content>
+ <body xmlns="jabber:client">
+ An unknwon affix should cause a schema validation error.
+ </body>
+ <store xmlns="urn:xmpp:hints"/>
+ </content>
+ <unknown-affix unknown-attr="unknown"/>
+ </envelope>""".encode("utf-8")
+
+ with pytest.raises(ValueError):
+ XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)