HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/twisted/web/test/test_xmlrpc.py
# -*- test-case-name: twisted.web.test.test_xmlrpc -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for  XML-RPC support in L{twisted.web.xmlrpc}.
"""

from __future__ import division, absolute_import

from twisted.python.compat import nativeString, networkString, NativeStringIO
from io import BytesIO

import datetime

from twisted.trial import unittest
from twisted.web import xmlrpc
from twisted.web.xmlrpc import XMLRPC, payloadTemplate, addIntrospection
from twisted.web.xmlrpc import _QueryFactory, withRequest, xmlrpclib
from twisted.web import server, client, http, static
from twisted.internet import reactor, defer
from twisted.internet.error import ConnectionDone
from twisted.python import failure
from twisted.python.reflect import namedModule
from twisted.test.proto_helpers import MemoryReactor, EventLoggingObserver
from twisted.web.test.test_web import DummyRequest
from twisted.logger import (globalLogPublisher, FilteringLogObserver,
                            LogLevelFilterPredicate, LogLevel)
try:
    namedModule('twisted.internet.ssl')
except ImportError:
    sslSkip = "OpenSSL not present"
else:
    sslSkip = None


class AsyncXMLRPCTests(unittest.TestCase):
    """
    Tests for L{XMLRPC}'s support of Deferreds.
    """
    def setUp(self):
        self.request = DummyRequest([''])
        self.request.method = 'POST'
        self.request.content = NativeStringIO(
            payloadTemplate % ('async', xmlrpclib.dumps(())))

        result = self.result = defer.Deferred()
        class AsyncResource(XMLRPC):
            def xmlrpc_async(self):
                return result

        self.resource = AsyncResource()


    def test_deferredResponse(self):
        """
        If an L{XMLRPC} C{xmlrpc_*} method returns a L{defer.Deferred}, the
        response to the request is the result of that L{defer.Deferred}.
        """
        self.resource.render(self.request)
        self.assertEqual(self.request.written, [])

        self.result.callback("result")

        resp = xmlrpclib.loads(b"".join(self.request.written))
        self.assertEqual(resp, (('result',), None))
        self.assertEqual(self.request.finished, 1)


    def test_interruptedDeferredResponse(self):
        """
        While waiting for the L{Deferred} returned by an L{XMLRPC} C{xmlrpc_*}
        method to fire, the connection the request was issued over may close.
        If this happens, neither C{write} nor C{finish} is called on the
        request.
        """
        self.resource.render(self.request)
        self.request.processingFailed(
            failure.Failure(ConnectionDone("Simulated")))
        self.result.callback("result")
        self.assertEqual(self.request.written, [])
        self.assertEqual(self.request.finished, 0)



class TestRuntimeError(RuntimeError):
    pass



class TestValueError(ValueError):
    pass



class Test(XMLRPC):

    # If you add xmlrpc_ methods to this class, go change test_listMethods
    # below.

    FAILURE = 666
    NOT_FOUND = 23
    SESSION_EXPIRED = 42

    def xmlrpc_echo(self, arg):
        return arg

    # the doc string is part of the test
    def xmlrpc_add(self, a, b):
        """
        This function add two numbers.
        """
        return a + b

    xmlrpc_add.signature = [['int', 'int', 'int'],
                            ['double', 'double', 'double']]

    # the doc string is part of the test
    def xmlrpc_pair(self, string, num):
        """
        This function puts the two arguments in an array.
        """
        return [string, num]

    xmlrpc_pair.signature = [['array', 'string', 'int']]

    # the doc string is part of the test
    def xmlrpc_defer(self, x):
        """Help for defer."""
        return defer.succeed(x)

    def xmlrpc_deferFail(self):
        return defer.fail(TestValueError())

    # don't add a doc string, it's part of the test
    def xmlrpc_fail(self):
        raise TestRuntimeError

    def xmlrpc_fault(self):
        return xmlrpc.Fault(12, "hello")

    def xmlrpc_deferFault(self):
        return defer.fail(xmlrpc.Fault(17, "hi"))

    def xmlrpc_snowman(self, payload):
        """
        Used to test that we can pass Unicode.
        """
        snowman = u"\u2603"
        if snowman != payload:
            return xmlrpc.Fault(13, "Payload not unicode snowman")
        return snowman

    def xmlrpc_complex(self):
        return {"a": ["b", "c", 12, []], "D": "foo"}

    def xmlrpc_dict(self, map, key):
        return map[key]
    xmlrpc_dict.help = 'Help for dict.'

    @withRequest
    def xmlrpc_withRequest(self, request, other):
        """
        A method decorated with L{withRequest} which can be called by
        a test to verify that the request object really is passed as
        an argument.
        """
        return (
            # as a proof that request is a request
            request.method +
            # plus proof other arguments are still passed along
            ' ' + other)


    def lookupProcedure(self, procedurePath):
        try:
            return XMLRPC.lookupProcedure(self, procedurePath)
        except xmlrpc.NoSuchFunction:
            if procedurePath.startswith("SESSION"):
                raise xmlrpc.Fault(self.SESSION_EXPIRED,
                                   "Session non-existent/expired.")
            else:
                raise



class TestLookupProcedure(XMLRPC):
    """
    This is a resource which customizes procedure lookup to be used by the tests
    of support for this customization.
    """
    def echo(self, x):
        return x


    def lookupProcedure(self, procedureName):
        """
        Lookup a procedure from a fixed set of choices, either I{echo} or
        I{system.listeMethods}.
        """
        if procedureName == 'echo':
            return self.echo
        raise xmlrpc.NoSuchFunction(
            self.NOT_FOUND, 'procedure %s not found' % (procedureName,))



class TestListProcedures(XMLRPC):
    """
    This is a resource which customizes procedure enumeration to be used by the
    tests of support for this customization.
    """
    def listProcedures(self):
        """
        Return a list of a single method this resource will claim to support.
        """
        return ['foo']



class TestAuthHeader(Test):
    """
    This is used to get the header info so that we can test
    authentication.
    """
    def __init__(self):
        Test.__init__(self)
        self.request = None

    def render(self, request):
        self.request = request
        return Test.render(self, request)

    def xmlrpc_authinfo(self):
        return self.request.getUser(), self.request.getPassword()



class TestQueryProtocol(xmlrpc.QueryProtocol):
    """
    QueryProtocol for tests that saves headers received and sent,
    inside the factory.
    """

    def connectionMade(self):
        self.factory.transport = self.transport
        xmlrpc.QueryProtocol.connectionMade(self)

    def handleHeader(self, key, val):
        self.factory.headers[key.lower()] = val

    def sendHeader(self, key, val):
        """
        Keep sent headers so we can inspect them later.
        """
        self.factory.sent_headers[key.lower()] = val
        xmlrpc.QueryProtocol.sendHeader(self, key, val)



class TestQueryFactory(xmlrpc._QueryFactory):
    """
    QueryFactory using L{TestQueryProtocol} for saving headers.
    """
    protocol = TestQueryProtocol

    def __init__(self, *args, **kwargs):
        self.headers = {}
        self.sent_headers = {}
        xmlrpc._QueryFactory.__init__(self, *args, **kwargs)


class TestQueryFactoryCancel(xmlrpc._QueryFactory):
    """
    QueryFactory that saves a reference to the
    L{twisted.internet.interfaces.IConnector} to test connection lost.
    """

    def startedConnecting(self, connector):
        self.connector = connector


class XMLRPCTests(unittest.TestCase):

    def setUp(self):
        self.p = reactor.listenTCP(0, server.Site(Test()),
                                   interface="127.0.0.1")
        self.port = self.p.getHost().port
        self.factories = []

    def tearDown(self):
        self.factories = []
        return self.p.stopListening()

    def queryFactory(self, *args, **kwargs):
        """
        Specific queryFactory for proxy that uses our custom
        L{TestQueryFactory}, and save factories.
        """
        factory = TestQueryFactory(*args, **kwargs)
        self.factories.append(factory)
        return factory

    def proxy(self, factory=None):
        """
        Return a new xmlrpc.Proxy for the test site created in
        setUp(), using the given factory as the queryFactory, or
        self.queryFactory if no factory is provided.
        """
        p = xmlrpc.Proxy(networkString("http://127.0.0.1:%d/" % self.port))
        if factory is None:
            p.queryFactory = self.queryFactory
        else:
            p.queryFactory = factory
        return p

    def test_results(self):
        inputOutput = [
            ("add", (2, 3), 5),
            ("defer", ("a",), "a"),
            ("dict", ({"a": 1}, "a"), 1),
            ("pair", ("a", 1), ["a", 1]),
            ("snowman", (u"\u2603"), u"\u2603"),
            ("complex", (), {"a": ["b", "c", 12, []], "D": "foo"})]

        dl = []
        for meth, args, outp in inputOutput:
            d = self.proxy().callRemote(meth, *args)
            d.addCallback(self.assertEqual, outp)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True)


    def test_headers(self):
        """
        Verify that headers sent from the client side and the ones we
        get back from the server side are correct.

        """
        d = self.proxy().callRemote("snowman", u"\u2603")

        def check_server_headers(ing):
            self.assertEqual(
                self.factories[0].headers[b'content-type'],
                b'text/xml; charset=utf-8')
            self.assertEqual(
                self.factories[0].headers[b'content-length'], b'129')

        def check_client_headers(ign):
            self.assertEqual(
                self.factories[0].sent_headers[b'user-agent'],
                b'Twisted/XMLRPClib')
            self.assertEqual(
                self.factories[0].sent_headers[b'content-type'],
                b'text/xml; charset=utf-8')
            self.assertEqual(
                self.factories[0].sent_headers[b'content-length'], b'155')

        d.addCallback(check_server_headers)
        d.addCallback(check_client_headers)
        return d


    def test_errors(self):
        """
        Verify that for each way a method exposed via XML-RPC can fail, the
        correct 'Content-type' header is set in the response and that the
        client-side Deferred is errbacked with an appropriate C{Fault}
        instance.
        """
        logObserver = EventLoggingObserver()
        filtered = FilteringLogObserver(
            logObserver,
            [LogLevelFilterPredicate(defaultLogLevel=LogLevel.critical)]
        )
        globalLogPublisher.addObserver(filtered)
        self.addCleanup(lambda: globalLogPublisher.removeObserver(filtered))
        dl = []
        for code, methodName in [(666, "fail"), (666, "deferFail"),
                                 (12, "fault"), (23, "noSuchMethod"),
                                 (17, "deferFault"), (42, "SESSION_TEST")]:
            d = self.proxy().callRemote(methodName)
            d = self.assertFailure(d, xmlrpc.Fault)
            d.addCallback(lambda exc, code=code:
                self.assertEqual(exc.faultCode, code))
            dl.append(d)
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def cb(ign):
            for factory in self.factories:
                self.assertEqual(factory.headers[b'content-type'],
                                  b'text/xml; charset=utf-8')
            self.assertEquals(2, len(logObserver))
            f1 = logObserver[0]["log_failure"].value
            f2 = logObserver[1]["log_failure"].value

            if isinstance(f1, TestValueError):
                self.assertIsInstance(f2, TestRuntimeError)
            else:
                self.assertIsInstance(f1, TestRuntimeError)
                self.assertIsInstance(f2, TestValueError)

            self.flushLoggedErrors(TestRuntimeError, TestValueError)
        d.addCallback(cb)
        return d


    def test_cancel(self):
        """
        A deferred from the Proxy can be cancelled, disconnecting
        the L{twisted.internet.interfaces.IConnector}.
        """
        def factory(*args, **kw):
            factory.f = TestQueryFactoryCancel(*args, **kw)
            return factory.f
        d = self.proxy(factory).callRemote('add', 2, 3)
        self.assertNotEqual(factory.f.connector.state, "disconnected")
        d.cancel()
        self.assertEqual(factory.f.connector.state, "disconnected")
        d = self.assertFailure(d, defer.CancelledError)
        return d


    def test_errorGet(self):
        """
        A classic GET on the xml server should return a NOT_ALLOWED.
        """
        agent = client.Agent(reactor)
        d = agent.request(b"GET", networkString("http://127.0.0.1:%d/" % (self.port,)))
        def checkResponse(response):
            self.assertEqual(response.code, http.NOT_ALLOWED)
        d.addCallback(checkResponse)
        return d

    def test_errorXMLContent(self):
        """
        Test that an invalid XML input returns an L{xmlrpc.Fault}.
        """
        agent = client.Agent(reactor)
        d = agent.request(
            uri=networkString("http://127.0.0.1:%d/" % (self.port,)),
            method=b"POST",
            bodyProducer=client.FileBodyProducer(BytesIO(b"foo")))
        d.addCallback(client.readBody)
        def cb(result):
            self.assertRaises(xmlrpc.Fault, xmlrpclib.loads, result)
        d.addCallback(cb)
        return d


    def test_datetimeRoundtrip(self):

        """
        If an L{xmlrpclib.DateTime} is passed as an argument to an XML-RPC
        call and then returned by the server unmodified, the result should
        be equal to the original object.
        """
        when = xmlrpclib.DateTime()
        d = self.proxy().callRemote("echo", when)
        d.addCallback(self.assertEqual, when)
        return d


    def test_doubleEncodingError(self):
        """
        If it is not possible to encode a response to the request (for example,
        because L{xmlrpclib.dumps} raises an exception when encoding a
        L{Fault}) the exception which prevents the response from being
        generated is logged and the request object is finished anyway.
        """
        logObserver = EventLoggingObserver()
        filtered = FilteringLogObserver(
            logObserver,
            [LogLevelFilterPredicate(defaultLogLevel=LogLevel.critical)]
        )
        globalLogPublisher.addObserver(filtered)
        self.addCleanup(lambda: globalLogPublisher.removeObserver(filtered))
        d = self.proxy().callRemote("echo", "")

        # *Now* break xmlrpclib.dumps.  Hopefully the client already used it.
        def fakeDumps(*args, **kwargs):
            raise RuntimeError("Cannot encode anything at all!")
        self.patch(xmlrpclib, 'dumps', fakeDumps)

        # It doesn't matter how it fails, so long as it does.  Also, it happens
        # to fail with an implementation detail exception right now, not
        # something suitable as part of a public interface.
        d = self.assertFailure(d, Exception)

        def cbFailed(ignored):
            # The fakeDumps exception should have been logged.
            self.assertEquals(1, len(logObserver))
            self.assertIsInstance(
                logObserver[0]["log_failure"].value,
                RuntimeError
            )
            self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
        d.addCallback(cbFailed)
        return d


    def test_closeConnectionAfterRequest(self):
        """
        The connection to the web server is closed when the request is done.
        """
        d = self.proxy().callRemote('echo', '')
        def responseDone(ignored):
            [factory] = self.factories
            self.assertFalse(factory.transport.connected)
            self.assertTrue(factory.transport.disconnected)
        return d.addCallback(responseDone)


    def test_tcpTimeout(self):
        """
        For I{HTTP} URIs, L{xmlrpc.Proxy.callRemote} passes the value it
        received for the C{connectTimeout} parameter as the C{timeout} argument
        to the underlying connectTCP call.
        """
        reactor = MemoryReactor()
        proxy = xmlrpc.Proxy(b"http://127.0.0.1:69", connectTimeout=2.0,
                             reactor=reactor)
        proxy.callRemote("someMethod")
        self.assertEqual(reactor.tcpClients[0][3], 2.0)


    def test_sslTimeout(self):
        """
        For I{HTTPS} URIs, L{xmlrpc.Proxy.callRemote} passes the value it
        received for the C{connectTimeout} parameter as the C{timeout} argument
        to the underlying connectSSL call.
        """
        reactor = MemoryReactor()
        proxy = xmlrpc.Proxy(b"https://127.0.0.1:69", connectTimeout=3.0,
                             reactor=reactor)
        proxy.callRemote("someMethod")
        self.assertEqual(reactor.sslClients[0][4], 3.0)
    test_sslTimeout.skip = sslSkip



class XMLRPCProxyWithoutSlashTests(XMLRPCTests):
    """
    Test with proxy that doesn't add a slash.
    """

    def proxy(self, factory=None):
        p = xmlrpc.Proxy(networkString("http://127.0.0.1:%d" % self.port))
        if factory is None:
            p.queryFactory = self.queryFactory
        else:
            p.queryFactory = factory
        return p



class XMLRPCPublicLookupProcedureTests(unittest.TestCase):
    """
    Tests for L{XMLRPC}'s support of subclasses which override
    C{lookupProcedure} and C{listProcedures}.
    """

    def createServer(self, resource):
        self.p = reactor.listenTCP(
            0, server.Site(resource), interface="127.0.0.1")
        self.addCleanup(self.p.stopListening)
        self.port = self.p.getHost().port
        self.proxy = xmlrpc.Proxy(
            networkString('http://127.0.0.1:%d' % self.port))


    def test_lookupProcedure(self):
        """
        A subclass of L{XMLRPC} can override C{lookupProcedure} to find
        procedures that are not defined using a C{xmlrpc_}-prefixed method name.
        """
        self.createServer(TestLookupProcedure())
        what = "hello"
        d = self.proxy.callRemote("echo", what)
        d.addCallback(self.assertEqual, what)
        return d


    def test_errors(self):
        """
        A subclass of L{XMLRPC} can override C{lookupProcedure} to raise
        L{NoSuchFunction} to indicate that a requested method is not available
        to be called, signalling a fault to the XML-RPC client.
        """
        self.createServer(TestLookupProcedure())
        d = self.proxy.callRemote("xxxx", "hello")
        d = self.assertFailure(d, xmlrpc.Fault)
        return d


    def test_listMethods(self):
        """
        A subclass of L{XMLRPC} can override C{listProcedures} to define
        Overriding listProcedures should prevent introspection from being
        broken.
        """
        resource = TestListProcedures()
        addIntrospection(resource)
        self.createServer(resource)
        d = self.proxy.callRemote("system.listMethods")
        def listed(procedures):
            # The list will also include other introspection procedures added by
            # addIntrospection.  We just want to see "foo" from our customized
            # listProcedures.
            self.assertIn('foo', procedures)
        d.addCallback(listed)
        return d



class SerializationConfigMixin:
    """
    Mixin which defines a couple tests which should pass when a particular flag
    is passed to L{XMLRPC}.

    These are not meant to be exhaustive serialization tests, since L{xmlrpclib}
    does all of the actual serialization work.  They are just meant to exercise
    a few codepaths to make sure we are calling into xmlrpclib correctly.

    @ivar flagName: A C{str} giving the name of the flag which must be passed to
        L{XMLRPC} to allow the tests to pass.  Subclasses should set this.

    @ivar value: A value which the specified flag will allow the serialization
        of.  Subclasses should set this.
    """
    def setUp(self):
        """
        Create a new XML-RPC server with C{allowNone} set to C{True}.
        """
        kwargs = {self.flagName: True}
        self.p = reactor.listenTCP(
            0, server.Site(Test(**kwargs)), interface="127.0.0.1")
        self.addCleanup(self.p.stopListening)
        self.port = self.p.getHost().port
        self.proxy = xmlrpc.Proxy(
            networkString("http://127.0.0.1:%d/" % (self.port,)), **kwargs)


    def test_roundtripValue(self):
        """
        C{self.value} can be round-tripped over an XMLRPC method call/response.
        """
        d = self.proxy.callRemote('defer', self.value)
        d.addCallback(self.assertEqual, self.value)
        return d


    def test_roundtripNestedValue(self):
        """
        A C{dict} which contains C{self.value} can be round-tripped over an
        XMLRPC method call/response.
        """
        d = self.proxy.callRemote('defer', {'a': self.value})
        d.addCallback(self.assertEqual, {'a': self.value})
        return d



class XMLRPCAllowNoneTests(SerializationConfigMixin, unittest.TestCase):
    """
    Tests for passing L{None} when the C{allowNone} flag is set.
    """
    flagName = "allowNone"
    value = None


class XMLRPCUseDateTimeTests(SerializationConfigMixin, unittest.TestCase):
    """
    Tests for passing a C{datetime.datetime} instance when the C{useDateTime}
    flag is set.
    """
    flagName = "useDateTime"
    value = datetime.datetime(2000, 12, 28, 3, 45, 59)


class XMLRPCAuthenticatedTests(XMLRPCTests):
    """
    Test with authenticated proxy. We run this with the same input/output as
    above.
    """
    user = b"username"
    password = b"asecret"

    def setUp(self):
        self.p = reactor.listenTCP(0, server.Site(TestAuthHeader()),
                                   interface="127.0.0.1")
        self.port = self.p.getHost().port
        self.factories = []


    def test_authInfoInURL(self):
        url = "http://%s:%s@127.0.0.1:%d/" % (
            nativeString(self.user), nativeString(self.password), self.port)
        p = xmlrpc.Proxy(networkString(url))
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEqual, [self.user, self.password])
        return d


    def test_explicitAuthInfo(self):
        p = xmlrpc.Proxy(networkString("http://127.0.0.1:%d/" % (
            self.port,)), self.user, self.password)
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEqual, [self.user, self.password])
        return d


    def test_longPassword(self):
        """
        C{QueryProtocol} uses the C{base64.b64encode} function to encode user
        name and password in the I{Authorization} header, so that it doesn't
        embed new lines when using long inputs.
        """
        longPassword = self.password * 40
        p = xmlrpc.Proxy(networkString("http://127.0.0.1:%d/" % (
            self.port,)), self.user, longPassword)
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEqual, [self.user, longPassword])
        return d


    def test_explicitAuthInfoOverride(self):
        p = xmlrpc.Proxy(networkString("http://wrong:info@127.0.0.1:%d/" % (
            self.port,)), self.user, self.password)
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEqual, [self.user, self.password])
        return d


class XMLRPCIntrospectionTests(XMLRPCTests):

    def setUp(self):
        xmlrpc = Test()
        addIntrospection(xmlrpc)
        self.p = reactor.listenTCP(0, server.Site(xmlrpc),interface="127.0.0.1")
        self.port = self.p.getHost().port
        self.factories = []

    def test_listMethods(self):

        def cbMethods(meths):
            meths.sort()
            self.assertEqual(
                meths,
                ['add', 'complex', 'defer', 'deferFail',
                 'deferFault', 'dict', 'echo', 'fail', 'fault',
                 'pair', 'snowman', 'system.listMethods',
                 'system.methodHelp',
                 'system.methodSignature', 'withRequest'])

        d = self.proxy().callRemote("system.listMethods")
        d.addCallback(cbMethods)
        return d

    def test_methodHelp(self):
        inputOutputs = [
            ("defer", "Help for defer."),
            ("fail", ""),
            ("dict", "Help for dict.")]

        dl = []
        for meth, expected in inputOutputs:
            d = self.proxy().callRemote("system.methodHelp", meth)
            d.addCallback(self.assertEqual, expected)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True)

    def test_methodSignature(self):
        inputOutputs = [
            ("defer", ""),
            ("add", [['int', 'int', 'int'],
                     ['double', 'double', 'double']]),
            ("pair", [['array', 'string', 'int']])]

        dl = []
        for meth, expected in inputOutputs:
            d = self.proxy().callRemote("system.methodSignature", meth)
            d.addCallback(self.assertEqual, expected)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True)


class XMLRPCClientErrorHandlingTests(unittest.TestCase):
    """
    Test error handling on the xmlrpc client.
    """
    def setUp(self):
        self.resource = static.Data(
            b"This text is not a valid XML-RPC response.",
            b"text/plain")
        self.resource.isLeaf = True

        self.port = reactor.listenTCP(0, server.Site(self.resource),
                                                     interface='127.0.0.1')

    def tearDown(self):
        return self.port.stopListening()

    def test_erroneousResponse(self):
        """
        Test that calling the xmlrpc client on a static http server raises
        an exception.
        """
        proxy = xmlrpc.Proxy(networkString("http://127.0.0.1:%d/" %
                                           (self.port.getHost().port,)))
        return self.assertFailure(proxy.callRemote("someMethod"), ValueError)



class QueryFactoryParseResponseTests(unittest.TestCase):
    """
    Test the behaviour of L{_QueryFactory.parseResponse}.
    """

    def setUp(self):
        # The _QueryFactory that we are testing. We don't care about any
        # of the constructor parameters.
        self.queryFactory = _QueryFactory(
            path=None, host=None, method='POST', user=None, password=None,
            allowNone=False, args=())
        # An XML-RPC response that will parse without raising an error.
        self.goodContents = xmlrpclib.dumps(('',))
        # An 'XML-RPC response' that will raise a parsing error.
        self.badContents = 'invalid xml'
        # A dummy 'reason' to pass to clientConnectionLost. We don't care
        # what it is.
        self.reason = failure.Failure(ConnectionDone())


    def test_parseResponseCallbackSafety(self):
        """
        We can safely call L{_QueryFactory.clientConnectionLost} as a callback
        of L{_QueryFactory.parseResponse}.
        """
        d = self.queryFactory.deferred
        # The failure mode is that this callback raises an AlreadyCalled
        # error. We have to add it now so that it gets called synchronously
        # and triggers the race condition.
        d.addCallback(self.queryFactory.clientConnectionLost, self.reason)
        self.queryFactory.parseResponse(self.goodContents)
        return d


    def test_parseResponseErrbackSafety(self):
        """
        We can safely call L{_QueryFactory.clientConnectionLost} as an errback
        of L{_QueryFactory.parseResponse}.
        """
        d = self.queryFactory.deferred
        # The failure mode is that this callback raises an AlreadyCalled
        # error. We have to add it now so that it gets called synchronously
        # and triggers the race condition.
        d.addErrback(self.queryFactory.clientConnectionLost, self.reason)
        self.queryFactory.parseResponse(self.badContents)
        return d


    def test_badStatusErrbackSafety(self):
        """
        We can safely call L{_QueryFactory.clientConnectionLost} as an errback
        of L{_QueryFactory.badStatus}.
        """
        d = self.queryFactory.deferred
        # The failure mode is that this callback raises an AlreadyCalled
        # error. We have to add it now so that it gets called synchronously
        # and triggers the race condition.
        d.addErrback(self.queryFactory.clientConnectionLost, self.reason)
        self.queryFactory.badStatus('status', 'message')
        return d

    def test_parseResponseWithoutData(self):
        """
        Some server can send a response without any data:
        L{_QueryFactory.parseResponse} should catch the error and call the
        result errback.
        """
        content = """
<methodResponse>
 <params>
  <param>
  </param>
 </params>
</methodResponse>"""
        d = self.queryFactory.deferred
        self.queryFactory.parseResponse(content)
        return self.assertFailure(d, IndexError)



class XMLRPCWithRequestTests(unittest.TestCase):

    def setUp(self):
        self.resource = Test()


    def test_withRequest(self):
        """
        When an XML-RPC method is called and the implementation is
        decorated with L{withRequest}, the request object is passed as
        the first argument.
        """
        request = DummyRequest('/RPC2')
        request.method = "POST"
        request.content = NativeStringIO(xmlrpclib.dumps(
            ("foo",), 'withRequest'))
        def valid(n, request):
            data = xmlrpclib.loads(request.written[0])
            self.assertEqual(data, (('POST foo',), None))
        d = request.notifyFinish().addCallback(valid, request)
        self.resource.render_POST(request)
        return d