File: //usr/bin/X11/X11/X11/X11/X11/X11/hibagent
#!/usr/bin/python3
# The EC2 Spot hibernation agent. This agent does several things:
# 1. Upon startup it checks for sufficient swap space to allow hibernate and fails
#    if it's present but there's not enough of it.
# 2. If there's no swap space, it creates it and launches a background thread to
#    touch all of its blocks to make sure that EBS volumes are pre-warmed.
# 3. It updates the offset of the swap file in the kernel using SNAPSHOT_SET_SWAP_AREA ioctl.
# 4. It daemonizes and runs a polling loop that listens for instance termination notifications.
#
# This file is compatible both with Python 2 and Python 3
import argparse
import array
import atexit
import ctypes as ctypes
import fcntl
import mmap
import os
import struct
import sys
import syslog
import requests
from subprocess import check_call, check_output
from threading import Thread
from math import ceil
from time import sleep
try:
    from urllib.request import urlopen, Request
except ImportError:
    from urllib2 import urlopen, Request, HTTPError
try:
    from ConfigParser import ConfigParser, NoSectionError, NoOptionError
except:
    from configparser import ConfigParser, NoSectionError, NoOptionError
# Legacy GRUB menu file and GRUB2 drop-in directory; update_kernel_swap_offset()
# hands both to patch_grub_config().
GRUB_FILE = '/boot/grub/menu.lst'
GRUB2_DIR = '/etc/default/grub.d'
# Slack, in bytes, that main() subtracts from the target size when deciding
# whether the existing swap is already large enough.
SWAP_RESERVED_SIZE = 16384
# Sinks consulted by log(); main() overwrites both from the effective config.
log_to_syslog = True
log_to_stderr = True
# Pieces of the instance metadata service (IMDS) URLs used by get_imds_token()
# and hibernation_enabled().
IMDS_BASEURL = 'http://169.254.169.254'
IMDS_API_TOKEN_PATH = 'latest/api/token'
# NOTE(review): despite its name this path is the hibernation "configured" flag
# read by hibernation_enabled(), not the spot instance-action document.
IMDS_SPOT_ACTION_PATH = 'latest/meta-data/hibernation/configured'
def log(message):
    """Send *message* to every enabled sink.

    The module-level flags ``log_to_syslog`` and ``log_to_stderr`` select the
    sinks; the stderr copy gets a trailing newline.
    """
    if log_to_syslog:
        syslog.syslog(message)
    if not log_to_stderr:
        return
    line = "%s\n" % message
    sys.stderr.write(line)
def fallocate(fl, size):
    """Reserve *size* bytes for the open file object *fl*.

    libc's ``fallocate()`` is tried first with mode 0 and offset 0. If loading
    libc or making the call raises, or the call returns non-zero, the failure
    is logged and the file is instead extended by seeking to ``size - 1`` and
    writing a single NUL character.
    """
    try:
        libc_fallocate = ctypes.CDLL('libc.so.6').fallocate
        # Argument order: (FD, mode, offset, len)
        libc_fallocate.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_ulong, ctypes.c_ulong]
        status = libc_fallocate(fl.fileno(), 0, 0, size)
        if status != 0:
            raise Exception("Failed to perform fallocate(). Result: %d" % status)
    except Exception as err:
        log("Failed to call fallocate(), will use resize. Err: %s" % str(err))
        # Fallback: grow the file by writing its last byte.
        fl.seek(size - 1)
        fl.write(chr(0))
def mlockall():
    """Ask libc to lock the process's current and future pages in memory.

    This is best effort: any exception raised while loading libc or making the
    call is logged and swallowed. The call's return value is not inspected.
    """
    log("Locking all the code in memory")
    # MCL_CURRENT and MCL_FUTURE flag values
    mcl_current, mcl_future = 1, 2
    try:
        libc_mlockall = ctypes.CDLL('libc.so.6').mlockall
        libc_mlockall.argtypes = [ctypes.c_int]
        libc_mlockall(mcl_current | mcl_future)
    except Exception as err:
        log("Failed to lock hibernation agent into RAM. Error: %s" % str(err))
