HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/UpdateManager/Core/MyCache.py
# MyCache.py
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
#
#  Copyright (c) 2004-2008 Canonical
#
#  Author: Michael Vogt <mvo@debian.org>
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA

from __future__ import absolute_import, print_function

import warnings
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
import apt
import apt_pkg
import logging
import os
try:
    from urllib.error import HTTPError
    from urllib.request import urlopen
    from urllib.parse import urlsplit
except ImportError:
    from urllib2 import HTTPError, urlopen
    from urlparse import urlsplit
try:
    from http.client import BadStatusLine
except ImportError:
    from httplib import BadStatusLine
import socket
import subprocess
import re
import DistUpgrade.DistUpgradeCache
from gettext import gettext as _
try:
    from launchpadlib.launchpad import Launchpad
except ImportError:
    Launchpad = None

SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
CHANGELOGS_POOL = "https://changelogs.ubuntu.com/changelogs/pool/"
CHANGELOGS_URI = CHANGELOGS_POOL + "%s/%s/%s/%s_%s/%s"


class HttpsChangelogsUnsupportedError(Exception):
    """ https changelogs with credentials are unsupported because of the
        lack of certitifcation validation in urllib2 which allows MITM
        attacks to steal the credentials
    """
    pass


class MyCache(DistUpgrade.DistUpgradeCache.MyCache):

    CHANGELOG_ORIGIN = "Ubuntu"

    def __init__(self, progress, rootdir=None):
        apt.Cache.__init__(self, progress, rootdir)
        # save for later
        self.rootdir = rootdir
        # raise if we have packages in reqreinst state
        # and let the caller deal with that (runs partial upgrade)
        assert len(self.req_reinstall_pkgs) == 0
        # check if the dpkg journal is ok (we need to do that here
        # too because libapt will only do it when it tries to lock
        # the packaging system)
        assert(not self._dpkgJournalDirty())
        # init the regular cache
        self._initDepCache()
        self.all_changes = {}
        self.all_news = {}
        # on broken packages, try to fix via saveDistUpgrade()
        if self._depcache.broken_count > 0:
            self.saveDistUpgrade()
        assert (self._depcache.broken_count == 0
                and self._depcache.del_count == 0)
        self.launchpad = None
        # generate versioned_kernel_pkgs_regexp for later use
        apt_versioned_kernel_pkgs = apt_pkg.config.value_list(
            "APT::VersionedKernelPackages")
        if apt_versioned_kernel_pkgs:
            self.versioned_kernel_pkgs_regexp = re.compile("(" + "|".join(
                ["^" + p for p in apt_versioned_kernel_pkgs]) + ")")
            running_kernel_version = subprocess.check_output(
                ["uname", "-r"], universal_newlines=True).rstrip()
            self.running_kernel_pkgs_regexp = re.compile("(" + "|".join(
                [("^" + p + ".*" + running_kernel_version)
                 if not p.startswith(".*") else (running_kernel_version + p)
                 for p in apt_versioned_kernel_pkgs]) + ")")
        else:
            self.versioned_kernel_pkgs_regexp = None
            self.running_kernel_pkgs_regexp = None

    def _dpkgJournalDirty(self):
        """
        test if the dpkg journal is dirty
        (similar to debSystem::CheckUpdates)
        """
        d = os.path.dirname(
            apt_pkg.config.find_file("Dir::State::status")) + "/updates"
        for f in os.listdir(d):
            if re.match("[0-9]+", f):
                return True
        return False

    def _initDepCache(self):
        #apt_pkg.config.set("Debug::pkgPolicy","1")
        #self.depcache = apt_pkg.GetDepCache(self.cache)
        #self._depcache = apt_pkg.GetDepCache(self._cache)
        self._depcache.read_pinfile()
        if os.path.exists(SYNAPTIC_PINFILE):
            self._depcache.read_pinfile(SYNAPTIC_PINFILE)
        self._depcache.init()

    def clear(self):
        self._initDepCache()

    @property
    def required_download(self):
        """ get the size of the packages that are required to download """
        pm = apt_pkg.PackageManager(self._depcache)
        fetcher = apt_pkg.Acquire()
        pm.get_archives(fetcher, self._list, self._records)
        return fetcher.fetch_needed

    @property
    def install_count(self):
        return self._depcache.inst_count

    def keep_count(self):
        return self._depcache.keep_count

    @property
    def del_count(self):
        return self._depcache.del_count

    def _check_dependencies(self, target, deps):
        """Return True if any of the dependencies in deps match target."""
        # TODO: handle virtual packages
        for dep_or in deps:
            if not dep_or:
                continue
            match = True
            for base_dep in dep_or:
                if (base_dep.name != target.package.shortname
                    or not apt_pkg.check_dep(
                        target.version, base_dep.relation, base_dep.version)):
                    match = False
            if match:
                return True
        return False

    def find_removal_justification(self, pkg):
        target = pkg.installed
        if not target:
            return False
        for cpkg in self:
            candidate = cpkg.candidate
            if candidate is not None:
                if (self._check_dependencies(
                        target, candidate.get_dependencies("Conflicts"))
                    and self._check_dependencies(
                        target, candidate.get_dependencies("Replaces"))):
                    logging.info(
                        "%s Conflicts/Replaces %s; allowing removal" % (
                            candidate.package.shortname, pkg.shortname))
                    return True
        return False

    def saveDistUpgrade(self):
        """ this functions mimics a upgrade but will never remove anything """
        #self._apply_dselect_upgrade()
        self._depcache.upgrade(True)
        wouldDelete = self._depcache.del_count
        if wouldDelete > 0:
            deleted_pkgs = [pkg for pkg in self if pkg.marked_delete]
            assert wouldDelete == len(deleted_pkgs)
            for pkg in deleted_pkgs:
                if self.find_removal_justification(pkg):
                    wouldDelete -= 1
        if wouldDelete > 0:
            self.clear()
            assert (self._depcache.broken_count == 0
                    and self._depcache.del_count == 0)
        else:
            assert self._depcache.broken_count == 0
        #self._apply_dselect_upgrade()
        self._depcache.upgrade()
        return wouldDelete

    def _strip_epoch(self, verstr):
        " strip of the epoch "
        vers_no_epoch = verstr.split(":")
        if len(vers_no_epoch) > 1:
            verstr = "".join(vers_no_epoch[1:])
        return verstr

    def _get_changelog_or_news(self, name, fname, strict_versioning=False,
                               changelogs_uri=None):
        " helper that fetches the file in question "
        # don't touch the gui in this function, it needs to be thread-safe
        pkg = self[name]

        # get the src package name
        srcpkg = pkg.candidate.source_name

        # assume "main" section
        src_section = "main"
        # use the section of the candidate as a starting point
        section = pkg._pcache._depcache.get_candidate_ver(pkg._pkg).section

        # get the source version
        srcver_epoch = pkg.candidate.source_version
        srcver = self._strip_epoch(srcver_epoch)

        split_section = section.split("/")
        if len(split_section) > 1:
            src_section = split_section[0]

        # lib is handled special
        prefix = srcpkg[0]
        if srcpkg.startswith("lib"):
            prefix = "lib" + srcpkg[3]

        # the changelogs_uri argument overrides the default changelogs_uri,
        # this is useful for e.g. PPAs where we construct the changelogs
        # path differently
        if changelogs_uri:
            uri = changelogs_uri
        else:
            uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg,
                                    srcver, fname)

        # https uris are not supported when they contain a username/password
        # because the urllib2 https implementation will not check certificates
        # and so its possible to do a man-in-the-middle attack to steal the
        # credentials
        res = urlsplit(uri)
        if res.scheme == "https" and res.username:
            raise HttpsChangelogsUnsupportedError(
                "https locations with username/password are not"
                "supported to fetch changelogs")

        # print("Trying: %s " % uri)
        changelog = urlopen(uri)
        #print(changelog.read())
        # do only get the lines that are new
        alllines = ""
        regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(srcpkg))

        while True:
            line = changelog.readline().decode("UTF-8", "replace")
            if line == "":
                break
            match = re.match(regexp, line)
            if match:
                # strip epoch from installed version
                # and from changelog too
                installed = getattr(pkg.installed, "version", None)
                if installed and ":" in installed:
                    installed = installed.split(":", 1)[1]
                changelogver = match.group(1)
                if changelogver and ":" in changelogver:
                    changelogver = changelogver.split(":", 1)[1]
                # we test for "==" here for changelogs
                # to ensure that the version
                # is actually really in the changelog - if not
                # just display it all, this catches cases like:
                # gcc-defaults with "binver=4.3.1" and srcver=1.76
                #
                # for NEWS.Debian we do require the changelogver > installed
                if strict_versioning:
                    if (installed
                            and apt_pkg.version_compare(changelogver,
                                                        installed) < 0):
                        break
                else:
                    if (installed
                            and apt_pkg.version_compare(changelogver,
                                                        installed) == 0):
                        break
            alllines = alllines + line
        return alllines

    def _extract_ppa_changelog_uri(self, name):
        """Return the changelog URI from the Launchpad API

        Return None in case of an error.
        """
        if not Launchpad:
            logging.warning("Launchpadlib not available, cannot retrieve PPA "
                            "changelog")
            return None

        cdt = self[name].candidate
        for uri in cdt.uris:
            if urlsplit(uri).hostname != 'ppa.launchpad.net':
                continue
            match = re.search('http.*/(.*)/(.*)/ubuntu/.*', uri)
            if match is not None:
                user, ppa = match.group(1), match.group(2)
                break
        else:
            logging.error("Unable to find a valid PPA candidate URL.")
            return

        # Login on launchpad if we are not already
        if self.launchpad is None:
            self.launchpad = Launchpad.login_anonymously('update-manager',
                                                         'production',
                                                         version='devel')

        archive = self.launchpad.archives.getByReference(
            reference='~%s/ubuntu/%s' % (user, ppa)
        )
        if archive is None:
            logging.error("Unable to retrieve the archive from the Launchpad "
                          "API.")
            return

        spphs = archive.getPublishedSources(source_name=cdt.source_name,
                                            exact_match=True,
                                            version=cdt.source_version)
        if not spphs:
            logging.error("No published sources were retrieved from the "
                          "Launchpad API.")
            return

        return spphs[0].changelogUrl()

    def _guess_third_party_changelogs_uri_by_source(self, name):
        pkg = self[name]
        deb_uri = pkg.candidate.uri
        if deb_uri is None:
            return None
        srcrec = pkg.candidate.record.get("Source")
        if not srcrec:
            return None
        # srcpkg can be "apt" or "gcc-default (1.0)"
        srcpkg = srcrec.split("(")[0].strip()
        if "(" in srcrec:
            srcver = srcrec.split("(")[1].rstrip(")")
        else:
            srcver = pkg.candidate.source_version
        base_uri = deb_uri.rpartition("/")[0]
        return base_uri + "/%s_%s.changelog" % (srcpkg, srcver)

    def _guess_third_party_changelogs_uri_by_binary(self, name):
        """ guess changelogs uri based on ArchiveURI by replacing .deb
            with .changelog
        """
        # there is always a pkg and a pkg.candidate, no need to add
        # check here
        pkg = self[name]
        deb_uri = pkg.candidate.uri
        if deb_uri:
            return "%s.changelog" % deb_uri.rsplit(".", 1)[0]
        return None

    def get_news_and_changelog(self, name, lock):
        self.get_news(name)
        self.get_changelog(name)
        try:
            lock.release()
        except Exception:
            pass

    def get_news(self, name):
        " get the NEWS.Debian file from the changelogs location "
        try:
            news = self._get_changelog_or_news(name, "NEWS.Debian", True)
        except Exception:
            return
        if news:
            self.all_news[name] = news

    def _fetch_changelog_for_third_party_package(self, name, origins):
        # Special case for PPAs
        changelogs_uri_ppa = None
        for origin in origins:
            if origin.origin.startswith('LP-PPA-'):
                try:
                    changelogs_uri_ppa = self._extract_ppa_changelog_uri(name)
                    break
                except Exception:
                    logging.exception("Unable to connect to the Launchpad "
                                      "API.")
        # Try non official changelog location
        changelogs_uri_binary = \
            self._guess_third_party_changelogs_uri_by_binary(name)
        changelogs_uri_source = \
            self._guess_third_party_changelogs_uri_by_source(name)
        error_message = ""
        for changelogs_uri in [changelogs_uri_ppa,
                               changelogs_uri_binary,
                               changelogs_uri_source]:
            if changelogs_uri:
                try:
                    changelog = self._get_changelog_or_news(
                        name, "changelog", False, changelogs_uri)
                    self.all_changes[name] += changelog
                except (HTTPError, HttpsChangelogsUnsupportedError):
                    # no changelogs_uri or 404
                    error_message = _(
                        "This update does not come from a "
                        "source that supports changelogs.")
                except (IOError, BadStatusLine, socket.error):
                    # network errors and others
                    logging.exception("error on changelog fetching")
                    error_message = _(
                        "Failed to download the list of changes. \n"
                        "Please check your Internet connection.")
        self.all_changes[name] += error_message

    def get_changelog(self, name):
        " get the changelog file from the changelog location "
        origins = self[name].candidate.origins
        self.all_changes[name] = _("Changes for %s versions:\n"
                                   "Installed version: %s\n"
                                   "Available version: %s\n\n") % \
            (name, getattr(self[name].installed, "version", None),
             self[name].candidate.version)
        if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]:
            self._fetch_changelog_for_third_party_package(name, origins)
            return
        # fixup epoch handling version
        srcpkg = self[name].candidate.source_name
        srcver_epoch = self[name].candidate.source_version.replace(':', '%3A')
        try:
            changelog = self._get_changelog_or_news(name, "changelog")
            if len(changelog) == 0:
                changelog = _("The changelog does not contain any relevant "
                              "changes.\n\n"
                              "Please use http://launchpad.net/ubuntu/+source/"
                              "%s/%s/+changelog\n"
                              "until the changes become available or try "
                              "again later.") % (srcpkg, srcver_epoch)
        except HTTPError:
            changelog = _("The list of changes is not available yet.\n\n"
                          "Please use http://launchpad.net/ubuntu/+source/"
                          "%s/%s/+changelog\n"
                          "until the changes become available or try again "
                          "later.") % (srcpkg, srcver_epoch)
        except (IOError, BadStatusLine, socket.error) as e:
            print("caught exception: ", e)
            changelog = _("Failed to download the list "
                          "of changes. \nPlease "
                          "check your Internet "
                          "connection.")
        self.all_changes[name] += changelog