HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/lib/python3/dist-packages/twisted/names/test/test_rootresolve.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test cases for Twisted.names' root resolver.
"""

from zope.interface import implementer
from zope.interface.verify import verifyClass

from twisted.python.log import msg
from twisted.trial import util
from twisted.trial.unittest import SynchronousTestCase, TestCase
from twisted.internet.defer import Deferred, succeed, gatherResults, TimeoutError
from twisted.internet.interfaces import IResolverSimple
from twisted.names import client, root
from twisted.names.root import Resolver
from twisted.names.dns import (
    IN, HS, A, NS, CNAME, OK, ENAME, Record_CNAME,
    Name, Query, Message, RRHeader, Record_A, Record_NS)
from twisted.names.error import DNSNameError, ResolverError
from twisted.names.test.test_util import MemoryReactor



def getOnePayload(results):
    """
    Extract the payload of the first answer record from a lookup result.

    @param results: The three-element sequence of answer, authority and
        additional records which the L{Deferred} returned by
        L{IResolver.lookupAddress} fires with.

    @return: The C{payload} attribute of the first record in the answer
        section.
    """
    answers, _authority, _additional = results
    firstAnswer = answers[0]
    return firstAnswer.payload


def getOneAddress(results):
    """
    Extract the first IPv4 address from a lookup result.

    @param results: The value which the L{Deferred} returned by
        L{IResolver.lookupAddress} fires with.

    @return: Whatever C{dottedQuad()} returns for the payload of the first
        record in the answer section.
    """
    payload = getOnePayload(results)
    return payload.dottedQuad()



class RootResolverTests(TestCase):
    """
    Tests for L{twisted.names.root.Resolver}.
    """
    def _queryTest(self, filter):
        """
        Drive L{Resolver._query} against a L{MemoryReactor}: check the DNS
        query it emits, feed it a canned response, and hand back whatever
        the L{Deferred} it returned fired with.

        @param filter: The value to pass for the C{filter} parameter to
            L{Resolver._query}.

        @return: The result the L{Deferred} returned by L{Resolver._query}
            fired with.
        """
        reactor = MemoryReactor()
        resolver = Resolver([], reactor=reactor)
        pending = resolver._query(
            Query(b'foo.example.com', A, IN), [('1.1.2.3', 1053)], (30,),
            filter)

        # Issuing the query must have started a UDP port.
        _portNumber, transport = reactor.udpPorts.popitem()

        # Exactly one DNS packet must have been written to that port.
        sentPackets = transport._sentPackets
        [(packet, _address)] = sentPackets

        message = Message()
        message.fromStr(packet)

        # The packet is a bare query carrying the parameters used above.
        self.assertEqual(message.queries, [Query(b'foo.example.com', A, IN)])
        for emptySection in (
                message.answers, message.authority, message.additional):
            self.assertEqual(emptySection, [])

        # Nothing has been received yet, so the Deferred has not fired.
        fired = []
        pending.addCallback(fired.append)
        self.assertEqual(fired, [])

        # Turn the parsed query into a reply and deliver it; the Deferred
        # should fire as soon as the datagram arrives.
        message.queries[:] = []
        message.answer = 1
        answerRecord = RRHeader(
            b'foo.example.com', payload=Record_A('5.8.13.21'))
        message.answers.append(answerRecord)
        transport._protocol.datagramReceived(
            message.toStr(), ('1.1.2.3', 1053))
        return fired[0]


    def test_filteredQuery(self):
        """
        When L{Resolver._query} is called with a true C{filter} value, the
        L{Deferred} it returns fires with a three-tuple of lists of records
        (answers, authority, additional) taken from the response.
        """
        sections = self._queryTest(True)
        answer, authority, additional = sections
        expectedAnswer = [
            RRHeader(b'foo.example.com', payload=Record_A('5.8.13.21', ttl=0))]
        self.assertEqual(answer, expectedAnswer)
        for emptySection in (authority, additional):
            self.assertEqual(emptySection, [])


    def test_unfilteredQuery(self):
        """
        When L{Resolver._query} is called with a false C{filter} value, the
        L{Deferred} it returns fires with the response L{Message} itself
        rather than a tuple of record lists.
        """
        reply = self._queryTest(False)
        self.assertIsInstance(reply, Message)
        expectedAnswers = [
            RRHeader(b'foo.example.com', payload=Record_A('5.8.13.21', ttl=0))]
        self.assertEqual(reply.queries, [])
        self.assertEqual(reply.answers, expectedAnswers)
        for emptySection in (reply.authority, reply.additional):
            self.assertEqual(emptySection, [])


    def _respond(self, answers=(), authority=(), additional=(), rCode=OK):
        """
        Create a L{Message} suitable for use as a response to a query.

        The section parameters default to empty, immutable tuples so that no
        mutable object is shared between calls.  Any iterable of two-tuples
        (for example a C{list}) is accepted.

        @param answers: An iterable of two-tuples giving data for the answers
            section of the message.  The first element of each tuple is a name
            for the L{RRHeader}.  The second element is the payload.
        @param authority: An iterable like C{answers}, but for the authority
            section of the response.
        @param additional: An iterable like C{answers}, but for the
            additional section of the response.
        @param rCode: The response code the message will be created with.

        @return: A new L{Message} initialized with the given values.
        """
        response = Message(rCode=rCode)
        for (section, data) in [(response.answers, answers),
                                (response.authority, authority),
                                (response.additional, additional)]:
            # A payload may carry its own CLASS attribute (some tests set one
            # to simulate a mismatched class); otherwise the header uses IN.
            section.extend([
                    RRHeader(name, record.TYPE, getattr(record, 'CLASS', IN),
                             payload=record)
                    for (name, record) in data])
        return response


    def _getResolver(self, serverResponses, maximumQueries=10):
        """
        Build a L{root.Resolver} whose C{_query} method is replaced by a stub
        that answers from the canned data in C{serverResponses} instead of
        using the network.

        @param serverResponses: A mapping from dns server addresses to
            mappings.  The inner mappings are from query two-tuples (name,
            type) to dictionaries suitable for use as keyword arguments to
            L{_respond}.  See that method for details.
        @param maximumQueries: Passed as the second positional argument to
            L{root.Resolver}.

        @return: The modified L{root.Resolver}, created with the single root
            hint C{'1.1.2.3'}.
        """
        resolver = Resolver(['1.1.2.3'], maximumQueries)

        def cannedQuery(query, serverAddresses, timeout, filter):
            msg("Query for QNAME %s at %r" % (query.name, serverAddresses))
            # Answer from the first address we have canned data for.  If none
            # of the addresses is known, fall off the end of the loop.
            for addr in serverAddresses:
                try:
                    knownServer = serverResponses[addr]
                except KeyError:
                    continue
                else:
                    responseArguments = knownServer[
                        query.name.name, query.type]
                    return succeed(self._respond(**responseArguments))

        resolver._query = cannedQuery
        return resolver


    def test_lookupAddress(self):
        """
        L{root.Resolver.lookupAddress} finds the I{A} records for a hostname
        by first asking one of the root servers the resolver was created with
        and then following the authority delegations until a result is
        received.
        """
        name = b'foo.example.com'
        # The root server only supplies a delegation, with glue.
        referral = {
            'authority': [(name, Record_NS(b'ns1.example.com'))],
            'additional': [(b'ns1.example.com', Record_A('34.55.89.144'))],
        }
        # The server delegated to supplies the address itself.
        finalAnswer = {
            'answers': [(name, Record_A('10.0.0.1'))],
        }
        servers = {
            ('1.1.2.3', 53): {(name, A): referral},
            ('34.55.89.144', 53): {(name, A): finalAnswer},
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(name)
        lookup.addCallback(getOneAddress)
        lookup.addCallback(self.assertEqual, '10.0.0.1')
        return lookup


    def test_lookupChecksClass(self):
        """
        A record in a response whose class differs from the class of the
        query is ignored, and the lookup carries on until a record of the
        right class is found.
        """
        name = b'foo.example.com'
        wrongClassRecord = Record_A('10.0.0.1')
        wrongClassRecord.CLASS = HS
        # The first server answers with the wrong class but also delegates.
        firstResponse = {
            'answers': [(name, wrongClassRecord)],
            'authority': [(name, Record_NS(b'ns1.example.com'))],
            'additional': [(b'ns1.example.com', Record_A('10.0.0.2'))],
        }
        # The server delegated to answers with the default class.
        secondResponse = {
            'answers': [(name, Record_A('10.0.0.3'))],
        }
        servers = {
            ('1.1.2.3', 53): {(name, A): firstResponse},
            ('10.0.0.2', 53): {(name, A): secondResponse},
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(name)
        lookup.addCallback(getOnePayload)
        lookup.addCallback(self.assertEqual, Record_A('10.0.0.3'))
        return lookup


    def test_missingGlue(self):
        """
        When an intermediate response names authorities but carries no glue
        records for them, separate queries are made to find their addresses.
        """
        name = b'foo.example.com'
        nameserver = b'ns1.example.org'
        # Delegation only: deliberately no additional section giving the
        # address of ns1.example.org.
        glueless = {
            'authority': [(name, Record_NS(nameserver))],
        }
        nameserverAddress = {
            'answers': [(nameserver, Record_A('10.0.0.1'))],
        }
        finalAnswer = {
            'answers': [(name, Record_A('10.0.0.2'))],
        }
        servers = {
            ('1.1.2.3', 53): {
                (name, A): glueless,
                (nameserver, A): nameserverAddress,
            },
            ('10.0.0.1', 53): {
                (name, A): finalAnswer,
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(name)
        lookup.addCallback(getOneAddress)
        lookup.addCallback(self.assertEqual, '10.0.0.2')
        return lookup


    def test_missingName(self):
        """
        L{Resolver.lookupAddress} returns a L{Deferred} which fails with
        L{DNSNameError} when the server responds with the I{ENAME} response
        code.
        """
        nameError = {'rCode': ENAME}
        servers = {
            ('1.1.2.3', 53): {(b'foo.example.com', A): nameError},
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'foo.example.com')
        return self.assertFailure(lookup, DNSNameError)


    def test_answerless(self):
        """
        The L{Deferred} returned by L{Resolver.lookupAddress} fails with
        L{ResolverError} when the response to the query contains neither
        answers nor nameserver records.
        """
        emptyResponse = {}
        servers = {
            ('1.1.2.3', 53): {(b'example.com', A): emptyResponse},
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'example.com')
        return self.assertFailure(lookup, ResolverError)


    def test_delegationLookupError(self):
        """
        An error encountered while resolving the nameserver named in a
        delegation response is the error the L{Deferred} returned by
        L{Resolver.lookupAddress} fails with.
        """
        # Delegation without glue, so the nameserver has to be looked up.
        delegation = {
            'authority': [(b'example.com', Record_NS(b'ns1.example.com'))],
        }
        # ... and that lookup is answered with a name error.
        nameError = {'rCode': ENAME}
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): delegation,
                (b'ns1.example.com', A): nameError,
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'example.com')
        return self.assertFailure(lookup, DNSNameError)


    def test_delegationLookupEmpty(self):
        """
        The L{Deferred} returned by L{Resolver.lookupAddress} fails with
        L{ResolverError} when the lookup of a delegation nameserver is
        answered with a response containing no records.
        """
        # Delegation without glue, so the nameserver has to be looked up.
        delegation = {
            'authority': [(b'example.com', Record_NS(b'ns1.example.com'))],
        }
        # ... and that lookup is answered with nothing at all.
        emptyResponse = {}
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): delegation,
                (b'ns1.example.com', A): emptyResponse,
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'example.com')
        return self.assertFailure(lookup, ResolverError)


    def test_lookupNameservers(self):
        """
        L{Resolver.lookupNameservers} behaves like L{Resolver.lookupAddress}
        but issues queries for I{NS} records rather than I{A} records.
        """
        # An A query would get a name error; only the NS query has an answer.
        addressResponse = {'rCode': ENAME}
        nameserverResponse = {
            'answers': [(b'example.com', Record_NS(b'ns1.example.com'))],
        }
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): addressResponse,
                (b'example.com', NS): nameserverResponse,
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupNameservers(b'example.com')

        def firstAnswerName(results):
            answers, _authority, _additional = results
            return answers[0].payload.name

        lookup.addCallback(firstAnswerName)
        lookup.addCallback(self.assertEqual, Name(b'ns1.example.com'))
        return lookup


    def test_returnCanonicalName(self):
        """
        A I{CNAME} record received in answer to a query for another record
        type is returned as part of the answer.
        """
        alias = (b'example.com', Record_CNAME(b'example.net'))
        target = (b'example.net', Record_A('10.0.0.7'))
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): {'answers': [alias, target]},
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'example.com')

        def answerSection(results):
            return results[0]

        lookup.addCallback(answerSection)
        expected = [
            RRHeader(b'example.com', CNAME,
                     payload=Record_CNAME(b'example.net')),
            RRHeader(b'example.net', A, payload=Record_A('10.0.0.7')),
        ]
        lookup.addCallback(self.assertEqual, expected)
        return lookup


    def test_followCanonicalName(self):
        """
        When a response holds no record of the requested type but does hold a
        I{CNAME} record for the query name, further queries are made to
        resolve the value of that I{CNAME}.
        """
        aliasOnly = {
            'answers': [(b'example.com', Record_CNAME(b'example.net'))],
        }
        targetAddress = {
            'answers': [(b'example.net', Record_A('10.0.0.5'))],
        }
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): aliasOnly,
                (b'example.net', A): targetAddress,
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'example.com')

        def answerSection(results):
            return results[0]

        lookup.addCallback(answerSection)
        expected = [
            RRHeader(b'example.com', CNAME,
                     payload=Record_CNAME(b'example.net')),
            RRHeader(b'example.net', A, payload=Record_A('10.0.0.5')),
        ]
        lookup.addCallback(self.assertEqual, expected)
        return lookup


    def test_detectCanonicalNameLoop(self):
        """
        A cycle between the I{CNAME} records in a response is detected, and
        the L{Deferred} returned by the lookup method fails with
        L{ResolverError}.
        """
        # Two aliases which point at each other.
        forward = (b'example.com', Record_CNAME(b'example.net'))
        backward = (b'example.net', Record_CNAME(b'example.com'))
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): {'answers': [forward, backward]},
            },
        }
        resolver = self._getResolver(servers)
        lookup = resolver.lookupAddress(b'example.com')
        return self.assertFailure(lookup, ResolverError)


    def test_boundedQueries(self):
        """
        L{Resolver.lookupAddress} issues no more queries while following
        delegations than the limit passed to the resolver's initializer.
        """
        # First query - a glueless delegation, forcing a separate name lookup
        # of ns1.example.com.
        gluelessDelegation = {
            'authority': [(b'example.com', Record_NS(b'ns1.example.com'))],
        }
        # Second query - the address of the nameserver handling the
        # delegation, letting the original lookup resume.
        nameserverAddress = {
            'answers': [(b'ns1.example.com', Record_A('10.0.0.2'))],
        }
        # Third query - another delegation, this time with the address of the
        # delegation server included (different case from the first query).
        gluedDelegation = {
            'authority': [(b'example.com', Record_NS(b'ns2.example.com'))],
            'additional': [(b'ns2.example.com', Record_A('10.0.0.3'))],
        }
        # Fourth query - the answer itself.
        finalAnswer = {
            'answers': [(b'example.com', Record_A('10.0.0.4'))],
        }
        servers = {
            ('1.1.2.3', 53): {
                (b'example.com', A): gluelessDelegation,
                (b'ns1.example.com', A): nameserverAddress,
            },
            ('10.0.0.2', 53): {
                (b'example.com', A): gluedDelegation,
            },
            ('10.0.0.3', 53): {
                (b'example.com', A): finalAnswer,
            },
        }

        # A resolver limited to 3 queries cannot reach the answer and so
        # must fail.
        limited = self._getResolver(servers, 3)
        limitedResult = self.assertFailure(
            limited.lookupAddress(b'example.com'), ResolverError)

        # A resolver allowed 4 queries has just enough to succeed.
        sufficient = self._getResolver(servers, 4)
        sufficientResult = sufficient.lookupAddress(b'example.com')
        sufficientResult.addCallback(getOnePayload)
        sufficientResult.addCallback(self.assertEqual, Record_A('10.0.0.4'))

        return gatherResults([limitedResult, sufficientResult])



class ResolverFactoryArguments(Exception):
    """
    The exception L{raisingResolverFactory} raises; it carries the *args and
    **kwargs which that function was called with.
    """
    def __init__(self, args, kwargs):
        """
        Record the supplied args and kwargs as attributes.

        @param args: Positional arguments.
        @param kwargs: Keyword arguments.
        """
        # Initializing the base class with the unpacked positional arguments
        # leaves C{self.args} holding them as a tuple.
        Exception.__init__(self, *args)
        self.kwargs = kwargs



def raisingResolverFactory(*args, **kwargs):
    """
    A resolver factory which never returns: it always raises a
    L{ResolverFactoryArguments} exception carrying the positional and
    keyword arguments it was called with.

    @param args: A L{tuple} of all the positional arguments supplied by
        the caller.

    @param kwargs: A L{dict} of all the keyword arguments supplied by
        the caller.

    @raise ResolverFactoryArguments: Always.
    """
    captured = ResolverFactoryArguments(args, kwargs)
    raise captured



class RootResolverResolverFactoryTests(TestCase):
    """
    Tests for the C{_resolverFactory} attribute of L{root.Resolver}.
    """
    def test_resolverFactoryArgumentPresent(self):
        """
        A C{resolverFactory} argument passed to L{root.Resolver.__init__} is
        stored as C{self._resolverFactory}.
        """
        resolver = Resolver(
            hints=[None], resolverFactory=raisingResolverFactory)
        self.assertIs(resolver._resolverFactory, raisingResolverFactory)


    def test_resolverFactoryArgumentAbsent(self):
        """
        When no C{resolverFactory} argument is supplied,
        L{root.Resolver.__init__} uses L{client.Resolver} as the
        C{_resolverFactory}.
        """
        resolver = Resolver(hints=[None])
        self.assertIs(resolver._resolverFactory, client.Resolver)


    def test_resolverFactoryOnlyExpectedArguments(self):
        """
        L{root.Resolver._resolverFactory} is called with C{reactor} and
        C{servers} keyword arguments and nothing else.
        """
        dummyReactor = object()
        resolver = Resolver(
            hints=['192.0.2.101'],
            resolverFactory=raisingResolverFactory,
            reactor=dummyReactor)

        # The factory raises, exposing the arguments it received.
        captured = self.assertRaises(
            ResolverFactoryArguments, resolver.lookupAddress, 'example.com')

        expected = (
            (),
            {'reactor': dummyReactor, 'servers': [('192.0.2.101', 53)]})
        observed = (captured.args, captured.kwargs)
        self.assertEqual(expected, observed)



# Host names of the 13 root name servers, a.root-servers.net through
# m.root-servers.net, in alphabetical order.
ROOT_SERVERS = [
    '%s.root-servers.net' % (letter,) for letter in 'abcdefghijklm']



@implementer(IResolverSimple)
class StubResolver(object):
    """
    An L{IResolverSimple} implementer which records every getHostByName
    call together with the L{Deferred} returned for it, so that a test can
    inspect the calls and fire the results synchronously.
    """
    def __init__(self):
        """
        @ivar calls: L{list} of L{tuple} containing C{args} and
            C{kwargs} supplied to C{getHostByName} calls.
        @ivar pendingResults: L{list} of L{Deferred} returned by
            C{getHostByName}.
        """
        self.pendingResults = []
        self.calls = []


    def getHostByName(self, *args, **kwargs):
        """
        A fake implementation of L{IResolverSimple.getHostByName} which
        records the call and returns a new, unfired L{Deferred}.

        @param args: A L{tuple} of all the positional arguments supplied by
           the caller.

        @param kwargs: A L{dict} of all the keyword arguments supplied by
           the caller.

        @return: A L{Deferred} which may be fired later from the test
            fixture.
        """
        call = (args, kwargs)
        self.calls.append(call)
        result = Deferred()
        self.pendingResults.append(result)
        return result



# Module-level check, run when this module is imported, that StubResolver
# conforms to the IResolverSimple interface it is declared to implement.
verifyClass(IResolverSimple, StubResolver)



class BootstrapTests(SynchronousTestCase):
    """
    Tests for L{root.bootstrap}, using a L{StubResolver} in place of a real
    L{IResolverSimple}.
    """
    def test_returnsDeferredResolver(self):
        """
        The object returned by L{root.bootstrap} is initially a
        L{root.DeferredResolver}.
        """
        bootstrapped = root.bootstrap(StubResolver())
        self.assertIsInstance(bootstrapped, root.DeferredResolver)


    def test_resolves13RootServers(self):
        """
        L{root.bootstrap} uses the L{IResolverSimple} it is given to look up
        the IP addresses of the 13 root name servers.
        """
        stub = StubResolver()
        root.bootstrap(stub)
        # One call per root server, each with the host name as its only
        # argument.
        expectedCalls = [((hostname,), {}) for hostname in ROOT_SERVERS]
        self.assertEqual(stub.calls, expectedCalls)


    def test_becomesResolver(self):
        """
        Once the supplied resolver has successfully looked up all root hints,
        the L{root.DeferredResolver} initially returned by L{root.bootstrap}
        becomes a L{root.Resolver}.
        """
        stub = StubResolver()
        bootstrapped = root.bootstrap(stub)
        for pending in stub.pendingResults:
            pending.callback('192.0.2.101')
        self.assertIsInstance(bootstrapped, Resolver)


    def test_resolverReceivesRootHints(self):
        """
        The IP addresses of the 13 root servers are supplied to the
        L{root.Resolver} which eventually replaces L{root.DeferredResolver}.
        """
        stub = StubResolver()
        bootstrapped = root.bootstrap(stub)
        for pending in stub.pendingResults:
            pending.callback('192.0.2.101')
        expectedHints = ['192.0.2.101'] * 13
        self.assertEqual(bootstrapped.hints, expectedHints)


    def test_continuesWhenSomeRootHintsFail(self):
        """
        Even if some of the root hint lookups fail, the L{root.Resolver} is
        eventually created.  It is supplied only with the IP addresses of the
        root hints which were looked up successfully.
        """
        stub = StubResolver()
        bootstrapped = root.bootstrap(stub)
        remaining = iter(stub.pendingResults)
        # Hold the first lookup back so that it can be failed last.
        firstLookup = next(remaining)
        for pending in remaining:
            pending.callback('192.0.2.101')
        firstLookup.errback(TimeoutError())

        def checkHints(res):
            self.assertEqual(bootstrapped.hints, ['192.0.2.101'] * 12)

        firstLookup.addBoth(checkHints)


    def test_continuesWhenAllRootHintsFail(self):
        """
        Even if all of the root hint lookups fail, the L{root.Resolver} is
        eventually created.  Pending and new lookups will then fail with
        AttributeError.
        """
        stub = StubResolver()
        bootstrapped = root.bootstrap(stub)
        remaining = iter(stub.pendingResults)
        # Hold the first lookup back so that it can be failed last.
        firstLookup = next(remaining)
        for pending in remaining:
            pending.errback(TimeoutError())
        firstLookup.errback(TimeoutError())

        def checkHints(res):
            self.assertEqual(bootstrapped.hints, [])

        firstLookup.addBoth(checkHints)

        # Discard any TimeoutError logged as a consequence of the failures
        # injected above.
        self.addCleanup(self.flushLoggedErrors, TimeoutError)


    def test_passesResolverFactory(self):
        """
        A C{resolverFactory} argument given to L{root.bootstrap} is passed on
        to the L{root.Resolver} created once the root hints have been looked
        up successfully.
        """
        stub = StubResolver()
        bootstrapped = root.bootstrap(
            stub, resolverFactory=raisingResolverFactory)

        for pending in stub.pendingResults:
            pending.callback('192.0.2.101')

        self.assertIs(bootstrapped._resolverFactory, raisingResolverFactory)



class StubDNSDatagramProtocol:
    """
    A stand-in for L{DNSDatagramProtocol} which does nothing, for use in
    tests where network traffic is to be avoided and the outcome of the
    query doesn't matter.
    """
    def query(self, *a, **kw):
        """
        Accept and ignore any arguments.

        @return: A new L{Deferred} which this stub itself never fires.
        """
        unfired = Deferred()
        return unfired



# Suppression rule, built with twisted.trial.util.suppress, matching the
# DeprecationWarning whose message announces that twisted.names.root.retry
# is deprecated.
_retrySuppression = util.suppress(
    category=DeprecationWarning,
    message=(
        'twisted.names.root.retry is deprecated since Twisted 10.0.  Use a '
        'Resolver object for retry logic.'))