File: //lib/python3/dist-packages/awscli/customizations/s3events.py
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Add S3 specific event streaming output arg."""
from awscli.arguments import CustomArgument
STREAM_HELP_TEXT = 'Filename where the records will be saved'
class DocSectionNotFoundError(Exception):
    pass
def register_event_stream_arg(event_handlers):
    event_handlers.register(
        'building-argument-table.s3api.select-object-content',
        add_event_stream_output_arg)
    event_handlers.register_last(
        'doc-output.s3api.select-object-content',
        replace_event_stream_docs
    )
def add_event_stream_output_arg(argument_table, operation_model,
                                session, **kwargs):
    argument_table['outfile'] = S3SelectStreamOutputArgument(
        name='outfile', help_text=STREAM_HELP_TEXT,
        cli_type_name='string', positional_arg=True,
        stream_key=operation_model.output_shape.serialization['payload'],
        session=session)
def replace_event_stream_docs(help_command, **kwargs):
    doc = help_command.doc
    current = ''
    while current != '======\nOutput\n======':
        try:
            current = doc.pop_write()
        except IndexError:
            # This should never happen, but in the rare case that it does
            # we should be raising something with a helpful error message.
            raise DocSectionNotFoundError(
                'Could not find the "output" section for the command: %s'
                % help_command)
    doc.write('======\nOutput\n======\n')
    doc.write("This command generates no output.  The selected "
              "object content is written to the specified outfile.\n")
class S3SelectStreamOutputArgument(CustomArgument):
    _DOCUMENT_AS_REQUIRED = True
    def __init__(self, stream_key, session, **kwargs):
        super(S3SelectStreamOutputArgument, self).__init__(**kwargs)
        # This is the key in the response body where we can find the
        # streamed contents.
        self._stream_key = stream_key
        self._output_file = None
        self._session = session
    def add_to_params(self, parameters, value):
        self._output_file = value
        self._session.register('after-call.s3.SelectObjectContent',
                               self.save_file)
    def save_file(self, parsed, **kwargs):
        # This method is hooked into after-call which fires
        # before the error checking happens in the client.
        # Therefore if the stream_key is not in the parsed
        # response we immediately return and let the default
        # error handling happen.
        if self._stream_key not in parsed:
            return
        event_stream = parsed[self._stream_key]
        with open(self._output_file, 'wb') as fp:
            for event in event_stream:
                if 'Records' in event:
                    fp.write(event['Records']['Payload'])
        # We don't want to include the streaming param in
        # the returned response, it's not JSON serializable.
        del parsed[self._stream_key]