File: //proc/self/root/usr/lib/python3/dist-packages/UpdateManager/backend/__init__.py
#!/usr/bin/env python
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
"""Integration of package managers into UpdateManager"""
# (c) 2005-2009 Canonical, GPL
from __future__ import absolute_import
import gi
gi.require_version('Snapd', '1')
from gi.repository import GLib, Gtk, Snapd
from apt import Cache
import json
import logging
import os
import re
import subprocess
from gettext import gettext as _
from threading import Thread
from UpdateManager.Core.MyCache import MyCache
from UpdateManager.Core.utils import inhibit_sleep, get_dist_version
from UpdateManager.Dialogs import Dialog
class InstallBackend(Dialog):
    """Common driver shared by the package manager backends.

    Concrete backends are expected to override update() and commit();
    both raise NotImplementedError here.  _action_done() decides what
    happens once an action has finished.
    """

    ACTION_UPDATE = 0
    ACTION_PRE_INSTALL = 1
    ACTION_INSTALL = 2

    def __init__(self, window_main, action):
        """Set up the dialog and remember which action start() runs.

        window_main is passed on to Dialog.__init__(); action should be
        one of the ACTION_* constants.
        """
        Dialog.__init__(self, window_main)
        self.action = action
        # Filled in by start() with the return value of inhibit_sleep()
        self.sleep_cookie = None

    def start(self):
        """Run the action this backend was created for.

        ACTION_PRE_INSTALL collects the OEM metapackages that still need
        to be installed or upgraded and hands them to commit_oem().
        ACTION_INSTALL collects every marked package that still needs
        work and hands the lists to commit().  Any other action calls
        update().
        """
        os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
        # Do not suspend during the update process
        self.sleep_cookie = inhibit_sleep()

        if self.action == self.ACTION_PRE_INSTALL:
            unfresh_cache = self.window_main.cache
            fresh_cache = Cache(rootdir=self.window_main.cache.rootdir)
            # Install OEM packages, update, then do ACTION_INSTALL
            pkgs_install_oem = []
            pkgs_upgrade_oem = []
            for pkg in self.window_main.oem_metapackages:
                unfresh_pkg = unfresh_cache[pkg]
                fresh_pkg = fresh_cache[pkg]
                if (unfresh_pkg.marked_install
                        and not fresh_pkg.is_installed):
                    pkgs_install_oem.append(pkg)
                elif (unfresh_pkg.marked_upgrade
                      and fresh_pkg.is_upgradable):
                    pkgs_upgrade_oem.append(pkg)
            self.commit_oem(pkgs_install_oem, pkgs_upgrade_oem)
        elif self.action == self.ACTION_INSTALL:
            # Get the packages which should be installed and update
            pkgs_install = []
            pkgs_upgrade = []
            pkgs_remove = []
            # Get a fresh cache in case update-manager's is outdated to
            # skip operations that already took place
            fresh_cache = Cache(rootdir=self.window_main.cache.rootdir)
            for pkg in self.window_main.cache:
                try:
                    if (pkg.marked_install
                            and not fresh_cache[pkg.name].is_installed):
                        pkgname = pkg.name
                        if pkg.is_auto_installed:
                            pkgname += "#auto"
                        pkgs_install.append(pkgname)
                    elif (pkg.marked_upgrade
                          and fresh_cache[pkg.name].is_upgradable):
                        pkgs_upgrade.append(pkg.name)
                    elif (pkg.marked_delete
                          and fresh_cache[pkg.name].is_installed):
                        pkgs_remove.append(pkg.name)
                except KeyError:
                    # pkg missing from fresh_cache can't be modified
                    pass
            self.commit(pkgs_install, pkgs_upgrade, pkgs_remove)
        else:
            self.update()

    def update(self):
        """Run a update to refresh the package list"""
        raise NotImplementedError

    def commit_oem(self, pkgs_install_oem, pkgs_upgrade_oem):
        """Install these OEM packages.

        This default implementation installs nothing; it reports
        ACTION_PRE_INSTALL as successful right away so that
        _action_done() continues with the regular install.
        """
        self._action_done(self.ACTION_PRE_INSTALL,
                          authorized=True, success=True,
                          error_string=None, error_desc=None,
                          trans_failed=None)

    def commit(self, pkgs_install, pkgs_upgrade, pkgs_remove):
        """Commit the cache changes """
        raise NotImplementedError

    def get_snap_seeds(self):
        """Read the seeded/unseeded snap lists from deb2snap.json.

        Returns a (seeded_snaps, unseeded_snaps) tuple of dicts:
        seeded_snaps maps a snap name to (deb, to_channel) and
        unseeded_snaps maps a snap name to its from_channel.  A missing
        channel defaults to "stable/ubuntu-" + get_dist_version().

        Any error while reading or interpreting the file is logged at
        debug level; whatever was collected up to that point is returned.
        """
        seeded_snaps = {}
        unseeded_snaps = {}
        curr_channel = "stable/ubuntu-" + get_dist_version()
        try:
            # the context manager closes the file even if parsing fails
            with open('/usr/share/ubuntu-release-upgrader/deb2snap.json',
                      'r') as d2s_file:
                d2s = json.load(d2s_file)
            for snap, seed in d2s["seeded"].items():
                deb = seed.get("deb", None)
                to_channel = seed.get("to_channel", curr_channel)
                seeded_snaps[snap] = (deb, to_channel)
            for snap, unseed in d2s["unseeded"].items():
                unseeded_snaps[snap] = unseed.get("from_channel",
                                                  curr_channel)
        except Exception as e:
            logging.debug("error reading deb2snap.json file (%s)", e)
        return seeded_snaps, unseeded_snaps

    def get_deb2snap_dups(self):
        """Return the installed debs that a seeded snap can replace.

        A deb qualifies when it is installed and marking it for deletion
        does not mark any installed, non-automatically-installed package
        (the deb itself included) for deletion.  Returns an empty list
        when the cache cannot be (re)opened.
        """
        # update and grab the latest cache
        try:
            if self.window_main.cache is None:
                self.window_main.cache = MyCache(None)
            else:
                self.window_main.cache.open(None)
                self.window_main.cache._initDepCache()
            cache = self.window_main.cache
        except Exception as e:
            # just return an empty array for now, it's perfectly safe to
            # postpone this duplicates check to a later update.
            logging.debug("error reading cache (%s)", e)
            return []

        duplicates = []
        # the throwaway names deliberately avoid "_" so the gettext alias
        # is not shadowed inside this method
        seeded_snaps, _unseeded = self.get_snap_seeds()
        for deb, _channel in seeded_snaps.values():
            # if the deb is installed and was not manually installed,
            # replace it
            if (deb in cache and cache[deb].is_installed):
                deb_is_auto = True
                cache[deb].mark_delete()
                for pkg in cache.get_changes():
                    if (pkg.is_installed and pkg.marked_delete
                            and not pkg.is_auto_installed):
                        deb_is_auto = False
                        break
                # undo the trial marking before looking at the next deb
                cache.clear()
                if deb_is_auto:
                    duplicates.append(deb)
        return duplicates

    @staticmethod
    def _snap_output(*args):
        """Return the stdout text of running the snap command with args."""
        proc = subprocess.Popen(["snap"] + list(args),
                                universal_newlines=True,
                                stdout=subprocess.PIPE)
        return proc.communicate()[0]

    def get_snap_transitions(self):
        """Work out which snaps have to be installed, removed or refreshed.

        Returns a dict mapping a snap name to a dict with a 'command' key
        ('install', 'remove' or 'refresh') and, for install and refresh,
        a 'channel' key.
        """
        # populate snap_list with deb2snap transitions
        snap_list = {}
        seeded_snaps, unseeded_snaps = self.get_snap_seeds()
        installed_re = re.compile("^installed: ", re.MULTILINE)
        # the dot in the release number is escaped so that it only matches
        # a literal "."
        tracking_re = re.compile(
            r"^tracking:.*stable/ubuntu-[0-9][0-9]\.[0-9][0-9]",
            re.MULTILINE)

        for snap, (deb, to_channel) in seeded_snaps.items():
            # check if the snap is already installed
            snap_info = self._snap_output("info", snap)
            if installed_re.search(snap_info):
                logging.debug("Snap %s is installed", snap)
                continue
            if deb in self.window_main.duplicate_packages:
                # install the snap if the deb was just marked delete
                snap_list[snap] = {'command': 'install',
                                   'channel': to_channel}

        for snap in unseeded_snaps:
            # only snaps that are installed can be removed or refreshed
            snap_info = self._snap_output("info", snap)
            if not installed_re.search(snap_info):
                continue
            logging.debug("Snap %s is installed", snap)
            # its not tracking the release channel so don't remove
            if not tracking_re.search(snap_info):
                logging.debug("Snap %s is not tracking the release channel",
                              snap)
                continue
            snap_object = {'command': 'remove'}
            # check if this snap is being used by any other snaps
            conns = self._snap_output("connections", snap)
            for conn in conns.split('\n'):
                conn_cols = conn.split()
                # only rows with exactly four columns are considered
                if len(conn_cols) != 4:
                    continue
                plug = conn_cols[1]
                slot = conn_cols[2]
                if not slot.startswith(snap + ':'):
                    continue
                plug_snap = plug.split(':')[0]
                if plug_snap != '-' and plug_snap not in unseeded_snaps:
                    logging.debug("Snap %s is being used by %s. "
                                  "Switching it to stable track",
                                  snap, plug_snap)
                    snap_object['command'] = 'refresh'
                    snap_object['channel'] = 'stable'
                    break
            snap_list[snap] = snap_object
        return snap_list

    def update_snap_cb(self, client, change, _, user_data):
        """Progress callback passed to the Snapd client *_sync() calls.

        user_data is the (index, count, progress_bar) tuple built in
        update_snaps(): the position of the current snap, the number of
        snaps being processed and the progress bar to update (or None).
        """
        index, count, progress_bar = user_data
        if not progress_bar:
            return
        # determine how much of this change has been done
        task_total = 0
        task_done = 0
        for task in change.get_tasks():
            task_total += task.get_progress_total()
            task_done += task.get_progress_done()
        # a change without tasks (or without any progress total) counts
        # as "nothing done yet" instead of dividing by zero
        task_fraction = task_done / task_total if task_total else 0
        # determine how much total progress has been made
        total_fraction = (task_fraction / count) + (index / count)
        # change.get_tasks() can increase between callbacks so we must
        # avoid jumping backward in progress here
        if total_fraction > progress_bar.get_fraction():
            GLib.idle_add(progress_bar.set_fraction, total_fraction)

    def update_snaps(self):
        """Apply the deb2snap transitions through snapd.

        _action_done() runs this method in a separate thread, which is
        why the label and the final hand-over to window_main are
        scheduled with GLib.idle_add().  On a GLib.Error the error view
        is started instead of continuing with the available updates.
        """
        # update status and progress bar
        def update_status(status):
            GLib.idle_add(self.label_details.set_label, status)

        def update_progress(progress_bar):
            progress_bar.pulse()
            # returning True keeps the timeout source alive
            return True

        update_status(_("Updating snaps"))

        progress_bar = None
        progress_timer = None
        progress_bars = self.progressbar_slot.get_children()
        if progress_bars and isinstance(progress_bars[0], Gtk.ProgressBar):
            progress_bar = progress_bars[0]
            progress_timer = GLib.timeout_add(100, update_progress,
                                              progress_bar)

        # populate snap_list with deb2snap transitions
        snap_list = self.get_snap_transitions()

        if progress_timer:
            GLib.source_remove(progress_timer)
            progress_bar.set_fraction(0)

        # (un)install (un)seeded snap(s)
        try:
            client = Snapd.Client()
            client.connect_sync()
            count = len(snap_list)
            for index, (snap, snap_object) in enumerate(snap_list.items()):
                command = snap_object['command']
                progress_data = (index, count, progress_bar)
                # The snap name is interpolated after the gettext lookup
                # so that the untranslated template is what gets looked up
                if command == 'refresh':
                    update_status(_("Refreshing %s snap") % snap)
                    client.refresh_sync(snap, snap_object['channel'],
                                        self.update_snap_cb,
                                        progress_callback_data=progress_data)
                elif command == 'remove':
                    update_status(_("Removing %s snap") % snap)
                    client.remove_sync(snap, self.update_snap_cb,
                                       progress_callback_data=progress_data)
                else:
                    update_status(_("Installing %s snap") % snap)
                    client.install_sync(snap, snap_object['channel'],
                                        self.update_snap_cb,
                                        progress_callback_data=progress_data)
        except GLib.Error as e:
            logging.debug("error updating snaps (%s)", e)
            GLib.idle_add(self.window_main.start_error, False,
                          _("Upgrade only partially completed."),
                          _("An error occurred while updating snaps. "
                            "Please check your network connection."))
            return

        # continue with the rest of the updates
        GLib.idle_add(self.window_main.start_available)

    def _action_done(self, action, authorized, success, error_string,
                     error_desc, trans_failed=False):
        """Decide what happens after an action has finished.

        action is the ACTION_* constant that finished, success tells
        whether it worked, and error_string/error_desc/trans_failed are
        forwarded to window_main.start_error() where an error is shown.
        authorized is accepted but not used here.
        """
        # If the progress dialog should be closed automatically afterwards
        #settings = Gio.Settings.new("com.ubuntu.update-manager")
        #close_after_install = settings.get_boolean(
        #    "autoclose-install-window")
        # FIXME: confirm with mpt whether this should still be a setting
        #close_after_install = False
        if action == self.ACTION_PRE_INSTALL and success:
            # Now do the regular updates
            self.action = self.ACTION_INSTALL
            self.start()
        elif action == self.ACTION_INSTALL:
            if (success and os.path.exists("/usr/bin/snap")
                    and hasattr(self, 'pane_update_progress')):
                # snap transitions run in their own thread
                Thread(target=self.update_snaps).start()
            elif success:
                self.window_main.start_available()
            elif error_string:
                self.window_main.start_error(trans_failed, error_string,
                                             error_desc)
            else:
                # exit gracefuly, we can't just exit as this will trigger
                # a crash if system.exit() is called in a exception handler
                GLib.timeout_add(1, self.window_main.exit)
        else:
            if error_string:
                self.window_main.start_error(True, error_string, error_desc)
            elif (success and os.path.exists("/usr/bin/snap")
                  and hasattr(self, 'pane_update_progress')):
                self.window_main.duplicate_packages = self.get_deb2snap_dups()
                self.window_main.start_available()
            else:
                is_cancelled_update = not success
                self.window_main.start_available(is_cancelled_update)
