File: //proc/self/root/lib/python3/dist-packages/certbot/tests/ocsp_test.py
"""Tests for ocsp.py"""
# pylint: disable=protected-access
import contextlib
import unittest
from datetime import datetime, timedelta
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes  # type: ignore
from cryptography.exceptions import UnsupportedAlgorithm, InvalidSignature
from cryptography import x509
try:
    # Only cryptography>=2.5 has ocsp module
    # and signature_hash_algorithm attribute in OCSPResponse class
    from cryptography.x509 import ocsp as ocsp_lib  # pylint: disable=import-error
    getattr(ocsp_lib.OCSPResponse, 'signature_hash_algorithm')
except (ImportError, AttributeError):  # pragma: no cover
    ocsp_lib = None  # type: ignore
import mock
import pytz
from certbot import errors
from certbot.tests import util as test_util
out = """Missing = in header key=value
ocsp: Use -help for summary.
"""
class OCSPTestOpenSSL(unittest.TestCase):
    """
    OCSP revokation tests using OpenSSL binary.
    """
    def setUp(self):
        from certbot import ocsp
        with mock.patch('certbot.ocsp.Popen') as mock_popen:
            with mock.patch('certbot.util.exe_exists') as mock_exists:
                mock_communicate = mock.MagicMock()
                mock_communicate.communicate.return_value = (None, out)
                mock_popen.return_value = mock_communicate
                mock_exists.return_value = True
                self.checker = ocsp.RevocationChecker(enforce_openssl_binary_usage=True)
    def tearDown(self):
        pass
    @mock.patch('certbot.ocsp.logger.info')
    @mock.patch('certbot.ocsp.Popen')
    @mock.patch('certbot.util.exe_exists')
    def test_init(self, mock_exists, mock_popen, mock_log):
        mock_communicate = mock.MagicMock()
        mock_communicate.communicate.return_value = (None, out)
        mock_popen.return_value = mock_communicate
        mock_exists.return_value = True
        from certbot import ocsp
        checker = ocsp.RevocationChecker(enforce_openssl_binary_usage=True)
        self.assertEqual(mock_popen.call_count, 1)
        self.assertEqual(checker.host_args("x"), ["Host=x"])
        mock_communicate.communicate.return_value = (None, out.partition("\n")[2])
        checker = ocsp.RevocationChecker(enforce_openssl_binary_usage=True)
        self.assertEqual(checker.host_args("x"), ["Host", "x"])
        self.assertEqual(checker.broken, False)
        mock_exists.return_value = False
        mock_popen.call_count = 0
        checker = ocsp.RevocationChecker(enforce_openssl_binary_usage=True)
        self.assertEqual(mock_popen.call_count, 0)
        self.assertEqual(mock_log.call_count, 1)
        self.assertEqual(checker.broken, True)
    @mock.patch('certbot.ocsp._determine_ocsp_server')
    @mock.patch('certbot.util.run_script')
    def test_ocsp_revoked(self, mock_run, mock_determine):
        now = pytz.UTC.fromutc(datetime.utcnow())
        cert_obj = mock.MagicMock()
        cert_obj.cert = "x"
        cert_obj.chain = "y"
        cert_obj.target_expiry = now + timedelta(hours=2)
        self.checker.broken = True
        mock_determine.return_value = ("", "")
        self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
        self.checker.broken = False
        mock_run.return_value = tuple(openssl_happy[1:])
        self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
        self.assertEqual(mock_run.call_count, 0)
        mock_determine.return_value = ("http://x.co", "x.co")
        self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
        mock_run.side_effect = errors.SubprocessError("Unable to load certificate launcher")
        self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
        self.assertEqual(mock_run.call_count, 2)
        # cert expired
        cert_obj.target_expiry = now
        mock_determine.return_value = ("", "")
        count_before = mock_determine.call_count
        self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
        self.assertEqual(mock_determine.call_count, count_before)
    def test_determine_ocsp_server(self):
        cert_path = test_util.vector_path('ocsp_certificate.pem')
        from certbot import ocsp
        result = ocsp._determine_ocsp_server(cert_path)
        self.assertEqual(('http://ocsp.test4.buypass.com', 'ocsp.test4.buypass.com'), result)
    @mock.patch('certbot.ocsp.logger')
    @mock.patch('certbot.util.run_script')
    def test_translate_ocsp(self, mock_run, mock_log):
        # pylint: disable=protected-access
        mock_run.return_value = openssl_confused
        from certbot import ocsp
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_happy), False)
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_confused), False)
        self.assertEqual(mock_log.debug.call_count, 1)
        self.assertEqual(mock_log.warning.call_count, 0)
        mock_log.debug.call_count = 0
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_unknown), False)
        self.assertEqual(mock_log.debug.call_count, 1)
        self.assertEqual(mock_log.warning.call_count, 0)
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp), False)
        self.assertEqual(mock_log.debug.call_count, 2)
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_broken), False)
        self.assertEqual(mock_log.warning.call_count, 1)
        mock_log.info.call_count = 0
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_revoked), True)
        self.assertEqual(mock_log.info.call_count, 0)
        self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
        self.assertEqual(mock_log.info.call_count, 1)
