File: //usr/lib/python3/dist-packages/sos/policies/distros/__init__.py
# Copyright (C) 2020 Red Hat, Inc., Jake Hunsaker <jhunsake@redhat.com>
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.
import os
import re
from getpass import getpass
from sos import _sos as _
from sos.policies import Policy
from sos.policies.init_systems import InitSystem
from sos.policies.init_systems.systemd import SystemdInit
from sos.policies.runtimes.crio import CrioContainerRuntime
from sos.policies.runtimes.podman import PodmanContainerRuntime
from sos.policies.runtimes.docker import DockerContainerRuntime
from sos.utilities import shell_out, is_executable, bold
try:
    import requests
    REQUESTS_LOADED = True
except ImportError:
    REQUESTS_LOADED = False
# Environment variable names consulted by LinuxPolicy._container_init().
# ENV_CONTAINER is checked against a list of known container engine names to
# decide whether sos is running inside a container; when it is, a non-empty
# value of ENV_HOST_SYSROOT is returned as the sysroot.
ENV_CONTAINER = 'container'
ENV_HOST_SYSROOT = 'HOST'
class LinuxPolicy(Policy):
    """This policy is meant to be an abc class that provides common
    implementations used in Linux distros"""
    # distro is used to build the help title in display_help(); PATH is
    # reported by display_distro_help()
    distro = "Linux"
    vendor = "None"
    PATH = "/bin:/sbin:/usr/bin:/usr/sbin"
    init = None
    # _ prefixed class attrs are used for storing any vendor-defined defaults
    # the non-prefixed attrs are used by the upload methods, and will be set
    # to the cmdline/config file values, if provided. If not provided, then
    # those attrs will be set to the _ prefixed values as a fallback.
    # TL;DR Use _upload_* for policy default values, use upload_* when wanting
    # to actually use the value in a method/override
    _upload_url = None
    _upload_directory = '/'
    _upload_user = None
    _upload_password = None
    # HTTPS method used by upload_https() when the cmdline option is 'auto'
    _upload_method = None
    # name of the runtime that __init__() registers as runtimes['default']
    # when it is found to be active
    default_container_runtime = 'docker'
    _preferred_hash_name = None
    upload_url = None
    upload_user = None
    upload_password = None
    # collector-focused class attrs
    containerized = False
    container_image = None
    sos_path_strip = None
    sos_pkg_name = None
    sos_bin_path = '/usr/bin'
    # container name used by restart_sos_container() and
    # format_container_command()
    sos_container_name = 'sos-collector-tmp'
    container_version_command = None
    container_authfile = None
    def __init__(self, sysroot=None, init=None, probe_runtime=True,
                 remote_exec=None):
        """Set up the sysroot, kernel module list, init system and container
        runtimes for this policy.

        :param sysroot: Filesystem root to use. When not given, the result of
                        container detection is used, falling back to ``/``
        :type sysroot: ``str`` or ``None``

        :param init: An init system object to use instead of detecting one
        :type init: ``InitSystem`` or ``None``

        :param probe_runtime: Passed through to the parent ``Policy``
        :type probe_runtime: ``bool``

        :param remote_exec: Passed through to the parent ``Policy``
        """
        super(LinuxPolicy, self).__init__(sysroot=sysroot,
                                          probe_runtime=probe_runtime,
                                          remote_exec=remote_exec)

        self.sysroot = sysroot if sysroot else (self._container_init() or '/')
        self.init_kernel_modules()

        if init is not None:
            self.init_system = init
        elif os.path.isdir("/run/systemd/system/"):
            self.init_system = SystemdInit(chroot=self.sysroot)
        else:
            self.init_system = InitSystem()

        self.runtimes = {}
        if not self.probe_runtime:
            return

        candidates = [
            PodmanContainerRuntime(policy=self),
            DockerContainerRuntime(policy=self),
            CrioContainerRuntime(policy=self)
        ]
        for candidate in candidates:
            if not candidate.check_is_active():
                continue
            self.runtimes[candidate.name] = candidate
            if candidate.name == self.default_container_runtime:
                self.runtimes['default'] = candidate
            candidate.load_container_info()

        if self.runtimes and 'default' not in self.runtimes:
            # the policy default is not active, so expose the first active
            # runtime as the default so that plugins can still query one
            self.runtimes['default'] = next(iter(self.runtimes.values()))
    @classmethod
    def set_forbidden_paths(cls):
        return [
            '/etc/passwd',
            '/etc/shadow'
        ]
    def kernel_version(self):
        return self.release
    def host_name(self):
        return self.hostname
    def is_kernel_smp(self):
        return self.smp
    def get_arch(self):
        return self.machine
    def get_local_name(self):
        """Returns the name usd in the pre_work step"""
        return self.host_name()
    def sanitize_filename(self, name):
        """Strip every character from `name` that is not an ASCII letter, a
        digit, a hyphen, a period or a comma.

        :param name: The name to sanitize
        :type name: ``str``

        :returns: `name` with all other characters removed
        :rtype: ``str``
        """
        disallowed = re.compile(r"[^-a-z,A-Z.0-9]")
        return disallowed.sub("", name)
    @classmethod
    def display_help(cls, section):
        if cls == LinuxPolicy:
            cls.display_self_help(section)
        else:
            section.set_title("%s Distribution Policy" % cls.distro)
            cls.display_distro_help(section)
    @classmethod
    def display_self_help(cls, section):
        section.set_title("SoS Distribution Policies")
        section.add_text(
            'Distributions supported by SoS will each have a specific policy '
            'defined for them, to ensure proper operation of SoS on those '
            'systems.'
        )
    @classmethod
    def display_distro_help(cls, section):
        """Write the help output for a specific distribution policy to
        `section`: the policy docstring (if it has its own), the upload and
        runtime defaults, the vendor reference URLs and the available
        presets.

        :param section: The help section to write to
        """
        own_doc = cls.__doc__ and cls.__doc__ is not LinuxPolicy.__doc__
        if own_doc:
            section.add_text(cls.__doc__)
        else:
            section.add_text(
                '\nDetailed help information for this policy is not available'
            )

        # an instance of the requested policy is needed to report things
        # such as $PATH and the loaded presets; runtime probing is disabled
        policy = cls(None, None, False)

        upload_line = "Default --upload location: %s" % policy._upload_url
        section.add_text(upload_line)

        runtime_line = ("Default container runtime: %s"
                        % policy.default_container_runtime)
        section.add_text(runtime_line, newline=False)

        path_line = "$PATH used when running report: %s" % policy.PATH
        section.add_text(path_line, newline=False)

        url_fmt = "{:>8}{:<30}{:<40}"
        refsec = section.add_section('Reference URLs')
        for url in cls.vendor_urls:
            refsec.add_text(url_fmt.format(' ', url[0], url[1]),
                            newline=False)

        preset_fmt = "{:>8}{:<20}{:<45}{:<30}"
        presec = section.add_section('Presets Available With This Policy\n')
        header = preset_fmt.format(' ', 'Preset Name', 'Description',
                                   'Enabled Options')
        presec.add_text(bold(header), newline=False)

        for preset_name in policy.presets:
            preset = policy.presets[preset_name]
            enabled_opts = ' '.join(preset.opts.to_args())
            presec.add_text(
                preset_fmt.format(' ', preset_name, preset.desc,
                                  enabled_opts),
                newline=False
            )
    def _container_init(self):
        """Detect whether sos is running in a container and, if so, apply the
        container specific setup driven by ENV_HOST_SYSROOT.

        When a known container engine is detected ``_in_container`` is set.
        If a non-empty host sysroot is also provided, ``_tmp_dir`` is
        re-based underneath it.

        :returns: The host sysroot path, or ``None`` if not in a recognised
                  container or no usable sysroot was provided
        :rtype: ``str`` or ``None``
        """
        engine = os.environ.get(ENV_CONTAINER)
        if engine not in ('docker', 'oci', 'podman'):
            return None

        self._in_container = True

        host_sysroot = os.environ.get(ENV_HOST_SYSROOT)
        if not host_sysroot:
            # covers both an unset variable and a blank/improperly unset one
            return None

        self._tmp_dir = os.path.abspath(host_sysroot + self._tmp_dir)
        return host_sysroot
    def init_kernel_modules(self):
        """Build ``self.kernel_mods``, the list of kernel modules referenced
        later for plugin enablement and SoSPredicate checks.

        Three sources are combined, in order: the output of ``lsmod``, the
        ``modules.builtin`` file for the running kernel release, and a small
        set of kconfig options from the booted kernel's config file. Missing
        or unreadable files are silently skipped.
        """
        kernel_release = os.uname().release
        self.kernel_mods = []

        # loaded modules: first column of every lsmod line after the header
        lsmod_out = shell_out("lsmod", timeout=0, chroot=self.sysroot)
        self.kernel_mods += [
            entry.split()[0].strip() for entry in lsmod_out.splitlines()[1:]
        ]

        # kernel builtins: keep the file name of each entry, minus '.ko'
        builtin_list = self.join_sysroot(
            "/usr/lib/modules/%s/modules.builtin" % kernel_release
        )
        try:
            with open(builtin_list, "r") as handle:
                for entry in handle:
                    module_file = entry.rsplit('/', 1)[-1]
                    self.kernel_mods.append(module_file.split('.ko')[0])
        except IOError:
            pass

        # kconfig strings that have been verified to not appear in either
        # lsmod or modules.builtin regardless of how they are built
        config_strings = {
            'devlink': 'CONFIG_NET_DEVLINK',
            'dm_mod': 'CONFIG_BLK_DEV_DM'
        }
        kconfig_path = self.join_sysroot("/boot/config-%s" % kernel_release)
        enabled_options = set()
        try:
            with open(kconfig_path, "r") as handle:
                for entry in handle:
                    if '=y' in entry:
                        enabled_options.add(entry.split('=y')[0])
        except IOError:
            pass

        for module, option in config_strings.items():
            if option in enabled_options:
                self.kernel_mods.append(module)
    def join_sysroot(self, path):
        if self.sysroot and self.sysroot != '/':
            path = os.path.join(self.sysroot, path.lstrip('/'))
        return path
    def pre_work(self):
        # this method will be called before the gathering begins
        cmdline_opts = self.commons['cmdlineopts']
        caseid = cmdline_opts.case_id if cmdline_opts.case_id else ""
        # Set the cmdline settings to the class attrs that are referenced later
        # The policy default '_' prefixed versions of these are untouched to
        # allow fallback
        self.upload_url = cmdline_opts.upload_url
        self.upload_user = cmdline_opts.upload_user
        self.upload_directory = cmdline_opts.upload_directory
        self.upload_password = cmdline_opts.upload_pass
        self.upload_archive_name = ''
        if not cmdline_opts.batch and not \
                cmdline_opts.quiet:
            try:
                if caseid:
                    self.commons['cmdlineopts'].case_id = caseid
                else:
                    self.commons['cmdlineopts'].case_id = input(
                        _("Optionally, please enter the case id that you are "
                          "generating this report for [%s]: ") % caseid
                    )
                # Policies will need to handle the prompts for user information
                if cmdline_opts.upload and self.get_upload_url():
                    self.prompt_for_upload_user()
                    self.prompt_for_upload_password()
                self._print()
            except KeyboardInterrupt:
                self._print()
                raise
        if cmdline_opts.case_id:
            self.case_id = cmdline_opts.case_id
        return
    def prompt_for_upload_user(self):
        """Should be overridden by policies to determine if a user needs to
        be provided or not
        """
        if not self.get_upload_user():
            msg = "Please provide upload user for %s: " % self.get_upload_url()
            self.upload_user = input(_(msg))
    def prompt_for_upload_password(self):
        """Should be overridden by policies to determine if a password needs to
        be provided for upload or not
        """
        if not self.get_upload_password() and (self.get_upload_user() !=
                                               self._upload_user):
            msg = ("Please provide the upload password for %s: "
                   % self.get_upload_user())
            self.upload_password = getpass(msg)
    def upload_archive(self, archive):
        """
        Entry point for sos attempts to upload the generated archive to a
        policy or user specified location.
        Curerntly there is support for HTTPS, SFTP, and FTP. HTTPS uploads are
        preferred for policy-defined defaults.
        Policies that need to override uploading methods should override the
        respective upload_https(), upload_sftp(), and/or upload_ftp() methods
        and should NOT override this method.
        :param archive: The archive filepath to use for upload
        :type archive: ``str``
        In order to enable this for a policy, that policy needs to implement
        the following:
        Required Class Attrs
        :_upload_url:     The default location to use. Note these MUST include
                          protocol header
        :_upload_user:    Default username, if any else None
        :_upload_password: Default password, if any else None
        The following Class Attrs may optionally be overidden by the Policy
        :_upload_directory:     Default FTP server directory, if any
        The following methods may be overridden by ``Policy`` as needed
        `prompt_for_upload_user()`
            Determines if sos should prompt for a username or not.
        `get_upload_user()`
            Determines if the default or a different username should be used
        `get_upload_https_auth()`
            Format authentication data for HTTPS uploads
        `get_upload_url_string()`
            Print a more human-friendly string than vendor URLs
        """
        self.upload_archive_name = archive
        if not self.upload_url:
            self.upload_url = self.get_upload_url()
        if not self.upload_url:
            raise Exception("No upload destination provided by policy or by "
                            "--upload-url")
        upload_func = self._determine_upload_type()
        print(_("Attempting upload to %s" % self.get_upload_url_string()))
        return upload_func()
    def _determine_upload_type(self):
        """Based on the url provided, determine what type of upload to attempt.
        Note that this requires users to provide a FQDN address, such as
        https://myvendor.com/api or ftp://myvendor.com instead of
        myvendor.com/api or myvendor.com
        """
        prots = {
            'ftp': self.upload_ftp,
            'sftp': self.upload_sftp,
            'https': self.upload_https
        }
        if self.commons['cmdlineopts'].upload_protocol in prots.keys():
            return prots[self.commons['cmdlineopts'].upload_protocol]
        elif '://' not in self.upload_url:
            raise Exception("Must provide protocol in upload URL")
        prot, url = self.upload_url.split('://')
        if prot not in prots.keys():
            raise Exception("Unsupported or unrecognized protocol: %s" % prot)
        return prots[prot]
    def get_upload_https_auth(self, user=None, password=None):
        """Formats the user/password credentials using basic auth

        Any credential not passed in is looked up via get_upload_user() or
        get_upload_password() respectively.

        :param user: The username for upload
        :type user: ``str``

        :param password: Password for `user` to use for upload
        :type password: ``str``

        :returns: The user/password auth suitable for use in requests calls
        :rtype: ``requests.auth.HTTPBasicAuth()``
        """
        user = user or self.get_upload_user()
        password = password or self.get_upload_password()
        return requests.auth.HTTPBasicAuth(user, password)
    def get_upload_url(self):
        """Helper function to determine if we should use the policy default
        upload url or one provided by the user
        :returns: The URL to use for upload
        :rtype: ``str``
        """
        return self.upload_url or self._upload_url
    def get_upload_url_string(self):
        """Used by distro policies to potentially change the string used to
        report upload location from the URL to a more human-friendly string
        """
        return self.get_upload_url()
    def get_upload_user(self):
        """Helper function to determine if we should use the policy default
        upload user or one provided by the user
        :returns: The username to use for upload
        :rtype: ``str``
        """
        return (os.getenv('SOSUPLOADUSER', None) or
                self.upload_user or
                self._upload_user)
    def get_upload_password(self):
        """Helper function to determine if we should use the policy default
        upload password or one provided by the user
        A user provided password, either via option or the 'SOSUPLOADPASSWORD'
        environment variable will have precendent over any policy value
        :returns: The password to use for upload
        :rtype: ``str``
        """
        return (os.getenv('SOSUPLOADPASSWORD', None) or
                self.upload_password or
                self._upload_password)
    def upload_sftp(self, user=None, password=None):
        """Attempts to upload the archive to an SFTP location.

        Due to the lack of well maintained, secure, and generally widespread
        python libraries for SFTP, sos will shell-out to the system's local ssh
        installation in order to handle these uploads.

        Do not override this method with one that uses python-paramiko, as the
        upstream sos team will reject any PR that includes that dependency.

        :param user: The user to authenticate with, looked up via
                     get_upload_user() if not given
        :type user: ``str`` or ``None``

        :param password: The password for `user`, looked up via
                         get_upload_password() if not given
        :type password: ``str`` or ``None``

        :returns: ``True`` if upload is successful
        :rtype: ``bool``

        :raises: ``Exception`` if sftp or pexpect are unavailable, or if
                 connecting, authenticating or uploading fails
        """
        # if we somehow don't have sftp available locally, fail early
        if not is_executable('sftp'):
            raise Exception('SFTP is not locally supported')

        # soft dependency on python3-pexpect, which we need to use to control
        # sftp login since as of this writing we don't have a viable solution
        # via ssh python bindings commonly available among downstreams
        try:
            import pexpect
        except ImportError:
            raise Exception('SFTP upload requires python3-pexpect, which is '
                            'not currently installed')

        sftp_connected = False

        if not user:
            user = self.get_upload_user()
        if not password:
            password = self.get_upload_password()

        # need to strip the protocol prefix here
        sftp_url = self.get_upload_url().replace('sftp://', '')
        sftp_cmd = "sftp -oStrictHostKeyChecking=no %s@%s" % (user, sftp_url)
        ret = pexpect.spawn(sftp_cmd, encoding='utf-8')

        try:
            # the index returned by expect() is the position of the matched
            # pattern in the list passed to it
            sftp_expects = [
                u'sftp>',
                u'password:',
                u'Connection refused',
                pexpect.TIMEOUT,
                pexpect.EOF
            ]
            idx = ret.expect(sftp_expects, timeout=15)

            if idx == 0:
                sftp_connected = True
            elif idx == 1:
                ret.sendline(password)
                pass_expects = [
                    u'sftp>',
                    u'Permission denied',
                    pexpect.TIMEOUT,
                    pexpect.EOF
                ]
                sftp_connected = ret.expect(pass_expects, timeout=10) == 0
                if not sftp_connected:
                    raise Exception("Incorrect username or password for %s"
                                    % self.get_upload_url_string())
            elif idx == 2:
                raise Exception("Connection refused by %s. Incorrect port?"
                                % self.get_upload_url_string())
            elif idx == 3:
                raise Exception("Timeout hit trying to connect to %s"
                                % self.get_upload_url_string())
            elif idx == 4:
                raise Exception(
                    "Unexpected error trying to connect to sftp: %s"
                    % ret.before
                )

            if not sftp_connected:
                raise Exception("Unable to connect via SFTP to %s"
                                % self.get_upload_url_string())

            put_cmd = 'put %s %s' % (self.upload_archive_name,
                                     self._get_sftp_upload_name())
            ret.sendline(put_cmd)

            put_expects = [
                u'100%',
                pexpect.TIMEOUT,
                pexpect.EOF,
                u'No such file or directory'
            ]
            put_success = ret.expect(put_expects, timeout=180)

            if put_success == 0:
                # ask sftp to end the session itself once the transfer has
                # been reported as complete
                ret.sendline('bye')
                return True
            elif put_success == 1:
                raise Exception("Timeout expired while uploading")
            elif put_success == 2:
                raise Exception("Unknown error during upload: %s"
                                % ret.before)
            elif put_success == 3:
                raise Exception("Unable to write archive to destination")
            else:
                raise Exception("Unexpected response from server: %s"
                                % ret.before)
        except Exception:
            # whatever went wrong, do not leave the spawned sftp process and
            # its pty behind; previously only some failure paths closed it
            ret.close()
            raise
    def _get_sftp_upload_name(self):
        """If a specific file name pattern is required by the SFTP server,
        override this method in the relevant Policy. Otherwise the archive's
        name on disk will be used
        :returns:       Filename as it will exist on the SFTP server
        :rtype:         ``str``
        """
        fname = self.upload_archive_name.split('/')[-1]
        if self.upload_directory:
            fname = os.path.join(self.upload_directory, fname)
        return fname
    def _upload_https_put(self, archive, verify=True):
        """Upload `archive` with requests.put(). Used by upload_https() when
        the PUT method is selected; policies should override this method
        instead of the base upload_https()

        :param archive:     The open archive file object
        :param verify:      Passed to requests as its ``verify`` argument
        :type verify:       ``bool``

        :returns:           The response returned by requests.put()
        """
        destination = self.get_upload_url()
        credentials = self.get_upload_https_auth()
        return requests.put(destination, data=archive, auth=credentials,
                            verify=verify)
    def _get_upload_headers(self):
        """Define any needed headers to be passed with the POST request here
        """
        return {}
    def _upload_https_post(self, archive, verify=True):
        """Upload `archive` with requests.post(). Used by upload_https() when
        the PUT method is not selected; policies should override this method
        instead of the base upload_https()

        :param archive:     The open archive file object
        :param verify:      Passed to requests as its ``verify`` argument
        :type verify:       ``bool``

        :returns:           The response returned by requests.post()
        """
        archive_file = archive.name.split('/')[-1]
        payload = {
            'file': (archive_file, archive, self._get_upload_headers())
        }
        destination = self.get_upload_url()
        credentials = self.get_upload_https_auth()
        return requests.post(destination, files=payload, auth=credentials,
                             verify=verify)
    def upload_https(self):
        """Attempts to upload the archive to an HTTPS location.

        The HTTP method comes from the ``upload_method`` command line option,
        falling back to the policy's ``_upload_method`` when that is 'auto'.
        Anything other than 'put' results in a POST.

        :returns: ``True`` if upload is successful
        :rtype: ``bool``

        :raises: ``Exception`` if upload was unsuccessful
        """
        if not REQUESTS_LOADED:
            raise Exception("Unable to upload due to missing python requests "
                            "library")

        with open(self.upload_archive_name, 'rb') as arc:
            opts = self.commons['cmdlineopts']
            method = opts.upload_method
            if method == 'auto':
                method = self._upload_method
            verify = self.commons['cmdlineopts'].upload_no_ssl_verify is False

            if method == 'put':
                uploader = self._upload_https_put
            else:
                uploader = self._upload_https_post
            r = uploader(arc, verify)

            # only 200 and 201 are treated as a successful upload
            if r.status_code in (200, 201):
                return True
            if r.status_code == 401:
                raise Exception(
                    "Authentication failed: invalid user credentials"
                )
            raise Exception("POST request returned %s: %s"
                            % (r.status_code, r.reason))
    def upload_ftp(self, url=None, directory=None, user=None, password=None):
        """Attempts to upload the archive to either the policy defined or user
        provided FTP location.
        :param url: The URL to upload to
        :type url: ``str``
        :param directory: The directory on the FTP server to write to
        :type directory: ``str`` or ``None``
        :param user: The user to authenticate with
        :type user: ``str``
        :param password: The password to use for `user`
        :type password: ``str``
        :returns: ``True`` if upload is successful
        :rtype: ``bool``
        :raises: ``Exception`` if upload in unsuccessful
        """
        try:
            import ftplib
            import socket
        except ImportError:
            # socket is part of the standard library, should only fail here on
            # ftplib
            raise Exception("missing python ftplib library")
        if not url:
            url = self.get_upload_url()
        if url is None:
            raise Exception("no FTP server specified by policy, use --upload-"
                            "url to specify a location")
        url = url.replace('ftp://', '')
        if not user:
            user = self.get_upload_user()
        if not password:
            password = self.get_upload_password()
        if not directory:
            directory = self.upload_directory or self._upload_directory
        try:
            session = ftplib.FTP(url, user, password, timeout=15)
            if not session:
                raise Exception("connection failed, did you set a user and "
                                "password?")
            session.cwd(directory)
        except socket.timeout:
            raise Exception("timeout hit while connecting to %s" % url)
        except socket.gaierror:
            raise Exception("unable to connect to %s" % url)
        except ftplib.error_perm as err:
            errno = str(err).split()[0]
            if errno == '503':
                raise Exception("could not login as '%s'" % user)
            if errno == '530':
                raise Exception("invalid password for user '%s'" % user)
            if errno == '550':
                raise Exception("could not set upload directory to %s"
                                % directory)
            raise Exception("error trying to establish session: %s"
                            % str(err))
        try:
            with open(self.upload_archive_name, 'rb') as _arcfile:
                session.storbinary(
                    "STOR %s" % self.upload_archive_name.split('/')[-1],
                    _arcfile
                )
            session.quit()
            return True
        except IOError:
            raise Exception("could not open archive file")
    def set_sos_prefix(self):
        """Return the prefix that sosreport commands always need to be run
        with, for example when running in a specific container image.

        Policies that need no prefix must return an empty string rather than
        None, which is what this base implementation does.
        """
        no_prefix = ''
        return no_prefix
    def set_cleanup_cmd(self):
        """Return the command to run if a host requires additional cleanup.
        This base implementation returns an empty string.
        """
        no_cleanup = ''
        return no_cleanup
    def create_sos_container(self, image=None, auth=None, force_pull=False):
        """Returns the command that will create the container that will be
        used for running commands inside a container on hosts that require it.

        Overriding policies are expected to use the container runtime defined
        for the host type to launch a container, which is then exec'd into
        using that same runtime. This base implementation ignores its
        arguments and returns an empty string.

        :param image:   The name of the image if not using the policy default
        :type image:    ``str`` or ``None``

        :param auth:    The auth string required by the runtime to pull an
                        image from the registry
        :type auth:     ``str`` or ``None``

        :param force_pull:  Should the runtime forcibly pull the image
        :type force_pull:   ``bool``

        :returns:   The command to execute to launch the temp container
        :rtype:     ``str``
        """
        no_command = ''
        return no_command
    def restart_sos_container(self):
        """Restarts the container created for sos collect if it has stopped.
        This is called immediately after create_sos_container() as the command
        to create the container will exit and the container will stop. For
        current container runtimes, subsequently starting the container will
        default to opening a bash shell in the container to keep it running,
        thus allowing us to exec into it again.
        """
        return "%s start %s" % (self.container_runtime,
                                self.sos_container_name)
    def format_container_command(self, cmd):
        """Returns the command that allows us to exec into the created
        container for sos collect.
        :param cmd: The command to run in the sos container
        :type cmd: ``str``
        :returns: The command to execute to run `cmd` in the container
        :rtype: ``str``
        """
        if self.container_runtime:
            return '%s exec %s %s' % (self.container_runtime,
                                      self.sos_container_name,
                                      cmd)
        else:
            return cmd
class GenericLinuxPolicy(LinuxPolicy):
    """This Policy will be returned if no other policy can be loaded. This
    should allow for IndependentPlugins to be executed on any system"""
    # NOTE: the docstring above is runtime text, LinuxPolicy's
    # display_distro_help() adds a policy's own __doc__ to the help output

    # (label, url) pairs, listed by display_distro_help() under
    # 'Reference URLs'
    vendor_urls = [('Upstream Project', 'https://github.com/sosreport/sos')]
    vendor = 'SoS'
    # message explaining that the generic fallback configuration was loaded
    vendor_text = ('SoS was unable to determine that the distribution of this '
                   'system is supported, and has loaded a generic '
                   'configuration. This may not provide desired behavior, and '
                   'users are encouraged to request a new distribution-specifc'
                   ' policy at the GitHub project above.\n')
# vim: set et ts=4 sw=4 :