HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/macaroonbakery/tests/test_checker.py
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import base64
import json
from collections import namedtuple
from datetime import timedelta
from unittest import TestCase

import macaroonbakery.bakery as bakery
import macaroonbakery.checkers as checkers
import pymacaroons
from macaroonbakery.tests.common import epoch, test_checker, test_context
from pymacaroons.verifier import FirstPartyCaveatVerifierDelegate, Verifier


class TestChecker(TestCase):
    def setUp(self):
        self._discharges = []

    def test_authorize_with_open_access_and_no_macaroons(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer(
            {bakery.Op(entity='something', action='read'):
                {bakery.EVERYONE}})
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)
        auth_info = client.do(test_context, ts, [
            bakery.Op(entity='something', action='read'),
        ])
        self.assertEqual(len(self._discharges), 0)
        self.assertIsNotNone(auth_info)
        self.assertIsNone(auth_info.identity)
        self.assertEqual(len(auth_info.macaroons), 0)

    def test_authorization_denied(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = bakery.ClosedAuthorizer()
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        with self.assertRaises(bakery.PermissionDenied):
            client.do(ctx, ts, [bakery.Op(entity='something', action='read')])

    def test_authorize_with_authentication_required(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer(
            {bakery.Op(entity='something', action='read'): {'bob'}})
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)

        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        auth_info = client.do(ctx, ts, [bakery.Op(entity='something',
                                                  action='read')])
        self.assertEqual(self._discharges,
                         [_DischargeRecord(location='ids', user='bob')])
        self.assertIsNotNone(auth_info)
        self.assertEqual(auth_info.identity.id(), 'bob')
        self.assertEqual(len(auth_info.macaroons), 1)

    def test_authorize_multiple_ops(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer(
            {
                bakery.Op(entity='something', action='read'): {'bob'},
                bakery.Op(entity='otherthing', action='read'): {'bob'}
            }
        )
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        client.do(ctx, ts, [
            bakery.Op(entity='something', action='read'),
            bakery.Op(entity='otherthing', action='read')
        ])
        self.assertEqual(self._discharges,
                         [_DischargeRecord(location='ids', user='bob')])

    def test_capability(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer(
            {bakery.Op(entity='something', action='read'): {'bob'}})
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)

        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m = client.discharged_capability(
            ctx, ts, [bakery.Op(entity='something', action='read')])
        # Check that we can exercise the capability directly on the service
        # with no discharging required.
        auth_info = ts.do(test_context, [m], [
            bakery.Op(entity='something', action='read'),
        ])
        self.assertIsNotNone(auth_info)
        self.assertIsNone(auth_info.identity)
        self.assertEqual(len(auth_info.macaroons), 1)
        self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
                         m[0].identifier_bytes)

    def test_capability_multiple_entities(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'bob'},
            bakery.Op(entity='e2', action='read'): {'bob'},
            bakery.Op(entity='e3', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m = client.discharged_capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
            bakery.Op(entity='e3', action='read'),
        ])
        self.assertEqual(self._discharges,
                         [_DischargeRecord(location='ids', user='bob')])

        # Check that we can exercise the capability directly on the service
        # with no discharging required.
        ts.do(test_context, [m], [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
            bakery.Op(entity='e3', action='read'),
        ])

        # Check that we can exercise the capability to act on a subset of
        # the operations.
        ts.do(test_context, [m], [
            bakery.Op(entity='e2', action='read'),
            bakery.Op(entity='e3', action='read'),
        ])
        ts.do(test_context, [m],
              [bakery.Op(entity='e3', action='read')])

    def test_multiple_capabilities(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'alice'},
            bakery.Op(entity='e2', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)

        # Acquire two capabilities as different users and check
        # that we can combine them together to do both operations
        # at once.
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
        m1 = _Client(locator).discharged_capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
        ])
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m2 = _Client(locator).discharged_capability(ctx, ts,
                                                    [bakery.Op(
                                                        entity='e2',
                                                        action='read')])
        self.assertEqual(self._discharges, [
            _DischargeRecord(location='ids', user='alice'),
            _DischargeRecord(location='ids', user='bob'),
        ])
        auth_info = ts.do(test_context, [m1, m2], [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
        ])
        self.assertIsNotNone(auth_info)
        self.assertIsNone(auth_info.identity)
        self.assertEqual(len(auth_info.macaroons), 2)
        self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
                         m1[0].identifier_bytes)
        self.assertEqual(auth_info.macaroons[1][0].identifier_bytes,
                         m2[0].identifier_bytes)

    def test_combine_capabilities(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'alice'},
            bakery.Op(entity='e2', action='read'): {'bob'},
            bakery.Op(entity='e3', action='read'): {'bob', 'alice'},
        })
        ts = _Service('myservice', auth, ids, locator)

        # Acquire two capabilities as different users and check
        # that we can combine them together into a single capability
        # capable of both operations.
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
        m1 = _Client(locator).discharged_capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e3', action='read'),
        ])
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m2 = _Client(locator).discharged_capability(
            ctx, ts, [bakery.Op(entity='e2', action='read')])

        m = ts.capability(test_context, [m1, m2], [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
            bakery.Op(entity='e3', action='read'),
        ])
        ts.do(test_context, [[m.macaroon]], [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
            bakery.Op(entity='e3', action='read'),
        ])

    def test_partially_authorized_request(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'alice'},
            bakery.Op(entity='e2', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)

        # Acquire a capability for e1 but rely on authentication to
        # authorize e2.
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
        m = _Client(locator).discharged_capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
        ])
        client = _Client(locator)
        client.add_macaroon(ts, 'authz', m)

        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        client.discharged_capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
        ])

    def test_auth_with_third_party_caveats(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)

        # We make an authorizer that requires a third party discharge
        # when authorizing.
        def authorize_with_tp_discharge(ctx, id, op):
            if (id is not None and id.id() == 'bob' and
                    op == bakery.Op(entity='something', action='read')):
                return True, [checkers.Caveat(condition='question',
                                              location='other third party')]
            return False, None

        auth = bakery.AuthorizerFunc(authorize_with_tp_discharge)
        ts = _Service('myservice', auth, ids, locator)

        class _LocalDischargeChecker(bakery.ThirdPartyCaveatChecker):
            def check_third_party_caveat(_, ctx, info):
                if info.condition != 'question':
                    raise ValueError('third party condition not recognized')
                self._discharges.append(_DischargeRecord(
                    location='other third party',
                    user=ctx.get(_DISCHARGE_USER_KEY)
                ))
                return []

        locator['other third party'] = _Discharger(
            key=bakery.generate_key(),
            checker=_LocalDischargeChecker(),
            locator=locator,
        )
        client = _Client(locator)
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
        self.assertEqual(self._discharges, [
            _DischargeRecord(location='ids', user='bob'),
            _DischargeRecord(location='other third party', user='bob')
        ])

    def test_capability_combines_first_party_caveats(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'alice'},
            bakery.Op(entity='e2', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)

        # Acquire two capabilities as different users, add some first party
        # caveats that we can combine them together into a single capability
        # capable of both operations.
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
        m1 = _Client(locator).capability(
            ctx, ts, [bakery.Op(entity='e1', action='read')])
        m1.macaroon.add_first_party_caveat('true 1')
        m1.macaroon.add_first_party_caveat('true 2')
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m2 = _Client(locator).capability(
            ctx, ts, [bakery.Op(entity='e2', action='read')])
        m2.macaroon.add_first_party_caveat('true 3')
        m2.macaroon.add_first_party_caveat('true 4')

        client = _Client(locator)
        client.add_macaroon(ts, 'authz1', [m1.macaroon])
        client.add_macaroon(ts, 'authz2', [m2.macaroon])

        m = client.capability(test_context, ts, [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
        ])
        self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [
            'true 1',
            'true 2',
            'true 3',
            'true 4',
        ])

    def test_first_party_caveat_squashing(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'alice'},
            bakery.Op(entity='e2', action='read'): {'alice'},
        })
        ts = _Service('myservice', auth, ids, locator)
        tests = [
            ('duplicates removed', [
                checkers.Caveat(condition='true 1', namespace='testns'),
                checkers.Caveat(condition='true 2', namespace='testns'),
                checkers.Caveat(condition='true 1', namespace='testns'),
                checkers.Caveat(condition='true 1', namespace='testns'),
                checkers.Caveat(condition='true 3', namespace='testns'),
            ], [
                checkers.Caveat(condition='true 1', namespace='testns'),
                checkers.Caveat(condition='true 2', namespace='testns'),
                checkers.Caveat(condition='true 3', namespace='testns'),
            ]), ('earliest time before', [
                checkers.time_before_caveat(epoch + timedelta(days=1)),
                checkers.Caveat(condition='true 1', namespace='testns'),
                checkers.time_before_caveat(
                    epoch + timedelta(days=0, hours=1)),
                checkers.time_before_caveat(epoch + timedelta(
                    days=0, hours=0, minutes=5)),
            ], [
                checkers.time_before_caveat(epoch + timedelta(
                    days=0, hours=0, minutes=5)),
                checkers.Caveat(condition='true 1', namespace='testns'),
            ]), ('operations and declared caveats removed', [
                checkers.deny_caveat(['foo']),
                checkers.allow_caveat(['read', 'write']),
                checkers.declared_caveat('username', 'bob'),
                checkers.Caveat(condition='true 1', namespace='testns'),
            ], [
                checkers.Caveat(condition='true 1', namespace='testns'),
            ])
        ]
        for test in tests:
            print(test[0])

            # Make a first macaroon with all the required first party caveats.
            ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
            m1 = _Client(locator).capability(
                ctx, ts, [bakery.Op(entity='e1', action='read')])
            m1.add_caveats(test[1], None, None)

            # Make a second macaroon that's not used to check that it's
            # caveats are not added.
            m2 = _Client(locator).capability(
                ctx, ts, [bakery.Op(entity='e1', action='read')])
            m2.add_caveat(checkers.Caveat(
                condition='true notused', namespace='testns'), None, None)
            client = _Client(locator)
            client.add_macaroon(ts, 'authz1', [m1.macaroon])
            client.add_macaroon(ts, 'authz2', [m2.macaroon])

            m3 = client.capability(
                test_context, ts, [bakery.Op(entity='e1', action='read')])
            self.assertEqual(
                _macaroon_conditions(m3.macaroon.caveats, False),
                _resolve_caveats(m3.namespace, test[2]))

    def test_login_only(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = bakery.ClosedAuthorizer()
        ts = _Service('myservice', auth, ids, locator)

        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP])
        self.assertIsNotNone(auth_info)
        self.assertEqual(auth_info.identity.id(), 'bob')

    def test_allow_any(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer(
            {
                bakery.Op(entity='e1', action='read'): {'alice'},
                bakery.Op(entity='e2', action='read'): {'bob'},
            })
        ts = _Service('myservice', auth, ids, locator)

        # Acquire a capability for e1 but rely on authentication to
        # authorize e2.
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
        m = _Client(locator).discharged_capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
        ])

        client = _Client(locator)
        client.add_macaroon(ts, 'authz', m)

        self._discharges = []
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        with self.assertRaises(_DischargeRequiredError):
            client.do_any(
                ctx, ts, [
                    bakery.LOGIN_OP,
                    bakery.Op(entity='e1', action='read'),
                    bakery.Op(entity='e1', action='read')
                ]
            )
            self.assertEqual(len(self._discharges), 0)

        # Log in as bob.
        _, err = client.do(ctx, ts, [bakery.LOGIN_OP])

        # All the previous actions should now be allowed.
        auth_info, allowed = client.do_any(ctx, ts, [
            bakery.LOGIN_OP,
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e1', action='read'),
        ])
        self.assertEqual(auth_info.identity.id(), 'bob')
        self.assertEqual(len(auth_info.macaroons), 2)
        self.assertEqual(allowed, [True, True, True])

    def test_auth_with_identity_from_context(self):
        locator = _DischargerLocator()
        ids = _BasicAuthIdService()
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'sherlock'},
            bakery.Op(entity='e2', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)

        # Check that we can perform the ops with basic auth in the
        # context.
        ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
        auth_info = _Client(locator).do(
            ctx, ts, [bakery.Op(entity='e1', action='read')])
        self.assertEqual(auth_info.identity.id(), 'sherlock')
        self.assertEqual(len(auth_info.macaroons), 0)

    def test_auth_login_op_with_identity_from_context(self):
        locator = _DischargerLocator()
        ids = _BasicAuthIdService()
        ts = _Service('myservice', bakery.ClosedAuthorizer(), ids, locator)

        # Check that we can use LoginOp
        # when auth isn't granted through macaroons.
        ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
        auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP])
        self.assertEqual(auth_info.identity.id(), 'sherlock')
        self.assertEqual(len(auth_info.macaroons), 0)

    def test_operation_allow_caveat(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'bob'},
            bakery.Op(entity='e1', action='write'): {'bob'},
            bakery.Op(entity='e2', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)

        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m = client.capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e1', action='write'),
            bakery.Op(entity='e2', action='read'),
        ])

        # Sanity check that we can do a write.
        ts.do(test_context, [[m.macaroon]],
              [bakery.Op(entity='e1', action='write')])

        m.add_caveat(checkers.allow_caveat(['read']), None, None)

        # A read operation should work.
        ts.do(test_context, [[m.macaroon]], [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
        ])

        # A write operation should fail
        # even though the original macaroon allowed it.
        with self.assertRaises(_DischargeRequiredError):
            ts.do(test_context, [[m.macaroon]], [
                bakery.Op(entity='e1', action='write'),
            ])

    def test_operation_deny_caveat(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = _OpAuthorizer({
            bakery.Op(entity='e1', action='read'): {'bob'},
            bakery.Op(entity='e1', action='write'): {'bob'},
            bakery.Op(entity='e2', action='read'): {'bob'},
        })
        ts = _Service('myservice', auth, ids, locator)
        client = _Client(locator)

        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        m = client.capability(ctx, ts, [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e1', action='write'),
            bakery.Op(entity='e2', action='read'),
        ])

        # Sanity check that we can do a write.
        ts.do(test_context, [[m.macaroon]], [
              bakery.Op(entity='e1', action='write')])

        m.add_caveat(checkers.deny_caveat(['write']), None, None)

        # A read operation should work.
        ts.do(test_context, [[m.macaroon]], [
            bakery.Op(entity='e1', action='read'),
            bakery.Op(entity='e2', action='read'),
        ])

        # A write operation should fail
        # even though the original macaroon allowed it.
        with self.assertRaises(_DischargeRequiredError):
            ts.do(test_context, [[m.macaroon]], [
                  bakery.Op(entity='e1', action='write')])

    def test_duplicate_login_macaroons(self):
        locator = _DischargerLocator()
        ids = _IdService('ids', locator, self)
        auth = bakery.ClosedAuthorizer()
        ts = _Service('myservice', auth, ids, locator)

        # Acquire a login macaroon for bob.
        client1 = _Client(locator)
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
        auth_info = client1.do(ctx, ts, [bakery.LOGIN_OP])
        self.assertEqual(auth_info.identity.id(), 'bob')

        # Acquire a login macaroon for alice.
        client2 = _Client(locator)
        ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
        auth_info = client2.do(ctx, ts, [bakery.LOGIN_OP])
        self.assertEqual(auth_info.identity.id(), 'alice')

        # Combine the two login macaroons into one client.
        client3 = _Client(locator)
        client3.add_macaroon(ts, '1.bob',
                             client1._macaroons[ts.name()]['authn'])
        client3.add_macaroon(ts, '2.alice',
                             client2._macaroons[ts.name()]['authn'])

        # We should authenticate as bob (because macaroons are presented
        # ordered by "cookie" name)
        auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP])
        self.assertEqual(auth_info.identity.id(), 'bob')
        self.assertEqual(len(auth_info.macaroons), 1)

        # Try them the other way around and we should authenticate as alice.
        client3 = _Client(locator)
        client3.add_macaroon(ts, '1.alice',
                             client2._macaroons[ts.name()]['authn'])
        client3.add_macaroon(ts, '2.bob',
                             client1._macaroons[ts.name()]['authn'])

        auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP])
        self.assertEqual(auth_info.identity.id(), 'alice')
        self.assertEqual(len(auth_info.macaroons), 1)

    def test_macaroon_ops_fatal_error(self):
        # When we get a non-VerificationError error from the
        # opstore, we don't do any more verification.
        checker = bakery.Checker(
            macaroon_opstore=_MacaroonStoreWithError())
        m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2)
        with self.assertRaises(bakery.AuthInitError):
            checker.auth([m]).allow(test_context, [bakery.LOGIN_OP])