@unittest.skipIf(not ocsp_lib,
                 reason='This class tests functionalities available only on cryptography>=2.5.0')
class OSCPTestCryptography(unittest.TestCase):
    """
    OCSP revokation tests using Cryptography >= 2.4.0
    """
    def setUp(self):
        from certbot import ocsp
        self.checker = ocsp.RevocationChecker()
        self.cert_path = test_util.vector_path('ocsp_certificate.pem')
        self.chain_path = test_util.vector_path('ocsp_issuer_certificate.pem')
        self.cert_obj = mock.MagicMock()
        self.cert_obj.cert = self.cert_path
        self.cert_obj.chain = self.chain_path
        now = pytz.UTC.fromutc(datetime.utcnow())
        self.cert_obj.target_expiry = now + timedelta(hours=2)
    @mock.patch('certbot.ocsp._determine_ocsp_server')
    @mock.patch('certbot.ocsp._check_ocsp_cryptography')
    def test_ensure_cryptography_toggled(self, mock_revoke, mock_determine):
        mock_determine.return_value = ('http://example.com', 'example.com')
        self.checker.ocsp_revoked(self.cert_obj)
        mock_revoke.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com')
    def test_revoke(self):
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertTrue(revoked)
    def test_responder_is_issuer(self):
        issuer = x509.load_pem_x509_certificate(
            test_util.load_vector('ocsp_issuer_certificate.pem'), default_backend())
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED,
                        ocsp_lib.OCSPResponseStatus.SUCCESSFUL) as mocks:
            mocks['mock_response'].return_value.responder_name = issuer.subject
            self.checker.ocsp_revoked(self.cert_obj)
        # Here responder and issuer are the same. So only the signature of the OCSP
        # response is checked (using the issuer/responder public key).
        self.assertEqual(mocks['mock_check'].call_count, 1)
        self.assertEqual(mocks['mock_check'].call_args[0][0].public_numbers(),
                         issuer.public_key().public_numbers())
    def test_responder_is_authorized_delegate(self):
        issuer = x509.load_pem_x509_certificate(
            test_util.load_vector('ocsp_issuer_certificate.pem'), default_backend())
        responder = x509.load_pem_x509_certificate(
            test_util.load_vector('ocsp_responder_certificate.pem'), default_backend())
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED,
                        ocsp_lib.OCSPResponseStatus.SUCCESSFUL) as mocks:
            self.checker.ocsp_revoked(self.cert_obj)
        # Here responder and issuer are not the same. Two signatures will be checked then,
        # first to verify the responder cert (using the issuer public key), second to
        # to verify the OCSP response itself (using the responder public key).
        self.assertEqual(mocks['mock_check'].call_count, 2)
        self.assertEqual(mocks['mock_check'].call_args_list[0][0][0].public_numbers(),
                         issuer.public_key().public_numbers())
        self.assertEqual(mocks['mock_check'].call_args_list[1][0][0].public_numbers(),
                         responder.public_key().public_numbers())
    def test_revoke_resiliency(self):
        # Server return an invalid HTTP response
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.UNKNOWN, ocsp_lib.OCSPResponseStatus.SUCCESSFUL,
                        http_status_code=400):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # OCSP response in invalid
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.UNKNOWN, ocsp_lib.OCSPResponseStatus.UNAUTHORIZED):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # OCSP response is valid, but certificate status is unknown
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.UNKNOWN, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # The OCSP response says that the certificate is revoked, but certificate
        # does not contain the OCSP extension.
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
            with mock.patch('cryptography.x509.Extensions.get_extension_for_class',
                            side_effect=x509.ExtensionNotFound(
                                'Not found', x509.AuthorityInformationAccessOID.OCSP)):
                revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # OCSP response uses an unsupported signature.
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL,
                        check_signature_side_effect=UnsupportedAlgorithm('foo')):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # OSCP signature response is invalid.
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL,
                        check_signature_side_effect=InvalidSignature('foo')):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # Assertion error on OCSP response validity
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL,
                        check_signature_side_effect=AssertionError('foo')):
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # No responder cert in OCSP response
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED,
                        ocsp_lib.OCSPResponseStatus.SUCCESSFUL) as mocks:
            mocks['mock_response'].return_value.certificates = []
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        # Responder cert is not signed by certificate issuer
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED,
                        ocsp_lib.OCSPResponseStatus.SUCCESSFUL) as mocks:
            cert = mocks['mock_response'].return_value.certificates[0]
            mocks['mock_response'].return_value.certificates[0] = mock.Mock(
                issuer='fake', subject=cert.subject)
            revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
        with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
            # This mock is necessary to avoid the first call contained in _determine_ocsp_server
            # of the method cryptography.x509.Extensions.get_extension_for_class.
            with mock.patch('certbot.ocsp._determine_ocsp_server') as mock_server:
                mock_server.return_value = ('https://example.com', 'example.com')
                with mock.patch('cryptography.x509.Extensions.get_extension_for_class',
                                side_effect=x509.ExtensionNotFound(
                                    'Not found', x509.AuthorityInformationAccessOID.OCSP)):
                    revoked = self.checker.ocsp_revoked(self.cert_obj)
        self.assertFalse(revoked)
