It's not necessary to support this, which allows an arbitrary memory address to be specified and potentially allows invalid memory accesses. Requiring an object with the buffer protocol is safer, and also means that the length of the region is always specified. Signed-off-by: Damien George <damien@micropython.org>
414 lines
14 KiB
Python
414 lines
14 KiB
Python
# Test VfsRom filesystem.
|
|
|
|
try:
|
|
import errno, sys, struct, os, uctypes, vfs
|
|
|
|
vfs.VfsRom
|
|
except (ImportError, AttributeError):
|
|
print("SKIP")
|
|
raise SystemExit
|
|
|
|
try:
|
|
import select
|
|
except ImportError:
|
|
select = None
|
|
|
|
import unittest
|
|
|
|
IFDIR = 0x4000
|
|
IFREG = 0x8000
|
|
|
|
SEEK_SET = 0
|
|
SEEK_CUR = 1
|
|
SEEK_END = 2
|
|
|
|
# An mpy file with four constant objects: str, bytes, long-int, float.
|
|
test_mpy = (
|
|
# header
|
|
b"M\x06\x00\x1f" # mpy file header
|
|
b"\x06" # n_qstr
|
|
b"\x05" # n_obj
|
|
# qstrs
|
|
b"\x0etest.py\x00" # qstr0 = "test.py"
|
|
b"\x0f" # qstr1 = "<module>"
|
|
b"\x0estr_obj\x00" # qstr2 = "str_obj"
|
|
b"\x12bytes_obj\x00" # qstr3 = "bytes_obj"
|
|
b"\x0eint_obj\x00" # qstr4 = "int_obj"
|
|
b"\x12float_obj\x00" # qstr5 = "float_obj"
|
|
# objects
|
|
b"\x05\x14this is a str object\x00"
|
|
b"\x06\x16this is a bytes object\x00"
|
|
b"\x07\x0a1234567890" # long-int object
|
|
b"\x08\x041.23" # float object
|
|
b"\x05\x07str_obj\x00" # str object of existing qstr
|
|
# bytecode
|
|
b"\x81\x28" # 21 bytes, no children, bytecode
|
|
b"\x00\x02" # prelude
|
|
b"\x01" # simple name (<module>)
|
|
b"\x23\x00" # LOAD_CONST_OBJ(0)
|
|
b"\x16\x02" # STORE_NAME(str_obj)
|
|
b"\x23\x01" # LOAD_CONST_OBJ(1)
|
|
b"\x16\x03" # STORE_NAME(bytes_obj)
|
|
b"\x23\x02" # LOAD_CONST_OBJ(2)
|
|
b"\x16\x04" # STORE_NAME(int_obj)
|
|
b"\x23\x03" # LOAD_CONST_OBJ(3)
|
|
b"\x16\x05" # STORE_NAME(float_obj)
|
|
b"\x51" # LOAD_CONST_NONE
|
|
b"\x63" # RETURN_VALUE
|
|
)
|
|
|
|
|
|
class VfsRomWriter:
|
|
ROMFS_HEADER = b"\xd2\xcd\x31"
|
|
|
|
ROMFS_RECORD_KIND_UNUSED = 0
|
|
ROMFS_RECORD_KIND_PADDING = 1
|
|
ROMFS_RECORD_KIND_DATA_VERBATIM = 2
|
|
ROMFS_RECORD_KIND_DATA_POINTER = 3
|
|
ROMFS_RECORD_KIND_DIRECTORY = 4
|
|
ROMFS_RECORD_KIND_FILE = 5
|
|
|
|
def __init__(self):
|
|
self._dir_stack = [(None, bytearray())]
|
|
|
|
def _encode_uint(self, value):
|
|
encoded = [value & 0x7F]
|
|
value >>= 7
|
|
while value != 0:
|
|
encoded.insert(0, 0x80 | (value & 0x7F))
|
|
value >>= 7
|
|
return bytes(encoded)
|
|
|
|
def _pack(self, kind, payload):
|
|
return self._encode_uint(kind) + self._encode_uint(len(payload)) + payload
|
|
|
|
def _extend(self, data):
|
|
buf = self._dir_stack[-1][1]
|
|
buf.extend(data)
|
|
return len(buf)
|
|
|
|
def finalise(self):
|
|
_, data = self._dir_stack.pop()
|
|
encoded_kind = VfsRomWriter.ROMFS_HEADER
|
|
encoded_len = self._encode_uint(len(data))
|
|
if (len(encoded_kind) + len(encoded_len) + len(data)) % 2 == 1:
|
|
encoded_len = b"\x80" + encoded_len
|
|
data = encoded_kind + encoded_len + data
|
|
return data
|
|
|
|
def opendir(self, dirname):
|
|
self._dir_stack.append((dirname, bytearray()))
|
|
|
|
def closedir(self):
|
|
dirname, dirdata = self._dir_stack.pop()
|
|
dirdata = self._encode_uint(len(dirname)) + bytes(dirname, "ascii") + dirdata
|
|
self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DIRECTORY, dirdata))
|
|
|
|
def mkdata(self, data):
|
|
assert len(self._dir_stack) == 1
|
|
return self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, data)) - len(
|
|
data
|
|
)
|
|
|
|
def mkfile(self, filename, filedata, extra_payload=b""):
|
|
filename = bytes(filename, "ascii")
|
|
payload = self._encode_uint(len(filename))
|
|
payload += filename
|
|
payload += extra_payload
|
|
if isinstance(filedata, tuple):
|
|
sub_payload = self._encode_uint(filedata[0])
|
|
sub_payload += self._encode_uint(filedata[1])
|
|
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER, sub_payload)
|
|
else:
|
|
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, filedata)
|
|
self._dir_stack[-1][1].extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_FILE, payload))
|
|
|
|
|
|
def _make_romfs(fs, files, data_map):
|
|
for filename, contents in files:
|
|
if isinstance(contents, tuple):
|
|
fs.opendir(filename)
|
|
_make_romfs(fs, contents, data_map)
|
|
fs.closedir()
|
|
elif isinstance(contents, int):
|
|
fs.mkfile(filename, data_map[contents])
|
|
else:
|
|
fs.mkfile(filename, contents)
|
|
|
|
|
|
def make_romfs(files, data=None):
|
|
fs = VfsRomWriter()
|
|
data_map = {}
|
|
if data:
|
|
for k, v in data.items():
|
|
data_map[k] = len(v), fs.mkdata(v)
|
|
_make_romfs(fs, files, data_map)
|
|
return fs.finalise()
|
|
|
|
|
|
# A class to test if a value is within a range, needed because MicroPython's range
|
|
# doesn't support arbitrary objects.
|
|
class Range:
|
|
def __init__(self, lower, upper):
|
|
self._lower = lower
|
|
self._upper = upper
|
|
|
|
def __repr__(self):
|
|
return "Range({}, {})".format(self._lower, self._upper)
|
|
|
|
def __contains__(self, value):
|
|
return self._lower <= value < self._upper
|
|
|
|
|
|
class TestBase(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
fs_inner = make_romfs((("test_inner.txt", b"contents_inner"), ("c.py", b"")))
|
|
cls.romfs = make_romfs(
|
|
(
|
|
("fs.romfs", 0),
|
|
("test.txt", b"contents"),
|
|
(
|
|
"dir",
|
|
(
|
|
("a.py", b"x = 1"),
|
|
("b.py", b"x = 2"),
|
|
("test.mpy", test_mpy),
|
|
),
|
|
),
|
|
),
|
|
{0: fs_inner},
|
|
)
|
|
cls.romfs_ilistdir = [
|
|
("fs.romfs", IFREG, 0, 46),
|
|
("test.txt", IFREG, 0, 8),
|
|
("dir", IFDIR, 0, 198),
|
|
]
|
|
cls.romfs_listdir = [x[0] for x in cls.romfs_ilistdir]
|
|
cls.romfs_listdir_dir = ["a.py", "b.py", "test.mpy"]
|
|
cls.romfs_listdir_bytes = [bytes(x, "ascii") for x in cls.romfs_listdir]
|
|
cls.romfs_addr = uctypes.addressof(cls.romfs)
|
|
cls.romfs_addr_range = Range(cls.romfs_addr, cls.romfs_addr + len(cls.romfs))
|
|
|
|
|
|
class TestEdgeCases(unittest.TestCase):
|
|
def test_empty_romfs(self):
|
|
empty_romfs = make_romfs(())
|
|
self.assertEqual(empty_romfs, bytes([0x80 | ord("R"), 0x80 | ord("M"), ord("1"), 0]))
|
|
fs = vfs.VfsRom(empty_romfs)
|
|
self.assertIsInstance(fs, vfs.VfsRom)
|
|
fs.mount(True, False)
|
|
self.assertEqual(list(fs.ilistdir("")), [])
|
|
self.assertEqual(fs.stat(""), (IFDIR, 0, 0, 0, 0, 0, 0, 0, 0, 0))
|
|
self.assertEqual(fs.statvfs(""), (1, 0, 0, 0, 0, 0, 0, 0, 0, 32767))
|
|
|
|
def test_error(self):
|
|
for bad_romfs in (b"", b"xxx", b"not a romfs"):
|
|
with self.assertRaises(OSError) as ctx:
|
|
vfs.VfsRom(bad_romfs)
|
|
self.assertEqual(ctx.exception.errno, errno.ENODEV)
|
|
|
|
def test_unknown_record(self):
|
|
fs = VfsRomWriter()
|
|
fs._extend(fs._pack(VfsRomWriter.ROMFS_RECORD_KIND_PADDING, b"payload"))
|
|
fs.mkfile(
|
|
"test",
|
|
b"contents",
|
|
extra_payload=fs._pack(VfsRomWriter.ROMFS_RECORD_KIND_PADDING, b"pad"),
|
|
)
|
|
romfs = fs.finalise()
|
|
fs = vfs.VfsRom(romfs)
|
|
self.assertEqual(list(fs.ilistdir("")), [("test", IFREG, 0, 8)])
|
|
with fs.open("test", "rb") as f:
|
|
self.assertEqual(f.read(), b"contents")
|
|
|
|
|
|
class TestStandalone(TestBase):
|
|
def test_constructor(self):
|
|
self.assertIsInstance(vfs.VfsRom(self.romfs), vfs.VfsRom)
|
|
with self.assertRaises(TypeError):
|
|
vfs.VfsRom(self.romfs_addr)
|
|
|
|
def test_mount(self):
|
|
vfs.VfsRom(self.romfs).mount(True, False)
|
|
with self.assertRaises(OSError):
|
|
vfs.VfsRom(self.romfs).mount(True, True)
|
|
|
|
def test_ilistdir(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
self.assertEqual(list(fs.ilistdir("")), self.romfs_ilistdir)
|
|
self.assertEqual(list(fs.ilistdir("/")), self.romfs_ilistdir)
|
|
with self.assertRaises(OSError):
|
|
fs.ilistdir("does not exist")
|
|
|
|
def test_stat(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
self.assertEqual(fs.stat(""), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
|
|
self.assertEqual(fs.stat("/"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
|
|
self.assertEqual(fs.stat("/test.txt"), (IFREG, 0, 0, 0, 0, 0, 8, 0, 0, 0))
|
|
self.assertEqual(fs.stat("/dir"), (IFDIR, 0, 0, 0, 0, 0, 198, 0, 0, 0))
|
|
with self.assertRaises(OSError):
|
|
fs.stat("/does-not-exist")
|
|
|
|
def test_statvfs(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
self.assertEqual(fs.statvfs(""), (1, 0, 289, 0, 0, 0, 0, 0, 0, 32767))
|
|
|
|
def test_open(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
|
|
with fs.open("/test.txt", "") as f:
|
|
self.assertEqual(f.read(), "contents")
|
|
with fs.open("/test.txt", "rt") as f:
|
|
self.assertEqual(f.read(), "contents")
|
|
with fs.open("/test.txt", "rb") as f:
|
|
self.assertEqual(f.read(), b"contents")
|
|
|
|
with self.assertRaises(OSError) as ctx:
|
|
fs.open("/file-does-not-exist", "")
|
|
self.assertEqual(ctx.exception.errno, errno.ENOENT)
|
|
|
|
with self.assertRaises(OSError) as ctx:
|
|
fs.open("/dir", "rb")
|
|
self.assertEqual(ctx.exception.errno, errno.EISDIR)
|
|
|
|
with self.assertRaises(OSError):
|
|
fs.open("/test.txt", "w")
|
|
with self.assertRaises(OSError):
|
|
fs.open("/test.txt", "a")
|
|
with self.assertRaises(OSError):
|
|
fs.open("/test.txt", "+")
|
|
|
|
def test_file_seek(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
with fs.open("/test.txt", "") as f:
|
|
self.assertEqual(f.seek(0, SEEK_SET), 0)
|
|
self.assertEqual(f.seek(3, SEEK_SET), 3)
|
|
self.assertEqual(f.read(), "tents")
|
|
self.assertEqual(f.seek(0, SEEK_SET), 0)
|
|
self.assertEqual(f.seek(100, SEEK_CUR), 8)
|
|
self.assertEqual(f.seek(-1, SEEK_END), 7)
|
|
self.assertEqual(f.read(), "s")
|
|
self.assertEqual(f.seek(1, SEEK_END), 8)
|
|
with self.assertRaises(OSError):
|
|
f.seek(-1, SEEK_SET)
|
|
f.seek(0, SEEK_SET)
|
|
with self.assertRaises(OSError):
|
|
f.seek(-1, SEEK_CUR)
|
|
with self.assertRaises(OSError):
|
|
f.seek(-100, SEEK_END)
|
|
|
|
@unittest.skipIf(select is None, "no select module")
|
|
def test_file_ioctl_invalid(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
with fs.open("/test.txt", "") as f:
|
|
p = select.poll()
|
|
p.register(f)
|
|
with self.assertRaises(OSError):
|
|
p.poll(0)
|
|
|
|
def test_memory_mapping(self):
|
|
fs = vfs.VfsRom(self.romfs)
|
|
with fs.open("/test.txt", "rb") as f:
|
|
addr = uctypes.addressof(f)
|
|
data = memoryview(f)
|
|
self.assertIn(addr, self.romfs_addr_range)
|
|
self.assertIn(addr + len(data), self.romfs_addr_range)
|
|
self.assertEqual(bytes(data), b"contents")
|
|
|
|
|
|
class TestMounted(TestBase):
|
|
def setUp(self):
|
|
self.orig_sys_path = list(sys.path)
|
|
self.orig_cwd = os.getcwd()
|
|
vfs.mount(vfs.VfsRom(self.romfs), "/test_rom")
|
|
|
|
def tearDown(self):
|
|
vfs.umount("/test_rom")
|
|
os.chdir(self.orig_cwd)
|
|
sys.path = self.orig_sys_path
|
|
|
|
def test_listdir(self):
|
|
self.assertEqual(os.listdir("/test_rom"), self.romfs_listdir)
|
|
self.assertEqual(os.listdir("/test_rom/dir"), self.romfs_listdir_dir)
|
|
self.assertEqual(os.listdir(b"/test_rom"), self.romfs_listdir_bytes)
|
|
|
|
def test_chdir(self):
|
|
os.chdir("/test_rom")
|
|
self.assertEqual(os.listdir(), self.romfs_listdir)
|
|
|
|
os.chdir("/test_rom/")
|
|
self.assertEqual(os.listdir(), self.romfs_listdir)
|
|
|
|
# chdir within the romfs is not implemented.
|
|
with self.assertRaises(OSError):
|
|
os.chdir("/test_rom/dir")
|
|
|
|
def test_stat(self):
|
|
self.assertEqual(os.stat("/test_rom"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
|
|
self.assertEqual(os.stat("/test_rom/"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
|
|
self.assertEqual(os.stat("/test_rom/test.txt"), (IFREG, 0, 0, 0, 0, 0, 8, 0, 0, 0))
|
|
self.assertEqual(os.stat("/test_rom/dir"), (IFDIR, 0, 0, 0, 0, 0, 198, 0, 0, 0))
|
|
with self.assertRaises(OSError):
|
|
os.stat("/test_rom/does-not-exist")
|
|
|
|
def test_open(self):
|
|
with open("/test_rom/test.txt") as f:
|
|
self.assertEqual(f.read(), "contents")
|
|
with open("/test_rom/test.txt", "b") as f:
|
|
self.assertEqual(f.read(), b"contents")
|
|
|
|
with self.assertRaises(OSError) as ctx:
|
|
open("/test_rom/file-does-not-exist")
|
|
self.assertEqual(ctx.exception.errno, errno.ENOENT)
|
|
|
|
with self.assertRaises(OSError) as ctx:
|
|
open("/test_rom/dir")
|
|
self.assertEqual(ctx.exception.errno, errno.EISDIR)
|
|
|
|
def test_import_py(self):
|
|
sys.path.append("/test_rom/dir")
|
|
a = __import__("a")
|
|
b = __import__("b")
|
|
self.assertEqual(a.__file__, "/test_rom/dir/a.py")
|
|
self.assertEqual(a.x, 1)
|
|
self.assertEqual(b.__file__, "/test_rom/dir/b.py")
|
|
self.assertEqual(b.x, 2)
|
|
|
|
def test_import_mpy(self):
|
|
sys.path.append("/test_rom/dir")
|
|
test = __import__("test")
|
|
self.assertEqual(test.__file__, "/test_rom/dir/test.mpy")
|
|
self.assertEqual(test.str_obj, "this is a str object")
|
|
self.assertEqual(test.bytes_obj, b"this is a bytes object")
|
|
self.assertEqual(test.int_obj, 1234567890)
|
|
self.assertEqual(test.float_obj, 1.23)
|
|
self.assertIn(uctypes.addressof(test.str_obj), self.romfs_addr_range)
|
|
self.assertIn(uctypes.addressof(test.bytes_obj), self.romfs_addr_range)
|
|
|
|
def test_romfs_inner(self):
|
|
with open("/test_rom/fs.romfs", "rb") as f:
|
|
romfs_inner = vfs.VfsRom(memoryview(f))
|
|
|
|
vfs.mount(romfs_inner, "/test_rom2")
|
|
|
|
self.assertEqual(os.listdir("/test_rom2"), ["test_inner.txt", "c.py"])
|
|
|
|
sys.path.append("/test_rom2")
|
|
self.assertEqual(__import__("c").__file__, "/test_rom2/c.py")
|
|
|
|
with open("/test_rom2/test_inner.txt") as f:
|
|
self.assertEqual(f.read(), "contents_inner")
|
|
|
|
with open("/test_rom2/test_inner.txt", "rb") as f:
|
|
addr = uctypes.addressof(f)
|
|
data = memoryview(f)
|
|
self.assertIn(addr, self.romfs_addr_range)
|
|
self.assertIn(addr + len(data), self.romfs_addr_range)
|
|
|
|
vfs.umount("/test_rom2")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|