HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: /var/www/vhost/disk-apps/pwa.sports-crowd.com/node_modules/@grpc/grpc-js/src/retrying-call.ts
/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import { CallCredentials } from './call-credentials';
import { LogVerbosity, Status } from './constants';
import { Deadline } from './deadline';
import { Metadata } from './metadata';
import { CallConfig } from './resolver';
import * as logging from './logging';
import {
  Call,
  InterceptingListener,
  MessageContext,
  StatusObject,
  WriteCallback,
  WriteObject,
} from './call-interface';
import {
  LoadBalancingCall,
  StatusObjectWithProgress,
} from './load-balancing-call';
import { InternalChannel } from './internal-channel';

// Tracer name passed to logging.trace by RetryingCall.trace.
const TRACER_NAME = 'retrying_call';

/**
 * Token bucket that decides whether retries are currently allowed.
 *
 * The bucket starts full at `maxTokens`. Each failed call removes one token
 * and each successful call adds `tokenRatio` tokens. The token count is always
 * kept within the range [0, maxTokens]. Retries are allowed only while more
 * than half of the tokens remain.
 */
export class RetryThrottler {
  private tokens: number;
  /**
   * @param maxTokens Capacity of the bucket.
   * @param tokenRatio Number of tokens restored by each successful call.
   * @param previousRetryThrottler Optional throttler from a previous config
   * whose current fill level should be carried over.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler) {
      /* When carrying over tokens from a previous config, rescale them to the
       * new max value */
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      this.tokens = maxTokens;
    }
  }

  /** Records a successful call: adds `tokenRatio` tokens, capped at `maxTokens`. */
  addCallSucceeded() {
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Records a failed call: removes one token, never going below zero. */
  addCallFailed() {
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** @returns true while more than half of the tokens remain. */
  canRetryCall() {
    return this.tokens > this.maxTokens / 2;
  }
}

/**
 * Keeps count of buffered message bytes, both in total and per call, and
 * enforces a limit on each.
 */
export class MessageBufferTracker {
  private totalAllocated = 0;
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  constructor(private totalLimit: number, private limitPerCall: number) {}

  /**
   * Tries to reserve `size` bytes for `callId`.
   * @returns false (reserving nothing) if either the per-call limit or the
   * total limit would be exceeded, true otherwise.
   */
  allocate(size: number, callId: number): boolean {
    const usedByCall = this.allocatedPerCall.get(callId) ?? 0;
    const overCallLimit = this.limitPerCall - usedByCall < size;
    const overTotalLimit = this.totalLimit - this.totalAllocated < size;
    if (overCallLimit || overTotalLimit) {
      return false;
    }
    this.totalAllocated += size;
    this.allocatedPerCall.set(callId, usedByCall + size);
    return true;
  }

  /**
   * Releases `size` bytes previously reserved for `callId`.
   * @throws Error if more is released than is reserved in total or for the
   * call.
   */
  free(size: number, callId: number) {
    if (size > this.totalAllocated) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`
      );
    }
    // The total is reduced before the per-call check runs.
    this.totalAllocated -= size;
    const usedByCall = this.allocatedPerCall.get(callId) ?? 0;
    if (size > usedByCall) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${usedByCall}`
      );
    }
    this.allocatedPerCall.set(callId, usedByCall - size);
  }

  /**
   * Releases everything reserved for `callId` and forgets the call.
   * @throws Error if the call holds more than the recorded total.
   */
  freeAll(callId: number) {
    const usedByCall = this.allocatedPerCall.get(callId) ?? 0;
    if (usedByCall > this.totalAllocated) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} allocated ${usedByCall} > total allocated ${this.totalAllocated}`
      );
    }
    this.totalAllocated -= usedByCall;
    this.allocatedPerCall.delete(callId);
  }
}

/**
 * State of a single attempt (child call):
 * ACTIVE: The attempt is in progress.
 * COMPLETED: The attempt has had its status handled, or it was cancelled in
 * favor of another attempt.
 */
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';

/**
 * Bookkeeping for one attempt made on behalf of a RetryingCall.
 */
interface UnderlyingCall {
  /** Whether events from this attempt are still being acted on. */
  state: UnderlyingCallState;
  /** The child call that carries this attempt. */
  call: LoadBalancingCall;
  /** Index of the next write buffer entry to send on this attempt. */
  nextMessageToSend: number;
}

/**
 * A retrying call can be in one of these states:
 * RETRY: Retries are configured and new attempts may be sent
 * HEDGING: Hedging is configured and new attempts may be sent
 * TRANSPARENT_ONLY: Neither retries nor hedging are configured, or they have
 * been turned off for this call (negative server pushback, or the message
 * buffer limit was hit while no attempt was active), and only transparent
 * retry attempts may still be sent
 * COMMITTED: One attempt is committed, and no new attempts will be
 * sent
 */
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';

/**
 * The different types of objects that can be stored in the write buffer, with
 * the following meanings:
 * MESSAGE: This is a message to be sent.
 * HALF_CLOSE: When this entry is reached, the calls should send a half-close.
 * FREED: This slot previously contained a message that has been sent on all
 * child calls and is no longer needed. getBufferEntry also reports this type
 * for any index that has no entry in the buffer.
 */
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';

/**
 * Entry in the buffer of messages to send to the remote end.
 */
interface WriteBufferEntry {
  entryType: WriteBufferEntryType;
  /**
   * Message to send.
   * Only populated if entryType is MESSAGE.
   */
  message?: WriteObject;
  /**
   * Callback to call after sending the message.
   * Only populated if entryType is MESSAGE, the message could not be
   * allocated in the buffer tracker, and an attempt has been committed. For
   * allocated messages the caller's callback is invoked immediately instead
   * of being stored here.
   */
  callback?: WriteCallback;
  /**
   * Indicates whether the message is allocated in the buffer tracker. Ignored
   * if entryType is not MESSAGE. Should be the return value of
   * bufferTracker.allocate.
   */
  allocated: boolean;
}

// Metadata key used to record how many attempts preceded the current one.
// NOTE(review): the identifier is misspelled ("PREVIONS" for "PREVIOUS"); it is
// left as-is here because it is referenced elsewhere in this file.
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';

export class RetryingCall implements Call {
  /** Current retry/hedging state; set in the constructor from the method config. */
  private state: RetryingCallState;
  /** Listener supplied to start(); null until start() is called. */
  private listener: InterceptingListener | null = null;
  /** Metadata supplied to start(); cloned for every attempt. */
  private initialMetadata: Metadata | null = null;
  /** All attempts created so far, in creation order. */
  private underlyingCalls: UnderlyingCall[] = [];
  /** Buffered outgoing messages and half-close marker. */
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * The offset of message indices in the writeBuffer. For example, if
   * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
   * is in writeBuffer[5].
   */
  private writeBufferOffset = 0;
  /**
   * Tracks whether a read has been started, so that we know whether to start
   * reads on new child calls. This only matters for the first read, because
   * once a message comes in the child call becomes committed and there will
   * be no new child calls.
   */
  private readStarted = false;
  /** Set once the single transparent retry for a REFUSED attempt has been used. */
  private transparentRetryUsed = false;
  /**
   * Number of attempts so far
   */
  private attempts = 0;
  /** Pending timer for the next hedged attempt, if one has been scheduled. */
  private hedgingTimer: NodeJS.Timeout | null = null;
  /** Index into underlyingCalls of the committed attempt, or null if none. */
  private committedCallIndex: number | null = null;
  /** Initial retry backoff in seconds, parsed from the retry policy. */
  private initialRetryBackoffSec = 0;
  /** Upper bound, in seconds, for the next randomized retry delay. */
  private nextRetryBackoffSec = 0;
  /**
   * Chooses the initial state from the method config: RETRY when a retry
   * policy is present, otherwise HEDGING when a hedging policy is present,
   * otherwise TRANSPARENT_ONLY. With a retry policy, the initial backoff is
   * parsed by dropping the last character of the string (presumably a unit
   * suffix such as "s" — confirm against the config format).
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    const methodConfig = callConfig.methodConfig;
    if (methodConfig.retryPolicy) {
      this.state = 'RETRY';
      const backoffText = methodConfig.retryPolicy.initialBackoff;
      const backoffSec = Number(
        backoffText.substring(0, backoffText.length - 1)
      );
      this.initialRetryBackoffSec = backoffSec;
      this.nextRetryBackoffSec = backoffSec;
    } else if (methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }
  /** @returns the call number this call was constructed with. */
  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    const line = `[${this.callNumber}] ${text}`;
    logging.trace(LogVerbosity.DEBUG, TRACER_NAME, line);
  }

  /**
   * Ends the call: releases all buffer space held for it, drops the write
   * buffer, and delivers the status to the listener on the next tick.
   */
  private reportStatus(statusObject: StatusObject) {
    this.trace(
      `ended with status: code=${statusObject.code} details="${statusObject.details}"`
    );
    this.bufferTracker.freeAll(this.callNumber);
    // Advance the offset past everything buffered so indices stay consistent.
    this.writeBufferOffset += this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Explicitly construct status object to remove progress field
      const { code, details, metadata } = statusObject;
      this.listener?.onReceiveStatus({ code, details, metadata });
    });
  }

  /**
   * Reports the given status with empty metadata, then cancels every attempt
   * created so far with the same status and details.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(`cancelWithStatus code: ${status} details: "${details}"`);
    const finalStatus: StatusObject = {
      code: status,
      details,
      metadata: new Metadata(),
    };
    this.reportStatus(finalStatus);
    for (const attempt of this.underlyingCalls) {
      attempt.call.cancelWithStatus(status, details);
    }
  }
  /** @returns the committed attempt's peer, or 'unknown' if none is committed. */
  getPeer(): string {
    const committedIndex = this.committedCallIndex;
    if (committedIndex === null) {
      return 'unknown';
    }
    return this.underlyingCalls[committedIndex].call.getPeer();
  }

  /**
   * Looks up the write buffer entry for an absolute message index. Indices
   * with no entry in the buffer yield a fresh FREED placeholder.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    const slot = messageIndex - this.writeBufferOffset;
    const stored = this.writeBuffer[slot];
    return stored ?? { entryType: 'FREED', allocated: false };
  }

  /** @returns the absolute message index the next pushed buffer entry will get. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once an attempt is committed, discards every buffered entry the committed
   * attempt has already moved past, returning tracked bytes to the buffer
   * tracker. Does nothing in any other state.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const committedAttempt = this.underlyingCalls[this.committedCallIndex!];
    const firstNeededIndex = committedAttempt.nextMessageToSend;
    let cursor = this.writeBufferOffset;
    while (cursor < firstNeededIndex) {
      const entry = this.getBufferEntry(cursor);
      if (entry.allocated) {
        this.bufferTracker.free(
          entry.message!.message.length,
          this.callNumber
        );
      }
      cursor += 1;
    }
    const dropCount = firstNeededIndex - this.writeBufferOffset;
    this.writeBuffer = this.writeBuffer.slice(dropCount);
    this.writeBufferOffset = firstNeededIndex;
  }

  /**
   * Commits the call to the attempt at `index`: moves to the COMMITTED state,
   * records the committed index, cancels every other attempt that is still
   * active, and releases buffered messages that are no longer needed.
   *
   * Does nothing if an attempt is already committed.
   *
   * The attempt being committed may already be COMPLETED. The status handlers
   * mark a child COMPLETED before committing it, and the commit must still
   * take effect so that sibling attempts are cancelled and no retry or hedging
   * attempt is started after the final status.
   *
   * @param index Index into underlyingCalls of the attempt to commit.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    this.trace(
      'Committing call [' +
        this.underlyingCalls[index].call.getCallNumber() +
        '] at index ' +
        index
    );
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      // Mark as COMPLETED first so the cancellation status is ignored.
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(
        Status.CANCELLED,
        'Discarded in favor of other hedged attempt'
      );
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has advanced furthest through the
   * write buffer (the earliest one on ties). If no attempt is active, switches
   * to TRANSPARENT_ONLY instead.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let furthestIndex = -1;
    let furthestProgress = -1;
    this.underlyingCalls.forEach((attempt, position) => {
      if (attempt.state !== 'ACTIVE') {
        return;
      }
      if (attempt.nextMessageToSend > furthestProgress) {
        furthestProgress = attempt.nextMessageToSend;
        furthestIndex = position;
      }
    });
    if (furthestIndex !== -1) {
      this.commitCall(furthestIndex);
      return;
    }
    /* There are no active calls, disable retries to force the next call that
     * is started to be committed. */
    this.state = 'TRANSPARENT_ONLY';
  }

  /**
   * Tests whether `code` appears in `list`, where entries may be numeric
   * status codes or status names compared case-insensitively.
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    return list.some(candidate => {
      if (candidate === code) {
        return true;
      }
      const candidateName = candidate.toString().toLowerCase();
      return candidateName === Status[code].toLowerCase();
    });
  }

  /**
   * Picks a random delay in [0, nextRetryBackoffSec) seconds, expressed in
   * milliseconds, then grows nextRetryBackoffSec by the policy's multiplier up
   * to the policy's maximum backoff. Returns 0 when there is no retry policy.
   */
  private getNextRetryBackoffMs() {
    const policy = this.callConfig?.methodConfig.retryPolicy;
    if (!policy) {
      return 0;
    }
    const delayMs = Math.random() * this.nextRetryBackoffSec * 1000;
    const maxBackoffText = policy.maxBackoff;
    const capSec = Number(
      maxBackoffText.substring(0, maxBackoffText.length - 1)
    );
    const grownSec = this.nextRetryBackoffSec * policy.backoffMultiplier;
    this.nextRetryBackoffSec = Math.min(grownSec, capSec);
    return delayMs;
  }

  /**
   * Decides whether to start another retry attempt and, if so, starts it
   * after the appropriate delay.
   *
   * `callback` is invoked exactly once on every path: with `true` just before
   * a new attempt is started, or with `false` when no retry will happen (not
   * in the RETRY state, attempt limit reached, negative pushback, or the
   * retry throttler denying the retry).
   *
   * @param pushback Server pushback in milliseconds, a negative number if the
   * server asked for no retries, or null if no pushback was sent.
   * @param callback Receives whether a retry attempt was started.
   */
  private maybeRetryCall(
    pushback: number | null,
    callback: (retried: boolean) => void
  ) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig!.methodConfig.retryPolicy!;
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      // Negative pushback disables further retries for this call.
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      // Honor the server-provided delay and restart the backoff sequence.
      retryDelayMs = pushback;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      // The state may have changed while waiting.
      if (this.state !== 'RETRY') {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // Without this the caller would never learn that no retry is coming
        // and the call would never report a status.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** @returns how many attempts are currently in the ACTIVE state. */
  private countActiveCalls(): number {
    return this.underlyingCalls.filter(
      attempt => attempt?.state === 'ACTIVE'
    ).length;
  }

  /**
   * Handles a non-OK status from an attempt, deciding by the current state
   * whether to end the call with that status or to try again.
   *
   * @param status Status received from the attempt.
   * @param callIndex Index into underlyingCalls of the attempt that ended.
   * @param pushback Value from getPushback: null when the server sent no
   * pushback, negative to disable further attempts, otherwise a delay in
   * milliseconds.
   */
  private handleProcessedStatus(
    status: StatusObject,
    callIndex: number,
    pushback: number | null
  ) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        // No policy-driven attempts are possible; this status is final.
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING':
        if (
          this.isStatusCodeInList(
            this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ??
              [],
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            // No pushback: try the next hedged attempt without extra delay.
            delayMs = 0;
          } else if (pushback < 0) {
            // Negative pushback: stop hedging and end with this status.
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If after trying to start a call there are no active calls, this was the last one
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          // Fatal status code for hedging: end the call with it.
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      case 'RETRY':
        if (
          this.isStatusCodeInList(
            this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes,
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, retried => {
            // Only end the call if no new attempt was started.
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          // Status code is not retryable: end the call with it.
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the server pushback from the `grpc-retry-pushback-ms` metadata
   * entry.
   *
   * @param metadata Metadata received with the attempt's status.
   * @returns null if the entry is absent; the parsed number of milliseconds
   * if the first value parses as a decimal integer; otherwise -1, so that an
   * unparseable value is handled like a negative pushback (no further
   * attempts) rather than as a NaN delay.
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    // parseInt never throws; it signals failure by returning NaN.
    const pushbackMs = parseInt(String(mdValue[0]), 10);
    return Number.isNaN(pushbackMs) ? -1 : pushbackMs;
  }

  /**
   * Handles the final status of one attempt. Statuses from attempts that are
   * already COMPLETED are ignored. An OK status, or any status once the call
   * is COMMITTED, is reported directly; otherwise the attempt's progress
   * decides whether a new attempt is started or the status is reported.
   *
   * @param status Status from the child call, including its progress.
   * @param callIndex Index into underlyingCalls of the attempt that ended.
   */
  private handleChildStatus(
    status: StatusObjectWithProgress,
    callIndex: number
  ) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'state=' +
        this.state +
        ' handling status with progress ' +
        status.progress +
        ' from child [' +
        this.underlyingCalls[callIndex].call.getCallNumber() +
        '] in state ' +
        this.underlyingCalls[callIndex].state
    );
    // From here on this attempt counts as finished.
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // RPC never leaves the client, always safe to retry
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // RPC reaches the server library, but not the server application logic
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          // Only one transparent retry is allowed for a REFUSED attempt.
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        // Never retried: the status is reported as final.
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts one more hedged attempt and re-arms the hedging timer, provided
   * the call is still in the HEDGING state, a hedging policy exists, and the
   * attempt limit (the policy's maxAttempts, at most 5) has not been reached.
   */
  private maybeStartHedgingAttempt() {
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.state !== 'HEDGING' || !hedgingPolicy) {
      return;
    }
    const attemptLimit = Math.min(hedgingPolicy.maxAttempts, 5);
    if (this.attempts >= attemptLimit) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Cancels any pending hedging timer and, if hedging may still produce
   * another attempt, schedules the next one after the policy's hedgingDelay
   * (default '0s'). The timer is unref'd where supported.
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.state !== 'HEDGING' || !hedgingPolicy) {
      return;
    }
    const attemptLimit = Math.min(hedgingPolicy.maxAttempts, 5);
    if (this.attempts >= attemptLimit) {
      return;
    }
    const delayText = hedgingPolicy.hedgingDelay ?? '0s';
    // Drop the last character of the delay string before converting.
    const delaySec = Number(delayText.substring(0, delayText.length - 1));
    const timer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, delaySec * 1000);
    this.hedgingTimer = timer;
    timer.unref?.();
  }

  /**
   * Creates a new child call, registers it as an ACTIVE attempt, starts it
   * with a clone of the initial metadata, and begins replaying the write
   * buffer on it. Requires start() to have been called, since it reads
   * initialMetadata and the listener.
   */
  private startNewAttempt() {
    const child = this.channel.createLoadBalancingCall(
      this.callConfig,
      this.methodName,
      this.host,
      this.credentials,
      this.deadline
    );
    this.trace(
      'Created child call [' +
        child.getCallNumber() +
        '] for attempt ' +
        this.attempts
    );
    // Index this attempt will occupy in underlyingCalls.
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({
      state: 'ACTIVE',
      call: child,
      nextMessageToSend: 0,
    });
    const previousAttempts = this.attempts - 1;
    const initialMetadata = this.initialMetadata!.clone();
    // Tell the server how many attempts came before this one.
    if (previousAttempts > 0) {
      initialMetadata.set(
        PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
        `${previousAttempts}`
      );
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace(
          'Received metadata from child [' + child.getCallNumber() + ']'
        );
        // Receiving response metadata commits the call to this attempt.
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        // Only forward events from an attempt that is still active.
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace(
          'Received message from child [' + child.getCallNumber() + ']'
        );
        // Receiving a message commits the call to this attempt.
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace(
          'Received status from child [' + child.getCallNumber() + ']'
        );
        // If no response metadata carried the attempt count, attach it to the
        // status metadata instead.
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        this.handleChildStatus(status, index);
      },
    });
    // Begin sending whatever is already buffered on the new attempt.
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  /**
   * Starts the call: stores the listener and initial metadata, counts and
   * starts the first attempt, and arms the hedging timer if hedging applies.
   *
   * @param metadata Initial metadata; each attempt sends a clone of it.
   * @param listener Receives metadata, messages and the final status.
   */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Called when a child call finishes writing its current message: invokes
   * the stored write callback for that message (if any), releases buffer
   * entries that are no longer needed, and moves the child on to its next
   * buffer entry.
   *
   * @param childIndex Index into underlyingCalls of the child that wrote.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    // Runs before the increment below, so the message that was just written
    // is not yet counted as releasable in this pass.
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /**
   * Sends the given child's next pending buffer entry, if there is one: a
   * MESSAGE is written to the child, a HALF_CLOSE half-closes it. Completed
   * children and FREED entries are left alone.
   */
  private sendNextChildMessage(childIndex: number) {
    const attempt = this.underlyingCalls[childIndex];
    if (attempt.state === 'COMPLETED') {
      return;
    }
    const pending = this.getBufferEntry(attempt.nextMessageToSend);
    if (pending.entryType === 'MESSAGE') {
      attempt.call.sendMessageWithContext(
        {
          // Any write error is ignored; the write is treated as completed.
          callback: () => {
            this.handleChildWriteCompleted(childIndex);
          },
        },
        pending.message!.message
      );
    } else if (pending.entryType === 'HALF_CLOSE') {
      attempt.nextMessageToSend += 1;
      attempt.call.halfClose();
    }
    // FREED: nothing to send.
  }

  /**
   * Buffers an outgoing message and sends it on every attempt that is ready
   * for it.
   *
   * If the buffer tracker accepts the message, the caller's callback is
   * invoked immediately and the message is sent on each active attempt whose
   * next entry is this message. If the tracker rejects it, the call commits
   * to the attempt that has sent the most messages; the caller's callback is
   * then stored on the buffer entry and the message is sent on the committed
   * attempt only.
   *
   * @param context Carries the write flags and optional completion callback.
   * @param message Serialized message bytes.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber),
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      // The message is safely buffered, so the write is acknowledged now.
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (
          call.state === 'ACTIVE' &&
          call.nextMessageToSend === messageIndex
        ) {
          call.call.sendMessageWithContext(
            {
              callback: error => {
                // Ignore error
                this.handleChildWriteCompleted(callIndex);
              },
            },
            message
          );
        }
      }
    } else {
      this.commitCallWithMostMessages();
      // commitCallWithMostMessages leaves no committed call when no attempt
      // is currently active; in that case nothing more is done here.
      if (this.committedCallIndex === null) {
        return;
      }
      const call = this.underlyingCalls[this.committedCallIndex];
      // Deferred acknowledgement: invoked from handleChildWriteCompleted.
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext(
          {
            callback: error => {
              // Ignore error
              this.handleChildWriteCompleted(this.committedCallIndex!);
            },
          },
          message
        );
      }
    }
  }
  /**
   * Records that reading has begun and starts a read on every active attempt.
   */
  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const attempt of this.underlyingCalls) {
      if (attempt?.state !== 'ACTIVE') {
        continue;
      }
      attempt.call.startRead();
    }
  }
  /**
   * Appends a HALF_CLOSE marker to the write buffer and half-closes every
   * active attempt that has already sent everything before the marker.
   */
  halfClose(): void {
    this.trace('halfClose called');
    const markerIndex = this.getNextBufferIndex();
    const marker: WriteBufferEntry = {
      entryType: 'HALF_CLOSE',
      allocated: false,
    };
    this.writeBuffer.push(marker);
    for (const attempt of this.underlyingCalls) {
      const caughtUp =
        attempt?.state === 'ACTIVE' &&
        attempt.nextMessageToSend === markerIndex;
      if (!caughtUp) {
        continue;
      }
      attempt.nextMessageToSend += 1;
      attempt.call.halfClose();
    }
  }
  /**
   * Not supported on this class.
   * @throws Error always.
   */
  setCredentials(newCredentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }
  /** @returns the method name this call was constructed with. */
  getMethod(): string {
    return this.methodName;
  }
  /** @returns the host this call was constructed with. */
  getHost(): string {
    return this.host;
  }
}