class _DischargerLocator(bakery.ThirdPartyLocator):
    def __init__(self, dischargers=None):
        if dischargers is None:
            dischargers = {}
        self._dischargers = dischargers

    def third_party_info(self, loc):
        d = self._dischargers.get(loc)
        if d is None:
            return None
        return bakery.ThirdPartyInfo(
            public_key=d._key.public_key,
            version=bakery.LATEST_VERSION,
        )

    def __setitem__(self, key, item):
        self._dischargers[key] = item

    def __getitem__(self, key):
        return self._dischargers[key]

    def get(self, key):
        return self._dischargers.get(key)


class _IdService(bakery.IdentityClient,
                 bakery.ThirdPartyCaveatChecker):
    def __init__(self, location, locator, test_class):
        self._location = location
        self._test = test_class
        key = bakery.generate_key()
        self._discharger = _Discharger(key=key, checker=self, locator=locator)
        locator[location] = self._discharger

    def check_third_party_caveat(self, ctx, info):
        if info.condition != 'is-authenticated-user':
            raise bakery.CaveatNotRecognizedError(
                'third party condition not recognized')

        username = ctx.get(_DISCHARGE_USER_KEY, '')
        if username == '':
            raise bakery.ThirdPartyCaveatCheckFailed('no current user')
        self._test._discharges.append(
            _DischargeRecord(location=self._location, user=username))
        return [checkers.declared_caveat('username', username)]

    def identity_from_context(self, ctx):
        return None, [checkers.Caveat(location=self._location,
                                      condition='is-authenticated-user')]

    def declared_identity(self, ctx, declared):
        user = declared.get('username')
        if user is None:
            raise bakery.IdentityError('no username declared')
        return bakery.SimpleIdentity(user)


