HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: /var/www/vhost/disk-apps/pwa.sports-crowd.com/node_modules/@grpc/grpc-js/src/subchannel.ts
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata';
import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './connectivity-state';
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
import * as logging from './logging';
import { LogVerbosity, Status } from './constants';
import { GrpcUri, uriToString } from './uri-parser';
import {
  SubchannelAddress,
  subchannelAddressToString,
} from './subchannel-address';
import {
  SubchannelRef,
  ChannelzTrace,
  ChannelzChildrenTracker,
  SubchannelInfo,
  registerChannelzSubchannel,
  ChannelzCallTracker,
  unregisterChannelzRef,
} from './channelz';
import {
  ConnectivityStateListener,
  SubchannelInterface,
} from './subchannel-interface';
import { SubchannelCallInterceptingListener } from './subchannel-call';
import { SubchannelCall } from './subchannel-call';
import { CallEventTracker, SubchannelConnector, Transport } from './transport';

const TRACER_NAME = 'subchannel';

/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
 * have a constant for the max signed 32 bit integer, so this is a simple way
 * to calculate it */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);

export class Subchannel {
  /**
   * The subchannel's current connectivity state. Invariant: `session` === `null`
   * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
   */
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The underlying http2 session used to make requests.
   */
  private transport: Transport | null = null;
  /**
   * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
   * CONNECTING instead of IDLE when the backoff timeout ends.
   */
  private continueConnecting = false;
  /**
   * A list of listener functions that will be called whenever the connectivity
   * state changes. Will be modified by `addConnectivityStateListener` and
   * `removeConnectivityStateListener`
   */
  private stateListeners: Set<ConnectivityStateListener> = new Set();

  private backoffTimeout: BackoffTimeout;

  private keepaliveTime: number;
  /**
   * Tracks channels and subchannel pools with references to this subchannel
   */
  private refcount = 0;

  /**
   * A string representation of the subchannel address, for logging/tracing
   */
  private subchannelAddressString: string;

  // Channelz info
  private readonly channelzEnabled: boolean = true;
  private channelzRef: SubchannelRef;
  private channelzTrace: ChannelzTrace;
  private callTracker = new ChannelzCallTracker();
  private childrenTracker = new ChannelzChildrenTracker();

  // Channelz socket info
  private streamTracker = new ChannelzCallTracker();

  /**
   * A class representing a connection to a single backend.
   * @param channelTarget The target string for the channel as a whole
   * @param subchannelAddress The address for the backend that this subchannel
   *     will connect to
   * @param options The channel options, plus any specific subchannel options
   *     for this subchannel
   * @param credentials The channel credentials used to establish this
   *     connection
   */
  constructor(
    private channelTarget: GrpcUri,
    private subchannelAddress: SubchannelAddress,
    private options: ChannelOptions,
    private credentials: ChannelCredentials,
    private connector: SubchannelConnector
  ) {
    const backoffOptions: BackoffOptions = {
      initialDelay: options['grpc.initial_reconnect_backoff_ms'],
      maxDelay: options['grpc.max_reconnect_backoff_ms'],
    };
    this.backoffTimeout = new BackoffTimeout(() => {
      this.handleBackoffTimer();
    }, backoffOptions);
    this.backoffTimeout.unref();
    this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

    this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;

    if (options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }
    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzSubchannel(
      this.subchannelAddressString,
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
    }
    this.trace(
      'Subchannel constructed with options ' +
        JSON.stringify(options, undefined, 2)
    );
  }