def get_file_block_number(filename):
    """Return the number the FIBMAP ioctl writes back for *filename*.

    A one-element buffer initialised to 0 is passed to the ioctl, which
    overwrites it in place; the first element is returned afterwards.

    Raises:
        Exception: if the ioctl returns a negative value.
    """
    fibmap_request = 0x01  # FIBMAP, from linux/fs.h
    block = array.array('L', [0])
    with open(filename, 'r') as handle:
        status = fcntl.ioctl(handle.fileno(), fibmap_request, block)
    if status < 0:
        raise Exception("Failed to get the file offset. Error=%d" % status)
    return block[0]
def get_swap_space():
    """Return the size in bytes of the first swap area in /proc/swaps, or 0 if none."""
    # /proc/swaps starts with a header line, followed by one line per swap area:
    #   Filename  Type  Size  Used  Priority
    with open('/proc/swaps') as swaps:
        entries = swaps.readlines()[1:]
    if len(entries) == 0:
        return 0
    size_field = entries[0].split()[2]
    # The Size column is scaled by 1024 to get bytes.
    return int(size_field) * 1024
def get_partuuid(device):
    """Return what ``lsblk -dno PARTUUID`` prints for *device*, stripped of whitespace."""
    raw_output = check_output(['lsblk', '-dno', 'PARTUUID', device])
    return raw_output.decode('ascii').strip()
def patch_grub_config(swap_device, offset, grub_file, grub2_dir):
    """Write the resume device and offset into the bootloader configuration.

    Two independent updates are attempted:

    * Legacy GRUB: when *grub_file* exists, every line whose first word is
      ``kernel`` is rebuilt from its whitespace-separated words, dropping any
      word containing ``resume=`` or ``resume_offset=`` and appending
      ``no_console_suspend=1`` (unless the line already mentions it),
      ``resume_offset=<offset>`` and ``resume=<swap_device>``. Other lines are
      kept verbatim.
    * GRUB2: when *grub2_dir* exists and has no ``99-set-swap.cfg`` yet, that
      file is written and ``/usr/sbin/update-grub2`` is run. A device name
      starting with ``/dev`` is first replaced by its ``PARTUUID=`` form.
    """
    log("Updating GRUB to use the device %s with offset %d for resume"
        % (swap_device, offset))

    if grub_file and os.path.exists(grub_file):
        resume_words = ["resume_offset=%d" % offset, "resume=%s" % swap_device]
        rewritten = []
        with open(grub_file) as src:
            for line in src:
                words = line.split()
                is_kernel_line = bool(words) and words[0] == 'kernel'
                if not is_kernel_line:
                    rewritten.append(line)
                    continue

                # Drop stale resume settings before appending the fresh ones.
                kept = [word for word in words
                        if "resume_offset=" not in word and "resume=" not in word]
                if "no_console_suspend=" not in line:
                    kept.append("no_console_suspend=1")
                rewritten.append(" ".join(kept + resume_words) + "\n")

        with open(grub_file, "w") as dst:
            dst.write("".join(rewritten))

    # Do GRUB2 update as well
    if grub2_dir and os.path.exists(grub2_dir):
        dropin_path = os.path.join(grub2_dir, '99-set-swap.cfg')
        if swap_device.startswith("/dev"):
            swap_device = "PARTUUID=%s" % get_partuuid(swap_device)
        if not os.path.exists(dropin_path):
            cmdline = ('GRUB_CMDLINE_LINUX_DEFAULT="$GRUB_CMDLINE_LINUX_DEFAULT no_console_suspend=1 '
                       'resume_offset=%d resume=%s"\n' % (offset, swap_device))
            with open(dropin_path, 'w') as dst:
                dst.write(cmdline)
            check_call('/usr/sbin/update-grub2')

    log("GRUB configuration is updated")
