HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/usr/lib/python3/dist-packages/twisted/test/test_stringtransport.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.test.proto_helpers}.
"""

from zope.interface.verify import verifyObject

from twisted.internet.interfaces import (ITransport, IPushProducer, IConsumer,
    IReactorTCP, IReactorSSL, IReactorUNIX, IAddress, IListeningPort,
    IConnector)
from twisted.internet.address import IPv4Address
from twisted.trial.unittest import TestCase
from twisted.test.proto_helpers import (StringTransport, MemoryReactor,
    RaisingMemoryReactor, NonStreamingProducer)
from twisted.internet.protocol import ClientFactory, Factory


class StringTransportTests(TestCase):
    """
    Tests for L{twisted.test.proto_helpers.StringTransport}.
    """
    def setUp(self):
        self.transport = StringTransport()


    def test_interfaces(self):
        """
        L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
        and L{IConsumer}.
        """
        self.assertTrue(verifyObject(ITransport, self.transport))
        self.assertTrue(verifyObject(IPushProducer, self.transport))
        self.assertTrue(verifyObject(IConsumer, self.transport))


    def test_registerProducer(self):
        """
        L{StringTransport.registerProducer} records the arguments supplied to
        it as instance attributes.
        """
        producer = object()
        streaming = object()
        self.transport.registerProducer(producer, streaming)
        self.assertIs(self.transport.producer, producer)
        self.assertIs(self.transport.streaming, streaming)


    def test_disallowedRegisterProducer(self):
        """
        L{StringTransport.registerProducer} raises L{RuntimeError} if a
        producer is already registered.
        """
        producer = object()
        self.transport.registerProducer(producer, True)
        self.assertRaises(
            RuntimeError, self.transport.registerProducer, object(), False)
        self.assertIs(self.transport.producer, producer)
        self.assertTrue(self.transport.streaming)


    def test_unregisterProducer(self):
        """
        L{StringTransport.unregisterProducer} causes the transport to forget
        about the registered producer and makes it possible to register a new
        one.
        """
        oldProducer = object()
        newProducer = object()
        self.transport.registerProducer(oldProducer, False)
        self.transport.unregisterProducer()
        self.assertIsNone(self.transport.producer)
        self.transport.registerProducer(newProducer, True)
        self.assertIs(self.transport.producer, newProducer)
        self.assertTrue(self.transport.streaming)


    def test_invalidUnregisterProducer(self):
        """
        L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
        when no producer is registered.
        """
        self.assertRaises(RuntimeError, self.transport.unregisterProducer)


    def test_initialProducerState(self):
        """
        L{StringTransport.producerState} is initially C{'producing'}.
        """
        self.assertEqual(self.transport.producerState, 'producing')


    def test_pauseProducing(self):
        """
        L{StringTransport.pauseProducing} changes the C{producerState} of the
        transport to C{'paused'}.
        """
        self.transport.pauseProducing()
        self.assertEqual(self.transport.producerState, 'paused')


    def test_resumeProducing(self):
        """
        L{StringTransport.resumeProducing} changes the C{producerState} of the
        transport to C{'producing'}.
        """
        self.transport.pauseProducing()
        self.transport.resumeProducing()
        self.assertEqual(self.transport.producerState, 'producing')


    def test_stopProducing(self):
        """
        L{StringTransport.stopProducing} changes the C{'producerState'} of the
        transport to C{'stopped'}.
        """
        self.transport.stopProducing()
        self.assertEqual(self.transport.producerState, 'stopped')


    def test_stoppedTransportCannotPause(self):
        """
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport has been stopped.
        """
        self.transport.stopProducing()
        self.assertRaises(RuntimeError, self.transport.pauseProducing)


    def test_stoppedTransportCannotResume(self):
        """
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport has been stopped.
        """
        self.transport.stopProducing()
        self.assertRaises(RuntimeError, self.transport.resumeProducing)


    def test_disconnectingTransportCannotPause(self):
        """
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        """
        self.transport.loseConnection()
        self.assertRaises(RuntimeError, self.transport.pauseProducing)


    def test_disconnectingTransportCannotResume(self):
        """
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        """
        self.transport.loseConnection()
        self.assertRaises(RuntimeError, self.transport.resumeProducing)


    def test_loseConnectionSetsDisconnecting(self):
        """
        L{StringTransport.loseConnection} toggles the C{disconnecting} instance
        variable to C{True}.
        """
        self.assertFalse(self.transport.disconnecting)
        self.transport.loseConnection()
        self.assertTrue(self.transport.disconnecting)


    def test_specifiedHostAddress(self):
        """
        If a host address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getHost}.
        """
        address = object()
        self.assertIs(StringTransport(address).getHost(), address)


    def test_specifiedPeerAddress(self):
        """
        If a peer address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getPeer}.
        """
        address = object()
        self.assertIs(
            StringTransport(peerAddress=address).getPeer(), address)


    def test_defaultHostAddress(self):
        """
        If no host address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getHost}.
        """
        address = StringTransport().getHost()
        self.assertIsInstance(address, IPv4Address)


    def test_defaultPeerAddress(self):
        """
        If no peer address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getPeer}.
        """
        address = StringTransport().getPeer()
        self.assertIsInstance(address, IPv4Address)



class ReactorTests(TestCase):
    """
    Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
    """

    def test_memoryReactorProvides(self):
        """
        L{MemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        """
        memoryReactor = MemoryReactor()
        verifyObject(IReactorTCP, memoryReactor)
        verifyObject(IReactorSSL, memoryReactor)
        verifyObject(IReactorUNIX, memoryReactor)


    def test_raisingReactorProvides(self):
        """
        L{RaisingMemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        """
        raisingReactor = RaisingMemoryReactor()
        verifyObject(IReactorTCP, raisingReactor)
        verifyObject(IReactorSSL, raisingReactor)
        verifyObject(IReactorUNIX, raisingReactor)


    def test_connectDestination(self):
        """
        L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
        L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
        C{getDestination} method returns an L{IAddress} with attributes which
        reflect the values passed.
        """
        memoryReactor = MemoryReactor()
        for connector in [memoryReactor.connectTCP(
                              "test.example.com", 8321, ClientFactory()),
                          memoryReactor.connectSSL(
                              "test.example.com", 8321, ClientFactory(),
                              None)]:
            verifyObject(IConnector, connector)
            address = connector.getDestination()
            verifyObject(IAddress, address)
            self.assertEqual(address.host, "test.example.com")
            self.assertEqual(address.port, 8321)
        connector = memoryReactor.connectUNIX(b"/fake/path", ClientFactory())
        verifyObject(IConnector, connector)
        address = connector.getDestination()
        verifyObject(IAddress, address)
        self.assertEqual(address.name, b"/fake/path")


    def test_listenDefaultHost(self):
        """
        L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
        L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
        C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
        will have a default host of C{'0.0.0.0'}, and a port that reflects the
        value passed, and C{listenUNIX} will have a name that reflects the path
        passed.
        """
        memoryReactor = MemoryReactor()
        for port in [memoryReactor.listenTCP(8242, Factory()),
                     memoryReactor.listenSSL(8242, Factory(), None)]:
            verifyObject(IListeningPort, port)
            address = port.getHost()
            verifyObject(IAddress, address)
            self.assertEqual(address.host, '0.0.0.0')
            self.assertEqual(address.port, 8242)
        port = memoryReactor.listenUNIX(b"/path/to/socket", Factory())
        verifyObject(IListeningPort, port)
        address = port.getHost()
        verifyObject(IAddress, address)
        self.assertEqual(address.name, b"/path/to/socket")


    def test_readers(self):
        """
        Adding, removing, and listing readers works.
        """
        reader = object()
        reactor = MemoryReactor()

        reactor.addReader(reader)
        reactor.addReader(reader)

        self.assertEqual(reactor.getReaders(), [reader])

        reactor.removeReader(reader)

        self.assertEqual(reactor.getReaders(), [])


    def test_writers(self):
        """
        Adding, removing, and listing writers works.
        """
        writer = object()
        reactor = MemoryReactor()

        reactor.addWriter(writer)
        reactor.addWriter(writer)

        self.assertEqual(reactor.getWriters(), [writer])

        reactor.removeWriter(writer)

        self.assertEqual(reactor.getWriters(), [])



class TestConsumer(object):
    """
    A very basic test consumer for use with the NonStreamingProducerTests.
    """
    def __init__(self):
        self.writes = []
        self.producer = None
        self.producerStreaming = None


    def registerProducer(self, producer, streaming):
        """
        Registers a single producer with this consumer. Just keeps track of it.

        @param producer: The producer to register.
        @param streaming: Whether the producer is a streaming one or not.
        """
        self.producer = producer
        self.producerStreaming = streaming


    def unregisterProducer(self):
        """
        Forget the producer we had previously registered.
        """
        self.producer = None
        self.producerStreaming = None


    def write(self, data):
        """
        Some data was written to the consumer: stores it for later use.

        @param data: The data to write.
        """
        self.writes.append(data)



class NonStreamingProducerTests(TestCase):
    """
    Tests for the L{NonStreamingProducer} to validate behaviour.
    """
    def test_producesOnly10Times(self):
        """
        When the L{NonStreamingProducer} has resumeProducing called 10 times,
        it writes the counter each time and then fails.
        """
        consumer = TestConsumer()
        producer = NonStreamingProducer(consumer)
        consumer.registerProducer(producer, False)

        self.assertIs(consumer.producer, producer)
        self.assertIs(producer.consumer, consumer)
        self.assertFalse(consumer.producerStreaming)

        for _ in range(10):
            producer.resumeProducing()

        # We should have unregistered the producer and printed the 10 results.
        expectedWrites = [
            b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9'
        ]
        self.assertIsNone(consumer.producer)
        self.assertIsNone(consumer.producerStreaming)
        self.assertIsNone(producer.consumer)
        self.assertEqual(consumer.writes, expectedWrites)

        # Another attempt to produce fails.
        self.assertRaises(RuntimeError, producer.resumeProducing)


    def test_cannotPauseProduction(self):
        """
        When the L{NonStreamingProducer} is paused, it raises a
        L{RuntimeError}.
        """
        consumer = TestConsumer()
        producer = NonStreamingProducer(consumer)
        consumer.registerProducer(producer, False)

        # Produce once, just to be safe.
        producer.resumeProducing()

        self.assertRaises(RuntimeError, producer.pauseProducing)