T

plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo:

- support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace - maintains one identity across both versions of OMEMO - migrates data from the old plugin - includes more features for protocol stability - uses SCE for modern OMEMO - fully type-checked, linted and format-checked - added type hints to various pieces of backend code used by the plugin - added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy) - core (xmpp): enabled `send` trigger and made it an asyncPoint fix 375

G

goffi 20/09/2022, 11:03

Hi Tim, thank for your MR and sorry for the late review, I've been overwhelmed these past few weeks.

So about the review:

  • you're still using @defer.inlineCallbacks, please don't: this this the legacy method to do coroutines, and nowadays we use async def and await (with Twisted's ensureDeferred).

  • you are referencing a couple of packages which are not yet on Pypi (oldmemo, twomemo, OMEMO==1.0.0) this will break installation. Could you either publish them on pypi or use a git: link or equivalent?

  • not super found of the domish.Element <=> ElementTree ping pong. Can't this be avoided?

Beside those details, it looks good to me, great work! Thanks a lot.

S

syndace 20/09/2022, 13:23

Hey, thanks for the review!

About inlineCallbacks: okay sure, I'll replace that.

About PyPI: I want to delay publishing my packages until I have added Brython support, such that I don't have to version-bump all of the packages right away a few days after releasing them. If it's fine with you we can wait with this merge until I've published my packages.

About the XML element conversion: it can be avoided, but doing so comes at a different cost. First of all, the (de)serialization code for both twomemo and oldmemo add up to around 900 lines (including whitespace, doc, comments, etc.) which you don't have to maintain yourself if you use the serialization that comes with the twomemo/oldmemo libraries. Second, if you want the convenience and correctness of XML schema validation, you can't avoid etree, because the schema validator is based on etree (or lxml). I did a small benchmark of the conversion functions on my setup and added the results as comments above them. It is up to you to decide which cost outweighs the other, my preference should be obvious.

G

goffi 20/09/2022, 13:36

Great.

While I agree that we could wait, I may need OMEMO to work on pubsub encryption. I think that during development we can use git: link to the package, and I'll update it for 0.9 release once you've published them on pypi.

About XML: alright make sense, let keep it like that then.

S

syndace 20/09/2022, 15:22

Okay, I have updated the mr to remove inlineCallbacks and to reference my packages via git urls in requirements.txt.

G

goffi 20/09/2022, 15:49

This look good to merge. I'll do it in coming days.

Thanks a lot!

S

syndace 25/09/2022, 13:44

You might want to close this since it's merged now :)

id

8

author

Tim

created

2022-08-25T06:39:10Z

updated

2022-09-20T15:54:26Z

labels
core
status
queued

plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo: - support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace - maintains one identity across both versions of OMEMO - migrates data from the old plugin - includes more features for protocol stability - uses SCE for modern OMEMO - fully type-checked, linted and format-checked - added type hints to various pieces of backend code used by the plugin - added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy) - core (xmpp): enabled `send` trigger and made it an asyncPoint fix 375

diff --git a/pylintrc b/pylintrc
--- a/pylintrc
+++ b/pylintrc
@@ -63,7 +63,8 @@
 disable=missing-module-docstring,
         duplicate-code,
         fixme,
-        logging-fstring-interpolation
+        logging-fstring-interpolation,
+        broad-except
 
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,7 +13,7 @@
 cssselect2==0.6.0
 dbus-python==1.2.18
 defusedxml==0.7.1
-DoubleRatchet==0.7.0
+DoubleRatchet @ git+https://github.com/Syndace/python-doubleratchet.git@stable
 greenlet==1.1.2
 html2text==2020.1.16
 hyperlink==21.0.0
@@ -29,8 +29,8 @@
 mutagen==1.45.1
 netifaces==0.11.0
 numpy==1.22.3
-OMEMO==0.12.0
-omemo-backend-signal==0.2.7
+Oldmemo @ git+https://github.com/Syndace/python-oldmemo.git
+OMEMO @ git+https://github.com/Syndace/python-omemo.git@stable
 packaging==21.3
 Pillow==9.1.0
 progressbar2==3.53.3
@@ -61,14 +61,16 @@
 tomli==2.0.1
 treq==21.5.0
 Twisted==21.2.0
+Twomemo @ git+https://github.com/Syndace/python-twomemo.git
 txdbus==1.1.2
-typing_extensions==4.2.0
+typing_extensions==4.3.0
 urllib3==1.26.9
 urwid==2.1.2
 webencodings==0.5.1
 wokkel==18.0.0
-X3DH==0.5.9
-XEdDSA==0.4.7
+X3DH @ git+https://github.com/Syndace/python-x3dh.git@stable
+XEdDSA @ git+https://github.com/Syndace/python-xeddsa.git@stable
+xmlschema==2.0.4
 zope.interface==5.4.0
 
 sat-tmp @ hg+https://repos.goffi.org/sat_tmp
diff --git a/sat/core/constants.py b/sat/core/constants.py
--- a/sat/core/constants.py
+++ b/sat/core/constants.py
@@ -22,6 +22,7 @@
 except ImportError:
     BaseDirectory = None
 from os.path import dirname
+from typing_extensions import Final
 import sat
 
 
@@ -126,7 +127,7 @@
     EXTRA_INFO_ENCR_ERR = "ENCRYPTION_ERROR"
 
     # encryption is a key for plugins
-    MESS_KEY_ENCRYPTION = "ENCRYPTION"
+    MESS_KEY_ENCRYPTION: Final = "ENCRYPTION"
     # encrypted is a key for frontends
     MESS_KEY_ENCRYPTED = "encrypted"
     MESS_KEY_TRUSTED = "trusted"
@@ -431,7 +432,7 @@
         return value.lower() in (cls.BOOL_TRUE, "1", "yes", "on")
 
     @classmethod
-    def boolConst(cls, value):
+    def boolConst(cls, value: bool) -> str:
         """@return (str): constant associated to bool value"""
         assert isinstance(value, bool)
         return cls.BOOL_TRUE if value else cls.BOOL_FALSE
diff --git a/sat/core/core_types.py b/sat/core/core_types.py
--- a/sat/core/core_types.py
+++ b/sat/core/core_types.py
@@ -16,10 +16,47 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from collections import namedtuple
+from typing import Dict
+from typing_extensions import TypedDict
 from twisted.words.protocols.jabber import jid as t_jid
+from twisted.words.xish import domish
 
 
 class SatXMPPEntity:
 
     jid: t_jid.JID
     is_component: bool
+
+
+EncryptionPlugin = namedtuple("EncryptionPlugin", ("instance",
+                                                   "name",
+                                                   "namespace",
+                                                   "priority",
+                                                   "directed"))
+
+
+class EncryptionSession(TypedDict):
+    plugin: EncryptionPlugin
+
+
+# Incomplete types built through observation rather than code inspection.
+MessageDataExtra = TypedDict(
+    "MessageDataExtra",
+    { "encrypted": bool, "origin_id": str },
+    total=False
+)
+
+
+MessageData = TypedDict("MessageData", {
+    "from": t_jid.JID,
+    "to": t_jid.JID,
+    "uid": str,
+    "message": Dict[str, str],
+    "subject": Dict[str, str],
+    "type": str,
+    "timestamp": float,
+    "extra": MessageDataExtra,
+    "ENCRYPTION": EncryptionSession,
+    "xml": domish.Element
+}, total=False)
diff --git a/sat/core/log.py b/sat/core/log.py
--- a/sat/core/log.py
+++ b/sat/core/log.py
@@ -21,6 +21,14 @@
 # XXX: this module use standard logging module when possible, but as SàT can work in different cases where logging is not the best choice (twisted, pyjamas, etc), it is necessary to have a dedicated module. Additional feature like environment variables and colors are also managed.
 # TODO: change formatting from "%s" style to "{}" when moved to Python 3
 
+from typing import TYPE_CHECKING, Any, Optional
+from typing_extensions import TypedDict
+
+if TYPE_CHECKING:
+    from logging import _ExcInfoType
+else:
+    _ExcInfoType = Any
+
 from sat.core.constants import Const as C
 from sat.tools.common.ansi import ANSI as A
 from sat.core import exceptions
@@ -37,6 +45,10 @@
     pass
 
 
+class KWArgs(TypedDict):
+    exc_info: _ExcInfoType
+
+
 class Logger:
     """High level logging class"""
     fmt = None # format option as given by user (e.g. SAT_LOG_LOGGER)
@@ -60,7 +72,7 @@
         tb = traceback.format_exc()
         return message + "\n==== traceback ====\n" + tb
 
-    def out(self, message, level=None, **kwargs):
+    def out(self, message: object, level: Optional[str] = None, **kwargs: KWArgs) -> None:
         """Actually log the message
 
         @param message: formatted message
@@ -69,7 +81,7 @@
             message = self.addTraceback(message)
         print(message)
 
-    def log(self, level, message, **kwargs):
+    def log(self, level: str, message: object, **kwargs: KWArgs) -> None:
         """Print message
 
         @param level: one of C.LOG_LEVELS
@@ -84,7 +96,7 @@
         except Filtered:
             pass
 
-    def format(self, level, message):
+    def format(self, level: str, message: object) -> object:
         """Format message according to Logger.fmt
 
         @param level: one of C.LOG_LEVELS
@@ -117,19 +129,19 @@
             else:
                 raise e
 
-    def debug(self, msg, **kwargs):
+    def debug(self, msg: object, **kwargs: KWArgs) -> None:
         self.log(C.LOG_LVL_DEBUG, msg, **kwargs)
 
-    def info(self, msg, **kwargs):
+    def info(self, msg: object, **kwargs: KWArgs) -> None:
         self.log(C.LOG_LVL_INFO, msg, **kwargs)
 
-    def warning(self, msg, **kwargs):
+    def warning(self, msg: object, **kwargs: KWArgs) -> None:
         self.log(C.LOG_LVL_WARNING, msg, **kwargs)
 
-    def error(self, msg, **kwargs):
+    def error(self, msg: object, **kwargs: KWArgs) -> None:
         self.log(C.LOG_LVL_ERROR, msg, **kwargs)
 
-    def critical(self, msg, **kwargs):
+    def critical(self, msg: object, **kwargs: KWArgs) -> None:
         self.log(C.LOG_LVL_CRITICAL, msg, **kwargs)
 
 
diff --git a/sat/core/sat_main.py b/sat/core/sat_main.py
--- a/sat/core/sat_main.py
+++ b/sat/core/sat_main.py
@@ -634,10 +634,10 @@
             return (None, None)
         return (self.profiles[profile].jid, self.profiles[profile].xmlstream)
 
-    def getClient(self, profile_key):
+    def getClient(self, profile_key: str) -> xmpp.SatXMPPClient:
         """Convenient method to get client from profile key
 
-        @return: client or None if it doesn't exist
+        @return: the client
         @raise exceptions.ProfileKeyUnknown: the profile or profile key doesn't exist
         @raise exceptions.NotFound: client is not available
             This happen if profile has not been used yet
diff --git a/sat/core/xmpp.py b/sat/core/xmpp.py
--- a/sat/core/xmpp.py
+++ b/sat/core/xmpp.py
@@ -550,10 +550,14 @@
         ).toResponse(iq_elt)
         self.xmlstream.send(iq_error_elt)
 
-    def generateMessageXML(self, data, post_xml_treatments=None):
+    def generateMessageXML(
+        self,
+        data: core_types.MessageData,
+        post_xml_treatments: Optional[defer.Deferred] = None
+    ) -> core_types.MessageData:
         """Generate <message/> stanza from message data
 
-        @param data(dict): message data
+        @param data: message data
             domish element will be put in data['xml']
             following keys are needed:
                 - from
@@ -563,9 +567,9 @@
                 - type
                 - subject
                 - extra
-        @param post_xml_treatments(Deferred): a Deferred which will be called with data
-            once XML is generated
-        @return (dict) message data
+        @param post_xml_treatments: a Deferred which will be called with data once XML is
+            generated
+        @return: message data
         """
         data["xml"] = message_elt = domish.Element((None, "message"))
         message_elt["to"] = data["to"].full()
@@ -612,7 +616,7 @@
         """
         raise NotImplementedError
 
-    def send(self, obj):
+    async def send(self, obj):
         # original send method accept string
         # but we restrict to domish.Element to make trigger treatments easier
         assert isinstance(obj, domish.Element)
@@ -622,14 +626,11 @@
         #      (out of band transmission for instance).
         #      e2e should have a priority of 0 here, and out of band transmission
         #      a lower priority
-        #  FIXME: trigger not used yet, can be uncommented when e2e full stanza
-        #         encryption is implemented
-        #  if not self.host_app.trigger.point("send", self, obj):
-        #      return
+        if not (await self.host_app.trigger.asyncPoint("send", self, obj)):
+            return
         super().send(obj)
 
-    @defer.inlineCallbacks
-    def sendMessageData(self, mess_data):
+    async def sendMessageData(self, mess_data):
         """Convenient method to send message data to stream
 
         This method will send mess_data[u'xml'] to stream, but a trigger is there
@@ -644,10 +645,10 @@
         #      This is intented for e2e encryption which doesn't do full stanza
         #      encryption (e.g. OTR)
         #      This trigger point can't cancel the method
-        yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data,
+        await self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data,
             triggers_no_cancel=True)
-        self.send(mess_data["xml"])
-        defer.returnValue(mess_data)
+        await self.send(mess_data["xml"])
+        return mess_data
 
     def sendMessage(
             self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None,
@@ -824,6 +825,9 @@
         return ClientPluginWrapper(self, plugin_name, missing)
 
 
+ExtraDict = dict  # TODO
+
+
 @implementer(iwokkel.IDisco)
 class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient):
     trigger_suffix = ""
@@ -930,15 +934,20 @@
         )
         post_xml_treatments.addCallback(self.messageSendToBridge)
 
-    def feedback(self, to_jid, message, extra=None):
+    def feedback(
+        self,
+        to_jid: jid.JID,
+        message: str,
+        extra: Optional[ExtraDict] = None
+    ) -> None:
         """Send message to frontends
 
         This message will be an info message, not recorded in history.
         It can be used to give feedback of a command
-        @param to_jid(jid.JID): destinee jid
-        @param message(unicode): message to send to frontends
-        @param extra(None, dict): extra data to use
-            in particular, info subtype can be specified with MESS_EXTRA_INFO
+        @param to_jid: destinee jid
+        @param message: message to send to frontends
+        @param extra: extra data to use in particular, info subtype can be specified with
+            MESS_EXTRA_INFO
         """
         if extra is None:
             extra = {}
diff --git a/sat/memory/encryption.py b/sat/memory/encryption.py
--- a/sat/memory/encryption.py
+++ b/sat/memory/encryption.py
@@ -19,10 +19,11 @@
 
 import copy
 from functools import partial
-from collections import namedtuple
+from typing import Optional
 from twisted.words.protocols.jabber import jid
 from twisted.internet import defer
 from twisted.python import failure
+from sat.core.core_types import EncryptionPlugin, EncryptionSession, MessageData
 from sat.core.i18n import D_, _
 from sat.core.constants import Const as C
 from sat.core import exceptions
@@ -34,12 +35,6 @@
 
 log = getLogger(__name__)
 
-EncryptionPlugin = namedtuple("EncryptionPlugin", ("instance",
-                                                   "name",
-                                                   "namespace",
-                                                   "priority",
-                                                   "directed"))
-
 
 class EncryptionHandler:
     """Class to handle encryption sessions for a client"""
@@ -339,7 +334,7 @@
 
         self.client.feedback(entity, msg)
 
-    def getSession(self, entity):
+    def getSession(self, entity: jid.JID) -> Optional[EncryptionSession]:
         """Get encryption session for this contact
 
         @param entity(jid.JID): get the session for this entity
@@ -476,13 +471,17 @@
 
         return mess_data
 
-    def isEncryptionRequested(self, mess_data, namespace=None):
+    def isEncryptionRequested(
+        self,
+        mess_data: MessageData,
+        namespace: Optional[str] = None
+    ) -> bool:
         """Helper method to check if encryption is requested in an outgoind message
 
-        @param mess_data(dict): message data for outgoing message
-        @param namespace(str, None): if set, check if encryption is requested for the
-            algorithm specified
-        @return (bool): True if the encryption flag is present
+        @param mess_data: message data for outgoing message
+        @param namespace: if set, check if encryption is requested for the algorithm
+            specified
+        @return: True if the encryption flag is present
         """
         encryption = mess_data.get(C.MESS_KEY_ENCRYPTION)
         if encryption is None:
diff --git a/sat/plugins/plugin_xep_0045.py b/sat/plugins/plugin_xep_0045.py
--- a/sat/plugins/plugin_xep_0045.py
+++ b/sat/plugins/plugin_xep_0045.py
@@ -26,6 +26,7 @@
 from twisted.python import failure
 
 from sat.core import exceptions
+from sat.core.xmpp import SatXMPPClient
 from sat.memory import memory
 
 import time
@@ -204,10 +205,10 @@
                 return False
         return True
 
-    def getRoom(self, client, room_jid):
+    def getRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> muc.Room:
         """Retrieve Room instance from its jid
 
-        @param room_jid(jid.JID): jid of the room
+        @param room_jid: jid of the room
         @raise exceptions.NotFound: the room has not been joined
         """
         try:
@@ -223,10 +224,10 @@
         if room_jid not in client._muc_client.joined_rooms:
             raise exceptions.NotFound(_("This room has not been joined"))
 
-    def isJoinedRoom(self, client, room_jid):
+    def isJoinedRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> bool:
         """Tell if a jid is a known and joined room
 
