File: /var/www/vhost/disk-apps/pwa.sports-crowd.com/node_modules/undici/lib/dispatcher/client-h2.js
'use strict'
const assert = require('node:assert')
const { pipeline } = require('node:stream')
const util = require('../core/util.js')
const {
  RequestContentLengthMismatchError,
  RequestAbortedError,
  SocketError,
  InformationalError
} = require('../core/errors.js')
const {
  kUrl,
  kReset,
  kClient,
  kRunning,
  kPending,
  kQueue,
  kPendingIdx,
  kRunningIdx,
  kError,
  kSocket,
  kStrictContentLength,
  kOnError,
  kMaxConcurrentStreams,
  kHTTP2Session,
  kResume
} = require('../core/symbols.js')
// Key under which each HTTP/2 session stores its count of currently open streams.
const kOpenStreams = Symbol('open streams')
// H2 support is experimental: remembers whether the one-time warning emitted by
// connectH2 has already been issued.
let h2ExperimentalWarned = false
/** @type {import('http2')} */
let http2
try {
  http2 = require('node:http2')
} catch {
  // node:http2 could not be loaded; fall back to an empty constants table so the
  // destructuring below still succeeds (every constant is then undefined).
  // @ts-ignore
  http2 = { constants: {} }
}
// Header-name constants used when building request headers and reading the
// response status.
const {
  constants: {
    HTTP2_HEADER_AUTHORITY,
    HTTP2_HEADER_METHOD,
    HTTP2_HEADER_PATH,
    HTTP2_HEADER_SCHEME,
    HTTP2_HEADER_CONTENT_LENGTH,
    HTTP2_HEADER_EXPECT,
    HTTP2_HEADER_STATUS
  }
} = http2
/**
 * Flattens an HTTP/2 headers object into a list of alternating name/value
 * Buffers: `[name, value, name, value, ...]`.
 *
 * @param {Record<string, string | string[]>} headers - Header names mapped to a
 *   single value or to an array of values.
 * @returns {Buffer[]} One name Buffer followed by one value Buffer per header
 *   occurrence, in the object's own key order.
 */
function parseH2Headers (headers) {
  const flattened = []

  for (const name of Object.keys(headers)) {
    const value = headers[name]
    // A header that occurs several times (e.g. Set-Cookie) arrives as an array.
    // The headers handler expects plain name-value pairs, so the name is
    // repeated for every value.
    const occurrences = Array.isArray(value) ? value : [value]

    for (const occurrence of occurrences) {
      flattened.push(Buffer.from(name), Buffer.from(occurrence))
    }
  }

  return flattened
}
/**
 * Creates an HTTP/2 session on top of an already established socket, wires up
 * the session and socket lifecycle listeners, and returns the context object
 * used to drive requests over that session.
 *
 * @param {object} client - Client that will own the session; back-references
 *   are stored on it under `kSocket` and `kHTTP2Session`.
 * @param {object} socket - Connected socket handed to `http2.connect` through
 *   `createConnection`.
 * @returns {Promise<object>} Context exposing `version`, `defaultPipelining`,
 *   `write`, `resume`, `destroy`, `destroyed` and `busy`.
 */