def update_kernel_swap_offset(grub_update):
    """Register the first swap area's device and offset with the kernel.

    The first entry of /proc/swaps is taken as the swap file. Its device
    number (``st_dev``) and the value from get_file_block_number() are packed
    and sent to /dev/snapshot with the SNAPSHOT_SET_SWAP_AREA ioctl. When
    *grub_update* is true, the device that ``df -P`` reports for the file is
    also passed, with the offset, to patch_grub_config().

    Raises:
        Exception: if /proc/swaps lists no swap area.
    """
    with open('/proc/swaps') as swaps:
        entries = swaps.readlines()[1:]
    if not entries:
        raise Exception("Swap file is not found")
    filename = entries[0].split()[0]

    log("Updating the kernel offset for the swapfile: %s" % filename)
    dev = os.stat(filename).st_dev
    offset = get_file_block_number(filename)

    if not grub_update:
        log("Skipping GRUB configuration update")
    else:
        # Find the mount point for the swap file ('df -P /swap'); the second
        # output line holds the data row and its first column the device.
        df_out = check_output(['df', '-P', filename]).decode('ascii')
        data_row = df_out.split("\n")[1]
        patch_grub_config(data_row.split()[0], offset, GRUB_FILE, GRUB2_DIR)

    log("Setting swap device to %d with offset %d" % (dev, offset))

    # Set the kernel swap offset, see https://www.kernel.org/doc/Documentation/power/userland-swsusp.txt
    # Request number from linux/suspend_ioctls.h (SNAPSHOT_SET_SWAP_AREA)
    set_swap_area_request = 0x400C330D
    payload = struct.pack('LI', offset, dev)
    with open('/dev/snapshot', 'r') as snap:
        fcntl.ioctl(snap, set_swap_area_request, payload)
    log("Done updating the swap offset")
class SwapInitializer(object):
    """Creates the swap file, optionally touches all of its blocks, and enables it.

    ``need_to_hurry`` starts out False; setting it to True from another thread
    makes the block-touching loop in init_swap() stop at the next iteration.
    """

    def __init__(self, filename, swap_size, touch_swap, mkswap, swapon):
        """Store the settings; no I/O happens here.

        Args:
            filename: path of the swap file.
            swap_size: requested swap file size in bytes.
            touch_swap: whether init_swap() should write to every block.
            mkswap: command template; ``{swapfile}`` is replaced by *filename*.
            swapon: command template; ``{swapfile}`` is replaced by *filename*.
        """
        self.filename = filename
        self.swap_size = swap_size
        self.need_to_hurry = False
        self.mkswap = mkswap
        self.swapon = swapon
        self.touch_swap = touch_swap

    def do_allocate(self):
        """Create (or truncate) the swap file, reserve its space and chmod it to 0600."""
        log("Allocating %d bytes in %s" % (self.swap_size, self.filename))
        with open(self.filename, 'w+') as fl:
            fallocate(fl, self.swap_size)
        os.chmod(self.filename, 0o600)

    def init_swap(self):
        """
            Initialize the swap using direct IO to avoid polluting the page cache

            Nothing is done when the file already exists and is at least
            ``swap_size`` bytes. Otherwise the file is allocated and, if
            ``touch_swap`` is set, filled with 1 MiB writes until ``swap_size``
            bytes were written or ``need_to_hurry`` becomes true.

            Raises:
                OSError: if opening or writing the file fails.
                Exception: if a write reports that no bytes were written.
        """
        try:
            cur_swap_size = os.stat(self.filename).st_size
            if cur_swap_size >= self.swap_size:
                log("Swap file size (%d bytes) is already large enough" % cur_swap_size)
                return
        except OSError:
            # The file does not exist yet (or cannot be stat'ed): create it below.
            pass
        self.do_allocate()
        if not self.touch_swap:
            log("Swap pre-heating is skipped, the swap blocks won't be touched during "
                "initialization to ensure they are ready")
            return
        written = 0
        log("Opening %s for direct IO" % self.filename)
        # os.open() raises OSError on failure, so no return-code check is needed.
        fd = os.open(self.filename, os.O_RDWR | os.O_DIRECT | os.O_SYNC | os.O_DSYNC)
        filler_block = None
        try:
            # Create a filler block that is correctly aligned for direct IO
            filler_block = mmap.mmap(-1, 1024 * 1024)
            # We're using 'b' to avoid optimizations that might happen for zero-filled pages
            filler_block.write(b'b' * 1024 * 1024)
            log("Touching all blocks in %s" % self.filename)
            while written < self.swap_size and not self.need_to_hurry:
                res = os.write(fd, filler_block)
                if res <= 0:
                    # os.write() raises OSError for real errors; a non-positive
                    # count means no progress, so stop instead of looping forever.
                    raise Exception("Failed to touch a block. Err: os.write() returned %d" % res)
                written += res
        finally:
            os.close(fd)
            if filler_block is not None:
                filler_block.close()
        log("Swap file %s is ready" % self.filename)

    def turn_on_swap(self):
        """Run the mkswap and swapon commands; failures are logged, not raised."""
        # NOTE(review): both commands are built from configuration/CLI templates
        # and executed through the shell.
        try:
            mkswap = self.mkswap.format(swapfile=self.filename)
            log("Running: %s" % mkswap)
            check_call(mkswap, shell=True)
            swapon = self.swapon.format(swapfile=self.filename)
            log("Running: %s" % swapon)
            check_call(swapon, shell=True)
        except Exception as e:
            log("Failed to initialize swap, reason: %s" % str(e))
