File: //proc/self/root/bin/unattended-upgrades
#!/usr/bin/python3
# Copyright (c) 2005-2018 Canonical Ltd
#
# AUTHOR:
# Michael Vogt <mvo@ubuntu.com>
# Balint Reczey <rbalint@ubuntu.com>
# This file is part of unattended-upgrades
#
# unattended-upgrades is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# unattended-upgrades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with unattended-upgrades; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import atexit
import copy
import datetime
import errno
import email.charset
import fcntl
import fnmatch
import gettext
try:
from gi.repository.Gio import NetworkMonitor
except ImportError:
pass
import grp
import io
import locale
import logging
import logging.handlers
import re
import os
import select
import signal
import socket
import string
import subprocess
import sys
import syslog
try:
from typing import AbstractSet, cast, DefaultDict, Dict, Iterable, List
AbstractSet # pyflakes
DefaultDict # pyflakes
Dict # pyflakes
Iterable # pyflakes
List # pyflakes
from typing import Set, Tuple, Union
Set # pyflakes
Tuple # pyflakes
Union # pyflakes
except ImportError:
pass
from collections import defaultdict, namedtuple
from datetime import date
from email.message import Message
from gettext import gettext as _
from io import StringIO
from optparse import (
OptionParser,
SUPPRESS_HELP,
)
from subprocess import (
Popen,
PIPE,
)
from textwrap import wrap
import apt
import apt_inst
import apt_pkg
import distro_info
# the reboot required flag file used by packages
REBOOT_REQUIRED_FILE = "/var/run/reboot-required"
# NOTE(review): relative path (no leading "/"), unlike the other paths
# below; presumably joined with a root directory by its users -- confirm
KEPT_PACKAGES_FILE = "var/lib/unattended-upgrades/kept-back"
# external programs referenced by absolute path
MAIL_BINARY = "/usr/bin/mail"
SENDMAIL_BINARY = "/usr/sbin/sendmail"
USERS = "/usr/bin/users"
# no py3 lsb_release in debian :/
# The three values below are obtained by running the lsb_release program at
# import time; check_output() raises if the program is missing or fails.
DISTRO_CODENAME = subprocess.check_output(
    ["lsb_release", "-c", "-s"], universal_newlines=True).strip()  # type: str
DISTRO_DESC = subprocess.check_output(
    ["lsb_release", "-d", "-s"], universal_newlines=True).strip()  # type: str
DISTRO_ID = subprocess.check_output(
    ["lsb_release", "-i", "-s"], universal_newlines=True).strip()  # type: str
# Number of days before release of devel where we enable unattended
# upgrades.
DEVEL_UNTIL_RELEASE = datetime.timedelta(days=21)
# progress information is written here
PROGRESS_LOG = "/var/run/unattended-upgrades.progress"
PID_FILE = "/var/run/unattended-upgrades.pid"
LOCK_FILE = "/var/run/unattended-upgrades.lock"
# set to True by signal_handler(); polled by should_stop()
SIGNAL_STOP_REQUEST = False
# messages to be logged only once (filled by log_once())
logged_msgs = set()  # type: AbstractSet[str]
# pin priority used to mark origins/packages that must never be installed
NEVER_PIN = -32768
class LoggingDateTime:
    """Convert between datetime objects and dpkg log file timestamps."""

    # strftime/strptime layout of the timestamps
    LOG_DATE_TIME_FMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def as_string(cls):
        # type: () -> str
        """Format the current local date and time with LOG_DATE_TIME_FMT."""
        now = datetime.datetime.now()
        return now.strftime(cls.LOG_DATE_TIME_FMT)

    @classmethod
    def from_string(cls, logstr):
        # type: (str) -> datetime.datetime
        """Parse a LOG_DATE_TIME_FMT formatted string into a datetime.

        Raises ValueError (from strptime) when logstr does not match the
        format.
        """
        layout = cls.LOG_DATE_TIME_FMT
        return datetime.datetime.strptime(logstr, layout)
class UnknownMatcherError(ValueError):
    """Raised when an origin pattern uses a matcher keyword that is unknown."""
class NoAllowedOriginError(ValueError):
    """Raised when no version of a package comes from an allowed origin."""
# A pin on a package: 'pkg' is a package name or a /regex/, 'priority' the
# pin priority to apply to it.
PkgPin = namedtuple('PkgPin', 'pkg priority')
# A pin on a package file (index): 'id' is the package file's id,
# 'priority' the pin priority to apply to it.
PkgFilePin = namedtuple('PkgFilePin', 'id priority')
class UnattendedUpgradesCache(apt.Cache):
    """apt.Cache that enforces the unattended-upgrades configuration.

    The allowed origins and the package blacklist/whitelist are read from
    the apt configuration on construction.  Every open() translates them
    into pinnings and applies those to the cache's policy.
    """

    def __init__(self, rootdir):
        # type: (str) -> None
        """Read the configuration and initialize the cache for rootdir."""
        # names of packages whose candidate version was adjusted by
        # call_adjusted(); they are re-adjusted after each clear()
        self._cached_candidate_pkgnames = set()  # type: Set[str]

        # The attributes below are set before apt.Cache.__init__() runs;
        # pinning_from_config() assumes allowed_origins to be set already
        # (presumably apt.Cache.__init__() calls open() -- confirm).
        self.allowed_origins = get_allowed_origins()
        logging.info(_("Allowed origins are: %s"),
                     ", ".join(self.allowed_origins))

        self.blacklist = apt_pkg.config.value_list(
            "Unattended-Upgrade::Package-Blacklist")
        logging.info(_("Initial blacklist: %s"), " ".join(self.blacklist))

        self.whitelist = apt_pkg.config.value_list(
            "Unattended-Upgrade::Package-Whitelist")
        self.strict_whitelist = apt_pkg.config.find_b(
            "Unattended-Upgrade::Package-Whitelist-Strict", False)
        logging.info(_("Initial whitelist (%s): %s"),
                     "strict" if self.strict_whitelist else "not strict",
                     " ".join(self.whitelist))

        apt.Cache.__init__(self, rootdir=rootdir)

        # pre-heat lazy-loaded modules to avoid crash on python upgrade
        datetime.datetime.strptime("", "")

        # generate versioned_kernel_pkgs_regexp for later use
        self.versioned_kernel_pkgs_regexp = versioned_kernel_pkgs_regexp()
        self.running_kernel_pkgs_regexp = running_kernel_pkgs_regexp()
        if self.versioned_kernel_pkgs_regexp:
            logging.debug("Using %s regexp to find kernel packages",
                          self.versioned_kernel_pkgs_regexp.pattern)
        else:
            logging.debug("APT::VersionedKernelPackages is not set")
        if self.running_kernel_pkgs_regexp:
            logging.debug("Using %s regexp to find running kernel packages",
                          self.running_kernel_pkgs_regexp.pattern)

    def find_better_version(self, pkg):
        # type: (apt.Package) -> apt.package.Version
        """Return a version of pkg worth upgrading to, or None.

        Only looked for when pkg is installed and pkg.versions[0] is newer
        than the installed version.  The first version in pkg.versions that
        is newer than the installed one, has a policy priority not lower
        than the installed version's and comes from an allowed origin is
        returned.
        """
        if pkg.is_installed and pkg.versions[0] > pkg.installed:
            logging.debug(
                "Package %s has a higher version available, checking if it is "
                "from an allowed origin and is not pinned down.", pkg.name)
            for v in pkg.versions:
                if pkg.installed < v \
                   and pkg.installed.policy_priority <= v.policy_priority \
                   and is_in_allowed_origin(v, self.allowed_origins):
                    return v
        return None

    def find_kept_packages(self, dry_run):
        # type: (bool) -> KeptPkgs
        """ Find kept packages not collected already

        In dry-run mode an empty KeptPkgs is returned.  Otherwise every
        package with a better version (see find_better_version()) is added
        and the excuse for keeping it back is logged.
        """
        kept_packages = KeptPkgs(set)
        if dry_run:
            logging.info(_("The list of kept packages can't be calculated in "
                           "dry-run mode."))
            return kept_packages
        for pkg in self:
            better_version = self.find_better_version(pkg)
            if better_version:
                # the excuse is computed on the wrapped pkg._pkg object
                logging.info(self.kept_package_excuse(pkg._pkg,
                                                      self.blacklist,
                                                      self.whitelist,
                                                      self.strict_whitelist,
                                                      better_version))
                kept_packages.add(pkg, better_version, self)
        return kept_packages

    def kept_package_excuse(self, pkg,  # the caller in this class passes
                            # apt.Package._pkg, not the apt.Package itself
                            blacklist,  # type: List[str]
                            whitelist,  # type: List[str]
                            strict_whitelist,  # type: bool
                            better_version  # type: apt.package.Version
                            ):
        # type: (...) -> str
        """ Return the excuse the package is kept back for

        The checks are made in this order: held package, blacklist,
        whitelist (strict or not), untrusted origin.  When none of them
        applies a generic excuse is returned.  Nothing is logged here; the
        caller logs the returned string.
        """
        if pkg.selected_state == apt_pkg.SELSTATE_HOLD:
            return _("Package %s is marked to be held back.") % pkg.name
        elif is_pkgname_in_blacklist(pkg.name, blacklist):
            return _("Package %s is blacklisted.") % pkg.name
        elif whitelist:
            # a non-empty whitelist that contains the package falls through
            # to the generic excuse at the end
            if strict_whitelist:
                if not is_pkgname_in_whitelist(pkg.name, whitelist):
                    return (_(
                        "Package %s is not on the strict whitelist.")
                        % pkg.name)
            else:
                if not is_pkgname_in_whitelist(pkg.name, whitelist):
                    return (_(
                        "Package %s is not whitelisted and it is not a"
                        " dependency of a whitelisted package.")
                        % pkg.name)
        elif not any([o.trusted for o in better_version.origins]):
            return _("Package %s's origin is not trusted.") % pkg.name
        return (_("Package %s is kept back because a related package"
                  " is kept back or due to local apt_preferences(5).")
                % pkg.name)

    def pinning_from_regex_list(self, regexps, priority):
        # type: (List[str], int) -> List[PkgPin]
        """ Represent a list of Python regexps as list of pkg pinnings

        A regexp accepted by python_regex_is_posix() becomes a single
        '/^regex/' pin.  Any other regexp is expanded to one pin per
        package name it matches.
        """
        pins = []  # type: List[PkgPin]
        for regex in regexps:
            if python_regex_is_posix(regex):
                pins.append(PkgPin('/^' + regex + '/', priority))
            else:
                # Python regex is not also an equivalent POSIX regexp.
                # This is expected to be rare. Go through all the package names
                # and pin all the matching ones.
                for pkg in self._cache.packages:
                    if re.match(regex, pkg.name):
                        pins.append(PkgPin(pkg.name, priority))
        return pins

    def pinning_from_config(self):
        # type: () -> List[Union[PkgPin, PkgFilePin]]
        """ Represent configuration as list of pinnings

            Assumes self.allowed_origins to be already set.

            The list is built in this order: package files (NEVER_PIN for
            not allowed origins, -1 for allowed ones with a strict
            whitelist), blacklisted packages (NEVER_PIN), whitelisted
            packages (900) and, with a strict whitelist, NEVER_PIN for
            not whitelisted packages having a candidate priority above -1.
        """
        pins = []  # type: List[Union[PkgPin, PkgFilePin]]

        # mark not allowed origins with 'never' pin
        for pkg_file in self._cache.file_list:  # type: ignore
            if not is_allowed_origin(pkg_file, self.allowed_origins):
                # Set the magic 'never' pin on not allowed origins
                logging.debug("Marking not allowed %s with %s pin", pkg_file,
                              NEVER_PIN)
                pins.append(PkgFilePin(pkg_file.id, NEVER_PIN))
                # TODO(rbalint) pin not trusted origins with NEVER_PIN
            elif self.strict_whitelist:
                # set even allowed origins to -1 and set individual package
                # priorities up later
                pins.append(PkgFilePin(pkg_file.id, -1))

        # mark blacklisted packages with 'never' pin
        pins.extend(self.pinning_from_regex_list(  # type: ignore
            self.blacklist, NEVER_PIN))
        # set priority of whitelisted packages to high
        pins.extend(self.pinning_from_regex_list(  # type: ignore
            self.whitelist, 900))
        if self.strict_whitelist:
            policy = self._depcache.policy
            # pin down already pinned packages which are not on the whitelist
            # to not install locally pinned up packages accidentally
            for pkg in self._cache.packages:
                if pkg.has_versions:
                    pkg_ver = policy.get_candidate_ver(pkg)  # type: ignore
                    if pkg_ver is not None \
                       and policy.get_priority(pkg_ver) > -1:
                        # the pin is higher than set for allowed origins, thus
                        # there is extra pinning configuration
                        if not is_pkgname_in_whitelist(pkg.name,
                                                       self.whitelist):
                            pins.append(PkgPin(pkg.name, NEVER_PIN))

        return pins

    def apply_pinning(self, pins):
        # type: (List[Union[PkgPin, PkgFilePin]]) -> None
        """ Apply the list of pins

        PkgPin entries are created as 'Version' pins matching '*';
        PkgFilePin entries set the priority of the package file having the
        given id.  Entries of any other type are only logged.
        """
        policy = self._depcache.policy
        # look up package files by the id stored in PkgFilePin
        pkg_files = {f.id: f for f in self._cache.file_list}  # type: ignore
        for pin in pins:
            logging.debug("Applying pinning: %s" % str(pin))
            if isinstance(pin, PkgPin):
                policy.create_pin('Version', pin.pkg, '*',  # type: ignore
                                  pin.priority)
            elif isinstance(pin, PkgFilePin):
                logging.debug("Applying pin %s to package_file: %s"
                              % (pin.priority, str(pkg_files[pin.id])))
                policy.set_priority(pkg_files[pin.id],  # type: ignore
                                    pin.priority)

    def open(self, progress=None):
        """Open the cache, then apply the pinning derived from the config."""
        apt.Cache.open(self, progress)
        # apply pinning generated from unattended-upgrades configuration
        self.apply_pinning(self.pinning_from_config())

    def adjust_candidate(self, pkg):
        # type: (apt.Package) -> bool
        """ Adjust origin and return True if adjustment took place

            This is needed when e.g. a package is available in
            the security pocket but there is also a package in the
            updates pocket with a higher version number

            False is returned when no version of pkg is in an allowed
            origin, when pkg has no candidate or when the allowed version
            is not lower than the current candidate.
        """
        try:
            new_cand = ver_in_allowed_origin(pkg, self.allowed_origins)
            # Only adjust to lower versions to avoid flipping back and forth
            # and to avoid picking a newer version, not selected by apt.
            # This helps avoiding upgrades to experimental's packages.
            if pkg.candidate is not None and new_cand < pkg.candidate:
                logging.debug("adjusting candidate version: %s" % new_cand)
                pkg.candidate = new_cand
                return True
            else:
                return False
        except NoAllowedOriginError:
            return False

    def call_checked(self, function, pkg, **kwargs):
        """ Call function and check if package is in the wanted state

        Returns False when function raised SystemError; the cache is
        cleared in that case.  Otherwise the result is true only when
        function is Package.mark_upgrade or Package.mark_install and pkg
        ended up marked for upgrade or install.
        """
        try:
            function(pkg, **kwargs)
        except SystemError as e:
            logging.warning(
                _("package %s upgradable but fails to "
                  "be marked for upgrade (%s)"), pkg.name, e)
            self.clear()
            return False

        return ((function == apt.package.Package.mark_upgrade
                 or function == apt.package.Package.mark_install)
                and (pkg.marked_upgrade or pkg.marked_install))

    def call_adjusted(self, function, pkg, **kwargs):
        """Call function, but with adjusting
           packages in changes to come from allowed origins

           Note that as a side effect more package's candidate can be
           adjusted than only the one's in the final changes set.

           Nothing is returned; the outcome is the marking state of the
           cache.  The method calls itself again whenever it adjusted the
           candidate of at least one more package.
        """
        # holds the marked package objects (not names) still to be adjusted
        new_pkgs_to_adjust = []  # List[str]

        if not is_pkg_change_allowed(pkg, self.blacklist, self.whitelist,
                                     self.strict_whitelist):
            return

        # a not upgradable package is only touched when downgrades are
        # allowed and then it is marked for install instead of upgrade
        if function == apt.package.Package.mark_upgrade \
           and not pkg.is_upgradable:
            if not apt_pkg.config.find_b("Unattended-Upgrade::Allow-downgrade",
                                         False):
                return
            else:
                function = apt.package.Package.mark_install

        marking_succeeded = self.call_checked(function, pkg, **kwargs)

        if not marking_succeeded or \
           not check_changes_for_sanity(self, desired_pkg=pkg):
            logging.debug("falling back to adjusting %s's dependencies"
                          % pkg.name)
            self.clear()
            # adjust candidates in advance if needed
            for pkg_name in self._cached_candidate_pkgnames:
                self.adjust_candidate(self[pkg_name])

            self.adjust_candidate(pkg)
            for dep in transitive_dependencies(pkg, self, level=1):
                try:
                    self.adjust_candidate(self[dep])
                except KeyError:
                    # dependency name is not in the cache
                    pass

            self.call_checked(function, pkg, **kwargs)

        # collect marked packages whose candidate is not from an allowed
        # origin although some version of them is
        for marked_pkg in self.get_changes():
            if marked_pkg.name in self._cached_candidate_pkgnames:
                continue
            if not is_in_allowed_origin(marked_pkg.candidate,
                                        self.allowed_origins):
                try:
                    # called only for its NoAllowedOriginError side effect
                    ver_in_allowed_origin(marked_pkg,
                                          self.allowed_origins)
                    # important! this avoids downgrades below
                    # NOTE(review): the test is on pkg, not on marked_pkg
                    # -- confirm this is intended
                    if pkg.is_installed and not pkg.is_upgradable and \
                            apt_pkg.config.find_b("Unattended-Upgrade::Allow-"
                                                  "downgrade", False):
                        continue
                    new_pkgs_to_adjust.append(marked_pkg)
                except NoAllowedOriginError:
                    pass

        if new_pkgs_to_adjust:
            new_pkg_adjusted = False
            for pkg_to_adjust in new_pkgs_to_adjust:
                if self.adjust_candidate(pkg_to_adjust):
                    self._cached_candidate_pkgnames.add(pkg_to_adjust.name)
                    new_pkg_adjusted = True
            if new_pkg_adjusted:
                # retry with the newly adjusted candidates
                self.call_adjusted(function, pkg, **kwargs)

    def mark_upgrade_adjusted(self, pkg, **kwargs):
        """Mark pkg for upgrade via call_adjusted()."""
        self.call_adjusted(apt.package.Package.mark_upgrade, pkg, **kwargs)

    def mark_install_adjusted(self, pkg, **kwargs):
        """Mark pkg for install via call_adjusted()."""
        self.call_adjusted(apt.package.Package.mark_install, pkg, **kwargs)
