File: /var/www/vhost/disk-apps/pwa.sports-crowd.com/node_modules/undici/lib/api/readable.js
// Ported from https://github.com/nodejs/undici/pull/907
'use strict'
const assert = require('node:assert')
const { Readable } = require('node:stream')
const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
const util = require('../core/util')
const { ReadableStreamFrom } = require('../core/util')
const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kAbort = Symbol('kAbort')
const kContentType = Symbol('kContentType')
const kContentLength = Symbol('kContentLength')
const noop = () => {}
class BodyReadable extends Readable {
  constructor ({
    resume,
    abort,
    contentType = '',
    contentLength,
    highWaterMark = 64 * 1024 // Same as nodejs fs streams.
  }) {
    super({
      autoDestroy: true,
      read: resume,
      highWaterMark
    })
    this._readableState.dataEmitted = false
    this[kAbort] = abort
    this[kConsume] = null
    this[kBody] = null
    this[kContentType] = contentType
    this[kContentLength] = contentLength
    // Is stream being consumed through Readable API?
    // This is an optimization so that we avoid checking
    // for 'data' and 'readable' listeners in the hot path
    // inside push().
    this[kReading] = false
  }
  destroy (err) {
    if (!err && !this._readableState.endEmitted) {
      err = new RequestAbortedError()
    }
    if (err) {
      this[kAbort]()
    }
    return super.destroy(err)
  }
  _destroy (err, callback) {
    // Workaround for Node "bug". If the stream is destroyed in same
    // tick as it is created, then a user who is waiting for a
    // promise (i.e micro tick) for installing a 'error' listener will
    // never get a chance and will always encounter an unhandled exception.
    if (!this[kReading]) {
      setImmediate(() => {
        callback(err)
      })
    } else {
      callback(err)
    }
  }
  on (ev, ...args) {
    if (ev === 'data' || ev === 'readable') {
      this[kReading] = true
    }
    return super.on(ev, ...args)
  }
  addListener (ev, ...args) {
    return this.on(ev, ...args)
  }
  off (ev, ...args) {
    const ret = super.off(ev, ...args)
    if (ev === 'data' || ev === 'readable') {
      this[kReading] = (
        this.listenerCount('data') > 0 ||
        this.listenerCount('readable') > 0
      )
    }
    return ret
  }
  removeListener (ev, ...args) {
    return this.off(ev, ...args)
  }
  push (chunk) {
    if (this[kConsume] && chunk !== null) {
      consumePush(this[kConsume], chunk)
      return this[kReading] ? super.push(chunk) : true
    }
    return super.push(chunk)
  }
  // https://fetch.spec.whatwg.org/#dom-body-text
  async text () {
    return consume(this, 'text')
  }
  // https://fetch.spec.whatwg.org/#dom-body-json
  async json () {
    return consume(this, 'json')
  }
  // https://fetch.spec.whatwg.org/#dom-body-blob
  async blob () {
    return consume(this, 'blob')
  }
  // https://fetch.spec.whatwg.org/#dom-body-arraybuffer
  async arrayBuffer () {
    return consume(this, 'arrayBuffer')
  }
  // https://fetch.spec.whatwg.org/#dom-body-formdata
  async formData () {
    // TODO: Implement.
    throw new NotSupportedError()
  }
  // https://fetch.spec.whatwg.org/#dom-body-bodyused
  get bodyUsed () {
    return util.isDisturbed(this)
  }
  // https://fetch.spec.whatwg.org/#dom-body-body
  get body () {
    if (!this[kBody]) {
      this[kBody] = ReadableStreamFrom(this)
      if (this[kConsume]) {
        // TODO: Is this the best way to force a lock?
        this[kBody].getReader() // Ensure stream is locked.
        assert(this[kBody].locked)
      }
    }
    return this[kBody]
  }
  async dump (opts) {
    let limit = Number.isFinite(opts?.limit) ? opts.limit : 128 * 1024
    const signal = opts?.signal
    if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) {
      throw new InvalidArgumentError('signal must be an AbortSignal')
    }
    signal?.throwIfAborted()
    if (this._readableState.closeEmitted) {
      return null
    }
    return await new Promise((resolve, reject) => {
      if (this[kContentLength] > limit) {
        this.destroy(new AbortError())
      }
      const onAbort = () => {
        this.destroy(signal.reason ?? new AbortError())
      }
      signal?.addEventListener('abort', onAbort)
      this
        .on('close', function () {
          signal?.removeEventListener('abort', onAbort)
          if (signal?.aborted) {
            reject(signal.reason ?? new AbortError())
          } else {
            resolve(null)
          }
        })
        .on('error', noop)
        .on('data', function (chunk) {
          limit -= chunk.length
          if (limit <= 0) {
            this.destroy()
          }
        })
        .resume()
    })
  }
}
// https://streams.spec.whatwg.org/#readablestream-locked
function isLocked (self) {
  // Consume is an implicit lock.
  return (self[kBody] && self[kBody].locked === true) || self[kConsume]
}
// https://fetch.spec.whatwg.org/#body-unusable
function isUnusable (self) {
  return util.isDisturbed(self) || isLocked(self)
}
async function consume (stream, type) {
  assert(!stream[kConsume])
  return new Promise((resolve, reject) => {
    if (isUnusable(stream)) {
      const rState = stream._readableState
      if (rState.destroyed && rState.closeEmitted === false) {
        stream
          .on('error', err => {
            reject(err)
          })
          .on('close', () => {
            reject(new TypeError('unusable'))
          })
      } else {
        reject(rState.errored ?? new TypeError('unusable'))
      }
    } else {
      queueMicrotask(() => {
        stream[kConsume] = {
          type,
          stream,
          resolve,
          reject,
          length: 0,
          body: []
        }
        stream
          .on('error', function (err) {
            consumeFinish(this[kConsume], err)
          })
          .on('close', function () {
            if (this[kConsume].body !== null) {
              consumeFinish(this[kConsume], new RequestAbortedError())
            }
          })
        consumeStart(stream[kConsume])
      })
    }
  })
}
function consumeStart (consume) {
  if (consume.body === null) {
    return
  }
  const { _readableState: state } = consume.stream
  if (state.bufferIndex) {
    const start = state.bufferIndex
    const end = state.buffer.length
    for (let n = start; n < end; n++) {
      consumePush(consume, state.buffer[n])
    }
  } else {
    for (const chunk of state.buffer) {
      consumePush(consume, chunk)
    }
  }
  if (state.endEmitted) {
    consumeEnd(this[kConsume])
  } else {
    consume.stream.on('end', function () {
      consumeEnd(this[kConsume])
    })
  }
  consume.stream.resume()
  while (consume.stream.read() != null) {
    // Loop
  }
}
/**
 * @param {Buffer[]} chunks
 * @param {number} length
 */
