HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/landscape/lib/twisted_util.py
import io

from twisted.internet.defer import DeferredList, Deferred
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.process import Process, ProcessReader
from twisted.internet import reactor
from twisted.python.failure import Failure
from twisted.python.compat import itervalues, networkString

from landscape.lib.encoding import encode_values


class SignalError(Exception):
    """An error if the process was terminated by a signal."""


def gather_results(deferreds, consume_errors=False):
    d = DeferredList(deferreds, fireOnOneErrback=1,
                     consumeErrors=consume_errors)
    d.addCallback(lambda r: [x[1] for x in r])
    d.addErrback(lambda f: f.value.subFailure)
    return d


class AllOutputProcessProtocol(ProcessProtocol):
    """A process protocol for getting stdout, stderr and exit code."""

    def __init__(self, deferred, stdin=None, line_received=None):
        self.deferred = deferred
        self.outBuf = io.BytesIO()
        self.errBuf = io.BytesIO()
        self.errReceived = self.errBuf.write
        self.stdin = stdin
        self.line_received = line_received
        self._partial_line = b""

    def connectionMade(self):
        if self.stdin is not None:
            self.transport.write(networkString(self.stdin))
            self.transport.closeStdin()

    def outReceived(self, data):
        self.outBuf.write(data)

        if self.line_received is None:
            return

        # data may contain more than one line, so we split the output and save
        # the last line. If it's an empty string nothing happens, otherwise it
        # will be returned once complete
        lines = data.split(b"\n")
        lines[0] = self._partial_line + lines[0]
        self._partial_line = lines.pop()

        for line in lines:
            self.line_received(line)

    def processEnded(self, reason):
        if self._partial_line:
            self.line_received(self._partial_line)
            self._partial_line = b""
        out = self.outBuf.getvalue()
        err = self.errBuf.getvalue()
        e = reason.value
        code = e.exitCode
        if e.signal:
            failure = Failure(SignalError(out, err, e.signal))
            self.deferred.errback(failure)
        else:
            self.deferred.callback((out, err, code))


def spawn_process(executable, args=(), env={}, path=None, uid=None, gid=None,
                  usePTY=False, wait_pipes=True, line_received=None,
                  stdin=None):
    """
    Spawn a process using Twisted reactor.

    Return a deferred which will be called with process stdout, stderr and exit
    code.

    @param wait_pipes: if set to False, don't wait for stdin/stdout pipes to
        close when process ends.
    @param line_received: an optional callback called with every line of
        output from the process as parameter.

    @note: compared to reactor.spawnProcess, this version does NOT require the
    executable name as first element of args.
    """

    list_args = [executable]
    list_args.extend(args)

    result = Deferred()
    protocol = AllOutputProcessProtocol(result, stdin=stdin,
                                        line_received=line_received)
    env = encode_values(env)
    process = reactor.spawnProcess(protocol, executable, args=list_args,
                                   env=env, path=path, uid=uid, gid=gid,
                                   usePTY=usePTY)

    if not wait_pipes:

        def maybeCallProcessEnded():
            """A less strict version of Process.maybeCallProcessEnded.

            This behaves exactly like the original method, but in case the
            process has ended already and sent us a SIGCHLD, it doesn't wait
            for the stdin/stdout pipes to close, because the child process
            itself might have passed them to its own child processes.

            @note: Twisted 8.2 now has a processExited hook that could
                be used in place of this workaround.
            """
            if process.pipes and not process.pid:
                for pipe in itervalues(process.pipes):
                    if isinstance(pipe, ProcessReader):
                        # Read whatever is left
                        pipe.doRead()
                    pipe.stopReading()
                process.pipes = {}
            Process.maybeCallProcessEnded(process)

        process.maybeCallProcessEnded = maybeCallProcessEnded

    return result