HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/lib/python3/dist-packages/awscli/customizations/datapipeline/translator.py
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import json
from awscli.clidriver import CLIOperationCaller


class PipelineDefinitionError(Exception):
    def __init__(self, msg, definition):
        full_msg = (
            "Error in pipeline definition: %s\n" % msg)
        super(PipelineDefinitionError, self).__init__(full_msg)
        self.msg = msg
        self.definition = definition


# Method to convert the dictionary input to a string
# This is required for escaping
def dict_to_string(dictionary, indent=2):
    return json.dumps(dictionary, indent=indent)


# Method to parse the arguments to get the region value
def get_region(session, parsed_globals):
    region = parsed_globals.region
    if region is None:
        region = session.get_config_variable('region')
    return region


# Method to display the response for a particular CLI operation
def display_response(session, operation_name, result, parsed_globals):
    cli_operation_caller = CLIOperationCaller(session)
    # Calling a private method. Should be changed after the functionality
    # is moved outside CliOperationCaller.
    cli_operation_caller._display_response(
        operation_name, result, parsed_globals)


def api_to_definition(definition):
    # When we're translating from api_response -> definition
    # we have to be careful *not* to mutate the existing
    # response as other code might need to the original
    # api_response.
    if 'pipelineObjects' in definition:
        definition['objects'] = _api_to_objects_definition(
            definition.pop('pipelineObjects'))
    if 'parameterObjects' in definition:
        definition['parameters'] = _api_to_parameters_definition(
            definition.pop('parameterObjects'))
    if 'parameterValues' in definition:
        definition['values'] = _api_to_values_definition(
            definition.pop('parameterValues'))
    return definition


def definition_to_api_objects(definition):
    if 'objects' not in definition:
        raise PipelineDefinitionError('Missing "objects" key', definition)
    api_elements = []
    # To convert to the structure expected by the service,
    # we convert the existing structure to a list of dictionaries.
    # Each dictionary has a 'fields', 'id', and 'name' key.
    for element in definition['objects']:
        try:
            element_id = element.pop('id')
        except KeyError:
            raise PipelineDefinitionError('Missing "id" key of element: %s' %
                                          json.dumps(element), definition)
        api_object = {'id': element_id}
        # If a name is provided, then we use that for the name,
        # otherwise the id is used for the name.
        name = element.pop('name', element_id)
        api_object['name'] = name
        # Now we need the field list.  Each element in the field list is a dict
        # with a 'key', 'stringValue'|'refValue'
        fields = []
        for key, value in sorted(element.items()):
            fields.extend(_parse_each_field(key, value))
        api_object['fields'] = fields
        api_elements.append(api_object)
    return api_elements


def definition_to_api_parameters(definition):
    if 'parameters' not in definition:
        return None
    parameter_objects = []
    for element in definition['parameters']:
        try:
            parameter_id = element.pop('id')
        except KeyError:
            raise PipelineDefinitionError('Missing "id" key of parameter: %s' %
                                          json.dumps(element), definition)
        parameter_object = {'id': parameter_id}
        # Now we need the attribute list.  Each element in the attribute list
        # is a dict with a 'key', 'stringValue'
        attributes = []
        for key, value in sorted(element.items()):
            attributes.extend(_parse_each_field(key, value))
        parameter_object['attributes'] = attributes
        parameter_objects.append(parameter_object)
    return parameter_objects


def definition_to_parameter_values(definition):
    if 'values' not in definition:
        return None
    parameter_values = []
    for key in definition['values']:
        parameter_values.extend(
            _convert_single_parameter_value(key, definition['values'][key]))

    return parameter_values


def _parse_each_field(key, value):
    values = []
    if isinstance(value, list):
        for item in value:
            values.append(_convert_single_field(key, item))
    else:
        values.append(_convert_single_field(key, value))
    return values


def _convert_single_field(key, value):
    field = {'key': key}
    if isinstance(value, dict) and list(value.keys()) == ['ref']:
        field['refValue'] = value['ref']
    else:
        field['stringValue'] = value
    return field


def _convert_single_parameter_value(key, values):
    parameter_values = []
    if isinstance(values, list):
        for each_value in values:
            parameter_value = {'id': key, 'stringValue': each_value}
            parameter_values.append(parameter_value)
    else:
        parameter_value = {'id': key, 'stringValue': values}
        parameter_values.append(parameter_value)
    return parameter_values


def _api_to_objects_definition(api_response):
    pipeline_objects = []
    for element in api_response:
        current = {
            'id': element['id'],
            'name': element['name']
        }
        for field in element['fields']:
            key = field['key']
            if 'stringValue' in field:
                value = field['stringValue']
            else:
                value = {'ref': field['refValue']}
            _add_value(key, value, current)
        pipeline_objects.append(current)
    return pipeline_objects


def _api_to_parameters_definition(api_response):
    parameter_objects = []
    for element in api_response:
        current = {
            'id': element['id']
        }
        for attribute in element['attributes']:
            _add_value(attribute['key'], attribute['stringValue'], current)
        parameter_objects.append(current)
    return parameter_objects


def _api_to_values_definition(api_response):
    pipeline_values = {}
    for element in api_response:
        _add_value(element['id'], element['stringValue'], pipeline_values)
    return pipeline_values


def _add_value(key, value, current_map):
    if key not in current_map:
        current_map[key] = value
    elif isinstance(current_map[key], list):
        # Dupe keys result in values aggregating
        # into a list.
        current_map[key].append(value)
    else:
        converted_list = [current_map[key], value]
        current_map[key] = converted_list