HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //lib/python3/dist-packages/cloudinit/sources/azure/imds.py
# Copyright (C) 2022 Microsoft Corporation.
#
# This file is part of cloud-init. See LICENSE file for license information.

from typing import Dict

import requests

from cloudinit import log as logging
from cloudinit import util
from cloudinit.sources.helpers.azure import report_diagnostic_event
from cloudinit.url_helper import UrlError, readurl

LOG = logging.getLogger(__name__)

IMDS_URL = "http://169.254.169.254/metadata"


class ReadUrlRetryHandler:
    def __init__(
        self,
        *,
        retry_codes=(
            404,  # not found (yet)
            410,  # gone / unavailable (yet)
            429,  # rate-limited/throttled
            500,  # server error
        ),
        max_connection_errors: int = 10,
        logging_backoff: float = 1.0,
    ) -> None:
        self.logging_backoff = logging_backoff
        self.max_connection_errors = max_connection_errors
        self.retry_codes = retry_codes
        self._logging_threshold = 1.0
        self._request_count = 0

    def exception_callback(self, req_args, exception) -> bool:
        self._request_count += 1
        if not isinstance(exception, UrlError):
            report_diagnostic_event(
                "Polling IMDS failed with unexpected exception: %r"
                % (exception),
                logger_func=LOG.warning,
            )
            return False

        log = True
        retry = True

        # Check for connection errors which may occur early boot, but
        # are otherwise indicative that we are not connecting with the
        # primary NIC.
        if isinstance(
            exception.cause, (requests.ConnectionError, requests.Timeout)
        ):
            self.max_connection_errors -= 1
            if self.max_connection_errors < 0:
                retry = False
        elif exception.code not in self.retry_codes:
            retry = False

        if self._request_count >= self._logging_threshold:
            self._logging_threshold *= self.logging_backoff
        else:
            log = False

        if log or not retry:
            report_diagnostic_event(
                "Polling IMDS failed attempt %d with exception: %r"
                % (self._request_count, exception),
                logger_func=LOG.info,
            )
        return retry


def _fetch_url(
    url: str, *, log_response: bool = True, retries: int = 10, timeout: int = 2
) -> bytes:
    """Fetch URL from IMDS.

    :raises UrlError: on error fetching metadata.
    """
    handler = ReadUrlRetryHandler()

    try:
        response = readurl(
            url,
            exception_cb=handler.exception_callback,
            headers={"Metadata": "true"},
            infinite=False,
            log_req_resp=log_response,
            retries=retries,
            timeout=timeout,
        )
    except UrlError as error:
        report_diagnostic_event(
            "Failed to fetch metadata from IMDS: %s" % error,
            logger_func=LOG.warning,
        )
        raise

    return response.contents


def _fetch_metadata(
    url: str,
    retries: int = 10,
) -> Dict:
    """Fetch IMDS metadata.

    :raises UrlError: on error fetching metadata.
    :raises ValueError: on error parsing metadata.
    """
    metadata = _fetch_url(url, retries=retries)

    try:
        return util.load_json(metadata)
    except ValueError as error:
        report_diagnostic_event(
            "Failed to parse metadata from IMDS: %s" % error,
            logger_func=LOG.warning,
        )
        raise


def fetch_metadata_with_api_fallback(retries: int = 10) -> Dict:
    """Fetch extended metadata, falling back to non-extended as required.

    :raises UrlError: on error fetching metadata.
    :raises ValueError: on error parsing metadata.
    """
    try:
        url = IMDS_URL + "/instance?api-version=2021-08-01&extended=true"
        return _fetch_metadata(url, retries=retries)
    except UrlError as error:
        if error.code == 400:
            report_diagnostic_event(
                "Falling back to IMDS api-version: 2019-06-01",
                logger_func=LOG.warning,
            )
            url = IMDS_URL + "/instance?api-version=2019-06-01"
            return _fetch_metadata(url, retries=retries)
        raise


def fetch_reprovision_data() -> bytes:
    """Fetch extended metadata, falling back to non-extended as required.

    :raises UrlError: on error.
    """
    url = IMDS_URL + "/reprovisiondata?api-version=2019-06-01"

    handler = ReadUrlRetryHandler(
        logging_backoff=2.0,
        max_connection_errors=0,
        retry_codes=(
            404,
            410,
        ),
    )
    response = readurl(
        url,
        exception_cb=handler.exception_callback,
        headers={"Metadata": "true"},
        infinite=True,
        log_req_resp=False,
        timeout=2,
    )

    report_diagnostic_event(
        f"Polled IMDS {handler._request_count+1} time(s)",
        logger_func=LOG.debug,
    )
    return response.contents