class LogInstallProgress(apt.progress.base.InstallProgress):
    """ Install progress that writes to self.progress_log
        (/var/run/unattended-upgrades.progress by default)

        The output of the forked child is redirected to the dpkg log file
        (logfile_dpkg) unless Debug::pkgDPkgPM is set.
    """

    def __init__(self, logfile_dpkg, verbose=False,
                 progress_log="var/run/unattended-upgrades.progress"):
        # type: (str, bool, str) -> None
        """Set up the progress reporter.

        logfile_dpkg -- path of the dpkg log file to append to
        verbose      -- also copy new dpkg log content to stdout
        progress_log -- progress file path, joined to the apt "Dir"
                        configuration directory
        """
        apt.progress.base.InstallProgress.__init__(self)
        self.logfile_dpkg = logfile_dpkg
        self.progress_log = os.path.join(apt_pkg.config.find_dir("Dir"),
                                         progress_log)
        self.verbose = verbose
        # read-only fd of the dpkg log, opened lazily for verbose output
        self.output_logfd = None  # type: int

    def status_change(self, pkg, percent, status):
        # type: (str, float, str) -> None
        """Overwrite the progress file with the current percentage and pkg.

        status is accepted for interface compatibility and is not used.
        """
        with open(self.progress_log, "w") as f:
            f.write(_("Progress: %s %% (%s)") % (percent, pkg))

    def _fixup_fds(self):
        # type: () -> None
        """Close every open fd except stdin/stdout/stderr and apt's pipes.

        The pipe fds that are kept are marked close-on-exec.
        """
        required_fds = [0, 1, 2,  # stdin, stdout, stderr
                        self.writefd,
                        self.write_stream.fileno(),
                        self.statusfd,
                        self.status_stream.fileno()
                        ]
        # ensure that our required fds close on exec
        for fd in required_fds[3:]:
            old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
        # close all fds
        proc_fd = "/proc/self/fd"
        if os.path.exists(proc_fd):
            error_count = 0
            for fdname in os.listdir(proc_fd):
                try:
                    fd = int(fdname)
                except ValueError:
                    print("ERROR: can not get fd for %s" % fdname)
                    # there is no fd to act on; without skipping the entry
                    # an unbound or stale fd value would be closed below
                    continue
                if fd in required_fds:
                    continue
                try:
                    os.close(fd)
                    # print("closed: ", fd)
                except OSError as e:
                    # there will be one fd that can not be closed
                    # as its the fd from pythons internal diropen()
                    # so its ok to ignore one close error
                    error_count += 1
                    if error_count > 1:
                        print("ERROR: os.close(%s): %s" % (fd, e))

    def _redirect_stdin(self):
        # type: () -> None
        """Make fd 0 refer to os.devnull."""
        REDIRECT_INPUT = os.devnull
        fd = os.open(REDIRECT_INPUT, os.O_RDWR)
        os.dup2(fd, 0)

    def _redirect_output(self):
        # type: () -> None
        """Make fds 1 and 2 refer to the dpkg log file."""
        # do not create log in dry-run mode, just output to stdout/stderr
        if not apt_pkg.config.find_b("Debug::pkgDPkgPM", False):
            logfd = self._get_logfile_dpkg_fd()
            os.dup2(logfd, 1)
            os.dup2(logfd, 2)

    def _get_logfile_dpkg_fd(self):
        # type: () -> int
        """Open (creating if needed, mode 0640) the dpkg log for appending.

        Changing the ownership to root:adm is attempted on a best effort
        basis.  The caller owns the returned file descriptor.
        """
        logfd = os.open(
            self.logfile_dpkg, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o640)
        try:
            adm_gid = grp.getgrnam("adm").gr_gid
            os.fchown(logfd, 0, adm_gid)
        except (KeyError, OSError):
            # no "adm" group or not permitted to chown: keep the owner
            pass
        return logfd

    def update_interface(self):
        # type: () -> None
        """Run the base class' update, then the verbose output handling."""
        # call super class first
        apt.progress.base.InstallProgress.update_interface(self)
        self._do_verbose_output_if_needed()

    def _do_verbose_output_if_needed(self):
        # type: () -> None
        """In verbose mode copy newly logged dpkg output to stdout."""
        # if we are in debug mode, nothing to be more verbose about
        if apt_pkg.config.find_b("Debug::pkgDPkgPM", False):
            return
        # handle verbose
        if self.verbose:
            if self.output_logfd is None:
                # start at the current end of the log to show only new output
                self.output_logfd = os.open(self.logfile_dpkg, os.O_RDONLY)
                os.lseek(self.output_logfd, 0, os.SEEK_END)
            try:
                select.select([self.output_logfd], [], [], 0)
            # FIXME: this should be OSError, but in py2.7 it is still
            #        select.error
            except select.error as e:
                if e.errno != errno.EINTR:  # type: ignore
                    logging.exception("select failed")
            # output to stdout in verbose mode only
            os.write(1, os.read(self.output_logfd, 1024))

    def _log_in_dpkg_log(self, msg):
        # type: (str) -> None
        """Append msg, encoded as UTF-8, to the dpkg log file."""
        logfd = self._get_logfile_dpkg_fd()
        try:
            os.write(logfd, msg.encode("utf-8"))
        finally:
            # do not leak the descriptor when the write fails
            os.close(logfd)

    def finish_update(self):
        # type: () -> None
        """Write the "Log ended" timestamp line to the dpkg log."""
        self._log_in_dpkg_log("Log ended: %s\n\n"
                              % LoggingDateTime.as_string())

    def fork(self):
        # type: () -> int
        """Fork; in the child fix up the fds and redirect stdin and output.

        The "Log started" timestamp line is written before forking.
        Returns the result of os.fork(): 0 in the child, the child's pid
        in the parent.
        """
        self._log_in_dpkg_log("Log started: %s\n"
                              % LoggingDateTime.as_string())
        pid = os.fork()
        if pid == 0:
            self._fixup_fds()
            self._redirect_stdin()
            self._redirect_output()
        return pid
class Unlocked:
    """
    Context manager releasing the inner apt lock for the duration of the
    with block (used while cache.commit() is run) and taking it again
    when the block is left.
    """

    def __enter__(self):
        # type: () -> None
        """Release the inner lock; a failure is logged and ignored."""
        try:
            apt_pkg.pkgsystem_unlock_inner()
        except Exception:
            # earlier python-apt used to leak lock
            logging.warning("apt_pkg.pkgsystem_unlock() failed due to not "
                            "holding the lock but trying to continue")

    def __exit__(self, exc_type, exc_value, exc_tb):
        # type: (object, object, object) -> None
        """Take the inner lock again; exceptions from the block propagate."""
        apt_pkg.pkgsystem_lock_inner()
class KeptPkgs(defaultdict):
    """
    Names of kept back packages, grouped under the pretty-printed
    ("<origin> <archive>") form of an allowed origin.
    """

    def add(self, pkg,  # type: apt.Package
            version,  # type: apt.package.Version
            cache  # type: UnattendedUpgradesCache
            ):
        # type: (...) -> None
        """Record pkg under the first allowed origin of version.

        Nothing is recorded when none of version's origins is allowed.
        """
        first_allowed = next(
            (candidate for candidate in version.origins
             if is_allowed_origin(candidate, cache.allowed_origins)),
            None)
        if first_allowed is None:
            return
        group = " ".join((first_allowed.origin, first_allowed.archive))
        self[group].add(pkg.name)
class UnattendedUpgradesResult:
    """
    Represent the (potentially partial) results of an unattended-upgrades
    run
    """

    def __init__(self,
                 success,  # type: bool
                 result_str="",  # type: str
                 pkgs=None,  # type: List[str]
                 pkgs_kept_back=None,  # type: KeptPkgs
                 pkgs_removed=None,  # type: List[str]
                 pkgs_kept_installed=None,  # type: List[str]
                 update_stamp=False  # type: bool
                 ):
        # type: (...) -> None
        """Store the results of a run.

        Every omitted (or None) collection argument is replaced by a fresh
        empty container, so instances never share mutable state through
        the defaults: pkgs, pkgs_removed and pkgs_kept_installed become
        empty lists, pkgs_kept_back an empty KeptPkgs(set).
        """
        self.success = success
        self.result_str = result_str
        self.pkgs = [] if pkgs is None else pkgs
        self.pkgs_kept_back = (KeptPkgs(set) if pkgs_kept_back is None
                               else pkgs_kept_back)
        self.pkgs_removed = [] if pkgs_removed is None else pkgs_removed
        self.pkgs_kept_installed = ([] if pkgs_kept_installed is None
                                    else pkgs_kept_installed)
        self.update_stamp = update_stamp
def is_dpkg_journal_dirty():
    # type: () -> bool
    """
    Tell whether dpkg's "updates" directory, located next to the file
    configured as Dir::State::status, has an entry whose name starts with
    a digit (similar to debSystem::CheckUpdates).
    """
    status_file = apt_pkg.config.find_file("Dir::State::status")
    updates_dir = os.path.join(os.path.dirname(status_file), "updates")
    return any(re.match("[0-9]+", entry)
               for entry in os.listdir(updates_dir))
def signal_handler(signal, frame):
    # type: (int, object) -> None
    """Signal handler requesting a stop.

    Sets the module level SIGNAL_STOP_REQUEST flag and logs a warning;
    both arguments are ignored.
    """
    global SIGNAL_STOP_REQUEST
    SIGNAL_STOP_REQUEST = True
    logging.warning("SIGTERM received, will stop")
