HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/twisted/words/test/test_jabberclient.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.words.protocols.jabber.client}
"""

from __future__ import absolute_import, division

from hashlib import sha1

from twisted.internet import defer
from twisted.python.compat import unicode
from twisted.trial import unittest
from twisted.words.protocols.jabber import client, error, jid, xmlstream
from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
from twisted.words.xish import utility

try:
    from twisted.internet import ssl
except ImportError:
    ssl = None
    skipWhenNoSSL = "SSL not available"
else:
    skipWhenNoSSL = None

IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND
NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'
IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION

class CheckVersionInitializerTests(unittest.TestCase):
    def setUp(self):
        a = xmlstream.Authenticator()
        xs = xmlstream.XmlStream(a)
        self.init = client.CheckVersionInitializer(xs)


    def testSupported(self):
        """
        Test supported version number 1.0
        """
        self.init.xmlstream.version = (1, 0)
        self.init.initialize()


    def testNotSupported(self):
        """
        Test unsupported version number 0.0, and check exception.
        """
        self.init.xmlstream.version = (0, 0)
        exc = self.assertRaises(error.StreamError, self.init.initialize)
        self.assertEqual('unsupported-version', exc.condition)



class InitiatingInitializerHarness(object):
    """
    Testing harness for interacting with XML stream initializers.

    This sets up an L{utility.XmlPipe} to create a communication channel between
    the initializer and the stubbed receiving entity. It features a sink and
    source side that both act similarly to a real L{xmlstream.XmlStream}. The
    sink is augmented with an authenticator to which initializers can be added.

    The harness also provides some utility methods to work with event observers
    and deferreds.
    """

    def setUp(self):
        self.output = []
        self.pipe = utility.XmlPipe()
        self.xmlstream = self.pipe.sink
        self.authenticator = xmlstream.ConnectAuthenticator('example.org')
        self.xmlstream.authenticator = self.authenticator


    def waitFor(self, event, handler):
        """
        Observe an output event, returning a deferred.

        The returned deferred will be fired when the given event has been
        observed on the source end of the L{XmlPipe} tied to the protocol
        under test. The handler is added as the first callback.

        @param event: The event to be observed. See
            L{utility.EventDispatcher.addOnetimeObserver}.
        @param handler: The handler to be called with the observed event object.
        @rtype: L{defer.Deferred}.
        """
        d = defer.Deferred()
        d.addCallback(handler)
        self.pipe.source.addOnetimeObserver(event, d.callback)
        return d



class IQAuthInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.IQAuthInitializer}.
    """

    def setUp(self):
        super(IQAuthInitializerTests, self).setUp()
        self.init = client.IQAuthInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')
        self.authenticator.password = u'secret'


    def testPlainText(self):
        """
        Test plain-text authentication.

        Act as a server supporting plain-text authentication and expect the
        C{password} field to be filled with the password. Then act as if
        authentication succeeds.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The response informs the client that plain-text authentication
            is supported.
            """

            # Create server response
            response = xmlstream.toResponse(iq, 'result')
            response.addElement(('jabber:iq:auth', 'query'))
            response.query.addElement('username')
            response.query.addElement('password')
            response.query.addElement('resource')

            # Set up an observer for the next request we expect.
            d = self.waitFor(IQ_AUTH_SET, onAuthSet)

            # Send server response
            self.pipe.source.send(response)

            return d

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with an empty result
            signalling success.
            """
            self.assertEqual('user', unicode(iq.query.username))
            self.assertEqual('secret', unicode(iq.query.password))
            self.assertEqual('resource', unicode(iq.query.resource))

            # Send server response
            response = xmlstream.toResponse(iq, 'result')
            self.pipe.source.send(response)

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()
        return defer.gatherResults([d1, d2])


    def testDigest(self):
        """
        Test digest authentication.

        Act as a server supporting digest authentication and expect the
        C{digest} field to be filled with a sha1 digest of the concatenated
        stream session identifier and password. Then act as if authentication
        succeeds.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The response informs the client that digest authentication is
            supported.
            """

            # Create server response
            response = xmlstream.toResponse(iq, 'result')
            response.addElement(('jabber:iq:auth', 'query'))
            response.query.addElement('username')
            response.query.addElement('digest')
            response.query.addElement('resource')

            # Set up an observer for the next request we expect.
            d = self.waitFor(IQ_AUTH_SET, onAuthSet)

            # Send server response
            self.pipe.source.send(response)

            return d

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with an empty result
            signalling success.
            """
            self.assertEqual('user', unicode(iq.query.username))
            self.assertEqual(sha1(b'12345secret').hexdigest(),
                              unicode(iq.query.digest))
            self.assertEqual('resource', unicode(iq.query.resource))

            # Send server response
            response = xmlstream.toResponse(iq, 'result')
            self.pipe.source.send(response)

        # Digest authentication relies on the stream session identifier. Set it.
        self.xmlstream.sid = u'12345'

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()

        return defer.gatherResults([d1, d2])


    def testFailRequestFields(self):
        """
        Test initializer failure of request for fields for authentication.
        """
        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The server responds that the client is not authorized to authenticate.
            """
            response = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(response)

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()

        # The initialized should fail with a stanza error.
        self.assertFailure(d2, error.StanzaError)

        return defer.gatherResults([d1, d2])


    def testFailAuth(self):
        """
        Test initializer failure to authenticate.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication methods.

            The response informs the client that plain-text authentication
            is supported.
            """

            # Send server response
            response = xmlstream.toResponse(iq, 'result')
            response.addElement(('jabber:iq:auth', 'query'))
            response.query.addElement('username')
            response.query.addElement('password')
            response.query.addElement('resource')

            # Set up an observer for the next request we expect.
            d = self.waitFor(IQ_AUTH_SET, onAuthSet)

            # Send server response
            self.pipe.source.send(response)

            return d

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with a not-authorized
            stanza error.
            """
            response = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(response)

        # Set up an observer for the request for authentication fields
        d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        d2 = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(d2, error.StanzaError)

        return defer.gatherResults([d1, d2])



class BindInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.BindInitializer}.
    """

    def setUp(self):
        super(BindInitializerTests, self).setUp()
        self.init = client.BindInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')


    def testBasic(self):
        """
        Set up a stream, and act as if resource binding succeeds.
        """
        def onBind(iq):
            response = xmlstream.toResponse(iq, 'result')
            response.addElement((NS_BIND, 'bind'))
            response.bind.addElement('jid',
                                     content=u'user@example.com/other resource')
            self.pipe.source.send(response)

        def cb(result):
            self.assertEqual(jid.JID('user@example.com/other resource'),
                              self.authenticator.jid)

        d1 = self.waitFor(IQ_BIND_SET, onBind)
        d2 = self.init.start()
        d2.addCallback(cb)
        return defer.gatherResults([d1, d2])


    def testFailure(self):
        """
        Set up a stream, and act as if resource binding fails.
        """
        def onBind(iq):
            response = error.StanzaError('conflict').toResponse(iq)
            self.pipe.source.send(response)

        d1 = self.waitFor(IQ_BIND_SET, onBind)
        d2 = self.init.start()
        self.assertFailure(d2, error.StanzaError)
        return defer.gatherResults([d1, d2])



class SessionInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.SessionInitializer}.
    """

    def setUp(self):
        super(SessionInitializerTests, self).setUp()
        self.init = client.SessionInitializer(self.xmlstream)


    def testSuccess(self):
        """
        Set up a stream, and act as if session establishment succeeds.
        """

        def onSession(iq):
            response = xmlstream.toResponse(iq, 'result')
            self.pipe.source.send(response)

        d1 = self.waitFor(IQ_SESSION_SET, onSession)
        d2 = self.init.start()
        return defer.gatherResults([d1, d2])


    def testFailure(self):
        """
        Set up a stream, and act as if session establishment fails.
        """
        def onSession(iq):
            response = error.StanzaError('forbidden').toResponse(iq)
            self.pipe.source.send(response)

        d1 = self.waitFor(IQ_SESSION_SET, onSession)
        d2 = self.init.start()
        self.assertFailure(d2, error.StanzaError)
        return defer.gatherResults([d1, d2])



class BasicAuthenticatorTests(unittest.TestCase):
    """
    Test for both BasicAuthenticator and basicClientFactory.
    """

    def test_basic(self):
        """
        Authenticator and stream are properly constructed by the factory.

        The L{xmlstream.XmlStream} protocol created by the factory has the new
        L{client.BasicAuthenticator} instance in its C{authenticator}
        attribute.  It is set up with C{jid} and C{password} as passed to the
        factory, C{otherHost} taken from the client JID. The stream futher has
        two initializers, for TLS and authentication, of which the first has
        its C{required} attribute set to C{True}.
        """
        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # XMPPAuthenticator (that has its associateWithXmlStream called) that
        # is in turn initialized with the arguments to the factory.
        xs = client.basicClientFactory(self.client_jid,
                                       'secret').buildProtocol(None)

        # test authenticator's instance variables
        self.assertEqual('example.com', xs.authenticator.otherHost)
        self.assertEqual(self.client_jid, xs.authenticator.jid)
        self.assertEqual('secret', xs.authenticator.password)

        # test list of initializers
        tls, auth = xs.initializers

        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIsInstance(auth, client.IQAuthInitializer)

        self.assertFalse(tls.required)



class XMPPAuthenticatorTests(unittest.TestCase):
    """
    Test for both XMPPAuthenticator and XMPPClientFactory.
    """

    def test_basic(self):
        """
        Test basic operations.

        Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
        it produce a protocol instance. Then inspect the instance variables of
        the authenticator and XML stream objects.
        """
        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # XMPPAuthenticator (that has its associateWithXmlStream called) that
        # is in turn initialized with the arguments to the factory.
        xs = client.XMPPClientFactory(self.client_jid,
                                      'secret').buildProtocol(None)

        # test authenticator's instance variables
        self.assertEqual('example.com', xs.authenticator.otherHost)
        self.assertEqual(self.client_jid, xs.authenticator.jid)
        self.assertEqual('secret', xs.authenticator.password)

        # test list of initializers
        version, tls, sasl, bind, session = xs.initializers

        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIsInstance(sasl, SASLInitiatingInitializer)
        self.assertIsInstance(bind, client.BindInitializer)
        self.assertIsInstance(session, client.SessionInitializer)

        self.assertTrue(tls.required)
        self.assertTrue(sasl.required)
        self.assertTrue(bind.required)
        self.assertFalse(session.required)


    def test_tlsConfiguration(self):
        """
        A TLS configuration is passed to the TLS initializer.
        """
        configs = []

        def init(self, xs, required=True, configurationForTLS=None):
            configs.append(configurationForTLS)

        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # XMPPAuthenticator (that has its associateWithXmlStream called) that
        # is in turn initialized with the arguments to the factory.
        configurationForTLS = ssl.CertificateOptions()
        factory = client.XMPPClientFactory(
            self.client_jid, 'secret',
            configurationForTLS=configurationForTLS)
        self.patch(xmlstream.TLSInitiatingInitializer, "__init__", init)
        xs = factory.buildProtocol(None)

        # test list of initializers
        version, tls, sasl, bind, session = xs.initializers

        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIs(configurationForTLS, configs[0])


    test_tlsConfiguration.skip = skipWhenNoSSL