File: //usr/lib/python3/dist-packages/uaclient/livepatch.py
import datetime
import json
import logging
import re
from functools import lru_cache
from typing import List, Optional, Tuple
from uaclient import (
    event_logger,
    exceptions,
    messages,
    serviceclient,
    system,
    util,
)
from uaclient.data_types import (
    BoolDataValue,
    DataObject,
    Field,
    IncorrectTypeError,
    StringDataValue,
    data_list,
)
from uaclient.files import state_files
# Names of the livepatch proxy options; the same strings are the keys that
# configure_livepatch_proxy passes to `canonical-livepatch config`.
HTTP_PROXY_OPTION = "http-proxy"
HTTPS_PROXY_OPTION = "https-proxy"
# Executable used for every livepatch CLI invocation in this module.
LIVEPATCH_CMD = "/snap/bin/canonical-livepatch"
# URL path requested by UALivepatchClient.is_kernel_supported.
LIVEPATCH_API_V1_KERNELS_SUPPORTED = "/v1/api/kernels/supported"
# Event logger; configure_livepatch_proxy uses it to emit an info message.
event = event_logger.get_event_logger()
class LivepatchPatchFixStatus(DataObject):
    """A single fix entry: its name and whether it is patched.

    Declared from the ``Name`` and ``Patched`` dict keys; neither field is
    required, so both attributes may be None.
    """

    fields = [
        Field("name", StringDataValue, dict_key="Name", required=False),
        Field("patched", BoolDataValue, dict_key="Patched", required=False),
    ]

    def __init__(self, name: Optional[str], patched: Optional[bool]):
        self.name, self.patched = name, patched
class LivepatchPatchStatus(DataObject):
    """Patch state plus the list of fixes attached to it.

    Declared from the ``State`` and ``Fixes`` dict keys; neither field is
    required, so both attributes may be None.
    """

    fields = [
        Field("state", StringDataValue, dict_key="State", required=False),
        Field(
            "fixes",
            data_list(LivepatchPatchFixStatus),
            dict_key="Fixes",
            required=False,
        ),
    ]

    def __init__(
        self,
        state: Optional[str],
        fixes: Optional[List[LivepatchPatchFixStatus]],
    ):
        self.state, self.fixes = state, fixes
class LivepatchStatusStatus(DataObject):
    """One element of the top-level ``Status`` list.

    Declared from the ``Kernel``, ``Livepatch`` and ``Supported`` dict keys;
    none of the fields is required, so every attribute may be None.
    """

    fields = [
        Field("kernel", StringDataValue, dict_key="Kernel", required=False),
        Field(
            "livepatch",
            LivepatchPatchStatus,
            dict_key="Livepatch",
            required=False,
        ),
        Field(
            "supported",
            StringDataValue,
            dict_key="Supported",
            required=False,
        ),
    ]

    def __init__(
        self,
        kernel: Optional[str],
        livepatch: Optional[LivepatchPatchStatus],
        supported: Optional[str],
    ):
        self.kernel, self.livepatch = kernel, livepatch
        self.supported = supported
class LivepatchStatus(DataObject):
    """Root object parsed from the JSON printed by the livepatch status command.

    Holds the optional ``Status`` list of LivepatchStatusStatus entries.
    """

    fields = [
        Field(
            "status",
            data_list(LivepatchStatusStatus),
            dict_key="Status",
            required=False,
        ),
    ]

    def __init__(self, status: Optional[List[LivepatchStatusStatus]]):
        self.status = status
def status() -> Optional[LivepatchStatusStatus]:
    """
    Query the livepatch CLI and return its first status entry.

    Runs ``canonical-livepatch status --format json`` and parses the output
    into a LivepatchStatus object.

    :return: the first element of the parsed status list, or None when
        livepatch is not installed, the command fails, the output is not
        valid JSON, the JSON has an unexpected structure, or the status
        list is missing or empty.
    """
    if not is_livepatch_installed():
        logging.debug("canonical-livepatch is not installed")
        return None

    status_cmd = [LIVEPATCH_CMD, "status", "--format", "json"]
    try:
        out, _ = system.subp(status_cmd)
    except exceptions.ProcessExecutionError:
        with util.disable_log_to_console():
            logging.warning(
                "canonical-livepatch returned error when checking status"
            )
        return None

    try:
        parsed = json.loads(out)
    except json.JSONDecodeError:
        invalid_json_msg = (
            "canonical-livepatch status returned invalid json: {}".format(out)
        )
        with util.disable_log_to_console():
            logging.warning(invalid_json_msg)
        return None

    try:
        root = LivepatchStatus.from_dict(parsed)
    except IncorrectTypeError:
        bad_structure_msg = (
            "canonical-livepatch status returned unexpected "
            "structure: {}".format(out)
        )
        with util.disable_log_to_console():
            logging.warning(bad_structure_msg)
        return None

    entries = root.status
    if entries is None or len(entries) < 1:
        logging.debug("canonical-livepatch has no status")
        return None
    return entries[0]
