nuttx/tools/ci/testrun/utils/common.py

337 lines
10 KiB
Python
Raw Normal View History

#!/usr/bin/python3
import os
import re
import subprocess
import time
import pexpect
import pexpect.fdpexpect
import serial
rootPath = os.path.dirname(os.path.abspath(__file__))
class connectNuttx(object):
def __init__(
self,
board,
vela_path,
dev,
log_path,
fs,
ota_version,
core,
sudo,
ci,
method,
target,
):
self.board = board
self.path = vela_path
self.dev = dev
self.log_path = log_path
self.fs = fs
self.version = ota_version
self.sudo = sudo
self.ci = ci
self.core = core
self.method = method
self.target = target
self.enter = "\r"
self.debug_flag = 0
# get PROMPT value and rate value
self.PROMPT = getConfigValue(
self.path, self.board, core=self.core, flag="NSH_PROMPT_STRING"
)
self.rate = getConfigValue(
self.path, self.board, core=self.core, flag="UART0_BAUD"
)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path)
def setup(self):
self.start_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
print("============ start at %s ============" % self.start_time)
if self.target == "sim":
# start nuttx for sim
start.startNuttx(self, self.path, self.log_path, self.board, self.sudo)
if self.target == "qemu":
start.startQemu(self, self.path, self.log_path, self.board, self.sudo)
if self.target in ["target", "module"]:
# start minicom
if self.method == "minicom":
start.startMinicom(
self, self.dev, self.board, self.log_path, self.core, self.rate
)
else:
start.startSerial(
self, self.dev, self.board, self.log_path, self.core, self.rate
)
def sendcontrol(self, char):
char = char.lower()
a = ord(char)
if 97 <= a <= 122:
a = a - ord("a") + 1
byte = bytes([a])
return byte
d = {
"@": 0,
"`": 0,
"[": 27,
"{": 27,
"\\": 28,
"|": 28,
"]": 29,
"}": 29,
"^": 30,
"~": 30,
"_": 31,
"?": 127,
}
if char not in d:
return b""
byte = bytes(d[char])
return byte
def sendControlCmd(self, cmd, expect="ap>", timeout=10):
if self.method == "minicom":
self.process.sendcontrol(cmd)
time.sleep(1)
self.process.sendcontrol(cmd)
else:
byte = self.sendcontrol(cmd)
time.sleep(1)
self.process.send(byte)
time.sleep(1)
self.process.send(byte)
time.sleep(1)
self.process.sendline("\n")
ret = self.process.expect_exact(expect)
return ret
# send command to nsh
def sendCommand(self, cmd, expect="", timeout=10, flag=""):
if self.method != "minicom":
time.sleep(0.5)
if not expect:
expect = self.PROMPT
self.process.buffer = b""
self.process.sendline(cmd)
try:
ret = self.process.expect(expect, timeout=timeout)
except pexpect.TIMEOUT:
print("Debug: TIMEOUT '%s' exist and run next test case" % cmd)
ret = -1
except pexpect.EOF:
print("Debug: EOF raise exception")
ret = -2
finally:
if self.debug_flag:
self.debug(cmd, ret)
self.process.buffer = b""
self.process.sendline("\n")
if flag:
is_newline = self.process.expect_exact(flag, timeout=timeout)
else:
is_newline = self.process.expect_exact(self.PROMPT, timeout=timeout)
if self.debug_flag:
self.debug("NEWLINE", is_newline)
if self.method != "minicom":
time.sleep(0.5)
return ret
def switch_to_original_core(self):
if self.target == "target":
self.sendControlCmd("c")
if self.core != "ap":
self.process.sendline("cu -l /dev/tty%s\n" % self.core.upper())
self.process.expect_exact(self.PROMPT)
def debug(self, cmd, ret):
print("********************* DEBUG START ********************")
if cmd == "\n":
cmd = r"\n"
print("cmd: %s\n" % cmd)
print("ret: %s\n" % str(ret))
print("before: %s\n" % repr(self.process.before))
print("after: %s\n" % repr(self.process.after))
print("buffer: %s\n" % repr(self.process.buffer))
print("********************** DEBUG END **********************")
def cleanup(self):
if self.target == "sim":
self.process.sendline("poweroff")
if self.target == "qemu":
self.sendControlCmd("a", self.PROMPT)
self.process.sendline("x")
class start:
def startMinicom(self, dev, board, log_path, core, rate):
self.log = "{}/{}_{}.cap".format(log_path, dev[-4:], self.start_time)
self.process = pexpect.spawn(
r"sudo minicom -D {} -b {} -o -C {}".format(dev, rate, self.log),
maxread=200000,
)
self.process.expect("Welcome to minicom")
self.switch_to_original_core()
def startSerial(self, dev, board, log_path, core, rate):
self.log = "{}/{}_{}.cap".format(log_path, dev[-4:], self.start_time)
self.logFile = open(self.log, "ab+")
self.ser = serial.Serial(port=dev, baudrate=int(rate))
self.process = pexpect.fdpexpect.fdspawn(
self.ser, "wb", maxread=20000, logfile=self.logFile
)
self.switch_to_original_core()
def startNuttx(self, path, log_path, board="sim", sudo=False):
os.chdir(path)
self.log = "{}/{}_{}.log".format(log_path, board, self.start_time)
if sudo:
if board in ["sim_rpserver", "sim_rpproxy"]:
os.chdir(path)
self.process = pexpect.spawn(
"bash", ["-c", "sudo ./rpserver/nuttx/nuttx | tee %s" % self.log]
)
else:
self.process = pexpect.spawn(
"bash", ["-c", "sudo ./nuttx | tee %s" % self.log]
)
else:
self.process = pexpect.spawn("bash", ["-c", "./nuttx | tee %s" % self.log])
self.process.expect(self.PROMPT)
def startQemu(self, path, log_path, board="qemu", sudo=False):
os.chdir(path)
self.log = "{}/{}_{}.log".format(log_path, board, self.start_time)
flag1 = getConfigValue(path, board, core=None, flag="ARCH_CHIP")
if flag1 == "imx6":
self.process = pexpect.spawn(
"bash",
[
"-c",
"qemu-system-arm -M sabrelite -smp 1 -bios none -kernel ./nuttx -nographic | tee %s"
% self.log,
],
)
if flag1 == "qemu-rv":
flag2 = getConfigValue(path, board, core=None, flag="ARCH_RV64")
if flag2:
riscv = "qemu-system-riscv64"
else:
riscv = "qemu-system-riscv32"
self.process = pexpect.spawn(
"bash",
[
"-c",
"%s -M virt -bios ./nuttx -nographic | tee %s" % (riscv, self.log),
],
)
self.process.expect(self.PROMPT)
def runCmd(cmd):
p = subprocess.Popen(
cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
recode = p.returncode
if recode != 0:
print("Debug: run command '%s' failed" % cmd)
return stdout
# find file
def findFile(path, flag, board, core=None, match=True):
fList = []
for root, dirs, files in os.walk(path):
for name in files:
if not match and flag in name:
fList.append(os.path.join(root, name))
if match and name == flag and ".o" not in name:
fList.append(os.path.join(root, name))
if len(fList) != 1:
fList = [x for x in fList if board in x and core + "/" in x]
print(fList)
return fList
# get CONFIG_NSH_PROMPT_STRING value
def getConfigValue(path, board, core, flag):
value = ""
l1 = findFile(path, ".config", board, core=core)
with open(l1[0], "r+") as f:
lines = f.readlines()
f.close()
print(lines)
print(flag)
for line in lines:
if flag + "=" in line:
value = line.split("=")[1]
if '"' in value:
value = value.strip('"').strip()
print(value)
return value
# read log and extract timestamp
def getTimestamp(filename, str1, str2, pattern):
with open(filename, "r") as f:
buff = f.read()
pat = re.compile(str1 + "(.*?)" + str2, re.S)
result = pat.findall(buff)
timestamp = re.findall(pattern, str(result)) # extract timestamp
return timestamp
# calculate the time difference for each time
def getDif(timestamp, rateANDinterance):
lst = []
for i in range(1, len(timestamp)):
difts = int(timestamp[i]) - int(timestamp[i - 1])
float_num = (difts / 1000000) - rateANDinterance
lst.append(float_num)
return lst
# read log
def getLog(filename, str1, str2):
l1 = []
with open(filename, "r", encoding="utf-8", errors="ignore") as f:
buff = f.read()
f.close()
pat = re.compile(str1 + "(.*?)" + str2, re.S)
result = pat.findall(buff)
if "gtest" in str1:
return result[0].strip("\n")
else:
# add line to l1
for i in result[0].strip("\n").split("\n"):
l1.append(i)
return l1
def rmfile(p, core, file):
if p.core == core:
p.sendCommand("rm -r" + file)
else:
p.process.sendline("cu -l /dev/tty" + core.upper())
p.sendCommand("\n", core, flag=core + ">")
p.sendCommand("rm -r " + file, core, flag=core + ">")
if p.core == "ap":
p.sendControlCmd("c")
else:
p.switch_to_original_core(p)