tools/minidumpserver: refactor codes

VELAPLATFO-16411

1. refactor code style and simplify parse method
2. change mem find algorithm to binary search

Signed-off-by: xinbingnan <xinbingnan@xiaomi.com>
This commit is contained in:
xinbingnan 2023-09-12 13:48:41 +08:00 committed by Xiang Xiao
parent 9b68971853
commit 4c75318778

View File

@ -36,80 +36,10 @@ SHF_EXEC = 0x4
SHF_WRITE_ALLOC = SHF_WRITE | SHF_ALLOC SHF_WRITE_ALLOC = SHF_WRITE | SHF_ALLOC
SHF_ALLOC_EXEC = SHF_ALLOC | SHF_EXEC SHF_ALLOC_EXEC = SHF_ALLOC | SHF_EXEC
GDB_SIGNAL_DEFAULT = 7
logger = logging.getLogger() logger = logging.getLogger()
class dump_elf_file:
    """
    Class to parse ELF file for memory content in various sections.
    There are read-only sections (e.g. text and rodata) where
    the memory content does not need to be dumped via coredump
    and can be retrieved from the ELF file.
    """

    def __init__(self, elffile: str):
        self.elffile = elffile
        self.fd = None
        self.elf = None
        self.memories = list()

    def open(self):
        """Open the ELF file and keep the handle; ELFFile reads from it lazily."""
        self.fd = open(self.elffile, "rb")
        self.elf = ELFFile(self.fd)

    def close(self):
        """Release the underlying file handle."""
        self.fd.close()

    def parse(self):
        """Scan ELF sections and record text/read-only data in self.memories.

        Returns True once scanning is complete.
        """
        if self.fd is None:
            self.open()

        for section in self.elf.iter_sections():
            # REALLY NEED to match exact type as all other sections
            # (debug, text, etc.) are descendants where
            # isinstance() would match.
            if (
                type(section) is not elftools.elf.sections.Section
            ):  # pylint: disable=unidiomatic-typecheck
                continue

            if section["sh_type"] != "SHT_PROGBITS":
                continue

            flags = section["sh_flags"]
            if (flags & SHF_ALLOC_EXEC) == SHF_ALLOC_EXEC:
                # Text section
                desc = "text"
            elif (flags & SHF_WRITE_ALLOC) == SHF_WRITE_ALLOC:
                # Data section
                #
                # Running app changes the content so no need to store
                continue
            elif (flags & SHF_ALLOC) == SHF_ALLOC:
                # Read only data section
                desc = "read-only data"
            else:
                continue

            start = section["sh_addr"]
            end = start + section["sh_size"] - 1
            memory = {"start": start, "end": end, "data": section.data()}
            logger.info(
                "ELF Section: 0x%x to 0x%x of size %d (%s)"
                % (memory["start"], memory["end"], len(memory["data"]), desc)
            )
            self.memories.append(memory)

        return True