# Import the aptdaemon backend, unless the synaptic one is being forced.
# When the import fails the class name simply stays undefined.
if (os.path.exists("/usr/sbin/aptd")
        and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ):
    # check if the gtkwidgets are installed as well
    try:
        from .InstallBackendAptdaemon import InstallBackendAptdaemon
    except ImportError:
        logging.exception("importing aptdaemon")

# Import the synaptic backend, unless the aptdaemon one is being forced.
if (os.path.exists("/usr/sbin/synaptic")
        and "UPDATE_MANAGER_FORCE_BACKEND_APTDAEMON" not in os.environ):
    try:
        from .InstallBackendSynaptic import InstallBackendSynaptic
    except ImportError:
        logging.exception("importing synaptic")
def get_backend(*args, **kwargs):
    """Select and return a package manager backend.

    aptdaemon is tried first and synaptic second; each one is only
    considered when its binary exists and the other backend is not
    forced through the UPDATE_MANAGER_FORCE_BACKEND_* environment
    variables.  All arguments are passed on to the backend constructor.

    Raises Exception when no backend could be created.
    """
    # try aptdaemon
    if (os.path.exists("/usr/sbin/aptd")
            and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ):
        # the class name is undefined (NameError) when its import at
        # module level did not succeed
        try:
            return InstallBackendAptdaemon(*args, **kwargs)
        except NameError:
            logging.exception("using aptdaemon failed")
    # try synaptic
    if (os.path.exists("/usr/sbin/synaptic")
            and "UPDATE_MANAGER_FORCE_BACKEND_APTDAEMON" not in os.environ):
        try:
            return InstallBackendSynaptic(*args, **kwargs)
        except NameError:
            # log the failure like the aptdaemon branch does instead of
            # hiding it
            logging.exception("using synaptic failed")
    # nothing found, raise
    raise Exception("No working backend found, please try installing "
                    "aptdaemon or synaptic")