HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //home/ubuntu/neovim/src/nvim/event/rstream.c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <uv.h>

#include "nvim/event/multiqueue.h"
#include "nvim/event/rstream.h"
#include "nvim/event/stream.h"
#include "nvim/log.h"
#include "nvim/macros_defs.h"
#include "nvim/main.h"
#include "nvim/os/os_defs.h"
#include "nvim/types_defs.h"

#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/rstream.c.generated.h"
#endif

/// Sets up an `RStream` that reads from a file descriptor.
///
/// Initializes the embedded stream `stream->s` via `stream_init()` with the
/// given loop and fd (and no libuv stream handle), then resets the read state
/// with `rstream_init()`.
///
/// @param loop    Event loop passed through to `stream_init()`
/// @param stream  The `RStream` instance to initialize
/// @param fd      File descriptor passed through to `stream_init()`
void rstream_init_fd(Loop *loop, RStream *stream, int fd)
  FUNC_ATTR_NONNULL_ARG(1, 2)
{
  // NULL: no libuv stream handle is supplied in this mode.
  stream_init(loop, &stream->s, fd, NULL);
  rstream_init(stream);
}

/// Sets up an `RStream` on top of an existing libuv stream handle.
///
/// Initializes the embedded stream `stream->s` via `stream_init()` with
/// `uvstream` (no loop, fd of -1), then resets the read state with
/// `rstream_init()`.
///
/// @param stream    The `RStream` instance to initialize
/// @param uvstream  libuv stream handle passed through to `stream_init()`
void rstream_init_stream(RStream *stream, uv_stream_t *uvstream)
  FUNC_ATTR_NONNULL_ARG(1, 2)
{
  // NULL loop and fd -1: only the libuv handle is supplied in this mode.
  stream_init(NULL, &stream->s, -1, uvstream);
  rstream_init(stream);
}

/// Resets the read-side state of an `RStream` and gives it a fresh buffer.
///
/// A new block is obtained from `alloc_block()`; the read and write cursors
/// both start at its beginning, i.e. the buffer is empty.
///
/// NOTE(review): `did_eof`, `want_read`, `pending_read` and `paused_full` are
/// not assigned here; presumably callers hand in zero-initialized storage --
/// confirm.
///
/// @param stream  The `RStream` instance to reset
void rstream_init(RStream *stream)
  FUNC_ATTR_NONNULL_ARG(1)
{
  // Buffer and cursors: empty buffer, both cursors at its start.
  stream->buffer = alloc_block();
  stream->write_pos = stream->buffer;
  stream->read_pos = stream->buffer;

  // Bookkeeping.
  stream->num_bytes = 0;
  stream->fpos = 0;
  stream->read_cb = NULL;
}

/// Arms the underlying libuv watcher for `stream`.
///
/// A stream backed by a libuv stream handle is read with `uv_read_start()`;
/// otherwise an idle handle is started whose callback performs the reads.
///
/// @param stream  The `RStream` instance
void rstream_start_inner(RStream *stream)
  FUNC_ATTR_NONNULL_ARG(1)
{
  if (!stream->s.uvstream) {
    // No libuv stream handle: reads are done from the idle callback.
    uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
    return;
  }

  uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
}

/// Registers a read callback and starts watching a `Stream` instance for input.
///
/// If the stream is currently paused because its buffer is full, only the
/// callback and the wish to read are recorded; `rstream_consume()` restarts
/// the watcher once there is room again.
///
/// @param stream The `Stream` instance
/// @param cb     Callback stored as the stream's read callback
/// @param data   Opaque pointer stored in `stream->s.cb_data`
void rstream_start(RStream *stream, stream_read_cb cb, void *data)
  FUNC_ATTR_NONNULL_ARG(1)
{
  stream->s.cb_data = data;
  stream->read_cb = cb;
  stream->want_read = true;

  if (stream->paused_full) {
    return;
  }
  rstream_start_inner(stream);
}

/// Disarms the underlying libuv watcher of a `Stream` instance.
///
/// Unlike `rstream_stop()` this leaves `want_read` untouched.
///
/// @param stream The `Stream` instance
void rstream_stop_inner(RStream *stream)
  FUNC_ATTR_NONNULL_ALL
{
  if (!stream->s.uvstream) {
    // No libuv stream handle: the idle handle is what drives the reads.
    uv_idle_stop(&stream->s.uv.idle);
    return;
  }

  uv_read_stop(stream->s.uvstream);
}

