HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/sos/policies/runtimes/__init__.py
# Copyright (C) 2020 Red Hat, Inc., Jake Hunsaker <jhunsake@redhat.com>

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

import re

from pipes import quote
from sos.utilities import sos_get_command_output, is_executable


class ContainerRuntime():
    """Encapsulates a container runtime that provides the ability to plugins to
    check runtime status, check for the presence of specific containers, and
    to format commands to run in those containers

    :param policy: The loaded policy for the system
    :type policy: ``Policy()``

    :cvar name: The name of the container runtime, e.g. 'podman'
    :vartype name: ``str``

    :cvar containers: A list of containers known to the runtime
    :vartype containers: ``list``

    :cvar images: A list of images known to the runtime
    :vartype images: ``list``

    :cvar binary: The binary command to run for the runtime, must exit within
                  $PATH
    :vartype binary: ``str``
    """

    name = 'Undefined'
    containers = []
    images = []
    volumes = []
    binary = ''
    active = False

    def __init__(self, policy=None):
        self.policy = policy
        self.run_cmd = "%s exec " % self.binary

    def load_container_info(self):
        """If this runtime is found to be active, attempt to load information
        on the objects existing in the runtime.
        """
        self.containers = self.get_containers()
        self.images = self.get_images()
        self.volumes = self.get_volumes()

    def check_is_active(self):
        """Check to see if the container runtime is both present AND active.

        Active in this sense means that the runtime can be used to glean
        information about the runtime itself and containers that are running.

        :returns: ``True`` if the runtime is active, else ``False``
        :rtype: ``bool``
        """
        if is_executable(self.binary, self.policy.sysroot):
            self.active = True
            return True
        return False

    def check_can_copy(self):
        """Check if the runtime supports copying files out of containers and
        onto the host filesystem
        """
        return True

    def get_containers(self, get_all=False):
        """Get a list of containers present on the system.

        :param get_all: If set, include stopped containers as well
        :type get_all: ``bool``
        """
        containers = []
        _cmd = "%s ps %s" % (self.binary, '-a' if get_all else '')
        if self.active:
            out = sos_get_command_output(_cmd, chroot=self.policy.sysroot)
            if out['status'] == 0:
                for ent in out['output'].splitlines()[1:]:
                    ent = ent.split()
                    # takes the form (container_id, container_name)
                    containers.append((ent[0], ent[-1]))
        return containers

    def get_container_by_name(self, name):
        """Get the container ID for the container matching the provided
        name

        :param name: The name of the container, note this can be a regex
        :type name: ``str``

        :returns: The id of the first container to match `name`, else ``None``
        :rtype: ``str``
        """
        if not self.active or name is None:
            return None
        for c in self.containers:
            if re.match(name, c[1]):
                return c[0]
        return None

    def get_images(self):
        """Get a list of images present on the system

        :returns: A list of 2-tuples containing (image_name, image_id)
        :rtype: ``list``
        """
        images = []
        fmt = '{{lower .Repository}}:{{lower .Tag}} {{lower .ID}}'
        if self.active:
            out = sos_get_command_output(
                "%s images --format '%s'" % (self.binary, fmt),
                chroot=self.policy.sysroot
            )
            if out['status'] == 0:
                for ent in out['output'].splitlines():
                    ent = ent.split()
                    # takes the form (image_name, image_id)
                    images.append((ent[0], ent[1]))
        return images

    def get_volumes(self):
        """Get a list of container volumes present on the system

        :returns: A list of volume IDs on the system
        :rtype: ``list``
        """
        vols = []
        if self.active:
            out = sos_get_command_output(
                "%s volume ls" % self.binary,
                chroot=self.policy.sysroot
            )
            if out['status'] == 0:
                for ent in out['output'].splitlines()[1:]:
                    ent = ent.split()
                    vols.append(ent[-1])
        return vols

    def container_exists(self, container):
        """Check if a given container ID or name exists on the system from the
        perspective of the container runtime.

        Note that this will only check _running_ containers

        :param container:       The name or ID of the container
        :type container:        ``str``

        :returns:               True if the container exists, else False
        :rtype:                 ``bool``
        """
        for _contup in self.containers:
            if container in _contup:
                return True
        return False

    def fmt_container_cmd(self, container, cmd, quotecmd):
        """Format a command to run inside a container using the runtime

        :param container: The name or ID of the container in which to run
        :type container: ``str``

        :param cmd: The command to run inside `container`
        :type cmd: ``str``

        :param quotecmd: Whether the cmd should be quoted.
        :type quotecmd: ``bool``

        :returns: Formatted string to run `cmd` inside `container`
        :rtype: ``str``
        """
        if quotecmd:
            quoted_cmd = quote(cmd)
        else:
            quoted_cmd = cmd
        return "%s %s %s" % (self.run_cmd, container, quoted_cmd)

    def fmt_registry_credentials(self, username, password):
        """Format a string to pass to the 'run' command of the runtime to
        enable authorization for pulling the image during `sos collect`, if
        needed using username and optional password creds

        :param username:    The name of the registry user
        :type username:     ``str``

        :param password:    The password of the registry user
        :type password:     ``str`` or ``None``

        :returns:  The string to use to enable a run command to pull the image
        :rtype:    ``str``
        """
        return "--creds=%s%s" % (username, ':' + password if password else '')

    def fmt_registry_authfile(self, authfile):
        """Format a string to pass to the 'run' command of the runtime to
        enable authorization for pulling the image during `sos collect`, if
        needed using an authfile.
        """
        if authfile:
            return "--authfile %s" % authfile
        return ''

    def get_logs_command(self, container):
        """Get the command string used to dump container logs from the
        runtime

        :param container: The name or ID of the container to get logs for
        :type container: ``str``

        :returns: Formatted runtime command to get logs from `container`
        :type: ``str``
        """
        return "%s logs -t %s" % (self.binary, container)

    def get_copy_command(self, container, path, dest, sizelimit=None):
        """Generate the command string used to copy a file out of a container
        by way of the runtime.

        :param container:   The name or ID of the container
        :type container:    ``str``

        :param path:        The path to copy from the container. Note that at
                            this time, no supported runtime supports globbing
        :type path:         ``str``

        :param dest:        The destination on the *host* filesystem to write
                            the file to
        :type dest:         ``str``

        :param sizelimit:   Limit the collection to the last X bytes of the
                            file at PATH
        :type sizelimit:    ``int``

        :returns:   Formatted runtime command to copy a file from a container
        :rtype:     ``str``
        """
        if sizelimit:
            return "%s %s tail -c %s %s" % (self.run_cmd, container, sizelimit,
                                            path)
        return "%s cp %s:%s %s" % (self.binary, container, path, dest)

# vim: set et ts=4 sw=4 :