-        @room_jid(jid.JID): jid of the room
+        @room_jid: jid of the room
         """
         try:
             self.checkRoomJoined(client, room_jid)
diff --git a/sat/plugins/plugin_xep_0060.py b/sat/plugins/plugin_xep_0060.py
--- a/sat/plugins/plugin_xep_0060.py
+++ b/sat/plugins/plugin_xep_0060.py
@@ -19,7 +19,7 @@
 
 from collections import namedtuple
 from functools import reduce
-from typing import Any, Dict, List, Optional, Tuple, Union, Callable
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, Callable
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -39,6 +39,7 @@
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from sat.core.xmpp import SatXMPPClient
 from sat.tools import utils
 from sat.tools import sat_defer
 from sat.tools import xml_tools
@@ -538,17 +539,22 @@
         ))
 
     async def sendItem(
-        self, client, service, nodeIdentifier, payload, item_id=None, extra=None
-    ):
+        self,
+        client: SatXMPPClient,
+        service: Union[jid.JID, None],
+        nodeIdentifier: str,
+        payload: domish.Element,
+        item_id: Optional[str] = None,
+        extra: Optional[Dict[str, Any]] = None
+    ) -> Optional[str]:
         """High level method to send one item
 
-        @param service(jid.JID, None): service to send the item to
-            None to use PEP
-        @param NodeIdentifier(unicode): PubSub node to use
-        @param payload(domish.Element): payload of the item to send
-        @param item_id(unicode, None): id to use or None to create one
-        @param extra(dict, None): extra option, not used yet
-        @return (unicode, None): id of the created item
+        @param service: service to send the item to None to use PEP
+        @param NodeIdentifier: PubSub node to use
+        @param payload: payload of the item to send
+        @param item_id: id to use or None to create one
+        @param extra: extra options
+        @return: id of the created item
         """
         assert isinstance(payload, domish.Element)
         item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
@@ -1090,7 +1096,12 @@
             client, jid.JID(service_s) if service_s else None, nodeIdentifier
         )
 
-    def deleteNode(self, client, service, nodeIdentifier):
+    def deleteNode(
+        self,
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: str
+    ) -> defer.Deferred:
         return client.pubsub_client.deleteNode(service, nodeIdentifier)
 
     def _addWatch(self, service_s, node, profile_key):
@@ -1132,12 +1143,12 @@
 
     def retractItems(
         self,
-        client,
-        service,
-        nodeIdentifier,
-        itemIdentifiers,
-        notify=True,
-    ):
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: str,
+        itemIdentifiers: Iterable[str],
+        notify: bool = True,
+    ) -> defer.Deferred:
         return client.pubsub_client.retractItems(
             service, nodeIdentifier, itemIdentifiers, notify=notify
         )
diff --git a/sat/plugins/plugin_xep_0082.py b/sat/plugins/plugin_xep_0082.py
--- a/sat/plugins/plugin_xep_0082.py
+++ b/sat/plugins/plugin_xep_0082.py
@@ -17,9 +17,6 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-# Type-check with `mypy --strict`
-# Lint with `pylint`
-
 from sat.core.constants import Const as C
 from sat.core.i18n import D_
 from sat.core.sat_main import SAT
diff --git a/sat/plugins/plugin_xep_0334.py b/sat/plugins/plugin_xep_0334.py
--- a/sat/plugins/plugin_xep_0334.py
+++ b/sat/plugins/plugin_xep_0334.py
@@ -18,6 +18,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from typing import Iterable
 from sat.core.i18n import _, D_
 from sat.core.log import getLogger
 
@@ -29,6 +30,7 @@
 from wokkel import disco, iwokkel
 
 from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
 from zope.interface import implementer
 from textwrap import dedent
 
@@ -83,11 +85,11 @@
         else:
             log.error("Unknown hint: {}".format(hint))
 
-    def addHintElements(self, message_elt, hints):
+    def addHintElements(self, message_elt: domish.Element, hints: Iterable[str]) -> None:
         """Add hints elements to message stanza
 
-        @param message_elt(domish.Element): stanza where hints must be added
-        @param hints(iterable(unicode)): hints to add
+        @param message_elt: stanza where hints must be added
+        @param hints: hints to add
         """
         for hint in hints:
             message_elt.addElement((NS_HINTS, hint))
diff --git a/sat/plugins/plugin_xep_0359.py b/sat/plugins/plugin_xep_0359.py
--- a/sat/plugins/plugin_xep_0359.py
+++ b/sat/plugins/plugin_xep_0359.py
@@ -18,6 +18,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from typing import Optional
 import uuid
 from zope.interface import implementer
 from twisted.words.protocols.jabber import xmlstream
@@ -26,6 +27,7 @@
 from sat.core import exceptions
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from twisted.words.xish import domish
 
 log = getLogger(__name__)
 
@@ -101,11 +103,11 @@
         sid_elt["by"] = client.jid.userhost() if by is None else by.userhost()
         sid_elt["id"] = stanza_id
 
-    def getOriginId(self, element):
+    def getOriginId(self, element: domish.Element) -> Optional[str]:
         """Return origin-id if found in element
 
-        @param element(domish.Element): element to parse
-        @return (unicode, None): origin-id if found
+        @param element: element to parse
+        @return: origin-id if found
         """
         try:
             origin_elt = next(element.elements(NS_SID, "origin-id"))
diff --git a/sat/plugins/plugin_xep_0384.py b/sat/plugins/plugin_xep_0384.py
--- a/sat/plugins/plugin_xep_0384.py
+++ b/sat/plugins/plugin_xep_0384.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
-# SAT plugin for OMEMO encryption
-# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# Libervia plugin for OMEMO encryption
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
 
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -16,1431 +16,2055 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import enum
 import logging
-import random
-import base64
-from functools import partial
+import time
+from typing import (
+    Any, Callable, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast
+)
+import uuid
+import xml.etree.ElementTree as ET
 from xml.sax.saxutils import quoteattr
-from sat.core.i18n import _, D_
+
+from typing_extensions import Never, assert_never
+from wokkel import muc, pubsub  # type: ignore[import]
+
+from sat.core import exceptions
 from sat.core.constants import Const as C
-from sat.core.log import getLogger
-from sat.core import exceptions
-from twisted.internet import defer, reactor
+from sat.core.core_types import MessageData, SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.memory import persistent
+from sat.plugins.plugin_misc_text_commands import TextCommands
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0060 import XEP_0060
+from sat.plugins.plugin_xep_0163 import XEP_0163
+from sat.plugins.plugin_xep_0334 import XEP_0334
+from sat.plugins.plugin_xep_0359 import XEP_0359
+from sat.plugins.plugin_xep_0420 import XEP_0420, SCEAffixPolicy, SCEProfile
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import error, jid
 from twisted.words.xish import domish
-from twisted.words.protocols.jabber import jid
-from twisted.words.protocols.jabber import error as jabber_error
-from sat.memory import persistent
-from sat.tools import xml_tools
+
 try:
     import omemo
-    from omemo import exceptions as omemo_excpt
-    from omemo.extendedpublicbundle import ExtendedPublicBundle
-except ImportError:
+    import omemo.identity_key_pair
+    import twomemo
+    import twomemo.etree
+    import oldmemo
+    import oldmemo.etree
+    import oldmemo.migrations
+    from xmlschema import XMLSchemaValidationError
+
+    # An explicit version check of the OMEMO libraries should not be required here, since
+    # the stored data is fully versioned and the library will complain if a downgrade is
+    # attempted.
+except ImportError as import_error:
     raise exceptions.MissingModule(
-        'Missing module omemo, please download/install it. You can use '
-        '"pip install omemo"'
-    )
-try:
-    from omemo_backend_signal import BACKEND as omemo_backend
-except ImportError:
-    raise exceptions.MissingModule(
-        'Missing module omemo-backend-signal, please download/install it. You can use '
-        '"pip install omemo-backend-signal"'
-    )
+        "You are missing one or more package required by the OMEMO plugin. Please"
+        " download/install the pip packages 'omemo', 'twomemo', 'oldmemo' and"
+        " 'xmlschema'."
+    ) from import_error
+
 
-log = getLogger(__name__)
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "OMEMO"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+string_to_domish = cast(Callable[[str], domish.Element], xml_tools.ElementParser())
+
 
 PLUGIN_INFO = {
     C.PI_NAME: "OMEMO",
     C.PI_IMPORT_NAME: "XEP-0384",
     C.PI_TYPE: "SEC",
-    C.PI_PROTOCOLS: ["XEP-0384"],
-    C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060"],
-    C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0359", C.TEXT_CMDS],
+    C.PI_PROTOCOLS: [ "XEP-0384" ],
+    C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ],
     C.PI_MAIN: "OMEMO",
     C.PI_HANDLER: "no",
     C.PI_DESCRIPTION: _("""Implementation of OMEMO"""),
 }
 
-OMEMO_MIN_VER = (0, 11, 0)
-NS_OMEMO = "eu.siacs.conversations.axolotl"
-NS_OMEMO_DEVICES = NS_OMEMO + ".devicelist"
-NS_OMEMO_BUNDLE = NS_OMEMO + ".bundles:{device_id}"
-KEY_STATE = "STATE"
-KEY_DEVICE_ID = "DEVICE_ID"
-KEY_SESSION = "SESSION"
-KEY_TRUST = "TRUST"
-# devices which have been automatically trusted by policy like BTBV
-KEY_AUTO_TRUST = "AUTO_TRUST"
-# list of peer bare jids where trust UI has been used at least once
-# this is useful to activate manual trust with BTBV policy
-KEY_MANUAL_TRUST = "MANUAL_TRUST"
-KEY_ACTIVE_DEVICES = "DEVICES"
-KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES"
-KEY_ALL_JIDS = "ALL_JIDS"
-# time before plaintext cache for MUC is expired
-# expressed in seconds, reset on each new MUC message
-MUC_CACHE_TTL = 60 * 5
 
 PARAM_CATEGORY = "Security"
 PARAM_NAME = "omemo_policy"
 
 
-# we want to manage log emitted by omemo module ourselves
+class LogHandler(logging.Handler):
+    """
+    Redirect python-omemo's log output to Libervia's log system.
+    """
 
-class SatHandler(logging.Handler):
-
-    def emit(self, record):
+    def emit(self, record: logging.LogRecord) -> None:
         log.log(record.levelname, record.getMessage())
 
-    @staticmethod
-    def install():
-        omemo_sm_logger = logging.getLogger("omemo.SessionManager")
-        omemo_sm_logger.propagate = False
-        omemo_sm_logger.addHandler(SatHandler())
+
+sm_logger = logging.getLogger(omemo.SessionManager.LOG_TAG)
+sm_logger.setLevel(logging.DEBUG)
+sm_logger.propagate = False
+sm_logger.addHandler(LogHandler())
+
+
+ikp_logger = logging.getLogger(omemo.identity_key_pair.IdentityKeyPair.LOG_TAG)
+ikp_logger.setLevel(logging.DEBUG)
+ikp_logger.propagate = False
+ikp_logger.addHandler(LogHandler())
+
+
+# TODO: Add handling for device labels, i.e. show device labels in the trust UI and give
+# the user a way to change their own device label.
 
 
-SatHandler.install()
+class MUCPlaintextCacheKey(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    Structure identifying an encrypted message sent to a MUC.
+    """
+
+    client: SatXMPPClient
+    room_jid: jid.JID
+    message_uid: str
+
+
+# TODO: Convert without serialization/parsing
+# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took
+# around 0.6 seconds on my setup.
+def etree_to_domish(element: ET.Element) -> domish.Element:
+    """
+    @param element: An ElementTree element.
+    @return: The ElementTree element converted to a domish element.
+    """
+
+    return string_to_domish(ET.tostring(element, encoding="unicode"))
+
+
+# TODO: Convert without serialization/parsing
+# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took less
+# than one second on my setup.
+def domish_to_etree(element: domish.Element) -> ET.Element:
+    """
+    @param element: A domish element.
+    @return: The domish element converted to an ElementTree element.
+    """
+
+    return ET.fromstring(element.toXml())
 
 
-def b64enc(data):
-    return base64.b64encode(bytes(bytearray(data))).decode("US-ASCII")
+def domish_to_etree2(element: domish.Element) -> ET.Element:
+    """
+    WIP
+    """
+
+    element_name = element.name
+    if element.uri is not None:
+        element_name = "{" + element.uri + "}" + element_name
+
+    attrib: Dict[str, str] = {}
+    for qname, value in element.attributes.items():
+        attribute_name = qname[1] if isinstance(qname, tuple) else qname
+        attribute_namespace = qname[0] if isinstance(qname, tuple) else None
+        if attribute_namespace is not None:
+            attribute_name = "{" + attribute_namespace + "}" + attribute_name
+
+        attrib[attribute_name] = value
+
+    result = ET.Element(element_name, attrib)
+
+    last_child: Optional[ET.Element] = None
+    for child in element.children:
+        if isinstance(child, str):
+            if last_child is None:
+                result.text = child
+            else:
+                last_child.tail = child
+        else:
+            last_child = domish_to_etree2(child)
+            result.append(last_child)
+
+    return result
 
 
-def promise2Deferred(promise_):
-    """Create a Deferred and fire it when promise is resolved
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    The trust levels required for BTBV and manual trust.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+    def to_omemo_trust_level(self) -> omemo.TrustLevel:
+        """
+        @return: This custom trust level evaluated to one of the OMEMO trust levels.
+        """
 
-    @param promise_(promise.Promise): promise to convert
-    @return (defer.Deferred): deferred instance linked to the promise
+        if self is TrustLevel.TRUSTED or self is TrustLevel.BLINDLY_TRUSTED:
+            return omemo.TrustLevel.TRUSTED
+        if self is TrustLevel.UNDECIDED:
+            return omemo.TrustLevel.UNDECIDED
+        if self is TrustLevel.DISTRUSTED:
+            return omemo.TrustLevel.DISTRUSTED
+
+        return assert_never(self)
+
+
+TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices"
+OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist"
+
+
+class StorageImpl(omemo.Storage):
+    """
+    Storage implementation for OMEMO based on :class:`persistent.LazyPersistentBinaryDict`
     """
-    d = defer.Deferred()
-    promise_.then(
-        lambda result: reactor.callFromThread(d.callback, result),
-        lambda exc: reactor.callFromThread(d.errback, exc)
-    )
-    return d
+
+    def __init__(self, profile: str) -> None:
+        """
+        @param profile: The profile this OMEMO data belongs to.
+        """
+
+        # persistent.LazyPersistentBinaryDict does not cache at all, so keep the caching
+        # option of omemo.Storage enabled.
+        super().__init__()
+
+        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)
+
+    async def _load(self, key: str) -> omemo.Maybe[omemo.JSONType]:
+        try:
+            return omemo.Just(await self.__storage[key])
+        except KeyError:
+            return omemo.Nothing()
+        except Exception as e:
+            raise omemo.StorageException(f"Error while loading key {key}") from e
+
+    async def _store(self, key: str, value: omemo.JSONType) -> None:
+        try:
+            await self.__storage.force(key, value)
+        except Exception as e:
+            raise omemo.StorageException(f"Error while storing key {key}: {value}") from e
+
+    async def _delete(self, key: str) -> None:
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+        except Exception as e:
+            raise omemo.StorageException(f"Error while deleting key {key}") from e
 
 
-class OmemoStorage(omemo.Storage):
-
-    def __init__(self, client, device_id, all_jids):
-        self.own_bare_jid_s = client.jid.userhost()
-        self.device_id = device_id
-        self.all_jids = all_jids
-        self.data = client._xep_0384_data
+class LegacyStorageImpl(oldmemo.migrations.LegacyStorage):
+    """
+    Legacy storage implementation to migrate data from the old XEP-0384 plugin.
+    """
 
-    @property
-    def is_async(self):
-        return True
+    KEY_DEVICE_ID = "DEVICE_ID"
+    KEY_STATE = "STATE"
+    KEY_SESSION = "SESSION"
+    KEY_ACTIVE_DEVICES = "DEVICES"
+    KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES"
+    KEY_TRUST = "TRUST"
+    KEY_ALL_JIDS = "ALL_JIDS"
 
-    def setCb(self, deferred, callback):
-        """Associate Deferred and callback
-
-        callback of omemo.Storage expect a boolean with success state then result
-        Deferred on the other hand use 2 methods for callback and errback
-        This method use partial to call callback with boolean then result when
-        Deferred is called
+    def __init__(self, profile: str, own_bare_jid: str) -> None:
+        """
+        @param profile: The profile this OMEMO data belongs to.
+        @param own_bare_jid: The own bare JID, to return by the :meth:`loadOwnData` call.
         """
-        deferred.addCallback(partial(callback, True))
-        deferred.addErrback(partial(callback, False))
+
+        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)
+        self.__own_bare_jid = own_bare_jid
+
+    async def loadOwnData(self) -> Optional[oldmemo.migrations.OwnData]:
+        own_device_id = await self.__storage.get(LegacyStorageImpl.KEY_DEVICE_ID, None)
+        if own_device_id is None:
+            return None
+
+        return oldmemo.migrations.OwnData(
+            own_bare_jid=self.__own_bare_jid,
+            own_device_id=own_device_id
+        )
+
+    async def deleteOwnData(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_DEVICE_ID)
+        except KeyError:
+            pass
+
+    async def loadState(self) -> Optional[oldmemo.migrations.State]:
+        return cast(
+            Optional[oldmemo.migrations.State],
+            await self.__storage.get(LegacyStorageImpl.KEY_STATE, None)
+        )
+
+    async def deleteState(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_STATE)
+        except KeyError:
+            pass
+
+    async def loadSession(
+        self,
+        bare_jid: str,
+        device_id: int
+    ) -> Optional[oldmemo.migrations.Session]:
+        key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ])
 
-    def _callMainThread(self, callback, method, *args, check_jid=None):
-        if check_jid is None:
-            d = method(*args)
-        else:
-            check_jid_d = self._checkJid(check_jid)
-            check_jid_d.addCallback(lambda __: method(*args))
-            d = check_jid_d
+        return cast(
+            Optional[oldmemo.migrations.Session],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteSession(self, bare_jid: str, device_id: int) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]:
+        key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ])
+
+        return cast(
+            Optional[List[int]],
+            await self.__storage.get(key, None)
+        )
+
+    async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]:
+        key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ])
+
+        return cast(
+            Optional[Dict[int, int]],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteActiveDevices(self, bare_jid: str) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def deleteInactiveDevices(self, bare_jid: str) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ])
 
