HEX
Server: Apache/2.4.41 (Ubuntu)
System: Linux ip-172-31-42-149 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 07:00:04 UTC 2025 aarch64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/self/root/lib/python3/dist-packages/twisted/test/test_dirdbm.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test cases for dirdbm module.
"""

import shutil
from base64 import b64decode

from twisted.trial import unittest
from twisted.persisted import dirdbm
from twisted.python import rebuild
from twisted.python.filepath import FilePath



class DirDbmTests(unittest.TestCase):

    def setUp(self):
        self.path = FilePath(self.mktemp())
        self.dbm = dirdbm.open(self.path.path)
        self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'))


    def test_all(self):
        k = b64decode("//==")
        self.dbm[k] = b"a"
        self.dbm[k] = b"a"
        self.assertEqual(self.dbm[k], b"a")


    def test_rebuildInteraction(self):
        s = dirdbm.Shelf('dirdbm.rebuild.test')
        s[b'key'] = b'value'
        rebuild.rebuild(dirdbm)


    def test_dbm(self):
        d = self.dbm

        # Insert keys
        keys = []
        values = set()
        for k, v in self.items:
            d[k] = v
            keys.append(k)
            values.add(v)
        keys.sort()

        # Check they exist
        for k, v in self.items:
            self.assertIn(k, d)
            self.assertEqual(d[k], v)

        # Check non existent key
        try:
            d[b"XXX"]
        except KeyError:
            pass
        else:
            assert 0, "didn't raise KeyError on non-existent key"

        # Check keys(), values() and items()
        dbkeys = d.keys()
        dbvalues = set(d.values())
        dbitems = set(d.items())
        dbkeys.sort()
        items = set(self.items)
        self.assertEqual(keys, dbkeys,
                         ".keys() output didn't match: %s != %s" %
                         (repr(keys), repr(dbkeys)))
        self.assertEqual(values, dbvalues,
                         ".values() output didn't match: %s != %s" %
                         (repr(values), repr(dbvalues)))
        self.assertEqual(items, dbitems,
                         "items() didn't match: %s != %s" %
                         (repr(items), repr(dbitems)))

        copyPath = self.mktemp()
        d2 = d.copyTo(copyPath)

        copykeys = d.keys()
        copyvalues = set(d.values())
        copyitems = set(d.items())
        copykeys.sort()

        self.assertEqual(dbkeys, copykeys,
                         ".copyTo().keys() didn't match: %s != %s" %
                         (repr(dbkeys), repr(copykeys)))
        self.assertEqual(dbvalues, copyvalues,
                         ".copyTo().values() didn't match: %s != %s" %
                         (repr(dbvalues), repr(copyvalues)))
        self.assertEqual(dbitems, copyitems,
                         ".copyTo().items() didn't match: %s != %s" %
                         (repr(dbkeys), repr(copyitems)))

        d2.clear()
        self.assertTrue(len(d2.keys()) == len(d2.values()) ==
                        len(d2.items()) == len(d2) == 0, ".clear() failed")
        self.assertNotEqual(len(d), len(d2))
        shutil.rmtree(copyPath)

        # Delete items
        for k, v in self.items:
            del d[k]
            self.assertNotIn(k, d, "key is still in database, even though we deleted it")
        self.assertEqual(len(d.keys()), 0, "database has keys")
        self.assertEqual(len(d.values()), 0, "database has values")
        self.assertEqual(len(d.items()), 0, "database has items")
        self.assertEqual(len(d), 0, "database has items")


    def test_modificationTime(self):
        import time
        # The mtime value for files comes from a different place than the
        # gettimeofday() system call. On linux, gettimeofday() can be
        # slightly ahead (due to clock drift which gettimeofday() takes into
        # account but which open()/write()/close() do not), and if we are
        # close to the edge of the next second, time.time() can give a value
        # which is larger than the mtime which results from a subsequent
        # write(). I consider this a kernel bug, but it is beyond the scope
        # of this test. Thus we keep the range of acceptability to 3 seconds time.
        # -warner
        self.dbm[b"k"] = b"v"
        self.assertTrue(abs(time.time() - self.dbm.getModificationTime(b"k")) <= 3)
        self.assertRaises(KeyError, self.dbm.getModificationTime, b"nokey")


    def test_recovery(self):
        """
        DirDBM: test recovery from directory after a faked crash
        """
        k = self.dbm._encode(b"key1")
        with self.path.child(k + b".rpl").open(mode="wb") as f:
            f.write(b"value")

        k2 = self.dbm._encode(b"key2")
        with self.path.child(k2).open(mode="wb") as f:
            f.write(b"correct")
        with self.path.child(k2 + b".rpl").open(mode="wb") as f:
            f.write(b"wrong")

        with self.path.child("aa.new").open(mode="wb") as f:
            f.write(b"deleted")

        dbm = dirdbm.DirDBM(self.path.path)
        self.assertEqual(dbm[b"key1"], b"value")
        self.assertEqual(dbm[b"key2"], b"correct")
        self.assertFalse(self.path.globChildren("*.new"))
        self.assertFalse(self.path.globChildren("*.rpl"))


    def test_nonStringKeys(self):
        """
        L{dirdbm.DirDBM} operations only support string keys: other types
        should raise a L{TypeError}.
        """
        self.assertRaises(TypeError, self.dbm.__setitem__, 2, "3")
        try:
            self.assertRaises(TypeError, self.dbm.__setitem__, "2", 3)
        except unittest.FailTest:
            # dirdbm.Shelf.__setitem__ supports non-string values
            self.assertIsInstance(self.dbm, dirdbm.Shelf)
        self.assertRaises(TypeError, self.dbm.__getitem__, 2)
        self.assertRaises(TypeError, self.dbm.__delitem__, 2)
        self.assertRaises(TypeError, self.dbm.has_key, 2)
        self.assertRaises(TypeError, self.dbm.__contains__, 2)
        self.assertRaises(TypeError, self.dbm.getModificationTime, 2)


    def test_failSet(self):
        """
        Failure path when setting an item.
        """
        def _writeFail(path, data):
            path.setContent(data)
            raise IOError("fail to write")

        self.dbm[b"failkey"] = b"test"
        self.patch(self.dbm, "_writeFile", _writeFail)
        self.assertRaises(IOError, self.dbm.__setitem__, b"failkey", b"test2")


class ShelfTests(DirDbmTests):

    def setUp(self):
        self.path = FilePath(self.mktemp())
        self.dbm = dirdbm.Shelf(self.path.path)
        self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'),
                      (b'int', 12), (b'float', 12.0), (b'tuple', (None, 12)))


testCases = [DirDbmTests, ShelfTests]