#!/usr/bin/env python3 # tools/gdbserver.py # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. The # ASF licenses this file to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance with the # License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # import argparse import binascii import logging import multiprocessing import os import re import shutil import socket import struct import subprocess import sys import elftools from elftools.elf.elffile import ELFFile # ELF section flags SHF_WRITE = 0x1 SHF_ALLOC = 0x2 SHF_EXEC = 0x4 SHF_WRITE_ALLOC = SHF_WRITE | SHF_ALLOC SHF_ALLOC_EXEC = SHF_ALLOC | SHF_EXEC GDB_SIGNAL_DEFAULT = 7 UINT16_MAX = 65535 DEFAULT_GDB_INIT_CMD = "-ex 'bt full' -ex 'info reg' -ex 'display /40i $pc-40'" logger = logging.getLogger() # The global register table is dictionary like {arch:{reg:ndx}} # # where arch is the CPU architecture name; # reg is the name of the register as used in log file # ndx is the index of the register in GDB group registers list # # Registers with multiple convenient names can have multiple entries here, one # for each name and with the same index. reg_table = { "arm": { "R0": 0, "R1": 1, "R2": 2, "R3": 3, "R4": 4, "R5": 5, "R6": 6, "FP": 7, "R8": 8, "SB": 9, "SL": 10, "R11": 11, "IP": 12, "SP": 13, "LR": 14, "PC": 15, "xPSR": 16, }, "arm-a": { "R0": 0, "R1": 1, "R2": 2, "R3": 3, "R4": 4, "R5": 5, "R6": 6, "R7": 7, "R8": 8, "SB": 9, "SL": 10, "FP": 11, "IP": 12, "SP": 13, "LR": 14, "PC": 15, "CPSR": 41, }, "arm-t": { "R0": 0, "R1": 1, "R2": 2, "R3": 3, "R4": 4, "R5": 5, "R6": 6, "FP": 7, "R8": 8, "SB": 9, "SL": 10, "R11": 11, "IP": 12, "SP": 13, "LR": 14, "PC": 15, "CPSR": 41, }, # rv64 works with gdb-multiarch on Ubuntu "riscv": { "ZERO": 0, "RA": 1, "SP": 2, "GP": 3, "TP": 4, "T0": 5, "T1": 6, "T2": 7, "FP": 8, "S1": 9, "A0": 10, "A1": 11, "A2": 12, "A3": 13, "A4": 14, "A5": 15, "A6": 16, "A7": 17, "S2": 18, "S3": 19, "S4": 20, "S5": 21, "S6": 22, "S7": 23, "S8": 24, "S9": 25, "S10": 26, "S11": 27, "T3": 28, "T4": 29, "T5": 30, "T6": 31, "PC": 32, "S0": 8, "EPC": 32, }, # use xtensa-esp32s3-elf-gdb register table "esp32s3": { "PC": 0, "PS": 73, "A0": 1, "A1": 2, "A2": 3, "A3": 4, "A4": 5, "A5": 6, "A6": 7, "A7": 8, "A8": 9, "A9": 10, "A10": 11, "A11": 12, "A12": 13, "A13": 14, "A14": 15, "A15": 16, "WINDOWBASE": 69, "WINDOWSTART": 70, "CAUSE": 190, "VADDR": 196, "LBEG": 65, "LEND": 66, "LCNT": 67, "SAR": 68, "SCOM": 76, }, # use xt-gdb register table "xtensa": { "PC": 32, "PS": 742, "A0": 256, "A1": 257, "A2": 258, "A3": 259, "A4": 260, "A5": 261, "A6": 262, "A7": 263, "A8": 264, "A9": 265, "A10": 266, "A11": 267, "A12": 268, "A13": 269, "A14": 270, "A15": 271, "WINDOWBASE": 584, "WINDOWSTART": 585, "CAUSE": 744, "VADDR": 750, "LBEG": 512, "LEND": 513, "LCNT": 514, "SAR": 515, "SCOM": 524, }, } # make sure the a0-a15 can be remapped to the correct register reg_fix_value = { "esp32s3": { "WINDOWBASE": (0, 69), "WINDOWSTART": (1, 70), "PS": (0x40000, 73), }, "xtensa": { "WINDOWBASE": (0, 584), "WINDOWSTART": (1, 585), "PS": (0x40000, 742), }, "riscv": { "ZERO": 0, }, } def str_get_after(s, sub): index = s.find(sub) if index == -1: return None return s[index + len(sub) :] def pack_memory(start, end, data): return {"start": start, "end": end, "data": data} class DumpELFFile: """ Class to parse ELF file for memory content in various sections. There are read-only sections (e.g. text and rodata) where the memory content does not need to be dumped via coredump and can be retrieved from the ELF file. """ def __init__(self, elffile: str): self.elffile = elffile self.__memories = [] self.__arch = None self.__xlen = None self.__text = 0 def parse(self): self.__memories = [] elf = ELFFile.load_from_path(self.elffile) self.__arch = elf.get_machine_arch().lower().replace("-", "") self.__xlen = elf.elfclass for section in elf.iter_sections(): # REALLY NEED to match exact type as all other sections # (debug, text, etc.) are descendants where # isinstance() would match. if ( type(section) is not elftools.elf.sections.Section ): # pylint: disable=unidiomatic-typecheck continue size = section["sh_size"] flags = section["sh_flags"] start = section["sh_addr"] end = start + size - 1 store = False desc = "?" if section["sh_type"] == "SHT_PROGBITS": if (flags & SHF_ALLOC_EXEC) == SHF_ALLOC_EXEC: # Text section store = True desc = "text" elif (flags & SHF_WRITE_ALLOC) == SHF_WRITE_ALLOC: # Data or Rodata section, rodata store in ram in some case store = True desc = "data or rodata" elif (flags & SHF_ALLOC) == SHF_ALLOC: # Read only data section store = True desc = "read-only data" if store: memory = pack_memory(start, end, section.data()) logger.debug( f"ELF Section: {hex(memory['start'])} to {hex(memory['end'])} of size {len(memory['data'])} ({desc})" ) self.__memories.append(memory) # record first text segment address for segment in elf.iter_segments(): if segment.header.p_flags & 1 and not self.__text: self.__text = segment.header.p_vaddr symtab = elf.get_section_by_name(".symtab") self.symbol = {} for symbol in symtab.iter_symbols(): if symbol["st_info"]["type"] != "STT_OBJECT": continue if symbol.name in ( "g_tcbinfo", "g_pidhash", "g_npidhash", "g_last_regs", "g_running_tasks", ): self.symbol[symbol.name] = symbol logger.debug( f"name:{symbol.name} size:{symbol['st_size']} value:{hex(symbol['st_value'])}" ) elf.close() return True def merge(self, other): if other.arch() == self.arch() and other.xlen() == self.xlen(): self.__memories += other.get_memories() else: raise TypeError("inconsistent ELF types") def get_memories(self): return self.__memories def arch(self): return self.__arch def xlen(self): return self.__xlen def text(self): return self.__text class DumpLogFile: def __init__(self, logfile): self.logfile = logfile self.registers = [] self.__memories = list() self.reg_table = dict() self.reg_len = 32 def _init_register(self): # registers list should be able to hold the max index self.registers = [b"x"] * (max(self.reg_table.values()) + 1) def _parse_register(self, line): line = str_get_after(line, "up_dump_register:") if line is None: return False line = line.strip() # find register value find_res = re.findall(r"(?P\w+): (?P[0-9a-fA-F]+)", line) for reg_name, reg_val in find_res: if reg_name in self.reg_table: reg_index = self.reg_table[reg_name] self.registers[reg_index] = int(reg_val, 16) self.reg_len = max(self.reg_len, len(reg_val) * 4) return True def _parse_fix_register(self, arch): if arch in reg_fix_value: for reg_name, reg_vals in reg_fix_value[arch].items(): reg_index = self.reg_table[reg_name] self.registers[reg_index] = reg_vals[0] def _parse_stack(self, line, start, data): line = str_get_after(line, "stack_dump:") if line is None: return None line = line.strip() # find stack-dump match_res = re.match(r"(?P0x\w+): (?P( ?\w+)+)", line) if match_res is None: return None addr_start = int(match_res.groupdict()["ADDR_START"], 16) if start + len(data) != addr_start: # stack is not contiguous if len(data) == 0: start = addr_start else: self.__memories.append(pack_memory(start, start + len(data), data)) data = b"" start = addr_start reg_fmt = " unknown value # Send in "xxxxxxxx" pkt += b"x" * self.reg_digits self.put_gdb_packet(pkt) if not self.threadinfo: put_register_packet(self.registers) else: for thread in self.threadinfo: if thread["tcb"]["pid"] == self.current_thread: if thread["tcb"]["tcbptr"] in self.running_tasks.keys(): put_register_packet(self.running_tasks[thread["tcb"]["tcbptr"]]) else: put_register_packet(thread["gdb_regs"]) break def handle_register_single_read_packet(self, pkt): logger.debug(f"pkt: {pkt}") def put_one_register_packet(regs): regval = None reg = int(pkt[1:].decode("utf8"), 16) if self.regfix: for reg_name, reg_vals in reg_fix_value[self.arch].items(): if reg == reg_vals[1]: logger.debug(f"{reg_name} fix to {reg_vals[0]}") regval = reg_vals[0] if regval is None: # tcbinfo index to gdb index reg_gdb_index = list(reg_table[self.arch].values()) if reg in reg_gdb_index: reg = reg_gdb_index.index(reg) regval = regs[reg] elif reg < len(regs) and regs[reg] != b"x": regval = regs[reg] if regval is not None: bval = struct.pack(self.reg_fmt, regval) self.put_gdb_packet(binascii.hexlify(bval)) else: self.put_gdb_packet(b"x" * self.reg_digits) if not self.threadinfo: put_one_register_packet(self.registers) else: for thread in self.threadinfo: if thread["tcb"]["pid"] == self.current_thread: if thread["tcb"]["tcbptr"] in self.running_tasks.keys(): put_one_register_packet( self.running_tasks[thread["tcb"]["tcbptr"]] ) else: put_one_register_packet(thread["gdb_regs"]) break def handle_register_group_write_packet(self): # the 'G' packet for writing to a group of registers # # We don't support writing so return error self.put_gdb_packet(b"E01") def handle_register_single_write_packet(self, pkt): # the 'P' packet for writing to registers index, value = pkt[1:].split(b"=") reg_val = 0 for i in range(0, len(value), 2): data = value[i : i + 2] reg_val = reg_val + (int(data.decode("utf8"), 16) << (i * 4)) reg = int(index.decode("utf8"), 16) if reg < len(self.registers): self.registers[reg] = reg_val self.put_gdb_packet(b"OK") def get_mem_region(self, addr): for mem in self.mem_regions: if mem["start"] <= addr < mem["end"]: return mem return None def handle_memory_read_packet(self, pkt): # the 'm' packet for reading memory: m, # extract address and length from packet # and convert them into usable integer values addr, length = pkt[1:].split(b",") s_addr = int(addr, 16) length = int(length, 16) remaining = length addr = s_addr barray = b"" r = self.get_mem_region(addr) while remaining > 0: if r is None: barray = None break offset = addr - r["start"] barray += r["data"][offset : offset + 1] addr += 1 remaining -= 1 if barray is not None: pkt = binascii.hexlify(barray) self.put_gdb_packet(pkt) else: self.put_gdb_packet(b"E01") def handle_memory_write_packet(self, pkt): # the 'M' packet for writing to memory # # We don't support writing so return error self.put_gdb_packet(b"E02") def handle_is_thread_active(self, pkt): self.current_thread = int(pkt[1:]) - 1 self.put_gdb_packet(b"OK") def handle_thread_context(self, pkt): if b"g" == pkt[1:2]: self.current_thread = int(pkt[2:]) - 1 elif b"c" == pkt[1:2]: self.current_thread = int(pkt[3:]) - 1 if self.current_thread == -1: self.current_thread = 0 self.put_gdb_packet(b"OK") def parse_thread(self): def unpack_data(addr, size, fmt): r = self.get_mem_region(addr) offset = addr - r["start"] data = r["data"][offset : offset + size] return struct.unpack(fmt, data) TCBINFO_FMT = "<8HQ" # uint16_t pid_off; /* Offset of tcb.pid */ # uint16_t state_off; /* Offset of tcb.task_state */ # uint16_t pri_off; /* Offset of tcb.sched_priority */ # uint16_t name_off; /* Offset of tcb.name */ # uint16_t stack_off; /* Offset of tcb.stack_alloc_ptr */ # uint16_t stack_size_off; /* Offset of tcb.adj_stack_size */ # uint16_t regs_off; /* Offset of tcb.regs */ # uint16_t regs_num; /* Num of general regs */ # union # { # uint8_t u[8]; # FAR const uint16_t *p; # } unpacked_data = unpack_data( self.elffile.symbol["g_tcbinfo"]["st_value"], self.elffile.symbol["g_tcbinfo"]["st_size"], TCBINFO_FMT, ) tcbinfo = { "pid_off": int(unpacked_data[0]), "state_off": int(unpacked_data[1]), "pri_off": int(unpacked_data[2]), "name_off": int(unpacked_data[3]), "stack_off": int(unpacked_data[4]), "stack_size_off": int(unpacked_data[5]), "regs_off": int(unpacked_data[6]), "regs_num": int(unpacked_data[7]), "reg_off": int(unpacked_data[8]), } unpacked_data = unpack_data( self.elffile.symbol["g_npidhash"]["st_value"], self.elffile.symbol["g_npidhash"]["st_size"], "