-        if callback is not None:
-            d.addCallback(partial(callback, True))
-            d.addErrback(partial(callback, False))
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def loadTrust(
+        self,
+        bare_jid: str,
+        device_id: int
+    ) -> Optional[oldmemo.migrations.Trust]:
+        key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ])
+
+        return cast(
+            Optional[oldmemo.migrations.Trust],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteTrust(self, bare_jid: str, device_id: int) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def listJIDs(self) -> Optional[List[str]]:
+        bare_jids = await self.__storage.get(LegacyStorageImpl.KEY_ALL_JIDS, None)
+
+        return None if bare_jids is None else list(bare_jids)
+
+    async def deleteJIDList(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_ALL_JIDS)
+        except KeyError:
+            pass
+
 
-    def _call(self, callback, method, *args, check_jid=None):
-        """Create Deferred and add Promise callback to it
+async def download_oldmemo_bundle(
+    client: SatXMPPClient,
+    xep_0060: XEP_0060,
+    bare_jid: str,
+    device_id: int
+) -> oldmemo.oldmemo.BundleImpl:
+    """Download the oldmemo bundle corresponding to a specific device.
+
+    @param client: The client.
+    @param xep_0060: The XEP-0060 plugin instance to use for pubsub interactions.
+    @param bare_jid: The bare JID the device belongs to.
+    @param device_id: The id of the device.
+    @return: The bundle.
+    @raise BundleDownloadFailed: if the download failed. Feel free to raise a subclass
+        instead.
+    """
+    # Bundle downloads are needed by the session manager and for migrations from legacy,
+    # thus it is made a separate function.
 
-        This method use reactor.callLater to launch Deferred in main thread
-        @param check_jid: run self._checkJid before method
-        """
-        reactor.callFromThread(
-            self._callMainThread, callback, method, *args, check_jid=check_jid
+    namespace = oldmemo.oldmemo.NAMESPACE
+    node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
+
+    try:
+        items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node)
+    except Exception as e:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}"
+        ) from e
+
+    if len(items) != 1:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}: Unexpected number of items retrieved: {len(items)}."
+        )
+
+    element = next(iter(domish_to_etree(cast(domish.Element, items[0]))), None)
+    if element is None:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}: Item download succeeded but parsing failed: {element}."
         )
 
-    def _checkJid(self, bare_jid):
-        """Check if jid is known, and store it if not
-
-        @param bare_jid(unicode): bare jid to check
-        @return (D): Deferred fired when jid is stored
-        """
-        if bare_jid in self.all_jids:
-            return defer.succeed(None)
-        else:
-            self.all_jids.add(bare_jid)
-            d = self.data.force(KEY_ALL_JIDS, self.all_jids)
-            return d
-
-    def loadOwnData(self, callback):
-        callback(True, {'own_bare_jid': self.own_bare_jid_s,
-                        'own_device_id': self.device_id})
-
-    def storeOwnData(self, callback, own_bare_jid, own_device_id):
-        if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id:
-            raise exceptions.InternalError('bare jid or device id inconsistency!')
-        callback(True, None)
-
-    def loadState(self, callback):
-        self._call(callback, self.data.get, KEY_STATE)
-
-    def storeState(self, callback, state):
-        self._call(callback, self.data.force, KEY_STATE, state)
-
-    def loadSession(self, callback, bare_jid, device_id):
-        key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        self._call(callback, self.data.get, key)
-
-    def storeSession(self, callback, bare_jid, device_id, session):
-        key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        self._call(callback, self.data.force, key, session)
-
-    def deleteSession(self, callback, bare_jid, device_id):
-        key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        self._call(callback, self.data.remove, key)
-
-    def loadActiveDevices(self, callback, bare_jid):
-        key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.get, key, {})
-
-    def loadInactiveDevices(self, callback, bare_jid):
-        key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.get, key, {})
-
-    def storeActiveDevices(self, callback, bare_jid, devices):
-        key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
-
-    def storeInactiveDevices(self, callback, bare_jid, devices):
-        key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
-
-    def storeTrust(self, callback, bare_jid, device_id, trust):
-        key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
-        self._call(callback, self.data.force, key, trust)
-
-    def loadTrust(self, callback, bare_jid, device_id):
-        key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
-        self._call(callback, self.data.get, key)
-
-    def listJIDs(self, callback):
-        if callback is not None:
-            callback(True, self.all_jids)
-
-    def _deleteJID_logResults(self, results):
-        failed = [success for success, __ in results if not success]
-        if failed:
-            log.warning(
-                "delete JID failed for {failed_count} on {total_count} operations"
-                .format(failed_count=len(failed), total_count=len(results)))
-        else:
-            log.info(
-                "Delete JID operation succeed ({total_count} operations)."
-                .format(total_count=len(results)))
-
-    def _deleteJID_gotDevices(self, results, bare_jid):
-        assert len(results) == 2
-        active_success, active_devices = results[0]
-        inactive_success, inactive_devices = results[0]
-        d_list = []
-        for success, devices in results:
-            if not success:
-                log.warning("Can't retrieve devices for {bare_jid}: {reason}"
-                    .format(bare_jid=bare_jid, reason=active_devices))
-            else:
-                for device_id in devices:
-                    for key in (KEY_SESSION, KEY_TRUST):
-                        k = '\n'.join([key, bare_jid, str(device_id)])
-                        d_list.append(self.data.remove(k))
-
-        d_list.append(self.data.remove(KEY_ACTIVE_DEVICES, bare_jid))
-        d_list.append(self.data.remove(KEY_INACTIVE_DEVICES, bare_jid))
-        d_list.append(lambda __: self.all_jids.discard(bare_jid))
-        # FIXME: there is a risk of race condition here,
-        #        if self.all_jids is modified between discard and force)
-        d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids))
-        d = defer.DeferredList(d_list)
-        d.addCallback(self._deleteJID_logResults)
-        return d
-
-    def _deleteJID(self, callback, bare_jid):
-        d_list = []
-
-        key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        d_list.append(self.data.get(key, []))
-
-        key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        d_inactive = self.data.get(key, {})
-        # inactive devices are returned as a dict mapping from devices_id to timestamp
-        # but we only need devices ids
-        d_inactive.addCallback(lambda devices: [k for k, __ in devices])
-
-        d_list.append(d_inactive)
-        d = defer.DeferredList(d_list)
-        d.addCallback(self._deleteJID_gotDevices, bare_jid)
-        if callback is not None:
-            self.setCb(d, callback)
-
-    def deleteJID(self, callback, bare_jid):
-        """Retrieve all (in)actives devices of bare_jid, and delete all related keys"""
-        reactor.callFromThread(self._deleteJID, callback, bare_jid)
+    try:
+        return oldmemo.etree.parse_bundle(element, bare_jid, device_id)
+    except Exception as e:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle parsing failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}"
+        ) from e
 
 
-class SatOTPKPolicy(omemo.DefaultOTPKPolicy):
-    pass
+def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]:
+    """
+    @param sat: The SAT instance.
+    @param profile: The profile.
+    @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager`
+        with XMPP interactions and trust handled via the SAT instance.
+    """
+
+    client = sat.getClient(profile)
+    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+    class SessionManagerImpl(omemo.SessionManager):
+        """
+        Session manager implementation handling XMPP interactions and trust via an
+        instance of :class:`~sat.core.sat_main.SAT`.
+        """
+
+        @staticmethod
+        async def _upload_bundle(bundle: omemo.Bundle) -> None:
+            if isinstance(bundle, twomemo.twomemo.BundleImpl):
+                element = twomemo.etree.serialize_bundle(bundle)
+
+                node = "urn:xmpp:omemo:2:bundles"
+                try:
+                    await xep_0060.sendItem(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        etree_to_domish(element),
+                        item_id=str(bundle.device_id),
+                        extra={
+                            xep_0060.EXTRA_PUBLISH_OPTIONS: {
+                                xep_0060.OPT_MAX_ITEMS: "max"
+                            },
+                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                        }
+                    )
+                except (error.StanzaError, Exception) as e:
+                    if (
+                        isinstance(e, error.StanzaError)
+                        and e.condition == "conflict"
+                        and e.appCondition is not None
+                        # pylint: disable=no-member
+                        and e.appCondition.name == "precondition-not-met"
+                    ):
+                        # publish options couldn't be set on the fly, manually reconfigure
+                        # the node and publish again
+                        raise omemo.BundleUploadFailed(
+                            f"precondition-not-met: {bundle}"
+                        ) from e
+                        # TODO: What can I do here? The correct node configuration is a
+                        # MUST in the XEP.
+
+                    raise omemo.BundleUploadFailed(
+                        f"Bundle upload failed: {bundle}"
+                    ) from e
+
+                return
+
+            if isinstance(bundle, oldmemo.oldmemo.BundleImpl):
+                element = oldmemo.etree.serialize_bundle(bundle)
+
+                node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}"
+                try:
+                    await xep_0060.sendItem(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        etree_to_domish(element),
+                        item_id=xep_0060.ID_SINGLETON,
+                        extra={
+                            xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 },
+                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                        }
+                    )
+                except Exception as e:
+                    raise omemo.BundleUploadFailed(
+                        f"Bundle upload failed: {bundle}"
+                    ) from e
+
+                return
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {bundle.namespace}")
+
+        @staticmethod
+        async def _download_bundle(
+            namespace: str,
+            bare_jid: str,
+            device_id: int
+        ) -> omemo.Bundle:
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = "urn:xmpp:omemo:2:bundles"
+
+                try:
+                    items, __ = await xep_0060.getItems(
+                        client,
+                        jid.JID(bare_jid),
+                        node,
+                        max_items=None,
+                        item_ids=[ str(device_id) ]
+                    )
+                except Exception as e:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}"
+                    ) from e
+
+                if len(items) != 1:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}: Unexpected number of items retrieved:"
+                        f" {len(items)}."
+                    )
+
+                element = next(
+                    iter(domish_to_etree(cast(domish.Element, items[0]))),
+                    None
+                )
+                if element is None:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}: Item download succeeded but parsing"
+                        f" failed: {element}."
+                    )
+
+                try:
+                    return twomemo.etree.parse_bundle(element, bare_jid, device_id)
+                except Exception as e:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle parsing failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}"
+                    ) from e
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                return await download_oldmemo_bundle(
+                    client,
+                    xep_0060,
+                    bare_jid,
+                    device_id
+                )
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        async def _delete_bundle(namespace: str, device_id: int) -> None:
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = "urn:xmpp:omemo:2:bundles"
+
+                try:
+                    await xep_0060.retractItems(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        [ str(device_id) ],
+                        notify=False
+                    )
+                except Exception as e:
+                    raise omemo.BundleDeletionFailed(
+                        f"Bundle deletion failed for {device_id} under namespace"
+                        f" {namespace}"
+                    ) from e
+
+                return
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
+
+                try:
+                    await xep_0060.deleteNode(client, client.jid.userhostJID(), node)
+                except Exception as e:
+                    raise omemo.BundleDeletionFailed(
+                        f"Bundle deletion failed for {device_id} under namespace"
+                        f" {namespace}"
+                    ) from e
+
+                return
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        async def _upload_device_list(
+            namespace: str,
+            device_list: Dict[int, Optional[str]]
+        ) -> None:
+            element: Optional[ET.Element] = None
+            node: Optional[str] = None
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                element = twomemo.etree.serialize_device_list(device_list)
+                node = TWOMEMO_DEVICE_LIST_NODE
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                element = oldmemo.etree.serialize_device_list(device_list)
+                node = OLDMEMO_DEVICE_LIST_NODE
+
+            if element is None or node is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+            try:
+                await xep_0060.sendItem(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    etree_to_domish(element),
+                    item_id=xep_0060.ID_SINGLETON,
+                    extra={
+                        xep_0060.EXTRA_PUBLISH_OPTIONS: {
+                            xep_0060.OPT_MAX_ITEMS: 1,
+                            xep_0060.OPT_ACCESS_MODEL: "open"
+                        },
+                        xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                    }
+                )
+            except (error.StanzaError, Exception) as e:
+                if (
+                    isinstance(e, error.StanzaError)
+                    and e.condition == "conflict"
+                    and e.appCondition is not None
+                    # pylint: disable=no-member
+                    and e.appCondition.name == "precondition-not-met"
+                ):
+                    # publish options couldn't be set on the fly, manually reconfigure the
+                    # node and publish again
+                    raise omemo.DeviceListUploadFailed(
+                        f"precondition-not-met for namespace {namespace}"
+                    ) from e
+                    # TODO: What can I do here? The correct node configuration is a MUST
+                    # in the XEP.
+
+                raise omemo.DeviceListUploadFailed(
+                    f"Device list upload failed for namespace {namespace}"
+                ) from e
+
+        @staticmethod
+        async def _download_device_list(
+            namespace: str,
+            bare_jid: str
+        ) -> Dict[int, Optional[str]]:
+            node: Optional[str] = None
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = TWOMEMO_DEVICE_LIST_NODE
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                node = OLDMEMO_DEVICE_LIST_NODE
+
+            if node is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+            try:
+                items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node)
+            except exceptions.NotFound:
+                return {}
+            except Exception as e:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}"
+                ) from e
+
+            if len(items) != 1:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}: Unexpected number of items retrieved: {len(items)}."
+                )
+
+            element = next(iter(domish_to_etree(cast(domish.Element, items[0]))), None)
+            if element is None:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}: Item download succeeded but parsing failed:"
+                    f" {element}."
+                )
+
+            try:
+                if namespace == twomemo.twomemo.NAMESPACE:
+                    return twomemo.etree.parse_device_list(element)
+                if namespace == oldmemo.oldmemo.NAMESPACE:
+                    return oldmemo.etree.parse_device_list(element)
+            except Exception as e:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}"
+                ) from e
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        def _evaluate_custom_trust_level(trust_level_name: str) -> omemo.TrustLevel:
+            try:
+                return TrustLevel(trust_level_name).to_omemo_trust_level()
+            except ValueError as e:
+                raise omemo.UnknownTrustLevel(
+                    f"Unknown trust level name {trust_level_name}"
+                ) from e
+
+        async def _make_trust_decision(
+            self,
+            undecided: FrozenSet[omemo.DeviceInformation],
+            identifier: Optional[str]
+        ) -> None:
+            if identifier is None:
+                raise omemo.TrustDecisionFailed(
+                    "The identifier must contain the feedback JID."
+                )
+
+            # The feedback JID is transferred via the identifier
+            feedback_jid = jid.JID(identifier).userhostJID()
+
+            # Get the name of the trust model to use
+            trust_model = cast(str, sat.memory.getParamA(
+                PARAM_NAME,
+                PARAM_CATEGORY,
+                profile_key=cast(str, client.profile)
+            ))
+
+            # Under the BTBV trust model, if at least one device of a bare JID is manually
+            # trusted or distrusted, the trust model is "downgraded" to manual trust.
+            # Thus, we can separate bare JIDs into two pools here, one pool of bare JIDs
+            # for which BTBV is active, and one pool of bare JIDs for which manual trust
+            # is used.
+            bare_jids = { device.bare_jid for device in undecided }
+
+            btbv_bare_jids: Set[str] = set()
+            manual_trust_bare_jids: Set[str] = set()
+
+            if trust_model == "btbv":
+                # For each bare JID, decide whether BTBV or manual trust applies
+                for bare_jid in bare_jids:
+                    # Get all known devices belonging to the bare JID
+                    devices = await self.get_device_information(bare_jid)
+
+                    # If the trust levels of all devices correspond to those used by BTBV,
+                    # BTBV applies. Otherwise, fall back to manual trust.
+                    if all(TrustLevel(device.trust_level_name) in {
+                        TrustLevel.UNDECIDED,
+                        TrustLevel.BLINDLY_TRUSTED
+                    } for device in devices):
+                        btbv_bare_jids.add(bare_jid)
+                    else:
+                        manual_trust_bare_jids.add(bare_jid)
+
+            if trust_model == "manual":
+                manual_trust_bare_jids = bare_jids
+
+            # With the JIDs sorted into their respective pools, the undecided devices can
+            # be categorized too
+            blindly_trusted_devices = \
+                { dev for dev in undecided if dev.bare_jid in btbv_bare_jids }
+            manually_trusted_devices = \
+                { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids }
+
+            # Blindly trust devices handled by BTBV
+            if len(blindly_trusted_devices) > 0:
+                for device in blindly_trusted_devices:
+                    await self.set_trust(
+                        device.bare_jid,
+                        device.identity_key,
+                        TrustLevel.BLINDLY_TRUSTED.name
+                    )
+
+                blindly_trusted_devices_stringified = ", ".join([
+                    f"device {device.device_id} of {device.bare_jid} under namespace"
+                    f" {device.namespaces}"
+                    for device
+                    in blindly_trusted_devices
+                ])
+
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        "Not all destination devices are trusted, unknown devices will be"
+                        " blindly trusted due to the Blind Trust Before Verification"
+                        " policy. If you want a more secure workflow, please activate the"
+                        " \"manual\" policy in the settings' \"Security\" tab.\nFollowing"
+                        " devices have been automatically trusted:"
+                        f" {blindly_trusted_devices_stringified}."
+                    )
+                )
+
+            # Prompt the user for manual trust decisions on the devices handled by manual
+            # trust
+            if len(manually_trusted_devices) > 0:
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        "Not all destination devices are trusted, we can't encrypt"
+                        " message in such a situation. Please indicate if you trust"
+                        " those devices or not in the trust manager before we can"
+                        " send this message."
+                    )
+                )
+                await self.__prompt_manual_trust(
+                    frozenset(manually_trusted_devices),
+                    feedback_jid
+                )
+
+        @staticmethod
+        async def _send_message(message: omemo.Message, bare_jid: str) -> None:
+            element: Optional[ET.Element] = None
+
+            if message.namespace == twomemo.twomemo.NAMESPACE:
+                element = twomemo.etree.serialize_message(message)
+            if message.namespace == oldmemo.oldmemo.NAMESPACE:
+                element = oldmemo.etree.serialize_message(message)
+
+            if element is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}")
+
+            # TODO: Untested
+            message_data = client.generateMessageXML(MessageData({
+                "from": client.jid,
+                "to": jid.JID(bare_jid),
+                "uid": str(uuid.uuid4()),
+                "message": {},
+                "subject": {},
+                "type": C.MESS_TYPE_CHAT,
+                "extra": {},
+                "timestamp": time.time()
+            }))
+
+            message_data["xml"].addChild(etree_to_domish(element))
+
+            try:
+                await client.send(message_data["xml"])
+            except Exception as e:
+                raise omemo.MessageSendingFailed() from e
+
+        async def __prompt_manual_trust(
+            self,
+            undecided: FrozenSet[omemo.DeviceInformation],
+            feedback_jid: jid.JID
+        ) -> None:
+            """Asks the user to decide on the manual trust level of a set of devices.
+
+            Blocks until the user has made a decision and updates the trust levels of all
+            devices using :meth:`set_trust`.
+
+            @param undecided: The set of devices to prompt manual trust for.
+            @param feedback_jid: The bare JID to redirect feedback to. In case of a one to
+                one message, the recipient JID. In case of a MUC message, the room JID.
+            @raise TrustDecisionFailed: if the user cancels the prompt.
+            """
+
+            # This session manager handles encryption with both twomemo and oldmemo, but
+            # both are currently registered as different plugins and the `deferXMLUI`
+            # below requires a single namespace identifying the encryption plugin. Thus,
+            # get the namespace of the requested encryption method from the encryption
+            # session using the feedback JID.
+            encryption = client.encryption.getSession(feedback_jid)
+            if encryption is None:
+                raise omemo.TrustDecisionFailed(
+                    f"Encryption not requested for {feedback_jid.userhost()}."
+                )
+
+            namespace = encryption["plugin"].namespace
+
+            # Casting this to Any, otherwise all calls on the variable cause type errors
+            # pylint: disable=no-member
+            trust_ui = cast(Any, xml_tools.XMLUI(
+                panel_type=C.XMLUI_FORM,
+                title=D_("OMEMO trust management"),
+                submit_id=""
+            ))
+            trust_ui.addText(D_(
+                "This is OMEMO trusting system. You'll see below the devices of your "
+                "contacts, and a checkbox to trust them or not. A trusted device "
+                "can read your messages in plain text, so be sure to only validate "
+                "devices that you are sure are belonging to your contact. It's better "
+                "to do this when you are next to your contact and their device, so "
+                "you can check the \"fingerprint\" (the number next to the device) "
+                "yourself. Do *not* validate a device if the fingerprint is wrong!"
+            ))
+
+            own_device, __ = await self.get_own_device_information()
+
+            trust_ui.changeContainer("label")
+            trust_ui.addLabel(D_("This device ID"))
+            trust_ui.addText(str(own_device.device_id))
+            trust_ui.addLabel(D_("This device's fingerprint"))
+            trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key)))
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+            # At least sort the devices by bare JID such that they aren't listed
+            # completely random
+            undecided_ordered = sorted(undecided, key=lambda device: device.bare_jid)
+
+            for index, device in enumerate(undecided_ordered):
+                trust_ui.addLabel(D_("Contact"))
+                trust_ui.addJid(jid.JID(device.bare_jid))
+                trust_ui.addLabel(D_("Device ID"))
+                trust_ui.addText(str(device.device_id))
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(" ".join(self.format_identity_key(device.identity_key)))
+                trust_ui.addLabel(D_("Trust this device?"))
+                trust_ui.addBool(f"trust_{index}", value=C.boolConst(False))
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+            trust_ui_result = await xml_tools.deferXMLUI(
+                sat,
+                trust_ui,
+                action_extra={ "meta_encryption_trust": namespace },
+                profile=profile
+            )
+
+            if C.bool(trust_ui_result.get("cancelled", "false")):
+                raise omemo.TrustDecisionFailed("Trust UI cancelled.")
+
+            data_form_result = cast(Dict[str, str], xml_tools.XMLUIResult2DataFormResult(
+                trust_ui_result
+            ))
+
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                device = undecided_ordered[int(key[len("trust_"):])]
+                trust = C.bool(value)
+
+                await self.set_trust(
+                    device.bare_jid,
+                    device.identity_key,
+                    TrustLevel.TRUSTED.name if trust else TrustLevel.DISTRUSTED.name
+                )
+
+    return SessionManagerImpl
 
 
-class OmemoSession:
-    """Wrapper to use omemo.OmemoSession with Deferred"""
-
-    def __init__(self, session):
-        self._session = session
-
-    @property
-    def republish_bundle(self):
-        return self._session.republish_bundle
-
-    @property
-    def public_bundle(self):
-        return self._session.public_bundle
+async def prepare_for_profile(
+    sat: SAT,
+    profile: str,
+    initial_own_label: Optional[str],
+    signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60,
+    pre_key_refill_threshold: int = 99,
+    max_num_per_session_skipped_keys: int = 1000,
+    max_num_per_message_skipped_keys: Optional[int] = None
+) -> omemo.SessionManager:
+    """Prepare the OMEMO library (storage, backends, core) for a specific profile.
 
-    @classmethod
-    def create(cls, client, storage, my_device_id = None):
-        omemo_session_p = omemo.SessionManager.create(
-            storage,
-            SatOTPKPolicy,
-            omemo_backend,
-            client.jid.userhost(),
-            my_device_id)
-        d = promise2Deferred(omemo_session_p)
-        d.addCallback(lambda session: cls(session))
-        return d
-
-    def newDeviceList(self, jid, devices):
-        jid = jid.userhost()
-        new_device_p = self._session.newDeviceList(jid, devices)
-        return promise2Deferred(new_device_p)
+    @param sat: The SAT instance.
+    @param profile: The profile.
+    @param initial_own_label: The initial (optional) label to assign to this device if
+        supported by any of the backends.
+    @param signed_pre_key_rotation_period: The rotation period for the signed pre key, in
+        seconds. The rotation period is recommended to be between one week (the default)
+        and one month.
+    @param pre_key_refill_threshold: The number of pre keys that triggers a refill to 100.
+        Defaults to 99, which means that each pre key gets replaced with a new one right
+        away. The threshold can not be configured to lower than 25.
+    @param max_num_per_session_skipped_keys: The maximum number of skipped message keys to
+        keep around per session. Once the maximum is reached, old message keys are deleted
+        to make space for newer ones. Accessible via
+        :attr:`max_num_per_session_skipped_keys`.
+    @param max_num_per_message_skipped_keys: The maximum number of skipped message keys to
+        accept in a single message. When set to ``None`` (the default), this parameter
+        defaults to the per-session maximum (i.e. the value of the
+        ``max_num_per_session_skipped_keys`` parameter). This parameter may only be 0 if
+        the per-session maximum is 0, otherwise it must be a number between 1 and the
+        per-session maximum. Accessible via :attr:`max_num_per_message_skipped_keys`.
+    @return: A session manager with ``urn:xmpp:omemo:2`` and
+        ``eu.siacs.conversations.axolotl`` capabilities, specifically for the given
+        profile.
+    @raise BundleUploadFailed: if a bundle upload failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise BundleDownloadFailed: if a bundle download failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise BundleDeletionFailed: if a bundle deletion failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise DeviceListUploadFailed: if a device list upload failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    """
 