def log_once(msg):
    # type: (str) -> None
    """Log msg at info level unless the very same msg was logged before."""
    global logged_msgs
    if msg in logged_msgs:
        return
    logging.info(msg)
    logged_msgs.add(msg)  # type: ignore
def should_stop():
    # type: () -> bool
    """
    Return True if u-u needs to stop due to signal received, due to the
    system started to run on battery or due to being on a metered
    connection.

    The battery and the metered connection checks are controlled by the
    Unattended-Upgrade::OnlyOnACPower and
    Unattended-Upgrade::Skip-Updates-On-Metered-Connections options (both
    default to True).  A check that can't be performed is skipped with a
    message that is logged only once.
    """
    if SIGNAL_STOP_REQUEST:
        logging.warning("SIGNAL received, stopping")
        return True

    try:
        # only an exit status of exactly 1 from on_ac_power means stop
        if apt_pkg.config.find_b("Unattended-Upgrade::OnlyOnACPower", True) \
           and subprocess.call("on_ac_power") == 1:
            logging.warning("System is on battery power, stopping")
            return True
    except FileNotFoundError:
        # the on_ac_power program is not installed
        log_once(
            _("Checking if system is running on battery is skipped. Please "
              "install powermgmt-base package to check power status and skip "
              "installing updates when the system is running on battery."))
    if apt_pkg.config.find_b(
            "Unattended-Upgrade::Skip-Updates-On-Metered-Connections", True):
        try:
            if NetworkMonitor.get_network_metered(
                    NetworkMonitor.get_default()):
                logging.warning(_("System is on metered connection, stopping"))
                return True
        except NameError:
            # NetworkMonitor is undefined when its import failed at the top
            # of the file
            log_once(_("Checking if connection is metered is skipped. Please "
                       "install python3-gi package to detect metered "
                       "connections and skip downloading updates."))

    return False
def substitute(line):
    # type: (str) -> str
    """ Substitute the known placeholders in line and return the result

        Supported are ${distro_codename} and ${distro_id}.  As
        string.Template.substitute() is used, an unknown placeholder
        raises KeyError and a malformed one ValueError.
    """
    template = string.Template(line)
    return template.substitute(distro_codename=get_distro_codename(),
                               distro_id=get_distro_id())
def get_distro_codename():
    # type: () -> str
    """Return the module level DISTRO_CODENAME value."""
    return DISTRO_CODENAME
def get_distro_id():
    # type: () -> str
    """Return the module level DISTRO_ID value."""
    return DISTRO_ID
def versioned_kernel_pkgs_regexp():
    """Compile a regexp matching versioned kernel package names.

    The package name prefixes are taken from the
    APT::VersionedKernelPackages configuration list; each one must be
    followed by "-<x>.<y>.<z>-<n>" and an optional "-<suffix>".  None is
    returned when the list is empty.
    """
    prefixes = apt_pkg.config.value_list("APT::VersionedKernelPackages")
    if not prefixes:
        return None
    version_part = "-[1-9][0-9]*\\.[0-9]+\\.[0-9]+-[0-9]+(-.+)?$"
    alternatives = ["^" + prefix + version_part for prefix in prefixes]
    return re.compile("(" + "|".join(alternatives) + ")")
def running_kernel_pkgs_regexp():
    """Compile a regexp matching the packages of the running kernel.

    The package name prefixes are taken from the
    APT::VersionedKernelPackages configuration list and the kernel version
    from "uname -r".  A name matches when it is "<prefix>-<version>", with
    <version> being either the full "uname -r" output or its leading
    "<x>.<y>.<z>-<n>" part (i.e. without the flavor).  None is returned
    when the configuration list is empty.
    """
    apt_versioned_kernel_pkgs = apt_pkg.config.value_list(
        "APT::VersionedKernelPackages")
    if apt_versioned_kernel_pkgs:
        running_kernel_version = subprocess.check_output(
            ["uname", "-r"], universal_newlines=True).rstrip()
        kernel_escaped = re.escape(running_kernel_version)
        try:
            # subscripting the None of a failed match raises TypeError,
            # which selects the fallback below
            kernel_noflavor_escaped = re.escape(
                re.match("[1-9][0-9]*\\.[0-9]+\\.[0-9]+-[0-9]+",
                         running_kernel_version)[0])
            return re.compile("(" + "|".join(
                [("^" + p + "-" + kernel_escaped + "$|^"
                  + p + "-" + kernel_noflavor_escaped + "$")
                 for p in apt_versioned_kernel_pkgs]) + ")")
        except TypeError:
            # flavor could not be cut from version
            return re.compile("(" + "|".join(
                [("^" + p + "-" + kernel_escaped + "$")
                 for p in apt_versioned_kernel_pkgs]) + ")")
    else:
        return None
def get_allowed_origins_legacy():
    # type: () -> List[str]
    """ legacy support for old Allowed-Origins var

    Each Unattended-Upgrade::Allowed-Origins entry has the form
    "<origin>:<archive>" (a literal ":" is written as "\\:") or
    "<origin> <archive>".  The entries are converted to the
    "o=<origin>,a=<archive>" pattern format, with the placeholders
    substituted and the commas escaped.

    Raises ValueError (after logging an error) when an entry does not
    consist of exactly two parts.
    """
    allowed_origins = []  # type: List[str]
    key = "Unattended-Upgrade::Allowed-Origins"
    try:
        for s in apt_pkg.config.value_list(key):
            # if there is an unescaped ":" use that as separator, else use
            # spaces
            if re.search(r'(?<!\\):', s):
                (distro_id, distro_codename) = re.split(r'(?<!\\):', s)
            else:
                (distro_id, distro_codename) = s.split()
            # unescape "\:" back to ":"
            distro_id = re.sub(r'\\:', ':', distro_id)
            # escape "," (see LP: #824856); the lookbehind also covers a
            # leading comma and consecutive commas, which a pattern that
            # consumes the preceding character misses
            distro_id = re.sub(r'(?<!\\),', r'\\,', distro_id)
            distro_codename = re.sub(r'(?<!\\),', r'\\,', distro_codename)
            # convert to new format
            allowed_origins.append("o=%s,a=%s" % (substitute(distro_id),
                                                  substitute(distro_codename)))
    except ValueError:
        # format after the translation lookup, else the catalog entry
        # "Unable to parse %s." is never found
        logging.error(_("Unable to parse %s.") % key)
        raise
    return allowed_origins
def get_allowed_origins():
    # type: () -> List[str]
    """ return a list of allowed origins from apt.conf

        This will take substitutions (like distro_id) into account.

        The result is the converted legacy Allowed-Origins entries
        followed by the Unattended-Upgrade::Origins-Pattern entries.
        A ValueError raised while processing the patterns is logged and
        re-raised.
    """
    allowed_origins = get_allowed_origins_legacy()
    key = "Unattended-Upgrade::Origins-Pattern"
    try:
        for s in apt_pkg.config.value_list(key):
            allowed_origins.append(substitute(s))
    except ValueError:
        # format after the translation lookup, else the catalog entry
        # "Unable to parse %s." is never found
        logging.error(_("Unable to parse %s.") % key)
        raise
    return allowed_origins
def match_whitelist_string(whitelist, origin):
    # type: (str, Union[apt.package.Origin, apt_pkg.PackageFile]) -> bool
    """
    Match origin against a pattern like "origin=Debian,label=Debian-Security"

    The pattern is a comma separated list of <keyword>=<glob> tokens, a
    literal comma is written as "\\,".  The origin matches when the globs
    of all tokens match (fnmatch) the respective attribute of origin.  An
    empty pattern never matches anything.

    Raises UnknownMatcherError for an unknown keyword, even if an earlier
    token did not match already.
    """
    pattern = whitelist.strip()
    if pattern == "":
        logging.warning("empty match string matches nothing")
        return False

    # the short keywords are the ones of the apt-cache policy output, the
    # long ones are the names in the Release file
    attribute_of = {
        "o": "origin", "origin": "origin",
        "l": "label", "label": "label",
        "a": "archive", "suite": "archive", "archive": "archive",
        "c": "component", "component": "component",
        "site": "site",
        "n": "codename", "codename": "codename",
    }

    all_matched = True
    # make "\," the html quote equivalent while splitting at the commas
    for token in pattern.replace("\\,", "%2C").split(","):
        # strip and unquote the "," back
        (what, value) = [part.strip().replace("%2C", ",")
                         for part in token.split("=")]
        # support substitution here as well
        value = substitute(value)
        attribute = attribute_of.get(what)
        if attribute is None:
            raise UnknownMatcherError(
                "Unknown whitelist entry for matcher %s (token %s)" % (
                    what, token))
        token_matched = fnmatch.fnmatch(getattr(origin, attribute), value)
        all_matched = all_matched and token_matched
    return all_matched
def python_regex_is_posix(expression):
    # type: (str) -> bool
    """ Tell if the Python regex is also an equivalent POSIX regex

    Only expressions built solely from ASCII letters, digits and the
    characters ^ $ + . are accepted.
    """
    restricted_syntax = re.match("^[a-zA-Z0-9\\^\\$\\+\\.]*$", expression)
    if restricted_syntax is None:
        return False
    return True
def cache_commit(cache,  # type: apt.Cache
                 logfile_dpkg,  # type: str
                 verbose,  # type: bool
                 iprogress=None,  # type: apt.progress.base.InstallProgress
                 ):
    # type: (...) -> Tuple[bool, Exception]
    """Commit the changes from the given cache to the system

    The cache is reopened after a successful commit.  A SystemError
    raised by the commit or by the reopening is caught: it is logged in
    verbose mode and the cache is cleared.

    Returns the (result, error) pair, with result being what
    cache.commit() returned (False if it raised) and error the caught
    SystemError or None.
    """
    progress = iprogress
    if progress is None:
        progress = LogInstallProgress(logfile_dpkg, verbose)

    outcome, failure = False, None
    try:
        outcome = cache.commit(install_progress=progress)
        cache.open()
    except SystemError as exc:
        failure = exc
        if verbose:
            logging.exception("Exception happened during upgrade.")
        cache.clear()
    return outcome, failure
def upgrade_normal(cache, logfile_dpkg, verbose):
    # type: (apt.Cache, str, bool) -> bool
    """Commit all the marked changes at once and log the outcome.

    Returns the result of the commit as reported by cache_commit().
    """
    succeeded, failure = cache_commit(cache, logfile_dpkg, verbose)
    if not succeeded:
        logging.error(_("Installing the upgrades failed!"))
        logging.error(_("error message: %s"), failure)
        logging.error(_("dpkg returned a error! See %s for details"),
                      logfile_dpkg)
        return succeeded
    logging.info(_("All upgrades installed"))
    return succeeded
def upgrade_in_minimal_steps(cache,  # type: UnattendedUpgradesCache
                             pkgs_to_upgrade,  # type: List[str]
                             logfile_dpkg="",  # type: str
                             verbose=False,  # type: bool
                             ):
    # type: (...) -> bool
    """Upgrade the packages in the smallest sets that can be committed.

    The packages are processed in the order given by upgrade_order().
    Each one is marked together with what it pulls in, the marked set is
    checked for sanity and committed, and the progress is written to the
    progress log before every commit.

    Returns False when a stop was requested (see should_stop()) or a
    commit failed.  Otherwise the result of the last commit is returned,
    or False if the last package to be marked could not be marked.
    """
    install_log = LogInstallProgress(logfile_dpkg, verbose)

    res = True

    # to upgrade contains the package names
    to_upgrade = set(pkgs_to_upgrade)
    for pkgname in upgrade_order(to_upgrade, cache):
        # upgrade packages and dependencies in increasing expected size of
        # package sets to upgrade/install together
        if pkgname not in to_upgrade:
            # pkg is upgraded in a previous set
            continue
        if should_stop():
            return False

        pkg = cache[pkgname]
        try:
            if pkg.is_upgradable \
               or candidate_version_changed(pkg):
                cache.mark_upgrade_adjusted(
                    pkg, from_user=not pkg.is_auto_installed)
            elif not pkg.is_installed:
                cache.mark_install_adjusted(pkg, from_user=False)
            else:
                continue
        except Exception as e:
            logging.warning(
                _("package %s upgradable but fails to "
                  "be marked for upgrade (%s)"), pkgname, e)
            cache.clear()
            res = False
            continue

        # double check that we are not running into side effects like
        # what could have been caused LP: #1020680
        if not check_changes_for_sanity(cache):
            logging.info("While building minimal partition: "
                         "cache has not allowed changes")
            cache.clear()
            continue
        changes = [p.name for p in cache.get_changes()]
        if not changes:
            continue

        # write progress log information
        if len(pkgs_to_upgrade) > 0:
            all_count = len(pkgs_to_upgrade)
            done_count = all_count - len(to_upgrade)
            # share of the already handled packages on the 0..100 scale;
            # the multiplication must not be part of the divisor
            percent = done_count / float(all_count) * 100.0
        else:
            percent = 100.0
        install_log.status_change(pkg=",".join(changes),
                                  percent=percent,
                                  status="")
        # apply changes
        logging.debug("applying set %s" % changes)

        res, error = cache_commit(cache, logfile_dpkg, verbose, install_log)
        if error:
            if verbose:
                logging.exception("Exception happened during upgrade.")
            logging.error(_("Installing the upgrades failed!"))
            logging.error(_("error message: %s"), error)
            logging.error(_("dpkg returned a error! See %s for details"),
                          logfile_dpkg)
            return False
        to_upgrade = to_upgrade - set(changes)
        logging.debug("left to upgrade %s" % to_upgrade)
        if len(to_upgrade) == 0:
            logging.info(_("All upgrades installed"))
            break
    return res
def is_allowed_origin(origin, allowed_origins):
    # type: (Union[apt.package.Origin, apt_pkg.PackageFile], List[str]) -> bool
    """Tell if origin is the local one or matches an allowed pattern.

    The local origin has 'now' as both its component and its archive and
    neither a label nor a site.
    """
    # local origin is allowed by default
    is_local = (origin.component == 'now' and origin.archive == 'now'
                and not origin.label and not origin.site)
    if is_local:
        return True
    return any(match_whitelist_string(pattern, origin)
               for pattern in allowed_origins)