_DISCHARGE_USER_KEY = checkers.ContextKey('user-key')

_DischargeRecord = namedtuple('_DISCHARGE_RECORD', ['location', 'user'])


class _Discharger(object):
    ''' utility class that has a discharge function with the same signature of
    get_discharge for discharge_all.
    '''

    def __init__(self, key, locator, checker):
        self._key = key
        self._locator = locator
        self._checker = checker

    def discharge(self, ctx, cav, payload):
        return bakery.discharge(
            ctx,
            key=self._key,
            id=cav.caveat_id,
            caveat=payload,
            checker=self._checker,
            locator=self._locator,
        )


class _OpAuthorizer(bakery.Authorizer):
    '''Implements bakery.Authorizer by looking the operation
    up in the given map. If the username is in the associated list
    or the list contains "everyone", authorization is granted.
    '''

    def __init__(self, auth=None):
        if auth is None:
            auth = {}
        self._auth = auth

    def authorize(self, ctx, id, ops):
        return bakery.ACLAuthorizer(
            allow_public=True,
            get_acl=lambda ctx, op: self._auth.get(op, [])).authorize(
            ctx, id, ops)


class _MacaroonStore(object):
    ''' Stores root keys in memory and puts all operations in the macaroon id.
    '''

    def __init__(self, key, locator):
        self._root_key_store = bakery.MemoryKeyStore()
        self._key = key
        self._locator = locator

    def new_macaroon(self, caveats, namespace, ops):
        root_key, id = self._root_key_store.root_key()
        m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops}
        data = json.dumps(m_id)
        m = bakery.Macaroon(
            root_key=root_key, id=data, location='',
            version=bakery.LATEST_VERSION,
            namespace=namespace)
        m.add_caveats(caveats, self._key, self._locator)
        return m

    def macaroon_ops(self, ms):
        if len(ms) == 0:
            raise ValueError('no macaroons provided')

        m_id = json.loads(ms[0].identifier_bytes.decode('utf-8'))
        root_key = self._root_key_store.get(
            base64.urlsafe_b64decode(m_id['id'].encode('utf-8')))

        v = Verifier()

        class NoValidationOnFirstPartyCaveat(FirstPartyCaveatVerifierDelegate):
            def verify_first_party_caveat(self, verifier, caveat, signature):
                return True

        v.first_party_caveat_verifier_delegate = \
            NoValidationOnFirstPartyCaveat()
        ok = v.verify(macaroon=ms[0], key=root_key,
                      discharge_macaroons=ms[1:])
        if not ok:
            raise bakery.VerificationError('invalid signature')
        conditions = []
        for m in ms:
            cavs = m.first_party_caveats()
            for cav in cavs:
                conditions.append(cav.caveat_id_bytes.decode('utf-8'))
        ops = []
        for op in m_id['ops']:
            ops.append(bakery.Op(entity=op[0], action=op[1]))
        return ops, conditions