class BackgroundInitializerRunner(object):
    """Runs swap initialization on a daemon thread and reports its outcome.

    Any exception raised by the background work is stored in ``error`` and
    re-raised by check_finished() / force_completion() in the calling thread.
    """

    def __init__(self, swapper, update_grub):
        """
        Args:
            swapper: object providing init_swap(), turn_on_swap() and a
                writable ``need_to_hurry`` attribute.
            update_grub: forwarded to update_kernel_swap_offset().
        """
        self.swapper = swapper
        self.thread = None
        self.error = None
        self.update_grub = update_grub

    def start_init(self):
        """Start the background initialization thread."""
        self.thread = Thread(target=self.do_async_init, name="SwapInitializer")
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        self.thread.daemon = True
        self.thread.start()

    def check_finished(self):
        """Return False while the thread is still running, True once it is done.

        Raises:
            Exception: whatever the background work raised, once it has finished.
        """
        if self.thread is not None:
            self.thread.join(timeout=0)
            # is_alive() exists on Python 2.6+ and 3.x; isAlive() was removed in 3.9.
            if self.thread.is_alive():
                return False
            self.thread = None
        log("Background swap initialization thread is complete.")
        if self.error is not None:
            raise self.error
        return True

    def force_completion(self):
        """Tell the swapper to hurry, wait for the thread, then surface its error."""
        log("We're out of time, stopping the background swap initialization.")
        self.swapper.need_to_hurry = True
        # Nothing to wait for if the thread was never started or already reaped.
        if self.thread is not None:
            self.thread.join()
        log("Background swap initialization thread has stopped.")
        self.thread = None
        if self.error is not None:
            raise self.error

    def do_async_init(self):
        """Thread body: create the swap, enable it and update the kernel offset."""
        try:
            self.swapper.init_swap()
            self.swapper.turn_on_swap()
            update_kernel_swap_offset(self.update_grub)
        except Exception as ex:
            log("Failed to initialize swap, reason: %s" % str(ex))
            self.error = ex
class ItnPoller(object):
    """Polls a metadata URL once a second and hibernates when asked to.

    Attributes:
        url: the URL whose body is checked for the word ``hibernate``.
        hibernate_cmd: shell command run to hibernate the machine.
        initializer: optional background initializer (providing check_finished()
            and force_completion()); dropped once it has finished.
    """

    # Seconds to wait on each metadata HTTP request, so that a stalled
    # connection cannot block the polling loop forever.
    REQUEST_TIMEOUT = 5

    def __init__(self, url, hibernate_cmd, initializer):
        self.url = url
        self.hibernate_cmd = hibernate_cmd
        self.initializer = initializer

    def poll_loop(self):
        """Run run_loop_iteration() forever, sleeping one second between rounds."""
        log("Starting the hibernation polling loop")
        while True:
            self.run_loop_iteration()
            sleep(1)

    def run_loop_iteration(self):
        """Do one polling round: reap the initializer, then hibernate if requested."""
        if self.initializer and self.initializer.check_finished():
            self.initializer = None
        if self.poll_for_termination():
            if self.initializer:
                # Swap setup is still running: make it finish before hibernating.
                self.initializer.force_completion()
                self.initializer = None
            self.do_hibernate()

    def poll_for_termination(self):
        """Return True when the monitored URL's body contains ``hibernate``.

        A session token is requested with a PUT first and sent along with the
        GET of ``self.url``. Any error (including a timeout) yields False.
        """
        token_response = None
        action_response = None
        try:
            token_request = Request("http://169.254.169.254/latest/api/token")
            token_request.add_header('X-aws-ec2-metadata-token-ttl-seconds', '21600')
            # Request has no portable way to pick the verb on Python 2, so override it.
            token_request.get_method = lambda: "PUT"
            token_response = urlopen(token_request, timeout=self.REQUEST_TIMEOUT)
            token = token_response.read()
            action_request = Request(self.url)
            action_request.add_header('X-aws-ec2-metadata-token', token)
            action_response = urlopen(action_request, timeout=self.REQUEST_TIMEOUT)
            return b"hibernate" in action_response.read()
        except Exception:
            # Deliberately broad: any failure means "no notification this round".
            # SystemExit and KeyboardInterrupt are no longer swallowed.
            return False
        finally:
            if token_response:
                token_response.close()
            if action_response:
                action_response.close()

    def do_hibernate(self):
        """Run the hibernate command through the shell; failures are only logged."""
        log("Attempting to hibernate")
        try:
            check_call(self.hibernate_cmd, shell=True)
        except Exception as e:
            log("Failed to hibernate, reason: %s" % str(e))
        # We're not guaranteed to be stopped immediately after the hibernate
        # command fires. So wait a little bit to avoid triggering ourselves twice
        sleep(2)
