File: //usr/share/system-config-printer/firewallsettings.py
#!/usr/bin/python3
## system-config-printer
## Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2015 Red Hat, Inc.
## Authors:
##  Tim Waugh <twaugh@redhat.com>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# config is generated from config.py.in by configure
import config
import dbus
import json
from debug import *
IPP_CLIENT_SERVICE   = "ipp-client"
IPP_CLIENT_PORT      = "631"
IPP_CLIENT_PROTOCOL  = "udp"
IPP_SERVER_SERVICE   = "ipp"
IPP_SERVER_PORT      = "631"
IPP_SERVER_PROTOCOL  = "tcp"
MDNS_SERVICE         = "mdns"
MDNS_PORT            = "5353"
MDNS_PROTOCOL        = "udp"
SAMBA_CLIENT_SERVICE = "samba-client"
class FirewallD:
    def __init__ (self):
        try:
            from firewall.client import FirewallClient
            self._fw = FirewallClient ()
            if not self._fw.connected:
                debugprint ("FirewallD seems to be installed but not running")
                self._fw = None
                self._zone = None
                self.running = False
                return
            zone_name = self._get_active_zone ()
            if zone_name:
                self._zone = self._fw.config().getZoneByName (zone_name)
            else:
                self._zone = None
            self.running = True
            debugprint ("Using /org/fedoraproject/FirewallD1")
        except (ImportError, dbus.exceptions.DBusException):
            self._fw = None
            self._zone = None
            self.running = False
    def _get_active_zone (self):
        zones = list(self._fw.getActiveZones().keys())
        if not zones:
            debugprint ("FirewallD: no changeable zone")
            return None
        elif len (zones) == 1:
            # most probable case
            return zones[0]
        else:
            # Do we need to handle the 'more active zones' case ?
            # It's quite unlikely case because that would mean that more
            # network connections are up and running and they are
            # in different network zones at the same time.
            debugprint ("FirewallD returned more zones, taking first one")
            return zones[0]
    def _get_fw_data (self, reply_handler=None, error_handler=None):
        try:
            debugprint ("%s in _get_fw_data: _fw_data is %s" %
                        (self, repr(self._fw_data.getServices())))
            if self._fw_data:
                debugprint ("Using cached firewall data")
                if reply_handler:
                    reply_handler (self._fw_data)
        except AttributeError:
            try:
                self._fw_data = self._zone.getSettings ()
                debugprint ("Firewall data obtained")
                if reply_handler:
                    reply_handler (self._fw_data) 
            except (dbus.exceptions.DBusException, AttributeError, ValueError) as e:
                self._fw_data = None
                debugprint ("Exception examining firewall")
                if error_handler:
                    error_handler (e)
        return self._fw_data
    def read (self, reply_handler=None, error_handler=None):
        if reply_handler:
            self._get_fw_data (reply_handler,
                               error_handler)
        else:
            self._get_fw_data ()
    def write (self):
        try:
            if self._zone:
                self._zone.update (self._fw_data)
            self._fw.reload ()
        except dbus.exceptions.DBusException:
            nonfatalException ()
    def add_service (self, service):
        if not self._get_fw_data ():
            return
        from firewall.errors import FirewallError
        import firewall.errors
        try:
            self._fw_data.addService (service)
        except FirewallError as e:
            if e.code is firewall.errors.ALREADY_ENABLED:
                pass
            else:
                raise FirewallError (e.code, e.msg)
    def check_ipp_client_allowed (self):
        if not self._get_fw_data ():
            return True
        return (IPP_CLIENT_SERVICE in self._fw_data.getServices () or
               [IPP_CLIENT_PORT, IPP_CLIENT_PROTOCOL] in self._fw_data.getPorts ())
    def check_ipp_server_allowed (self):
        if not self._get_fw_data ():
            return True
        return (IPP_SERVER_SERVICE in self._fw_data.getServices () or
               [IPP_SERVER_PORT, IPP_SERVER_PROTOCOL] in self._fw_data.getPorts ())
    def check_samba_client_allowed (self):
        if not self._get_fw_data ():
            return True
        return (SAMBA_CLIENT_SERVICE in self._fw_data.getServices ())
    def check_mdns_allowed (self):
        if not self._get_fw_data ():
            return True
        return (MDNS_SERVICE in self._fw_data.getServices () or
               [MDNS_PORT, MDNS_PROTOCOL] in self._fw_data.getPorts ())