function chunksDecode (chunks, length) {
  if (chunks.length === 0 || length === 0) {
    return ''
  }
  const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length)
  const bufferLength = buffer.length
  // Skip BOM.
  const start =
    bufferLength > 2 &&
    buffer[0] === 0xef &&
    buffer[1] === 0xbb &&
    buffer[2] === 0xbf
      ? 3
      : 0
  return buffer.utf8Slice(start, bufferLength)
}
function consumeEnd (consume) {
  const { type, body, resolve, stream, length } = consume
  try {
    if (type === 'text') {
      resolve(chunksDecode(body, length))
    } else if (type === 'json') {
      resolve(JSON.parse(chunksDecode(body, length)))
    } else if (type === 'arrayBuffer') {
      const dst = new Uint8Array(length)
      let pos = 0
      for (const buf of body) {
        dst.set(buf, pos)
        pos += buf.byteLength
      }
      resolve(dst.buffer)
    } else if (type === 'blob') {
      resolve(new Blob(body, { type: stream[kContentType] }))
    }
    consumeFinish(consume)
  } catch (err) {
    stream.destroy(err)
  }
}
function consumePush (consume, chunk) {
  consume.length += chunk.length
  consume.body.push(chunk)
}
function consumeFinish (consume, err) {
  if (consume.body === null) {
    return
  }
  if (err) {
    consume.reject(err)
  } else {
    consume.resolve()
  }
  consume.type = null
  consume.stream = null
  consume.resolve = null
  consume.reject = null
  consume.length = 0
  consume.body = null
}
module.exports = { Readable: BodyReadable, chunksDecode }