class _Service(object):
    '''Represents a service that requires authorization.

    Clients can make requests to the service to perform operations
    and may receive a macaroon to discharge if the authorization
    process requires it.
    '''

    def __init__(self, name, auth, idm, locator):
        self._name = name
        self._store = _MacaroonStore(bakery.generate_key(), locator)
        self._checker = bakery.Checker(
            checker=test_checker(),
            authorizer=auth,
            identity_client=idm,
            macaroon_opstore=self._store)

    def name(self):
        return self._name

    def do(self, ctx, ms, ops):
        try:
            authInfo = self._checker.auth(ms).allow(ctx, ops)
        except bakery.DischargeRequiredError as exc:
            self._discharge_required_error(exc)
        return authInfo

    def do_any(self, ctx, ms, ops):
        # makes a request to the service to perform any of the given
        # operations. It reports which operations have succeeded.
        try:
            authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops)
            return authInfo, allowed
        except bakery.DischargeRequiredError as exc:
            self._discharge_required_error(exc)

    def capability(self, ctx, ms, ops):
        try:
            conds = self._checker.auth(ms).allow_capability(ctx, ops)
        except bakery.DischargeRequiredError as exc:
            self._discharge_required_error(exc)

        m = self._store.new_macaroon(None, self._checker.namespace(), ops)
        for cond in conds:
            m.macaroon.add_first_party_caveat(cond)
        return m

    def _discharge_required_error(self, err):
        m = self._store.new_macaroon(err.cavs(), self._checker.namespace(),
                                     err.ops())
        name = 'authz'
        if len(err.ops()) == 1 and err.ops()[0] == bakery.LOGIN_OP:
            name = 'authn'
        raise _DischargeRequiredError(name=name, m=m)