-    def getDevices(self, bare_jid=None):
-        bare_jid = bare_jid.userhost()
-        get_devices_p = self._session.getDevices(bare_jid=bare_jid)
-        return promise2Deferred(get_devices_p)
+    client = sat.getClient(profile)
+    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
 
-    def buildSession(self, bare_jid, device, bundle):
-        bare_jid = bare_jid.userhost()
-        build_session_p = self._session.buildSession(bare_jid, int(device), bundle)
-        return promise2Deferred(build_session_p)
-
-    def deleteSession(self, bare_jid, device):
-        bare_jid = bare_jid.userhost()
-        delete_session_p = self._session.deleteSession(
-            bare_jid=bare_jid, device=int(device))
-        return promise2Deferred(delete_session_p)
-
-    def encryptMessage(self, bare_jids, message, bundles=None, expect_problems=None):
-        """Encrypt a message
+    storage = StorageImpl(profile)
 
-        @param bare_jids(iterable[jid.JID]): destinees of the message
-        @param message(unicode): message to encode
-        @param bundles(dict[jid.JID, dict[int, ExtendedPublicBundle]):
-            entities => devices => bundles map
-        @return D(dict): encryption data
-        """
-        bare_jids = [e.userhost() for e in bare_jids]
-        if bundles is not None:
-            bundles = {e.userhost(): v for e, v in bundles.items()}
-        encrypt_mess_p = self._session.encryptMessage(
-            bare_jids=bare_jids,
-            plaintext=message.encode(),
-            bundles=bundles,
-            expect_problems=expect_problems)
-        return promise2Deferred(encrypt_mess_p)
+    # TODO: Untested
+    await oldmemo.migrations.migrate(
+        LegacyStorageImpl(profile, client.jid.userhost()),
+        storage,
+        # TODO: Do we want BLINDLY_TRUSTED or TRUSTED here?
+        TrustLevel.BLINDLY_TRUSTED.name,
+        TrustLevel.UNDECIDED.name,
+        TrustLevel.DISTRUSTED.name,
+        lambda bare_jid, device_id: download_oldmemo_bundle(
+            client,
+            xep_0060,
+            bare_jid,
+            device_id
+        )
+    )
 
-    def encryptRatchetForwardingMessage(
-        self, bare_jids, bundles=None, expect_problems=None):
-        bare_jids = [e.userhost() for e in bare_jids]
-        if bundles is not None:
-            bundles = {e.userhost(): v for e, v in bundles.items()}
-        encrypt_ratchet_fwd_p = self._session.encryptRatchetForwardingMessage(
-            bare_jids=bare_jids,
-            bundles=bundles,
-            expect_problems=expect_problems)
-        return promise2Deferred(encrypt_ratchet_fwd_p)
-
-    def decryptMessage(self, bare_jid, device, iv, message, is_pre_key_message,
-                       ciphertext, additional_information=None, allow_untrusted=False):
-        bare_jid = bare_jid.userhost()
-        decrypt_mess_p = self._session.decryptMessage(
-            bare_jid=bare_jid,
-            device=int(device),
-            iv=iv,
-            message=message,
-            is_pre_key_message=is_pre_key_message,
-            ciphertext=ciphertext,
-            additional_information=additional_information,
-            allow_untrusted=allow_untrusted
+    session_manager = await make_session_manager(sat, profile).create(
+        [
+            twomemo.Twomemo(
+                storage,
+                max_num_per_session_skipped_keys,
+                max_num_per_message_skipped_keys
+            ),
+            oldmemo.Oldmemo(
+                storage,
+                max_num_per_session_skipped_keys,
+                max_num_per_message_skipped_keys
             )
-        return promise2Deferred(decrypt_mess_p)
+        ],
+        storage,
+        client.jid.userhost(),
+        initial_own_label,
+        TrustLevel.UNDECIDED.value,
+        signed_pre_key_rotation_period,
+        pre_key_refill_threshold,
+        omemo.AsyncFramework.TWISTED
+    )
 
-    def decryptRatchetForwardingMessage(
-        self, bare_jid, device, iv, message, is_pre_key_message,
-        additional_information=None, allow_untrusted=False):
-        bare_jid = bare_jid.userhost()
-        decrypt_ratchet_fwd_p = self._session.decryptRatchetForwardingMessage(
-            bare_jid=bare_jid,
-            device=int(device),
-            iv=iv,
-            message=message,
-            is_pre_key_message=is_pre_key_message,
-            additional_information=additional_information,
-            allow_untrusted=allow_untrusted
-            )
-        return promise2Deferred(decrypt_ratchet_fwd_p)
+    # This shouldn't hurt here since we're not running on overly constrainted devices.
+    # TODO: Consider ensuring data consistency regularly/in response to certain events
+    await session_manager.ensure_data_consistency()
 
-    def setTrust(self, bare_jid, device, key, trusted):
-        bare_jid = bare_jid.userhost()
-        setTrust_p = self._session.setTrust(
-            bare_jid=bare_jid,
-            device=int(device),
-            key=key,
-            trusted=trusted,
-        )
-        return promise2Deferred(setTrust_p)
+    # TODO: Correct entering/leaving of the history synchronization mode isn't terribly
+    # important for now, since it only prevents an extremely unlikely race condition of
+    # multiple devices choosing the same pre key for new sessions while the device was
+    # offline. I don't believe other clients seriously defend against that race condition
+    # either. In the long run, it might still be cool to have triggers for when history
+    # sync starts and ends (MAM, MUC catch-up, etc.) and to react to those triggers.
+    await session_manager.after_history_sync()
+
+    return session_manager
+
 
-    def resetTrust(self, bare_jid, device):
-        bare_jid = bare_jid.userhost()
-        resetTrust_p = self._session.resetTrust(
-            bare_jid=bare_jid,
-            device=int(device),
-        )
-        return promise2Deferred(resetTrust_p)
-
-    def getTrustForJID(self, bare_jid):
-        bare_jid = bare_jid.userhost()
-        get_trust_p = self._session.getTrustForJID(bare_jid=bare_jid)
-        return promise2Deferred(get_trust_p)
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+    <param name="{PARAM_NAME}"
+        label={quoteattr(D_('OMEMO default trust policy'))}
+        type="list" security="3">
+        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+        <option value="btbv"
+            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+            selected="true" />
+    </param>
+</category>
+</individual>
+</params>
+"""
 
 
 class OMEMO:
+    """
+    Plugin equipping Libervia with OMEMO capabilities under the (modern)
+    ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl``
+    namespace. Both versions of the protocol are handled by this plugin and compatibility
+    between the two is maintained. MUC messages are supported next to one to one messages.
+    For trust management, the two trust models "BTBV" and "manual" are supported.
+    """
 
-    params = """
-    <params>
-    <individual>
-    <category name="{category_name}" label="{category_label}">
-        <param name="{param_name}" label={param_label} type="list" security="3">
-            <option value="manual" label={opt_manual_lbl} />
-            <option value="btbv" label={opt_btbv_lbl} selected="true" />
-        </param>
-     </category>
-    </individual>
-    </params>
-    """.format(
-        category_name=PARAM_CATEGORY,
-        category_label=D_("Security"),
-        param_name=PARAM_NAME,
-        param_label=quoteattr(D_("OMEMO default trust policy")),
-        opt_manual_lbl=quoteattr(D_("Manual trust (more secure)")),
-        opt_btbv_lbl=quoteattr(
-            D_("Blind Trust Before Verification (more user friendly)")),
+    # For MUC/MIX message stanzas, the <to/> affix is a MUST
+    SCE_PROFILE_GROUPCHAT = SCEProfile(
+        rpad_policy=SCEAffixPolicy.REQUIRED,
+        time_policy=SCEAffixPolicy.OPTIONAL,
+        to_policy=SCEAffixPolicy.REQUIRED,
+        from_policy=SCEAffixPolicy.OPTIONAL,
+        custom_policies={}
+    )
+
+    # For everything but MUC/MIX message stanzas, the <to/> affix is a MAY
+    SCE_PROFILE = SCEProfile(
+        rpad_policy=SCEAffixPolicy.REQUIRED,
+        time_policy=SCEAffixPolicy.OPTIONAL,
+        to_policy=SCEAffixPolicy.OPTIONAL,
+        from_policy=SCEAffixPolicy.OPTIONAL,
+        custom_policies={}
     )
 
-    def __init__(self, host):
-        log.info(_("OMEMO plugin initialization (omemo module v{version})").format(
-            version=omemo.__version__))
-        version = tuple(map(int, omemo.__version__.split('.')[:3]))
-        if version < OMEMO_MIN_VER:
-            log.warning(_(
-                "Your version of omemo module is too old: {v[0]}.{v[1]}.{v[2]} is "
-                "minimum required, please update.").format(v=OMEMO_MIN_VER))
-            raise exceptions.CancelError("module is too old")
-        self.host = host
-        host.memory.updateParams(self.params)
-        self._p_hints = host.plugins["XEP-0334"]
-        self._p_carbons = host.plugins["XEP-0280"]
-        self._p = host.plugins["XEP-0060"]
-        self._m = host.plugins.get("XEP-0045")
-        self._sid = host.plugins.get("XEP-0359")
-        host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=100050)
-        host.trigger.add("sendMessageData", self._sendMessageDataTrigger)
-        self.host.registerEncryptionPlugin(self, "OMEMO", NS_OMEMO, 100)
-        pep = host.plugins['XEP-0163']
-        pep.addPEPEvent(
-            "OMEMO_DEVICES", NS_OMEMO_DEVICES,
-            lambda itemsEvent, profile: defer.ensureDeferred(
-                self.onNewDevices(itemsEvent, profile))
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Add configuration option to choose between manual trust and BTBV as the trust
+        # model
+        sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM)
+
+        # Plugins
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359"))
+        self.__xep_0420 = cast(XEP_0420, sat.plugins["XEP-0420"])
+
+        # In contrast to one to one messages, MUC messages are reflected to the sender.
+        # Thus, the sender does not add messages to their local message log when sending
+        # them, but when the reflection is received. This approach does not pair well with
+        # OMEMO, since for security reasons it is forbidden to encrypt messages for the
+        # own device. Thus, when the reflection of an OMEMO message is received, it can't
+        # be decrypted and added to the local message log as usual. To counteract this,
+        # the plaintext of encrypted messages sent to MUCs are cached in this field, such
+        # that when the reflection is received, the plaintext can be looked up from the
+        # cache and added to the local message log.
+        # TODO: The old plugin expired this cache after some time. I'm not sure that's
+        # really necessary.
+        self.__muc_plaintext_cache: Dict[MUCPlaintextCacheKey, bytes] = {}
+
+        # Mapping from profile name to corresponding session manager
+        self.__session_managers: Dict[str, omemo.SessionManager] = {}
+
+        # Calls waiting for a specific session manager to be built
+        self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {}
+
+        # These triggers are used by oldmemo, which doesn't do SCE and only applies to
+        # messages
+        sat.trigger.add(
+            "messageReceived",
+            self.__message_received_trigger,
+            priority=100050
         )
+        sat.trigger.add(
+            "sendMessageData",
+            self.__send_message_data_trigger,
+            priority=100050
+        )
+
+        # These triggers are used by twomemo, which does do SCE
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+        # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas,
+        # including IQs.
+
+        # Give twomemo a (slightly) higher priority than oldmemo
+        sat.registerEncryptionPlugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101)
+        sat.registerEncryptionPlugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100)
+
+        xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
+        xep_0163.addPEPEvent(
+            "TWOMEMO_DEVICES",
+            TWOMEMO_DEVICE_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_device_list_update(items_event, profile)
+            )
+        )
+        xep_0163.addPEPEvent(
+            "OLDMEMO_DEVICES",
+            OLDMEMO_DEVICE_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_device_list_update(items_event, profile)
+            )
+        )
+
         try:
-            self.text_cmds = self.host.plugins[C.TEXT_CMDS]
+            self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS])
         except KeyError:
             log.info(_("Text commands not available"))
         else:
-            self.text_cmds.registerTextCommands(self)
+            self.__text_commands.registerTextCommands(self)
 
-    # Text commands #
+    async def profileConnected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """
+        @param client: The client.
+        """
 
-    async def cmd_omemo_reset(self, client, mess_data):
-        """reset OMEMO session (use only if encryption is broken)
+        await self.__prepare_for_profile(cast(str, client.profile))
+
+    async def cmd_omemo_reset(
+        self,
+        client: SatXMPPClient,
+        mess_data: MessageData
+    ) -> Literal[False]:
+        """Reset all sessions of devices that belong to the recipient of ``mess_data``.
 
-        @command(one2one):
+        This must only be callable manually by the user. Use this when a session is
+        apparently broken, i.e. sending and receiving encrypted messages doesn't work and
+        something being wrong has been confirmed manually with the recipient.
+
+        @param client: The client.
+        @param mess_data: The message data, whose ``to`` attribute will be the bare JID to
+            reset all sessions with.
+        @return: The constant value ``False``, indicating to the text commands plugin that
+            the message is not supposed to be sent.
         """
-        if not client.encryption.isEncryptionRequested(mess_data, NS_OMEMO):
-            feedback = _(
-                "You need to have OMEMO encryption activated to reset the session")
-            self.text_cmds.feedBack(client, feedback, mess_data)
+
+        twomemo_requested = \
+            client.encryption.isEncryptionRequested(mess_data, twomemo.twomemo.NAMESPACE)
+        oldmemo_requested = \
+            client.encryption.isEncryptionRequested(mess_data, oldmemo.oldmemo.NAMESPACE)
+
+        if not (twomemo_requested or oldmemo_requested):
+            self.__text_commands.feedBack(
+                client,
+                _("You need to have OMEMO encryption activated to reset the session"),
+                mess_data
+            )
             return False
-        to_jid = mess_data["to"].userhostJID()
-        session = client._xep_0384_session
-        devices = await session.getDevices(to_jid)
+
+        bare_jid = mess_data["to"].userhost()
+
+        session_manager = await self.__prepare_for_profile(client.profile)
+        devices = await session_manager.get_device_information(bare_jid)
 
-        for device in devices['active']:
-            log.debug(f"deleting session for device {device}")
-            await session.deleteSession(to_jid, device=device)
+        for device in devices:
+            log.debug(f"Replacing sessions with device {device}")
+            await session_manager.replace_sessions(device)
 
-        log.debug("Sending an empty message to trigger key exchange")
-        await client.sendMessage(to_jid, {'': ''})
+        self.__text_commands.feedBack(
+            client,
+            _("OMEMO session has been reset"),
+            mess_data
+        )
 
-        feedback = _("OMEMO session has been reset")
-        self.text_cmds.feedBack(client, feedback, mess_data)
         return False
 
-    async def trustUICb(
-            self, xmlui_data, trust_data, expect_problems=None, profile=C.PROF_KEY_NONE):
-        if C.bool(xmlui_data.get('cancelled', 'false')):
-            return {}
-        client = self.host.getClient(profile)
-        session = client._xep_0384_session
-        stored_data = client._xep_0384_data
-        manual_trust = await stored_data.get(KEY_MANUAL_TRUST, set())
-        auto_trusted_cache = {}
-        answer = xml_tools.XMLUIResult2DataFormResult(xmlui_data)
-        blind_trust = C.bool(answer.get('blind_trust', C.BOOL_FALSE))
-        for key, value in answer.items():
-            if key.startswith('trust_'):
-                trust_id = key[6:]
-            else:
-                continue
-            data = trust_data[trust_id]
-            if blind_trust:
-                # user request to restore blind trust for this entity
-                # so if the entity is present in manual trust, we remove it
-                if data["jid"].full() in manual_trust:
-                    manual_trust.remove(data["jid"].full())
-                    await stored_data.aset(KEY_MANUAL_TRUST, manual_trust)
-            elif data["jid"].full() not in manual_trust:
-                # validating this trust UI implies that we activate manual mode for
-                # this entity (used for BTBV policy)
-                manual_trust.add(data["jid"].full())
-                await stored_data.aset(KEY_MANUAL_TRUST, manual_trust)
-            trust = C.bool(value)
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
 
-            if not trust:
-                # if device is not trusted, we check if it must be removed from auto
-                # trusted devices list
-                bare_jid_s = data['jid'].userhost()
-                key = f"{KEY_AUTO_TRUST}\n{bare_jid_s}"
-                if bare_jid_s not in auto_trusted_cache:
-                    auto_trusted_cache[bare_jid_s] = await stored_data.get(
-                        key, default=set())
-                auto_trusted = auto_trusted_cache[bare_jid_s]
-                if data['device'] in auto_trusted:
-                    # as we don't trust this device anymore, we can remove it from the
-                    # list of automatically trusted devices
-                    auto_trusted.remove(data['device'])
-                    await stored_data.aset(key, auto_trusted)
-                    log.info(D_(
-                        "device {device} from {peer_jid} is not an auto-trusted device "
-                        "anymore").format(device=data['device'], peer_jid=bare_jid_s))
+        bare_jids: Set[str]
+        if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhost() }
+
+        session_manager = await self.__prepare_for_profile(client.profile)
 
