HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/lib/python3/dist-packages/oauthlib/oauth1/rfc5849/endpoints/resource.py
# -*- coding: utf-8 -*-
"""
oauthlib.oauth1.rfc5849.endpoints.resource
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This module is an implementation of the resource protection provider logic of
OAuth 1.0 RFC 5849.
"""
from __future__ import absolute_import, unicode_literals

import logging

from .. import errors
from .base import BaseEndpoint

log = logging.getLogger(__name__)


class ResourceEndpoint(BaseEndpoint):

    """Endpoint that guards protected resources behind OAuth 1.0 checks.

    The usual pattern is to build one instance around a request validator
    and call ``validate_protected_resource_request`` from a decorator that
    wraps each protected view. When the request validates, the decorator
    calls the view and returns its response; otherwise the decorator builds
    and returns the error response itself.

    See :doc:`/oauth1/validator` for details on which validator methods to implement
    for this endpoint.

    An example decorator::

        from functools import wraps
        from your_validator import your_validator
        from oauthlib.oauth1 import ResourceEndpoint
        endpoint = ResourceEndpoint(your_validator)

        def require_oauth(realms=None):
            def decorator(f):
                @wraps(f)
                def wrapper(request, *args, **kwargs):
                    v, r = endpoint.validate_protected_resource_request(
                            request.url,
                            http_method=request.method,
                            body=request.data,
                            headers=request.headers,
                            realms=realms or [])
                    if v:
                        return f(*args, **kwargs)
                    else:
                        return abort(403)
                return wrapper
            return decorator
    """

    def validate_protected_resource_request(self, uri, http_method='GET',
                                            body=None, headers=None, realms=None):
        """Decide whether a request for a protected resource is authorized.

        :param uri: The full URI of the protected resource request.
        :param http_method: A valid HTTP verb, i.e. GET, POST, PUT, HEAD, etc.
        :param body: The request body as a string.
        :param headers: The request headers as a dict.
        :param realms: A list of realms the resource is protected under.
                       This will be supplied to the ``validate_realms``
                       method of the request validator.
        :returns: A tuple of 2 elements.
                  1. True if valid, False otherwise.
                  2. An oauthlib.common.Request object, or None when the
                     request could not be created at all.
        """
        try:
            request = self._create_request(uri, http_method, body, headers)
        except errors.OAuth1Error:
            # No request object exists yet, so there is nothing to return
            # alongside the failure.
            return False, None

        try:
            self._check_transport_security(request)
            self._check_mandatory_parameters(request)
        except errors.OAuth1Error:
            return False, request

        # An access token must be present and pass the validator's
        # ``check_access_token`` before any further work is done. The
        # validator is only consulted when a token was actually supplied.
        if not (request.resource_owner_key and
                self.request_validator.check_access_token(
                    request.resource_owner_key)):
            return False, request

        fresh_nonce = self.request_validator.validate_timestamp_and_nonce(
            request.client_key, request.timestamp, request.nonce, request,
            access_token=request.resource_owner_key)
        if not fresh_nonce:
            return False, request

        # The server SHOULD answer 401 (Unauthorized) for invalid client
        # credentials, but the verdict is deliberately postponed: an early
        # exit here would allow client enumeration through timing. A dummy
        # client is substituted instead so the remaining steps still run
        # and verification time stays near constant.
        client_ok = self.request_validator.validate_client_key(
            request.client_key, request)
        if not client_ok:
            request.client_key = self.request_validator.dummy_client

        # Same reasoning for an invalid or expired token: swap in a dummy
        # access token rather than exiting early, since an early exit would
        # allow resource owner enumeration.
        token_ok = self.request_validator.validate_access_token(
            request.client_key, request.resource_owner_key, request)
        if not token_ok:
            request.resource_owner_key = self.request_validator.dummy_access_token

        # `realm`_ only appears in authorization headers and the OAuth spec
        # does not say how it should be interpreted. It can be treated as a
        # scope the client has access to, so every client is checked for
        # authorization to it.
        # .. _`realm`: https://tools.ietf.org/html/rfc2617#section-1.2
        #
        # For protected resources the realm is always validated, but it is
        # tied to the access token rather than supplied by the client: the
        # realm requested with the request token is expected to have been
        # carried over to the access token.
        #
        # No early exit here either, as it would enable client realm access
        # enumeration.
        realm_ok = self.request_validator.validate_realms(
            request.client_key, request.resource_owner_key, request,
            uri=request.uri, realms=realms)

        signature_ok = self._check_signature(request)

        # Record each individual verdict on the request's validator_log so
        # it is available for internal reporting and analysis.
        outcomes = (
            ('client', client_ok),
            ('resource_owner', token_ok),
            ('realm', realm_ok),
            ('signature', signature_ok),
        )
        for check_name, passed in outcomes:
            request.validator_log[check_name] = passed

        # Validity is only decided here, at the very end. Up to this point
        # dummy values stand in for anything invalid so that every request
        # follows almost the same path, which keeps execution time near
        # constant and stops attackers from guessing sensitive information.
        verified = all(passed for _, passed in outcomes)
        if not verified:
            log.info("[Failure] request verification failed.")
            log.info("Valid client: %s", client_ok)
            log.info("Valid token: %s", token_ok)
            log.info("Valid realm: %s", realm_ok)
            log.info("Valid signature: %s", signature_ok)
        return verified, request