class _DischargeRequiredError(Exception):
    def __init__(self, name, m):
        Exception.__init__(self, 'discharge required')
        self._name = name
        self._m = m

    def m(self):
        return self._m

    def name(self):
        return self._name


class _Client(object):
    max_retries = 3

    def __init__(self, dischargers):
        self._key = bakery.generate_key()
        self._macaroons = {}
        self._dischargers = dischargers

    def do(self, ctx, svc, ops):
        class _AuthInfo:
            authInfo = None

        def svc_do(ms):
            _AuthInfo.authInfo = svc.do(ctx, ms, ops)

        self._do_func(ctx, svc, svc_do)
        return _AuthInfo.authInfo

    def do_any(self, ctx, svc, ops):
        return svc.do_any(ctx, self._request_macaroons(svc), ops)

    def capability(self, ctx, svc, ops):
        # capability returns a capability macaroon for the given operations.

        class _M:
            m = None

        def svc_capability(ms):
            _M.m = svc.capability(ctx, ms, ops)
            return

        self._do_func(ctx, svc, svc_capability)
        return _M.m

    def discharged_capability(self, ctx, svc, ops):
        m = self.capability(ctx, svc, ops)
        return self._discharge_all(ctx, m)

    def _do_func(self, ctx, svc, f):
        for i in range(0, self.max_retries):
            try:
                f(self._request_macaroons(svc))
                return
            except _DischargeRequiredError as exc:
                ms = self._discharge_all(ctx, exc.m())
                self.add_macaroon(svc, exc.name(), ms)
        raise ValueError('discharge failed too many times')

    def _clear_macaroons(self, svc):
        if svc is None:
            self._macaroons = {}
            return
        if svc.name() in self._macaroons:
            del self._macaroons[svc.name()]

    def add_macaroon(self, svc, name, m):
        if svc.name() not in self._macaroons:
            self._macaroons[svc.name()] = {}
        self._macaroons[svc.name()][name] = m

    def _request_macaroons(self, svc):
        mmap = self._macaroons.get(svc.name(), [])
        # Put all the macaroons in the slice ordered by key
        # so that we have deterministic behaviour in the tests.
        names = []
        for name in mmap:
            names.append(name)
        names = sorted(names)
        ms = [None] * len(names)
        for i, name in enumerate(names):
            ms[i] = mmap[name]
        return ms

    def _discharge_all(self, ctx, m):
        def get_discharge(cav, payload):
            d = self._dischargers.get(cav.location)
            if d is None:
                raise ValueError('third party discharger '
                                 '{} not found'.format(cav.location))
            return d.discharge(ctx, cav, payload)

        return bakery.discharge_all(m, get_discharge)