async function connectH2 (client, socket) {
  client[kSocket] = socket
  // Emit the experimental warning only the first time a session is created.
  if (!h2ExperimentalWarned) {
    h2ExperimentalWarned = true
    process.emitWarning('H2 support is experimental, expect them to change at any time.', {
      code: 'UNDICI-H2'
    })
  }
  // Reuse the existing socket instead of letting http2 open its own connection.
  const session = http2.connect(client[kUrl], {
    createConnection: () => socket,
    peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
  })
  // Per-session bookkeeping read by the listeners below and by writeH2.
  session[kOpenStreams] = 0
  session[kClient] = client
  session[kSocket] = socket
  util.addListener(session, 'error', onHttp2SessionError)
  util.addListener(session, 'frameError', onHttp2FrameError)
  util.addListener(session, 'end', onHttp2SessionEnd)
  util.addListener(session, 'goaway', onHTTP2GoAway)
  util.addListener(session, 'close', function () {
    const { [kClient]: client } = this
    const { [kSocket]: socket } = client
    // Prefer an error recorded on the socket or the session; otherwise report a plain close.
    const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))
    client[kHTTP2Session] = null
    if (client.destroyed) {
      assert(client[kPending] === 0)
      // Fail entire queue.
      const requests = client[kQueue].splice(client[kRunningIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        util.errorRequest(client, request, err)
      }
    }
  })
  // The session starts unreferenced; writeH2 calls `session.ref()` when it opens
  // a stream and unrefs again once the open-stream counter drops back to zero.
  session.unref()
  client[kHTTP2Session] = session
  socket[kHTTP2Session] = session
  util.addListener(socket, 'error', function (err) {
    assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
    // Remember the error so the 'close' handlers can report it.
    this[kError] = err
    this[kClient][kOnError](err)
  })
  util.addListener(socket, 'end', function () {
    util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
  })
  util.addListener(socket, 'close', function () {
    const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
    client[kSocket] = null
    // Tear down the session together with its socket.
    if (this[kHTTP2Session] != null) {
      this[kHTTP2Session].destroy(err)
    }
    // Move the pending index back to the running index before resuming.
    client[kPendingIdx] = client[kRunningIdx]
    assert(client[kRunning] === 0)
    client.emit('disconnect', client[kUrl], [client], err)
    client[kResume]()
  })
  // Tracks whether the socket has already closed, so that `destroy` below can
  // complete without waiting for a 'close' event that was already emitted.
  let closed = false
  socket.on('close', () => {
    closed = true
  })
  return {
    version: 'h2',
    defaultPipelining: Infinity,
    write (...args) {
      // TODO (fix): return
      // NOTE: the boolean returned by writeH2 is currently discarded here.
      writeH2(client, ...args)
    },
    // Intentionally a no-op for this context.
    resume () {
    },
    destroy (err, callback) {
      if (closed) {
        queueMicrotask(callback)
      } else {
        // Destroying the socket will trigger the session close
        socket.destroy(err).on('close', callback)
      }
    },
    get destroyed () {
      return socket.destroyed
    },
    // This context never reports itself as busy.
    busy () {
      return false
    }
  }
}
/**
 * Session 'error' listener; `this` is the HTTP/2 session.
 * Records the error on the session's socket and reports it to the owning client.
 *
 * @param {Error} err - Error emitted by the session.
 */
function onHttp2SessionError (err) {
  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  const { [kSocket]: socket, [kClient]: client } = this

  // Stored on the socket so that the 'close' handlers can pick it up later.
  socket[kError] = err
  client[kOnError](err)
}
/**
 * Session 'frameError' listener; `this` is the HTTP/2 session.
 * Only reacts when `id` is 0; every other id is ignored here (writeH2 installs
 * its own 'frameError' listener on each stream it opens).
 *
 * @param {number} type - Frame type reported with the event.
 * @param {number} code - Error code reported with the event.
 * @param {number} id - Identifier reported with the event.
 */
function onHttp2FrameError (type, code, id) {
  if (id !== 0) {
    return
  }

  const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
  const { [kSocket]: socket, [kClient]: client } = this

  socket[kError] = err
  client[kOnError](err)
}
/**
 * Session 'end' listener; `this` is the HTTP/2 session.
 * Destroys the session first and then its socket, both with the same
 * "other side closed" SocketError.
 */
function onHttp2SessionEnd () {
  const socket = this[kSocket]
  const err = new SocketError('other side closed', util.getSocketInfo(socket))

  this.destroy(err)
  util.destroy(socket, err)
}
/**
 * Session 'goaway' listener; `this` is the HTTP/2 session.
 *
 * This is the root cause of #3011: GOAWAY frames must be handled properly by
 * starting the close cycle of the session and its socket right away.
 *
 * @param {number} code - Code carried by the GOAWAY frame.
 */
function onHTTP2GoAway (code) {
  const { [kSocket]: socket, [kClient]: client } = this
  const err = new RequestAbortedError(`HTTP/2: "GOAWAY" frame received with code ${code}`)

  // Record and report the error first, so that requests are failed with it
  // once the current one has been handled.
  socket[kError] = err
  client[kOnError](err)

  // Then start the close cycle immediately: unref the session and destroy
  // the socket.
  this.unref()
  util.destroy(socket, err)
}
// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
/**
 * Tells whether the content-length consistency check applies to a method.
 *
 * @param {string} method - Request method, compared case-sensitively.
 * @returns {boolean} `false` for GET, HEAD, OPTIONS, TRACE and CONNECT,
 *   `true` for anything else.
 */
