HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //lib/python3/dist-packages/macaroonbakery/tests/test_discharge_all.py
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import unittest

import macaroonbakery.bakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests import common
from pymacaroons.verifier import Verifier


def always_ok(predicate):
    return True


class TestDischargeAll(unittest.TestCase):
    def test_discharge_all_no_discharges(self):
        root_key = b'root key'
        m = bakery.Macaroon(
            root_key=root_key, id=b'id0', location='loc0',
            version=bakery.LATEST_VERSION,
            namespace=common.test_checker().namespace())
        ms = bakery.discharge_all(m, no_discharge(self))
        self.assertEqual(len(ms), 1)
        self.assertEqual(ms[0], m.macaroon)
        v = Verifier()
        v.satisfy_general(always_ok)
        v.verify(m.macaroon, root_key, None)

    def test_discharge_all_many_discharges(self):
        root_key = b'root key'
        m0 = bakery.Macaroon(
            root_key=root_key, id=b'id0', location='loc0',
            version=bakery.LATEST_VERSION)

        class State(object):
            total_required = 40
            id = 1

        def add_caveats(m):
            for i in range(0, 1):
                if State.total_required == 0:
                    break
                cid = 'id{}'.format(State.id)
                m.macaroon.add_third_party_caveat(
                    location='somewhere',
                    key='root key {}'.format(cid).encode('utf-8'),
                    key_id=cid.encode('utf-8'))
                State.id += 1
                State.total_required -= 1

        add_caveats(m0)

        def get_discharge(cav, payload):
            self.assertEqual(payload, None)
            m = bakery.Macaroon(
                root_key='root key {}'.format(
                    cav.caveat_id.decode('utf-8')).encode('utf-8'),
                id=cav.caveat_id, location='',
                version=bakery.LATEST_VERSION)

            add_caveats(m)
            return m

        ms = bakery.discharge_all(m0, get_discharge)

        self.assertEqual(len(ms), 41)

        v = Verifier()
        v.satisfy_general(always_ok)
        v.verify(ms[0], root_key, ms[1:])

    def test_discharge_all_many_discharges_with_real_third_party_caveats(self):
        # This is the same flow as TestDischargeAllManyDischarges except that
        # we're using actual third party caveats as added by
        # Macaroon.add_caveat and we use a larger number of caveats
        # so that caveat ids will need to get larger.
        locator = bakery.ThirdPartyStore()
        bakeries = {}
        total_discharges_required = 40

        class M:
            bakery_id = 0
            still_required = total_discharges_required

        def add_bakery():
            M.bakery_id += 1
            loc = 'loc{}'.format(M.bakery_id)
            bakeries[loc] = common.new_bakery(loc, locator)
            return loc

        ts = common.new_bakery('ts-loc', locator)

        def checker(_, ci):
            caveats = []
            if ci.condition != 'something':
                self.fail('unexpected condition')
            for i in range(0, 2):
                if M.still_required <= 0:
                    break
                caveats.append(checkers.Caveat(location=add_bakery(),
                                               condition='something'))
                M.still_required -= 1
            return caveats

        root_key = b'root key'
        m0 = bakery.Macaroon(
            root_key=root_key, id=b'id0', location='ts-loc',
            version=bakery.LATEST_VERSION)

        m0.add_caveat(checkers. Caveat(location=add_bakery(),
                                       condition='something'),
                      ts.oven.key, locator)

        # We've added a caveat (the first) so one less caveat is required.
        M.still_required -= 1

        class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
            def check_third_party_caveat(self, ctx, info):
                return checker(ctx, info)

        def get_discharge(cav, payload):
            return bakery.discharge(
                common.test_context, cav.caveat_id, payload,
                bakeries[cav.location].oven.key,
                ThirdPartyCaveatCheckerF(), locator)

        ms = bakery.discharge_all(m0, get_discharge)

        self.assertEqual(len(ms), total_discharges_required + 1)

        v = Verifier()
        v.satisfy_general(always_ok)
        v.verify(ms[0], root_key, ms[1:])

    def test_discharge_all_local_discharge(self):
        oc = common.new_bakery('ts', None)
        client_key = bakery.generate_key()
        m = oc.oven.macaroon(bakery.LATEST_VERSION, common.ages,
                             [
                                 bakery.local_third_party_caveat(
                                     client_key.public_key,
                                     bakery.LATEST_VERSION)
                             ], [bakery.LOGIN_OP])
        ms = bakery.discharge_all(m, no_discharge(self), client_key)
        oc.checker.auth([ms]).allow(common.test_context,
                                    [bakery.LOGIN_OP])

    def test_discharge_all_local_discharge_version1(self):
        oc = common.new_bakery('ts', None)
        client_key = bakery.generate_key()
        m = oc.oven.macaroon(bakery.VERSION_1, common.ages, [
            bakery.local_third_party_caveat(
                client_key.public_key, bakery.VERSION_1)
        ], [bakery.LOGIN_OP])
        ms = bakery.discharge_all(m, no_discharge(self), client_key)
        oc.checker.auth([ms]).allow(common.test_context,
                                    [bakery.LOGIN_OP])


def no_discharge(test):
    def get_discharge(cav, payload):
        test.fail("get_discharge called unexpectedly")

    return get_discharge