class _BasicAuthIdService(bakery.IdentityClient):
    def identity_from_context(self, ctx):
        user, pwd = _basic_auth_from_context(ctx)
        if user != 'sherlock' or pwd != 'holmes':
            return None, None
        return bakery.SimpleIdentity(user), None

    def declared_identity(self, ctx, declared):
        raise bakery.IdentityError('no identity declarations in basic auth'
                                   ' id service')


_BASIC_AUTH_KEY = checkers.ContextKey('user-key')


class _BasicAuth(object):
    def __init__(self, user, password):
        self.user = user
        self.password = password


def _context_with_basic_auth(ctx, user, password):
    return ctx.with_value(_BASIC_AUTH_KEY, _BasicAuth(user, password))


def _basic_auth_from_context(ctx):
    auth = ctx.get(_BASIC_AUTH_KEY, _BasicAuth('', ''))
    return auth.user, auth.password


def _macaroon_conditions(caveats, allow_third):
    conds = [''] * len(caveats)
    for i, cav in enumerate(caveats):
        if cav.location is not None and cav.location != '':
            if not allow_third:
                raise ValueError('found unexpected third party caveat:'
                                 ' {}'.format(cav.location))
            continue
        conds[i] = cav.caveat_id.decode('utf-8')
    return conds


def _resolve_caveats(ns, caveats):
    conds = [''] * len(caveats)
    for i, cav in enumerate(caveats):
        if cav.location is not None and cav.location != '':
            raise ValueError('found unexpected third party caveat')
        conds[i] = ns.resolve_caveat(cav).condition
    return conds


class _MacaroonStoreWithError(object):
    def new_macaroon(self, caveats, ns, ops):
        raise ValueError('some error')

    def macaroon_ops(self, ms):
        raise ValueError('some error')