  private getChannelzInfo(): SubchannelInfo {
    return {
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
      target: this.subchannelAddressString,
    };
  }

  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '(' +
        this.channelzRef.id +
        ') ' +
        this.subchannelAddressString +
        ' ' +
        text
    );
  }

  private refTrace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      'subchannel_refcount',
      '(' +
        this.channelzRef.id +
        ') ' +
        this.subchannelAddressString +
        ' ' +
        text
    );
  }

  private handleBackoffTimer() {
    if (this.continueConnecting) {
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.CONNECTING
      );
    } else {
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.IDLE
      );
    }
  }

  /**
   * Start a backoff timer with the current nextBackoff timeout
   */
  private startBackoff() {
    this.backoffTimeout.runOnce();
  }

  private stopBackoff() {
    this.backoffTimeout.stop();
    this.backoffTimeout.reset();
  }

  private startConnectingInternal() {
    let options = this.options;
    if (options['grpc.keepalive_time_ms']) {
      const adjustedKeepaliveTime = Math.min(
        this.keepaliveTime,
        KEEPALIVE_MAX_TIME_MS
      );
      options = { ...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime };
    }
    this.connector
      .connect(this.subchannelAddress, this.credentials, options)
      .then(
        transport => {
          if (
            this.transitionToState(
              [ConnectivityState.CONNECTING],
              ConnectivityState.READY
            )
          ) {
            this.transport = transport;
            if (this.channelzEnabled) {
              this.childrenTracker.refChild(transport.getChannelzRef());
            }
            transport.addDisconnectListener(tooManyPings => {
              this.transitionToState(
                [ConnectivityState.READY],
                ConnectivityState.IDLE
              );
              if (tooManyPings && this.keepaliveTime > 0) {
                this.keepaliveTime *= 2;
                logging.log(
                  LogVerbosity.ERROR,
                  `Connection to ${uriToString(this.channelTarget)} at ${
                    this.subchannelAddressString
                  } rejected by server because of excess pings. Increasing ping interval to ${
                    this.keepaliveTime
                  } ms`
                );
              }
            });
          } else {
            /* If we can't transition from CONNECTING to READY here, we will
             * not be using this transport, so release its resources. */
            transport.shutdown();
          }
        },
        error => {
          this.transitionToState(
            [ConnectivityState.CONNECTING],
            ConnectivityState.TRANSIENT_FAILURE,
            `${error}`
          );
        }
      );
  }

  /**
   * Initiate a state transition from any element of oldStates to the new
   * state. If the current connectivityState is not in oldStates, do nothing.
   * @param oldStates The set of states to transition from
   * @param newState The state to transition to
   * @returns True if the state changed, false otherwise
   */
  private transitionToState(
    oldStates: ConnectivityState[],
    newState: ConnectivityState,
    errorMessage?: string
  ): boolean {
    if (oldStates.indexOf(this.connectivityState) === -1) {
      return false;
    }
    this.trace(
      ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace(
        'CT_INFO',
        'Connectivity state change to ' + ConnectivityState[newState]
      );
    }
    const previousState = this.connectivityState;
    this.connectivityState = newState;
    switch (newState) {
      case ConnectivityState.READY:
        this.stopBackoff();
        break;
      case ConnectivityState.CONNECTING:
        this.startBackoff();
        this.startConnectingInternal();
        this.continueConnecting = false;
        break;
      case ConnectivityState.TRANSIENT_FAILURE:
        if (this.channelzEnabled && this.transport) {
          this.childrenTracker.unrefChild(this.transport.getChannelzRef());
        }
        this.transport?.shutdown();
        this.transport = null;
        /* If the backoff timer has already ended by the time we get to the
         * TRANSIENT_FAILURE state, we want to immediately transition out of
         * TRANSIENT_FAILURE as though the backoff timer is ending right now */
        if (!this.backoffTimeout.isRunning()) {
          process.nextTick(() => {
            this.handleBackoffTimer();
          });
        }
        break;
      case ConnectivityState.IDLE:
        if (this.channelzEnabled && this.transport) {
          this.childrenTracker.unrefChild(this.transport.getChannelzRef());
        }
        this.transport?.shutdown();
        this.transport = null;
        break;
      default:
        throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
    }
    for (const listener of this.stateListeners) {
      listener(this, previousState, newState, this.keepaliveTime, errorMessage);
    }
    return true;
  }

  ref() {
    this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount + 1));
    this.refcount += 1;
  }

  unref() {
    this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
    this.refcount -= 1;
    if (this.refcount === 0) {
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
      }
      if (this.channelzEnabled) {
        unregisterChannelzRef(this.channelzRef);
      }
      process.nextTick(() => {
        this.transitionToState(
          [ConnectivityState.CONNECTING, ConnectivityState.READY],
          ConnectivityState.IDLE
        );
      });
    }
  }

  unrefIfOneRef(): boolean {
    if (this.refcount === 1) {
      this.unref();
      return true;
    }
    return false;
  }

  createCall(
    metadata: Metadata,
    host: string,
    method: string,
    listener: SubchannelCallInterceptingListener
  ): SubchannelCall {
    if (!this.transport) {
      throw new Error('Cannot create call, subchannel not READY');
    }
    let statsTracker: Partial<CallEventTracker>;
    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
      this.streamTracker.addCallStarted();
      statsTracker = {
        onCallEnd: status => {
          if (status.code === Status.OK) {
            this.callTracker.addCallSucceeded();
          } else {
            this.callTracker.addCallFailed();
          }
        },
      };
    } else {
      statsTracker = {};
    }
    return this.transport.createCall(
      metadata,
      host,
      method,
      listener,
      statsTracker
    );
  }

  /**
   * If the subchannel is currently IDLE, start connecting and switch to the
   * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
   * the next time it would transition to IDLE, start connecting again instead.
   * Otherwise, do nothing.
   */
  startConnecting() {
    process.nextTick(() => {
      /* First, try to transition from IDLE to connecting. If that doesn't happen
       * because the state is not currently IDLE, check if it is
       * TRANSIENT_FAILURE, and if so indicate that it should go back to
       * connecting after the backoff timer ends. Otherwise do nothing */
      if (
        !this.transitionToState(
          [ConnectivityState.IDLE],
          ConnectivityState.CONNECTING
        )
      ) {
        if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
          this.continueConnecting = true;
        }
      }
    });
  }

  /**
   * Get the subchannel's current connectivity state.
   */
  getConnectivityState() {
    return this.connectivityState;
  }

  /**
   * Add a listener function to be called whenever the subchannel's
   * connectivity state changes.
   * @param listener
   */
  addConnectivityStateListener(listener: ConnectivityStateListener) {
    this.stateListeners.add(listener);
  }

  /**
   * Remove a listener previously added with `addConnectivityStateListener`
   * @param listener A reference to a function previously passed to
   *     `addConnectivityStateListener`
   */
  removeConnectivityStateListener(listener: ConnectivityStateListener) {
    this.stateListeners.delete(listener);
  }

  /**
   * Reset the backoff timeout, and immediately start connecting if in backoff.
   */
  resetBackoff() {
    process.nextTick(() => {
      this.backoffTimeout.reset();
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.CONNECTING
      );
    });
  }

  getAddress(): string {
    return this.subchannelAddressString;
  }

  getChannelzRef(): SubchannelRef {
    return this.channelzRef;
  }

  getRealSubchannel(): this {
    return this;
  }

  realSubchannelEquals(other: SubchannelInterface): boolean {
    return other.getRealSubchannel() === this;
  }

  throttleKeepalive(newKeepaliveTime: number) {
    if (newKeepaliveTime > this.keepaliveTime) {
      this.keepaliveTime = newKeepaliveTime;
    }
  }
}