HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/lib/python3/dist-packages/twisted/conch/insults/helper.py
# -*- test-case-name: twisted.conch.test.test_helper -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Partial in-memory terminal emulator

@author: Jp Calderone
"""

from __future__ import print_function

import re, string

from zope.interface import implementer

from incremental import Version

from twisted.internet import defer, protocol, reactor
from twisted.python import log, _textattributes
from twisted.python.compat import iterbytes
from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
from twisted.conch.insults import insults

FOREGROUND = 30
BACKGROUND = 40
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)



class _FormattingState(_textattributes._FormattingStateMixin):
    """
    Represents the formatting state/attributes of a single character.

    Character set, intensity, underlinedness, blinkitude, video
    reversal, as well as foreground and background colors made up a
    character's attributes.
    """
    compareAttributes = (
        'charset', 'bold', 'underline', 'blink', 'reverseVideo', 'foreground',
        'background', '_subtracting')


    def __init__(self, charset=insults.G0, bold=False, underline=False,
                 blink=False, reverseVideo=False, foreground=WHITE,
                 background=BLACK, _subtracting=False):
        self.charset = charset
        self.bold = bold
        self.underline = underline
        self.blink = blink
        self.reverseVideo = reverseVideo
        self.foreground = foreground
        self.background = background
        self._subtracting = _subtracting


    @deprecated(Version('Twisted', 13, 1, 0))
    def wantOne(self, **kw):
        """
        Add a character attribute to a copy of this formatting state.

        @param **kw: An optional attribute name and value can be provided with
            a keyword argument.

        @return: A formatting state instance with the new attribute.

        @see: L{DefaultFormattingState._withAttribute}.
        """
        k, v = kw.popitem()
        return self._withAttribute(k, v)


    def toVT102(self):
        # Spit out a vt102 control sequence that will set up
        # all the attributes set here.  Except charset.
        attrs = []
        if self._subtracting:
            attrs.append(0)
        if self.bold:
            attrs.append(insults.BOLD)
        if self.underline:
            attrs.append(insults.UNDERLINE)
        if self.blink:
            attrs.append(insults.BLINK)
        if self.reverseVideo:
            attrs.append(insults.REVERSE_VIDEO)
        if self.foreground != WHITE:
            attrs.append(FOREGROUND + self.foreground)
        if self.background != BLACK:
            attrs.append(BACKGROUND + self.background)
        if attrs:
            return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
        return ''

CharacterAttribute = _FormattingState

deprecatedModuleAttribute(
    Version('Twisted', 13, 1, 0),
    'Use twisted.conch.insults.text.assembleFormattedText instead.',
    'twisted.conch.insults.helper',
    'CharacterAttribute')



# XXX - need to support scroll regions and scroll history
@implementer(insults.ITerminalTransport)
class TerminalBuffer(protocol.Protocol):
    """
    An in-memory terminal emulator.
    """
    for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW',
                  b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN',
                  b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9',
                  b'F10', b'F11', b'F12'):
        execBytes = keyID + b" = object()"
        execStr = execBytes.decode("ascii")
        exec(execStr)

    TAB = b'\t'
    BACKSPACE = b'\x7f'

    width = 80
    height = 24

    fill = b' '
    void = object()

    def getCharacter(self, x, y):
        return self.lines[y][x]


    def connectionMade(self):
        self.reset()


    def write(self, data):
        """
        Add the given printable bytes to the terminal.

        Line feeds in L{bytes} will be replaced with carriage return / line
        feed pairs.
        """
        for b in iterbytes(data.replace(b'\n', b'\r\n')):
            self.insertAtCursor(b)


    def _currentFormattingState(self):
        return _FormattingState(self.activeCharset, **self.graphicRendition)


    def insertAtCursor(self, b):
        """
        Add one byte to the terminal at the cursor and make consequent state
        updates.

        If b is a carriage return, move the cursor to the beginning of the
        current row.

        If b is a line feed, move the cursor to the next row or scroll down if
        the cursor is already in the last row.

        Otherwise, if b is printable, put it at the cursor position (inserting
        or overwriting as dictated by the current mode) and move the cursor.
        """
        if b == b'\r':
            self.x = 0
        elif b == b'\n':
            self._scrollDown()
        elif b in string.printable.encode("ascii"):
            if self.x >= self.width:
                self.nextLine()
            ch = (b, self._currentFormattingState())
            if self.modes.get(insults.modes.IRM):
                self.lines[self.y][self.x:self.x] = [ch]
                self.lines[self.y].pop()
            else:
                self.lines[self.y][self.x] = ch
            self.x += 1


    def _emptyLine(self, width):
        return [(self.void, self._currentFormattingState())
                for i in range(width)]


    def _scrollDown(self):
        self.y += 1
        if self.y >= self.height:
            self.y -= 1
            del self.lines[0]
            self.lines.append(self._emptyLine(self.width))


    def _scrollUp(self):
        self.y -= 1
        if self.y < 0:
            self.y = 0
            del self.lines[-1]
            self.lines.insert(0, self._emptyLine(self.width))


    def cursorUp(self, n=1):
        self.y = max(0, self.y - n)


    def cursorDown(self, n=1):
        self.y = min(self.height - 1, self.y + n)


    def cursorBackward(self, n=1):
        self.x = max(0, self.x - n)


    def cursorForward(self, n=1):
        self.x = min(self.width, self.x + n)


    def cursorPosition(self, column, line):
        self.x = column
        self.y = line


    def cursorHome(self):
        self.x = self.home.x
        self.y = self.home.y


    def index(self):
        self._scrollDown()


    def reverseIndex(self):
        self._scrollUp()


    def nextLine(self):
        """
        Update the cursor position attributes and scroll down if appropriate.
        """
        self.x = 0
        self._scrollDown()


    def saveCursor(self):
        self._savedCursor = (self.x, self.y)


    def restoreCursor(self):
        self.x, self.y = self._savedCursor
        del self._savedCursor


    def setModes(self, modes):
        for m in modes:
            self.modes[m] = True


    def resetModes(self, modes):
        for m in modes:
            try:
                del self.modes[m]
            except KeyError:
                pass


    def setPrivateModes(self, modes):
        """
        Enable the given modes.

        Track which modes have been enabled so that the implementations of
        other L{insults.ITerminalTransport} methods can be properly implemented
        to respect these settings.

        @see: L{resetPrivateModes}
        @see: L{insults.ITerminalTransport.setPrivateModes}
        """
        for m in modes:
            self.privateModes[m] = True


    def resetPrivateModes(self, modes):
        """
        Disable the given modes.

        @see: L{setPrivateModes}
        @see: L{insults.ITerminalTransport.resetPrivateModes}
        """
        for m in modes:
            try:
                del self.privateModes[m]
            except KeyError:
                pass


    def applicationKeypadMode(self):
        self.keypadMode = 'app'


    def numericKeypadMode(self):
        self.keypadMode = 'num'


    def selectCharacterSet(self, charSet, which):
        self.charsets[which] = charSet


    def shiftIn(self):
        self.activeCharset = insults.G0


    def shiftOut(self):
        self.activeCharset = insults.G1


    def singleShift2(self):
        oldActiveCharset = self.activeCharset
        self.activeCharset = insults.G2
        f = self.insertAtCursor
        def insertAtCursor(b):
            f(b)
            del self.insertAtCursor
            self.activeCharset = oldActiveCharset
        self.insertAtCursor = insertAtCursor


    def singleShift3(self):
        oldActiveCharset = self.activeCharset
        self.activeCharset = insults.G3
        f = self.insertAtCursor
        def insertAtCursor(b):
            f(b)
            del self.insertAtCursor
            self.activeCharset = oldActiveCharset
        self.insertAtCursor = insertAtCursor


    def selectGraphicRendition(self, *attributes):
        for a in attributes:
            if a == insults.NORMAL:
                self.graphicRendition = {
                    'bold': False,
                    'underline': False,
                    'blink': False,
                    'reverseVideo': False,
                    'foreground': WHITE,
                    'background': BLACK}
            elif a == insults.BOLD:
                self.graphicRendition['bold'] = True
            elif a == insults.UNDERLINE:
                self.graphicRendition['underline'] = True
            elif a == insults.BLINK:
                self.graphicRendition['blink'] = True
            elif a == insults.REVERSE_VIDEO:
                self.graphicRendition['reverseVideo'] = True
            else:
                try:
                    v = int(a)
                except ValueError:
                    log.msg("Unknown graphic rendition attribute: " + repr(a))
                else:
                    if FOREGROUND <= v <= FOREGROUND + N_COLORS:
                        self.graphicRendition['foreground'] = v - FOREGROUND
                    elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
                        self.graphicRendition['background'] = v - BACKGROUND
                    else:
                        log.msg("Unknown graphic rendition attribute: " + repr(a))


    def eraseLine(self):
        self.lines[self.y] = self._emptyLine(self.width)


    def eraseToLineEnd(self):
        width = self.width - self.x
        self.lines[self.y][self.x:] = self._emptyLine(width)


    def eraseToLineBeginning(self):
        self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)


    def eraseDisplay(self):
        self.lines = [self._emptyLine(self.width) for i in range(self.height)]


    def eraseToDisplayEnd(self):
        self.eraseToLineEnd()
        height = self.height - self.y - 1
        self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]


    def eraseToDisplayBeginning(self):
        self.eraseToLineBeginning()
        self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]


    def deleteCharacter(self, n=1):
        del self.lines[self.y][self.x:self.x+n]
        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))


    def insertLine(self, n=1):
        self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
        del self.lines[self.height:]


    def deleteLine(self, n=1):
        del self.lines[self.y:self.y+n]
        self.lines.extend([self._emptyLine(self.width) for i in range(n)])


    def reportCursorPosition(self):
        return (self.x, self.y)


    def reset(self):
        self.home = insults.Vector(0, 0)
        self.x = self.y = 0
        self.modes = {}
        self.privateModes = {}
        self.setPrivateModes([insults.privateModes.AUTO_WRAP,
                              insults.privateModes.CURSOR_MODE])
        self.numericKeypad = 'app'
        self.activeCharset = insults.G0
        self.graphicRendition = {
            'bold': False,
            'underline': False,
            'blink': False,
            'reverseVideo': False,
            'foreground': WHITE,
            'background': BLACK}
        self.charsets = {
            insults.G0: insults.CS_US,
            insults.G1: insults.CS_US,
            insults.G2: insults.CS_ALTERNATE,
            insults.G3: insults.CS_ALTERNATE_SPECIAL}
        self.eraseDisplay()


    def unhandledControlSequence(self, buf):
        print('Could not handle', repr(buf))


    def __bytes__(self):
        lines = []
        for L in self.lines:
            buf = []
            length = 0
            for (ch, attr) in L:
                if ch is not self.void:
                    buf.append(ch)
                    length = len(buf)
                else:
                    buf.append(self.fill)
            lines.append(b''.join(buf[:length]))
        return b'\n'.join(lines)



class ExpectationTimeout(Exception):
    pass



class ExpectableBuffer(TerminalBuffer):
    _mark = 0

    def connectionMade(self):
        TerminalBuffer.connectionMade(self)
        self._expecting = []


    def write(self, data):
        TerminalBuffer.write(self, data)
        self._checkExpected()


    def cursorHome(self):
        TerminalBuffer.cursorHome(self)
        self._mark = 0


    def _timeoutExpected(self, d):
        d.errback(ExpectationTimeout())
        self._checkExpected()


    def _checkExpected(self):
        s = self.__bytes__()[self._mark:]
        while self._expecting:
            expr, timer, deferred = self._expecting[0]
            if timer and not timer.active():
                del self._expecting[0]
                continue
            for match in expr.finditer(s):
                if timer:
                    timer.cancel()
                del self._expecting[0]
                self._mark += match.end()
                s = s[match.end():]
                deferred.callback(match)
                break
            else:
                return


    def expect(self, expression, timeout=None, scheduler=reactor):
        d = defer.Deferred()
        timer = None
        if timeout:
            timer = scheduler.callLater(timeout, self._timeoutExpected, d)
        self._expecting.append((re.compile(expression), timer, d))
        self._checkExpected()
        return d

__all__ = [
    'CharacterAttribute',  'TerminalBuffer', 'ExpectableBuffer']