HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //lib/python3/dist-packages/pynvim/msgpack_rpc/event_loop/uv.py
"""Event loop implementation that uses pyuv(libuv-python bindings)."""
import sys
from collections import deque

import pyuv

from .base import BaseEventLoop


class UvEventLoop(BaseEventLoop):

    """`BaseEventLoop` subclass that uses `pvuv` as a backend."""

    def _init(self):
        self._loop = pyuv.Loop()
        self._async = pyuv.Async(self._loop, self._on_async)
        self._connection_error = None
        self._error_stream = None
        self._callbacks = deque()

    def _on_connect(self, stream, error):
        self.stop()
        if error:
            msg = 'Cannot connect to {}: {}'.format(
                self._connect_address, pyuv.errno.strerror(error))
            self._connection_error = IOError(msg)
            return
        self._read_stream = self._write_stream = stream

    def _on_read(self, handle, data, error):
        if error or not data:
            msg = pyuv.errno.strerror(error) if error else 'EOF'
            self._on_error(msg)
            return
        if handle == self._error_stream:
            return
        self._on_data(data)

    def _on_write(self, handle, error):
        if error:
            msg = pyuv.errno.strerror(error)
            self._on_error(msg)

    def _on_exit(self, handle, exit_status, term_signal):
        self._on_error('EOF')

    def _disconnected(self, *args):
        raise IOError('Not connected to Nvim')

    def _connect_tcp(self, address, port):
        stream = pyuv.TCP(self._loop)
        self._connect_address = '{}:{}'.format(address, port)
        stream.connect((address, port), self._on_connect)

    def _connect_socket(self, path):
        stream = pyuv.Pipe(self._loop)
        self._connect_address = path
        stream.connect(path, self._on_connect)

    def _connect_stdio(self):
        self._read_stream = pyuv.Pipe(self._loop)
        self._read_stream.open(sys.stdin.fileno())
        self._write_stream = pyuv.Pipe(self._loop)
        self._write_stream.open(sys.stdout.fileno())

    def _connect_child(self, argv):
        self._write_stream = pyuv.Pipe(self._loop)
        self._read_stream = pyuv.Pipe(self._loop)
        self._error_stream = pyuv.Pipe(self._loop)
        stdin = pyuv.StdIO(self._write_stream,
                           flags=pyuv.UV_CREATE_PIPE + pyuv.UV_READABLE_PIPE)
        stdout = pyuv.StdIO(self._read_stream,
                            flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE)
        stderr = pyuv.StdIO(self._error_stream,
                            flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE)
        pyuv.Process.spawn(self._loop,
                           args=argv,
                           exit_callback=self._on_exit,
                           flags=pyuv.UV_PROCESS_WINDOWS_HIDE,
                           stdio=(stdin, stdout, stderr,))
        self._error_stream.start_read(self._on_read)

    def _start_reading(self):
        if self._transport_type in ['tcp', 'socket']:
            self._loop.run()
            if self._connection_error:
                self.run = self.send = self._disconnected
                raise self._connection_error
        self._read_stream.start_read(self._on_read)

    def _send(self, data):
        self._write_stream.write(data, self._on_write)

    def _run(self):
        self._loop.run(pyuv.UV_RUN_DEFAULT)

    def _stop(self):
        self._loop.stop()

    def _close(self):
        pass

    def _threadsafe_call(self, fn):
        self._callbacks.append(fn)
        self._async.send()

    def _on_async(self, handle):
        while self._callbacks:
            self._callbacks.popleft()()

    def _setup_signals(self, signals):
        self._signal_handles = []

        def handler(h, signum):
            self._on_signal(signum)

        for signum in signals:
            handle = pyuv.Signal(self._loop)
            handle.start(handler, signum)
            self._signal_handles.append(handle)

    def _teardown_signals(self):
        for handle in self._signal_handles:
            handle.stop()