tools/mpremote: Improve error output.

Makes the filesystem command give standard error messages rather than
just printing the exception from the device.

Makes the distinction between CommandError and TransportError clearer.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
This commit is contained in:
Jim Mussared
2023-06-15 18:09:39 +10:00
committed by Damien George
parent 6f8157d880
commit dd6f78f014
3 changed files with 70 additions and 55 deletions

View File

@@ -63,8 +63,7 @@ def do_connect(state, args=None):
msg = er.args[0] msg = er.args[0]
if msg.startswith("failed to access"): if msg.startswith("failed to access"):
msg += " (it may be in use by another program)" msg += " (it may be in use by another program)"
print(msg) raise CommandError(msg)
sys.exit(1)
def do_disconnect(state, _args=None): def do_disconnect(state, _args=None):
@@ -322,39 +321,48 @@ def do_filesystem(state, args):
if command == "ls" and not paths: if command == "ls" and not paths:
paths = [""] paths = [""]
# Handle each path sequentially. try:
for path in paths: # Handle each path sequentially.
if verbose: for path in paths:
if command == "cp": if verbose:
print("{} {} {}".format(command, path, cp_dest)) if command == "cp":
else: print("{} {} {}".format(command, path, cp_dest))
print("{} :{}".format(command, path)) else:
print("{} :{}".format(command, path))
if command == "cat": if command == "cat":
state.transport.fs_printfile(path) state.transport.fs_printfile(path)
elif command == "ls": elif command == "ls":
for result in state.transport.fs_listdir(path): for result in state.transport.fs_listdir(path):
print( print(
"{:12} {}{}".format( "{:12} {}{}".format(
result.st_size, result.name, "/" if result.st_mode & 0x4000 else "" result.st_size, result.name, "/" if result.st_mode & 0x4000 else ""
)
) )
) elif command == "mkdir":
elif command == "mkdir": state.transport.fs_mkdir(path)
state.transport.fs_mkdir(path) elif command == "rm":
elif command == "rm": state.transport.fs_rmfile(path)
state.transport.fs_rmfile(path) elif command == "rmdir":
elif command == "rmdir": state.transport.fs_rmdir(path)
state.transport.fs_rmdir(path) elif command == "touch":
elif command == "touch": state.transport.fs_touchfile(path)
state.transport.fs_touchfile(path) elif command.endswith("sum") and command[-4].isdigit():
elif command.endswith("sum") and command[-4].isdigit(): digest = state.transport.fs_hashfile(path, command[:-3])
digest = state.transport.fs_hashfile(path, command[:-3]) print(digest.hex())
print(digest.hex()) elif command == "cp":
elif command == "cp": if args.recursive:
if args.recursive: do_filesystem_recursive_cp(state, path, cp_dest, len(paths) > 1)
do_filesystem_recursive_cp(state, path, cp_dest, len(paths) > 1) else:
else: do_filesystem_cp(state, path, cp_dest, len(paths) > 1)
do_filesystem_cp(state, path, cp_dest, len(paths) > 1) except FileNotFoundError as er:
raise CommandError("{}: {}: No such file or directory.".format(command, er.args[0]))
except IsADirectoryError as er:
raise CommandError("{}: {}: Is a directory.".format(command, er.args[0]))
except FileExistsError as er:
raise CommandError("{}: {}: File exists.".format(command, er.args[0]))
except TransportError as er:
raise CommandError("Error with transport:\n{}".format(er.args[0]))
def do_edit(state, args): def do_edit(state, args):
@@ -362,7 +370,7 @@ def do_edit(state, args):
state.did_action() state.did_action()
if not os.getenv("EDITOR"): if not os.getenv("EDITOR"):
raise TransportError("edit: $EDITOR not set") raise CommandError("edit: $EDITOR not set")
for src in args.files: for src in args.files:
src = src.lstrip(":") src = src.lstrip(":")
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src)) dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
@@ -393,8 +401,7 @@ def _do_execbuffer(state, buf, follow):
stdout_write_bytes(ret_err) stdout_write_bytes(ret_err)
sys.exit(1) sys.exit(1)
except TransportError as er: except TransportError as er:
print(er) raise CommandError(er.args[0])
sys.exit(1)
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(1) sys.exit(1)

View File