function shouldSendContentLength (method) {
  switch (method) {
    case 'GET':
    case 'HEAD':
    case 'OPTIONS':
    case 'TRACE':
    case 'CONNECT':
      return false
    default:
      return true
  }
}
/**
 * Sends one request over the client's HTTP/2 session by opening a new stream
 * and wiring the stream events to the request's hooks.
 *
 * @param {object} client - Client owning the session (`client[kHTTP2Session]`).
 * @param {object} request - Request to send. Its `onConnect`, `onUpgrade`,
 *   `onResponseStarted`, `onHeaders`, `onData` and `onComplete` hooks are
 *   invoked as the stream progresses.
 * @returns {boolean} `false` when no stream was opened (upgrade requested,
 *   request already aborted, `onConnect` failed or aborted synchronously, or
 *   a strict content-length mismatch); `true` once a stream was requested.
 */
function writeH2 (client, request) {
  const session = client[kHTTP2Session]
  const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

  if (upgrade) {
    util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
    return false
  }

  if (request.aborted) {
    return false
  }

  // `reqHeaders` is a flat list of alternating names and values; fold it into
  // the object shape handed to `session.request()`. Array values are joined
  // with commas.
  const headers = {}
  for (let n = 0; n < reqHeaders.length; n += 2) {
    const key = reqHeaders[n + 0]
    const val = reqHeaders[n + 1]

    if (Array.isArray(val)) {
      for (let i = 0; i < val.length; i++) {
        if (headers[key]) {
          headers[key] += `,${val[i]}`
        } else {
          headers[key] = val[i]
        }
      }
    } else {
      headers[key] = val
    }
  }

  /** @type {import('node:http2').ClientHttp2Stream} */
  let stream

  const { hostname, port } = client[kUrl]

  headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
  headers[HTTP2_HEADER_METHOD] = method

  // Fails the request and tears down its stream and body. It is a no-op once
  // the request has already been aborted or completed.
  const abort = (err) => {
    if (request.aborted || request.completed) {
      return
    }

    err = err || new RequestAbortedError()

    util.errorRequest(client, request, err)

    if (stream != null) {
      util.destroy(stream, err)
    }

    // We do not destroy the socket as we can continue using the session
    // the stream gets destroyed and the session remains to create new streams
    util.destroy(body, err)
  }

  try {
    // We are already connected, streams are pending.
    // We can call on connect, and wait for abort
    request.onConnect(abort)
  } catch (err) {
    util.errorRequest(client, request, err)
    // The request has been failed; do not open a stream for it.
    return false
  }

  // `abort` may have been called synchronously from within `onConnect`; in
  // that case there is nothing left to send.
  if (request.aborted) {
    return false
  }

  if (method === 'CONNECT') {
    session.ref()
    // We are already connected, streams are pending, first request
    // will create a new stream. We trigger a request to create the stream and wait until
    // `ready` event is triggered
    // We disabled endStream to allow the user to write to the stream
    stream = session.request(headers, { endStream: false, signal })

    // Whether this stream has been added to the session's open-stream counter.
    // A stream that closes before becoming ready was never counted and must
    // not be subtracted, otherwise the counter drifts below the real number
    // of open streams.
    let counted = false
    const onStreamReady = () => {
      request.onUpgrade(null, null, stream)
      ++session[kOpenStreams]
      counted = true
    }

    if (stream.id && !stream.pending) {
      onStreamReady()
    } else {
      stream.once('ready', onStreamReady)
    }

    stream.once('close', () => {
      if (counted) {
        session[kOpenStreams] -= 1
      }
      if (session[kOpenStreams] === 0) session.unref()
    })

    return true
  }

  // https://tools.ietf.org/html/rfc7540#section-8.3
  // :path and :scheme headers must be omitted when sending CONNECT

  headers[HTTP2_HEADER_PATH] = path
  headers[HTTP2_HEADER_SCHEME] = 'https'

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.

  const expectsPayload = (
    method === 'PUT' ||
    method === 'POST' ||
    method === 'PATCH'
  )

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  let contentLength = util.bodyLength(body)

  if (contentLength == null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 || !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.

    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
    if (client[kStrictContentLength]) {
      util.errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  if (contentLength != null) {
    assert(body, 'no body must not have content length')
    headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  }

  session.ref()

  const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
  if (expectContinue) {
    headers[HTTP2_HEADER_EXPECT] = '100-continue'
    stream = session.request(headers, { endStream: shouldEndStream, signal })

    // The body is only written once the stream emits 'continue'.
    stream.once('continue', writeBodyH2)
  } else {
    stream = session.request(headers, {
      endStream: shouldEndStream,
      signal
    })
    writeBodyH2()
  }

  // Increment counter as we have new streams open
  ++session[kOpenStreams]

  stream.once('response', headers => {
    const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
    request.onResponseStarted()

    // Due to the stream nature, it is possible we face a race condition
    // where the stream has been assigned, but the request has been aborted
    // the request remains in-flight and headers hasn't been received yet
    // for those scenarios, best effort is to destroy the stream immediately
    // as there's no value to keep it open.
    if (request.aborted) {
      const err = new RequestAbortedError()
      util.errorRequest(client, request, err)
      util.destroy(stream, err)
      return
    }

    // A `false` return from the hooks pauses the stream; the request is given
    // a bound `resume` so it can restart the flow.
    if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
      stream.pause()
    }

    stream.on('data', (chunk) => {
      if (request.onData(chunk) === false) {
        stream.pause()
      }
    })
  })

  stream.once('end', () => {
    // When state is null, it means we haven't consumed body and the stream still do not have
    // a state.
    // Present specially when using pipeline or stream
    if (stream.state?.state == null || stream.state.state < 6) {
      request.onComplete([])
      return
    }

    // Stream is closed or half-closed-remote (6), decrement counter and cleanup
    // It does not have sense to continue working with the stream as we do not
    // have yet RST_STREAM support on client-side
    if (session[kOpenStreams] === 0) {
      session.unref()
    }

    abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
  })

  stream.once('close', () => {
    session[kOpenStreams] -= 1
    if (session[kOpenStreams] === 0) {
      session.unref()
    }
  })

  stream.once('error', function (err) {
    abort(err)
  })

  stream.once('frameError', (type, code) => {
    abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
  })

  // TODO(HTTP/2): support the 'aborted', 'timeout', 'push' and 'trailers' stream events.

  return true

  // Dispatches to the writer matching the body's kind. Declared after the
  // `return` on purpose: function declarations are hoisted.
  function writeBodyH2 () {
    const socket = client[kSocket]

    /* istanbul ignore else: assertion */
    if (!body || contentLength === 0) {
      writeBuffer(abort, stream, null, client, request, socket, contentLength, expectsPayload)
    } else if (util.isBuffer(body)) {
      writeBuffer(abort, stream, body, client, request, socket, contentLength, expectsPayload)
    } else if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable(abort, stream, body.stream(), client, request, socket, contentLength, expectsPayload)
      } else {
        writeBlob(abort, stream, body, client, request, socket, contentLength, expectsPayload)
      }
    } else if (util.isStream(body)) {
      // Note: writeStream takes its arguments in a different order.
      writeStream(abort, socket, expectsPayload, stream, body, client, request, contentLength)
    } else if (util.isIterable(body)) {
      writeIterable(abort, stream, body, client, request, socket, contentLength, expectsPayload)
    } else {
      assert(false)
    }
  }
}
/**
 * Writes a Buffer body (or no body at all) to an HTTP/2 stream and signals
 * completion to the request and the client. Any error thrown along the way
 * is handed to `abort`.
 *
 * @param {(err?: Error) => void} abort - Fails the request.
 * @param {object} h2stream - Stream to write to.
 * @param {Buffer | null} body - Payload; nothing is written unless it is a Buffer.
 * @param {object} client - Client to resume once the request has been sent.
 * @param {object} request - Request whose `onBodySent` / `onRequestSent` hooks are called.
 * @param {object} socket - Socket flagged with `kReset` when no payload was expected.
 * @param {number | null} contentLength - Must equal `body.byteLength` for Buffer bodies.
 * @param {boolean} expectsPayload - Whether the method anticipates a payload.
 */
