diff --git a/dev-requirements.txt b/dev-requirements.txt
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -6,3 +6,7 @@
pylint
pytest
pytest_twisted
+
+sh
+libervia-templates @ hg+https://repos.goffi.org/sat_templates
+libervia-web @ hg+https://repos.goffi.org/libervia-web
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -19,7 +19,7 @@
hyperlink==21.0.0
idna==3.3
incremental==21.3.0
-Jinja2==3.1.2
+Jinja2==3.0.3
langid==1.1.6
lxml==4.8.0
Mako==1.2.0
diff --git a/sat/plugins/plugin_xep_0374.py b/sat/plugins/plugin_xep_0374.py
--- a/sat/plugins/plugin_xep_0374.py
+++ b/sat/plugins/plugin_xep_0374.py
@@ -207,8 +207,6 @@
else:
# I'm not sure why this check is required, this code is copied from XEP-0384
if sender_jid.userhostJID() == client.jid.userhostJID():
- # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
- # like "to" isn't always set.
try:
feedback_jid = jid.JID(message_elt["to"])
except KeyError:
diff --git a/sat/plugins/plugin_xep_0384.py b/sat/plugins/plugin_xep_0384.py
--- a/sat/plugins/plugin_xep_0384.py
+++ b/sat/plugins/plugin_xep_0384.py
@@ -16,18 +16,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import base64
+from datetime import datetime
import enum
import logging
import time
-from typing import (
- Any, Callable, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast
-)
+from typing import \
+ Any, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast
import uuid
import xml.etree.ElementTree as ET
from xml.sax.saxutils import quoteattr
-from typing_extensions import Never, assert_never
+from typing_extensions import Final, Never, assert_never
from wokkel import muc, pubsub # type: ignore[import]
+import xmlschema
from sat.core import exceptions
from sat.core.constants import Const as C
@@ -43,7 +45,8 @@
from sat.plugins.plugin_xep_0163 import XEP_0163
from sat.plugins.plugin_xep_0334 import XEP_0334
from sat.plugins.plugin_xep_0359 import XEP_0359
-from sat.plugins.plugin_xep_0420 import XEP_0420, SCEAffixPolicy, SCEProfile
+from sat.plugins.plugin_xep_0420 import \
+ XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile
from sat.tools import xml_tools
from twisted.internet import defer
from twisted.words.protocols.jabber import error, jid
@@ -78,9 +81,6 @@
log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
-string_to_domish = cast(Callable[[str], domish.Element], xml_tools.ElementParser())
-
-
PLUGIN_INFO = {
C.PI_NAME: "OMEMO",
C.PI_IMPORT_NAME: "XEP-0384",
@@ -134,68 +134,10 @@
message_uid: str
-# TODO: Convert without serialization/parsing
-# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took
-# around 0.6 seconds on my setup.
-def etree_to_domish(element: ET.Element) -> domish.Element:
- """
- @param element: An ElementTree element.
- @return: The ElementTree element converted to a domish element.
- """
-
- return string_to_domish(ET.tostring(element, encoding="unicode"))
-
-
-# TODO: Convert without serialization/parsing
-# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took less
-# than one second on my setup.
-def domish_to_etree(element: domish.Element) -> ET.Element:
- """
- @param element: A domish element.
- @return: The domish element converted to an ElementTree element.
- """
-
- return ET.fromstring(element.toXml())
-
-
-def domish_to_etree2(element: domish.Element) -> ET.Element:
- """
- WIP
- """
-
- element_name = element.name
- if element.uri is not None:
- element_name = "{" + element.uri + "}" + element_name
-
- attrib: Dict[str, str] = {}
- for qname, value in element.attributes.items():
- attribute_name = qname[1] if isinstance(qname, tuple) else qname
- attribute_namespace = qname[0] if isinstance(qname, tuple) else None
- if attribute_namespace is not None:
- attribute_name = "{" + attribute_namespace + "}" + attribute_name
-
- attrib[attribute_name] = value
-
- result = ET.Element(element_name, attrib)
-
- last_child: Optional[ET.Element] = None
- for child in element.children:
- if isinstance(child, str):
- if last_child is None:
- result.text = child
- else:
- last_child.tail = child
- else:
- last_child = domish_to_etree2(child)
- result.append(last_child)
-
- return result
-
-
@enum.unique
class TrustLevel(enum.Enum):
"""
- The trust levels required for BTBV and manual trust.
+ The trust levels required for ATM and BTBV.
"""
TRUSTED: str = "TRUSTED"
@@ -203,20 +145,6 @@
UNDECIDED: str = "UNDECIDED"
DISTRUSTED: str = "DISTRUSTED"
- def to_omemo_trust_level(self) -> omemo.TrustLevel:
- """
- @return: This custom trust level evaluated to one of the OMEMO trust levels.
- """
-
- if self is TrustLevel.TRUSTED or self is TrustLevel.BLINDLY_TRUSTED:
- return omemo.TrustLevel.TRUSTED
- if self is TrustLevel.UNDECIDED:
- return omemo.TrustLevel.UNDECIDED
- if self is TrustLevel.DISTRUSTED:
- return omemo.TrustLevel.DISTRUSTED
-
- return assert_never(self)
-
TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices"
OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist"
@@ -431,7 +359,8 @@
f" {namespace}: Unexpected number of items retrieved: {len(items)}."
)
- element = next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
+ element = \
+ next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
if element is None:
raise omemo.BundleDownloadFailed(
f"Bundle download failed for {bare_jid}: {device_id} under namespace"
@@ -447,6 +376,433 @@
) from e
+# ATM only supports protocols based on SCE, which is currently only omemo:2, and relies on
+# so many implementation details of the encryption protocol that it makes more sense to
+# add ATM to the OMEMO plugin directly instead of making it a separate Libervia plugin.
+NS_TM: Final = "urn:xmpp:tm:1"
+NS_ATM: Final = "urn:xmpp:atm:1"
+
+
+TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?>
+<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'
+ targetNamespace='urn:xmpp:tm:1'
+ xmlns='urn:xmpp:tm:1'
+ elementFormDefault='qualified'>
+
+ <xs:element name='trust-message'>
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref='key-owner' minOccurs='1' maxOccurs='unbounded'/>
+ </xs:sequence>
+ <xs:attribute name='usage' type='xs:string' use='required'/>
+ <xs:attribute name='encryption' type='xs:string' use='required'/>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name='key-owner'>
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element
+ name='trust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
+ <xs:element
+ name='distrust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
+ </xs:sequence>
+ <xs:attribute name='jid' type='xs:string' use='required'/>
+ </xs:complexType>
+ </xs:element>
+</xs:schema>
+""")
+
+
+# This is compatible with omemo:2's SCE profile
+TM_SCE_PROFILE = SCEProfile(
+ rpad_policy=SCEAffixPolicy.REQUIRED,
+ time_policy=SCEAffixPolicy.REQUIRED,
+ to_policy=SCEAffixPolicy.OPTIONAL,
+ from_policy=SCEAffixPolicy.OPTIONAL,
+ custom_policies={}
+)
+
+
+class TrustUpdate(NamedTuple):
+ # pylint: disable=invalid-name
+ """
+ An update to the trust status of an identity key, used by Automatic Trust Management.
+ """
+
+ target_jid: jid.JID
+ target_key: bytes
+ target_trust: bool
+
+
+class TrustMessageCacheEntry(NamedTuple):
+ # pylint: disable=invalid-name
+ """
+ An entry in the trust message cache used by ATM.
+ """
+
+ sender_jid: jid.JID
+ sender_key: bytes
+ timestamp: datetime
+ trust_update: TrustUpdate
+
+
+class PartialTrustMessage(NamedTuple):
+ # pylint: disable=invalid-name
+ """
+ A structure representing a partial trust message, used by :func:`send_trust_messages`
+ to build trust messages.
+ """
+
+ recipient_jid: jid.JID
+ updated_jid: jid.JID
+ trust_updates: FrozenSet[TrustUpdate]
+
+
+async def manage_trust_message_cache(
+ client: SatXMPPClient,
+ session_manager: omemo.SessionManager,
+ applied_trust_updates: FrozenSet[TrustUpdate]
+) -> None:
+ """Manage the ATM trust message cache after trust updates have been applied.
+
+ @param client: The client this operation runs under.
+ @param session_manager: The session manager to use.
+ @param applied_trust_updates: The trust updates that have already been applied,
+ triggering this cache management run.
+ """
+
+ trust_message_cache = persistent.LazyPersistentBinaryDict(
+ "XEP-0384/TM",
+ client.profile
+ )
+
+ # Load cache entries
+ cache_entries = cast(
+ Set[TrustMessageCacheEntry],
+ await trust_message_cache.get("cache", set())
+ )
+
+ # Expire cache entries that were overwritten by the applied trust updates
+ cache_entries_by_target = {
+ (
+ cache_entry.trust_update.target_jid.userhostJID(),
+ cache_entry.trust_update.target_key
+ ): cache_entry
+ for cache_entry
+ in cache_entries
+ }
+
+ for trust_update in applied_trust_updates:
+ cache_entry = cache_entries_by_target.get(
+ (trust_update.target_jid.userhostJID(), trust_update.target_key),
+ None
+ )
+
+ if cache_entry is not None:
+ cache_entries.remove(cache_entry)
+
+ # Apply cached Trust Messages sent by newly trusted devices
+ new_trust_updates: Set[TrustUpdate] = set()
+
+ for trust_update in applied_trust_updates:
+ if trust_update.target_trust:
+ # Iterate over a copy such that cache_entries can be modified
+ for cache_entry in set(cache_entries):
+ if (
+ cache_entry.sender_jid.userhostJID()
+ == trust_update.target_jid.userhostJID()
+ and cache_entry.sender_key == trust_update.target_key
+ ):
+ trust_level = (
+ TrustLevel.TRUSTED
+ if cache_entry.trust_update.target_trust
+ else TrustLevel.DISTRUSTED
+ )
+
+ # Apply the trust update
+ await session_manager.set_trust(
+ cache_entry.trust_update.target_jid.userhost(),
+ cache_entry.trust_update.target_key,
+ trust_level.name
+ )
+
+ # Track the fact that this trust update has been applied
+ new_trust_updates.add(cache_entry.trust_update)
+
+ # Remove the corresponding cache entry
+ cache_entries.remove(cache_entry)
+
+ # Store the updated cache entries
+ await trust_message_cache.force("cache", cache_entries)
+
+ # TODO: Notify the user ("feedback") about automatically updated trust?
+
+ if len(new_trust_updates) > 0:
+ # If any trust has been updated, recursively perform another run of cache
+ # management
+ await manage_trust_message_cache(
+ client,
+ session_manager,
+ frozenset(new_trust_updates)
+ )
+
+
+async def get_trust_as_trust_updates(
+ session_manager: omemo.SessionManager,
+ target_jid: jid.JID
+) -> FrozenSet[TrustUpdate]:
+ """Get the trust status of all known keys of a JID as trust updates for use with ATM.
+
+ @param session_manager: The session manager to load the trust from.
+ @param target_jid: The JID to load the trust for.
+ @return: The trust updates encoding the trust status of all known keys of the JID that
+ are either explicitly trusted or distrusted. Undecided keys are not included in
+ the trust updates.
+ """
+
+ devices = await session_manager.get_device_information(target_jid.userhost())
+
+ trust_updates: Set[TrustUpdate] = set()
+
+ for device in devices:
+ trust_level = TrustLevel(device.trust_level_name)
+ target_trust: bool
+
+ if trust_level is TrustLevel.TRUSTED:
+ target_trust = True
+ elif trust_level is TrustLevel.DISTRUSTED:
+ target_trust = False
+ else:
+ # Skip devices that are not explicitly trusted or distrusted
+ continue
+
+ trust_updates.add(TrustUpdate(
+ target_jid=target_jid.userhostJID(),
+ target_key=device.identity_key,
+ target_trust=target_trust
+ ))
+
+ return frozenset(trust_updates)
+
+
+async def send_trust_messages(
+ client: SatXMPPClient,
+ session_manager: omemo.SessionManager,
+ applied_trust_updates: FrozenSet[TrustUpdate]
+) -> None:
+ """Send information about updated trust to peers via ATM (XEP-0450).
+
+ @param client: The client.
+ @param session_manager: The session manager.
+ @param applied_trust_updates: The trust updates that have already been applied, to
+ notify other peers about.
+ """
+ # NOTE: This currently sends information about oldmemo trust too. This is not
+ # specified and experimental, but since twomemo and oldmemo share the same identity
+ # keys and trust systems, this could be a cool side effect.
+
+ # Send Trust Messages for newly trusted and distrusted devices
+ own_jid = client.jid.userhostJID()
+ own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid)
+
+ # JIDs of which at least one device's trust has been updated
+ updated_jids = frozenset({
+ trust_update.target_jid.userhostJID()
+ for trust_update
+ in applied_trust_updates
+ })
+
+ trust_messages: Set[PartialTrustMessage] = set()
+
+ for updated_jid in updated_jids:
+ # Get the trust updates for that JID
+ trust_updates = frozenset({
+ trust_update for trust_update in applied_trust_updates
+ if trust_update.target_jid.userhostJID() == updated_jid
+ })
+
+ if updated_jid == own_jid:
+ # If the own JID is updated, _all_ peers have to be notified
+ # TODO: Using my author's privilege here to shamelessly access private fields
+ # and storage keys until I've added public API to get a list of peers to
+ # python-omemo.
+ storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage")
+ peer_jids = frozenset({
+ jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list(
+ f"/{OMEMO.NS_TWOMEMO}/bare_jids",
+ str
+ )).maybe([])
+ })
+
+ if len(peer_jids) == 0:
+ # If there are no peers to notify, notify our other devices about the
+ # changes directly
+ trust_messages.add(PartialTrustMessage(
+ recipient_jid=own_jid,
+ updated_jid=own_jid,
+ trust_updates=trust_updates
+ ))
+ else:
+ # Otherwise, notify all peers about the changes in trust and let carbons
+ # handle the copy to our own JID
+ for peer_jid in peer_jids:
+ trust_messages.add(PartialTrustMessage(
+ recipient_jid=peer_jid,
+ updated_jid=own_jid,
+ trust_updates=trust_updates
+ ))
+
+ # Also send full trust information about _every_ peer to our newly
+ # trusted devices
+ peer_trust_updates = \
+ await get_trust_as_trust_updates(session_manager, peer_jid)
+
+ trust_messages.add(PartialTrustMessage(
+ recipient_jid=own_jid,
+ updated_jid=peer_jid,
+ trust_updates=peer_trust_updates
+ ))
+
+ # Send information about our own devices to our newly trusted devices
+ trust_messages.add(PartialTrustMessage(
+ recipient_jid=own_jid,
+ updated_jid=own_jid,
+ trust_updates=own_trust_updates
+ ))
+ else:
+ # Notify our other devices about the changes in trust
+ trust_messages.add(PartialTrustMessage(
+ recipient_jid=own_jid,
+ updated_jid=updated_jid,
+ trust_updates=trust_updates
+ ))
+
+ # Send a summary of our own trust to newly trusted devices
+ trust_messages.add(PartialTrustMessage(
+ recipient_jid=updated_jid,
+ updated_jid=own_jid,
+ trust_updates=own_trust_updates
+ ))
+
+ # All trust messages prepared. Merge all trust messages directed at the same
+ # recipient.
+ recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages }
+
+ for recipient_jid in recipient_jids:
+ updated: Dict[jid.JID, Set[TrustUpdate]] = {}
+
+ for trust_message in trust_messages:
+ # Merge trust messages directed at that recipient
+ if trust_message.recipient_jid == recipient_jid:
+ # Merge the trust updates
+ updated[trust_message.updated_jid] = \
+ updated.get(trust_message.updated_jid, set())
+
+ updated[trust_message.updated_jid] |= trust_message.trust_updates
+
+ # Build the trust message
+ trust_message_elt = domish.Element((NS_TM, "trust-message"))
+ trust_message_elt["usage"] = NS_ATM
+ trust_message_elt["encryption"] = twomemo.twomemo.NAMESPACE
+
+ for updated_jid, trust_updates in updated.items():
+ key_owner_elt = trust_message_elt.addElement((NS_TM, "key-owner"))
+ key_owner_elt["jid"] = updated_jid.userhost()
+
+ for trust_update in trust_updates:
+ serialized_identity_key = \
+ base64.b64encode(trust_update.target_key).decode("ASCII")
+
+ if trust_update.target_trust:
+ key_owner_elt.addElement(
+ (NS_TM, "trust"),
+ content=serialized_identity_key
+ )
+ else:
+ key_owner_elt.addElement(
+ (NS_TM, "distrust"),
+ content=serialized_identity_key
+ )
+
+ # Finally, encrypt and send the trust message!
+ message_data = client.generateMessageXML(MessageData({
+ "from": own_jid,
+ "to": recipient_jid,
+ "uid": str(uuid.uuid4()),
+ "message": {},
+ "subject": {},
+ "type": C.MESS_TYPE_CHAT,
+ "extra": {},
+ "timestamp": time.time()
+ }))
+
+ message_data["xml"].addChild(trust_message_elt)
+
+ plaintext = XEP_0420.pack_stanza(TM_SCE_PROFILE, message_data["xml"])
+
+ feedback_jid = recipient_jid
+
+ # TODO: The following is mostly duplicate code
+ try:
+ messages, encryption_errors = await session_manager.encrypt(
+ frozenset({ own_jid.userhost(), recipient_jid.userhost() }),
+ { OMEMO.NS_TWOMEMO: plaintext },
+ backend_priority_order=[ OMEMO.NS_TWOMEMO ],
+ identifier=feedback_jid.userhost()
+ )
+ except Exception as e:
+ msg = _(
+ # pylint: disable=consider-using-f-string
+ "Can't encrypt message for {entities}: {reason}".format(
+ entities=', '.join({ own_jid.userhost(), recipient_jid.userhost() }),
+ reason=e
+ )
+ )
+ log.warning(msg)
+ client.feedback(feedback_jid, msg, {
+ C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+ })
+ raise e
+
+ if len(encryption_errors) > 0:
+ log.warning(
+ f"Ignored the following non-critical encryption errors:"
+ f" {encryption_errors}"
+ )
+
+ encrypted_errors_stringified = ", ".join([
+ f"device {err.device_id} of {err.bare_jid} under namespace"
+ f" {err.namespace}"
+ for err
+ in encryption_errors
+ ])
+
+ client.feedback(
+ feedback_jid,
+ D_(
+ "There were non-critical errors during encryption resulting in some"
+ " of your destinees' devices potentially not receiving the message."
+ " This happens when the encryption data/key material of a device is"
+ " incomplete or broken, which shouldn't happen for actively used"
+ " devices, and can usually be ignored. The following devices are"
+ f" affected: {encrypted_errors_stringified}."
+ )
+ )
+
+ message = next(
+ message for message in messages
+ if message.namespace == OMEMO.NS_TWOMEMO
+ )
+
+ # Add the encrypted element
+ message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(
+ twomemo.etree.serialize_message(message)
+ ))
+
+ await client.a_send(message_data["xml"])
+
+
def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]:
"""
@param sat: The SAT instance.
@@ -705,13 +1061,18 @@
if len(items) == 0:
return {}
- elif len(items) != 1:
+
+ if len(items) != 1:
raise omemo.DeviceListDownloadFailed(
f"Device list download failed for {bare_jid} under namespace"
f" {namespace}: Unexpected number of items retrieved: {len(items)}."
)
- element = next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
+ element = next(
+ iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))),
+ None
+ )
+
if element is None:
raise omemo.DeviceListDownloadFailed(
f"Device list download failed for {bare_jid} under namespace"
@@ -732,15 +1093,62 @@
raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
- @staticmethod
- def _evaluate_custom_trust_level(trust_level_name: str) -> omemo.TrustLevel:
+ async def _evaluate_custom_trust_level(
+ self,
+ device: omemo.DeviceInformation
+ ) -> omemo.TrustLevel:
+ # Get the custom trust level
try:
- return TrustLevel(trust_level_name).to_omemo_trust_level()
+ trust_level = TrustLevel(device.trust_level_name)
except ValueError as e:
raise omemo.UnknownTrustLevel(
- f"Unknown trust level name {trust_level_name}"
+ f"Unknown trust level name {device.trust_level_name}"
) from e
+ # The first three cases are a straightforward mapping
+ if trust_level is TrustLevel.TRUSTED:
+ return omemo.TrustLevel.TRUSTED
+ if trust_level is TrustLevel.UNDECIDED:
+ return omemo.TrustLevel.UNDECIDED
+ if trust_level is TrustLevel.DISTRUSTED:
+ return omemo.TrustLevel.DISTRUSTED
+
+ # The blindly trusted case is more complicated, since its evaluation depends
+ # on the trust system and phase
+ if trust_level is TrustLevel.BLINDLY_TRUSTED:
+ # Get the name of the active trust system
+ trust_system = cast(str, sat.memory.getParamA(
+ PARAM_NAME,
+ PARAM_CATEGORY,
+ profile_key=profile
+ ))
+
+ # If the trust model is BTBV, blind trust is always enabled
+ if trust_system == "btbv":
+ return omemo.TrustLevel.TRUSTED
+
+ # If the trust model is ATM, blind trust is disabled in the second phase
+ # and counts as undecided
+ if trust_system == "atm":
+ # Find out whether we are in phase one or two
+ devices = await self.get_device_information(device.bare_jid)
+
+ phase_one = all(TrustLevel(device.trust_level_name) in {
+ TrustLevel.UNDECIDED,
+ TrustLevel.BLINDLY_TRUSTED
+ } for device in devices)
+
+ if phase_one:
+ return omemo.TrustLevel.TRUSTED
+
+ return omemo.TrustLevel.UNDECIDED
+
+ raise exceptions.InternalError(
+ f"Unknown trust system active: {trust_system}"
+ )
+
+ assert_never(trust_level)
+
async def _make_trust_decision(
self,
undecided: FrozenSet[omemo.DeviceInformation],
@@ -754,50 +1162,38 @@
# The feedback JID is transferred via the identifier
feedback_jid = jid.JID(identifier).userhostJID()
- # Get the name of the trust model to use
- trust_model = cast(str, sat.memory.getParamA(
- PARAM_NAME,
- PARAM_CATEGORY,
- profile_key=cast(str, client.profile)
- ))
-
- # Under the BTBV trust model, if at least one device of a bare JID is manually
- # trusted or distrusted, the trust model is "downgraded" to manual trust.
- # Thus, we can separate bare JIDs into two pools here, one pool of bare JIDs
- # for which BTBV is active, and one pool of bare JIDs for which manual trust
- # is used.
+ # Both the ATM and the BTBV trust models work with blind trust before the
+ # first manual verification is performed. Thus, we can separate bare JIDs into
+ # two pools here, one pool of bare JIDs for which blind trust is active, and
+ # one pool of bare JIDs for which manual trust is used instead.
bare_jids = { device.bare_jid for device in undecided }
- btbv_bare_jids: Set[str] = set()
+ blind_trust_bare_jids: Set[str] = set()
manual_trust_bare_jids: Set[str] = set()
- if trust_model == "btbv":
- # For each bare JID, decide whether BTBV or manual trust applies
- for bare_jid in bare_jids:
- # Get all known devices belonging to the bare JID
- devices = await self.get_device_information(bare_jid)
-
- # If the trust levels of all devices correspond to those used by BTBV,
- # BTBV applies. Otherwise, fall back to manual trust.
- if all(TrustLevel(device.trust_level_name) in {
- TrustLevel.UNDECIDED,
- TrustLevel.BLINDLY_TRUSTED
- } for device in devices):
- btbv_bare_jids.add(bare_jid)
- else:
- manual_trust_bare_jids.add(bare_jid)
-
- if trust_model == "manual":
- manual_trust_bare_jids = bare_jids
+ # For each bare JID, decide whether blind trust applies
+ for bare_jid in bare_jids:
+ # Get all known devices belonging to the bare JID
+ devices = await self.get_device_information(bare_jid)
+
+ # If the trust levels of all devices correspond to those used by blind
+ # trust, blind trust applies. Otherwise, fall back to manual trust.
+ if all(TrustLevel(device.trust_level_name) in {
+ TrustLevel.UNDECIDED,
+ TrustLevel.BLINDLY_TRUSTED
+ } for device in devices):
+ blind_trust_bare_jids.add(bare_jid)
+ else:
+ manual_trust_bare_jids.add(bare_jid)
# With the JIDs sorted into their respective pools, the undecided devices can
# be categorized too
blindly_trusted_devices = \
- { dev for dev in undecided if dev.bare_jid in btbv_bare_jids }
+ { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids }
manually_trusted_devices = \
{ dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids }
- # Blindly trust devices handled by BTBV
+ # Blindly trust devices handled by blind trust
if len(blindly_trusted_devices) > 0:
for device in blindly_trusted_devices:
await self.set_trust(
@@ -817,11 +1213,8 @@
feedback_jid,
D_(
"Not all destination devices are trusted, unknown devices will be"
- " blindly trusted due to the Blind Trust Before Verification"
- " policy. If you want a more secure workflow, please activate the"
- " \"manual\" policy in the settings' \"Security\" tab.\nFollowing"
- " devices have been automatically trusted:"
- f" {blindly_trusted_devices_stringified}."
+ " blindly trusted.\nFollowing devices have been automatically"
+ f" trusted: {blindly_trusted_devices_stringified}."
)
)
@@ -854,7 +1247,6 @@
if element is None:
raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}")
- # TODO: Untested
message_data = client.generateMessageXML(MessageData({
"from": client.jid,
"to": jid.JID(bare_jid),
@@ -959,19 +1351,40 @@
trust_ui_result
))
+ trust_updates: Set[TrustUpdate] = set()
+
for key, value in data_form_result.items():
if not key.startswith("trust_"):
continue
device = undecided_ordered[int(key[len("trust_"):])]
- trust = C.bool(value)
+ target_trust = C.bool(value)
+ trust_level = \
+ TrustLevel.TRUSTED if target_trust else TrustLevel.DISTRUSTED
await self.set_trust(
device.bare_jid,
device.identity_key,
- TrustLevel.TRUSTED.name if trust else TrustLevel.DISTRUSTED.name
+ trust_level.name
)
+ trust_updates.add(TrustUpdate(
+ target_jid=jid.JID(device.bare_jid).userhostJID(),
+ target_key=device.identity_key,
+ target_trust=target_trust
+ ))
+
+ # Check whether ATM is enabled and handle everything in case it is
+ trust_system = cast(str, sat.memory.getParamA(
+ PARAM_NAME,
+ PARAM_CATEGORY,
+ profile_key=profile
+ ))
+
+ if trust_system == "atm":
+ await manage_trust_message_cache(client, self, frozenset(trust_updates))
+ await send_trust_messages(client, self, frozenset(trust_updates))
+
return SessionManagerImpl
@@ -1086,7 +1499,8 @@
<param name="{PARAM_NAME}"
label={quoteattr(D_('OMEMO default trust policy'))}
type="list" security="3">
- <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+ <option value="atm"
+ label={quoteattr(D_('Automatic Trust Management (more secure)'))} />
<option value="btbv"
label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
selected="true" />
@@ -1103,7 +1517,7 @@
``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl``
namespace. Both versions of the protocol are handled by this plugin and compatibility
between the two is maintained. MUC messages are supported next to one to one messages.
- For trust management, the two trust models "BTBV" and "manual" are supported.
+ For trust management, the two trust models "ATM" and "BTBV" are supported.
"""
NS_TWOMEMO = twomemo.twomemo.NAMESPACE
NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE
@@ -1163,7 +1577,8 @@
self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {}
# These triggers are used by oldmemo, which doesn't do SCE and only applies to
- # messages
+ # messages. Temporarily, until a more fitting trigger for SCE-based encryption is
+ # added, the messageReceived trigger is also used for twomemo.
sat.trigger.add(
"messageReceived",
self.__message_received_trigger,
@@ -1298,7 +1713,7 @@
async def callback(
data: Any,
- profile: str # pylint: disable=unused-argument
+ profile: str
) -> Dict[Never, Never]:
"""
@param data: The XMLUI result produces by the trust UI form.
@@ -1314,18 +1729,56 @@
Dict[str, str],
xml_tools.XMLUIResult2DataFormResult(data)
)
+
+ trust_updates: Set[TrustUpdate] = set()
+
for key, value in data_form_result.items():
if not key.startswith("trust_"):
continue
device = devices[int(key[len("trust_"):])]
- trust = TrustLevel(value)
-
- if TrustLevel(device.trust_level_name) is not trust:
+ trust_level_name = value
+
+ if device.trust_level_name != trust_level_name:
await session_manager.set_trust(
device.bare_jid,
device.identity_key,
- value
+ trust_level_name
+ )
+
+ target_trust: Optional[bool] = None
+
+ if TrustLevel(trust_level_name) is TrustLevel.TRUSTED:
+ target_trust = True
+ if TrustLevel(trust_level_name) is TrustLevel.DISTRUSTED:
+ target_trust = False
+
+ if target_trust is not None:
+ trust_updates.add(TrustUpdate(
+ target_jid=jid.JID(device.bare_jid).userhostJID(),
+ target_key=device.identity_key,
+ target_trust=target_trust
+ ))
+
+ # Check whether ATM is enabled and handle everything in case it is
+ trust_system = cast(str, self.__sat.memory.getParamA(
+ PARAM_NAME,
+ PARAM_CATEGORY,
+ profile_key=profile
+ ))
+
+ if trust_system == "atm":
+ if len(trust_updates) > 0:
+ await manage_trust_message_cache(
+ client,
+ session_manager,
+ frozenset(trust_updates)
+ )
+
+ await send_trust_messages(
+ client,
+ session_manager,
+ frozenset(trust_updates)
)
return {}
@@ -1341,13 +1794,15 @@
# pylint: disable=no-member
trust_ui = cast(Any, result)
trust_ui.addText(D_(
- "This is OMEMO trusting system. You'll see below the devices of your "
- "contacts, and a list selection to trust them or not. A trusted device "
- "can read your messages in plain text, so be sure to only validate "
- "devices that you are sure are belonging to your contact. It's better "
- "to do this when you are next to your contact and their device, so "
- "you can check the \"fingerprint\" (the number next to the device) "
- "yourself. Do *not* validate a device if the fingerprint is wrong!"
+ "This is OMEMO trusting system. You'll see below the devices of your"
+ " contacts, and a list selection to trust them or not. A trusted device"
+ " can read your messages in plain text, so be sure to only validate"
+ " devices that you are sure are belonging to your contact. It's better"
+ " to do this when you are next to your contact and their device, so"
+ " you can check the \"fingerprint\" (the number next to the device)"
+ " yourself. Do *not* validate a device if the fingerprint is wrong!"
+ " Note that manually validating a fingerprint disables any form of automatic"
+ " trust."
))
own_device, __ = await session_manager.get_own_device_information()
@@ -1491,6 +1946,156 @@
return session_manager
+    async def __message_received_trigger_atm(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        session_manager: omemo.SessionManager,
+        sender_device_information: omemo.DeviceInformation,
+        timestamp: datetime
+    ) -> None:
+        """Check a newly decrypted message stanza for ATM content and perform ATM in case.
+
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param session_manager: The session manager.
+        @param sender_device_information: Information about the device that sent/encrypted
+            the message.
+        @param timestamp: Timestamp extracted from the SCE time affix.
+        @raise exceptions.ParsingError: If a <trust-message/> fails schema validation.
+        """
+
+        trust_message_cache = persistent.LazyPersistentBinaryDict(
+            "XEP-0384/TM",
+            client.profile
+        )
+
+        new_cache_entries: Set[TrustMessageCacheEntry] = set()
+
+        for trust_message_elt in message_elt.elements(NS_TM, "trust-message"):
+            assert isinstance(trust_message_elt, domish.Element)
+
+            try:
+                TRUST_MESSAGE_SCHEMA.validate(trust_message_elt.toXml())
+            except xmlschema.XMLSchemaValidationError as e:
+                raise exceptions.ParsingError(
+                    "<trust-message/> element doesn't pass schema validation."
+                ) from e
+
+            if trust_message_elt["usage"] != NS_ATM:
+                # Skip non-ATM trust message
+                continue
+
+            if trust_message_elt["encryption"] != OMEMO.NS_TWOMEMO:
+                # Skip non-twomemo trust message
+                continue
+
+            for key_owner_elt in trust_message_elt.elements(NS_TM, "key-owner"):
+                assert isinstance(key_owner_elt, domish.Element)
+
+                key_owner_jid = jid.JID(key_owner_elt["jid"]).userhostJID()
+
+                for trust_elt in key_owner_elt.elements(NS_TM, "trust"):
+                    assert isinstance(trust_elt, domish.Element)
+
+                    new_cache_entries.add(TrustMessageCacheEntry(
+                        sender_jid=jid.JID(sender_device_information.bare_jid),
+                        sender_key=sender_device_information.identity_key,
+                        timestamp=timestamp,
+                        trust_update=TrustUpdate(
+                            target_jid=key_owner_jid,
+                            target_key=base64.b64decode(str(trust_elt)),
+                            target_trust=True
+                        )
+                    ))
+
+                for distrust_elt in key_owner_elt.elements(NS_TM, "distrust"):
+                    assert isinstance(distrust_elt, domish.Element)
+
+                    new_cache_entries.add(TrustMessageCacheEntry(
+                        sender_jid=jid.JID(sender_device_information.bare_jid),
+                        sender_key=sender_device_information.identity_key,
+                        timestamp=timestamp,
+                        trust_update=TrustUpdate(
+                            target_jid=key_owner_jid,
+                            target_key=base64.b64decode(str(distrust_elt)),
+                            target_trust=False
+                        )
+                    ))
+
+        # Load existing cache entries
+        existing_cache_entries = cast(
+            Set[TrustMessageCacheEntry],
+            await trust_message_cache.get("cache", set())
+        )
+
+        # Discard cache entries by timestamp comparison
+        existing_by_target = {
+            (
+                cache_entry.trust_update.target_jid.userhostJID(),
+                cache_entry.trust_update.target_key
+            ): cache_entry
+            for cache_entry
+            in existing_cache_entries
+        }
+
+        # Iterate over a copy here, such that new_cache_entries can be modified
+        for new_cache_entry in set(new_cache_entries):
+            existing_cache_entry = existing_by_target.get((
+                new_cache_entry.trust_update.target_jid.userhostJID(),
+                new_cache_entry.trust_update.target_key
+            ))
+
+            if existing_cache_entry is not None:
+                if existing_cache_entry.timestamp > new_cache_entry.timestamp:
+                    # If the existing cache entry is newer than the new cache entry,
+                    # discard the new one in favor of the existing one
+                    new_cache_entries.remove(new_cache_entry)
+                else:
+                    # Otherwise, discard the existing cache entry. This includes the case
+                    # when both cache entries have matching timestamps.
+                    # Use discard(): a <trust/> and a <distrust/> for the same key map to
+                    # the same existing entry; remove() would raise on the second one.
+                    existing_cache_entries.discard(existing_cache_entry)
+
+        # If the sending device is trusted, apply the new cache entries
+        applied_trust_updates: Set[TrustUpdate] = set()
+
+        if TrustLevel(sender_device_information.trust_level_name) is TrustLevel.TRUSTED:
+            # Iterate over a copy such that new_cache_entries can be modified
+            for cache_entry in set(new_cache_entries):
+                trust_update = cache_entry.trust_update
+
+                trust_level = (
+                    TrustLevel.TRUSTED
+                    if trust_update.target_trust
+                    else TrustLevel.DISTRUSTED
+                )
+
+                await session_manager.set_trust(
+                    trust_update.target_jid.userhost(),
+                    trust_update.target_key,
+                    trust_level.name
+                )
+
+                applied_trust_updates.add(trust_update)
+
+                new_cache_entries.remove(cache_entry)
+
+        # Store the remaining existing and new cache entries
+        await trust_message_cache.force(
+            "cache",
+            existing_cache_entries | new_cache_entries
+        )
+
+        # If the trust of at least one device was modified, run the ATM cache update logic
+        if applied_trust_updates:
+            await manage_trust_message_cache(
+                client,
+                session_manager,
+                frozenset(applied_trust_updates)
+            )
+
async def __message_received_trigger(
self,
client: SatXMPPClient,
@@ -1506,6 +2111,7 @@
encrypted.
@return: Whether to continue the message received flow.
"""
+
muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
sender_jid = jid.JID(message_elt["from"])
@@ -1569,8 +2175,6 @@
# I'm not sure why this check is required, this code is copied from the old
# plugin.
if sender_jid.userhostJID() == client.jid.userhostJID():
- # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
- # like "to" isn't always set.
try:
feedback_jid = jid.JID(message_elt["to"])
except KeyError:
@@ -1700,6 +2304,8 @@
# No point in further processing this message
return False
+ affix_values: Optional[SCEAffixValues] = None
+
if message.namespace == twomemo.twomemo.NAMESPACE:
if plaintext is not None:
# XEP_0420.unpack_stanza handles the whole unpacking, including the
@@ -1726,12 +2332,12 @@
)
# No point in further processing this message
return False
-
- if affix_values.timestamp is not None:
- # TODO: affix_values.timestamp contains the timestamp included in the
- # encrypted element here. The XEP says it SHOULD be displayed with the
- # plaintext by clients.
- pass
+ else:
+ if affix_values.timestamp is not None:
+ # TODO: affix_values.timestamp contains the timestamp included in
+ # the encrypted element here. The XEP says it SHOULD be displayed
+ # with the plaintext by clients.
+ pass
if message.namespace == oldmemo.oldmemo.NAMESPACE:
# Remove all body elements from the original element, since those act as
@@ -1746,7 +2352,8 @@
# Mark the message as trusted or untrusted. Undecided counts as untrusted here.
trust_level = \
- TrustLevel(device_information.trust_level_name).to_omemo_trust_level()
+ await session_manager._evaluate_custom_trust_level(device_information)
+
if trust_level is omemo.TrustLevel.TRUSTED:
post_treat.addCallback(client.encryption.markAsTrusted)
else:
@@ -1758,6 +2365,16 @@
namespace=message.namespace
)
+ # Handle potential ATM trust updates
+ if affix_values is not None and affix_values.timestamp is not None:
+ await self.__message_received_trigger_atm(
+ client,
+ message_elt,
+ session_manager,
+ device_information,
+ affix_values.timestamp
+ )
+
# Message processed successfully, continue with the flow
return True
@@ -1769,7 +2386,7 @@
"""
# SCE is only applicable to message and IQ stanzas
# FIXME: temporary disabling IQ stanza encryption
- if stanza.name not in { "message" }: # , "iq" }:
+ if stanza.name not in { "message" }: # , "iq" }:
return True
# Get the intended recipient
@@ -2019,11 +2636,15 @@
if namespace == twomemo.twomemo.NAMESPACE:
# Add the encrypted element
- stanza.addChild(xml_tools.et_elt_2_domish_elt(twomemo.etree.serialize_message(message)))
+ stanza.addChild(xml_tools.et_elt_2_domish_elt(
+ twomemo.etree.serialize_message(message)
+ ))
if namespace == oldmemo.oldmemo.NAMESPACE:
# Add the encrypted element
- stanza.addChild(xml_tools.et_elt_2_domish_elt(oldmemo.etree.serialize_message(message)))
+ stanza.addChild(xml_tools.et_elt_2_domish_elt(
+ oldmemo.etree.serialize_message(message)
+ ))
if muc_plaintext_cache_key is not None:
self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext
diff --git a/sat/tools/xml_tools.py b/sat/tools/xml_tools.py
--- a/sat/tools/xml_tools.py
+++ b/sat/tools/xml_tools.py
@@ -20,7 +20,7 @@
from collections import OrderedDict
import html.entities
import re
-from typing import Optional, Tuple, Union, Literal, overload
+from typing import Dict, Optional, Tuple, Union, Literal, overload
from xml.dom import NotFoundErr, minidom
import xml.etree.ElementTree as ET
from lxml import etree
@@ -2030,3 +2030,37 @@
for child in elt.elements():
et_elt.append(domish_elt_2_et_elt(child, lxml=lxml))
return et_elt
+
+
+def domish_elt_2_et_elt2(element: domish.Element) -> ET.Element:
+    """Convert a domish element to an ET element (WIP, from the OMEMO plugin).
+
+    @param element: The domish element to convert, including its subtree.
+    @return: The converted element; namespaced names use the "{uri}name" form.
+    """
+    element_name = element.name
+    if element.uri is not None:
+        element_name = "{" + element.uri + "}" + element_name
+
+    attrib: Dict[str, str] = {}
+    for qname, value in element.attributes.items():
+        attribute_name = qname
+        if isinstance(qname, tuple):
+            namespace, attribute_name = qname[0], qname[1]
+            if namespace is not None:
+                attribute_name = "{" + namespace + "}" + attribute_name
+        attrib[attribute_name] = value
+
+    result = ET.Element(element_name, attrib)
+    last_child: Optional[ET.Element] = None
+    for child in element.children:
+        if not isinstance(child, str):
+            last_child = domish_elt_2_et_elt2(child)
+            result.append(last_child)
+        elif last_child is None:
+            # Append instead of assigning: adjacent text nodes must not be lost
+            result.text = (result.text or "") + child
+        else:
+            last_child.tail = (last_child.tail or "") + child
+
+    return result
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -52,8 +52,8 @@
'urwid-satext == 0.9.*',
'wokkel >= 18.0.0, < 19.0.0',
'omemo >= 1.0.0, < 2',
- 'twomemo[xml] >= 1.0.0, < 2',
- 'oldmemo[xml] >= 1.0.0, < 2',
+ 'twomemo >= 1.0.0, < 2',
+ 'oldmemo >= 1.0.0, < 2',
'pyyaml < 7.0.0',
'sqlalchemy >= 1.4',
'alembic',