@@ -42,19 +42,27 @@ class TransportError(Exception):
pass pass
class TransportExecError(TransportError):
def __init__(self, status_code, error_output):
self.status_code = status_code
self.error_output = error_output
super().__init__(error_output)
listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"]) listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
# Takes a Transport error (containing the text of an OSError traceback) and # Takes a Transport error (containing the text of an OSError traceback) and
# raises it as the corresponding OSError-derived exception. # raises it as the corresponding OSError-derived exception.
def _convert_filesystem_error(e, info): def _convert_filesystem_error(e, info):
if len(e.args) >= 3: if "OSError" in e.error_output and "ENOENT" in e.error_output:
if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]: return FileNotFoundError(info)
return FileNotFoundError(info) if "OSError" in e.error_output and "EISDIR" in e.error_output:
if b"OSError" in e.args[2] and b"EISDIR" in e.args[2]: return IsADirectoryError(info)
return IsADirectoryError(info) if "OSError" in e.error_output and "EEXIST" in e.error_output:
if b"OSError" in e.args[2] and b"EEXIST" in e.args[2]: return FileExistsError(info)
return FileExistsError(info) if "OSError" in e.error_output and "ENODEV" in e.error_output:
return FileNotFoundError(info)
return e return e
@@ -72,7 +80,7 @@ class Transport:
buf.extend(b"[") buf.extend(b"[")
self.exec(cmd, data_consumer=repr_consumer) self.exec(cmd, data_consumer=repr_consumer)
buf.extend(b"]") buf.extend(b"]")
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, src) from None raise _convert_filesystem_error(e, src) from None
return [ return [
@@ -84,7 +92,7 @@ class Transport:
try: try:
self.exec("import os") self.exec("import os")
return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src))) return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src)))
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, src) from None raise _convert_filesystem_error(e, src) from None
def fs_exists(self, src): def fs_exists(self, src):
@@ -109,7 +117,7 @@ class Transport:
) )
try: try:
self.exec(cmd, data_consumer=stdout_write_bytes) self.exec(cmd, data_consumer=stdout_write_bytes)
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, src) from None raise _convert_filesystem_error(e, src) from None
def fs_readfile(self, src, chunk_size=256, progress_callback=None): def fs_readfile(self, src, chunk_size=256, progress_callback=None):
@@ -128,7 +136,7 @@ class Transport:
if progress_callback: if progress_callback:
progress_callback(len(contents), src_size) progress_callback(len(contents), src_size)
self.exec("f.close()") self.exec("f.close()")
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, src) from None raise _convert_filesystem_error(e, src) from None
return contents return contents
@@ -148,37 +156,37 @@ class Transport:
if progress_callback: if progress_callback:
progress_callback(written, src_size) progress_callback(written, src_size)
self.exec("f.close()") self.exec("f.close()")
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, dest) from None raise _convert_filesystem_error(e, dest) from None
def fs_mkdir(self, path): def fs_mkdir(self, path):
try: try:
self.exec("import os\nos.mkdir('%s')" % path) self.exec("import os\nos.mkdir('%s')" % path)
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, path) from None raise _convert_filesystem_error(e, path) from None
def fs_rmdir(self, path): def fs_rmdir(self, path):
try: try:
self.exec("import os\nos.rmdir('%s')" % path) self.exec("import os\nos.rmdir('%s')" % path)
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, path) from None raise _convert_filesystem_error(e, path) from None
def fs_rmfile(self, path): def fs_rmfile(self, path):
try: try:
self.exec("import os\nos.remove('%s')" % path) self.exec("import os\nos.remove('%s')" % path)
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, path) from None raise _convert_filesystem_error(e, path) from None
def fs_touchfile(self, path): def fs_touchfile(self, path):
try: try:
self.exec("f=open('%s','a')\nf.close()" % path) self.exec("f=open('%s','a')\nf.close()" % path)
except TransportError as e: except TransportExecError as e:
raise _convert_filesystem_error(e, path) from None raise _convert_filesystem_error(e, path) from None
def fs_hashfile(self, path, algo, chunk_size=256): def fs_hashfile(self, path, algo, chunk_size=256):
try: try:
self.exec("import hashlib\nh = hashlib.{algo}()".format(algo=algo)) self.exec("import hashlib\nh = hashlib.{algo}()".format(algo=algo))
except TransportError: except TransportExecError:
# hashlib (or hashlib.{algo}) not available on device. Do the hash locally. # hashlib (or hashlib.{algo}) not available on device. Do the hash locally.
data = self.fs_readfile(path, chunk_size=chunk_size) data = self.fs_readfile(path, chunk_size=chunk_size)
return getattr(hashlib, algo)(data).digest() return getattr(hashlib, algo)(data).digest()

View File

@@ -38,7 +38,7 @@
import ast, io, os, re, struct, sys, time import ast, io, os, re, struct, sys, time
from errno import EPERM from errno import EPERM
from .console import VT_ENABLED from .console import VT_ENABLED
from .transport import TransportError, Transport from .transport import TransportError, TransportExecError, Transport
class SerialTransport(Transport): class SerialTransport(Transport):
@@ -267,7 +267,7 @@ class SerialTransport(Transport):
def exec(self, command, data_consumer=None): def exec(self, command, data_consumer=None):
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
if ret_err: if ret_err:
raise TransportError("exception", ret, ret_err) raise TransportExecError(ret, ret_err.decode())
return ret return ret
def execfile(self, filename): def execfile(self, filename):