function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  try {
    const hasBufferBody = body != null && util.isBuffer(body)

    if (hasBufferBody) {
      assert(contentLength === body.byteLength, 'buffer body must have content length')

      h2stream.cork()
      h2stream.write(body)
      h2stream.uncork()
      h2stream.end()

      request.onBodySent(body)
    }

    if (!expectsPayload) {
      socket[kReset] = true
    }

    request.onRequestSent()
    client[kResume]()
  } catch (error) {
    abort(error)
  }
}
/**
 * Sends a stream body by piping it into the HTTP/2 stream.
 *
 * Note the parameter order differs from the other body writers.
 *
 * @param {(err?: Error) => void} abort - Fails the request.
 * @param {object} socket - Socket flagged with `kReset` when no payload was expected.
 * @param {boolean} expectsPayload - Whether the method anticipates a payload.
 * @param {object} h2stream - Destination stream.
 * @param {object} body - Source stream.
 * @param {object} client - Client to resume once the request has been sent.
 * @param {object} request - Request whose `onBodySent` / `onRequestSent` hooks are called.
 * @param {number | null} contentLength - Declared body length.
 */
function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

  // Forwards every chunk emitted by the piped stream to the request.
  const onPipeData = (chunk) => {
    request.onBodySent(chunk)
  }

  // Invoked by `pipeline` once piping has finished or failed.
  const onPipelineDone = (err) => {
    if (err) {
      util.destroy(pipe, err)
      abort(err)
      return
    }

    util.removeAllListeners(pipe)
    request.onRequestSent()

    if (!expectsPayload) {
      socket[kReset] = true
    }

    client[kResume]()
  }

  // For HTTP/2, is enough to pipe the stream
  const pipe = pipeline(body, h2stream, onPipelineDone)

  util.addListener(pipe, 'data', onPipeData)
}
/**
 * Sends a Blob-like body: reads it fully into a Buffer, writes it to the
 * HTTP/2 stream in one go and signals completion to the request and client.
 *
 * Every failure, including the length checks, is routed through `abort`.
 * The returned promise is not awaited by the caller, so it must never reject.
 *
 * @param {(err?: Error) => void} abort - Fails the request.
 * @param {object} h2stream - Stream to write to.
 * @param {{ size: number, arrayBuffer: () => Promise<ArrayBuffer> }} body - Blob-like payload.
 * @param {object} client - Client to resume once the request has been sent.
 * @param {object} request - Request whose `onBodySent` / `onRequestSent` hooks are called.
 * @param {object} socket - Socket flagged with `kReset` when no payload was expected.
 * @param {number | null} contentLength - Declared length; must equal `body.size`.
 * @param {boolean} expectsPayload - Whether the method anticipates a payload.
 * @returns {Promise<void>} Always resolves; errors are reported via `abort`.
 */