class UALivepatchClient(serviceclient.UAServiceClient):
    """Service client used to query the livepatch API."""

    cfg_url_base_attr = "livepatch_url"
    api_error_cls = exceptions.UrlError

    def is_kernel_supported(
        self, version: str, flavor: str, arch: str, codename: str
    ) -> Optional[bool]:
        """
        Ask the livepatch API whether a kernel is supported.

        :param version: sent as the ``kernel-version`` query parameter
        :param flavor: sent as the ``flavour`` query parameter
        :param arch: sent as the ``architecture`` query parameter
        :param codename: sent as the ``codename`` query parameter
        :returns: True if supported
                  False if unsupported (including when the response dict
                  has no "Supported" key)
                  None if the request raises or the response is not a dict
        """
        headers = self.headers()
        query_params = {
            "kernel-version": version,
            "flavour": flavor,
            "architecture": arch,
            "codename": codename,
        }
        try:
            response, _ = self.request_url(
                LIVEPATCH_API_V1_KERNELS_SUPPORTED,
                query_params=query_params,
                headers=headers,
            )
        except Exception as err:
            # best effort: any failure of the request means "unknown"
            with util.disable_log_to_console():
                logging.warning(
                    "error while checking livepatch supported kernels API"
                )
                logging.warning(err)
            return None

        if isinstance(response, dict):
            supported_value = response.get("Supported", False)
            return bool(supported_value)

        # NOTE(review): unlike the other warnings in this module, this one is
        # not wrapped in util.disable_log_to_console() - confirm intentional.
        logging.warning("livepatch api returned something that isn't a dict")
        return None
def _on_supported_kernel_cli() -> Optional[bool]:
    """
    Translate the "supported" value reported by the livepatch CLI.

    :return: True for "supported", False for "unsupported", None when no
        status is available or the value is anything else
    """
    current = status()
    if current is None:
        return None

    verdict = current.supported
    if verdict == "supported":
        return True
    return False if verdict == "unsupported" else None
def _on_supported_kernel_cache(
    version: str, flavor: str, arch: str, codename: str
) -> Tuple[bool, Optional[bool]]:
    """Look up kernel support in the local cache file

    The cache is only used when it is less than one week old and was
    written for the same version, flavor, arch and codename.

    :return: (is_cache_valid, result); result is the cached value (which
        may itself be None) when valid, otherwise None
    """
    cache_miss = (False, None)

    try:
        cached = state_files.livepatch_support_cache.read()
    except Exception:
        # best effort: an unreadable cache is treated like a missing one
        return cache_miss
    if cached is None:
        return cache_miss

    now_utc = datetime.datetime.now(datetime.timezone.utc)
    oldest_allowed = now_utc - datetime.timedelta(days=7)
    # every check is evaluated up front, in this order
    checks = (
        cached.cached_at > oldest_allowed,  # less than one week old
        cached.version == version,
        cached.flavor == flavor,
        cached.arch == arch,
        cached.codename == codename,
    )
    if not all(checks):
        return cache_miss

    if cached.supported is None:
        with util.disable_log_to_console():
            logging.warning("livepatch kernel support cache has None value")
    return (True, cached.supported)
def _on_supported_kernel_api(
    version: str, flavor: str, arch: str, codename: str
) -> Optional[bool]:
    """
    Ask the livepatch API about kernel support and cache the answer.

    The answer is written to the local support cache together with the
    query values and the current UTC time, even when it is None.

    :return: whatever UALivepatchClient.is_kernel_supported returned
    """
    client = UALivepatchClient()
    supported = client.is_kernel_supported(
        version=version,
        flavor=flavor,
        arch=arch,
        codename=codename,
    )

    # cache response before returning
    cache_entry = state_files.LivepatchSupportCacheData(
        version=version,
        flavor=flavor,
        arch=arch,
        codename=codename,
        supported=supported,
        cached_at=datetime.datetime.now(datetime.timezone.utc),
    )
    state_files.livepatch_support_cache.write(cache_entry)

    if supported is not None:
        return supported

    with util.disable_log_to_console():
        logging.warning("livepatch kernel support API response was ambiguous")
    return supported