def daemonize(pidfile):
    """
        Turn the current process into a daemon using the usual double fork.

        Both intermediate parents exit with status 0; a failed fork is logged
        and exits with status 1. The surviving process writes its PID to
        *pidfile* (removed again at exit) and points stdin, stdout and stderr
        at /dev/null.
    """
    def fork_and_leave_parent(failure_format):
        # The parent exits; only the child returns from this helper.
        try:
            if os.fork() > 0:
                sys.exit(0)
        except OSError as err:
            log(failure_format % (err.errno, err.strerror))
            sys.exit(1)

    fork_and_leave_parent("Fork #1 failed: %d (%s)\n")

    # Decouple from parent environment
    os.chdir("/")
    os.setsid()
    os.umask(0)

    fork_and_leave_parent("Fork #2 failed: %d (%s)\n")

    # Write the PID file
    with open(pidfile, "w+") as handle:
        handle.write("%s\n" % str(os.getpid()))
    atexit.register(lambda: os.unlink(pidfile))

    # Redirect standard file descriptors to null to avoid blocking
    devnull = open('/dev/null', 'a+')
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(devnull.fileno(), stream.fileno())
def detect_hibernate_cmd():
    """Return the systemctl hibernate command if /run/systemd/system exists, else pm-hibernate."""
    has_systemd = os.path.exists("/run/systemd/system")
    return "/bin/systemctl hibernate" if has_systemd else "/usr/sbin/pm-hibernate"