async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  try {
    // A declared length that disagrees with the blob gets the specific error...
    if (contentLength != null && contentLength !== body.size) {
      throw new RequestContentLengthMismatchError()
    }
    // ...while a missing length is an internal invariant violation.
    assert(contentLength === body.size, 'blob body must have content length')

    const buffer = Buffer.from(await body.arrayBuffer())

    h2stream.cork()
    h2stream.write(buffer)
    h2stream.uncork()
    h2stream.end()

    request.onBodySent(buffer)
    request.onRequestSent()

    if (!expectsPayload) {
      socket[kReset] = true
    }

    client[kResume]()
  } catch (err) {
    abort(err)
  }
}
/**
 * Sends an iterable or async-iterable body chunk by chunk. When a write
 * returns `false`, it waits for the stream's 'drain' or 'close' event before
 * continuing.
 *
 * Every failure, including the pipelining assertion, is routed through
 * `abort`. The returned promise is not awaited by the caller, so it must
 * never reject.
 *
 * @param {(err?: Error) => void} abort - Fails the request.
 * @param {object} h2stream - Stream to write to.
 * @param {Iterable<any> | AsyncIterable<any>} body - Source of chunks.
 * @param {object} client - Client to resume once the request has been sent.
 * @param {object} request - Request whose `onBodySent` / `onRequestSent` hooks are called.
 * @param {object} socket - Socket whose `kError` interrupts the transfer and
 *   which is flagged with `kReset` when no payload was expected.
 * @param {number | null} contentLength - Declared body length.
 * @param {boolean} expectsPayload - Whether the method anticipates a payload.
 * @returns {Promise<void>} Always resolves; errors are reported via `abort`.
 */
async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  // Resolver of the promise currently waiting for the stream to drain, if any.
  let callback = null

  function onDrain () {
    if (callback) {
      const cb = callback
      callback = null
      cb()
    }
  }

  const waitForDrain = () => new Promise((resolve, reject) => {
    assert(callback === null)

    if (socket[kError]) {
      reject(socket[kError])
    } else {
      callback = resolve
    }
  })

  // 'close' also releases a pending wait so the loop cannot hang on a stream
  // that will never drain.
  h2stream
    .on('close', onDrain)
    .on('drain', onDrain)

  try {
    assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

    // It's up to the user to somehow abort the async iterable.
    for await (const chunk of body) {
      if (socket[kError]) {
        throw socket[kError]
      }

      const res = h2stream.write(chunk)
      request.onBodySent(chunk)
      if (!res) {
        await waitForDrain()
      }
    }

    h2stream.end()

    request.onRequestSent()

    if (!expectsPayload) {
      socket[kReset] = true
    }

    client[kResume]()
  } catch (err) {
    abort(err)
  } finally {
    h2stream
      .off('close', onDrain)
      .off('drain', onDrain)
  }
}
module.exports = connectH2