reg_table = { reg_table = {
"arm": { "arm": {
"R0": 0, "R0": 0,
@ -280,127 +210,183 @@ reg_fix_value = {
} }
def str_get_after(s, sub):
    """Return the portion of *s* after the first occurrence of *sub*.

    Returns None when *sub* does not occur in *s*.
    """
    _, sep, tail = s.partition(sub)
    return tail if sep else None
def pack_memory(start, end, data):
    """Bundle one memory region into the dict shape used throughout this tool."""
    return dict(start=start, end=end, data=data)
class DumpELFFile:
    """
    Class to parse ELF file for memory content in various sections.
    There are read-only sections (e.g. text and rodata) where
    the memory content does not need to be dumped via coredump
    and can be retrieved from the ELF file.
    """

    def __init__(self, elffile: str):
        self.elffile = elffile
        self.__memories = []

    def parse(self):
        """Scan ELF sections and keep text/read-only PROGBITS content.

        Returns True once scanning is complete.
        """
        self.__memories = []
        elf = ELFFile.load_from_path(self.elffile)

        for section in elf.iter_sections():
            # REALLY NEED to match exact type as all other sections
            # (debug, text, etc.) are descendants where
            # isinstance() would match.
            if (
                type(section) is not elftools.elf.sections.Section
            ):  # pylint: disable=unidiomatic-typecheck
                continue

            if section["sh_type"] != "SHT_PROGBITS":
                continue

            flags = section["sh_flags"]
            if (flags & SHF_ALLOC_EXEC) == SHF_ALLOC_EXEC:
                # Text section
                desc = "text"
            elif (flags & SHF_WRITE_ALLOC) == SHF_WRITE_ALLOC:
                # Data section
                #
                # Running app changes the content so no need to store
                continue
            elif (flags & SHF_ALLOC) == SHF_ALLOC:
                # Read only data section
                desc = "read-only data"
            else:
                continue

            start = section["sh_addr"]
            end = start + section["sh_size"] - 1
            memory = pack_memory(start, end, section.data())
            logger.debug(
                f"ELF Section: {hex(memory['start'])} to {hex(memory['end'])} of size {len(memory['data'])} ({desc})"
            )
            self.__memories.append(memory)

        elf.close()
        return True

    def get_memories(self):
        """Return the list of memory-region dicts collected by parse()."""
        return self.__memories
class DumpLogFile:
def __init__(self, logfile: str): def __init__(self, logfile: str):
self.logfile = logfile self.logfile = logfile
self.fd = None
self.arch = ""
self.registers = [] self.registers = []
self.memories = list() self.__memories = list()
self.reg_table = dict()
def open(self): def _init_register(self):
self.fd = open(self.logfile, "r") self.registers = [b"x"] * (max(self.reg_table.values()) + 1)
def close(self): def _parse_register(self, line):
self.fd.closeself() line = str_get_after(line, "up_dump_register:")
if line is None:
return False
line = line.strip()
# find register value
find_res = re.findall(r"(?P<REG>\w+): (?P<REGV>[0-9a-fA-F]+)", line)
for reg_name, reg_val in find_res:
if reg_name in self.reg_table:
reg_index = self.reg_table[reg_name]
self.registers[reg_index] = int(reg_val, 16)
return True
def _parse_fix_register(self, arch):
if arch in reg_fix_value:
for reg_name, reg_vals in reg_fix_value[arch].items():
reg_index = self.reg_table[reg_name]
self.registers[reg_index] = reg_vals
def _parse_stack(self, line, start, data):
line = str_get_after(line, "stack_dump:")
if line is None:
return None
line = line.strip()
# find stack-dump
match_res = re.match(r"(?P<ADDR_START>0x\w+): (?P<VALS>( ?\w+)+)", line)
if match_res is None:
return None
addr_start = int(match_res.groupdict()["ADDR_START"], 16)
if start + len(data) != addr_start:
# stack is not contiguous
if len(data) == 0:
start = addr_start
else:
self.__memories.append(pack_memory(start, start + len(data), data))
data = b""
start = addr_start
for val in match_res.groupdict()["VALS"].split():
data = data + struct.pack("<I", int(val, 16))
return start, data
def parse(self, arch): def parse(self, arch):
self.reg_table = reg_table[arch]
self._init_register()
data = bytes() data = bytes()
start = 0 start = 0
if self.fd is None:
self.open()
linenumber = 0 with open(self.logfile, "r") as f:
try: lines = f.readlines()
while 1:
line = self.fd.readline()
if line == "":
break
linenumber += 1 for line_num, line in enumerate(lines):
tmp = re.search("up_dump_register:", line) if line == "":
if tmp is not None: break
# find arch
if arch is None:
self.arch = tmp.group(1)
else:
self.arch = arch
if self.arch not in reg_table: try:
logger.error("%s not supported" % (self.arch)) if self._parse_register(line):
# init register list
if len(self.registers) == 0:
for x in range(max(reg_table[self.arch].values()) + 1):
self.registers.append(b"x")
# find register value
line = line[tmp.span()[1] :]
line = line.replace("\n", " ")
while 1:
tmp = re.search("([^ ]+):", line)
if tmp is None:
break
register = tmp.group(1)
line = line[tmp.span()[1] :]
tmp = re.search("([0-9a-fA-F]+) ", line)
if tmp is None:
break
if register in reg_table[self.arch].keys():
self.registers[reg_table[self.arch][register]] = int(
"0x" + tmp.group().replace(" ", ""), 16
)
line = line[tmp.span()[1] :]
continue continue
if self.arch in reg_fix_value: res = self._parse_stack(line, start, data)
for register in reg_fix_value[self.arch].keys(): if res:
self.registers[reg_table[self.arch][register]] = reg_fix_value[ start, data = res
self.arch continue
][register]
tmp = re.search("stack_dump:", line) except Exception as e:
if tmp is not None: logger.error("parse log file error: %s line_number %d" % (e, line_num))
# find stackdump sys.exit(1)
line = line[tmp.span()[1] :]
tmp = re.search("([0-9a-fA-F]+):", line)
if tmp is not None:
line_start = int("0x" + tmp.group()[:-1], 16)
if start + len(data) != line_start: self._parse_fix_register(arch)
# stack is not contiguous if data:
if len(data) == 0: self.__memories.append(pack_memory(start, start + len(data), data))
start = line_start
else:
memory = {
"start": start,
"end": start + len(data),
"data": data,
}
self.memories.append(memory)
data = b""
start = line_start
line = line[tmp.span()[1] :] def get_memories(self):
line = line.replace("\n", " ") return self.__memories
while 1:
# record stack value
tmp = re.search(" ([0-9a-fA-F]+)", line)
if tmp is None:
break
data = data + struct.pack(
"<I", int("0x" + tmp.group().replace(" ", ""), 16)
)
line = line[tmp.span()[1] :]
except Exception as e:
logger.error("parse log file error: %s linenumber %d" % (e, linenumber))
os._exit(0)
if len(data):
memory = {"start": start, "end": start + len(data), "data": data}
self.memories.append(memory)
GDB_SIGNAL_DEFAULT = 7 class GDBStub:
def __init__(self, logfile: DumpLogFile, elffile: DumpELFFile):
class gdb_stub:
def __init__(self, logfile: dump_log_file, elffile: dump_elf_file):
self.logfile = logfile self.logfile = logfile
self.elffile = elffile self.elffile = elffile
self.socket = None self.socket = None
self.gdb_signal = GDB_SIGNAL_DEFAULT self.gdb_signal = GDB_SIGNAL_DEFAULT
self.mem_regions = self.elffile.memories + self.logfile.memories self.mem_regions = self.elffile.get_memories() + self.logfile.get_memories()
self.mem_regions.sort(key=lambda x: x["start"])
def get_gdb_packet(self): def get_gdb_packet(self):
socket = self.socket socket = self.socket
@ -511,19 +497,29 @@ class gdb_stub:
# the 'm' packet for reading memory: m<addr>,<len> # the 'm' packet for reading memory: m<addr>,<len>
def get_mem_region(addr): def get_mem_region(addr):
for r in self.mem_regions: left = 0
if r["start"] <= addr <= r["end"]: right = len(self.mem_regions) - 1
return r while left <= right:
mid = (left + right) // 2
if (
self.mem_regions[mid]["start"]
<= addr
<= self.mem_regions[mid]["end"]
):
return self.mem_regions[mid]
elif addr < self.mem_regions[mid]["start"]:
right = mid - 1
else:
left = mid + 1
return None return None
# extract address and length from packet # extract address and length from packet
# and convert them into usable integer values # and convert them into usable integer values
addr, length = pkt[1:].split(b",") addr, length = pkt[1:].split(b",")
s_addr = int(b"0x" + addr, 16) s_addr = int(addr, 16)
length = int(b"0x" + length, 16) length = int(length, 16)
# FIXME: Need more efficient way of extracting memory content
remaining = length remaining = length
addr = s_addr addr = s_addr
barray = b"" barray = b""
@ -533,10 +529,6 @@ class gdb_stub:
barray = None barray = None
break break
if addr > r["end"]:
r = get_mem_region(addr)
continue
offset = addr - r["start"] offset = addr - r["start"]
barray += r["data"][offset : offset + 1] barray += r["data"][offset : offset + 1]
@ -596,27 +588,33 @@ class gdb_stub:
self.put_gdb_packet(b"") self.put_gdb_packet(b"")
def arg_parser():
    """Build the command-line parser and return the parsed arguments."""
    ap = argparse.ArgumentParser()
    ap.add_argument("-e", "--elffile", required=True, help="elffile")
    ap.add_argument("-l", "--logfile", required=True, help="logfile")
    ap.add_argument(
        "-a",
        "--arch",
        help="select architecture,if not use this options",
        required=True,
        choices=[arch for arch in reg_table.keys()],
    )
    ap.add_argument("-p", "--port", help="gdbport", type=int, default=1234)
    ap.add_argument("--debug", action="store_true", default=False)
    return ap.parse_args()
args = parser.parse_args()
def config_log(debug):
    """Set logger verbosity from the --debug flag and install a terse format."""
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    logging.basicConfig(format="[%(levelname)s][%(name)s] %(message)s")
def main(args):
if not os.path.isfile(args.elffile): if not os.path.isfile(args.elffile):
logger.error(f"Cannot find file {args.elffile}, exiting...") logger.error(f"Cannot find file {args.elffile}, exiting...")
sys.exit(1) sys.exit(1)
@ -625,18 +623,14 @@ if __name__ == "__main__":
logger.error(f"Cannot find file {args.logfile}, exiting...") logger.error(f"Cannot find file {args.logfile}, exiting...")
sys.exit(1) sys.exit(1)
if args.debug: config_log(args.debug)
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
log = dump_log_file(args.logfile) log = DumpLogFile(args.logfile)
log.parse(args.arch) log.parse(args.arch)
elf = dump_elf_file(args.elffile) elf = DumpELFFile(args.elffile)
elf.parse() elf.parse()
gdbstub = gdb_stub(log, elf) gdb_stub = GDBStub(log, elf)
logging.basicConfig(format="[%(levelname)s][%(name)s] %(message)s")
gdbserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) gdbserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -649,6 +643,7 @@ if __name__ == "__main__":
logger.info(f"Waiting GDB connection on port {args.port} ...") logger.info(f"Waiting GDB connection on port {args.port} ...")
logger.info("Press Ctrl+C to stop ...") logger.info("Press Ctrl+C to stop ...")
logger.info(f'Hint: gdb {args.elffile} -ex "target remote localhost:{args.port}"')
while True: while True:
try: try:
@ -656,8 +651,12 @@ if __name__ == "__main__":
if conn: if conn:
logger.info(f"Accepted GDB connection from {remote}") logger.info(f"Accepted GDB connection from {remote}")
gdbstub.run(conn) gdb_stub.run(conn)
except KeyboardInterrupt: except KeyboardInterrupt:
break break
gdbserver.close() gdbserver.close()
if __name__ == "__main__":
main(arg_parser())