HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/lib/python3/dist-packages/awscli/customizations/cliinputjson.py
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import json

from awscli.paramfile import get_paramfile, LOCAL_PREFIX_MAP
from awscli.argprocess import ParamError
from awscli.customizations.arguments import OverrideRequiredArgsArgument


def register_cli_input_json(cli):
    """Hook ``add_cli_input_json`` into argument-table construction.

    :param cli: Object exposing ``register(event_name, handler)``. The
        handler is subscribed to the ``building-argument-table`` event.
    """
    event_name = 'building-argument-table'
    cli.register(event_name, add_cli_input_json)


def add_cli_input_json(session, argument_table, **kwargs):
    """Attach the ``--cli-input-json`` argument to ``argument_table``.

    :param session: Passed through to ``CliInputJSONArgument``.
    :param argument_table: Mapping of argument names to argument objects.
        It is left untouched when it already contains ``outfile``.
    :param kwargs: Extra event keyword arguments; ignored.
    """
    # An ``outfile`` argument designates an operation with streaming
    # output, which this argument cannot support.
    if 'outfile' in argument_table:
        return
    CliInputJSONArgument(session).add_to_arg_table(argument_table)


class CliInputJSONArgument(OverrideRequiredArgsArgument):
    """This argument inputs a JSON string as the entire input for a command.

    Ideally, the value to this argument should be a filled out JSON file
    generated by ``--generate-cli-skeleton``. The items in the JSON string
    will not clobber other arguments entered into the command line.
    """
    ARG_DATA = {
        'name': 'cli-input-json',
        'help_text': 'Performs service operation based on the JSON string '
                     'provided. The JSON string follows the format provided '
                     'by ``--generate-cli-skeleton``. If other arguments are '
                     'provided on the command line, the CLI values will override '
                     'the JSON-provided values. It is not possible to pass '
                     'arbitrary binary values using a JSON-provided value as '
                     'the string will be taken literally.'
    }

    def __init__(self, session):
        super(CliInputJSONArgument, self).__init__(session)

    def _register_argument_action(self):
        """Subscribe to ``calling-command.*``, then run the base-class setup."""
        self._session.register(
            'calling-command.*', self.add_to_call_parameters)
        super(CliInputJSONArgument, self)._register_argument_action()

    def add_to_call_parameters(self, call_parameters, parsed_args,
                               parsed_globals, **kwargs):
        """Merge the ``--cli-input-json`` document into ``call_parameters``.

        Does nothing when ``parsed_args`` has no ``cli_input_json`` value.

        :param call_parameters: Mapping of parameters for the call; updated
            in place with members of the JSON document that it does not
            already contain.
        :param parsed_args: Parsed command arguments; only the
            ``cli_input_json`` attribute is read.
        :param parsed_globals: Unused.
        :param kwargs: Extra event keyword arguments; ignored.
        :raises ParamError: If the value is not valid JSON, or if it is
            valid JSON whose top-level value is not an object.
        """
        # Check if ``--cli-input-json`` was specified in the command line.
        input_json = getattr(parsed_args, 'cli_input_json', None)
        if input_json is None:
            return

        # Retrieve the JSON from the file if needed.
        retrieved_json = get_paramfile(input_json, LOCAL_PREFIX_MAP)
        # Nothing was retrieved from the file. So assume the argument
        # is already a JSON string.
        if retrieved_json is None:
            retrieved_json = input_json

        try:
            # Try to load the JSON string into a python dictionary
            input_data = json.loads(retrieved_json)
        except ValueError as e:
            raise ParamError(
                self.name, "Invalid JSON: %s\nJSON received: %s"
                % (e, retrieved_json))

        # ``json.loads`` also accepts documents whose top-level value is a
        # list, string, number, boolean or null. Those cannot be merged as
        # key/value parameters, so report them as a parameter error rather
        # than failing later with an AttributeError.
        if not isinstance(input_data, dict):
            raise ParamError(
                self.name,
                "Invalid JSON: expected a JSON object at the top level, "
                "got %s\nJSON received: %s"
                % (type(input_data).__name__, retrieved_json))

        # Add the members from the input JSON to the call parameters.
        self._update_call_parameters(call_parameters, input_data)

    def _update_call_parameters(self, call_parameters, input_data):
        """Copy entries of ``input_data`` that ``call_parameters`` lacks."""
        for key, value in input_data.items():
            # Only add the values to ``call_parameters`` if not already
            # present.
            if key not in call_parameters:
                call_parameters[key] = value