@contextlib.contextmanager
def _ocsp_mock(certificate_status, response_status,
               http_status_code=200, check_signature_side_effect=None):
    with mock.patch('certbot.ocsp.ocsp.load_der_ocsp_response') as mock_response:
        mock_response.return_value = _construct_mock_ocsp_response(
            certificate_status, response_status)
        with mock.patch('certbot.ocsp.requests.post') as mock_post:
            mock_post.return_value = mock.Mock(status_code=http_status_code)
            with mock.patch('certbot.ocsp.crypto_util.verify_signed_payload') as mock_check:
                if check_signature_side_effect:
                    mock_check.side_effect = check_signature_side_effect
                yield {
                    'mock_response': mock_response,
                    'mock_post': mock_post,
                    'mock_check': mock_check,
                }
def _construct_mock_ocsp_response(certificate_status, response_status):
    cert = x509.load_pem_x509_certificate(
        test_util.load_vector('ocsp_certificate.pem'), default_backend())
    issuer = x509.load_pem_x509_certificate(
        test_util.load_vector('ocsp_issuer_certificate.pem'), default_backend())
    responder = x509.load_pem_x509_certificate(
        test_util.load_vector('ocsp_responder_certificate.pem'), default_backend())
    builder = ocsp_lib.OCSPRequestBuilder()
    builder = builder.add_certificate(cert, issuer, hashes.SHA1())
    request = builder.build()
    return mock.Mock(
        response_status=response_status,
        certificate_status=certificate_status,
        serial_number=request.serial_number,
        issuer_key_hash=request.issuer_key_hash,
        issuer_name_hash=request.issuer_name_hash,
        responder_name=responder.subject,
        certificates=[responder],
        hash_algorithm=hashes.SHA1(),
        next_update=datetime.now() + timedelta(days=1),
        this_update=datetime.now() - timedelta(days=1),
        signature_algorithm_oid=x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA1,
    )
# pylint: disable=line-too-long
openssl_confused = ("", """
/etc/letsencrypt/live/example.org/cert.pem: good
	This Update: Dec 17 00:00:00 2016 GMT
	Next Update: Dec 24 00:00:00 2016 GMT
""",
"""
Response Verify Failure
139903674214048:error:27069065:OCSP routines:OCSP_basic_verify:certificate verify error:ocsp_vfy.c:138:Verify error:unable to get local issuer certificate
""")
openssl_happy = ("blah.pem", """
blah.pem: good
	This Update: Dec 20 18:00:00 2016 GMT
	Next Update: Dec 27 18:00:00 2016 GMT
""",
"Response verify OK")
openssl_revoked = ("blah.pem", """
blah.pem: revoked
	This Update: Dec 20 01:00:00 2016 GMT
	Next Update: Dec 27 01:00:00 2016 GMT
	Revocation Time: Dec 20 01:46:34 2016 GMT
""",
"""Response verify OK""")
openssl_unknown = ("blah.pem", """
blah.pem: unknown
	This Update: Dec 20 18:00:00 2016 GMT
	Next Update: Dec 27 18:00:00 2016 GMT
""",
"Response verify OK")
openssl_broken = ("", "tentacles", "Response verify OK")
openssl_expired_ocsp = ("blah.pem", """
blah.pem: WARNING: Status times invalid.
140659132298912:error:2707307D:OCSP routines:OCSP_check_validity:status expired:ocsp_cl.c:372:
good
	This Update: Apr  6 00:00:00 2016 GMT
	Next Update: Apr 13 00:00:00 2016 GMT
""",
"""Response verify OK""")
openssl_expired_ocsp_revoked = ("blah.pem", """
blah.pem: WARNING: Status times invalid.
140659132298912:error:2707307D:OCSP routines:OCSP_check_validity:status expired:ocsp_cl.c:372:
revoked
	This Update: Apr  6 00:00:00 2016 GMT
	Next Update: Apr 13 00:00:00 2016 GMT
""",
"""Response verify OK""")
if __name__ == '__main__':
    unittest.main()  # pragma: no cover