-            await session.setTrust(
-                data["jid"],
-                data["device"],
-                data["ik"],
-                trusted=trust,
-            )
-            if not trust and expect_problems is not None:
-                expect_problems.setdefault(data['jid'].userhost(), set()).add(
-                    data['device']
-                )
-        return {}
-
-    async def getTrustUI(self, client, entity_jid=None, trust_data=None, submit_id=None):
-        """Generate a XMLUI to manage trust
+        # At least sort the devices by bare JID such that they aren't listed completely
+        # random
+        devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[
+            await session_manager.get_device_information(bare_jid)
+            for bare_jid
+            in bare_jids
+        ]), key=lambda device: device.bare_jid)
 
-        @param entity_jid(None, jid.JID): jid of entity to manage
-            None to use trust_data
-        @param trust_data(None, dict): devices data:
-            None to use entity_jid
-            else a dict mapping from trust ids (unicode) to devices data,
-            where a device data must have the following keys:
-                - jid(jid.JID): bare jid of the device owner
-                - device(int): device id
-                - ik(bytes): identity key
-            and may have the following key:
-                - trusted(bool): True if device is trusted
-        @param submit_id(None, unicode): submit_id to use
-            if None set UI callback to trustUICb
-        @return D(xmlui): trust management form
-        """
-        # we need entity_jid xor trust_data
-        assert entity_jid and not trust_data or not entity_jid and trust_data
-        if entity_jid and entity_jid.resource:
-            raise ValueError("A bare jid is expected")
+        async def callback(
+            data: Any,
+            profile: str  # pylint: disable=unused-argument
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produces by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known not needed here.
+            """
 
-        session = client._xep_0384_session
-        stored_data = client._xep_0384_data
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.XMLUIResult2DataFormResult(data)
+            )
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
 
-        if trust_data is None:
-            cache = client._xep_0384_cache.setdefault(entity_jid, {})
-            trust_data = {}
-            if self._m is not None and self._m.isJoinedRoom(client, entity_jid):
-                trust_jids = self.getJIDsForRoom(client, entity_jid)
-            else:
-                trust_jids = [entity_jid]
-            for trust_jid in trust_jids:
-                trust_session_data = await session.getTrustForJID(trust_jid)
-                bare_jid_s = trust_jid.userhost()
-                for device_id, trust_info in trust_session_data['active'].items():
-                    if trust_info is None:
-                        # device has never been (un)trusted, we have to retrieve its
-                        # fingerprint (i.e. identity key or "ik") through public bundle
-                        if device_id not in cache:
-                            bundles, missing = await self.getBundles(client,
-                                                                     trust_jid,
-                                                                     [device_id])
-                            if device_id not in bundles:
-                                log.warning(_(
-                                    "Can't find bundle for device {device_id} of user "
-                                    "{bare_jid}, ignoring").format(device_id=device_id,
-                                                                    bare_jid=bare_jid_s))
-                                continue
-                            cache[device_id] = bundles[device_id]
-                        # TODO: replace False below by None when undecided
-                        #       trusts are handled
-                        trust_info = {
-                            "key": cache[device_id].ik,
-                            "trusted": False
-                        }
+                device = devices[int(key[len("trust_"):])]
+                trust = TrustLevel(value)
+
+                if TrustLevel(device.trust_level_name) is not trust:
+                    await session_manager.set_trust(
+                        device.bare_jid,
+                        device.identity_key,
+                        value
+                    )
+
+            return {}
 
-                    ik = trust_info["key"]
-                    trust_id = str(hash((bare_jid_s, device_id, ik)))
-                    trust_data[trust_id] = {
-                        "jid": trust_jid,
-                        "device": device_id,
-                        "ik": ik,
-                        "trusted": trust_info["trusted"],
-                        }
+        submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True)
 
-        if submit_id is None:
-            submit_id = self.host.registerCallback(
-                lambda data, profile: defer.ensureDeferred(
-                    self.trustUICb(data, trust_data=trust_data, profile=profile)),
-                with_data=True,
-                one_shot=True)
-        xmlui = xml_tools.XMLUI(
-            panel_type = C.XMLUI_FORM,
-            title = D_("OMEMO trust management"),
-            submit_id = submit_id
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OMEMO trust management"),
+            submit_id=submit_id
         )
-        xmlui.addText(D_(
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
             "This is OMEMO trusting system. You'll see below the devices of your "
-            "contacts, and a checkbox to trust them or not. A trusted device "
+            "contacts, and a list selection to trust them or not. A trusted device "
             "can read your messages in plain text, so be sure to only validate "
             "devices that you are sure are belonging to your contact. It's better "
-            "to do this when you are next to your contact and her/his device, so "
+            "to do this when you are next to your contact and their device, so "
             "you can check the \"fingerprint\" (the number next to the device) "
-            "yourself. Do *not* validate a device if the fingerprint is wrong!"))
+            "yourself. Do *not* validate a device if the fingerprint is wrong!"
+        ))
+
+        own_device, __ = await session_manager.get_own_device_information()
 
-        xmlui.changeContainer("label")
-        xmlui.addLabel(D_("This device ID"))
-        xmlui.addText(str(client._xep_0384_device_id))
-        xmlui.addLabel(D_("This device fingerprint"))
-        ik_hex = session.public_bundle.ik.hex().upper()
-        fp_human = ' '.join([ik_hex[i:i+8] for i in range(0, len(ik_hex), 8)])
-        xmlui.addText(fp_human)
-        xmlui.addEmpty()
-        xmlui.addEmpty()
-
-        if entity_jid is not None:
-            omemo_policy = self.host.memory.getParamA(
-                PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile
-            )
-            if omemo_policy == 'btbv':
-                xmlui.addLabel(D_("Automatically trust new devices?"))
-                # blind trust is always disabled when UI is requested
-                # as submitting UI is a verification which should disable it.
-                xmlui.addBool("blind_trust", value=C.BOOL_FALSE)
-                xmlui.addEmpty()
-                xmlui.addEmpty()
+        trust_ui.changeContainer("label")
+        trust_ui.addLabel(D_("This device ID"))
+        trust_ui.addText(str(own_device.device_id))
+        trust_ui.addLabel(D_("This device's fingerprint"))
+        trust_ui.addText(" ".join(session_manager.format_identity_key(
+            own_device.identity_key
+        )))
+        trust_ui.addEmpty()
+        trust_ui.addEmpty()
 
-        auto_trust_cache = {}
+        for index, device in enumerate(devices):
+            trust_ui.addLabel(D_("Contact"))
+            trust_ui.addJid(jid.JID(device.bare_jid))
+            trust_ui.addLabel(D_("Device ID"))
+            trust_ui.addText(str(device.device_id))
+            trust_ui.addLabel(D_("Fingerprint"))
+            trust_ui.addText(" ".join(session_manager.format_identity_key(
+                device.identity_key
+            )))
+            trust_ui.addLabel(D_("Trust this device?"))
 
-        for trust_id, data in trust_data.items():
-            bare_jid_s = data['jid'].userhost()
-            if bare_jid_s not in auto_trust_cache:
-                key = f"{KEY_AUTO_TRUST}\n{bare_jid_s}"
-                auto_trust_cache[bare_jid_s] = await stored_data.get(key, set())
-            xmlui.addLabel(D_("Contact"))
-            xmlui.addJid(data['jid'])
-            xmlui.addLabel(D_("Device ID"))
-            xmlui.addText(str(data['device']))
-            xmlui.addLabel(D_("Fingerprint"))
-            ik_hex = data['ik'].hex().upper()
-            fp_human = ' '.join([ik_hex[i:i+8] for i in range(0, len(ik_hex), 8)])
-            xmlui.addText(fp_human)
-            xmlui.addLabel(D_("Trust this device?"))
-            xmlui.addBool("trust_{}".format(trust_id),
-                          value=C.boolConst(data.get('trusted', False)))
-            if data['device'] in auto_trust_cache[bare_jid_s]:
-                xmlui.addEmpty()
-                xmlui.addLabel(D_("(automatically trusted)"))
+            current_trust_level = TrustLevel(device.trust_level_name)
+            avaiable_trust_levels = \
+                { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
 
-
-            xmlui.addEmpty()
-            xmlui.addEmpty()
-
-        return xmlui
+            trust_ui.addList(
+                f"trust_{index}",
+                options=[ trust_level.name for trust_level in avaiable_trust_levels ],
+                selected=current_trust_level.name,
+                styles=[ "inline" ]
+            )
 
-    async def profileConnected(self, client):
-        if self._m is not None:
-            # we keep plain text message for MUC messages we send
-            # as we can't encrypt for our own device
-            client._xep_0384_muc_cache = {}
-            # and we keep them only for some time, in case something goes wrong
-            # with the MUC
-            client._xep_0384_muc_cache_timer = None
+            twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE)
+            if twomemo_active is None:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(not available for Twomemo)"))
+            if twomemo_active is False:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(inactive for Twomemo)"))
 
-        # FIXME: is _xep_0384_ready needed? can we use profileConnecting?
-        #        Workflow should be checked
-        client._xep_0384_ready = defer.Deferred()
-        # we first need to get devices ids (including our own)
-        persistent_dict = persistent.LazyPersistentBinaryDict("XEP-0384", client.profile)
-        client._xep_0384_data = persistent_dict
-        # all known devices of profile
-        devices = await self.getDevices(client)
-        # and our own device id
-        device_id = await persistent_dict.get(KEY_DEVICE_ID)
-        if device_id is None:
-            log.info(_("We have no identity for this device yet, let's generate one"))
-            # we have a new device, we create device_id
-            device_id = random.randint(1, 2**31-1)
-            # we check that it's really unique
-            while device_id in devices:
-                device_id = random.randint(1, 2**31-1)
-            # and we save it
-            await persistent_dict.aset(KEY_DEVICE_ID, device_id)
+            oldmemo_active = dict(device.active).get(oldmemo.oldmemo.NAMESPACE)
+            if oldmemo_active is None:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(not available for Oldmemo)"))
+            if oldmemo_active is False:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(inactive for Oldmemo)"))
+
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        return result
 
-        log.debug(f"our OMEMO device id is {device_id}")
-
-        if device_id not in devices:
-            log.debug(f"our device id ({device_id}) is not in the list, adding it")
-            devices.add(device_id)
-            await defer.ensureDeferred(self.setDevices(client, devices))
-
-        all_jids = await persistent_dict.get(KEY_ALL_JIDS, set())
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[str]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
 
-        omemo_storage = OmemoStorage(client, device_id, all_jids)
-        omemo_session = await OmemoSession.create(client, omemo_storage, device_id)
-        client._xep_0384_cache = {}
-        client._xep_0384_session = omemo_session
-        client._xep_0384_device_id = device_id
-        await omemo_session.newDeviceList(client.jid, devices)
-        if omemo_session.republish_bundle:
-            log.info(_("Saving public bundle for this device ({device_id})").format(
-                device_id=device_id))
-            await defer.ensureDeferred(
-                self.setBundle(client, omemo_session.public_bundle, device_id)
-            )
-        client._xep_0384_ready.callback(None)
-        del client._xep_0384_ready
+        bare_jids: Set[str] = set()
 
-
-    ## XMPP PEP nodes manipulation
-
-    # devices
-
-    def parseDevices(self, items):
-        """Parse devices found in items
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
 
-        @param items(iterable[domish.Element]): items as retrieved by getItems
-        @return set[int]: parsed devices
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhost())
+
+        return bare_jids
+
+    async def __prepare_for_profile(self, profile: str) -> omemo.SessionManager:
         """
-        devices = set()
-        if len(items) > 1:
-            log.warning(_("OMEMO devices list is stored in more that one items, "
-                          "this is not expected"))
-        if items:
-            try:
-                list_elt = next(items[0].elements(NS_OMEMO, 'list'))
-            except StopIteration:
-                log.warning(_("no list element found in OMEMO devices list"))
-                return devices
-            for device_elt in list_elt.elements(NS_OMEMO, 'device'):
-                try:
-                    device_id = int(device_elt['id'])
-                except KeyError:
-                    log.warning(_('device element is missing "id" attribute: {elt}')
-                                .format(elt=device_elt.toXml()))
-                except ValueError:
-                    log.warning(_('invalid device id: {device_id}').format(
-                        device_id=device_elt['id']))
-                else:
-                    devices.add(device_id)
-        return devices
+        @param profile: The profile to prepare for.
+        @return: A session manager instance for this profile. Creates a new instance if
+            none was prepared before.
+        """
 