class Config(object):
    """Effective settings of the agent.

    For every option the command-line value wins over the config-file value,
    which wins over the built-in default (see merge()).
    """

    def __init__(self, config, args):
        """Resolve all options from *config* (a ConfigParser) and *args* (parsed CLI)."""
        def get(section, name):
            # A missing section or option simply means "not configured".
            try:
                return config.get(section, name)
            except (NoSectionError, NoOptionError):
                return None

        def get_int(section, name):
            raw = get(section, name)
            return None if raw is None else int(raw)

        def core_flag(option, arg_name):
            # Boolean option of the [core] section; defaults to True.
            from_file = self.to_bool(get('core', option))
            from_args = self.to_bool(getattr(args, arg_name))
            return self.merge(from_file, from_args, True)

        # Assignment order is kept stable because __str__ prints __dict__.
        self.lock_in_ram = core_flag('lock-in-ram', 'lock_in_ram')
        self.log_to_syslog = core_flag('log-to-syslog', 'log_to_syslog')
        self.log_to_stderr = core_flag('log-to-stderr', 'log_to_stderr')
        self.touch_swap = core_flag('touch-swap', 'touch_swap')
        self.grub_update = core_flag('grub-update', 'grub_update')
        self.ephemeral_check = core_flag('check-ephemeral-volumes', 'check_ephemeral_volumes')

        curve_from_file = get('core', 'freeze-timeout-curve')
        self.freeze_timeout_curve = self.merge(
            curve_from_file, args.freeze_timeout_curve,
            '0-8:20,8-16:40,16-64:60,64-128:150,128-256:200,256-:400')

        percentage_from_file = get_int('swap', 'percentage-of-ram')
        self.swap_percentage = self.merge(percentage_from_file, args.swap_ram_percentage, 100)
        size_from_file = get_int('swap', 'target-size-mb')
        self.swap_mb = self.merge(size_from_file, args.swap_target_size_mb, 4000)

        self.mkswap = self.merge(get('swap', 'mkswap'), args.mkswap, '/sbin/mkswap {swapfile}')
        self.swapon = self.merge(get('swap', 'swapon'), args.swapon, '/sbin/swapon {swapfile}')
        self.swapfile = self.merge(get('swap', 'swapfile'), args.swapfile, '/swap')

        hibernate_from_file = get('pm-utils', 'hibernate-command')
        self.hibernate = self.merge(hibernate_from_file, args.hibernate, detect_hibernate_cmd())

        url_from_file = get('notification', 'monitored-url')
        self.url = self.merge(
            url_from_file, args.monitored_url,
            'http://169.254.169.254/latest/meta-data/spot/instance-action')

    def merge(self, cf_value, arg_value, def_val):
        """Return the first non-None of *arg_value*, *cf_value*; else *def_val*."""
        for candidate in (arg_value, cf_value):
            if candidate is not None:
                return candidate
        return def_val

    def to_bool(self, bool_str):
        """Parse the string and return the boolean value encoded or raise an exception

        ``None`` is passed through unchanged. Matching is case-insensitive.

        Raises:
            ValueError: if the text is none of true/t/1/false/f/0.
        """
        if bool_str is None:
            return None
        lowered = bool_str.lower()
        if lowered in ('true', 't', '1'):
            return True
        if lowered in ('false', 'f', '0'):
            return False
        # if here we couldn't parse it
        raise ValueError("%s is not recognized as a boolean value" % bool_str)

    def __str__(self):
        """Render all resolved settings as the string form of the instance dict."""
        return str(self.__dict__)
def get_pm_freeze_timeout(freeze_timeout_curve, ram_bytes):
    """Look up the freeze timeout for the given amount of RAM.

    *freeze_timeout_curve* is a comma-separated list of ``<min>-<max>:<timeout>``
    segments, where ``<max>`` may be omitted (``<min>-`` or ``<min>``) for an
    open-ended segment. RAM is rounded up to whole GB and the first segment
    with ``min <= ram_gb < max`` wins.

    Returns:
        The timeout of the matching segment as an int, or None when the curve
        is empty, cannot be parsed, or has no matching segment (the latter two
        cases are logged).
    """
    if not freeze_timeout_curve:
        return None

    ram_gb = ceil(ram_bytes / (1024.0*1024.0*1024.0))
    try:
        for segment in freeze_timeout_curve.split(","):
            ram_sizes, timeout = segment.split(":")
            bounds = ram_sizes.split("-")
            count = len(bounds)
            if count == 2 and bounds[0] and bounds[1]:
                ram_min, ram_max = int(bounds[0]), int(bounds[1])
            elif (count == 1 and bounds[0]) or (count == 2 and not bounds[1]):
                # Open-ended segment: only a lower bound is given.
                ram_min, ram_max = int(bounds[0]), None
            else:
                raise Exception("can't parse %s, expected <int>-[<int>]" % ram_sizes)

            if ram_min <= ram_gb and (ram_max is None or ram_gb < ram_max):
                return int(timeout)
    except Exception as ex:
        log("Failed to parse the freeze timeout curve, error: %s. "
            "The pm_freeze_timeout will not be adjusted." % str(ex))
        return None

    log("Failed to find a fitting PM freeze timeout curve segment "
        "for %d GB of RAM. The pm_freeze_timeout will not be adjusted." % ram_gb)
    return None
