HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //lib/python3/dist-packages/pynvim/msgpack_rpc/event_loop/base.py
"""Common code for event loop implementations."""
import logging
import signal
import threading


# Module-level logger plus short aliases for the most used log levels.
logger = logging.getLogger(__name__)
debug = logger.debug
info = logger.info
warn = logger.warning


# Remember which thread imported this module and which SIGINT handler was
# installed at that time. An event loop library may reset SIGINT to SIG_DFL
# (which exits the program) when its signal handling is undone, so the handler
# captured here is kept around to bring the interpreter back to its default
# state afterwards.
main_thread = threading.current_thread()
default_int_handler = signal.getsignal(signal.SIGINT)


class BaseEventLoop(object):

    """Abstract base class for all event loops.

    Event loops act as the bottom layer for Nvim sessions created by this
    library. They hide system/transport details behind a simple interface for
    reading/writing bytes to the connected Nvim instance.

    This class exposes public methods for interacting with the underlying
    event loop and delegates implementation-specific work to the following
    methods, which subclasses are expected to implement:

    - `_init()`: Implementation-specific initialization
    - `_connect_tcp(address, port)`: connect to Nvim using tcp/ip
    - `_connect_socket(path)`: Same as tcp, but use a UNIX domain socket or
      named pipe.
    - `_connect_stdio()`: Use stdin/stdout as the connection to Nvim
    - `_connect_child(argv)`: Use the argument vector `argv` to spawn an
      embedded Nvim that has its stdin/stdout connected to the event loop.
    - `_start_reading()`: Called after any of _connect_* methods. Can be used
      to perform any post-connection setup or validation.
    - `_send(data)`: Send `data`(byte array) to Nvim. The data is only
      actually sent when the event loop is running.
    - `_run()`: Runs the event loop until stopped or the connection is closed,
      calling the following methods when some event happens:
      - `_on_data(data)`: When Nvim sends some data.
      - `_on_signal(signum)`: When a signal is received.
      - `_on_error(message)`: When a non-recoverable error occurs(eg:
        connection lost)
    - `_stop()`: Stop the event loop
    - `_close()`: Called by `close()`
    - `_threadsafe_call(fn)`: Called by `threadsafe_call()` to run `fn` in
      the event loop thread.
    - `_interrupt(data)`: Like `stop()`, but may be called from other threads
    - `_setup_signals(signals)`: Add implementation-specific listeners for
      `signals`, which is a list of OS-specific signal numbers.
    - `_teardown_signals()`: Removes signal listeners set by `_setup_signals`
    """

    def __init__(self, transport_type, *args):
        """Initialize and connect the event loop instance.

        The only arguments are the transport type and transport-specific
        configuration, like this:

        >>> BaseEventLoop('tcp', '127.0.0.1', 7450)
        Traceback (most recent call last):
            ...
        AttributeError: 'BaseEventLoop' object has no attribute '_init'
        >>> BaseEventLoop('socket', '/tmp/nvim-socket')
        Traceback (most recent call last):
            ...
        AttributeError: 'BaseEventLoop' object has no attribute '_init'
        >>> BaseEventLoop('stdio')
        Traceback (most recent call last):
            ...
        AttributeError: 'BaseEventLoop' object has no attribute '_init'
        >>> BaseEventLoop('child',
        ...               ['nvim', '--embed', '--headless', '-u', 'NONE'])
        Traceback (most recent call last):
            ...
        AttributeError: 'BaseEventLoop' object has no attribute '_init'

        This calls the implementation-specific initialization
        `_init`, one of the `_connect_*` methods(based on `transport_type`)
        and `_start_reading()`. If connecting fails, the loop is closed and
        the original exception is re-raised.
        """
        self._transport_type = transport_type
        # Map signal numbers to their names (e.g. 2 -> 'SIGINT'). Names
        # starting with 'SIG_' (SIG_DFL, SIG_IGN, SIG_BLOCK, ...) are handler
        # and mask constants, not signals; their small integer values collide
        # with real signal numbers, so they must be left out.
        self._signames = dict(
            (number, name) for name, number in signal.__dict__.items()
            if name.startswith('SIG') and not name.startswith('SIG_'))
        self._on_data = None
        self._error = None
        self._init()
        try:
            getattr(self, '_connect_{}'.format(transport_type))(*args)
        except Exception:
            self.close()
            raise
        self._start_reading()

    def connect_tcp(self, address, port):
        """Connect to tcp/ip `address`:`port`. Delegated to `_connect_tcp`."""
        self._connect_tcp(address, port)

    def connect_socket(self, path):
        """Connect to socket at `path`. Delegated to `_connect_socket`."""
        self._connect_socket(path)

    def connect_stdio(self):
        """Connect using stdin/stdout. Delegated to `_connect_stdio`."""
        self._connect_stdio()

    def connect_child(self, argv):
        """Connect a new Nvim instance. Delegated to `_connect_child`."""
        self._connect_child(argv)

    def send(self, data):
        """Queue `data` for sending to Nvim. Delegated to `_send`."""
        self._send(data)

    def threadsafe_call(self, fn):
        """Call a function in the event loop thread.

        This is the only safe way to interact with a session from other
        threads. Delegated to `_threadsafe_call`.
        """
        self._threadsafe_call(fn)

    def run(self, data_cb):
        """Run the event loop.

        `data_cb` is stored as `_on_data` for the duration of the run. When
        called from the main thread, SIGINT/SIGTERM listeners are installed
        before `_run()` and removed afterwards, and the SIGINT handler that
        was active at import time is restored.

        If an error was recorded by `_on_signal`/`_on_error` it is raised
        instead of running the loop. Any exception raised by `_run()`
        propagates to the caller after the cleanup has been performed.
        """
        if self._error:
            err = self._error
            if isinstance(self._error, KeyboardInterrupt):
                # KeyboardInterrupt is not destructive(it may be used in
                # the REPL).
                # After throwing KeyboardInterrupt, cleanup the _error field
                # so the loop may be started again
                self._error = None
            raise err
        # Signal handlers can only be managed from the main thread.
        handle_signals = threading.current_thread() == main_thread
        self._on_data = data_cb
        try:
            if handle_signals:
                self._setup_signals([signal.SIGINT, signal.SIGTERM])
            try:
                self._run()
            finally:
                # Undo the signal setup even when `_run()` raised, so the
                # listeners do not leak into the rest of the program.
                if handle_signals:
                    self._teardown_signals()
                    signal.signal(signal.SIGINT, default_int_handler)
        finally:
            self._on_data = None

    def stop(self):
        """Stop the event loop. Delegated to `_stop`."""
        self._stop()

    def close(self):
        """Close the event loop. Delegated to `_close`."""
        self._close()

    def _on_signal(self, signum):
        """Record the received signal as an error and stop the loop.

        SIGINT is ignored for the 'stdio' transport. Otherwise SIGINT is
        stored as a `KeyboardInterrupt` and any other signal as a plain
        `Exception`; the stored error is raised by the next `run()` call.
        """
        if signum == signal.SIGINT and self._transport_type == 'stdio':
            # When the transport is stdio, we are probably running as a Nvim
            # child process. In that case, we don't want to be killed by
            # ctrl+C
            return
        # Fall back to the raw number for signals without a SIG* constant.
        msg = 'Received {}'.format(self._signames.get(signum, signum))
        cls = KeyboardInterrupt if signum == signal.SIGINT else Exception
        self._error = cls(msg)
        self.stop()

    def _on_error(self, error):
        """Record `error` as an `IOError` and stop the loop."""
        self._error = IOError(error)
        self.stop()

    def _on_interrupt(self):
        """Stop the loop without recording an error."""
        self.stop()