-    async def getDevices(self, client, entity_jid=None):
-        """Retrieve list of registered OMEMO devices
-
-        @param entity_jid(jid.JID, None): get devices from this entity
-            None to get our own devices
-        @return (set(int)): list of devices
-        """
-        if entity_jid is not None:
-            assert not entity_jid.resource
-        try:
-            items, metadata = await self._p.getItems(client, entity_jid, NS_OMEMO_DEVICES)
-        except exceptions.NotFound:
-            log.info(_("there is no node to handle OMEMO devices"))
-            return set()
-
-        devices = self.parseDevices(items)
-        return devices
-
-    async def setDevices(self, client, devices):
-        log.debug(f"setting devices with {', '.join(str(d) for d in devices)}")
-        list_elt = domish.Element((NS_OMEMO, 'list'))
-        for device in devices:
-            device_elt = list_elt.addElement('device')
-            device_elt['id'] = str(device)
         try:
-            await self._p.sendItem(
-                client, None, NS_OMEMO_DEVICES, list_elt,
-                item_id=self._p.ID_SINGLETON,
-                extra={
-                    self._p.EXTRA_PUBLISH_OPTIONS: {self._p.OPT_MAX_ITEMS: 1},
-                    self._p.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
-                }
-            )
-        except Exception as e:
-            log.warning(_("Can't set devices: {reason}").format(reason=e))
+            # Try to return the session manager
+            return self.__session_managers[profile]
+        except KeyError:
+            # If a session manager for that profile doesn't exist yet, check whether it is
+            # currently being built. A session manager being built is signified by the
+            # profile key existing on __session_manager_waiters.
+            if profile in self.__session_manager_waiters:
+                # If the session manager is being built, add ourselves to the waiting
+                # queue
+                deferred = defer.Deferred()
+                self.__session_manager_waiters[profile].append(deferred)
+                return cast(omemo.SessionManager, await deferred)
+
+            # If the session manager is not being built, do so here.
+            self.__session_manager_waiters[profile] = []
 
-    # bundles
-
-    async def getBundles(self, client, entity_jid, devices_ids):
-        """Retrieve public bundles of an entity devices
+            # Build and store the session manager
+            session_manager = await prepare_for_profile(
+                self.__sat,
+                profile,
+                initial_own_label="Libervia"
+            )
+            self.__session_managers[profile] = session_manager
 
-        @param entity_jid(jid.JID): bare jid of entity
-        @param devices_id(iterable[int]): ids of the devices bundles to retrieve
-        @return (tuple(dict[int, ExtendedPublicBundle], list(int))):
-            - bundles collection:
-                * key is device_id
-                * value is parsed bundle
-            - set of bundles not found
+            # Notify the waiters and delete them
+            for waiter in self.__session_manager_waiters[profile]:
+                waiter.callback(session_manager)
+            del self.__session_manager_waiters[profile]
+
+            return session_manager
+
+    async def __message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
         """
-        assert not entity_jid.resource
-        bundles = {}
-        missing = set()
-        for device_id in devices_ids:
-            node = NS_OMEMO_BUNDLE.format(device_id=device_id)
-            try:
-                items, metadata = await self._p.getItems(client, entity_jid, node)
-            except exceptions.NotFound:
-                log.warning(_("Bundle missing for device {device_id}")
-                    .format(device_id=device_id))
-                missing.add(device_id)
-                continue
-            except jabber_error.StanzaError as e:
-                log.warning(_("Can't get bundle for device {device_id}: {reason}")
-                    .format(device_id=device_id, reason=e))
-                continue
-            if not items:
-                log.warning(_("no item found in node {node}, can't get public bundle "
-                              "for device {device_id}").format(node=node,
-                                                                device_id=device_id))
-                continue
-            if len(items) > 1:
-                log.warning(_("more than one item found in {node}, "
-                              "this is not expected").format(node=node))
-            item = items[0]
-            try:
-                bundle_elt = next(item.elements(NS_OMEMO, 'bundle'))
-                signedPreKeyPublic_elt = next(bundle_elt.elements(
-                    NS_OMEMO, 'signedPreKeyPublic'))
-                signedPreKeySignature_elt = next(bundle_elt.elements(
-                    NS_OMEMO, 'signedPreKeySignature'))
-                identityKey_elt = next(bundle_elt.elements(
-                    NS_OMEMO, 'identityKey'))
-                prekeys_elt =  next(bundle_elt.elements(
-                    NS_OMEMO, 'prekeys'))
-            except StopIteration:
-                log.warning(_("invalid bundle for device {device_id}, ignoring").format(
-                    device_id=device_id))
-                continue
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+
+        muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
+
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", "unknown")
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle a MUC message without XEP-0045, let the flow continue
+                # normally
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
 
             try:
-                spkPublic = base64.b64decode(str(signedPreKeyPublic_elt))
-                spkSignature = base64.b64decode(
-                    str(signedPreKeySignature_elt))
+                room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+
+            message_uid: Optional[str] = None
+            if self.__xep_0359 is not None:
+                message_uid = self.__xep_0359.getOriginId(message_elt)
+            if message_uid is None:
+                message_uid = message_elt.getAttribute("id")
+            if message_uid is not None:
+                muc_plaintext_cache_key = MUCPlaintextCacheKey(
+                    client,
+                    room_jid,
+                    message_uid
+                )
+        else:
+            # I'm not sure why this check is required, this code is copied from the old
+            # plugin.
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
+                # like "to" isn't always set.
+                feedback_jid = jid.JID(message_elt["to"])
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        message: Optional[omemo.Message] = None
+        encrypted_elt: Optional[domish.Element] = None
+
+        twomemo_encrypted_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"),
+            None
+        ))
+
+        oldmemo_encrypted_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"),
+            None
+        ))
+
+        session_manager = await self.__prepare_for_profile(cast(str, client.profile))
+
+        if twomemo_encrypted_elt is not None:
+            try:
+                message = twomemo.etree.parse_message(
+                    domish_to_etree(twomemo_encrypted_elt),
+                    sender_bare_jid
+                )
+            except (ValueError, XMLSchemaValidationError):
+                log.warning(
+                    f"Ingoring malformed encrypted message for namespace"
+                    f" {twomemo.twomemo.NAMESPACE}: {twomemo_encrypted_elt.toXml()}"
+                )
+            else:
+                encrypted_elt = twomemo_encrypted_elt
+
+        if oldmemo_encrypted_elt is not None:
+            try:
+                message = await oldmemo.etree.parse_message(
+                    domish_to_etree(oldmemo_encrypted_elt),
+                    sender_bare_jid,
+                    client.jid.userhost(),
+                    session_manager
+                )
+            except (ValueError, XMLSchemaValidationError):
+                log.warning(
+                    f"Ingoring malformed encrypted message for namespace"
+                    f" {oldmemo.oldmemo.NAMESPACE}: {oldmemo_encrypted_elt.toXml()}"
+                )
+            except omemo.SenderNotFound:
+                log.warning(
+                    f"Ingoring encrypted message for namespace"
+                    f" {oldmemo.oldmemo.NAMESPACE} by unknown sender:"
+                    f" {oldmemo_encrypted_elt.toXml()}"
+                )
+            else:
+                encrypted_elt = oldmemo_encrypted_elt
+
+        if message is None or encrypted_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        message_elt.children.remove(encrypted_elt)
 
-                ik = base64.b64decode(str(identityKey_elt))
-                spk = {
-                    "key": spkPublic,
-                    "id": int(signedPreKeyPublic_elt['signedPreKeyId'])
-                }
-                otpks = []
-                for preKeyPublic_elt in prekeys_elt.elements(NS_OMEMO, 'preKeyPublic'):
-                    preKeyPublic = base64.b64decode(str(preKeyPublic_elt))
-                    otpk = {
-                        "key": preKeyPublic,
-                        "id": int(preKeyPublic_elt['preKeyId'])
-                    }
-                    otpks.append(otpk)
+        log.debug(
+            f"{message.namespace} message of type {message_type} received from"
+            f" {sender_bare_jid}"
+        )
+
+        plaintext: Optional[bytes]
+        device_information: omemo.DeviceInformation
+
+        if (
+            muc_plaintext_cache_key is not None
+            and muc_plaintext_cache_key in self.__muc_plaintext_cache
+        ):
+            # Use the cached plaintext
+            plaintext = self.__muc_plaintext_cache.pop(muc_plaintext_cache_key)
+
+            # Since this message was sent by us, use the own device information here
+            device_information, __ = await session_manager.get_own_device_information()
+        else:
+            try:
+                plaintext, device_information = await session_manager.decrypt(message)
+            except omemo.MessageNotForUs:
+                # The difference between this being a debug or a warning is whether there
+                # is a body included in the message. Without a body, we can assume that
+                # it's an empty OMEMO message used for protocol stability reasons, which
+                # is not expected to be sent to all devices of all recipients. If a body
+                # is included, we can assume that the message carries content and we
+                # missed out on something.
+                if len(list(message_elt.elements(C.NS_CLIENT, "body"))) > 0:
+                    client.feedback(
+                        feedback_jid,
+                        D_(
+                            f"An OMEMO message from {sender_jid.full()} has not been"
+                            f" encrypted for our device, we can't decrypt it."
+                        ),
+                        { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                    )
+                    log.warning("Message not encrypted for us.")
+                else:
+                    log.debug("Message not encrypted for us.")
 
+                # No point in further processing this message.
+                return False
             except Exception as e:
-                log.warning(_("error while decoding key for device {device_id}: {msg}")
-                            .format(device_id=device_id, msg=e))
-                continue
+                log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                    reason=e,
+                    xml=message_elt.toXml()
+                ))
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        f"An OMEMO message from {sender_jid.full()} can't be decrypted:"
+                        f" {e}"
+                    ),
+                    { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                )
+                # No point in further processing this message
+                return False
 