/// Stops watching for events from a `Stream` instance and records that the
/// caller no longer wants to read.
///
/// @param stream The `Stream` instance
void rstream_stop(RStream *stream)
  FUNC_ATTR_NONNULL_ALL
{
  // With `want_read` cleared, rstream_consume() will not restart the watcher.
  stream->want_read = false;
  rstream_stop_inner(stream);
}

// Callbacks used by libuv

/// Allocation callback for libuv: offers the unused tail of the stream buffer.
///
/// @param handle     libuv handle whose `data` field is the owning `RStream`
/// @param suggested  Size hint from libuv (ignored)
/// @param[out] buf   Receives the write position and the free space behind it
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
  RStream *stream = handle->data;

  // `uv_buf_t.len` happens to have different size on Windows (as a treat)
  buf->len = UV_BUF_LEN(rstream_space(stream));
  buf->base = stream->write_pos;
}

/// Callback invoked by libuv after it copies the data into the buffer provided
/// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
/// 0-length buffer.
///
/// @param uvstream  libuv stream whose `data` field is the owning `RStream`
/// @param cnt       Number of bytes read when positive, otherwise 0 or a
///                  negative libuv status code
/// @param buf       Buffer descriptor handed out by `alloc_cb` (unused)
static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
  RStream *stream = uvstream->data;

  if (cnt > 0) {
    // Data arrived: account for it and advance the write cursor.
    size_t nread = (size_t)cnt;
    stream->num_bytes += nread;
    stream->write_pos += cnt;
    invoke_read_cb(stream, false);
    return;
  }

  // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
  // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
  //
  // Nothing has to be done with the buffer: the write cursor was not
  // advanced, so the next call to `alloc_cb` hands out the same pointer.
  if (cnt == 0 || cnt == UV_ENOBUFS) {
    return;
  }

  const bool tty_eof = cnt == UV_EOF && uvstream->type == UV_TTY;
  if (!tty_eof) {
    DLOG("closing Stream (%p): %s (%s)", (void *)stream,
         uv_err_name((int)cnt), os_strerror((int)cnt));
    // Read error or EOF, either way stop the stream before reporting eof.
    uv_read_stop(uvstream);
  }
  // The TTY driver might signal EOF without closing the stream, so in that
  // case the callback is invoked with eof == true but reading is not stopped.
  invoke_read_cb(stream, true);
}

/// Number of bytes that can still be written behind the write cursor.
///
/// @param stream  The `RStream` instance
/// @return Distance from `write_pos` to the end of the `ARENA_BLOCK_SIZE`
///         sized buffer
static size_t rstream_space(RStream *stream)
{
  const char *const buffer_end = stream->buffer + ARENA_BLOCK_SIZE;
  return (size_t)(buffer_end - stream->write_pos);
}

/// Called by the 'idle' handle to emulate a reading event
///
/// Idle callbacks are invoked once per event loop:
///  - to perform some very low priority activity.
///  - to keep the loop "alive" (so there is always an event to process)
///
/// Performs one synchronous read from `stream->s.fd` at offset `stream->fpos`
/// into the free tail of the stream buffer. On EOF or error the idle handle is
/// stopped and the read callback is scheduled with eof == true.
///
/// @param handle  Idle handle whose `data` field is the owning `RStream`
static void fread_idle_cb(uv_idle_t *handle)
{
  uv_fs_t req;
  RStream *stream = handle->data;

  stream->uvbuf.base = stream->write_pos;
  // `uv_buf_t.len` happens to have different size on Windows.
  stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream));

  // Synchronous read (NULL callback): the outcome is in `req.result` on return.
  uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->fpos, NULL);

  // Copy the result out *before* releasing the request, so nothing is read
  // from `req` after it has been cleaned up.
  const ssize_t nread = req.result;
  uv_fs_req_cleanup(&req);

  if (nread <= 0) {
    if (nread < 0) {
      // Same diagnostics as the libuv stream path in read_cb().
      DLOG("closing Stream (%p): %s (%s)", (void *)stream,
           uv_err_name((int)nread), os_strerror((int)nread));
    }
    // Read error or EOF, either way stop polling and report eof == true.
    uv_idle_stop(&stream->s.uv.idle);
    invoke_read_cb(stream, true);
    return;
  }

  // no errors (nread is positive), it's safe to use.
  stream->write_pos += nread;
  stream->fpos += nread;
  invoke_read_cb(stream, false);
}

