File: //proc/self/root/lib/python3/dist-packages/twisted/application/runner/test/test_runner.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.application.runner._runner}.
"""
from signal import SIGTERM
from io import BytesIO
import errno
from attr import attrib, attrs, Factory
from twisted.logger import (
    LogLevel, LogPublisher, LogBeginner,
    FileLogObserver, FilteringLogObserver, LogLevelFilterPredicate,
)
from twisted.test.proto_helpers import MemoryReactor
from ...runner import _runner
from .._exit import ExitStatus
from .._pidfile import PIDFile, NonePIDFile
from .._runner import Runner
from .test_pidfile import DummyFilePath
import twisted.trial.unittest
class RunnerTests(twisted.trial.unittest.TestCase):
    """
    Tests for L{Runner}.
    """
    def setUp(self):
        # Patch exit and kill so we can capture usage and prevent actual exits
        # and kills.
        self.exit = DummyExit()
        self.kill = DummyKill()
        self.patch(_runner, "exit", self.exit)
        self.patch(_runner, "kill", self.kill)
        # Patch getpid so we get a known result
        self.pid = 1337
        self.pidFileContent = u"{}\n".format(self.pid).encode("utf-8")
        # Patch globalLogBeginner so that we aren't trying to install multiple
        # global log observers.
        self.stdout = BytesIO()
        self.stderr = BytesIO()
        self.stdio = DummyStandardIO(self.stdout, self.stderr)
        self.warnings = DummyWarningsModule()
        self.globalLogPublisher = LogPublisher()
        self.globalLogBeginner = LogBeginner(
            self.globalLogPublisher,
            self.stdio.stderr, self.stdio,
            self.warnings,
        )
        self.patch(_runner, "stderr", self.stderr)
        self.patch(_runner, "globalLogBeginner", self.globalLogBeginner)
    def test_runInOrder(self):
        """
        L{Runner.run} calls the expected methods in order.
        """
        runner = DummyRunner(reactor=MemoryReactor())
        runner.run()
        self.assertEqual(
            runner.calledMethods,
            [
                "killIfRequested",
                "startLogging",
                "startReactor",
                "reactorExited",
            ]
        )
    def test_runUsesPIDFile(self):
        """
        L{Runner.run} uses the provided PID file.
        """
        pidFile = DummyPIDFile()
        runner = Runner(reactor=MemoryReactor(), pidFile=pidFile)
        self.assertFalse(pidFile.entered)
        self.assertFalse(pidFile.exited)
        runner.run()
        self.assertTrue(pidFile.entered)
        self.assertTrue(pidFile.exited)
    def test_runAlreadyRunning(self):
        """
        L{Runner.run} exits with L{ExitStatus.EX_USAGE} and the expected
        message if a process is already running that corresponds to the given
        PID file.
        """
        pidFile = PIDFile(DummyFilePath(self.pidFileContent))
        pidFile.isRunning = lambda: True
        runner = Runner(reactor=MemoryReactor(), pidFile=pidFile)
        runner.run()
        self.assertEqual(self.exit.status, ExitStatus.EX_CONFIG)
        self.assertEqual(self.exit.message, "Already running.")
    def test_killNotRequested(self):
        """
        L{Runner.killIfRequested} when C{kill} is false doesn't exit and
        doesn't indiscriminately murder anyone.
        """
        runner = Runner(reactor=MemoryReactor())
        runner.killIfRequested()
        self.assertEqual(self.kill.calls, [])
        self.assertFalse(self.exit.exited)
    def test_killRequestedWithoutPIDFile(self):
        """
        L{Runner.killIfRequested} when C{kill} is true but C{pidFile} is
        L{nonePIDFile} exits with L{ExitStatus.EX_USAGE} and the expected
        message; and also doesn't indiscriminately murder anyone.
        """
        runner = Runner(reactor=MemoryReactor(), kill=True)
        runner.killIfRequested()
        self.assertEqual(self.kill.calls, [])
        self.assertEqual(self.exit.status, ExitStatus.EX_USAGE)
        self.assertEqual(self.exit.message, "No PID file specified.")
    def test_killRequestedWithPIDFile(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        performs a targeted killing of the appropriate process.
        """
        pidFile = PIDFile(DummyFilePath(self.pidFileContent))
        runner = Runner(reactor=MemoryReactor(), kill=True, pidFile=pidFile)
        runner.killIfRequested()
        self.assertEqual(self.kill.calls, [(self.pid, SIGTERM)])
        self.assertEqual(self.exit.status, ExitStatus.EX_OK)
        self.assertIdentical(self.exit.message, None)
    def test_killRequestedWithPIDFileCantRead(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        that it can't read exits with L{ExitStatus.EX_IOERR}.
        """
        pidFile = PIDFile(DummyFilePath(None))
        def read():
            raise OSError(errno.EACCES, "Permission denied")
        pidFile.read = read
        runner = Runner(reactor=MemoryReactor(), kill=True, pidFile=pidFile)
        runner.killIfRequested()
        self.assertEqual(self.exit.status, ExitStatus.EX_IOERR)
        self.assertEqual(self.exit.message, "Unable to read PID file.")
    def test_killRequestedWithPIDFileEmpty(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        containing no value exits with L{ExitStatus.EX_DATAERR}.
        """
        pidFile = PIDFile(DummyFilePath(b""))
        runner = Runner(reactor=MemoryReactor(), kill=True, pidFile=pidFile)
        runner.killIfRequested()
        self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)
        self.assertEqual(self.exit.message, "Invalid PID file.")
    def test_killRequestedWithPIDFileNotAnInt(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        containing a non-integer value exits with L{ExitStatus.EX_DATAERR}.
        """
        pidFile = PIDFile(DummyFilePath(b"** totally not a number, dude **"))
        runner = Runner(reactor=MemoryReactor(), kill=True, pidFile=pidFile)
        runner.killIfRequested()
        self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)
        self.assertEqual(self.exit.message, "Invalid PID file.")
    def test_startLogging(self):
        """
        L{Runner.startLogging} sets up a filtering observer with a log level
        predicate set to the given log level that contains a file observer of
        the given type which writes to the given file.
        """
        logFile = BytesIO()
        # Patch the log beginner so that we don't try to start the already
        # running (started by trial) logging system.
        class LogBeginner(object):
            def beginLoggingTo(self, observers):
                LogBeginner.observers = observers
        self.patch(_runner, "globalLogBeginner", LogBeginner())
        # Patch FilteringLogObserver so we can capture its arguments
        class MockFilteringLogObserver(FilteringLogObserver):
            def __init__(
                self, observer, predicates,
                negativeObserver=lambda event: None
            ):
                MockFilteringLogObserver.observer = observer
                MockFilteringLogObserver.predicates = predicates
                FilteringLogObserver.__init__(
                    self, observer, predicates, negativeObserver
                )
        self.patch(_runner, "FilteringLogObserver", MockFilteringLogObserver)
        # Patch FileLogObserver so we can capture its arguments
        class MockFileLogObserver(FileLogObserver):
            def __init__(self, outFile):
                MockFileLogObserver.outFile = outFile
                FileLogObserver.__init__(self, outFile, str)
        # Start logging
        runner = Runner(
            reactor=MemoryReactor(),
            defaultLogLevel=LogLevel.critical,
            logFile=logFile,
            fileLogObserverFactory=MockFileLogObserver,
        )
        runner.startLogging()
        # Check for a filtering observer
        self.assertEqual(len(LogBeginner.observers), 1)
        self.assertIsInstance(LogBeginner.observers[0], FilteringLogObserver)
        # Check log level predicate with the correct default log level
        self.assertEqual(len(MockFilteringLogObserver.predicates), 1)
        self.assertIsInstance(
            MockFilteringLogObserver.predicates[0],
            LogLevelFilterPredicate
        )
        self.assertIdentical(
            MockFilteringLogObserver.predicates[0].defaultLogLevel,
            LogLevel.critical
        )
        # Check for a file observer attached to the filtering observer
        self.assertIsInstance(
            MockFilteringLogObserver.observer, MockFileLogObserver
        )
        # Check for the file we gave it
        self.assertIdentical(
            MockFilteringLogObserver.observer.outFile, logFile
        )
    def test_startReactorWithReactor(self):
        """
        L{Runner.startReactor} with the C{reactor} argument runs the given
        reactor.
        """
        reactor = MemoryReactor()
        runner = Runner(reactor=reactor)
        runner.startReactor()
        self.assertTrue(reactor.hasRun)
    def test_startReactorWhenRunning(self):
        """
        L{Runner.startReactor} ensures that C{whenRunning} is called with
        C{whenRunningArguments} when the reactor is running.
        """
        self._testHook("whenRunning", "startReactor")
    def test_whenRunningWithArguments(self):
        """
        L{Runner.whenRunning} calls C{whenRunning} with
        C{whenRunningArguments}.
        """
        self._testHook("whenRunning")
    def test_reactorExitedWithArguments(self):
        """
        L{Runner.whenRunning} calls C{reactorExited} with
        C{reactorExitedArguments}.
        """
        self._testHook("reactorExited")
    def _testHook(self, methodName, callerName=None):
        """
        Verify that the named hook is run with the expected arguments as
        specified by the arguments used to create the L{Runner}, when the
        specified caller is invoked.
        @param methodName: The name of the hook to verify.
        @type methodName: L{str}
        @param callerName: The name of the method that is expected to cause the
            hook to be called.
            If C{None}, use the L{Runner} method with the same name as the
            hook.
        @type callerName: L{str}
        """
        if callerName is None:
            callerName = methodName
        arguments = dict(a=object(), b=object(), c=object())
        argumentsSeen = []
        def hook(**arguments):
            argumentsSeen.append(arguments)
        runnerArguments = {
            methodName: hook,
            "{}Arguments".format(methodName): arguments.copy(),
        }
        runner = Runner(reactor=MemoryReactor(), **runnerArguments)
        hookCaller = getattr(runner, callerName)
        hookCaller()
        self.assertEqual(len(argumentsSeen), 1)
        self.assertEqual(argumentsSeen[0], arguments)
@attrs(frozen=True)
class DummyRunner(Runner):
    """
    Stub for L{Runner}.
    Keep track of calls to some methods without actually doing anything.
    """
    calledMethods = attrib(default=Factory(list))
    def killIfRequested(self):
        self.calledMethods.append("killIfRequested")
    def startLogging(self):
        self.calledMethods.append("startLogging")
    def startReactor(self):
        self.calledMethods.append("startReactor")
    def reactorExited(self):
        self.calledMethods.append("reactorExited")
class DummyPIDFile(NonePIDFile):
    """
    Stub for L{PIDFile}.
    Tracks context manager entry/exit without doing anything.
    """
    def __init__(self):
        NonePIDFile.__init__(self)
        self.entered = False
        self.exited  = False
    def __enter__(self):
        self.entered = True
        return self
    def __exit__(self, excType, excValue, traceback):
        self.exited  = True
class DummyExit(object):
    """
    Stub for L{exit} that remembers whether it's been called and, if it has,
    what arguments it was given.
    """
    def __init__(self):
        self.exited = False
    def __call__(self, status, message=None):
        assert not self.exited
        self.status  = status
        self.message = message
        self.exited  = True
class DummyKill(object):
    """
    Stub for L{os.kill} that remembers whether it's been called and, if it has,
    what arguments it was given.
    """
    def __init__(self):
        self.calls = []
    def __call__(self, pid, sig):
        self.calls.append((pid, sig))
class DummyStandardIO(object):
    """
    Stub for L{sys} which provides L{BytesIO} streams as stdout and stderr.
    """
    def __init__(self, stdout, stderr):
        self.stdout = stdout
        self.stderr = stderr
class DummyWarningsModule(object):
    """
    Stub for L{warnings} which provides a C{showwarning} method that is a no-op.
    """
    def showwarning(*args, **kwargs):
        """
        Do nothing.
        @param args: ignored.
        @param kwargs: ignored.
        """