def is_in_allowed_origin(ver, allowed_origins):
    # type: (apt.package.Version, List[str]) -> bool
    """Tell if at least one origin of ver is allowed.

    A false ver (e.g. None) is never in an allowed origin.
    """
    if not ver:
        return False
    return any(is_allowed_origin(origin, allowed_origins)
               for origin in ver.origins)
def ver_in_allowed_origin(pkg, allowed_origins):
    # type: (apt.Package, List[str]) -> apt.package.Version
    """Return the first version in pkg.versions that is in an allowed origin.

    Raises NoAllowedOriginError when there is no such version.
    """
    allowed_versions = (ver for ver in pkg.versions
                        if is_in_allowed_origin(ver, allowed_origins))
    # leave as soon as we have the highest new candidate
    best = next(allowed_versions, None)
    if best is None:
        raise NoAllowedOriginError()
    return best
def is_pkgname_in_blacklist(pkgname, blacklist):
    # type: (str, List[str]) -> bool
    """Tell if a blacklist regexp matches at the beginning of pkgname."""
    if any(re.match(pattern, pkgname) for pattern in blacklist):
        logging.debug("skipping blacklisted package %s" % pkgname)
        return True
    return False
def is_pkgname_in_whitelist(pkgname, whitelist):
    # type: (str, List[str]) -> bool
    """Tell if a whitelist regexp matches at the beginning of pkgname.

    With an empty whitelist every package counts as whitelisted.
    """
    # a empty whitelist means the user does not want to use this feature
    if not whitelist:
        return True
    if any(re.match(pattern, pkgname) for pattern in whitelist):
        logging.debug("only upgrading the following package %s" %
                      pkgname)
        return True
    return False
def is_pkg_change_allowed(pkg, blacklist, whitelist, strict_whitelist):
    # type: (apt.Package, List[str], List[str], bool) -> bool
    """Tell if the configuration permits changing pkg.

    A change is refused for a blacklisted package, for a package missing
    from a strict whitelist and for a package on hold.
    """
    name = pkg.name
    if is_pkgname_in_blacklist(name, blacklist):
        logging.debug("pkg %s package has been blacklisted", pkg.name)
        return False

    # a strict whitelist will not allow any changes not in the
    # whitelist, most people will want the relaxed whitelist
    # that whitelists a package but pulls in the package
    # dependencies
    if strict_whitelist and not is_pkgname_in_whitelist(name, whitelist):
        logging.debug("pkg %s package is not whitelisted", pkg.name)
        return False

    on_hold = pkg._pkg.selected_state == apt_pkg.SELSTATE_HOLD
    if on_hold:
        logging.debug("pkg %s is on hold", pkg.name)
        return False
    return True
def transitive_dependencies(pkg,  # type: apt.Package
                            cache,  # type: apt.Cache
                            acc=None,  # type: AbstractSet[str]
                            valid_types=None,  # type: AbstractSet[str]
                            level=None  # type: int
                            ):
    # type: (...) -> AbstractSet[str]
    """ All (transitive) dependencies of the package

        Note that alternative (|) dependencies are collected, too

        pkg         -- package whose candidate's dependencies are followed
        cache       -- used to look up the dependencies by name; names
                       missing from it are collected but not followed
        acc         -- set the names are added to; a new set is created
                       for every call when omitted
        valid_types -- if not empty, only dependencies with a rawtype in
                       it are collected and followed
        level       -- maximal depth to follow, None means no limit

        Returns acc.
    """
    if acc is None:
        # a set() default would be shared by all calls and would leak the
        # dependencies found for one package into the result of the next
        acc = set()
    if not pkg.candidate or level is not None and level < 1:
        return acc

    for dep in pkg.candidate.dependencies:
        for base_dep in dep:
            if base_dep.name not in acc:
                if not valid_types or base_dep.rawtype in valid_types:
                    acc.add(base_dep.name)
                    try:
                        transitive_dependencies(
                            cache[base_dep.name], cache, acc, valid_types,
                            level=(level - 1 if level is not None else None))
                    except KeyError:
                        # the dependency is not known to the cache
                        pass
    return acc
def upgrade_order(to_upgrade, cache):
    # type: (AbstractSet[str], apt.Cache) -> List[str]
    """ Sort pkg names by the expected number of other packages to be upgraded
        with it. The calculation is not 100% accurate, it is an approximation.

        The number for a package is the count of its transitive Depends,
        PreDepends and Recommends dependencies that are in to_upgrade.
    """
    follow_deps = {'Depends', 'PreDepends', 'Recommends'}
    # calculate upgrade sets
    set_size_of = {
        name: len(transitive_dependencies(
            cache[name], cache, valid_types=follow_deps
        ).intersection(to_upgrade))
        for name in to_upgrade
    }
    return sorted(set_size_of, key=set_size_of.get)
def check_changes_for_sanity(cache, desired_pkg=None):
    # type: (UnattendedUpgradesCache, apt.Package) -> bool
    """Tell if the changes marked in cache are acceptable.

    The problem reported by sanity_problem(), if any, is logged at debug
    level together with the candidates of the marked packages.
    """
    problem = sanity_problem(cache, desired_pkg)
    if problem is None:
        return True
    marked_candidates = {str(p.candidate) for p in cache.get_changes()}
    logging.debug("sanity check failed for: %s : %s",
                  str(marked_candidates), problem)
    return False
def sanity_problem(cache, desired_pkg):
    # type: (UnattendedUpgradesCache, apt.Package) -> str
    """ Describe the first problem found with the marked changes

    cache: the cache holding the marked changes
    desired_pkg: package that is expected to be part of the changes, it
                 is not checked when it is None

    Returns a message describing the first problem found or None when the
    marked changes pass every check.
    """
    if cache._depcache.broken_count != 0:
        return ("there are broken packages in the cache")
    # If there are no packages to be installed they were kept back
    if cache.install_count == 0:
        return ("no package is selected to be upgraded or installed")

    changes = cache.get_changes()
    for pkg in changes:
        if pkg.marked_delete:
            return ("pkg %s is marked to be deleted" % pkg.name)

        if pkg.marked_install or pkg.marked_upgrade:
            # apt will never fallback from a trusted to a untrusted
            # origin so its good enough if we have a single trusted one
            if not any(o.trusted for o in pkg.candidate.origins):
                return ("pkg %s is not from a trusted origin" % pkg.name)
            if not is_in_allowed_origin(pkg.candidate, cache.allowed_origins):
                return ("pkg %s is not in an allowed origin" % pkg.name)

        if not is_pkg_change_allowed(pkg,
                                     cache.blacklist,
                                     cache.whitelist,
                                     cache.strict_whitelist):
            return ("pkg %s is blacklisted or is not whitelisted"
                    % pkg.name)

        # check if the package is unsafe to upgrade unattended
        ignore_require_restart = apt_pkg.config.find_b(
            "Unattended-Upgrade::IgnoreAppsRequireRestart", False)
        upgrade_requires = pkg.candidate.record.get("Upgrade-Requires")

        if pkg.marked_upgrade and ignore_require_restart is False \
                and upgrade_requires == "app-restart":
            # the package name has to be filled in here, otherwise a
            # literal "%s" ends up in the message
            return ("pkg %s requires app-restart, it is not safe to "
                    "upgrade it unattended" % pkg.name)

    # check that the package we want to upgrade is in the change set
    if desired_pkg and desired_pkg not in changes:
        return ("pkg %s to be marked for upgrade/install is not marked "
                "accordingly" % desired_pkg.name)
    return None
def is_deb(file):
    # type: (str) -> bool
    """ Tell whether the file name ends with the ".deb" suffix """
    return file.endswith(".deb")
def pkgname_from_deb(debfile):
    # type: (str) -> str
    """ Return the package name of the given deb file

    The name is taken from the "Package" field of the control data of the
    deb. If reading the deb fails with IOError or SystemError the failure
    is logged and the part of debfile before the first "_" is returned
    instead.
    """
    # FIXME: add error checking here
    try:
        control_data = apt_inst.DebFile(debfile).control.extractdata(
            "control")
        return apt_pkg.TagSection(control_data)["Package"]
    except (IOError, SystemError) as err:
        logging.error("failed to read deb file %s (%s)" % (debfile, err))
    # dumb fallback
    return debfile.split("_")[0]
def get_md5sum_for_file_in_deb(deb_file, conf_file):
    # type: (str, str) -> str
    """ Return the md5sum of conf_file as shipped inside deb_file

    The value is the first field printed by the pipeline
    "dpkg-deb --fsys-tarfile | tar -x -O | md5sum".
    """
    # dpkg-deb emits the filesystem tarball of the deb ...
    extract_p = Popen(["dpkg-deb", "--fsys-tarfile", deb_file], stdout=PIPE)
    # ... tar writes the requested member to its stdout ...
    member_p = Popen(["tar", "-x", "-O", "-f", "-", "." + conf_file],
                     stdin=extract_p.stdout, stdout=PIPE,
                     universal_newlines=True)
    # ... and md5sum hashes it
    digest_p = Popen(["md5sum"], stdin=member_p.stdout, stdout=PIPE,
                     universal_newlines=True)
    md5_output = digest_p.communicate()[0]
    pkg_md5sum = md5_output.split()[0]
    # close the pipes and reap the processes in pipeline order
    for __p in (extract_p, member_p, digest_p):
        proc = cast(Popen, __p)
        proc.stdout.close()
        proc.wait()
    return pkg_md5sum
def get_md5sum_for_file_installed(conf_file, prefix):
    # type: (str, str) -> str
    """ Return the md5sum of the file found at prefix + conf_file

    Returns the value of the first 'MD5Sum' entry computed by
    apt_pkg.Hashes or None if there is no such entry, "dir" when the path
    is a directory and "" when the path does not exist.
    """
    installed_path = prefix + conf_file
    try:
        with open(installed_path, 'rb') as fb:
            md5_values = (
                entry.hashvalue
                for entry in apt_pkg.Hashes(fb).hashes  # type: ignore
                if entry.hashtype == 'MD5Sum')
            return next(md5_values, None)
    except FileNotFoundError:
        # if the local file got deleted by the admin thats ok but it may still
        # trigger a conffile prompt (see debian #788049)
        logging.debug("conffile %s in missing on the system" % conf_file)
        return ""
    except IsADirectoryError:
        # the package replaces a directory with a configuration file
        #
        # if the package changed this way it is safe to assume that
        # the transition happens without showing a prompt but if the admin
        # created the directory the admin will need to resolve it after
        # being notified about the unexpected prompt
        logging.debug("found conffile %s is a directory on the system "
                      % conf_file)
        return "dir"
def map_conf_file(conf_file, conffiles):
    # type: (str, Union[AbstractSet[str], Dict[str, str]]) -> str
    """ Look up the counterpart of conf_file in conffiles heuristically

    The first of these paths that is in conffiles is returned: conf_file
    itself, conf_file/<basename of conf_file> and the directory of
    conf_file. None is returned when none of them is found.
    """
    # new /etc/foo may be old /etc/foo/foo, like in LP: #1822745
    nested = os.path.join(conf_file, os.path.basename(conf_file))
    # new /etc/foo/foo may be old /etc/foo, probably by accident
    parent = os.path.dirname(conf_file)
    # TODO: peek into package's dpkg-maintscript-helper mv_conffile usage
    for candidate in (conf_file, nested, parent):
        if candidate in conffiles:
            return candidate
    return None
# prefix is *only* needed for the built-in tests
def conffile_prompt(destFile, prefix=""):
    # type: (str, str) -> bool
    """ Check if installing the deb is expected to show a conffile prompt

    destFile: path of the deb file to check
    prefix: prepended to the conffile paths when the installed files are
            hashed

    The conffiles recorded for the package in the dpkg status file are
    compared with the files installed on the system and with the
    conffiles shipped in the deb.

    Returns True when a recorded conffile differs from both the installed
    file and the one in the deb, or when the deb ships a conffile that is
    not recorded in the status file while a different file already exists
    at that path. Returns False otherwise, also when the deb has no
    conffiles.

    A SystemError raised while reading the deb is reported and re-raised.
    """
    logging.debug("check_conffile_prompt(%s)" % destFile)
    pkgname = pkgname_from_deb(destFile)

    # get the conffiles for the /var/lib/dpkg/status file
    status_file = apt_pkg.config.find("Dir::State::status")
    with open(status_file, "r") as f:
        tagfile = apt_pkg.TagFile(f)
        # stays empty when the package is not in the status file or when
        # its section has no Conffiles field
        conffiles = ""
        for section in tagfile:
            if section.get("Package") == pkgname:
                logging.debug("found pkg: %s" % pkgname)
                if "Conffiles" in section:
                    conffiles = section.get("Conffiles")
                    break

    # get conffile value from pkg, its ok if the new version
    # does not have conffiles anymore
    pkg_conffiles = set()  # type: AbstractSet[str]
    try:
        deb = apt_inst.DebFile(destFile)
        # one conffile path per line in the control member "conffiles"
        pkg_conffiles = set(deb.control.extractdata(
            "conffiles").strip().decode("utf-8").split("\n"))
    except SystemError as e:
        print(_("Apt returned an error, exiting"))
        print(_("error message: %s") % e)
        logging.error(_("Apt returned an error, exiting"))
        logging.error(_("error message: %s"), e)
        raise
    except LookupError as e:
        logging.debug("No conffiles in deb %s (%s)" % (destFile, e))
    if not pkg_conffiles:
        return False

    # Conffiles:
    #  /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c
    # maps the recorded conffile paths that have a counterpart in the deb
    # to their recorded md5sum
    dpkg_status_conffiles = {}
    for line in conffiles.splitlines():
        # ignore empty lines
        line = line.strip()
        if not line:
            continue
        # show what we do
        logging.debug("conffile line: %s", line)
        # fields are: path, md5sum and an optional state
        li = line.split()
        conf_file = li[0]
        md5 = li[1]
        if len(li) > 2:
            obs = li[2]
        else:
            obs = None
        # ignore if conffile is obsolete
        if obs == "obsolete":
            continue
        # ignore state "newconffile" until its clearer if there
        # might be a dpkg prompt (LP: #936870)
        if md5 == "newconffile":
            continue
        new_conf_file = map_conf_file(conf_file, pkg_conffiles)
        if not new_conf_file:
            logging.debug("%s not in package conffiles %s" % (
                conf_file, pkg_conffiles))
            continue
        # record for later
        dpkg_status_conffiles[conf_file] = md5

        # test against the installed file, if the local file got deleted
        # by the admin thats ok but it may still trigger a conffile prompt
        # (see debian #788049)
        current_md5 = get_md5sum_for_file_installed(conf_file, prefix)
        logging.debug("current md5: %s" % current_md5)

        # hashes are the same, no conffile prompt
        if current_md5 == md5:
            continue
        # calculate md5sum from the deb (may take a bit)
        pkg_md5sum = get_md5sum_for_file_in_deb(destFile, new_conf_file)
        logging.debug("pkg_md5sum: %s" % pkg_md5sum)
        # the md5sum in the deb is unchanged, this will not
        # trigger a conffile prompt
        if pkg_md5sum == md5:
            continue
        # if we made it to this point:
        #  current_md5 != pkg_md5sum != md5
        # and that will trigger a conffile prompt, we can
        # stop processing at this point and just return True
        return True

    # now check if there are conffiles in the pkg that were not there
    # in the previous version in the dpkg status file
    if pkg_conffiles:
        for conf_file in pkg_conffiles:
            old_conf_file = map_conf_file(conf_file, dpkg_status_conffiles)
            if not old_conf_file:
                pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file)
                current_md5 = get_md5sum_for_file_installed(conf_file, prefix)
                # "" means there is no file at that path on the system
                if current_md5 != "" and pkg_md5sum != current_md5:
                    return True
    return False
