HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/lib/python3/dist-packages/macaroonbakery/tests/test_client.py
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import base64
import datetime
import json
import threading

import macaroonbakery.bakery as bakery
import macaroonbakery.checkers as checkers
import macaroonbakery.httpbakery as httpbakery
import pymacaroons
import requests
import macaroonbakery._utils as utils

from fixtures import (
    EnvironmentVariable,
    TestWithFixtures,
)
from httmock import HTTMock, urlmatch
from six.moves.urllib.parse import parse_qs
from six.moves.urllib.request import Request

try:
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
except ImportError:
    from http.server import HTTPServer, BaseHTTPRequestHandler

# Default macaroon expiry used by most tests: one day after the moment this
# module is imported, as a naive datetime obtained from utcnow().
AGES = datetime.datetime.utcnow() + datetime.timedelta(days=1)
# The single operation that the mock server (GetHandler) authorizes and that
# the test macaroons are minted for.
TEST_OP = bakery.Op(entity='test', action='test')


class TestClient(TestWithFixtures):
    def setUp(self):
        '''Prepare the environment shared by every test in this class.

        Both spellings of the HTTP proxy variable are wrapped in an
        EnvironmentVariable fixture created without a replacement value.
        '''
        super(TestClient, self).setUp()
        # A configured proxy would make requests route through it, and the
        # proxy is unlikely to know how to reach the local test server.
        for proxy_var in ('http_proxy', 'HTTP_PROXY'):
            self.useFixture(EnvironmentVariable(proxy_var))

    def test_single_service_first_party(self):
        '''A macaroon minted by the service itself and sent in the
        'macaroon-test' cookie is expected to yield the 'done' response.
        '''
        b = new_bakery('loc', None, None)

        def handler(*args):
            GetHandler(b, None, None, None, None, AGES, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            srv_macaroon = b.oven.macaroon(
                version=bakery.LATEST_VERSION, expiry=AGES,
                caveats=None, ops=[TEST_OP])
            self.assertEqual(srv_macaroon.macaroon.location, 'loc')
            client = httpbakery.Client()
            # The cookie value is the base64 encoding of a JSON list that
            # holds the serialized macaroon.
            client.cookies.set_cookie(requests.cookies.create_cookie(
                'macaroon-test', base64.b64encode(json.dumps([
                    srv_macaroon.to_dict().get('m')
                ]).encode('utf-8')).decode('utf-8')
            ))
            resp = requests.get(
                url='http://' + httpd.server_address[0] + ':' +
                    str(httpd.server_address[1]),
                cookies=client.cookies, auth=client.auth())
            resp.raise_for_status()
            self.assertEqual(resp.text, 'done')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_single_service_third_party(self):
        '''A request to a service that requires a third-party discharge at
        'http://1.2.3.4' is expected to end with the 'done' response.

        The discharge endpoint is mocked with HTTMock; the mock also checks
        that the discharge request is sent to the '/discharge' path.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            self.assertEqual(url.path, '/discharge')
            qs = parse_qs(request.body)
            # parse_qs maps each key to a list; keep only the first value.
            content = {q: qs[q][0] for q in qs}
            m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
                                     alwaysOK3rd)
            return {
                'status_code': 200,
                'content': {
                    'Macaroon': m.to_dict()
                }
            }

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, AGES, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        server_url = ('http://' + httpd.server_address[0] + ':' +
                      str(httpd.server_address[1]))
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                resp = requests.get(
                    url=server_url,
                    cookies=client.cookies,
                    auth=client.auth())
            resp.raise_for_status()
            self.assertEqual(resp.text, 'done')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_single_service_third_party_with_path(self):
        '''Same flow as the plain third-party test, but the third-party
        location carries a path ('http://1.2.3.4/some/path').

        The mocked discharge endpoint checks that the discharge request is
        sent to '/some/path/discharge'.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4/some/path':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            self.assertEqual(url.path, '/some/path/discharge')
            qs = parse_qs(request.body)
            # parse_qs maps each key to a list; keep only the first value.
            content = {q: qs[q][0] for q in qs}
            m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
                                     alwaysOK3rd)
            return {
                'status_code': 200,
                'content': {
                    'Macaroon': m.to_dict()
                }
            }

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4/some/path', None, None, None, AGES,
                       *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        server_url = ('http://' + httpd.server_address[0] + ':' +
                      str(httpd.server_address[1]))
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                resp = requests.get(
                    url=server_url,
                    cookies=client.cookies,
                    auth=client.auth())
            resp.raise_for_status()
            self.assertEqual(resp.text, 'done')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_single_service_third_party_version_1_caveat(self):
        '''Third-party discharge flow where the locator advertises
        bakery.VERSION_1 for the third party at 'http://1.2.3.4'.

        The request is expected to end with the 'done' response.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.VERSION_1,
                    )

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            qs = parse_qs(request.body)
            # parse_qs maps each key to a list; keep only the first value.
            content = {q: qs[q][0] for q in qs}
            m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
                                     alwaysOK3rd)
            return {
                'status_code': 200,
                'content': {
                    'Macaroon': m.to_dict()
                }
            }

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, AGES, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        server_url = ('http://' + httpd.server_address[0] + ':' +
                      str(httpd.server_address[1]))
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                resp = requests.get(
                    url=server_url,
                    cookies=client.cookies,
                    auth=client.auth())
            resp.raise_for_status()
            self.assertEqual(resp.text, 'done')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_cookie_domain_host_not_fqdn(self):
        '''The cookie stored for a non-FQDN host ('localhost') is expected
        to get the '.local' suffix appended to its domain.

        See
        https://github.com/go-macaroon-bakery/py-macaroon-bakery/issues/53
        '''
        b = new_bakery('loc', None, None)

        def handler(*args):
            GetHandler(b, None, None, None, None, AGES, *args)

        # Start the server and create the client before entering the try
        # block: a bind failure must not become a NameError in the cleanup
        # code, and client must be bound for the assertions after the block.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        client = httpbakery.Client()
        try:
            srv_macaroon = b.oven.macaroon(
                version=bakery.LATEST_VERSION, expiry=AGES,
                caveats=None, ops=[TEST_OP])
            self.assertEqual(srv_macaroon.macaroon.location, 'loc')
            # Note: by using "localhost" instead of the presumably numeric
            # address held in httpd.server_address, we're triggering the
            # no-FQDN logic in the cookie code.
            resp = requests.get(
                url='http://localhost:' + str(httpd.server_address[1]),
                cookies=client.cookies, auth=client.auth())
            resp.raise_for_status()
            self.assertEqual(resp.text, 'done')
        except httpbakery.BakeryException:
            pass  # interaction required exception is expected
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

        # the cookie has the .local domain appended
        [cookie] = client.cookies
        self.assertEqual(cookie.name, 'macaroon-test')
        self.assertEqual(cookie.domain, 'localhost.local')

    def test_single_party_with_header(self):
        '''A macaroon minted by the service and sent in the 'Macaroons'
        request header is expected to yield the 'done' response.
        '''
        b = new_bakery('loc', None, None)

        def handler(*args):
            GetHandler(b, None, None, None, None, AGES, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            srv_macaroon = b.oven.macaroon(
                version=bakery.LATEST_VERSION,
                expiry=AGES, caveats=None, ops=[TEST_OP])
            self.assertEqual(srv_macaroon.macaroon.location, 'loc')
            # The header value is the base64 encoding of a JSON list that
            # holds the serialized macaroon.
            headers = {
                'Macaroons': base64.b64encode(json.dumps([
                    srv_macaroon.to_dict().get('m')
                ]).encode('utf-8'))
            }
            resp = requests.get(
                url='http://' + httpd.server_address[0] + ':' +
                    str(httpd.server_address[1]),
                headers=headers)
            resp.raise_for_status()
            self.assertEqual(resp.text, 'done')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_expiry_cookie_is_set(self):
        '''After a third-party discharge flow, the macaroon stored in the
        'macaroon-test' cookie is expected to carry the expiry time that
        the server was configured with.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            qs = parse_qs(request.body)
            # parse_qs maps each key to a list; keep only the first value.
            content = {q: qs[q][0] for q in qs}
            m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
                                     alwaysOK3rd)
            return {
                'status_code': 200,
                'content': {
                    'Macaroon': m.to_dict()
                }
            }

        ages = datetime.datetime.utcnow() + datetime.timedelta(days=1)

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, ages, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                resp = requests.get(
                    url='http://' + httpd.server_address[0] + ':' +
                        str(httpd.server_address[1]),
                    cookies=client.cookies,
                    auth=client.auth())
            resp.raise_for_status()
            # The cookie holds a base64-encoded JSON list; its first entry
            # is the macaroon whose expiry is checked below.
            m = bakery.Macaroon.from_dict(json.loads(
                base64.b64decode(
                    client.cookies.get('macaroon-test')).decode('utf-8'))[0])
            t = checkers.macaroons_expiry_time(
                checkers.Namespace(), [m.macaroon])
            self.assertEqual(ages, t)
            self.assertEqual(resp.text, 'done')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_expiry_cookie_set_in_past(self):
        '''When the server mints macaroons whose expiry lies one day in the
        past, the request is expected to fail with a BakeryException whose
        message is 'too many (3) discharge requests'.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            qs = parse_qs(request.body)
            # parse_qs maps each key to a list; keep only the first value.
            content = {q: qs[q][0] for q in qs}
            m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
                                     alwaysOK3rd)
            return {
                'status_code': 200,
                'content': {
                    'Macaroon': m.to_dict()
                }
            }

        ages = datetime.datetime.utcnow() - datetime.timedelta(days=1)

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, ages, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                with self.assertRaises(httpbakery.BakeryException) as ctx:
                    requests.get(
                        url='http://' + httpd.server_address[0] + ':' +
                            str(httpd.server_address[1]),
                        cookies=client.cookies,
                        auth=client.auth())
            self.assertEqual(ctx.exception.args[0],
                             'too many (3) discharge requests')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_too_many_discharge(self):
        '''When the mocked discharge endpoint keeps returning an unrelated
        macaroon, the request is expected to fail with a BakeryException
        whose message is 'too many (3) discharge requests'.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            # Built from a fixed key, id and location of its own rather
            # than from the incoming discharge request.
            wrong_macaroon = bakery.Macaroon(
                root_key=b'some key', id=b'xxx',
                location='some other location',
                version=bakery.VERSION_0)
            return {
                'status_code': 200,
                'content': {
                    'Macaroon': wrong_macaroon.to_dict()
                }
            }

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, AGES, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                with self.assertRaises(httpbakery.BakeryException) as ctx:
                    requests.get(
                        url='http://' + httpd.server_address[0] + ':' +
                            str(httpd.server_address[1]),
                        cookies=client.cookies,
                        auth=client.auth())
            self.assertEqual(ctx.exception.args[0],
                             'too many (3) discharge requests')
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_third_party_discharge_refused(self):
        '''When the third-party caveat checker raises
        ThirdPartyCaveatCheckFailed inside the mocked discharge endpoint,
        that exception is expected to surface from requests.get.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )

        def check(cond, arg):
            raise bakery.ThirdPartyCaveatCheckFailed('boo! cond' + cond)

        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            qs = parse_qs(request.body)
            # parse_qs maps each key to a list; keep only the first value.
            content = {q: qs[q][0] for q in qs}
            # No response is built here: the test expects this call to
            # raise via the always-failing check callback.
            httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
                                 ThirdPartyCaveatCheckerF(check))

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, AGES, *args)

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client()
            with HTTMock(discharge):
                with self.assertRaises(bakery.ThirdPartyCaveatCheckFailed):
                    requests.get(
                        url='http://' + httpd.server_address[0] + ':' +
                            str(httpd.server_address[1]),
                        cookies=client.cookies,
                        auth=client.auth())
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_discharge_with_interaction_required_error(self):
        '''When the mocked discharge endpoint answers 401 with an
        interaction-required error and the client's only interactor fails
        in legacy_interact, an InteractionError is expected to surface from
        requests.get.
        '''
        class _DischargerLocator(bakery.ThirdPartyLocator):
            def __init__(self):
                self.key = bakery.generate_key()

            def third_party_info(self, loc):
                if loc == 'http://1.2.3.4':
                    return bakery.ThirdPartyInfo(
                        public_key=self.key.public_key,
                        version=bakery.LATEST_VERSION,
                    )
        d = _DischargerLocator()
        b = new_bakery('loc', d, None)

        @urlmatch(path='.*/discharge')
        def discharge(url, request):
            return {
                'status_code': 401,
                'content': {
                    'Code': httpbakery.ERR_INTERACTION_REQUIRED,
                    'Message': 'interaction required',
                    'Info': {
                        'WaitURL': 'http://0.1.2.3/',
                        'VisitURL': 'http://0.1.2.3/',
                    },
                }
            }

        def handler(*args):
            GetHandler(b, 'http://1.2.3.4', None, None, None, AGES, *args)

        class MyInteractor(httpbakery.LegacyInteractor):
            def legacy_interact(self, ctx, location, visit_url):
                raise httpbakery.InteractionError('cannot visit')

            def interact(self, ctx, location, interaction_required_err):
                pass

            def kind(self):
                return httpbakery.WEB_BROWSER_INTERACTION_KIND

        # Start the server before entering the try block so that a failure
        # to bind cannot turn into a NameError on httpd during cleanup.
        httpd = HTTPServer(('', 0), handler)
        thread = threading.Thread(target=httpd.serve_forever)
        thread.start()
        try:
            client = httpbakery.Client(interaction_methods=[MyInteractor()])

            with HTTMock(discharge):
                with self.assertRaises(httpbakery.InteractionError):
                    requests.get(
                        'http://' + httpd.server_address[0] + ':' + str(
                            httpd.server_address[1]),
                        cookies=client.cookies,
                        auth=client.auth())
        finally:
            # Stop the serve loop, wait for its thread, then release the
            # listening socket so nothing leaks into later tests.
            httpd.shutdown()
            thread.join()
            httpd.server_close()

    def test_extract_macaroons_from_request(self):
        '''extract_macaroons is expected to return the macaroons found in
        both the 'Macaroons' header and a 'macaroon-auth' cookie, while a
        cookie with an empty value ('macaroon-empty') contributes nothing.
        '''
        def encode_macaroon(m):
            # URL-safe base64 of a one-element JSON list holding m.
            macaroons = '[' + utils.macaroon_to_json_string(m) + ']'
            return base64.urlsafe_b64encode(
                utils.to_bytes(macaroons)).decode('ascii')

        req = Request('http://example.com')
        m1 = pymacaroons.Macaroon(
            version=pymacaroons.MACAROON_V2, identifier='one')
        req.add_header('Macaroons', encode_macaroon(m1))
        m2 = pymacaroons.Macaroon(
            version=pymacaroons.MACAROON_V2, identifier='two')
        jar = requests.cookies.RequestsCookieJar()
        jar.set_cookie(utils.cookie(
            name='macaroon-auth',
            value=encode_macaroon(m2),
            url='http://example.com',
        ))
        jar.set_cookie(utils.cookie(
            name='macaroon-empty',
            value='',
            url='http://example.com',
        ))
        jar.add_cookie_header(req)

        macaroons = httpbakery.extract_macaroons(req)
        self.assertEqual(len(macaroons), 2)
        # Sort so the comparison does not depend on extraction order.
        macaroons.sort(key=lambda ms: ms[0].identifier)
        self.assertEqual(macaroons[0][0].identifier, m1.identifier)
        self.assertEqual(macaroons[1][0].identifier, m2.identifier)

    def test_handle_error_cookie_path(self):
        '''Handling an error whose MacaroonPath is '.' for the URL
        'http://example.com/some/path' is expected to leave exactly one
        cookie on the client, with path "/some/".
        '''
        dummy_macaroon = bakery.Macaroon(
            root_key=b'some key', id=b'xxx',
            location='some location',
            version=bakery.VERSION_0)
        error_info = httpbakery.ErrorInfo.from_dict({
            'Macaroon': dummy_macaroon.to_dict(),
            'MacaroonPath': '.',
            'CookieNameSuffix': 'test'
        })
        err = httpbakery.Error(
            code=407,
            message='error',
            version=bakery.LATEST_VERSION,
            info=error_info)

        http_client = httpbakery.Client()
        http_client.handle_error(err, 'http://example.com/some/path')

        # Unpacking fails unless the jar holds exactly one cookie.
        (only_cookie,) = http_client.cookies
        self.assertEqual(only_cookie.path, "/some/")


class GetHandler(BaseHTTPRequestHandler):
    '''A mock HTTP server that serves a GET request.

    A request that passes the bakery's authorization check for TEST_OP is
    answered with 200 and the text 'done'; otherwise a 401
    discharge-required response carrying a newly minted macaroon is sent.
    '''
    def __init__(self, bakery, auth_location, mutate_error,
                 caveats, version, expiry, *args):
        '''
        @param bakery used to check incoming requests and macaroons
        for discharge-required errors.
        @param auth_location holds the location of any 3rd party
        authorizer. If this is not None, a 3rd party caveat will be
        added addressed to this location.
        @param mutate_error if non None, will be called with any
        discharge-required error before responding to the client.
        @param caveats called to get caveats to add to the returned
        macaroon.
        @param version holds the version of the bakery that the
        server will purport to serve.
        @param expiry holds the expiry for the macaroon that will be created
        in _write_discharge_error
        @param args the positional arguments the HTTP server passes to a
        request handler; forwarded unchanged to BaseHTTPRequestHandler.
        '''
        self._bakery = bakery
        self._auth_location = auth_location
        # NOTE(review): stored but not read anywhere in this class.
        self._mutate_error = mutate_error
        self._caveats = caveats
        self._server_version = version
        self._expiry = expiry
        # The base constructor handles the request straight away, so every
        # attribute used by do_GET must already be set at this point.
        BaseHTTPRequestHandler.__init__(self, *args)

    def do_GET(self):
        '''do_GET implements a handler for the HTTP GET method.

        Responds with 'done', followed by a space and the request body when
        the path is not '/no-body' and a non-empty body was sent.
        '''
        ctx = checkers.AuthContext()
        auth_checker = self._bakery.checker.auth(
            httpbakery.extract_macaroons(self.headers))
        try:
            auth_checker.allow(ctx, [TEST_OP])
        except (bakery.PermissionDenied,
                bakery.VerificationError) as exc:
            return self._write_discharge_error(exc)
        self.send_response(200)
        self.end_headers()
        content_len = int(self.headers.get('content-length', 0))
        content = 'done'
        if self.path != '/no-body' and content_len > 0:
            # rfile yields bytes; decode before joining it to the text
            # prefix, otherwise Python 3 raises TypeError on str + bytes.
            body = self.rfile.read(content_len)
            content = content + ' ' + body.decode('utf-8')
        self.wfile.write(content.encode('utf-8'))

    def _write_discharge_error(self, exc):
        '''Send a 401 discharge-required response for a failed check.

        @param exc the authorization exception; its first argument is
        passed on as the message of the discharge-required response.
        '''
        version = httpbakery.request_version(self.headers)
        if version < bakery.LATEST_VERSION:
            # NOTE(review): the downgraded version is stored but the
            # macaroon below is still minted with LATEST_VERSION.
            self._server_version = version

        caveats = []
        # NOTE(review): the tests pass None (not '') when there is no third
        # party, so this branch is also taken for None and adds a caveat
        # with location=None. Kept as is because the tests rely on the
        # current behaviour; confirm the intent before changing it.
        if self._auth_location != '':
            caveats = [
                checkers.Caveat(location=self._auth_location,
                                condition='is-ok')
            ]
        if self._caveats is not None:
            caveats.extend(self._caveats)

        m = self._bakery.oven.macaroon(
            version=bakery.LATEST_VERSION, expiry=self._expiry,
            caveats=caveats, ops=[TEST_OP])

        content, headers = httpbakery.discharge_required_response(
            m, '/', 'test', exc.args[0])
        self.send_response(401)
        for h in headers:
            self.send_header(h, headers[h])
        self.send_header('Connection', 'close')
        self.end_headers()
        self.wfile.write(content)


def new_bakery(location, locator, checker):
    '''Build a Bakery with a freshly generated key for use in tests.

    When no checker is supplied, a default one is created in which the
    'testns' namespace is registered with an empty prefix and the 'is'
    condition is handled by check_is_something.
    @param location Location of the bakery {str}.
    @param locator Locator for third parties {ThirdPartyLocator or None}
    @param checker Caveat checker {FirstPartyCaveatChecker or None}
    @return {Bakery}
    '''
    if checker is None:
        checker = checkers.Checker()
        checker.namespace().register('testns', '')
        checker.register('is', 'testns', check_is_something)
    return bakery.Bakery(
        location=location,
        locator=locator,
        key=bakery.generate_key(),
        checker=checker,
    )


def is_something_caveat():
    '''Return a caveat with condition 'is something' in the 'testns'
    namespace.
    '''
    caveat = checkers.Caveat(condition='is something', namespace='testns')
    return caveat


def check_is_something(ctx, cond, arg):
    '''Check that the caveat argument is exactly 'something'.

    @param ctx unused.
    @param cond unused.
    @param arg the caveat argument to compare {str}.
    @return None when arg matches, otherwise an error message {str}.
    '''
    if arg == 'something':
        return None
    return '{} doesn\'t match "something"'.format(arg)


class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
    '''Adapter that turns a plain callable into a third-party caveat
    checker.
    '''
    def __init__(self, check):
        '''
        @param check callable invoked as check(cond, arg) with the parsed
        caveat condition; its result is returned by
        check_third_party_caveat.
        '''
        self._check = check

    def check_third_party_caveat(self, ctx, info):
        '''Parse info.condition and delegate to the wrapped callable.

        @param ctx unused.
        @param info object whose condition attribute is parsed.
        @return whatever the wrapped callable returns.
        '''
        parsed = checkers.parse_caveat(info.condition)
        condition, argument = parsed
        return self._check(condition, argument)

# Third-party caveat checker whose callback ignores its arguments and returns
# an empty list for every caveat (presumably meaning "discharge with no extra
# caveats" -- confirm against ThirdPartyCaveatChecker).
alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: [])