def adjust_pm_timeout(timeout):
    """Raise /sys/power/pm_freeze_timeout to *timeout* seconds if it is lower.

    The sysfs file holds milliseconds; it is only rewritten when its current
    value is below the requested one. Any failure is logged and terminates the
    process with exit status 1.

    Args:
        timeout: requested freeze timeout in seconds.
    """
    try:
        with open('/sys/power/pm_freeze_timeout') as fl:
            # Floor division keeps whole seconds on both Python 2 and 3.
            cur_timeout = int(fl.read()) // 1000
        if cur_timeout >= timeout:
            log("Info current pm_freeze_timeout (%d seconds) is greater than or equal "
                "to the requested (%d seconds) timeout, doing nothing" % (cur_timeout, timeout))
        else:
            with open('/sys/power/pm_freeze_timeout', 'w') as fl:
                fl.write("%d" % (timeout*1000))
            log("Adjusted pm_freeze_timeout to %d from %d" % (timeout, cur_timeout))
    except Exception as e:
        log("Failed to adjust pm_freeze_timeout to %d. Error: %s" % (timeout, str(e)))
        # sys.exit() rather than the interactive-only exit() helper from ``site``.
        sys.exit(1)
def get_imds_token(seconds=21600, timeout=5):
    """ Get a token to access instance metadata.

    Args:
        seconds: requested token lifetime, sent in the
            ``X-aws-ec2-metadata-token-ttl-seconds`` header.
        timeout: seconds to wait for the metadata service before giving up, so
            that an unresponsive endpoint cannot hang the agent indefinitely.

    Returns:
        The token text, or None when the response status is not OK.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed (connection failure or timeout).
    """
    log("Requesting new IMDSv2 token.")
    request_header = {'X-aws-ec2-metadata-token-ttl-seconds': '{}'.format(seconds)}
    token_url = '{}/{}'.format(IMDS_BASEURL, IMDS_API_TOKEN_PATH)
    response = requests.put(token_url, headers=request_header, timeout=timeout)
    response.close()
    if response.status_code != requests.codes.ok:
        return None
    return response.text
def hibernation_enabled(timeout=5):
    """Returns a boolean indicating whether hibernation-option.configured is enabled or not.

    Args:
        timeout: seconds to wait for the metadata service before giving up, so
            that an unresponsive endpoint cannot hang the agent indefinitely.

    Returns:
        False when no IMDS token could be obtained, when the flag request does
        not return an OK status, or when its body is ``false`` (any case);
        True otherwise.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed (connection failure or timeout).
    """
    imds_token = get_imds_token()
    if imds_token is None:
        log("IMDS_V2 http endpoint is disabled")
        # IMDS http endpoint is disabled
        return False
    request_header = {'X-aws-ec2-metadata-token': imds_token}
    response = requests.get("{}/{}".format(IMDS_BASEURL, IMDS_SPOT_ACTION_PATH),
                            headers=request_header, timeout=timeout)
    response.close()
    if response.status_code != requests.codes.ok or response.text.lower() == "false":
        return False
    log("Hibernation Configured Flag found")
    return True