def dpkg_conffile_prompt():
    # type: () -> bool
    """ Tell if dpkg is not configured to resolve conffile prompts itself

    Returns False when the DPkg::Options list contains --force-confold or
    --force-confnew, True otherwise.
    """
    if "DPkg::Options" not in apt_pkg.config:
        return True
    forcing_options = ["--force-confold", "--force-confnew"]
    configured = apt_pkg.config.value_list("DPkg::Options")
    return not any(opt.strip() in forcing_options for opt in configured)
def rewind_cache(cache, pkgs_to_upgrade):
    # type: (UnattendedUpgradesCache, List[apt.Package]) -> None
    """ Reset the cache and mark the given packages for install again

    Raises AssertionError if the cache reports broken packages afterwards.
    """
    cache.clear()
    for pkg in pkgs_to_upgrade:
        manually_installed = not pkg.is_auto_installed
        cache.mark_install_adjusted(pkg, from_user=manually_installed)
    if cache.broken_count > 0:
        raise AssertionError("rewind_cache created a broken cache")
def host():
    # type: () -> str
    """ Return the name of this machine as reported by socket.getfqdn() """
    fqdn = socket.getfqdn()
    return fqdn
def wrap_indent(t, subsequent_indent=" "):
    # type: (str, str) -> str
    """ Wrap the text and prefix every line but the first

    The text is wrapped with textwrap.wrap() without breaking words at
    hyphens. subsequent_indent is put in front of every line except the
    first one and the lines are joined with newlines.
    """
    wrapped_lines = wrap(t, subsequent_indent=subsequent_indent,
                         break_on_hyphens=False)
    return "\n".join(wrapped_lines)
def setup_apt_listchanges(conf="/etc/apt/listchanges.conf"):
    # type: (str) -> None
    """ Select the frontend apt-listchanges should use

    APT_LISTCHANGES_FRONTEND is set to "mail" in the environment when
    SENDMAIL_BINARY exists and to "none" otherwise. The conf argument is
    not used.
    """
    # apt-listchanges will always send a mail if there is a mail address
    # set in the config regardless of the frontend used, so set it to
    # mail if we have a sendmail and to none if not (as it appears to
    # not check if sendmail is there or not), debian bug #579733
    frontend = "mail" if os.path.exists(SENDMAIL_BINARY) else "none"
    os.environ["APT_LISTCHANGES_FRONTEND"] = frontend
def _send_mail_using_mailx(from_address, to_address, subject, body):
    # type: (str, str, str, str) -> int
    """ Send the mail by piping the body to MAIL_BINARY

    Returns the exit status of the mail process.
    """
    # ensure that the body is a byte stream and that we do not
    # break on encoding errors (the default error mode is "strict")
    payload = body.encode(
        locale.getpreferredencoding(False), errors="replace")
    # we use a binary pipe to stdin to ensure we do not break on
    # unicode encoding errors (e.g. because the user is running a
    # ascii only system like the buildds)
    argv = [MAIL_BINARY, "-r", from_address, "-s", subject, to_address]
    mailer = subprocess.Popen(
        argv, stdin=subprocess.PIPE, universal_newlines=False)
    mailer.stdin.write(payload)
    mailer.stdin.close()
    return mailer.wait()
def _send_mail_using_sendmail(from_address, to_address, subject, body):
    # type: (str, str, str, str) -> int
    """ Send the mail by piping a formatted message to SENDMAIL_BINARY

    The body is encoded as quoted-printable utf-8. Returns the exit status
    of the sendmail process.
    """
    # format as a proper mail
    msg = Message()
    headers = (
        ('Subject', subject),
        ('From', from_address),
        ('To', to_address),
        ('Auto-Submitted', "auto-generated"),
    )
    for header_name, header_value in headers:
        msg[header_name] = header_value
    # order is important here, Message() first, then Charset()
    # then msg.set_charset()
    qp_utf8 = email.charset.Charset("utf-8")
    qp_utf8.body_encoding = email.charset.QP  # type: ignore
    msg.set_payload(body, qp_utf8)
    # and send it away
    argv = [SENDMAIL_BINARY, "-oi", "-t"]
    mailer = subprocess.Popen(
        argv, stdin=subprocess.PIPE, universal_newlines=True)
    mailer.stdin.write(msg.as_string())
    mailer.stdin.close()
    return mailer.wait()
def send_summary_mail(pkgs,                 # type: List[str]
                      res,                  # type: bool
                      result_str,           # type: str
                      pkgs_kept_back,       # type: KeptPkgs
                      pkgs_removed,         # type: List[str]
                      pkgs_kept_installed,  # type: List[str]
                      mem_log,              # type: StringIO
                      dpkg_log_content,     # type: str
                      ):
    # type: (...) -> None
    """ send mail (if configured in Unattended-Upgrade::Mail)

    pkgs: names of the packages that were (attempted to be) upgraded
    res: True if the run was successful
    result_str: summary of the result, used as the first line of the body
    pkgs_kept_back: packages kept back, grouped by origin (only its
                    truthiness and .items() are used here)
    pkgs_removed: names of the auto-removed packages
    pkgs_kept_installed: names of the packages kept from auto-removal
    mem_log: buffer holding the log of this run, appended to the body
    dpkg_log_content: dpkg log excerpt, added to the body when not empty

    Nothing is sent when no recipient is configured, when neither
    MAIL_BINARY nor SENDMAIL_BINARY exists or when the configured report
    mode does not ask for a mail for this result.
    """
    to_email = apt_pkg.config.find("Unattended-Upgrade::Mail", "")
    if not to_email:
        return
    if not os.path.exists(MAIL_BINARY) and not os.path.exists(SENDMAIL_BINARY):
        logging.error(_("No /usr/bin/mail or /usr/sbin/sendmail, "
                        "can not send mail. "
                        "You probably want to install the mailx package."))
        return

    # The admin may well wish to get a mail report regardless of what was done.
    # This is now set by Unattended-Upgrade::MailReport values of:
    #       "always", "only-on-error" or "on-change"
    # (you can achieve "never" by not setting Unattended-Upgrade::Mail).
    # If this is not set, then set it using any legacy MailOnlyOnError
    # setting (which is looked up with a default of False below)
    #
    mail_opt = apt_pkg.config.find("Unattended-Upgrade::MailReport")
    if (mail_opt == ""):    # None set - map from legacy value
        if apt_pkg.config.find_b("Unattended-Upgrade::MailOnlyOnError", False):
            mail_opt = "only-on-error"
        else:
            mail_opt = "on-change"

    # if the operation was successful and the user has requested to get
    # mails only on errors, just exit here
    if (res and (mail_opt == "only-on-error")):
        return

    # if the run was successful but nothing had to be done skip sending email
    # unless the admin wants it anyway
    # NOTE(review): pkgs_kept_installed is not part of this check - confirm
    # that this is intended
    if (((mail_opt != "always") and res and not pkgs and not pkgs_kept_back
         and not pkgs_removed)):
        return

    # Check if reboot-required flag is present
    reboot_flag_str = _(
        "[reboot required]") if os.path.isfile(REBOOT_REQUIRED_FILE) else ""
    # Check if packages are kept on hold
    hold_flag_str = (_("[package on hold]") if pkgs_kept_back
                     or pkgs_kept_installed else "")
    logging.debug("Sending mail to %s" % to_email)
    # the flags are empty strings when they do not apply, strip() removes
    # the leading space left over in that case
    subject = _(
        "{hold_flag}{reboot_flag} unattended-upgrades result for "
        "{machine}: {result}").format(
            hold_flag=hold_flag_str, reboot_flag=reboot_flag_str,
            machine=host(), result="SUCCESS" if res else "FAILURE").strip()
    body = wrap_indent(_("Unattended upgrade result: %s") % result_str)
    body += "\n\n"
    if os.path.isfile(REBOOT_REQUIRED_FILE):
        body += _(
            "Warning: A reboot is required to complete this upgrade, "
            "or a previous one.\n\n")
    if pkgs:
        if res:
            body += _("Packages that were upgraded:\n")
        else:
            body += _("Packages that attempted to upgrade:\n")
        body += " " + wrap_indent(" ".join(pkgs))
        body += "\n\n"
    if pkgs_kept_back:
        body += _("Packages with upgradable origin but kept back:\n")
        # one paragraph per origin
        for origin, origin_pkgs in pkgs_kept_back.items():
            body += " " + origin + ":\n"
            body += " " + wrap_indent(" ".join(origin_pkgs),
                                      subsequent_indent=" ") + "\n"
        body += "\n"
    if pkgs_removed:
        body += _("Packages that were auto-removed:\n")
        body += " " + wrap_indent(" ".join(pkgs_removed))
        body += "\n\n"
    if pkgs_kept_installed:
        body += _("Packages that were kept from being auto-removed:\n")
        body += " " + wrap_indent(" ".join(pkgs_kept_installed))
        body += "\n\n"
    if dpkg_log_content:
        body += _("Package installation log:") + "\n"
        body += dpkg_log_content
        body += "\n\n"
    body += _("Unattended-upgrades log:\n")
    body += mem_log.getvalue()

    from_email = apt_pkg.config.find("Unattended-Upgrade::Sender", "root")

    # sendmail is preferred when both binaries are available
    if os.path.exists(SENDMAIL_BINARY):
        ret = _send_mail_using_sendmail(from_email, to_email, subject, body)
    elif os.path.exists(MAIL_BINARY):
        ret = _send_mail_using_mailx(from_email, to_email, subject, body)
    else:
        raise AssertionError(
            "This should never be reached as we previously validated that we "
            "either have sendmail or mailx. Maybe they've been removed in "
            "this right moment?")
    logging.debug("mail returned: %s", ret)
def do_install(cache,            # type: UnattendedUpgradesCache
               pkgs_to_upgrade,  # type: List[str]
               options,          # type: Options
               logfile_dpkg,     # type: str
               ):
    # type: (...) -> bool
    """ Install the upgrades and return the result of the upgrade step

    Pending changes are dropped from the cache first. Depending on
    options.minimal_upgrade_steps the packages are upgraded in minimal
    steps or marked and upgraded in a single run. Any exception raised on
    the way is written to file descriptor 2 and False is returned.
    """
    setup_apt_listchanges()

    logging.info(_("Writing dpkg log to %s"), logfile_dpkg)

    if cache.get_changes():
        cache.clear()

    try:
        if not options.minimal_upgrade_steps:
            mark_pkgs_to_upgrade(cache, pkgs_to_upgrade)
            return upgrade_normal(
                cache, logfile_dpkg, options.verbose or options.debug)
        # try upgrade all "pkgs" in minimal steps
        return upgrade_in_minimal_steps(
            cache, pkgs_to_upgrade,
            logfile_dpkg,
            options.verbose or options.debug)
    except Exception as e:
        # print unhandled exceptions here this way, while stderr is redirected
        os.write(2, ("Exception: %s\n" % e).encode('utf-8'))
        return False
def _setup_alternative_rootdir(rootdir):
    # type: (str) -> None
    """ Configure apt and the log directory for the given root directory

    The Unattended-Upgrade configuration is cleared and the apt
    configuration found below rootdir is read. The log directory below
    rootdir is created if needed and set as Unattended-Upgrade::LogDir.
    """
    # clear system unattended-upgrade stuff
    apt_pkg.config.clear("Unattended-Upgrade")
    # read rootdir (taken from apt.Cache, but we need to run it
    # here before the cache gets initialized
    apt_conf = rootdir + "/etc/apt/apt.conf"
    apt_conf_d = rootdir + "/etc/apt/apt.conf.d"
    if os.path.exists(apt_conf):
        apt_pkg.read_config_file(apt_pkg.config, apt_conf)
    if os.path.isdir(apt_conf_d):
        apt_pkg.read_config_dir(apt_pkg.config, apt_conf_d)
    logdir = os.path.join(rootdir, "var", "log", "unattended-upgrades")
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    apt.apt_pkg.config.set("Unattended-Upgrade::LogDir", logdir)
def _get_logdir():
    # type: () -> str
    """ Return the configured log directory

    Unattended-Upgrade::LogDir is looked up first, then the legacy
    APT::UnattendedUpgrades::LogDir and finally the built-in default is
    used.
    """
    # COMPAT only
    legacy_logdir = apt_pkg.config.find_dir(
        "APT::UnattendedUpgrades::LogDir", "/var/log/unattended-upgrades/")
    return apt_pkg.config.find_dir(
        "Unattended-Upgrade::LogDir", legacy_logdir)