/// Event handler that delivers buffered data to the stream's read callback.
///
/// Clears the pending-read marker, lets the callback (if any) look at the
/// buffered bytes and consumes as many as it reports, then drops the request
/// reference taken when the event was created. If the stream was closed in
/// the meantime and this was the last pending request, the handle is closed.
///
/// @param argv  Event arguments; `argv[0]` is the `RStream`
static void read_event(void **argv)
{
  RStream *stream = argv[0];
  stream->pending_read = false;

  if (stream->read_cb != NULL) {
    const size_t available = rstream_available(stream);
    const size_t consumed = stream->read_cb(stream, stream->read_pos, available,
                                            stream->s.cb_data, stream->did_eof);
    assert(consumed <= available);
    rstream_consume(stream, consumed);
  }

  stream->s.pending_reqs--;
  const bool no_requests_left = !stream->s.pending_reqs;
  if (no_requests_left && stream->s.closed) {
    stream_close_handle(&stream->s, true);
  }
}

/// Number of buffered bytes that have been read in but not yet consumed.
///
/// @param stream  The `RStream` instance
/// @return Distance between the read cursor and the write cursor
size_t rstream_available(RStream *stream)
{
  const char *const unread_begin = stream->read_pos;
  const char *const unread_end = stream->write_pos;
  return (size_t)(unread_end - unread_begin);
}

/// Marks `consumed` bytes at the read cursor as processed.
///
/// Any unconsumed remainder is moved to the start of the buffer so that the
/// free space is always one contiguous tail; an emptied buffer simply has both
/// cursors reset to its start. If reading had been paused because the buffer
/// was full and reading is still wanted, the watcher is restarted as soon as
/// there is room again.
///
/// @param stream    The `RStream` instance
/// @param consumed  Number of bytes to drop; must not exceed
///                  `rstream_available(stream)`
void rstream_consume(RStream *stream, size_t consumed)
{
  // Consuming more than is buffered would push `read_pos` past `write_pos`
  // and make the unsigned `remaining` below wrap around to a huge length.
  assert(consumed <= rstream_available(stream));

  stream->read_pos += consumed;
  size_t remaining = (size_t)(stream->write_pos - stream->read_pos);
  if (remaining > 0 && stream->read_pos > stream->buffer) {
    // Regions may overlap, hence memmove rather than memcpy.
    memmove(stream->buffer, stream->read_pos, remaining);
    stream->read_pos = stream->buffer;
    stream->write_pos = stream->buffer + remaining;
  } else if (remaining == 0) {
    stream->read_pos = stream->write_pos = stream->buffer;
  }

  if (stream->want_read && stream->paused_full && rstream_space(stream)) {
    assert(stream->read_cb);
    stream->paused_full = false;
    rstream_start_inner(stream);
  }
}

/// Schedules a `read_event` for `stream`, unless one is already pending.
///
/// Also latches the EOF state and pauses the watcher when the buffer has no
/// free space left.
///
/// @param stream  The `RStream` instance
/// @param eof     Whether end-of-stream (or a read error) was reached
static void invoke_read_cb(RStream *stream, bool eof)
{
  stream->did_eof |= eof;

  if (rstream_space(stream) == 0) {
    // Buffer is full: stop reading until rstream_consume() frees some room.
    rstream_stop_inner(stream);
    stream->paused_full = true;
  }

  // we cannot use pending_reqs as a socket can have both pending reads and writes
  if (!stream->pending_read) {
    stream->pending_read = true;
    // Don't let the stream be closed before the event is processed.
    stream->s.pending_reqs++;
    CREATE_EVENT(stream->s.events, read_event, stream);
  }
}

/// Forwards to `stream_may_close()` for the stream embedded in `stream`.
///
/// NOTE(review): the second argument (`true`) is defined by
/// `stream_may_close()`, which is not visible here; presumably it marks the
/// stream as a read stream -- confirm.
///
/// @param stream  The `RStream` instance
void rstream_may_close(RStream *stream)
{
  stream_may_close(&stream->s, true);
}