def main():
    """Entry point of the agent.

    Steps, in order:
      1. Parse the command line and the optional config file into a Config.
      2. Exit with status 0 if the hibernation "configured" flag is reported.
      3. Compute the target swap size (the larger of the configured megabytes
         and the configured percentage of RAM) and adjust pm_freeze_timeout.
      4. Use the existing swap if it is large enough, exit with status 1 if it
         exists but is too small, or prepare a background initializer if there
         is none (after checking free disk space).
      5. Daemonize unless --foreground was given, then poll forever.
    """
    global log_to_syslog, log_to_stderr

    # Parse arguments
    parser = argparse.ArgumentParser(description="An EC2 agent that watches for instance stop "
                                                 "notifications and initiates hibernation")
    parser.add_argument('-c', '--config', help='Configuration file to use', type=str)
    parser.add_argument('-i', '--pidfile', help='The file to write PID to', type=str,
                        default='/run/hibagent')
    parser.add_argument('-f', '--foreground', help="Run in foreground, don't daemonize", action="store_true")
    # Boolean options are taken as strings and parsed by Config.to_bool(), so
    # that "not given" (None) can be told apart from an explicit false.
    parser.add_argument("-l", "--lock-in-ram", help='Lock the code in RAM', type=str)
    parser.add_argument("-syslog", "--log-to-syslog", help='Log to syslog', type=str)
    parser.add_argument("-stderr", "--log-to-stderr", help='Log to stderr', type=str)
    parser.add_argument("-touch", "--touch-swap", help='Do swap initialization', type=str)
    parser.add_argument("-grub", "--grub-update", help='Update GRUB config with resume offset', type=str)
    parser.add_argument("-e", "--check-ephemeral-volumes", help='Check if ephemeral volumes are mounted', type=str)
    parser.add_argument("-u", "--freeze-timeout-curve", help='The pm_freeze_timeout curve (by RAM size)', type=str)
    parser.add_argument("-p", "--swap-ram-percentage", help='The target swap size as a percentage of RAM', type=int)
    parser.add_argument("-s", "--swap-target-size-mb", help='The target swap size in megabytes', type=int)
    parser.add_argument("-w", "--swapfile", help="Swap file name", type=str)
    parser.add_argument('--mkswap', help='The command line utility to set up swap', type=str)
    parser.add_argument('--swapon', help='The command line utility to turn on swap', type=str)
    parser.add_argument('--hibernate', help='The command line utility to initiate hibernation', type=str)
    parser.add_argument('--monitored-url', help='The URL to monitor for notifications', type=str)
    args = parser.parse_args()

    config_file = ConfigParser()
    if args.config:
        config_file.read(args.config)
    config = Config(config_file, args)

    # From here on log() honours the configured sinks.
    log_to_stderr = config.log_to_stderr
    log_to_syslog = config.log_to_syslog
    log("Effective config: %s" % config)

    # Let's first check if we need to kill the Spot Hibernate Agent
    if hibernation_enabled():
        log("Spot Instance Launch has enabled Hibernation Configured Flag. hibagent exiting!!")
        sys.exit(0)

    # Target size: the larger of the absolute size and the percentage of RAM.
    target_swap_size = config.swap_mb * 1024 * 1024
    ram_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
    swap_percentage_size = ram_bytes * config.swap_percentage // 100
    if swap_percentage_size > target_swap_size:
        target_swap_size = int(swap_percentage_size)
    log("Will check if swap is at least: %d megabytes" % (target_swap_size // (1024*1024)))

    timeout = get_pm_freeze_timeout(config.freeze_timeout_curve, ram_bytes)
    if timeout:
        adjust_pm_timeout(timeout)

    # Validate the swap configuration
    cur_swap = get_swap_space()
    bi = None
    if cur_swap >= target_swap_size - SWAP_RESERVED_SIZE:
        log("There's sufficient swap available (have %d, need %d)" %
            (cur_swap, target_swap_size))
        update_kernel_swap_offset(config.grub_update)
    elif cur_swap > 0:
        log("There's not enough swap space available (have %d, need %d), exiting" %
            (cur_swap, target_swap_size))
        sys.exit(1)
    else:
        log("No swap is present, will create and initialize it")
        # We need to create swap, but first validate that we have enough free space
        swap_dev = os.path.dirname(config.swapfile)
        st = os.statvfs(swap_dev)
        free_bytes = st.f_bavail * st.f_frsize
        # Keep 10 MiB of headroom on top of the swap file itself.
        free_space_needed = target_swap_size + 10 * 1024 * 1024
        if free_space_needed >= free_bytes:
            log("There's not enough space (%d present, %d needed) on the swap device: %s" % (
                free_bytes, free_space_needed, swap_dev))
            sys.exit(1)
        log("There's enough space (%d present, %d needed) on the swap device: %s" % (
            free_bytes, free_space_needed, swap_dev))

        sw = SwapInitializer(config.swapfile, target_swap_size, config.touch_swap,
                             config.mkswap, config.swapon)
        bi = BackgroundInitializerRunner(sw, config.grub_update)

    # Daemonize now! The parent process will not return from this method
    if not args.foreground:
        log("Initial checks are finished, daemonizing and writing PID into %s" % args.pidfile)
        daemonize(args.pidfile)
    else:
        log("Initial checks are finished, will run in foreground now")

    poller = ItnPoller(config.url, config.hibernate, bi)
    if config.lock_in_ram:
        mlockall()

    # This loop will now be running inside the child
    if bi:
        bi.start_init()
    poller.poll_loop()
# Run the agent only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()