def _setup_logging(options):
    # type: (Options) -> StringIO
    """ Set up logging to the log file and the optional extra targets

    The root logger is configured to write to the log file in the log
    directory. Depending on the options and the configuration the
    messages also go to stdout, to an in-memory buffer (when a mail
    recipient is configured) and to syslog.

    Returns the in-memory buffer, or None when logging was already set up
    before.
    """
    # ensure this is run only once
    if len(logging.root.handlers) > 0:
        return None

    # init the logging
    logdir = _get_logdir()
    logfile = os.path.join(
        logdir,
        apt_pkg.config.find(
            "Unattended-Upgrade::LogFile",
            # COMPAT only
            apt_pkg.config.find("APT::UnattendedUpgrades::LogFile",
                                "unattended-upgrades.log")))
    # the log directory is not created in a dry run
    if not options.dry_run and not os.path.exists(logdir):
        os.makedirs(logdir)

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=logfile)

    # additional logging
    logger = logging.getLogger()
    mem_log = StringIO()
    if options.apt_debug:
        apt_pkg.config.set("Debug::pkgProblemResolver", "1")
        apt_pkg.config.set("Debug::pkgDepCache::AutoInstall", "1")
    # both --debug and --verbose mirror the log to stdout, they only
    # differ in the log level
    if options.debug:
        logger.setLevel(logging.DEBUG)
        stdout_handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(stdout_handler)
    elif options.verbose:
        logger.setLevel(logging.INFO)
        stdout_handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(stdout_handler)
    # the buffer only receives messages when a mail recipient is set
    if apt_pkg.config.find("Unattended-Upgrade::Mail", ""):
        mem_log_handler = logging.StreamHandler(mem_log)
        logger.addHandler(mem_log_handler)
    # Configure syslog if necessary
    syslogEnable = apt_pkg.config.find_b("Unattended-Upgrade::SyslogEnable",
                                         False)
    if syslogEnable:
        syslogFacility = apt_pkg.config.find(
            "Unattended-Upgrade::SyslogFacility",
            "daemon")
        syslogHandler = logging.handlers.SysLogHandler(
            address='/dev/log',
            facility=syslogFacility)  # type: ignore
        syslogHandler.setFormatter(
            logging.Formatter("unattended-upgrade: %(message)s"))
        # the handler is created before the facility name is validated
        # and it is only attached if the name is known
        known = syslogHandler.facility_names.keys()  # type: ignore
        if syslogFacility.lower() in known:
            logger.addHandler(syslogHandler)
            logging.info("Enabled logging to syslog via %s facility "
                         % syslogFacility)
        else:
            logging.warning("Syslog facility %s was not found"
                            % syslogFacility)
    return mem_log
def logged_in_users():
    # type: () -> AbstractSet[str]
    """Return the set of logged in users

    The output of the USERS command is split on whitespace, users listed
    more than once are reported only once.
    """
    output = subprocess.check_output(USERS, universal_newlines=True)
    names = output.rstrip('\n').split()
    return set(names)
def reboot_if_requested_and_needed():
    # type: () -> None
    """auto-reboot (if required and the config for this is set)

    Nothing is done unless REBOOT_REQUIRED_FILE exists and
    Unattended-Upgrade::Automatic-Reboot is enabled. When
    Unattended-Upgrade::Automatic-Reboot-WithUsers is disabled the reboot
    is skipped while users are logged in. The reboot is scheduled with
    /sbin/shutdown for Unattended-Upgrade::Automatic-Reboot-Time.

    Failures to run shutdown are logged, not raised.
    """
    if not os.path.exists(REBOOT_REQUIRED_FILE):
        return
    if not apt_pkg.config.find_b(
            "Unattended-Upgrade::Automatic-Reboot", False):
        return
    # see if we need to check for logged in users
    if not apt_pkg.config.find_b(
            "Unattended-Upgrade::Automatic-Reboot-WithUsers", True):
        users = logged_in_users()
        if users:
            # look up the translation of the unformatted message and fill
            # in the values afterwards, a message formatted before the
            # lookup can never match an entry of the catalog
            msg = gettext.ngettext(
                "Found %s, but not rebooting because %s is logged in.",
                "Found %s, but not rebooting because %s are logged in.",
                len(users)) % (REBOOT_REQUIRED_FILE, users)
            logging.warning(msg)
            return
    # reboot at the specified time
    when = apt_pkg.config.find(
        "Unattended-Upgrade::Automatic-Reboot-Time", "now")
    logging.warning("Found %s, rebooting" % REBOOT_REQUIRED_FILE)
    cmd = ["/sbin/shutdown", "-r", when]
    try:
        shutdown_msg = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if shutdown_msg.strip():
            logging.warning("Shutdown msg: %s", shutdown_msg.strip())
    except Exception as e:
        logging.error("Failed to issue shutdown: %s", e)
def write_stamp_file():
    # type: () -> None
    """ Create or truncate the unattended-upgrades stamp file

    The file lives in the "periodic" directory below Dir::State, the
    directory is created when it is missing.
    """
    state_root = apt_pkg.config.find_dir("Dir::State")
    statedir = os.path.join(state_root, "periodic")
    if not os.path.exists(statedir):
        os.makedirs(statedir)
    stamp_path = os.path.join(statedir, "unattended-upgrades-stamp")
    # opening for writing is all that is needed to refresh the stamp
    with open(stamp_path, "w"):
        pass
def try_to_upgrade(pkg,              # type: apt.Package
                   pkgs_to_upgrade,  # type: List[apt.Package]
                   cache,            # type: UnattendedUpgradesCache
                   ):
    # type: (...) -> None
    """ Mark pkg for upgrade and keep it if the changes stay sane

    pkg: the package to upgrade
    pkgs_to_upgrade: packages accepted so far, pkg is appended to it when
                     marking it passes the sanity check
    cache: the cache the package is marked in

    When the sanity check fails or marking raises an error the cache is
    rewound to the state described by pkgs_to_upgrade.
    """
    try:
        try:
            # try to adjust pkg itself first, if that throws an exception it
            # can't be upgraded on its own
            cache.adjust_candidate(pkg)
            # nothing to do for this package unless downgrades are allowed
            if not pkg.is_upgradable and not apt_pkg.config.find_b(
                    "Unattended-Upgrade::Allow-downgrade", False):
                return
        except NoAllowedOriginError:
            return
        cache._cached_candidate_pkgnames.add(pkg.name)
        cache.mark_upgrade_adjusted(pkg, from_user=not pkg.is_auto_installed)
        if check_changes_for_sanity(cache, pkg):
            # add to packages to upgrade
            pkgs_to_upgrade.append(pkg)
        else:
            # drop the marks made for pkg
            rewind_cache(cache, pkgs_to_upgrade)
    except (SystemError, NoAllowedOriginError) as e:
        # can't upgrade
        logging.warning(
            _("package %s upgradable but fails to "
              "be marked for upgrade (%s)"), pkg.name, e)
        rewind_cache(cache, pkgs_to_upgrade)
def candidate_version_changed(pkg,  # type: apt.Package
                              ):
    """ Tell if the candidate differs from the installed version

    The result is only true when the package is installed, has a candidate
    with a version different from the installed one and
    Unattended-Upgrade::Allow-downgrade is enabled. Callers should only
    rely on the truthiness of the returned value.
    """
    installed = pkg.is_installed
    if not installed:
        return installed
    candidate = pkg.candidate
    if not candidate:
        return candidate
    if candidate.version == pkg.installed.version:
        return False
    return apt_pkg.config.find_b(
        'Unattended-Upgrade::Allow-downgrade', False)
def calculate_upgradable_pkgs(cache,    # type: UnattendedUpgradesCache
                              options,  # type: Options
                              ):
    # type: (...) -> List[apt.Package]
    """ Collect the packages that can be upgraded without problems

    Every package of the cache that is upgradable (or whose candidate
    version changed) and has a version in an allowed origin is passed to
    try_to_upgrade(). The marks made on the way are cleared from the
    cache before returning.

    Returns the list of packages accepted by try_to_upgrade().
    """
    pkgs_to_upgrade = []  # type: List[apt.Package]

    # now do the actual upgrade
    for pkg in cache:
        # NOTE(review): "and" binds tighter than "or", this is parsed as
        # (options.debug and pkg.is_upgradable)
        #     or candidate_version_changed(pkg)
        # confirm that this grouping is intended
        if options.debug and pkg.is_upgradable \
                or candidate_version_changed(pkg):
            logging.debug("Checking: %s (%s)" % (
                pkg.name, getattr(pkg.candidate, "origins", [])))
        # NOTE(review): parsed as
        # pkg.is_upgradable or (candidate_version_changed(pkg)
        #                       and is_pkgname_in_whitelist(...))
        # so the whitelist is not consulted here for upgradable packages,
        # confirm that this grouping is intended
        if (pkg.is_upgradable or candidate_version_changed(pkg)
                and is_pkgname_in_whitelist(pkg.name, cache.whitelist)):
            try:
                ver_in_allowed_origin(pkg, cache.allowed_origins)
            except NoAllowedOriginError:
                continue

            try_to_upgrade(pkg,
                           pkgs_to_upgrade,
                           cache)

    # the cache was only marked to find out what can be upgraded
    if cache.get_changes():
        cache.clear()

    return pkgs_to_upgrade
def get_dpkg_log_content(logfile_dpkg, install_start_time):
    # type: (str, datetime.datetime) -> str
    """ Return the part of the dpkg log written since install_start_time

    The content starts with the first "Log started: " stanza whose time
    stamp is not older than install_start_time, progress indicator lines
    are left out. An empty string is returned when the log file does not
    exist or when it has no such stanza.
    """
    logging.debug("Extracting content from %s since %s" % (
        logfile_dpkg, install_start_time))
    stanza_marker = "Log started: "
    collected = []
    in_range = False
    try:
        with io.open(logfile_dpkg, encoding='utf-8', errors='replace') as fp:
            for line in fp:
                # scan for the first entry we need (minimal-step mode
                # creates a new stanza for each individual install)
                if not in_range and line.startswith(stanza_marker):
                    stanza_start = LoggingDateTime.from_string(
                        line[len(stanza_marker):-1])
                    if stanza_start >= install_start_time:
                        in_range = True
                if not in_range:
                    continue
                # skip progress indicator until #860931 is fixed in apt
                # and dpkg
                if re.match(
                        "^\\(Reading database \\.\\.\\. ()|([0-9]+%)$",
                        line):
                    continue
                collected.append(line)
        return "".join(collected)
    except FileNotFoundError:
        return ""
def get_auto_removable(cache):
    # type: (apt.Cache) -> AbstractSet[str]
    """ Return the names of the packages flagged as auto-removable """
    removable = set()
    for pkg in cache:
        if pkg.is_auto_removable:
            removable.add(pkg.name)
    return removable
def is_autoremove_valid(cache,           # type: UnattendedUpgradesCache
                        pkgname,         # type: str
                        auto_removable,  # type: AbstractSet[str]
                        ):
    # type: (...) -> bool
    """ Check if the changes marked in the cache are a valid autoremoval

    cache: the cache holding the marked changes
    pkgname: the package being removed in this step, "" when all
             auto-removable packages are removed at once (only used for
             the log message)
    auto_removable: names of the packages that may be removed

    Returns False, after logging the reason, when a change is not allowed
    by the blacklist/whitelist, when a package outside of auto_removable
    would be changed, when a changed package matches the running kernel
    or when something is marked for installation. Returns True otherwise,
    also when there are no changes at all.
    """
    changes = cache.get_changes()
    if not changes:
        # package is already removed
        return True

    pkgnames = {pkg.name for pkg in changes}

    for pkg in changes:
        if is_pkg_change_allowed(pkg, cache.blacklist, cache.whitelist,
                                 cache.strict_whitelist):
            continue
        logging.warning(
            _("Keeping the following auto-removable package(s) because "
              "they include %s which is set to be kept unmodified: %s"),
            pkg.name, " ".join(sorted(pkgnames)))
        return False

    if not pkgnames.issubset(auto_removable):
        unexpected = " ".join(sorted(pkgnames - auto_removable))
        if pkgname != "":
            logging.warning(
                _("Keeping auto-removable %s package(s) because it would"
                  " also remove the following packages which should "
                  "be kept in this step: %s"), pkgname, unexpected)
        else:
            logging.warning(
                _("Keeping %s auto-removable package(s) because it would"
                  " also remove the following packages which should "
                  "be kept in this step: %s"), len(auto_removable),
                unexpected)
        return False

    for packagename in pkgnames:
        kernel_regexp = cache.running_kernel_pkgs_regexp
        if kernel_regexp and kernel_regexp.match(packagename):
            logging.warning(
                _("Keeping the following auto-removable package(s) because "
                  "they include %s which package is related to the running "
                  "kernel: %s"), packagename, " ".join(sorted(pkgnames)))
            return False

    if cache.install_count > 0:
        not_removed = [pkg.name for pkg in changes if not pkg.marked_delete]
        logging.error(
            "The following packages are marked for installation or upgrade "
            "which is not allowed when performing autoremovals: %s",
            " ".join(not_removed))
        return False

    return True