class SystemConfigFirewall:
    DBUS_INTERFACE = "org.fedoraproject.Config.Firewall"
    DBUS_PATH = "/org/fedoraproject/Config/Firewall"
    def __init__(self):
        try:
            bus = dbus.SystemBus ()
            obj = bus.get_object (self.DBUS_INTERFACE, self.DBUS_PATH)
            self._fw = dbus.Interface (obj, self.DBUS_INTERFACE)
            debugprint ("Using system-config-firewall")
        except dbus.exceptions.DBusException:
            debugprint ("No firewall ")
            self._fw = None
            self._fw_data = (None, None)
    def _get_fw_data (self, reply_handler=None, error_handler=None):
        try:
            debugprint ("%s in _get_fw_data: _fw_data is %s" %
                        (self, repr(self._fw_data)))
            if self._fw_data:
                debugprint ("Using cached firewall data")
                if reply_handler is None:
                    return self._fw_data
                self._client_reply_handler (self._fw_data)
        except AttributeError:
            try:
                if reply_handler:
                    self._fw.read (reply_handler=reply_handler,
                                   error_handler=error_handler)
                    return
                p = self._fw.read ()
                self._fw_data = json.loads (p)
            except (dbus.exceptions.DBusException, AttributeError, ValueError) as e:
                self._fw_data = (None, None)
                if error_handler:
                    debugprint ("Exception examining firewall")
                    self._client_error_handler (e)
        return self._fw_data
    def read (self, reply_handler=None, error_handler=None):
        if reply_handler:
            self._client_reply_handler = reply_handler
            self._client_error_handler = error_handler
            self._get_fw_data (reply_handler=self.reply_handler,
                               error_handler=self.error_handler)
        else:
            self._get_fw_data ()
    def reply_handler (self, result):
        try:
            self._fw_data = json.loads (result)
        except ValueError as e:
            self.error_handler (e)
            return
        debugprint ("Firewall data obtained")
        self._client_reply_handler (self._fw_data)
    def error_handler (self, exc):
        debugprint ("Exception fetching firewall data")
        if self._client_error_handler:
            self._client_error_handler (exc)
        else:
            debugprint ("Exception: %r" % exc)
    def write (self):
        try:
            self._fw.write (json.dumps (self._fw_data[0]))
        except:
            pass
    def _check_any_allowed (self, search):
        (args, filename) = self._get_fw_data ()
        if filename is None: return True
        isect = set (search).intersection (set (args))
        return len (isect) != 0
    def add_service (self, service):
        try:
            (args, filename) = self._fw_data
        except AttributeError:
            (args, filename) = self._get_fw_data ()
        if filename is None: return
        args.append ("--service=" + service)
        self._fw_data = (args, filename)
    def check_ipp_client_allowed (self):
        return self._check_any_allowed (set(["--port=%s:%s" %
                                        (IPP_CLIENT_PORT, IPP_CLIENT_PROTOCOL),
                                             "--service=" + IPP_CLIENT_SERVICE]))
    def check_ipp_server_allowed (self):
        return self._check_any_allowed (set(["--port=%s:%s" %
                                        (IPP_SERVER_PORT, IPP_SERVER_PROTOCOL),
                                             "--service=" + IPP_SERVER_SERVICE]))
    def check_samba_client_allowed (self):
        return self._check_any_allowed (set(["--service=" + SAMBA_CLIENT_SERVICE]))
    def check_mdns_allowed (self):
        return self._check_any_allowed (set(["--port=%s:%s" %
                                                    (MDNS_PORT, MDNS_PROTOCOL),
                                             "--service=" + MDNS_SERVICE]))