HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/twisted/cred/test/test_digestauth.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.cred._digest} and the associated bits in
L{twisted.cred.credentials}.
"""

from __future__ import division, absolute_import

import base64

from binascii import hexlify
from hashlib import md5, sha1

from zope.interface.verify import verifyObject
from twisted.trial.unittest import TestCase
from twisted.internet.address import IPv4Address
from twisted.cred.error import LoginFailed
from twisted.cred.credentials import calcHA1, calcHA2, IUsernameDigestHash
from twisted.cred.credentials import calcResponse, DigestCredentialFactory
from twisted.python.compat import networkString

def b64encode(s):
    """
    Base64-encode C{s} and strip surrounding whitespace from the result.

    @type s: C{bytes}
    @param s: The data to encode.

    @rtype: C{bytes}
    @return: The base64 encoding of C{s} with any leading or trailing
        whitespace removed.
    """
    encoded = base64.b64encode(s)
    return encoded.strip()



class FakeDigestCredentialFactory(DigestCredentialFactory):
    """
    A L{DigestCredentialFactory} whose private key, nonce and clock are
    fixed, so that the nonce and opaque it generates are predictable.
    """
    def __init__(self, *args, **kwargs):
        """
        Initialize the base factory with the given arguments, then pin the
        private key to a constant.
        """
        super(FakeDigestCredentialFactory, self).__init__(*args, **kwargs)
        # Assigned after the base initializer runs so that this fixed key
        # is the one left in effect.
        self.privateKey = b"0"


    def _generateNonce(self):
        """
        Return the same hard-coded nonce on every call.
        """
        fixedNonce = b'178288758716122392881254770685'
        return fixedNonce


    def _getTime(self):
        """
        Return a constant time of C{0}.
        """
        return 0



class DigestAuthTests(TestCase):
    """
    Tests for L{DigestCredentialFactory} and for the L{calcHA1}, L{calcHA2}
    and L{calcResponse} digest helpers.
    """
    def setUp(self):
        """
        Store the credentials and request parameters shared by the tests and
        create the L{DigestCredentialFactory} under test.
        """
        self.username = b"foobar"
        self.password = b"bazquux"
        self.realm = b"test realm"
        self.algorithm = b"md5"
        self.cnonce = b"29fc54aa1641c6fa0e151419361c8f23"
        self.qop = b"auth"
        self.uri = b"/write/"
        self.clientAddress = IPv4Address('TCP', '10.2.3.4', 43125)
        self.method = b'GET'
        self.credentialFactory = DigestCredentialFactory(
            self.algorithm, self.realm)


    def test_MD5HashA1(self, _algorithm=b'md5', _hash=md5):
        """
        L{calcHA1} accepts the C{'md5'} algorithm and returns an MD5 hash of
        its parameters, excluding the nonce and cnonce.

        @param _algorithm: The algorithm name passed to L{calcHA1}; other
            tests override it to reuse this check.
        @param _hash: The L{hashlib} constructor used to compute the expected
            value.
        """
        nonce = b'abc123xyz'
        hashA1 = calcHA1(_algorithm, self.username, self.realm, self.password,
                         nonce, self.cnonce)
        a1 = b":".join((self.username, self.realm, self.password))
        expected = hexlify(_hash(a1).digest())
        self.assertEqual(hashA1, expected)


    def test_MD5SessionHashA1(self):
        """
        L{calcHA1} accepts the C{'md5-sess'} algorithm and returns an MD5 hash
        of its parameters, including the nonce and cnonce.
        """
        nonce = b'xyz321abc'
        hashA1 = calcHA1(b'md5-sess', self.username, self.realm, self.password,
                         nonce, self.cnonce)
        # The session variant hashes the plain A1 digest together with the
        # server and client nonces.
        a1 = b":".join((self.username, self.realm, self.password))
        ha1 = hexlify(md5(a1).digest())
        sessionA1 = b":".join((ha1, nonce, self.cnonce))
        expected = hexlify(md5(sessionA1).digest())
        self.assertEqual(hashA1, expected)


    def test_SHAHashA1(self):
        """
        L{calcHA1} accepts the C{'sha'} algorithm and returns a SHA hash of its
        parameters, excluding the nonce and cnonce.
        """
        self.test_MD5HashA1(b'sha', sha1)


    def test_MD5HashA2Auth(self, _algorithm=b'md5', _hash=md5):
        """
        L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
        its arguments, excluding the entity hash for QOP other than
        C{'auth-int'}.

        @param _algorithm: The algorithm name passed to L{calcHA2}; other
            tests override it to reuse this check.
        @param _hash: The L{hashlib} constructor used to compute the expected
            value.
        """
        method = b'GET'
        hashA2 = calcHA2(_algorithm, method, self.uri, b'auth', None)
        a2 = b":".join((method, self.uri))
        expected = hexlify(_hash(a2).digest())
        self.assertEqual(hashA2, expected)


    def test_MD5HashA2AuthInt(self, _algorithm=b'md5', _hash=md5):
        """
        L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
        its arguments, including the entity hash for QOP of C{'auth-int'}.

        @param _algorithm: The algorithm name passed to L{calcHA2}; other
            tests override it to reuse this check.
        @param _hash: The L{hashlib} constructor used to compute the expected
            value.
        """
        method = b'GET'
        hentity = b'foobarbaz'
        hashA2 = calcHA2(_algorithm, method, self.uri, b'auth-int', hentity)
        a2 = b":".join((method, self.uri, hentity))
        expected = hexlify(_hash(a2).digest())
        self.assertEqual(hashA2, expected)


    def test_MD5SessHashA2Auth(self):
        """
        L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth'} and
        returns the same value as it does for the C{'md5'} algorithm.
        """
        self.test_MD5HashA2Auth(b'md5-sess')


    def test_MD5SessHashA2AuthInt(self):
        """
        L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth-int'}
        and returns the same value as it does for the C{'md5'} algorithm.
        """
        self.test_MD5HashA2AuthInt(b'md5-sess')


    def test_SHAHashA2Auth(self):
        """
        L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
        its arguments, excluding the entity hash for QOP other than
        C{'auth-int'}.
        """
        self.test_MD5HashA2Auth(b'sha', sha1)


    def test_SHAHashA2AuthInt(self):
        """
        L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
        its arguments, including the entity hash for QOP of C{'auth-int'}.
        """
        self.test_MD5HashA2AuthInt(b'sha', sha1)


    def test_MD5HashResponse(self, _algorithm=b'md5', _hash=md5):
        """
        L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
        of its parameters, excluding the nonce count, client nonce, and QoP
        value if the nonce count and client nonce are L{None}

        @param _algorithm: The algorithm name passed to L{calcResponse}; other
            tests override it to reuse this check.
        @param _hash: The L{hashlib} constructor used to compute the expected
            value.
        """
        hashA1 = b'abc123'
        hashA2 = b'789xyz'
        nonce = b'lmnopq'

        response = b":".join((hashA1, nonce, hashA2))
        expected = hexlify(_hash(response).digest())

        digest = calcResponse(hashA1, hashA2, _algorithm, nonce, None, None,
                              None)
        self.assertEqual(expected, digest)


    def test_MD5SessionHashResponse(self):
        """
        L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
        hash of its parameters, excluding the nonce count, client nonce, and
        QoP value if the nonce count and client nonce are L{None}
        """
        self.test_MD5HashResponse(b'md5-sess')


    def test_SHAHashResponse(self):
        """
        L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
        of its parameters, excluding the nonce count, client nonce, and QoP
        value if the nonce count and client nonce are L{None}
        """
        self.test_MD5HashResponse(b'sha', sha1)


    def test_MD5HashResponseExtra(self, _algorithm=b'md5', _hash=md5):
        """
        L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
        of its parameters, including the nonce count, client nonce, and QoP
        value if they are specified.

        @param _algorithm: The algorithm name passed to L{calcResponse}; other
            tests override it to reuse this check.
        @param _hash: The L{hashlib} constructor used to compute the expected
            value.
        """
        hashA1 = b'abc123'
        hashA2 = b'789xyz'
        nonce = b'lmnopq'
        nonceCount = b'00000004'
        clientNonce = b'abcxyz123'
        qop = b'auth'

        response = b":".join(
            (hashA1, nonce, nonceCount, clientNonce, qop, hashA2))
        expected = hexlify(_hash(response).digest())

        digest = calcResponse(
            hashA1, hashA2, _algorithm, nonce, nonceCount, clientNonce, qop)
        self.assertEqual(expected, digest)


    def test_MD5SessionHashResponseExtra(self):
        """
        L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
        hash of its parameters, including the nonce count, client nonce, and
        QoP value if they are specified.
        """
        self.test_MD5HashResponseExtra(b'md5-sess')


    def test_SHAHashResponseExtra(self):
        """
        L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
        of its parameters, including the nonce count, client nonce, and QoP
        value if they are specified.
        """
        self.test_MD5HashResponseExtra(b'sha', sha1)


    def formatResponse(self, quotes=True, **kw):
        """
        Format all given keyword arguments and their values suitably for use as
        the value of an HTTP header.

        The C{username}, C{realm}, C{algorithm}, C{qop}, C{cnonce} and C{uri}
        fields default to the corresponding attributes of this test case when
        they are not passed.  Any field whose value is L{None} is left out of
        the result.

        @type quotes: C{bool}
        @param quotes: A flag indicating whether to quote the values of each
            field in the response.

        @param **kw: Keywords and C{bytes} values which will be treated as field
            name/value pairs to include in the result.

        @rtype: C{bytes}
        @return: The given fields formatted for use as an HTTP header value.
        """
        defaultedFields = (
            'username', 'realm', 'algorithm', 'qop', 'cnonce', 'uri')
        for field in defaultedFields:
            # Only fill in fields the caller left out entirely; an explicit
            # None must survive so the field is omitted below.
            if field not in kw:
                kw[field] = getattr(self, field)

        quote = b'"' if quotes else b''

        return b', '.join([
                b"".join((networkString(k), b"=", quote, v, quote))
                for (k, v)
                in kw.items()
                if v is not None])


    def getDigestResponse(self, challenge, ncount):
        """
        Calculate the response for the given challenge

        @param challenge: The challenge mapping, from which the C{'nonce'},
            C{'algorithm'} and C{'qop'} values are read.
        @param ncount: The nonce count to include in the response.

        @return: The result of L{calcResponse} for this test case's
            credentials, URI, method and client nonce.
        """
        nonce = challenge.get('nonce')
        algo = challenge.get('algorithm').lower()
        qop = challenge.get('qop')

        ha1 = calcHA1(
            algo, self.username, self.realm, self.password, nonce, self.cnonce)
        ha2 = calcHA2(algo, self.method, self.uri, qop, None)
        return calcResponse(
            ha1, ha2, algo, nonce, ncount, self.cnonce, qop)


    def _decodeChallengeResponse(self, challenge, nc, method, host,
                                 quotes=True):
        """
        Build a client response to C{challenge} and decode it with
        C{self.credentialFactory}.

        @param challenge: The challenge mapping being answered.
        @param nc: The nonce count to use in the response.
        @param method: The HTTP method passed to C{decode}.
        @param host: The client host passed to C{decode}.
        @param quotes: Whether the field values in the response are quoted.

        @return: Whatever C{self.credentialFactory.decode} returns.
        """
        clientResponse = self.formatResponse(
            quotes=quotes,
            nonce=challenge['nonce'],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge['opaque'])
        return self.credentialFactory.decode(clientResponse, method, host)


    def _assertOnlyCorrectPasswordAccepted(self, creds):
        """
        Assert that C{creds} accepts C{self.password} and rejects a different
        password.
        """
        self.assertTrue(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b'wrong'))


    def test_response(self, quotes=True):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        and parses it into an L{IUsernameHashedPassword} provider.

        @param quotes: Whether the field values in the response are quoted.
        """
        challenge = self.credentialFactory.getChallenge(
            self.clientAddress.host)
        creds = self._decodeChallengeResponse(
            challenge, b"00000001", self.method, self.clientAddress.host,
            quotes=quotes)
        self._assertOnlyCorrectPasswordAccepted(creds)


    def test_responseWithoutQuotes(self):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        which does not quote the values of its fields and parses it into an
        L{IUsernameHashedPassword} provider in the same way it would a
        response which included quoted field values.
        """
        self.test_response(False)


    def test_responseWithCommaURI(self):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        which quotes the values of its fields and includes a C{b","} in the URI
        field.
        """
        self.uri = b"/some,path/"
        self.test_response(True)


    def test_caseInsensitiveAlgorithm(self):
        """
        The case of the algorithm value in the response is ignored when
        checking the credentials.
        """
        self.algorithm = b'MD5'
        self.test_response()


    def test_md5DefaultAlgorithm(self):
        """
        The algorithm defaults to MD5 if it is not supplied in the response.
        """
        # A None value makes formatResponse leave the field out altogether.
        self.algorithm = None
        self.test_response()


    def test_responseWithoutClientIP(self):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        even if the client address it is passed is L{None}.
        """
        challenge = self.credentialFactory.getChallenge(None)
        creds = self._decodeChallengeResponse(
            challenge, b"00000001", self.method, None)
        self._assertOnlyCorrectPasswordAccepted(creds)


    def test_multiResponse(self):
        """
        L{DigestCredentialFactory.decode} handles multiple responses to a
        single challenge.
        """
        challenge = self.credentialFactory.getChallenge(
            self.clientAddress.host)

        # Answer the same challenge twice, with successive nonce counts.
        for nc in (b"00000001", b"00000002"):
            creds = self._decodeChallengeResponse(
                challenge, nc, self.method, self.clientAddress.host)
            self._assertOnlyCorrectPasswordAccepted(creds)


    def test_failsWithDifferentMethod(self):
        """
        L{DigestCredentialFactory.decode} returns an L{IUsernameHashedPassword}
        provider which rejects a correct password for the given user if the
        challenge response request is made using a different HTTP method than
        was used to request the initial challenge.
        """
        challenge = self.credentialFactory.getChallenge(
            self.clientAddress.host)

        # The response digest is computed for self.method but decoded as a
        # POST request.
        creds = self._decodeChallengeResponse(
            challenge, b"00000001", b'POST', self.clientAddress.host)
        self.assertFalse(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b'wrong'))


    def test_noUsername(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
        has no username field or if the username field is empty.
        """
        # Check for no username
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(username=None),
            self.method, self.clientAddress.host)
        self.assertEqual(str(e), "Invalid response, no username given.")

        # Check for an empty username
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(username=b""),
            self.method, self.clientAddress.host)
        self.assertEqual(str(e), "Invalid response, no username given.")


    def test_noNonce(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
        has no nonce.
        """
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(opaque=b"abc123"),
            self.method, self.clientAddress.host)
        self.assertEqual(str(e), "Invalid response, no nonce given.")


    def test_noOpaque(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
        has no opaque.
        """
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(),
            self.method, self.clientAddress.host)
        self.assertEqual(str(e), "Invalid response, no opaque given.")


    def test_checkHash(self):
        """
        L{DigestCredentialFactory.decode} returns an L{IUsernameDigestHash}
        provider which can verify a hash of the form 'username:realm:password'.
        """
        challenge = self.credentialFactory.getChallenge(
            self.clientAddress.host)
        creds = self._decodeChallengeResponse(
            challenge, b"00000001", self.method, self.clientAddress.host)
        self.assertTrue(verifyObject(IUsernameDigestHash, creds))

        cleartext = b":".join((self.username, self.realm, self.password))
        digestHash = md5(cleartext)
        self.assertTrue(creds.checkHash(hexlify(digestHash.digest())))
        # Feeding in more data changes the digest, which must then be
        # rejected.
        digestHash.update(b'wrong')
        self.assertFalse(creds.checkHash(hexlify(digestHash.digest())))


    def test_invalidOpaque(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
        value does not contain all the required parts.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm,
                                                        self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        malformedOpaques = (
            # A value containing no "-".
            b'badOpaque',
            # An encoded key with only two comma-separated fields.
            b'foo-' + b64encode(b'nonce,clientip'),
            # The empty string.
            b'',
            )
        for badOpaque in malformedOpaques:
            exc = self.assertRaises(
                LoginFailed,
                credentialFactory._verifyOpaque,
                badOpaque,
                challenge['nonce'],
                self.clientAddress.host)
            self.assertEqual(
                str(exc), 'Invalid response, invalid opaque value')

        # An encoded key with three fields whose last field is b"foobar".
        badOpaque = b'foo-' + b64encode(
            b",".join((challenge['nonce'],
                       networkString(self.clientAddress.host),
                       b"foobar")))
        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badOpaque,
            challenge['nonce'],
            self.clientAddress.host)
        self.assertEqual(
            str(exc), 'Invalid response, invalid opaque/time values')


    def test_incompatibleNonce(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
        nonce from the response does not match the nonce encoded in the opaque.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm,
                                                        self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        badNonceOpaque = credentialFactory._generateOpaque(
            b'1234567890',
            self.clientAddress.host)

        # Neither the challenge's nonce nor an empty nonce matches the one
        # the opaque was generated from.
        for nonce in (challenge['nonce'], b''):
            exc = self.assertRaises(
                LoginFailed,
                credentialFactory._verifyOpaque,
                badNonceOpaque,
                nonce,
                self.clientAddress.host)
            self.assertEqual(
                str(exc),
                'Invalid response, incompatible opaque/nonce values')


    def test_incompatibleClientIP(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the
        request comes from a client IP other than what is encoded in the
        opaque.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm,
                                                        self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        badAddress = '10.0.0.1'
        # Sanity check
        self.assertNotEqual(self.clientAddress.host, badAddress)

        badNonceOpaque = credentialFactory._generateOpaque(
            challenge['nonce'], badAddress)

        self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badNonceOpaque,
            challenge['nonce'],
            self.clientAddress.host)


    def test_oldNonce(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
        opaque is older than C{DigestCredentialFactory.CHALLENGE_LIFETIME_SECS}
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm,
                                                        self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        # Build an opaque by hand whose time field is far in the past.
        key = b",".join((challenge['nonce'],
                         networkString(self.clientAddress.host),
                         b'-137876876'))
        digest = hexlify(md5(key + credentialFactory.privateKey).digest())

        oldNonceOpaque = b"-".join((digest, b64encode(key)))

        self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            oldNonceOpaque,
            challenge['nonce'],
            self.clientAddress.host)


    def test_mismatchedOpaqueChecksum(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
        checksum fails verification.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm,
                                                        self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        key = b",".join((challenge['nonce'],
                         networkString(self.clientAddress.host),
                         b'0'))

        # Hash the key with something other than the factory's privateKey.
        digest = hexlify(md5(key + b'this is not the right pkey').digest())
        badChecksum = b"-".join((digest, b64encode(key)))

        self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badChecksum,
            challenge['nonce'],
            self.clientAddress.host)


    def test_incompatibleCalcHA1Options(self):
        """
        L{calcHA1} raises L{TypeError} when any of the pszUsername, pszRealm,
        or pszPassword arguments are specified with the preHA1 keyword
        argument.
        """
        arguments = (
            (b"user", b"realm", b"password", b"preHA1"),
            (None, b"realm", None, b"preHA1"),
            (None, None, b"password", b"preHA1"),
            )

        for pszUsername, pszRealm, pszPassword, preHA1 in arguments:
            self.assertRaises(
                TypeError,
                calcHA1,
                b"md5",
                pszUsername,
                pszRealm,
                pszPassword,
                b"nonce",
                b"cnonce",
                preHA1=preHA1)


    def test_noNewlineOpaque(self):
        """
        L{DigestCredentialFactory._generateOpaque} returns a value without
        newlines, regardless of the length of the nonce.
        """
        opaque = self.credentialFactory._generateOpaque(
            b"long nonce " * 10, None)
        self.assertNotIn(b'\n', opaque)