def do_auto_remove(cache,             # type: UnattendedUpgradesCache
                   auto_removable,    # type: AbstractSet[str]
                   logfile_dpkg,      # type: str
                   minimal_steps,     # type: bool
                   verbose=False,     # type: bool
                   dry_run=False      # type: bool
                   ):
    # type: (...) -> Tuple[bool, List[str], List[str]]
    """ Remove the given auto-removable packages

    cache: the cache the removals are marked and committed in
    auto_removable: names of the packages to remove
    logfile_dpkg: passed to cache_commit() and named in the error message
    minimal_steps: remove the packages one by one instead of all at once
    verbose: passed to cache_commit()
    dry_run: only mark and validate the removals, do not commit them

    Returns a tuple of the result (False when a commit failed or the run
    was asked to stop), the names of the removed packages and the names
    of the packages kept installed.
    """
    res = True
    if not auto_removable:
        return (res, [], [])

    pkgs_removed = []         # type: List[str]
    pkgs_kept_installed = []  # type: List[str]
    if minimal_steps:
        for pkgname in auto_removable:
            if should_stop():
                # everything not removed so far is reported as kept
                pkgs_kept_installed = list(auto_removable - set(pkgs_removed))
                return (False, pkgs_removed, pkgs_kept_installed)
            logging.debug("marking %s for removal" % pkgname)
            # may have been removed already together with another package
            if pkgname in pkgs_removed:
                continue
            cache[pkgname].mark_delete()
            if not is_autoremove_valid(cache, pkgname, auto_removable):
                # this situation can occur when removing newly unused packages
                # would also remove old unused packages which are not set
                # for removal, thus getting there is not handled as an error
                pkgs_kept_installed.append(pkgname)
                cache.clear()
                continue
            if not dry_run:
                changes = cache.get_changes()
                # collect the names before the commit
                pkgnames = {pkg.name for pkg in changes}
                res, error = cache_commit(cache, logfile_dpkg, verbose)
                if not res:
                    break
                pkgs_removed.extend(pkgnames)
            else:
                cache.clear()
    else:
        for pkgname in auto_removable:
            cache[pkgname].mark_delete()
        if is_autoremove_valid(cache, "", auto_removable):
            # do it in one step
            # NOTE(review): pkgs_removed and pkgs_kept_installed are not
            # updated in this branch, so both returned lists stay empty
            # here - confirm that this is intended
            if not dry_run:
                res, error = cache_commit(cache, logfile_dpkg, verbose)
            else:
                cache.clear()
        else:
            cache.clear()

    if res:
        logging.info(_("Packages that were successfully auto-removed: %s"),
                     " ".join(sorted(pkgs_removed)))
        logging.info(_("Packages that are kept back: %s"),
                     " ".join(sorted(pkgs_kept_installed)))
    if not res:
        # res only becomes False here after a cache_commit() call, which
        # also assigned error
        cache.clear()
        logging.error(_("Auto-removing the packages failed!"))
        logging.error(_("Error message: %s"), error)
        logging.error(_("dpkg returned an error! See %s for details"),
                      logfile_dpkg)
    return (res, pkgs_removed, pkgs_kept_installed)
def clean_downloaded_packages(fetcher):
    # type: (apt_pkg.Acquire) -> None
    """ Delete the fetched files that are stored in the archives directory

    Items of the fetcher stored elsewhere are left alone and files that
    can not be removed are skipped silently.
    """
    archivedir = os.path.dirname(
        apt_pkg.config.find_dir("Dir::Cache::archives"))
    for item in fetcher.items:
        item_dir = os.path.dirname(os.path.abspath(item.destfile))
        if item_dir != archivedir:
            continue
        try:
            os.unlink(item.destfile)
        except OSError:
            pass
def is_update_day():
    # type: () -> bool
    """ Tell if upgrades may be installed today

    Returns True when Unattended-Upgrade::Update-Days is empty or lists
    today by its abbreviated or full localized name or by its number.
    Otherwise the reason is logged and False is returned.
    """
    # check if patch days are configured
    patch_days = apt_pkg.config.value_list("Unattended-Upgrade::Update-Days")
    if not patch_days:
        return True
    # validate patch days
    today = date.today()
    # abbreviated localized dayname, full localized dayname and the
    # number of the day (Sun: 0, Mon: 1, ...)
    for day_format in ("%a", "%A", "%w"):
        if today.strftime(day_format) in patch_days:
            return True
    # today is not a patch day
    logging.info(
        "Skipping update check: today is %s,%s,%s but patch days are %s",
        today.strftime("%w"), today.strftime("%a"), today.strftime("%A"),
        ", ".join(patch_days))
    return False
def update_kept_pkgs_file(kept_pkgs, kept_file):
    # type: (DefaultDict[str, List[str]], str) -> None
    """ Save the names of the kept back packages to kept_file

    kept_pkgs: kept back package names grouped by origin
    kept_file: path of the file to write

    The names of all origins are written sorted and separated by spaces,
    every name is listed once. When there are no kept back packages
    kept_file is removed if it exists. If the file can not be opened
    because its path does not exist an error is logged.
    """
    if kept_pkgs:
        pkgs_all_origins = set()
        for origin_pkgs in kept_pkgs.values():
            pkgs_all_origins.update(origin_pkgs)
        try:
            with open(kept_file, "w") as kf:
                kf.write(" ".join(sorted(pkgs_all_origins)))
        except FileNotFoundError:
            # the file name is passed as an argument to logging, formatting
            # the message before passing it to _() would prevent finding
            # its translation
            logging.error(_("Could not open %s for saving list of packages "
                            "kept back."), kept_file)
    else:
        if os.path.exists(kept_file):
            os.remove(kept_file)
def main(options, rootdir="/"):
    # type: (Options, str) -> int
    """ Run the unattended upgrade and report the result

    options: the parsed command line options
    rootdir: alternative root directory, useful for testing

    After the run the kept back packages file is updated, the summary
    mail is sent, the stamp file is written and the reboot is triggered,
    depending on the result and on the options.

    Returns 0 when the run was successful and 1 when it failed or when
    the lock file is already taken. Exceptions are logged, reported by
    mail unless this is a dry run, and re-raised.
    """
    # useful for testing
    if not rootdir == "/":
        _setup_alternative_rootdir(rootdir)

    # see debian #776752
    install_start_time = datetime.datetime.now().replace(microsecond=0)

    # setup logging
    mem_log = _setup_logging(options)

    # get log
    logfile_dpkg = os.path.join(_get_logdir(), 'unattended-upgrades-dpkg.log')
    # make sure the dpkg log exists
    if not os.path.exists(logfile_dpkg):
        with open(logfile_dpkg, 'w'):
            pass

    # lock for the shutdown check
    shutdown_lock = apt_pkg.get_lock(LOCK_FILE)
    if shutdown_lock < 0:
        logging.error("Lock file is already taken, exiting")
        return 1

    try:
        res = run(options, rootdir, mem_log, logfile_dpkg,
                  install_start_time)

        if res.success and res.result_str:
            # complete, successful run
            update_kept_pkgs_file(res.pkgs_kept_back,
                                  os.path.join(rootdir, KEPT_PACKAGES_FILE))

        if res.result_str and not options.dry_run:
            # there is some meaningful result which is worth an email
            log_content = get_dpkg_log_content(logfile_dpkg,
                                               install_start_time)
            send_summary_mail(res.pkgs, res.success, res.result_str,
                              res.pkgs_kept_back, res.pkgs_removed,
                              res.pkgs_kept_installed, mem_log,
                              log_content)

        if res.update_stamp:
            # write timestamp file
            write_stamp_file()
            if not options.dry_run:
                # check if the user wants a reboot
                reboot_if_requested_and_needed()

        # NOTE(review): the lock is only closed on this path, not when an
        # exception is raised above - confirm that this is intended
        os.close(shutdown_lock)

        if res.success:
            return 0
        else:
            return 1

    except Exception as e:
        logger = logging.getLogger()
        logger.exception(_("An error occurred: %s"), e)
        log_content = get_dpkg_log_content(logfile_dpkg,
                                           install_start_time)
        if not options.dry_run:
            # the packages are not known here and None is passed for the
            # kept back packages
            send_summary_mail(["<unknown>"], False, _("An error occurred"),
                              None, [], [], mem_log, log_content)
        # Re-raise exceptions for apport
        raise
def mark_pkgs_to_upgrade(cache, pkgs_to_upgrade):
    # type (apt.Cache, List[str]) -> None
    """ Mark the named packages for upgrade or install in the cache

    A package is marked for upgrade when it is upgradable, or when it is
    installed with a candidate version different from the installed one
    and Unattended-Upgrade::Allow-downgrade is enabled. Other packages are
    marked for install if they are not installed yet.
    """
    for pkg_name in pkgs_to_upgrade:
        pkg = cache[pkg_name]
        wants_upgrade = pkg.is_upgradable or (
            pkg.is_installed
            and pkg.candidate.version != pkg.installed.version
            and apt_pkg.config.find_b("Unattended-Upgrade::Allow-downgrade",
                                      False))
        if wants_upgrade:
            manually_installed = not pkg.is_auto_installed
            cache.mark_upgrade_adjusted(pkg, from_user=manually_installed)
        elif not pkg.is_installed:
            cache.mark_install_adjusted(pkg, from_user=False)