-            bundles[device_id] = ExtendedPublicBundle.parse(omemo_backend, ik, spk,
-                                                            spkSignature, otpks)
+        if message.namespace == twomemo.twomemo.NAMESPACE:
+            if plaintext is not None:
+                # XEP_0420.unpack_stanza handles the whole unpacking, including the
+                # relevant modifications to the element
+                sce_profile = \
+                    OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE
+                try:
+                    affix_values = self.__xep_0420.unpack_stanza(
+                        sce_profile,
+                        message_elt,
+                        plaintext
+                    )
+                except Exception as e:
+                    log.warning(D_(
+                        f"Error unpacking SCE-encrypted message: {e}\n{plaintext}"
+                    ))
+                    client.feedback(
+                        feedback_jid,
+                        D_(
+                            f"An OMEMO message from {sender_jid.full()} was rejected:"
+                            f" {e}"
+                        ),
+                        { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                    )
+                    # No point in further processing this message
+                    return False
 
-        return (bundles, missing)
+                if affix_values.timestamp is not None:
+                    # TODO: affix_values.timestamp contains the timestamp included in the
+                    # encrypted element here. The XEP says it SHOULD be displayed with the
+                    # plaintext by clients.
+                    pass
 
-    async def setBundle(self, client, bundle, device_id):
-        """Set public bundle for this device.
+        if message.namespace == oldmemo.oldmemo.NAMESPACE:
+            # Remove all body elements from the original element, since those act as
+            # fallbacks in case the encryption protocol is not supported
+            for child in message_elt.elements():
+                if child.name == "body":
+                    message_elt.children.remove(child)
+
+            if plaintext is not None:
+                # Add the decrypted body
+                message_elt.addElement("body", content=plaintext.decode("utf-8"))
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = \
+            TrustLevel(device_information.trust_level_name).to_omemo_trust_level()
+        if trust_level is omemo.TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.markAsTrusted)
+        else:
+            post_treat.addCallback(client.encryption.markAsUntrusted)
 
-        @param bundle(ExtendedPublicBundle): bundle to publish
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.markAsEncrypted,
+            namespace=message.namespace
+        )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
         """
-        log.debug(_("updating bundle for {device_id}").format(device_id=device_id))
-        bundle = bundle.serialize(omemo_backend)
-        bundle_elt = domish.Element((NS_OMEMO, 'bundle'))
-        signedPreKeyPublic_elt = bundle_elt.addElement(
-            "signedPreKeyPublic",
-            content=b64enc(bundle["spk"]['key']))
-        signedPreKeyPublic_elt['signedPreKeyId'] = str(bundle["spk"]['id'])
+
+        # SCE is only applicable to message and IQ stanzas
+        if stanza.name not in { "message", "iq" }:
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            if stanza.name == "message":
+                # Message stanzas must have a recipient
+                raise exceptions.InternalError(
+                    f"Message without recipient encountered. Blocking further processing"
+                    f" to avoid leaking plaintext data: {stanza.toXml()}"
+                )
+
+            # IQs without a recipient are a thing, I believe those simply target the
+            # server and are thus not eligible for e2ee anyway.
+            return True
+
+        # Parse the JID
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+
+        # Check whether encryption with twomemo is requested
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != twomemo.twomemo.NAMESPACE:
+            # Encryption is requested for this recipient, but not with twomemo
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.__encrypt(
+            client,
+            twomemo.twomemo.NAMESPACE,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT,
+            stanza.getAttribute("id", None)
+        )
 
-        bundle_elt.addElement(
-            "signedPreKeySignature",
-            content=b64enc(bundle["spk_signature"]))
+        # Add a store hint if this is a message stanza
+        if stanza.name == "message":
+            self.__xep_0334.addHintElements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __send_message_data_trigger(
+        self,
+        client: SatXMPPClient,
+        mess_data: MessageData
+    ) -> None:
+        """
+        @param client: The client sending this message.
+        @param mess_data: The message data that is about to be sent. Can be modified.
+        """
+
+        # Check whether encryption is requested for this message
+        try:
+            namespace = mess_data[C.MESS_KEY_ENCRYPTION]["plugin"].namespace
+        except KeyError:
+            return
+
+        # If encryption is requested, check whether it's oldmemo
+        if namespace != oldmemo.oldmemo.NAMESPACE:
+            return
+
+        # All pre-checks done, we can start encrypting!
+        stanza = mess_data["xml"]
+        recipient_jid = mess_data["to"]
+        is_muc_message = mess_data["type"] == C.MESS_TYPE_GROUPCHAT
+        stanza_id = mess_data["uid"]
+
+        await self.__encrypt(
+            client,
+            oldmemo.oldmemo.NAMESPACE,
+            stanza,
+            recipient_jid,
+            is_muc_message,
+            stanza_id
+        )
+
+        # Add a store hint
+        self.__xep_0334.addHintElements(stanza, [ "store" ])
 
-        bundle_elt.addElement(
-            "identityKey",
-            content=b64enc(bundle["ik"]))
+    async def __encrypt(
+        self,
+        client: SatXMPPClient,
+        namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"],
+        stanza: domish.Element,
+        recipient_jid: jid.JID,
+        is_muc_message: bool,
+        stanza_id: Optional[str]
+    ) -> None:
+        """
+        @param client: The client.
+        @param namespace: The namespace of the OMEMO version to use.
+        @param stanza: The stanza. Twomemo will encrypt the whole stanza using SCE,
+            oldmemo will encrypt only the body. The stanza is modified by this call.
+        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
+            but doesn't have to.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+        @param stanza_id: The id of this stanza. Especially relevant for message stanzas
+            to MUC rooms such that the outgoing plaintext can be cached for MUC message
+            reflection handling.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+
+        muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
+
+        recipient_bare_jids: Set[str]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            if stanza_id is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but stanza id not available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+
+            muc_plaintext_cache_key = MUCPlaintextCacheKey(
+                client=client,
+                room_jid=room_jid,
+                message_uid=stanza_id
+            )
+        else:
+            recipient_bare_jids = { recipient_jid.userhost() }
+            feedback_jid = recipient_jid.userhostJID()
 
-        prekeys_elt = bundle_elt.addElement('prekeys')
-        for otpk in bundle["otpks"]:
-            preKeyPublic_elt = prekeys_elt.addElement(
-                'preKeyPublic',
-                content=b64enc(otpk["key"]))
-            preKeyPublic_elt['preKeyId'] = str(otpk['id'])
+        log.debug(
+            f"Intercepting message that is to be encrypted by {namespace} for"
+            f" {recipient_bare_jids}"
+        )
+
+        def prepare_stanza() -> Optional[bytes]:
+            """Prepares the stanza for encryption.
+
+            Does so by removing all parts that are not supposed to be sent in plain. Also
+            extracts/prepares the plaintext to encrypt.
+
+            @return: The plaintext to encrypt. Returns ``None`` in case body-only
+                encryption is requested and no body was found. The function should
+                gracefully return in that case, i.e. it's not a critical error that should
+                abort the message sending flow.
+            """
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                return self.__xep_0420.pack_stanza(
+                    OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE,
+                    stanza
+                )
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                plaintext: Optional[bytes] = None
+
+                for child in stanza.elements():
+                    if child.name == "body" and plaintext is None:
+                        plaintext = str(child).encode("utf-8")
 
-        node = NS_OMEMO_BUNDLE.format(device_id=device_id)
+                    # Any other sensitive elements to remove here?
+                    if child.name in { "body", "html" }:
+                        stanza.children.remove(child)
+
+                if plaintext is None:
+                    log.warning(
+                        "No body found in intercepted message to be encrypted with"
+                        " oldmemo."
+                    )
+
+                return plaintext
+
+            return assert_never(namespace)
+
+        # The stanza/plaintext preparation was moved into its own little function for type
+        # safety reasons.
+        plaintext = prepare_stanza()
+        if plaintext is None:
+            return
+
+        log.debug(f"Plaintext to encrypt: {plaintext}")
+
+        session_manager = await self.__prepare_for_profile(client.profile)
+
         try:
-            await self._p.sendItem(
-                client, None, node, bundle_elt, item_id=self._p.ID_SINGLETON,
-                extra={
-                    self._p.EXTRA_PUBLISH_OPTIONS: {self._p.OPT_MAX_ITEMS: 1},
-                    self._p.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
-                }
+            messages, encryption_errors = await session_manager.encrypt(
+                frozenset(recipient_bare_jids),
+                { namespace: plaintext },
+                backend_priority_order=[ namespace ],
+                identifier=feedback_jid.userhost()
             )
         except Exception as e:
-            log.warning(_("Can't set bundle: {reason}").format(reason=e))
-
-    ## PEP node events callbacks
-
-    async def onNewDevices(self, itemsEvent, profile):
-        log.debug("devices list has been updated")
-        client = self.host.getClient(profile)
-        try:
-            omemo_session = client._xep_0384_session
-        except AttributeError:
-            await client._xep_0384_ready
-            omemo_session = client._xep_0384_session
-        entity = itemsEvent.sender
-
-        devices = self.parseDevices(itemsEvent.items)
-        await omemo_session.newDeviceList(entity, devices)
-
-        if entity == client.jid.userhostJID():
-            own_device = client._xep_0384_device_id
-            if own_device not in devices:
-                log.warning(_("Our own device is missing from devices list, fixing it"))
-                devices.add(own_device)
-                await self.setDevices(client, devices)
-
-    ## triggers
-
-    async def policyBTBV(self, client, feedback_jid, expect_problems, undecided):
-        session = client._xep_0384_session
-        stored_data = client._xep_0384_data
-        for pb in undecided.values():
-            peer_jid = jid.JID(pb.bare_jid)
-            device = pb.device
-            ik = pb.ik
-            key = f"{KEY_AUTO_TRUST}\n{pb.bare_jid}"
-            auto_trusted = await stored_data.get(key, default=set())
-            auto_trusted.add(device)
-            await stored_data.aset(key, auto_trusted)
-            await session.setTrust(peer_jid, device, ik, True)
-
-        user_msg =  D_(
-            "Not all destination devices are trusted, unknown devices will be blind "
-            "trusted due to the OMEMO Blind Trust Before Verification policy. If you "
-            "want a more secure workflow, please activate \"manual\" OMEMO policy in "
-            "settings' \"Security\" tab.\nFollowing fingerprint have been automatically "
-            "trusted:\n{devices}"
-        ).format(
-            devices = ', '.join(
-                f"- {pb.device} ({pb.bare_jid}): {pb.ik.hex().upper()}"
-                for pb in undecided.values()
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join(recipient_bare_jids),
+                    reason=e
+                )
             )
-        )
-        client.feedback(feedback_jid, user_msg)
-
-    async def policyManual(self, client, feedback_jid, expect_problems, undecided):
-        trust_data = {}
-        for trust_id, data in undecided.items():
-            trust_data[trust_id] = {
-                'jid': jid.JID(data.bare_jid),
-                'device':  data.device,
-                'ik': data.ik}
-
-        user_msg =  D_("Not all destination devices are trusted, we can't encrypt "
-                       "message in such a situation. Please indicate if you trust "
-                       "those devices or not in the trust manager before we can "
-                       "send this message")
-        client.feedback(feedback_jid, user_msg)
-        xmlui = await self.getTrustUI(client, trust_data=trust_data, submit_id="")
-
-        answer = await xml_tools.deferXMLUI(
-            self.host,
-            xmlui,
-            action_extra={
-                "meta_encryption_trust": NS_OMEMO,
-            },
-            profile=client.profile)
-        await self.trustUICb(answer, trust_data, expect_problems, client.profile)
-
-    async def handleProblems(
-        self, client, feedback_jid, bundles, expect_problems, problems):
-        """Try to solve problems found by EncryptMessage
-
-        @param feedback_jid(jid.JID): bare jid where the feedback message must be sent
-        @param bundles(dict): bundles data as used in EncryptMessage
-            already filled with known bundles, missing bundles
-            need to be added to it
-            This dict is updated
-        @param problems(list): exceptions raised by EncryptMessage
-        @param expect_problems(dict): known problems to expect, used in encryptMessage
-            This dict will list devices where problems can be ignored
-            (those devices won't receive the encrypted data)
-            This dict is updated
-        """
-        # FIXME: not all problems are handled yet
-        undecided = {}
-        missing_bundles = {}
-        found_bundles = None
-        cache = client._xep_0384_cache
-        for problem in problems:
-            if isinstance(problem, omemo_excpt.TrustException):
-                if problem.problem == 'undecided':
-                    undecided[str(hash(problem))] = problem
-                elif problem.problem == 'untrusted':
-                    expect_problems.setdefault(problem.bare_jid, set()).add(
-                        problem.device)
-                    log.info(_(
-                        "discarding untrusted device {device_id} with key {device_key} "
-                        "for {entity}").format(
-                            device_id=problem.device,
-                            device_key=problem.ik.hex().upper(),
-                            entity=problem.bare_jid,
-                        )
-                    )
-                else:
-                    log.error(
-                        f"Unexpected trust problem: {problem.problem!r} for device "
-                        f"{problem.device} for {problem.bare_jid}, ignoring device")
-                    expect_problems.setdefault(problem.bare_jid, set()).add(
-                        problem.device)
-            elif isinstance(problem, omemo_excpt.MissingBundleException):
-                pb_entity = jid.JID(problem.bare_jid)
-                entity_cache = cache.setdefault(pb_entity, {})
-                entity_bundles = bundles.setdefault(pb_entity, {})
-                if problem.device in entity_cache:
-                    entity_bundles[problem.device] = entity_cache[problem.device]
-                else:
-                    found_bundles, missing = await self.getBundles(
-                        client, pb_entity, [problem.device])
-                    entity_cache.update(bundles)
-                    entity_bundles.update(found_bundles)
-                    if problem.device in missing:
-                        missing_bundles.setdefault(pb_entity, set()).add(
-                            problem.device)
-                        expect_problems.setdefault(problem.bare_jid, set()).add(
-                            problem.device)
-            elif isinstance(problem, omemo_excpt.NoEligibleDevicesException):
-                if undecided or found_bundles:
-                    # we may have new devices after this run, so let's continue for now
-                    continue
-                else:
-                    raise problem
-            else:
-                raise problem
-
-        for peer_jid, devices in missing_bundles.items():
-            devices_s = [str(d) for d in devices]
-            log.warning(
-                _("Can't retrieve bundle for device(s) {devices} of entity {peer}, "
-                  "the message will not be readable on this/those device(s)").format(
-                    devices=", ".join(devices_s), peer=peer_jid.full()))
-            client.feedback(
-                feedback_jid,
-                D_("You're destinee {peer} has missing encryption data on some of "
-                   "his/her device(s) (bundle on device {devices}), the message won't  "
-                   "be readable on this/those device.").format(
-                   peer=peer_jid.full(), devices=", ".join(devices_s)))
-
-        if undecided:
-            omemo_policy = self.host.memory.getParamA(
-                PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile
-            )
-            if omemo_policy == 'btbv':
-                # we first separate entities which have been trusted manually
-                manual_trust = await client._xep_0384_data.get(KEY_MANUAL_TRUST)
-                if manual_trust:
-                    manual_undecided = {}
-                    for hash_, pb in undecided.items():
-                        if pb.bare_jid in manual_trust:
-                            manual_undecided[hash_] = pb
-                    for hash_ in manual_undecided:
-                        del undecided[hash_]
-                else:
-                    manual_undecided = None
-
-                if undecided:
-                    # we do the automatic trust here
-                    await self.policyBTBV(
-                        client, feedback_jid, expect_problems, undecided)
-                if manual_undecided:
-                    # here user has to manually trust new devices from entities already
-                    # verified
-                    await self.policyManual(
-                        client, feedback_jid, expect_problems, manual_undecided)
-            elif omemo_policy == 'manual':
-                await self.policyManual(
-                    client, feedback_jid, expect_problems, undecided)
-            else:
-                raise exceptions.InternalError(f"Unexpected OMEMO policy: {omemo_policy}")
-
-    async def encryptMessage(self, client, entity_bare_jids, message, feedback_jid=None):
-        if feedback_jid is None:
-            if len(entity_bare_jids) != 1:
-                log.error(
-                    "feedback_jid must be provided when message is encrypted for more "
-                    "than one entities")
-                feedback_jid = entity_bare_jids[0]
-        omemo_session = client._xep_0384_session
-        expect_problems = {}
-        bundles = {}
-        loop_idx = 0
-        try:
-            while True:
-                if loop_idx > 10:
-                    msg = _("Too many iterations in encryption loop")
-                    log.error(msg)
-                    raise exceptions.InternalError(msg)
-                # encryptMessage may fail, in case of e.g. trust issue or missing bundle
-                try:
-                    if not message:
-                        encrypted = await omemo_session.encryptRatchetForwardingMessage(
-                            entity_bare_jids,
-                            bundles,
-                            expect_problems = expect_problems)
-                    else:
-                        encrypted = await omemo_session.encryptMessage(
-                            entity_bare_jids,
-                            message,
-                            bundles,
-                            expect_problems = expect_problems)
-                except omemo_excpt.EncryptionProblemsException as e:
-                    # we know the problem to solve, we can try to fix them
-                    await self.handleProblems(
-                        client,
-                        feedback_jid=feedback_jid,
-                        bundles=bundles,
-                        expect_problems=expect_problems,
-                        problems=e.problems)
-                    loop_idx += 1
-                else:
-                    break
-        except Exception as e:
-            msg = _("Can't encrypt message for {entities}: {reason}".format(
-                entities=', '.join(e.full() for e in entity_bare_jids), reason=e))
             log.warning(msg)
-            extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR}
-            client.feedback(feedback_jid, msg, extra)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
             raise e
 
-        defer.returnValue(encrypted)
-
-    @defer.inlineCallbacks
-    def _messageReceivedTrigger(self, client, message_elt, post_treat):
-        try:
-            encrypted_elt = next(message_elt.elements(NS_OMEMO, "encrypted"))
-        except StopIteration:
-            # no OMEMO message here
-            defer.returnValue(True)
-
-        # we have an encrypted message let's decrypt it
-
-        from_jid = jid.JID(message_elt['from'])
+        if len(encryption_errors) > 0:
+            log.warning(
+                f"Ignored the following non-critical encryption errors:"
+                f" {encryption_errors}"
+            )
 
-        if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT:
-            # with group chat, we must get the real jid for decryption
-            # and use the room as feedback_jid
-
-            if self._m is None:
-                # plugin XEP-0045 (MUC) is not available
-                defer.returnValue(True)
-
-            room_jid = from_jid.userhostJID()
-            feedback_jid = room_jid
-            if self._sid is not None:
-                mess_id = self._sid.getOriginId(message_elt)
-            else:
-                mess_id = None
-
-            if mess_id is None:
-                mess_id = message_elt.getAttribute('id')
-            cache_key = (room_jid, mess_id)
+            encrypted_errors_stringified = ", ".join([
+                f"device {err.device_id} of {err.bare_jid} under namespace"
+                f" {err.namespace}"
+                for err
+                in encryption_errors
+            ])
 
-            try:
-                room = self._m.getRoom(client, room_jid)
-            except exceptions.NotFound:
-                log.warning(
-                    f"Received an OMEMO encrypted msg from a room {room_jid} which has "
-                    f"not been joined, ignoring")
-                defer.returnValue(True)
-
-            user = room.getUser(from_jid.resource)
-            if user is None:
-                log.warning(f"Can't find user {user} in room {room_jid}, ignoring")
-                defer.returnValue(True)
-            if not user.entity:
-                log.warning(
-                    f"Real entity of user {user} in room {room_jid} can't be established,"
-                    f" OMEMO encrypted message can't be decrypted")
-                defer.returnValue(True)
-
-            # now we have real jid of the entity, we use it instead of from_jid
-            from_jid = user.entity.userhostJID()
+            client.feedback(
+                feedback_jid,
+                D_(
+                    "There were non-critical errors during encryption resulting in some"
+                    " of your destinees' devices potentially not receiving the message."
+                    " This happens when the encryption data/key material of a device is"
+                    " incomplete or broken, which shouldn't happen for actively used"
+                    " devices, and can usually be ignored. The following devices are"
+                    f" affected: {encrypted_errors_stringified}."
+                )
+            )
 
-        else:
-            # we have a one2one message, we can user "from" and "to" normally
-
-            if from_jid.userhostJID() == client.jid.userhostJID():
-                feedback_jid = jid.JID(message_elt['to'])
-            else:
-                feedback_jid = from_jid
-
-
-        if (message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT
-            and mess_id is not None
-            and cache_key in client._xep_0384_muc_cache):
-            plaintext = client._xep_0384_muc_cache.pop(cache_key)
-            if not client._xep_0384_muc_cache:
-                client._xep_0384_muc_cache_timer.cancel()
-                client._xep_0384_muc_cache_timer = None
-        else:
-            try:
-                omemo_session = client._xep_0384_session
-            except AttributeError:
-                # on startup, message can ve received before session actually exists
-                # so we need to synchronise here
-                yield client._xep_0384_ready
-                omemo_session = client._xep_0384_session
+        message = next(message for message in messages if message.namespace == namespace)
 
-            device_id = client._xep_0384_device_id
-            try:
-                header_elt = next(encrypted_elt.elements(NS_OMEMO, 'header'))
-                iv_elt = next(header_elt.elements(NS_OMEMO, 'iv'))
-            except StopIteration:
-                log.warning(_("Invalid OMEMO encrypted stanza, ignoring: {xml}")
-                    .format(xml=message_elt.toXml()))
-                defer.returnValue(False)
-            try:
-                s_device_id = header_elt['sid']
-            except KeyError:
-                log.warning(_("Invalid OMEMO encrypted stanza, missing sender device ID, "
-                              "ignoring: {xml}")
-                    .format(xml=message_elt.toXml()))
-                defer.returnValue(False)
-            try:
-                key_elt = next((e for e in header_elt.elements(NS_OMEMO, 'key')
-                                if int(e['rid']) == device_id))
-            except StopIteration:
-                log.warning(_("This OMEMO encrypted stanza has not been encrypted "
-                              "for our device (device_id: {device_id}, fingerprint: "
-                              "{fingerprint}): {xml}").format(
-                              device_id=device_id,
-                              fingerprint=omemo_session.public_bundle.ik.hex().upper(),
-                              xml=encrypted_elt.toXml()))
-                user_msg = (D_("An OMEMO message from {sender} has not been encrypted for "
-                               "our device, we can't decrypt it").format(
-                               sender=from_jid.full()))
-                extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}
-                client.feedback(feedback_jid, user_msg, extra)
-                defer.returnValue(False)
-            except ValueError as e:
-                log.warning(_("Invalid recipient ID: {msg}".format(msg=e)))
-                defer.returnValue(False)
-            is_pre_key = C.bool(key_elt.getAttribute('prekey', 'false'))
-            payload_elt = next(encrypted_elt.elements(NS_OMEMO, 'payload'), None)
-            additional_information = {
-                "from_storage": bool(message_elt.delay)
-            }
+        if namespace == twomemo.twomemo.NAMESPACE:
+            # Add the encrypted element
+            stanza.addChild(etree_to_domish(twomemo.etree.serialize_message(message)))
 
-            kwargs = {
-                "bare_jid": from_jid.userhostJID(),
-                "device": s_device_id,
-                "iv": base64.b64decode(bytes(iv_elt)),
-                "message": base64.b64decode(bytes(key_elt)),
-                "is_pre_key_message": is_pre_key,
-                "additional_information":  additional_information,
-            }
+        if namespace == oldmemo.oldmemo.NAMESPACE:
+            # Add the encrypted element
+            stanza.addChild(etree_to_domish(oldmemo.etree.serialize_message(message)))
+
+        if muc_plaintext_cache_key is not None:
+            self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext
 
-            try:
-                if payload_elt is None:
-                    omemo_session.decryptRatchetForwardingMessage(**kwargs)
-                    plaintext = None
-                else:
-                    kwargs["ciphertext"] = base64.b64decode(bytes(payload_elt))
-                    try:
-                        plaintext = yield omemo_session.decryptMessage(**kwargs)
-                    except omemo_excpt.TrustException:
-                        post_treat.addCallback(client.encryption.markAsUntrusted)
-                        kwargs['allow_untrusted'] = True
-                        plaintext = yield omemo_session.decryptMessage(**kwargs)
-                    else:
-                        post_treat.addCallback(client.encryption.markAsTrusted)
-                    plaintext = plaintext.decode()
-            except Exception as e:
-                log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
-                    reason=e, xml=message_elt.toXml()))
-                user_msg = (D_(
-                    "An OMEMO message from {sender} can't be decrypted: {reason}")
-                    .format(sender=from_jid.full(), reason=e))
-                extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}
-                client.feedback(feedback_jid, user_msg, extra)
-                defer.returnValue(False)
-            finally:
-                if omemo_session.republish_bundle:
-                    # we don't wait for the Deferred (i.e. no yield) on purpose
-                    # there is no need to block the whole message workflow while
-                    # updating the bundle
-                    defer.ensureDeferred(
-                        self.setBundle(client, omemo_session.public_bundle, device_id)
-                    )
-
-        message_elt.children.remove(encrypted_elt)
-        if plaintext:
-            message_elt.addElement("body", content=plaintext)
-        post_treat.addCallback(client.encryption.markAsEncrypted, namespace=NS_OMEMO)
-        defer.returnValue(True)
+    async def __on_device_list_update(
+        self,
+        items_event: pubsub.ItemsEvent,
+        profile: str
+    ) -> None:
+        """Handle device list updates fired by PEP.
 
-    def getJIDsForRoom(self, client, room_jid):
-        if self._m is None:
-            exceptions.InternalError("XEP-0045 plugin missing, can't encrypt for group chat")
-        room = self._m.getRoom(client, room_jid)
-        return [u.entity.userhostJID() for u in room.roster.values()]
-
-    def _expireMUCCache(self, client):
-        client._xep_0384_muc_cache_timer = None
-        for (room_jid, uid), msg in client._xep_0384_muc_cache.items():
-            client.feedback(
-                room_jid,
-                D_("Our message with UID {uid} has not been received in time, it has "
-                   "probably been lost. The message was: {msg!r}").format(
-                    uid=uid, msg=str(msg)))
-
-        client._xep_0384_muc_cache.clear()
-        log.warning("Cache for OMEMO MUC has expired")
+        @param items_event: The event.
+        @param profile: The profile this event belongs to.
+        """
 
-    @defer.inlineCallbacks
-    def _sendMessageDataTrigger(self, client, mess_data):
-        encryption = mess_data.get(C.MESS_KEY_ENCRYPTION)
-        if encryption is None or encryption['plugin'].namespace != NS_OMEMO:
-            return
-        message_elt = mess_data["xml"]
-        if mess_data['type'] == C.MESS_TYPE_GROUPCHAT:
-            feedback_jid = room_jid = mess_data['to']
-            to_jids = self.getJIDsForRoom(client, room_jid)
-        else:
-            feedback_jid = to_jid = mess_data["to"].userhostJID()
-            to_jids = [to_jid]
-        log.debug("encrypting message")
-        body = None
-        for child in list(message_elt.children):
-            if child.name == "body":
-                # we remove all unencrypted body,
-                # and will only encrypt the first one
-                if body is None:
-                    body = child
-                message_elt.children.remove(child)
-            elif child.name == "html":
-                # we don't want any XHTML-IM element
-                message_elt.children.remove(child)
+        sender = cast(jid.JID, items_event.sender)
+        items = cast(List[domish.Element], items_event.items)
 
-        if body is None:
-            log.warning("No message found")
+        if len(items) > 1:
+            log.warning("Ignoring device list update with more than one element.")
             return
 
-        body = str(body)
+        item = next(iter(items), None)
+        if item is None:
+            log.debug("Ignoring empty device list update.")
+            return
+
+        item_elt = domish_to_etree(item)
 
-        if mess_data['type'] == C.MESS_TYPE_GROUPCHAT:
-            key = (room_jid, mess_data['uid'])
-            # XXX: we can't encrypt message for our own device for security reason
-            #      so we keep the plain text version in cache until we receive the
-            #      message. We don't send it directly to bridge to keep a workflow
-            #      similar to plain text MUC, so when we see it in frontend we know
-            #      that it has been sent correctly.
-            client._xep_0384_muc_cache[key] = body
-            timer = client._xep_0384_muc_cache_timer
-            if timer is None:
-                client._xep_0384_muc_cache_timer = reactor.callLater(
-                    MUC_CACHE_TTL, self._expireMUCCache, client)
+        device_list: Dict[int, Optional[str]] = {}
+        namespace: Optional[str] = None
+
+        list_elt = item_elt.find(f"{{{twomemo.twomemo.NAMESPACE}}}devices")
+        if list_elt is not None:
+            try:
+                device_list = twomemo.etree.parse_device_list(list_elt)
+            except XMLSchemaValidationError:
+                pass
             else:
-                timer.reset(MUC_CACHE_TTL)
-
-        encryption_data = yield defer.ensureDeferred(self.encryptMessage(
-            client, to_jids, body, feedback_jid=feedback_jid))
+                namespace = twomemo.twomemo.NAMESPACE
 
-        encrypted_elt = message_elt.addElement((NS_OMEMO, 'encrypted'))
-        header_elt = encrypted_elt.addElement('header')
-        header_elt['sid'] = str(encryption_data['sid'])
+        list_elt = item_elt.find(f"{{{oldmemo.oldmemo.NAMESPACE}}}list")
+        if list_elt is not None:
+            try:
+                device_list = oldmemo.etree.parse_device_list(list_elt)
+            except XMLSchemaValidationError:
+                pass
+            else:
+                namespace = oldmemo.oldmemo.NAMESPACE
 
-        for key_data in encryption_data['keys'].values():
-            for rid, data in key_data.items():
-                key_elt = header_elt.addElement(
-                    'key',
-                    content=b64enc(data['data'])
-                )
-                key_elt['rid'] = str(rid)
-                if data['pre_key']:
-                    key_elt['prekey'] = 'true'
+        if namespace is None:
+            log.warning(
+                f"Malformed device list update item:"
+                f" {ET.tostring(item_elt, encoding='unicode')}"
+            )
+            return
 
-        header_elt.addElement(
-            'iv',
-            content=b64enc(encryption_data['iv']))
-        try:
-            encrypted_elt.addElement(
-                'payload',
-                content=b64enc(encryption_data['payload']))
-        except KeyError:
-            pass
+        session_manager = await self.__prepare_for_profile(profile)
+
+        await session_manager.update_device_list(
+            namespace,
+            sender.userhost(),
+            device_list
+        )
diff --git a/sat/plugins/plugin_xep_0420.py b/sat/plugins/plugin_xep_0420.py
--- a/sat/plugins/plugin_xep_0420.py
+++ b/sat/plugins/plugin_xep_0420.py
@@ -16,15 +16,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-# Type-check with `mypy --strict --disable-error-code no-untyped-call`
-# Lint with `pylint`
-
 from abc import ABC, abstractmethod
 from datetime import datetime
 import enum
 import secrets
 import string
-from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast
+from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
 
 from lxml import etree
 
@@ -55,7 +52,7 @@
 ]
 
 
-log = cast(Logger, getLogger(__name__))
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
 
 
 PLUGIN_INFO = {
@@ -291,19 +288,15 @@
         # Note the serialized byte size of the content element before adding any children
         empty_content_byte_size = len(content.toXml().encode("utf-8"))
 
-        # Just for type safety
-        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
-        content_children = cast(List[Union[domish.Element, str]], content.children)
-
         # Move elements that are not explicitly forbidden from being encrypted from the
         # stanza to the content element.
-        for child in list(cast(Iterator[domish.Element], stanza.elements())):
+        for child in list(stanza.elements()):
             if (
                 child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                 and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
             ):
                 # Remove the child from the stanza
-                stanza_children.remove(child)
+                stanza.children.remove(child)
 
                 # A namespace of ``None`` can be used on domish elements to inherit the
                 # namespace from the parent. When moving elements from the stanza root to
@@ -316,7 +309,7 @@
                     child.defaultUri = C.NS_CLIENT
 
                 # Add the child with corrected namespaces to the content element
-                content_children.append(child)
+                content.addChild(child)
 
         # Add the affixes requested by the profile
         if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
@@ -345,7 +338,7 @@
             time_element["stamp"] = XEP_0082.format_datetime()
 
         if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
-            recipient = cast(Optional[str], stanza.getAttribute("to", None))
+            recipient = stanza.getAttribute("to", None)
             if recipient is None:
                 raise ValueError(
                     "<to/> affix requested, but stanza doesn't have the 'to' attribute"
@@ -356,7 +349,7 @@
             to_element["jid"] = jid.JID(recipient).userhost()
 
         if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
-            sender = cast(Optional[str], stanza.getAttribute("from", None))
+            sender = stanza.getAttribute("from", None)
             if sender is None:
                 raise ValueError(
                     "<from/> affix requested, but stanza doesn't have the 'from'"
@@ -370,7 +363,7 @@
             if policy is not SCEAffixPolicy.NOT_NEEDED:
                 envelope.addChild(affix.create(stanza))
 
-        return cast(str, envelope.toXml()).encode("utf-8")
+        return envelope.toXml().encode("utf-8")
 
     @staticmethod
     def unpack_stanza(
@@ -431,7 +424,7 @@
 
         # Prepare the envelope and content elements
         envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
-        content = cast(domish.Element, next(envelope.elements(NS_SCE, "content")))
+        content = next(envelope.elements(NS_SCE, "content"))
 
         # Verify the affixes
         rpad_element = cast(
@@ -468,7 +461,7 @@
         if to_element is not None:
             recipient_value = jid.JID(to_element["jid"])
 
-            recipient_actual = cast(Optional[str], stanza.getAttribute("to", None))
+            recipient_actual = stanza.getAttribute("to", None)
             if recipient_actual is None:
                 raise AffixVerificationFailed(
                     "'To' affix is included in the envelope, but the stanza is lacking a"
@@ -491,7 +484,7 @@
         if from_element is not None:
             sender_value = jid.JID(from_element["jid"])
 
-            sender_actual = cast(Optional[str], stanza.getAttribute("from", None))
+            sender_actual = stanza.getAttribute("from", None)
             if sender_actual is None:
                 raise AffixVerificationFailed(
                     "'From' affix is included in the envelope, but the stanza is lacking"
@@ -551,13 +544,9 @@
                 + custom_missing_string
             )
 
-        # Just for type safety
-        content_children = cast(List[Union[domish.Element, str]], content.children)
-        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
-
         # Move elements that are not explicitly forbidden from being encrypted from the
         # content element to the stanza.
-        for child in list(cast(Iterator[domish.Element], content.elements())):
+        for child in list(content.elements()):
             if (
                 child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                 or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
@@ -568,10 +557,10 @@
                 )
             else:
                 # Remove the child from the content element
-                content_children.remove(child)
+                content.children.remove(child)
 
                 # Add the child to the stanza
-                stanza_children.append(child)
+                stanza.addChild(child)
 
         return SCEAffixValues(
             rpad_value,
diff --git a/sat/tools/xmpp_datetime.py b/sat/tools/xmpp_datetime.py
--- a/sat/tools/xmpp_datetime.py
+++ b/sat/tools/xmpp_datetime.py
@@ -16,14 +16,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-# Type-check with `mypy --strict`
-# Lint with `pylint`
-
-from sat.core import exceptions
 from datetime import date, datetime, time, timezone
 import re
 from typing import Optional, Tuple
 
+from sat.core import exceptions
+
 
 __all__ = [  # pylint: disable=unused-variable
     "format_date",
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -51,8 +51,9 @@
     'urwid >= 1.2.0, < 3',
     'urwid-satext == 0.9.*',
     'wokkel >= 18.0.0, < 19.0.0',
-    'omemo >= 0.11.0, < 0.13.0',
-    'omemo-backend-signal < 0.3',
+    'omemo >= 1.0.0, < 2',
+    'twomemo >= 1.0.0, < 2',
+    'oldmemo >= 1.0.0, < 2',
     'pyyaml < 7.0.0',
     'sqlalchemy >= 1.4',
     'alembic',
diff --git a/stubs/twisted/__init__.pyi b/stubs/twisted/__init__.pyi
new file mode 100644
diff --git a/stubs/twisted/words/__init__.pyi b/stubs/twisted/words/__init__.pyi
new file mode 100644
diff --git a/stubs/twisted/words/protocols/__init__.pyi b/stubs/twisted/words/protocols/__init__.pyi
new file mode 100644
diff --git a/stubs/twisted/words/protocols/jabber/__init__.pyi b/stubs/twisted/words/protocols/jabber/__init__.pyi
new file mode 100644
diff --git a/stubs/twisted/words/protocols/jabber/error.pyi b/stubs/twisted/words/protocols/jabber/error.pyi
new file mode 100644
--- /dev/null
+++ b/stubs/twisted/words/protocols/jabber/error.pyi
@@ -0,0 +1,14 @@
+from typing import Optional
+
+from twisted.words.xish import domish
+
+
+class BaseError(Exception):
+    namespace: str
+    condition: str
+    text: Optional[str]
+    textLang: Optional[str]
+    appCondition: Optional[domish.Element]
+
+class StanzaError(BaseError):
+    ...
diff --git a/stubs/twisted/words/protocols/jabber/jid.pyi b/stubs/twisted/words/protocols/jabber/jid.pyi
new file mode 100644
--- /dev/null
+++ b/stubs/twisted/words/protocols/jabber/jid.pyi
@@ -0,0 +1,30 @@
+from typing import Tuple, overload
+
+
+User = str
+Host = str
+Resource = str
+
+
+# Note: these typings are incomplete and evolve as needed.
+class JID:
+    user: User
+    host: Host
+    resource: Resource
+
+    @overload
+    def __init__(self, str: str, tuple: None = None) -> None:
+        ...
+
+    @overload
+    def __init__(self, str: None, tuple: Tuple[User, Host, Resource]) -> None:
+        ...
+
+    def userhost(self) -> str:
+        ...
+
+    def userhostJID(self) -> "JID":
+        ...
+
+    def full(self) -> str:
+        ...
diff --git a/stubs/twisted/words/xish/__init__.pyi b/stubs/twisted/words/xish/__init__.pyi
new file mode 100644
diff --git a/stubs/twisted/words/xish/domish.pyi b/stubs/twisted/words/xish/domish.pyi
new file mode 100644
--- /dev/null
+++ b/stubs/twisted/words/xish/domish.pyi
@@ -0,0 +1,80 @@
+from typing import (
+    Dict, Iterator, List, Literal, Optional, Tuple, TypeVar, Union, overload
+)
+
+
+URI = str
+Name = str
+QName = Tuple[Optional[URI], Name]
+AttributeKey = Union[QName, Name]
+Attributes = Dict[AttributeKey, str]
+Prefix = str
+
+
+D = TypeVar("D")
+
+
+# Note: these typings are incomplete and evolve as needed.
+class Element:
+    uri: Optional[URI]
+    name: Name
+    defaultUri: Optional[URI]
+    children: List[Union["Element", str]]
+    attributes: Attributes
+
+    def __init__(
+        self,
+        qname: QName,
+        defaultUri: Optional[URI] = None,
+        attribs: Optional[Attributes] = None,
+        localPrefixes: Optional[Dict[URI, Prefix]] = None
+    ) -> None:
+        ...
+
+    def __getitem__(self, key: AttributeKey) -> str:
+        ...
+
+    def __setitem__(self, key: AttributeKey, value: str) -> None:
+        ...
+
+    @overload
+    def getAttribute(
+        self,
+        attribname: AttributeKey,
+        default: None = None
+    ) -> Union[str, None]:
+        ...
+
+    @overload
+    def getAttribute(self, attribname: AttributeKey, default: D) -> Union[str, D]:
+        ...
+
+    def addChild(self, node: "Element") -> "Element":
+        ...
+
+    def addContent(self, text: str) -> str:
+        ...
+
+    def addElement(
+        self,
+        name: Union[QName, Name],
+        defaultUri: Optional[URI] = None,
+        content: Optional[str] = None
+    ) -> "Element":
+        ...
+
+    def elements(
+        self,
+        uri: Optional[URI] = None,
+        name: Optional[Name] = None
+    ) -> Iterator["Element"]:
+        ...
+
+    def toXml(
+        self,
+        prefixes: Optional[Dict[URI, Prefix]] = None,
+        closeElement: Literal[0, 1] = 1,
+        defaultUri: str = "",
+        prefixesInScope: Optional[List[Prefix]] = None
+    ) -> str:
+        ...
diff --git a/tests/unit/test_plugin_xep_0082.py b/tests/unit/test_plugin_xep_0082.py
--- a/tests/unit/test_plugin_xep_0082.py
+++ b/tests/unit/test_plugin_xep_0082.py
@@ -16,9 +16,6 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-# Type-check with `mypy --strict`
-# Lint with `pylint`
-
 from datetime import date, datetime, time, timezone
 
 import pytest
diff --git a/tests/unit/test_plugin_xep_0420.py b/tests/unit/test_plugin_xep_0420.py
--- a/tests/unit/test_plugin_xep_0420.py
+++ b/tests/unit/test_plugin_xep_0420.py
@@ -16,11 +16,8 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-# Type-check with `mypy --strict --disable-error-code no-untyped-call`
-# Lint with `pylint`
-
 from datetime import datetime, timezone
-from typing import Callable, Iterator, Optional, cast
+from typing import Callable, cast
 
 import pytest
 
@@ -77,8 +74,8 @@
         </xs:element>"""
 
     def create(self, stanza: domish.Element) -> domish.Element:
-        recipient = cast(Optional[str], stanza.getAttribute("to", None))
-        sender = cast(Optional[str], stanza.getAttribute("from", None))
+        recipient = stanza.getAttribute("to", None)
+        sender = stanza.getAttribute("from", None)
 
         if recipient is None or sender is None:
             raise ValueError(
@@ -487,21 +484,15 @@
 
     envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
     envelope = string_to_domish(envelope_serialized.decode("utf-8"))
-    content = next(cast(Iterator[domish.Element], envelope.elements(NS_SCE, "content")))
+    content = next(envelope.elements(NS_SCE, "content"))
 
     # The body should have been assigned ``jabber:client`` as its namespace
-    assert next(
-        cast(Iterator[domish.Element], content.elements("jabber:client", "body")),
-        None
-    ) is not None
+    assert next(content.elements("jabber:client", "body"), None) is not None
 
     XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
 
     # The body should still have ``jabber:client`` after unpacking
-    assert next(
-        cast(Iterator[domish.Element], stanza.elements("jabber:client", "body")),
-        None
-    ) is not None
+    assert next(stanza.elements("jabber:client", "body"), None) is not None
 
 
 def test_non_encryptable_elements() -> None:  # pylint: disable=missing-function-docstring
@@ -520,18 +511,11 @@
 
     envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
     envelope = string_to_domish(envelope_serialized.decode("utf-8"))
-    content = next(cast(Iterator[domish.Element], envelope.elements(NS_SCE, "content")))
+    content = next(envelope.elements(NS_SCE, "content"))
 
     # The store hint must not have been moved to the content element
-    assert next(
-        cast(Iterator[domish.Element], stanza.elements(NS_HINTS, "store")),
-        None
-    ) is not None
-
-    assert next(
-        cast(Iterator[domish.Element], content.elements(NS_HINTS, "store")),
-        None
-    ) is None
+    assert next(stanza.elements(NS_HINTS, "store"), None) is not None
+    assert next(content.elements(NS_HINTS, "store"), None) is None
 
     stanza = string_to_domish(
         """<message from="foo@example.com" to="bar@example.com"></message>"""
@@ -548,10 +532,7 @@
 
     XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
 
-    assert next(
-        cast(Iterator[domish.Element], stanza.elements(NS_HINTS, "store")),
-        None
-    ) is None
+    assert next(stanza.elements(NS_HINTS, "store"), None) is None
 
 
 def test_schema_validation() -> None:  # pylint: disable=missing-function-docstring