HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/lib/python3/dist-packages/awscli/customizations/s3/filters.py
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import fnmatch
import os

from awscli.customizations.s3.utils import split_s3_bucket_key


LOG = logging.getLogger(__name__)


def create_filter(parameters):
    """Build a ``Filter`` from the CLI parameters dict.

    Reads ``parameters['filters']`` (pairs of filter type and pattern).
    When that value is empty, a ``Filter`` with no patterns and no root
    directories is returned.  Otherwise the leading dashes are stripped
    from every filter type and the patterns are anchored at two roots
    derived from ``parameters['src']`` and ``parameters['dest']``, each
    resolved with ``parameters['dir_op']``.
    """
    cli_filters = parameters['filters']
    if not cli_filters:
        return Filter({}, None, None)

    # ``--exclude`` / ``--include`` become ``exclude`` / ``include``.
    real_filters = [(kind.lstrip('-'), glob) for kind, glob in cli_filters]

    def _root_for(location):
        # Locations starting with ``s3://`` are rooted through
        # ``_get_s3_root``; everything else is treated as a local path.
        if location.startswith('s3://'):
            resolver = _get_s3_root
        else:
            resolver = _get_local_root
        return resolver(location, parameters['dir_op'])

    # All filters are evaluated relative to the source root; the
    # destination root is computed the same way and handed to Filter.
    src_rootdir = _root_for(parameters['src'])
    dst_rootdir = _root_for(parameters['dest'])
    return Filter(real_filters, src_rootdir, dst_rootdir)


def _get_s3_root(source_location, dir_op):
    """Return the ``bucket/prefix`` string used as the root of an S3 location.

    ``source_location`` is split into bucket and key with
    ``split_s3_bucket_key``.  For a directory operation, or when the key
    already ends with ``/``, the whole key is kept.  Otherwise the final
    ``/``-separated component of the key is dropped so that only the
    prefix part of ``prefix/key`` ends up in the root.
    """
    bucket, key = split_s3_bucket_key(source_location)
    if dir_op or key.endswith('/'):
        return '/'.join([bucket, key])
    # Single-object operation: keep everything before the last '/'
    # (an empty string when the key contains no '/').
    parent_prefix = key.rpartition('/')[0]
    return '/'.join([bucket, parent_prefix])


def _get_local_root(source_location, dir_op):
    """Return the absolute local directory used as the root of a location.

    For a directory operation the location itself is the root; otherwise
    the root is the directory portion of ``source_location``.
    """
    base = source_location if dir_op else os.path.dirname(source_location)
    return os.path.abspath(base)


class Filter(object):
    """
    Universal exclude/include filter applied to a stream of file infos.
    """
    def __init__(self, patterns, rootdir, dst_rootdir):
        """
        :var patterns: A list of patterns. A pattern consists of a
            two-member sequence: the first member is the string
            'exclude' or 'include', the second member is the actual rule.
        :var rootdir: The root directory the patterns are evaluated
            against.  This will generally be the directory of the
            source location.
        :var dst_rootdir: The destination root directory the patterns
            are evaluated against.  This is only useful when the
            --delete option is also specified.

        """
        self._original_patterns = patterns
        self.patterns = self._full_path_patterns(patterns, rootdir)
        self.dst_patterns = self._full_path_patterns(patterns, dst_rootdir)

    def _full_path_patterns(self, original_patterns, rootdir):
        # Prefix every rule with the root dir so that something like
        # ``--exclude "*"`` becomes ('exclude', '/path/to/root/*').
        return [
            (rule[0], os.path.join(rootdir, rule[1]))
            for rule in original_patterns
        ]

    def call(self, file_infos):
        """
        Iterate over the given file_info objects and yield those that
        pass the filter.

        Every file starts out included.  The rules are then applied in
        order, each one first in its source-rooted form and then in its
        destination-rooted form: a matching exclude rule marks the file
        as excluded and a matching include rule marks it as included.
        A rule that does not match leaves the current state untouched,
        so rules listed later override the outcome of earlier ones.
        """
        for file_info in file_infos:
            src_path = file_info.src
            verdict = (file_info, True)
            for rule_pair in zip(self.patterns, self.dst_patterns):
                # rule_pair is (source-rooted rule, destination-rooted rule)
                for rule in rule_pair:
                    outcome = self._match_pattern(rule, file_info)
                    if outcome is not None:
                        verdict = outcome
            LOG.debug("=%s final filtered status, should_include: %s",
                      src_path, verdict[1])
            if verdict[1]:
                yield file_info

    def _match_pattern(self, pattern, file_info):
        # Returns (file_info, True) for a matching include rule,
        # (file_info, False) for a matching exclude rule and None when
        # the rule does not apply to this file.
        src_path = file_info.src
        rule_kind = pattern[0]
        # Normalize the separators in the rule to the flavour used by the
        # file's source type before globbing.
        if file_info.src_type == 'local':
            old_sep, new_sep = '/', os.sep
        else:
            old_sep, new_sep = os.sep, '/'
        glob = pattern[1].replace(old_sep, new_sep)
        matched = fnmatch.fnmatch(src_path, glob)
        if matched and rule_kind == 'include':
            LOG.debug("%s matched include filter: %s", src_path, glob)
            return (file_info, True)
        if matched and rule_kind == 'exclude':
            LOG.debug("%s matched exclude filter: %s", src_path, glob)
            return (file_info, False)
        LOG.debug("%s did not match %s filter: %s",
                  src_path, rule_kind, glob)
        return None