def run(options,             # type: Options
        rootdir,             # type: str
        mem_log,             # type: StringIO
        logfile_dpkg,        # type: str
        install_start_time,  # type: datetime.datetime
        ):
    # type: (...) -> UnattendedUpgradesResult
    """Perform one unattended-upgrades pass and report its outcome.

    In order, the function:

    * returns early when today is not an update day, when a stop was
      requested, or when the development-release policy says not to run
    * takes the apt package system lock and, if the dpkg journal is dirty
      and "Unattended-Upgrade::AutoFixInterruptedDpkg" is enabled, runs
      "dpkg --configure -a"
    * opens the cache, computes the upgradable packages and downloads
      their archives
    * unless dpkg is configured not to prompt, inspects the downloaded
      files and blacklists packages that are untrusted or would trigger
      a conffile prompt, then re-validates the upgrade set
    * removes unused kernel packages, installs the upgrades and performs
      the configured auto-removals
    * cleans the downloaded archives unless told to keep them

    :param options: parsed command line options
    :param rootdir: root directory handed to the apt cache
    :param mem_log: not used by this function
    :param logfile_dpkg: dpkg log file passed on to the install and
        auto-remove steps
    :param install_start_time: not used by this function
    :returns: an UnattendedUpgradesResult; its first argument is the
        success flag, optionally followed by a result message and the
        package lists
    """
    # check if today is a patch day
    if not is_update_day():
        return UnattendedUpgradesResult(True)

    # check if u-u should be stopped already
    if should_stop():
        return UnattendedUpgradesResult(False)

    # check to see if want to auto-upgrade the devel release
    if apt_pkg.config.find("Unattended-Upgrade::DevRelease") == "auto":
        try:
            if DISTRO_ID.lower() == 'ubuntu':
                devel = (distro_info.UbuntuDistroInfo() .
                         devel(result="object"))
            elif DISTRO_ID.lower() == 'debian':
                devel = (distro_info.DebianDistroInfo() .
                         devel(result="object"))
            else:
                devel = (distro_info.DistroInfo(DISTRO_ID) .
                         devel(result="object"))
        except Exception as e:
            logging.warning("Could not figure out development release: %s" % e)
        else:
            if ((devel.series == DISTRO_CODENAME
                 and devel.release is not None
                 and devel.release - date.today() > DEVEL_UNTIL_RELEASE)):
                syslog.syslog((_("Not running on this development "
                                 "release before %s") %
                               (devel.release - DEVEL_UNTIL_RELEASE
                                - datetime.timedelta(days=1))))
                logging.warning(_("Not running on this development "
                                  "release before %s") %
                                (devel.release - DEVEL_UNTIL_RELEASE
                                 - datetime.timedelta(days=1)))
                return UnattendedUpgradesResult(True)

            logging.debug("Running on the development release")
    elif "(development branch)" in DISTRO_DESC and not\
            apt_pkg.config.find_b("Unattended-Upgrade::DevRelease", True):
        syslog.syslog(_("Not running on the development release."))
        logging.info(_("Not running on the development release."))
        return UnattendedUpgradesResult(True)

    logging.info(_("Starting unattended upgrades script"))

    # check and get lock
    try:
        apt_pkg.pkgsystem_lock()
    except SystemError:
        logging.error(_("Lock could not be acquired (another package "
                        "manager running?)"))
        print(_("Cache lock can not be acquired, exiting"))
        return UnattendedUpgradesResult(
            False, _("Lock could not be acquired"))

    # check if the journal is dirty and if so, take emergency action
    # the alternative is to leave the system potentially unsecure until
    # the user comes in and fixes
    if is_dpkg_journal_dirty() and \
       apt_pkg.config.find_b("Unattended-Upgrade::AutoFixInterruptedDpkg",
                             True):
        logging.warning(
            _("Unclean dpkg state detected, trying to correct"))
        print(_("Unclean dpkg state detected, trying to correct"))
        env = copy.copy(os.environ)
        env["DPKG_FRONTEND_LOCKED"] = "1"
        try:
            with Unlocked():
                output = subprocess.check_output(
                    ["dpkg", "--force-confold", "--configure", "-a"],
                    env=env,
                    universal_newlines=True)
        except subprocess.CalledProcessError as e:
            # a failing dpkg still produced output worth logging
            output = e.output
        logging.warning(_("dpkg --configure -a output:\n%s"), output)

    # get a cache
    try:
        cache = UnattendedUpgradesCache(rootdir=rootdir)
    except SystemError as error:
        print(_("Apt returned an error, exiting"))
        print(_("error message: %s") % error)
        logging.error(_("Apt returned an error, exiting"))
        logging.error(_("error message: %s"), error)
        return UnattendedUpgradesResult(
            False, _("Apt returned an error, exiting"))

    if cache._depcache.broken_count > 0:
        print(_("Cache has broken packages, exiting"))
        logging.error(_("Cache has broken packages, exiting"))
        return UnattendedUpgradesResult(
            False, _("Cache has broken packages, exiting"))

    # FIXME: make this into a ContextManager
    # be nice when calculating the upgrade as its pretty CPU intensive
    old_priority = os.nice(0)
    try:
        # Check that we will be able to restore the priority
        os.nice(-1)
        os.nice(20)
    except OSError as e:
        # lacking the permission to lower niceness is fine, we then simply
        # keep running at the current priority
        if e.errno in (errno.EPERM, errno.EACCES):
            pass
        else:
            raise

    auto_removable = get_auto_removable(cache)

    # find out about the packages that are upgradable (in an allowed_origin)
    pkgs_to_upgrade = calculate_upgradable_pkgs(cache, options)
    pkgs_to_upgrade.sort(key=lambda p: p.name)
    pkgs = [pkg.name for pkg in pkgs_to_upgrade]
    logging.debug("pkgs that look like they should be upgraded: %s"
                  % "\n".join(pkgs))

    # FIXME: make this into a ContextManager
    # stop being nice
    os.nice(old_priority - os.nice(0))

    # download what looks good
    mark_pkgs_to_upgrade(cache, pkgs)

    if options.debug:
        fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress())
    else:
        fetcher = apt_pkg.Acquire()
    # named sources_list to avoid shadowing the builtin list()
    sources_list = apt_pkg.SourceList()
    sources_list.read_main_list()
    recs = cache._records
    pm = apt_pkg.PackageManager(cache._depcache)

    # don't start downloading during shutdown
    # TODO: download files one by one and check for stop request after each of
    # them
    if should_stop():
        return UnattendedUpgradesResult(False, _("Upgrade was interrupted"))
    try:
        pm.get_archives(fetcher, sources_list, recs)
    except SystemError as e:
        logging.error(_("GetArchives() failed: %s"), e)
    try:
        res = fetcher.run()
        logging.debug("fetch.run() result: %s", res)
    except SystemError as e:
        logging.error("fetch.run() result: %s", e)

    if options.download_only:
        return UnattendedUpgradesResult(True)

    if cache.get_changes():
        cache.clear()

    pkg_conffile_prompt = False
    if dpkg_conffile_prompt():
        # now check the downloaded debs for conffile conflicts and build
        # a blacklist
        conffile_blacklist = []  # type: List[str]
        for item in fetcher.items:
            logging.debug("%s" % item)
            if item.status == item.STAT_ERROR:
                print(_("An error occurred: %s") % item.error_text)
                logging.error(_("An error occurred: %s"), item.error_text)
            if not item.complete:
                print(_("The URI %s failed to download, aborting") %
                      item.desc_uri)
                logging.error(_("The URI %s failed to download, aborting"),
                              item.desc_uri)
                return UnattendedUpgradesResult(
                    False, (_("The URI %s failed to download, aborting") %
                            item.desc_uri))
            if not os.path.exists(item.destfile):
                print(_("Download finished, but file %s not there?!?") %
                      item.destfile)
                logging.error(_("Download finished, but file %s not "
                                "there?!?"), item.destfile)
                return UnattendedUpgradesResult(
                    False, (_("Download finished, but file %s not there?!?") %
                            item.destfile))
            if not item.is_trusted and not apt_pkg.config.find_b(
                    "APT::Get::AllowUnauthenticated", False):
                pkg_name = pkgname_from_deb(item.destfile)
                # pass the package name, the message used to be logged
                # with a literal "%s"
                logging.debug("%s is blacklisted because it is not trusted",
                              pkg_name)
                if not is_pkgname_in_blacklist(pkg_name, cache.blacklist):
                    conffile_blacklist.append("%s$" % re.escape(pkg_name))
            if not is_deb(item.destfile):
                logging.debug("%s is not a .deb file" % item)
                continue
            if conffile_prompt(item.destfile):
                # skip package (means to re-run the whole marking again
                # and making sure that the package will not be pulled in by
                # some other package again!)
                pkg_name = pkgname_from_deb(item.destfile)
                # print to stdout to ensure that this message is part of
                # the cron mail (only if no summary mail is requested)
                mail_to = apt_pkg.config.find("Unattended-Upgrade::Mail", "")
                if not mail_to:
                    print(_("Package %s has conffile prompt and needs "
                            "to be upgraded manually") % pkg_name)
                # log to the logfile
                logging.warning(_("Package %s has conffile prompt and "
                                  "needs to be upgraded manually"),
                                pkg_name)
                if not is_pkgname_in_blacklist(pkg_name, cache.blacklist):
                    conffile_blacklist.append("%s$" % re.escape(pkg_name))
                pkg_conffile_prompt = True

        # redo the selection about the packages to upgrade based on the new
        # blacklist
        logging.debug("Packages blacklist due to conffile prompts: %s"
                      % conffile_blacklist)
        # find out about the packages that are upgradable (in a allowed_origin)
        if len(conffile_blacklist) > 0:
            for regex in conffile_blacklist:
                cache.blacklist.append(regex)
            cache.apply_pinning(cache.pinning_from_regex_list(
                conffile_blacklist, NEVER_PIN))  # type: ignore
            old_pkgs_to_upgrade = pkgs_to_upgrade[:]
            pkgs_to_upgrade = []
            for pkg in old_pkgs_to_upgrade:
                logging.debug("Checking the black and whitelist: %s" %
                              (pkg.name))
                cache.mark_upgrade_adjusted(
                    pkg, from_user=not pkg.is_auto_installed)
                if check_changes_for_sanity(cache):
                    pkgs_to_upgrade.append(pkg)
                else:
                    logging.info(_("package %s not upgraded"), pkg.name)
                    # drop the rejected marking and re-mark everything
                    # accepted so far
                    cache.clear()
                    for pkg2 in pkgs_to_upgrade:
                        cache.call_adjusted(
                            apt.package.Package.mark_upgrade, pkg2,
                            from_user=not pkg2.is_auto_installed)
            if cache.get_changes():
                cache.clear()
    else:
        logging.debug("dpkg is configured not to cause conffile prompts")

    # auto-removals
    kernel_pkgs_remove_success = True  # type: bool
    kernel_pkgs_removed = []           # type: List[str]
    kernel_pkgs_kept_installed = []    # type: List[str]
    if (auto_removable and apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-Unused-Kernel-Packages", True)):
        # remove unused kernels before installing new ones because the newly
        # installed ones may fill up /boot and break the system right before
        # removing old ones could take place
        #
        # this step may also remove _auto-removable_ reverse dependencies
        # of kernel packages
        auto_removable_kernel_pkgs = {
            p for p in auto_removable
            if (cache.versioned_kernel_pkgs_regexp
                and cache.versioned_kernel_pkgs_regexp.match(p)
                and not cache.running_kernel_pkgs_regexp.match(p))}
        if auto_removable_kernel_pkgs:
            logging.info(_("Removing unused kernel packages: %s"),
                         " ".join(auto_removable_kernel_pkgs))
            (kernel_pkgs_remove_success,
             kernel_pkgs_removed,
             kernel_pkgs_kept_installed) = do_auto_remove(
                cache, auto_removable_kernel_pkgs, logfile_dpkg,
                options.minimal_upgrade_steps,
                options.verbose or options.debug, options.dry_run)
            auto_removable = get_auto_removable(cache)

    previous_autoremovals = auto_removable
    if apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-Unused-Dependencies", False):
        pending_autoremovals = previous_autoremovals
    else:
        pending_autoremovals = set()

    # exit if there is nothing to do and nothing to report
    if (len(pending_autoremovals) == 0
            and len(pkgs_to_upgrade) == 0):
        logging.info(_("No packages found that can be upgraded unattended "
                       "and no pending auto-removals"))

        pkgs_kept_back = cache.find_kept_packages(options.dry_run)
        return UnattendedUpgradesResult(
            kernel_pkgs_remove_success,
            _("No packages found that can be upgraded unattended and no "
              "pending auto-removals"),
            pkgs_removed=kernel_pkgs_removed,
            pkgs_kept_back=pkgs_kept_back,
            pkgs_kept_installed=kernel_pkgs_kept_installed,
            update_stamp=True)

    # check if its configured for install on shutdown, if so, the
    # environment UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN will
    # be set by the unattended-upgrades-shutdown script
    if ("UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN" not in os.environ
        and apt_pkg.config.find_b(
            "Unattended-Upgrade::InstallOnShutdown", False)):
        logger = logging.getLogger()
        logger.debug("Configured to install on shutdown, so exiting now")
        return UnattendedUpgradesResult(True)

    # check if we are in dry-run mode
    if options.dry_run:
        logging.info("Option --dry-run given, *not* performing real actions")
        apt_pkg.config.set("Debug::pkgDPkgPM", "1")

    # do the install based on the new list of pkgs
    pkgs = [pkg.name for pkg in pkgs_to_upgrade]
    logging.info(_("Packages that will be upgraded: %s"), " ".join(pkgs))

    # only perform install step if we actually have packages to install
    pkg_install_success = True
    if len(pkgs_to_upgrade) > 0:
        # do install
        pkg_install_success = do_install(cache,
                                         pkgs,
                                         options,
                                         logfile_dpkg)

    # Was the overall run successful: only if everything installed
    # fine and nothing was held back because of a conffile prompt.
    successful_run = (kernel_pkgs_remove_success and pkg_install_success
                      and not pkg_conffile_prompt)

    # now check if any auto-removing needs to be done
    if cache._depcache.broken_count > 0:
        print(_("Cache has broken packages, exiting"))
        logging.error(_("Cache has broken packages, exiting"))
        return UnattendedUpgradesResult(
            False, _("Cache has broken packages, exiting"), pkgs=pkgs)

    # make sure we start autoremovals with a clear cache
    if cache.get_changes():
        cache.clear()

    # the user wants *all* auto-removals to be removed
    # (unless u-u got signalled to stop gracefully quickly)
    pkgs_removed = []         # type: List[str]
    pkgs_kept_installed = []  # type: List[str]
    if ((apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-Unused-Dependencies", False)
         and not SIGNAL_STOP_REQUEST)):
        auto_removals = get_auto_removable(cache)
        (pkg_remove_success,
         pkgs_removed,
         pkgs_kept_installed) = do_auto_remove(
            cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps,
            options.verbose or options.debug,
            options.dry_run)
        successful_run = successful_run and pkg_remove_success
    # the user wants *only new* auto-removals to be removed
    elif apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-New-Unused-Dependencies", True):
        # calculate the new auto-removals
        new_pending_autoremovals = get_auto_removable(cache)
        auto_removals = new_pending_autoremovals - previous_autoremovals
        (pkg_remove_success,
         pkgs_removed,
         pkgs_kept_installed) = do_auto_remove(
            cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps,
            options.verbose or options.debug,
            options.dry_run)
        successful_run = successful_run and pkg_remove_success

    logging.debug("InstCount=%i DelCount=%i BrokenCount=%i"
                  % (cache._depcache.inst_count,
                     cache._depcache.del_count,
                     cache._depcache.broken_count))

    # clean after success install (if needed)
    keep_key = "Unattended-Upgrade::Keep-Debs-After-Install"
    if (not apt_pkg.config.find_b(keep_key, False)
            and not options.dry_run
            and pkg_install_success):
        clean_downloaded_packages(fetcher)

    pkgs_kept_back = cache.find_kept_packages(options.dry_run)

    return UnattendedUpgradesResult(
        successful_run, _("All upgrades installed"), pkgs,
        pkgs_kept_back,
        kernel_pkgs_removed + pkgs_removed,
        kernel_pkgs_kept_installed + pkgs_kept_installed,
        update_stamp=True)
class Options:
    """Container for the command line switches of the script.

    Instances start with every switch turned off.
    """

    def __init__(self):
        # what should be done
        self.download_only = self.dry_run = False
        self.minimal_upgrade_steps = False
        # how much should be printed
        self.debug = self.apt_debug = self.verbose = False
# Script entry point: set up translations and the environment, parse the
# command line, refuse to run as non-root, install the signal handlers,
# write the pid file and hand over to main().
if __name__ == "__main__":
    localesApp = "unattended-upgrades"
    localesDir = "/usr/share/locale"
    gettext.bindtextdomain(localesApp, localesDir)
    gettext.textdomain(localesApp)

    # set debconf to NON_INTERACTIVE
    os.environ["DEBIAN_FRONTEND"] = "noninteractive"
    # this ensures the commandline is logged in /var/log/apt/history.log
    apt_pkg.config.set("Commandline::AsString", " ".join(sys.argv))

    # COMPAT with the misspelling: the option is read both with the
    # "Unattended-Upgrades::" and the "Unattended-Upgrade::" prefix, and
    # setting either of them to false turns the default off
    minimal_steps_default = (
        apt_pkg.config.find_b("Unattended-Upgrades::MinimalSteps", True)
        and apt_pkg.config.find_b("Unattended-Upgrade::MinimalSteps", True))

    # init the options
    parser = OptionParser()
    parser.add_option("-d", "--debug",
                      action="store_true",
                      default=apt_pkg.config.find_b(
                          "Unattended-Upgrade::Debug", False),
                      help=_("print debug messages"))
    parser.add_option("", "--apt-debug",
                      action="store_true", default=False,
                      help=_("make apt/libapt print verbose debug messages"))
    parser.add_option("-v", "--verbose",
                      action="store_true",
                      default=apt_pkg.config.find_b(
                          "Unattended-Upgrade::Verbose", False),
                      help=_("print info messages"))
    parser.add_option("", "--dry-run",
                      action="store_true", default=False,
                      help=_("Simulation, download but do not install"))
    parser.add_option("", "--download-only",
                      action="store_true", default=False,
                      help=_("Only download, do not even try to install."))
    parser.add_option("", "--minimal-upgrade-steps",
                      action="store_true", default=minimal_steps_default,
                      help=_("Upgrade in minimal steps (and allow "
                             "interrupting with SIGTERM) (default)"))
    parser.add_option("", "--no-minimal-upgrade-steps",
                      action="store_false", default=minimal_steps_default,
                      dest="minimal_upgrade_steps",
                      help=_("Upgrade all packages together instead of in "
                             "smaller sets"))
    # underscore spelling of --minimal-upgrade-steps, hidden from --help
    parser.add_option("", "--minimal_upgrade_steps",
                      action="store_true",
                      help=SUPPRESS_HELP,
                      default=minimal_steps_default)
    # only the parsed option values are kept, positional arguments are
    # discarded
    options = cast(Options, (parser.parse_args())[0])

    if os.getuid() != 0:
        print(_("You need to be root to run this application"))
        sys.exit(1)

    # ensure that we are not killed when the terminal goes away e.g. on
    # shutdown
    signal.signal(signal.SIGHUP, signal.SIG_IGN)

    # setup signal handler for graceful stopping
    signal.signal(signal.SIGTERM, signal_handler)

    # write pid to let other processes find this one
    pidf = os.path.join(apt_pkg.config.find_dir("Dir"),
                        "var", "run", "unattended-upgrades.pid")
    with open(pidf, "w") as fp:
        fp.write("%s" % os.getpid())
    # clean up pid file on exit
    atexit.register(os.remove, pidf)

    # run the main code
    sys.exit(main(options))