Lint tools/flash_writer.py
This commit is contained in:
parent
7386cc3665
commit
d71e0d75b1
@ -35,10 +35,10 @@ import_serial_module = True
|
||||
# When SDK release, please set SDK_RELEASE as True.
|
||||
SDK_RELEASE = False
|
||||
|
||||
if SDK_RELEASE :
|
||||
if SDK_RELEASE:
|
||||
PRINT_RAW_COMMAND = False
|
||||
REBOOT_AT_END = True
|
||||
else :
|
||||
else:
|
||||
PRINT_RAW_COMMAND = True
|
||||
REBOOT_AT_END = True
|
||||
|
||||
@ -76,55 +76,134 @@ class ConfigArgs:
|
||||
PKGAPP_NAME = []
|
||||
PKGUPD_NAME = []
|
||||
|
||||
|
||||
ROM_MSG = [b"Welcome to nash"]
|
||||
XMDM_MSG = "Waiting for XMODEM (CRC or 1K) transfer. Ctrl-X to cancel."
|
||||
|
||||
class ConfigArgsLoader():
|
||||
|
||||
class ConfigArgsLoader:
|
||||
def __init__(self):
|
||||
self.parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
|
||||
self.parser.add_argument("package_name", help="the name of the package to install", nargs='*')
|
||||
self.parser.add_argument("-f", "--file", dest="file_name", help="save file", action='append')
|
||||
self.parser.add_argument("-e", "--erase", dest="erase_name", help="erase file", action='append')
|
||||
self.parser = argparse.ArgumentParser(
|
||||
formatter_class=argparse.RawTextHelpFormatter
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"package_name", help="the name of the package to install", nargs="*"
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"-f", "--file", dest="file_name", help="save file", action="append"
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"-e", "--erase", dest="erase_name", help="erase file", action="append"
|
||||
)
|
||||
|
||||
self.parser.add_argument("-S", "--sys", dest="pkgsys_name", help="the name of the system package to install", action='append')
|
||||
self.parser.add_argument("-A", "--app", dest="pkgapp_name", help="the name of the application package to install", action='append')
|
||||
self.parser.add_argument("-U", "--upd", dest="pkgupd_name", help="the name of the updater package to install", action='append')
|
||||
self.parser.add_argument(
|
||||
"-S",
|
||||
"--sys",
|
||||
dest="pkgsys_name",
|
||||
help="the name of the system package to install",
|
||||
action="append",
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"-A",
|
||||
"--app",
|
||||
dest="pkgapp_name",
|
||||
help="the name of the application package to install",
|
||||
action="append",
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"-U",
|
||||
"--upd",
|
||||
dest="pkgupd_name",
|
||||
help="the name of the updater package to install",
|
||||
action="append",
|
||||
)
|
||||
|
||||
self.parser.add_argument("-a", "--auto-reset", dest="auto_reset",
|
||||
action="store_true", default=None,
|
||||
help="try to auto reset develop board if possible")
|
||||
self.parser.add_argument("-d", "--dtr-reset", dest="dtr_reset",
|
||||
action="store_true", default=None,
|
||||
help="try to auto reset develop board if possible")
|
||||
self.parser.add_argument("-n", "--no-set-bootable", dest="no_set_bootable",
|
||||
action="store_true", default=None,
|
||||
help="not to set bootable")
|
||||
self.parser.add_argument(
|
||||
"-a",
|
||||
"--auto-reset",
|
||||
dest="auto_reset",
|
||||
action="store_true",
|
||||
default=None,
|
||||
help="try to auto reset develop board if possible",
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"-d",
|
||||
"--dtr-reset",
|
||||
dest="dtr_reset",
|
||||
action="store_true",
|
||||
default=None,
|
||||
help="try to auto reset develop board if possible",
|
||||
)
|
||||
self.parser.add_argument(
|
||||
"-n",
|
||||
"--no-set-bootable",
|
||||
dest="no_set_bootable",
|
||||
action="store_true",
|
||||
default=None,
|
||||
help="not to set bootable",
|
||||
)
|
||||
|
||||
group = self.parser.add_argument_group()
|
||||
group.add_argument("-i", "--server-ip", dest="server_ip",
|
||||
help="the ip address connected to the telnet server")
|
||||
group.add_argument("-p", "--server-port", dest="server_port", type=int,
|
||||
help="the port connected to the telnet server")
|
||||
group.add_argument(
|
||||
"-i",
|
||||
"--server-ip",
|
||||
dest="server_ip",
|
||||
help="the ip address connected to the telnet server",
|
||||
)
|
||||
group.add_argument(
|
||||
"-p",
|
||||
"--server-port",
|
||||
dest="server_port",
|
||||
type=int,
|
||||
help="the port connected to the telnet server",
|
||||
)
|
||||
|
||||
group = self.parser.add_argument_group()
|
||||
group.add_argument("-c", "--serial-port", dest="serial_port", help="the serial port")
|
||||
group.add_argument("-b", "--xmodem-baudrate", dest="xmodem_baud", help="Use the faster baudrate in xmodem")
|
||||
group.add_argument(
|
||||
"-c", "--serial-port", dest="serial_port", help="the serial port"
|
||||
)
|
||||
group.add_argument(
|
||||
"-b",
|
||||
"--xmodem-baudrate",
|
||||
dest="xmodem_baud",
|
||||
help="Use the faster baudrate in xmodem",
|
||||
)
|
||||
|
||||
mutually_group = self.parser.add_mutually_exclusive_group()
|
||||
mutually_group.add_argument("-t", "--telnet-protocol", dest="telnet_protocol",
|
||||
action="store_true", default=None,
|
||||
help="use the telnet protocol for binary transmission")
|
||||
mutually_group.add_argument("-s", "--serial-protocol", dest="serial_protocol",
|
||||
action="store_true", default=None,
|
||||
help="use the serial port for binary transmission, default options")
|
||||
mutually_group.add_argument(
|
||||
"-t",
|
||||
"--telnet-protocol",
|
||||
dest="telnet_protocol",
|
||||
action="store_true",
|
||||
default=None,
|
||||
help="use the telnet protocol for binary transmission",
|
||||
)
|
||||
mutually_group.add_argument(
|
||||
"-s",
|
||||
"--serial-protocol",
|
||||
dest="serial_protocol",
|
||||
action="store_true",
|
||||
default=None,
|
||||
help="use the serial port for binary transmission, default options",
|
||||
)
|
||||
|
||||
mutually_group2 = self.parser.add_mutually_exclusive_group()
|
||||
mutually_group2.add_argument("-F", "--force-wait-reset", dest="wait_reset",
|
||||
action="store_true", default=None,
|
||||
help="force wait for pressing RESET button")
|
||||
mutually_group2.add_argument("-N", "--no-wait-reset", dest="wait_reset",
|
||||
action="store_false", default=None,
|
||||
help="if possible, skip to wait for pressing RESET button")
|
||||
mutually_group2.add_argument(
|
||||
"-F",
|
||||
"--force-wait-reset",
|
||||
dest="wait_reset",
|
||||
action="store_true",
|
||||
default=None,
|
||||
help="force wait for pressing RESET button",
|
||||
)
|
||||
mutually_group2.add_argument(
|
||||
"-N",
|
||||
"--no-wait-reset",
|
||||
dest="wait_reset",
|
||||
action="store_false",
|
||||
default=None,
|
||||
help="if possible, skip to wait for pressing RESET button",
|
||||
)
|
||||
|
||||
def update_config(self):
|
||||
args = self.parser.parse_args()
|
||||
@ -145,9 +224,9 @@ class ConfigArgsLoader():
|
||||
if ConfigArgs.PROTOCOL_TYPE == None:
|
||||
proto = os.environ.get("CXD56_PROTOCOL")
|
||||
if proto is not None:
|
||||
if 's' in proto:
|
||||
if "s" in proto:
|
||||
ConfigArgs.PROTOCOL_TYPE = PROTOCOL_SERIAL
|
||||
elif 't' in proto:
|
||||
elif "t" in proto:
|
||||
ConfigArgs.PROTOCOL_TYPE = PROTOCOL_TELNET
|
||||
|
||||
if ConfigArgs.PROTOCOL_TYPE == None:
|
||||
@ -172,7 +251,11 @@ class ConfigArgsLoader():
|
||||
if port is not None:
|
||||
ConfigArgs.SERVER_PORT = port
|
||||
else:
|
||||
print("CXD56_TELNETSRV_PORT is not set, Use " + str(ConfigArgs.SERVER_PORT) + ".")
|
||||
print(
|
||||
"CXD56_TELNETSRV_PORT is not set, Use "
|
||||
+ str(ConfigArgs.SERVER_PORT)
|
||||
+ "."
|
||||
)
|
||||
if args.server_ip is not None:
|
||||
ConfigArgs.SERVER_IP = args.server_ip
|
||||
else:
|
||||
@ -180,7 +263,11 @@ class ConfigArgsLoader():
|
||||
if ip is not None:
|
||||
ConfigArgs.SERVER_IP = ip
|
||||
else:
|
||||
print("CXD56_TELNETSRV_IP is not set, Use " + ConfigArgs.SERVER_IP + ".")
|
||||
print(
|
||||
"CXD56_TELNETSRV_IP is not set, Use "
|
||||
+ ConfigArgs.SERVER_IP
|
||||
+ "."
|
||||
)
|
||||
|
||||
if args.xmodem_baud is not None:
|
||||
ConfigArgs.XMODEM_BAUD = args.xmodem_baud
|
||||
@ -197,11 +284,12 @@ class ConfigArgsLoader():
|
||||
if args.wait_reset is not None:
|
||||
ConfigArgs.WAIT_RESET = args.wait_reset
|
||||
|
||||
|
||||
class TelnetDev:
|
||||
def __init__(self):
|
||||
srv_ipaddr = ConfigArgs.SERVER_IP
|
||||
srv_port = ConfigArgs.SERVER_PORT
|
||||
self.recvbuf = b''
|
||||
self.recvbuf = b""
|
||||
try:
|
||||
self.telnet = telnetlib.Telnet(host=srv_ipaddr, port=srv_port, timeout=10)
|
||||
# There is a ack to be sent after connecting to the telnet server.
|
||||
@ -211,22 +299,22 @@ class TelnetDev:
|
||||
sys.exit(e.args[0])
|
||||
|
||||
def readline(self, size=None):
|
||||
res = b''
|
||||
ch = b''
|
||||
res = b""
|
||||
ch = b""
|
||||
while ch != ConfigArgs.EOL:
|
||||
ch = self.getc_raw(1, timeout=0.1)
|
||||
if ch == b'':
|
||||
if ch == b"":
|
||||
return res
|
||||
res += ch
|
||||
return res
|
||||
|
||||
def getc_raw(self, size, timeout=1):
|
||||
res = b''
|
||||
res = b""
|
||||
tm = time.monotonic()
|
||||
while size > 0:
|
||||
while self.recvbuf == b'':
|
||||
while self.recvbuf == b"":
|
||||
self.recvbuf = self.telnet.read_eager()
|
||||
if self.recvbuf == b'':
|
||||
if self.recvbuf == b"":
|
||||
if (time.monotonic() - tm) > timeout:
|
||||
return res
|
||||
time.sleep(0.1)
|
||||
@ -241,7 +329,7 @@ class TelnetDev:
|
||||
def discard_inputs(self, timeout=1.0):
|
||||
while True:
|
||||
ch = self.getc_raw(1, timeout=timeout)
|
||||
if ch == b'':
|
||||
if ch == b"":
|
||||
break
|
||||
|
||||
def getc(self, size, timeout=1):
|
||||
@ -269,12 +357,12 @@ class TelnetDev:
|
||||
if MAX_DOT_COUNT < cur_count:
|
||||
cur_count = MAX_DOT_COUNT
|
||||
for idx in range(cur_count - self.count):
|
||||
print('#',end='')
|
||||
sys.stdout.flush()
|
||||
print("#", end="", flush=true)
|
||||
self.count = cur_count
|
||||
if self.count == MAX_DOT_COUNT:
|
||||
print("\n")
|
||||
|
||||
|
||||
class SerialDev:
|
||||
def __init__(self):
|
||||
if import_serial_module is False:
|
||||
@ -288,9 +376,14 @@ class SerialDev:
|
||||
else:
|
||||
port = ConfigArgs.SERIAL_PORT
|
||||
try:
|
||||
self.serial = serial.Serial(port, baudrate=115200,
|
||||
parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
|
||||
bytesize=serial.EIGHTBITS, timeout=0.1)
|
||||
self.serial = serial.Serial(
|
||||
port,
|
||||
baudrate=115200,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
timeout=0.1,
|
||||
)
|
||||
except Exception as e:
|
||||
print("Cannot open port : " + port)
|
||||
sys.exit(e.args[0])
|
||||
@ -328,7 +421,7 @@ class SerialDev:
|
||||
break
|
||||
|
||||
def setBaudrate(self, baudrate):
|
||||
# self.serial.setBaudrate(baudrate)
|
||||
# self.serial.setBaudrate(baudrate)
|
||||
self.serial.baudrate = baudrate
|
||||
|
||||
def reboot(self):
|
||||
@ -350,12 +443,13 @@ class SerialDev:
|
||||
if MAX_DOT_COUNT < cur_count:
|
||||
cur_count = MAX_DOT_COUNT
|
||||
for idx in range(cur_count - self.count):
|
||||
print('#',end='')
|
||||
print("#", end="")
|
||||
sys.stdout.flush()
|
||||
self.count = cur_count
|
||||
if self.count == MAX_DOT_COUNT:
|
||||
print("\n")
|
||||
|
||||
|
||||
class FlashWriter:
|
||||
def __init__(self, protocol_sel=PROTOCOL_SERIAL):
|
||||
if protocol_sel == PROTOCOL_TELNET:
|
||||
@ -363,28 +457,28 @@ class FlashWriter:
|
||||
else:
|
||||
self.serial = SerialDev()
|
||||
|
||||
def cancel_autoboot(self) :
|
||||
boot_msg = ''
|
||||
def cancel_autoboot(self):
|
||||
boot_msg = ""
|
||||
self.serial.reboot() # Target reboot before send 'r'
|
||||
while boot_msg == '' :
|
||||
while boot_msg == "":
|
||||
rx = self.serial.readline().strip()
|
||||
self.serial.write(b"r") # Send "r" key to avoid auto boot
|
||||
for msg in ROM_MSG :
|
||||
if msg in rx :
|
||||
for msg in ROM_MSG:
|
||||
if msg in rx:
|
||||
boot_msg = msg
|
||||
break
|
||||
while True :
|
||||
while True:
|
||||
rx = self.serial.readline().decode(errors="replace").strip()
|
||||
if "updater" in rx :
|
||||
if "updater" in rx:
|
||||
# Workaround : Sometime first character is dropped.
|
||||
# Send line feed as air shot before actual command.
|
||||
self.serial.write(b"\n") # Send line feed
|
||||
self.serial.discard_inputs()# Clear input buffer to sync
|
||||
self.serial.discard_inputs() # Clear input buffer to sync
|
||||
return boot_msg.decode(errors="ignore")
|
||||
|
||||
def recv(self):
|
||||
rx = self.serial.readline()
|
||||
if PRINT_RAW_COMMAND :
|
||||
if PRINT_RAW_COMMAND:
|
||||
serial_line = rx.decode(errors="replace")
|
||||
if serial_line.strip() != "" and not serial_line.startswith(XMDM_MSG):
|
||||
print(serial_line, end="")
|
||||
@ -408,37 +502,39 @@ class FlashWriter:
|
||||
def send(self, string):
|
||||
self.serial.write(str(string).encode() + b"\n")
|
||||
rx = self.serial.readline()
|
||||
if PRINT_RAW_COMMAND :
|
||||
if PRINT_RAW_COMMAND:
|
||||
print(rx.decode(errors="replace"), end="")
|
||||
|
||||
def read_output(self, prompt_text) :
|
||||
def read_output(self, prompt_text):
|
||||
output = []
|
||||
while True :
|
||||
while True:
|
||||
rx = self.serial.readline()
|
||||
if prompt_text.encode() in rx :
|
||||
if prompt_text.encode() in rx:
|
||||
time.sleep(0.1)
|
||||
break
|
||||
if rx != "" :
|
||||
if rx != "":
|
||||
output.append(rx.decode(errors="ignore").rstrip())
|
||||
return output
|
||||
|
||||
def install_files(self, files, command) :
|
||||
def install_files(self, files, command):
|
||||
if ConfigArgs.XMODEM_BAUD:
|
||||
command += " -b " + ConfigArgs.XMODEM_BAUD
|
||||
if os.name == 'nt':
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc_win, 'xmodem1k')
|
||||
if os.name == "nt":
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc_win, "xmodem1k")
|
||||
else:
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc, 'xmodem1k')
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc, "xmodem1k")
|
||||
for file in files:
|
||||
with open(file, "rb") as bin :
|
||||
with open(file, "rb") as bin:
|
||||
self.send(command)
|
||||
print("Install " + file)
|
||||
self.wait(XMDM_MSG)
|
||||
print("|0%" +
|
||||
"-" * (int(MAX_DOT_COUNT / 2) - 6) +
|
||||
"50%" +
|
||||
"-" * (MAX_DOT_COUNT - int(MAX_DOT_COUNT / 2) - 5) +
|
||||
"100%|")
|
||||
print(
|
||||
"|0%"
|
||||
+ "-" * (int(MAX_DOT_COUNT / 2) - 6)
|
||||
+ "50%"
|
||||
+ "-" * (MAX_DOT_COUNT - int(MAX_DOT_COUNT / 2) - 5)
|
||||
+ "100%|"
|
||||
)
|
||||
if ConfigArgs.XMODEM_BAUD:
|
||||
self.serial.setBaudrate(ConfigArgs.XMODEM_BAUD)
|
||||
self.serial.discard_inputs() # Clear input buffer to sync
|
||||
@ -448,17 +544,17 @@ class FlashWriter:
|
||||
self.serial.setBaudrate(115200)
|
||||
self.wait_for_prompt()
|
||||
|
||||
def save_files(self, files) :
|
||||
def save_files(self, files):
|
||||
if ConfigArgs.XMODEM_BAUD:
|
||||
command = "save_file -b " + ConfigArgs.XMODEM_BAUD + " -x "
|
||||
else:
|
||||
command = "save_file -x "
|
||||
if os.name == 'nt':
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc_win, 'xmodem1k')
|
||||
if os.name == "nt":
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc_win, "xmodem1k")
|
||||
else:
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc, 'xmodem1k')
|
||||
modem = xmodem.XMODEM(self.serial.getc, self.serial.putc, "xmodem1k")
|
||||
for file in files:
|
||||
with open(file, "rb") as bin :
|
||||
with open(file, "rb") as bin:
|
||||
self.send(command + os.path.basename(file))
|
||||
print("Save " + file)
|
||||
self.wait(XMDM_MSG)
|
||||
@ -473,14 +569,15 @@ class FlashWriter:
|
||||
self.send("chmod d+rw " + os.path.basename(file))
|
||||
self.wait_for_prompt()
|
||||
|
||||
def delete_files(self, files) :
|
||||
for file in files :
|
||||
def delete_files(self, files):
|
||||
for file in files:
|
||||
self.delete_binary(file)
|
||||
|
||||
def delete_binary(self, bin_name) :
|
||||
def delete_binary(self, bin_name):
|
||||
self.send("rm " + bin_name)
|
||||
self.wait_for_prompt()
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
config_loader = ConfigArgsLoader()
|
||||
@ -516,29 +613,34 @@ def main():
|
||||
|
||||
if do_wait_reset:
|
||||
# Wait to reset the board
|
||||
print('Please press RESET button on target board')
|
||||
print("Please press RESET button on target board")
|
||||
sys.stdout.flush()
|
||||
bootrom_msg = writer.cancel_autoboot()
|
||||
|
||||
# Remove files
|
||||
if ConfigArgs.ERASE_NAME :
|
||||
if ConfigArgs.ERASE_NAME:
|
||||
print(">>> Remove existing files ...")
|
||||
writer.delete_files(ConfigArgs.ERASE_NAME)
|
||||
|
||||
# Install files
|
||||
if ConfigArgs.PACKAGE_NAME or ConfigArgs.PKGSYS_NAME or ConfigArgs.PKGAPP_NAME or ConfigArgs.PKGUPD_NAME:
|
||||
if (
|
||||
ConfigArgs.PACKAGE_NAME
|
||||
or ConfigArgs.PKGSYS_NAME
|
||||
or ConfigArgs.PKGAPP_NAME
|
||||
or ConfigArgs.PKGUPD_NAME
|
||||
):
|
||||
print(">>> Install files ...")
|
||||
if ConfigArgs.PACKAGE_NAME :
|
||||
if ConfigArgs.PACKAGE_NAME:
|
||||
writer.install_files(ConfigArgs.PACKAGE_NAME, "install")
|
||||
if ConfigArgs.PKGSYS_NAME :
|
||||
if ConfigArgs.PKGSYS_NAME:
|
||||
writer.install_files(ConfigArgs.PKGSYS_NAME, "install")
|
||||
if ConfigArgs.PKGAPP_NAME :
|
||||
if ConfigArgs.PKGAPP_NAME:
|
||||
writer.install_files(ConfigArgs.PKGAPP_NAME, "install")
|
||||
if ConfigArgs.PKGUPD_NAME :
|
||||
if ConfigArgs.PKGUPD_NAME:
|
||||
writer.install_files(ConfigArgs.PKGUPD_NAME, "install -k updater.key")
|
||||
|
||||
# Save files
|
||||
if ConfigArgs.FILE_NAME :
|
||||
if ConfigArgs.FILE_NAME:
|
||||
print(">>> Save files ...")
|
||||
writer.save_files(ConfigArgs.FILE_NAME)
|
||||
|
||||
@ -552,12 +654,13 @@ def main():
|
||||
writer.send("sync")
|
||||
writer.wait_for_prompt()
|
||||
|
||||
if REBOOT_AT_END :
|
||||
if REBOOT_AT_END:
|
||||
print("Restarting the board ...")
|
||||
writer.send("reboot")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
sys.exit(main())
|
||||
|
Loading…
Reference in New Issue
Block a user