@lru_cache(maxsize=None)
def on_supported_kernel() -> Optional[bool]:
    """
    Determine whether the running kernel is supported by livepatch.

    Sources are consulted in order: the livepatch CLI, the local cache,
    and finally the livepatch API. The result (including None) is memoized
    by lru_cache for subsequent calls.

    :return: True/False when a source gives an answer, None when the
        kernel information is incomplete or no source is authoritative
    """
    # 1. the CLI, when it gives a definite answer
    from_cli = _on_supported_kernel_cli()
    if from_cli is not None:
        logging.debug("using livepatch cli for support")
        return from_cli

    # the remaining sources need kernel flavor and major/minor version
    kernel_info = system.get_kernel_info()
    required_parts = (kernel_info.flavor, kernel_info.major, kernel_info.minor)
    if any(part is None for part in required_parts):
        # NOTE(review): unlike most warnings in this module, this one is not
        # wrapped in util.disable_log_to_console() - confirm intentional.
        logging.warning(
            "unable to determine enough kernel information to "
            "check livepatch support"
        )
        return None

    arch = util.standardize_arch_name(system.get_dpkg_arch())
    codename = system.get_platform_info()["series"]
    lp_api_kernel_ver = "{major}.{minor}".format(
        major=kernel_info.major, minor=kernel_info.minor
    )
    query = (lp_api_kernel_ver, kernel_info.flavor, arch, codename)

    # 2. the local cache, when it is still valid for this query
    cache_is_valid, from_cache = _on_supported_kernel_cache(*query)
    if cache_is_valid:
        logging.debug("using livepatch support cache")
        return from_cache

    # 3. the API as the last resort
    logging.debug("using livepatch support api")
    return _on_supported_kernel_api(*query)
def unconfigure_livepatch_proxy(
    protocol_type: str, retry_sleeps: Optional[List[float]] = None
) -> None:
    """
    Clear the livepatch proxy setting for the given protocol.

    Does nothing when livepatch is not installed.

    :param protocol_type: String either http or https
    :param retry_sleeps: Optional list of sleep lengths to apply between
        retries. Specifying a list of [0.5, 1] tells subp to retry twice
        on failure; sleeping half a second before the first retry and 1 second
        before the second retry.
    """
    if is_livepatch_installed():
        # an empty value after "=" unsets the option
        unset_option = "{}-proxy=".format(protocol_type)
        system.subp(
            [LIVEPATCH_CMD, "config", unset_option],
            retry_sleeps=retry_sleeps,
        )
def configure_livepatch_proxy(
    http_proxy: Optional[str] = None,
    https_proxy: Optional[str] = None,
    retry_sleeps: Optional[List[float]] = None,
) -> None:
    """
    Set the http and/or https proxy in the livepatch configuration.

    :param http_proxy: http proxy to be used by livepatch. If None (or
                       empty), it will not be configured
    :param https_proxy: https proxy to be used by livepatch. If None (or
                        empty), it will not be configured
    :param retry_sleeps: Optional list of sleep lengths to apply between
                         snap calls
    """
    # imported inside the function, as in the original code
    from uaclient.entitlements import LivepatchEntitlement

    if any((http_proxy, https_proxy)):
        setting_proxy_msg = messages.SETTING_SERVICE_PROXY.format(
            service=LivepatchEntitlement.title
        )
        event.info(setting_proxy_msg)

    # http is configured before https
    proxy_settings = (
        ("http-proxy={}", http_proxy),
        ("https-proxy={}", https_proxy),
    )
    for option_template, proxy in proxy_settings:
        if not proxy:
            continue
        system.subp(
            [LIVEPATCH_CMD, "config", option_template.format(proxy)],
            retry_sleeps=retry_sleeps,
        )
def get_config_option_value(key: str) -> Optional[str]:
    """
    Gets the config value from livepatch.

    Runs ``canonical-livepatch config`` and looks for a line of the form
    ``<key>: <value>`` in its output.

    :param key: can be any valid livepatch config option; it is matched
        literally
    :return: the value of the livepatch config option with surrounding
        double quotes and whitespace removed, or None if the option is not
        listed or its value is empty
    """
    out, _ = system.subp([LIVEPATCH_CMD, "config"])
    # Escape the key so regex metacharacters in it (e.g. ".") cannot make
    # the pattern match a different option's line.
    pattern = r"^{}: (.*)$".format(re.escape(key))
    match = re.search(pattern, out, re.MULTILINE)
    if match is None:
        return None
    value = match.group(1)
    if value:
        # remove quotes if present
        value = re.sub(r"\"(.*)\"", r"\g<1>", value)
    return value.strip() if value else None
def is_livepatch_installed() -> bool:
    """Return True when system.which can locate the livepatch executable."""
    livepatch_path = system.which(LIVEPATCH_CMD)
    return livepatch_path is not None