HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/certbot_apache/tests/http_01_test.py
"""Test for certbot_apache.http_01."""
import unittest
import mock

from acme import challenges
from acme.magic_typing import List  # pylint: disable=unused-import, no-name-in-module

from certbot import achallenges
from certbot import errors
from certbot.compat import filesystem
from certbot.compat import os
from certbot.tests import acme_util

from certbot_apache.parser import get_aug_path
from certbot_apache.tests import util


NUM_ACHALLS = 3


class ApacheHttp01Test(util.ApacheTest):
    """Test for certbot_apache.http_01.ApacheHttp01."""

    def setUp(self, *args, **kwargs):  # pylint: disable=arguments-differ
        super(ApacheHttp01Test, self).setUp(*args, **kwargs)

        self.account_key = self.rsa512jwk
        self.achalls = []  # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
        vh_truth = util.get_vh_truth(
            self.temp_dir, "debian_apache_2_4/multiple_vhosts")
        # Takes the vhosts for encryption-example.demo, certbot.demo
        # and vhost.in.rootconf
        self.vhosts = [vh_truth[0], vh_truth[3], vh_truth[10]]

        for i in range(NUM_ACHALLS):
            self.achalls.append(
                achallenges.KeyAuthorizationAnnotatedChallenge(
                    challb=acme_util.chall_to_challb(
                        challenges.HTTP01(token=((chr(ord('a') + i).encode() * 16))),
                        "pending"),
                    domain=self.vhosts[i].name, account_key=self.account_key))

        modules = ["ssl", "rewrite", "authz_core", "authz_host"]
        for mod in modules:
            self.config.parser.modules.add("mod_{0}.c".format(mod))
            self.config.parser.modules.add(mod + "_module")

        from certbot_apache.http_01 import ApacheHttp01
        self.http = ApacheHttp01(self.config)

    def test_empty_perform(self):
        self.assertFalse(self.http.perform())

    @mock.patch("certbot_apache.configurator.ApacheConfigurator.enable_mod")
    def test_enable_modules_apache_2_2(self, mock_enmod):
        self.config.version = (2, 2)
        self.config.parser.modules.remove("authz_host_module")
        self.config.parser.modules.remove("mod_authz_host.c")

        enmod_calls = self.common_enable_modules_test(mock_enmod)
        self.assertEqual(enmod_calls[0][0][0], "authz_host")

    @mock.patch("certbot_apache.configurator.ApacheConfigurator.enable_mod")
    def test_enable_modules_apache_2_4(self, mock_enmod):
        self.config.parser.modules.remove("authz_core_module")
        self.config.parser.modules.remove("mod_authz_core.c")

        enmod_calls = self.common_enable_modules_test(mock_enmod)
        self.assertEqual(enmod_calls[0][0][0], "authz_core")

    def common_enable_modules_test(self, mock_enmod):
        """Tests enabling mod_rewrite and other modules."""
        self.config.parser.modules.remove("rewrite_module")
        self.config.parser.modules.remove("mod_rewrite.c")

        self.http.prepare_http01_modules()

        self.assertTrue(mock_enmod.called)
        calls = mock_enmod.call_args_list
        other_calls = []
        for call in calls:
            if call[0][0] != "rewrite":
                other_calls.append(call)

        # If these lists are equal, we never enabled mod_rewrite
        self.assertNotEqual(calls, other_calls)
        return other_calls

    def test_same_vhost(self):
        vhost = next(v for v in self.config.vhosts if v.name == "certbot.demo")
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain=vhost.name, account_key=self.account_key),
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'b' * 16))),
                    "pending"),
                domain=next(iter(vhost.aliases)), account_key=self.account_key)
        ]
        self.common_perform_test(achalls, [vhost])

    def test_anonymous_vhost(self):
        vhosts = [v for v in self.config.vhosts if not v.ssl]
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain="something.nonexistent", account_key=self.account_key)]
        self.common_perform_test(achalls, vhosts)

    def test_configure_multiple_vhosts(self):
        vhosts = [v for v in self.config.vhosts if "duplicate.example.com" in v.get_names()]
        self.assertEqual(len(vhosts), 2)
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain="duplicate.example.com", account_key=self.account_key)]
        self.common_perform_test(achalls, vhosts)

    def test_no_vhost(self):
        for achall in self.achalls:
            self.http.add_chall(achall)
        self.config.config.http01_port = 12345
        self.assertRaises(errors.PluginError, self.http.perform)

    def test_perform_1_achall_apache_2_2(self):
        self.combinations_perform_test(num_achalls=1, minor_version=2)

    def test_perform_1_achall_apache_2_4(self):
        self.combinations_perform_test(num_achalls=1, minor_version=4)

    def test_perform_2_achall_apache_2_2(self):
        self.combinations_perform_test(num_achalls=2, minor_version=2)

    def test_perform_2_achall_apache_2_4(self):
        self.combinations_perform_test(num_achalls=2, minor_version=4)

    def test_perform_3_achall_apache_2_2(self):
        self.combinations_perform_test(num_achalls=3, minor_version=2)

    def test_perform_3_achall_apache_2_4(self):
        self.combinations_perform_test(num_achalls=3, minor_version=4)

    def test_activate_disabled_vhost(self):
        vhosts = [v for v in self.config.vhosts if v.name == "certbot.demo"]
        achalls = [
            achallenges.KeyAuthorizationAnnotatedChallenge(
                challb=acme_util.chall_to_challb(
                    challenges.HTTP01(token=((b'a' * 16))),
                    "pending"),
                domain="certbot.demo", account_key=self.account_key)]
        vhosts[0].enabled = False
        self.common_perform_test(achalls, vhosts)
        matches = self.config.parser.find_dir(
            "Include", vhosts[0].filep,
            get_aug_path(self.config.parser.loc["default"]))
        self.assertEqual(len(matches), 1)

    def combinations_perform_test(self, num_achalls, minor_version):
        """Test perform with the given achall count and Apache version."""
        achalls = self.achalls[:num_achalls]
        vhosts = self.vhosts[:num_achalls]
        self.config.version = (2, minor_version)
        self.common_perform_test(achalls, vhosts)

    def common_perform_test(self, achalls, vhosts):
        """Tests perform with the given achalls."""
        challenge_dir = self.http.challenge_dir
        self.assertFalse(os.path.exists(challenge_dir))
        for achall in achalls:
            self.http.add_chall(achall)

        expected_response = [
            achall.response(self.account_key) for achall in achalls]
        self.assertEqual(self.http.perform(), expected_response)

        self.assertTrue(os.path.isdir(self.http.challenge_dir))
        self.assertTrue(filesystem.has_min_permissions(self.http.challenge_dir, 0o755))
        self._test_challenge_conf()

        for achall in achalls:
            self._test_challenge_file(achall)

        for vhost in vhosts:
            matches = self.config.parser.find_dir("Include",
                                                self.http.challenge_conf_pre,
                                                vhost.path)
            self.assertEqual(len(matches), 1)
            matches = self.config.parser.find_dir("Include",
                                                self.http.challenge_conf_post,
                                                vhost.path)
            self.assertEqual(len(matches), 1)

        self.assertTrue(os.path.exists(challenge_dir))

    def _test_challenge_conf(self):
        with open(self.http.challenge_conf_pre) as f:
            pre_conf_contents = f.read()

        with open(self.http.challenge_conf_post) as f:
            post_conf_contents = f.read()

        self.assertTrue("RewriteEngine on" in pre_conf_contents)
        self.assertTrue("RewriteRule" in pre_conf_contents)

        self.assertTrue(self.http.challenge_dir in post_conf_contents)
        if self.config.version < (2, 4):
            self.assertTrue("Allow from all" in post_conf_contents)
        else:
            self.assertTrue("Require all granted" in post_conf_contents)

    def _test_challenge_file(self, achall):
        name = os.path.join(self.http.challenge_dir, achall.chall.encode("token"))
        validation = achall.validation(self.account_key)

        self.assertTrue(filesystem.has_min_permissions(name, 0o644))
        with open(name, 'rb') as f:
            self.assertEqual(f.read(), validation.encode())


if __name__ == "__main__":
    unittest.main()  # pragma: no cover