blob: b97b607c3f91e2ea85e677aa856e7cc674a210e0 [file] [log] [blame]
import collections
import os
import os.path
import random
import re
import shlex
import subprocess
import sys
import tempfile
import time
import traceback
import pipes
import pexpect
print_log_names = False
real_stdout = sys.stdout
# Note that gdb comes with its own testsuite. I was unable to figure out how to
# run that testsuite against the spike simulator.
def find_file(path):
for directory in (os.getcwd(), os.path.dirname(__file__)):
fullpath = os.path.join(directory, path)
relpath = os.path.relpath(fullpath)
if len(relpath) >= len(fullpath):
relpath = fullpath
if os.path.exists(relpath):
return relpath
return None
class CompileError(Exception):
def __init__(self, stdout, stderr):
super().__init__()
self.stdout = stdout
self.stderr = stderr
gcc_cmd = None
def compile(args): # pylint: disable=redefined-builtin
if gcc_cmd:
cmd = [gcc_cmd]
else:
cmd = ["riscv64-unknown-elf-gcc"]
cmd.append("-g")
for arg in args:
found = find_file(arg)
if found:
cmd.append(found)
else:
cmd.append(arg)
header("Compile")
print("+", " ".join(cmd))
process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode:
print(stdout.decode('ascii'), end=" ")
print(stderr.decode('ascii'), end=" ")
header("")
raise CompileError(stdout, stderr)
class Spike:
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-locals
def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
isa=None, progbufsize=None, dmi_rti=None, abstract_rti=None,
support_hasel=True, support_abstract_csr=True,
support_haltgroups=True, vlen=128, elen=64, harts=None):
"""Launch spike. Return tuple of its process and the port it's running
on."""
self.process = None
self.isa = isa
self.progbufsize = progbufsize
self.dmi_rti = dmi_rti
self.abstract_rti = abstract_rti
self.support_abstract_csr = support_abstract_csr
self.support_hasel = support_hasel
self.support_haltgroups = support_haltgroups
self.vlen = vlen
self.elen = elen
self.harts = harts or target.harts or [target]
cmd = self.command(target, halted, timeout, with_jtag_gdb)
self.infinite_loop = target.compile(self.harts[0],
"programs/checksum.c", "programs/tiny-malloc.c",
"programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
cmd.append(self.infinite_loop)
self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
suffix=".log")
logname = self.logfile.name
self.lognames = [logname]
if print_log_names:
real_stdout.write("Temporary spike log: %s\n" % logname)
self.logfile.write(("+ %s\n" % " ".join(cmd)).encode())
self.logfile.flush()
self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=self.logfile, stderr=self.logfile)
if with_jtag_gdb:
self.port = None
for _ in range(30):
m = re.search(r"Listening for remote bitbang connection on "
r"port (\d+).", open(logname).read())
if m:
self.port = int(m.group(1))
os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
break
time.sleep(0.11)
if not self.port:
print_log(logname)
raise Exception("Didn't get spike message about bitbang "
"connection")
# pylint: disable=too-many-branches
def command(self, target, halted, timeout, with_jtag_gdb):
# pylint: disable=no-self-use
if target.sim_cmd:
cmd = shlex.split(target.sim_cmd)
else:
cmd = ["spike"]
cmd += ["-p%d" % len(self.harts)]
assert len(set(t.xlen for t in self.harts)) == 1, \
"All spike harts must have the same XLEN"
if self.isa:
isa = self.isa
else:
isa = "RV%dG" % self.harts[0].xlen
cmd += ["--isa", isa]
cmd += ["--dm-auth"]
if not self.progbufsize is None:
cmd += ["--dm-progsize", str(self.progbufsize)]
cmd += ["--dm-sba", "64"]
if not self.dmi_rti is None:
cmd += ["--dmi-rti", str(self.dmi_rti)]
if not self.abstract_rti is None:
cmd += ["--dm-abstract-rti", str(self.abstract_rti)]
if not self.support_abstract_csr:
cmd.append("--dm-no-abstract-csr")
if not self.support_hasel:
cmd.append("--dm-no-hasel")
if not self.support_haltgroups:
cmd.append("--dm-no-halt-groups")
if 'V' in isa[2:]:
cmd.append("--varch=vlen:%d,elen:%d" % (self.vlen, self.elen))
assert len(set(t.ram for t in self.harts)) == 1, \
"All spike harts must have the same RAM layout"
assert len(set(t.ram_size for t in self.harts)) == 1, \
"All spike harts must have the same RAM layout"
os.environ['WORK_AREA'] = '0x%x' % self.harts[0].ram
cmd += ["-m0x%x:0x%x" % (self.harts[0].ram, self.harts[0].ram_size)]
if timeout:
cmd = ["timeout", str(timeout)] + cmd
if halted:
cmd.append('-H')
if with_jtag_gdb:
cmd += ['--rbb-port', '0']
os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
return cmd
def __del__(self):
if self.process:
try:
self.process.kill()
self.process.wait()
except OSError:
pass
def wait(self, *args, **kwargs):
return self.process.wait(*args, **kwargs)
class MultiSpike:
def __init__(self, spikes):
self.process = None
self.spikes = spikes
self.lognames = sum((spike.lognames for spike in spikes), [])
self.logfile = tempfile.NamedTemporaryFile(prefix="daisychain-",
suffix=".log")
self.lognames.append(self.logfile.name)
# Now create the daisy-chain process.
cmd = ["./rbb_daisychain.py", "0"] + \
[str(spike.port) for spike in spikes]
self.logfile.write(("+ %s\n" % cmd).encode())
self.logfile.flush()
self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=self.logfile, stderr=self.logfile)
self.port = None
for _ in range(30):
m = re.search(r"Listening on port (\d+).",
open(self.lognames[-1]).read())
if m:
self.port = int(m.group(1))
break
time.sleep(0.11)
if not self.port:
print_log(self.lognames[-1])
raise Exception("Didn't get daisy chain message about which port "
"it's listening on.")
os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
os.environ['REMOTE_BITBANG_PORT'] = str(self.port)
def __del__(self):
if self.process:
try:
self.process.kill()
self.process.wait()
except OSError:
pass
class VcsSim:
logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
logname = logfile.name
def __init__(self, sim_cmd=None, debug=False, timeout=300):
if sim_cmd:
cmd = shlex.split(sim_cmd)
else:
cmd = ["simv"]
cmd += ["+jtag_vpi_enable"]
if debug:
cmd[0] = cmd[0] + "-debug"
cmd += ["+vcdplusfile=output/gdbserver.vpd"]
logfile = open(self.logname, "w")
if print_log_names:
real_stdout.write("Temporary VCS log: %s\n" % self.logname)
logfile.write("+ %s\n" % " ".join(cmd))
logfile.flush()
listenfile = open(self.logname, "r")
listenfile.seek(0, 2)
self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=logfile, stderr=logfile)
done = False
start = time.time()
while not done:
# Fail if VCS exits early
exit_code = self.process.poll()
if exit_code is not None:
raise RuntimeError('VCS simulator exited early with status %d'
% exit_code)
line = listenfile.readline()
if not line:
time.sleep(1)
match = re.match(r"^Listening on port (\d+)$", line)
if match:
done = True
self.port = int(match.group(1))
os.environ['JTAG_VPI_PORT'] = str(self.port)
if (time.time() - start) > timeout:
raise Exception("Timed out waiting for VCS to listen for JTAG "
"vpi")
def __del__(self):
try:
self.process.kill()
self.process.wait()
except OSError:
pass
class Openocd:
logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
logname = logfile.name
def __init__(self, server_cmd=None, config=None, debug=False, timeout=60,
freertos=False):
self.timeout = timeout
if server_cmd:
cmd = shlex.split(server_cmd)
else:
cmd = ["openocd"]
if debug:
cmd.append("-d")
# This command needs to come before any config scripts on the command
# line, since they are executed in order.
cmd += [
# Tell OpenOCD to bind gdb to an unused, ephemeral port.
"--command",
"gdb_port 0",
# Disable tcl and telnet servers, since they are unused and because
# the port numbers will conflict if multiple OpenOCD processes are
# running on the same server.
"--command",
"tcl_port disabled",
"--command",
"telnet_port disabled",
]
if config:
self.config_file = find_file(config)
if self.config_file is None:
print("Unable to read file", config)
sys.exit(1)
cmd += ["-f", self.config_file]
if debug:
cmd.append("-d")
extra_env = {}
if freertos:
extra_env['USE_FREERTOS'] = "1"
else:
extra_env['USE_FREERTOS'] = "0"
raw_logfile = open(Openocd.logname, "wb")
try:
spike_dasm = subprocess.Popen("spike-dasm", stdin=subprocess.PIPE,
stdout=raw_logfile, stderr=raw_logfile)
logfile = spike_dasm.stdin
except FileNotFoundError:
logfile = raw_logfile
if print_log_names:
real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT",
"WORK_AREA")
env_entries = [key for key in env_entries if key in os.environ]
parts = [
" ".join("%s=%s" % (key, os.environ[key]) for key in env_entries),
" ".join("%s=%s" % (k, v) for k, v in extra_env.items()),
" ".join(map(pipes.quote, cmd))
]
logfile.write(("+ %s\n" % " ".join(parts)).encode())
logfile.flush()
self.gdb_ports = []
self.process = self.start(cmd, logfile, extra_env)
def start(self, cmd, logfile, extra_env):
combined_env = {**os.environ, **extra_env}
process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=logfile, stderr=logfile, env=combined_env)
try:
# Wait for OpenOCD to have made it through riscv_examine(). When
# using OpenOCD to communicate with a simulator this may take a
# long time, and gdb will time out when trying to connect if we
# attempt too early.
start = time.time()
messaged = False
fd = open(Openocd.logname, "r")
while True:
line = fd.readline()
if not line:
if not process.poll() is None:
raise Exception("OpenOCD exited early.")
time.sleep(0.1)
continue
m = re.search(r"Listening on port (\d+) for gdb connections",
line)
if m:
self.gdb_ports.append(int(m.group(1)))
if "telnet server disabled" in line:
return process
if not messaged and time.time() - start > 1:
messaged = True
print("Waiting for OpenOCD to start...")
if (time.time() - start) > self.timeout:
raise Exception("Timed out waiting for OpenOCD to "
"listen for gdb")
except Exception:
print_log(Openocd.logname)
raise
def __del__(self):
try:
self.process.terminate()
start = time.time()
while time.time() < start + 10:
if self.process.poll():
break
else:
self.process.kill()
except (OSError, AttributeError):
pass
def smp(self):
"""Return true iff OpenOCD internally sees the harts as part of an SMP
group."""
for line in open(self.config_file, "r"):
if "target smp" in line:
return True
return False
class OpenocdCli:
def __init__(self, port=4444):
self.child = pexpect.spawn(
"sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
self.child.expect("> ")
def command(self, cmd):
self.child.sendline(cmd)
self.child.expect(cmd)
self.child.expect("\n")
self.child.expect("> ")
return self.child.before.strip("\t\r\n \0").decode("utf-8")
def reg(self, reg=''):
output = self.command("reg %s" % reg)
matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
values = {r: int(v, 0) for r, v in matches}
if reg:
return values[reg]
return values
def load_image(self, image):
output = self.command("load_image %s" % image)
if 'invalid ELF file, only 32bits files are supported' in output:
raise TestNotApplicable(output)
class CannotAccess(Exception):
def __init__(self, address):
Exception.__init__(self)
self.address = address
class CannotInsertBreakpoint(Exception):
def __init__(self, number):
Exception.__init__(self)
self.number = number
class CouldNotFetch(Exception):
def __init__(self, regname, explanation):
Exception.__init__(self)
self.regname = regname
self.explanation = explanation
class NoSymbol(Exception):
def __init__(self, symbol):
Exception.__init__(self)
self.symbol = symbol
def __repr__(self):
return "NoSymbol(%r)" % self.symbol
Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
'name', 'frame'))
class Repeat:
def __init__(self, count):
self.count = count
def tokenize(text):
index = 0
while index < len(text):
for regex, fn in (
(r"[\s]+", lambda m: None),
(r"[,{}=]", lambda m: m.group(0)),
(r"0x[\da-fA-F]+", lambda m: int(m.group(0)[2:], 16)),
(r"-?\d*\.\d+(e[-+]\d+)?", lambda m: float(m.group(0))),
(r"-?\d+", lambda m: int(m.group(0))),
(r"-?nan\(0x[a-f0-9]+\)", lambda m: float("nan")),
(r"<repeats (\d+) times>", lambda m: Repeat(int(m.group(1)))),
(r"Could not fetch register \"(\w+)\"; (.*)$",
lambda m: CouldNotFetch(m.group(1), m.group(2))),
(r"Cannot access memory at address (0x[0-9a-f]+)",
lambda m: CannotAccess(int(m.group(1), 0))),
(r"Cannot insert breakpoint (\d+).",
lambda m: CannotInsertBreakpoint(int(m.group(1)))),
(r'No symbol "(\w+)" in current context.',
lambda m: NoSymbol(m.group(1))),
(r'"([^"]*)"', lambda m: m.group(1)),
(r"[a-zA-Z][a-zA-Z\d]*", lambda m: m.group(0)),
):
m = re.match(regex, text[index:])
if m:
index += len(m.group(0))
token = fn(m)
if not token is None:
yield token
break
else:
raise Exception(repr(text[index:]))
def parse_dict(tokens):
assert tokens[0] == "{"
tokens.pop(0)
result = {}
while True:
key = tokens.pop(0)
assert tokens.pop(0) == "="
value = parse_tokens(tokens)
result[key] = value
token = tokens.pop(0)
if token == "}":
return result
assert token == ","
def parse_list(tokens):
assert tokens[0] == "{"
tokens.pop(0)
result = []
while True:
result.append(tokens.pop(0))
token = tokens.pop(0)
if isinstance(token, Repeat):
result += [result[-1]] * (token.count - 1)
token = tokens.pop(0)
if token == "}":
return result
assert token == ","
def parse_dict_or_list(tokens):
assert tokens[0] == "{"
if tokens[2] == "=":
return parse_dict(tokens)
else:
return parse_list(tokens)
def parse_tokens(tokens):
if isinstance(tokens[0], Exception):
raise tokens[0]
if isinstance(tokens[0], (float, int)):
return tokens.pop(0)
if tokens[0] == "{":
return parse_dict_or_list(tokens)
if isinstance(tokens[0], str):
return tokens.pop(0)
raise Exception("Unsupported tokens: %r" % tokens)
def parse_rhs(text):
tokens = list(tokenize(text))
result = parse_tokens(tokens)
if tokens:
raise Exception("Unexpected input: %r" % tokens)
return result
class Gdb:
"""A single gdb class which can interact with one or more gdb instances."""
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-instance-attributes
reset_delays = (127, 181, 17, 13, 83, 151, 31, 67, 131, 167, 23, 41, 61,
11, 149, 107, 163, 73, 47, 43, 173, 7, 109, 101, 103, 191, 2, 139,
97, 193, 157, 3, 29, 79, 113, 5, 89, 19, 37, 71, 179, 59, 137, 53)
def __init__(self, target, ports, cmd=None, timeout=60, binaries=None):
assert ports
self.target = target
self.ports = ports
self.cmd = cmd or "riscv64-unknown-elf-gdb"
self.timeout = timeout
self.binaries = binaries or [None] * len(ports)
self.reset_delay_index = 0
self.stack = []
self.harts = {}
self.logfiles = []
self.children = []
for port in ports:
logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
suffix=".log")
self.logfiles.append(logfile)
if print_log_names:
real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
child = pexpect.spawn(self.cmd)
child.logfile = logfile
child.logfile.write(("+ %s\n" % self.cmd).encode())
self.children.append(child)
self.active_child = self.children[0]
def connect(self):
for port, child, binary in zip(self.ports, self.children,
self.binaries):
self.select_child(child)
self.wait()
self.command("set style enabled off", reset_delays=None)
self.command("set confirm off", reset_delays=None)
self.command("set width 0", reset_delays=None)
self.command("set height 0", reset_delays=None)
# Force consistency.
self.command("set print entry-values no", reset_delays=None)
self.command("set remotetimeout %d" % self.timeout,
reset_delays=None)
self.command("target extended-remote localhost:%d" % port, ops=10,
reset_delays=None)
if binary:
output = self.command("file %s" % binary)
assertIn("Reading symbols", output)
threads = self.threads()
for t in threads:
hartid = None
if t.name:
m = re.search(r"Hart (\d+)", t.name)
if m:
hartid = int(m.group(1))
if hartid is None:
if self.harts:
hartid = max(self.harts) + 1
else:
hartid = 0
# solo: True iff this is the only thread on this child
self.harts[hartid] = {'child': child,
'thread': t,
'solo': len(threads) == 1}
def __del__(self):
for child in self.children:
del child
def one_hart_per_gdb(self):
return all(h['solo'] for h in self.harts.values())
def lognames(self):
return [logfile.name for logfile in self.logfiles]
def select_child(self, child):
self.active_child = child
def select_hart(self, hart):
h = self.harts[hart.id]
self.select_child(h['child'])
if not h['solo']:
output = self.command("thread %s" % h['thread'].id, ops=5)
assert "Unknown" not in output
def push_state(self):
self.stack.append({
'active_child': self.active_child
})
def pop_state(self):
state = self.stack.pop()
self.active_child = state['active_child']
def wait(self):
"""Wait for prompt."""
self.active_child.expect(r"\(gdb\)")
def command(self, command, ops=1, reset_delays=0):
"""ops is the estimated number of operations gdb will have to perform
to perform this command. It is used to compute a timeout based on
self.timeout."""
if not reset_delays is None:
if reset_delays == 0:
reset_delays = self.reset_delays[self.reset_delay_index]
self.reset_delay_index = (self.reset_delay_index + 1) % \
len(self.reset_delays)
self.command("monitor riscv reset_delays %d" % reset_delays,
reset_delays=None)
timeout = max(1, ops) * self.timeout
self.active_child.sendline(command)
self.active_child.expect("\n", timeout=timeout)
self.active_child.expect(r"\(gdb\)", timeout=timeout)
return self.active_child.before.strip().decode("utf-8", errors="ignore")
def global_command(self, command):
"""Execute this command on every gdb that we control."""
with PrivateState(self):
for child in self.children:
self.select_child(child)
self.command(command)
def system_command(self, command, ops=20):
"""Execute this command on every unique system that we control."""
done = set()
output = ""
with PrivateState(self):
for i, child in enumerate(self.children):
self.select_child(child)
if self.target.harts[i].system in done:
self.command("set $pc=_start")
else:
output += self.command(command, ops=ops)
done.add(self.target.harts[i].system)
return output
def c(self, wait=True, sync=True, checkOutput=True, ops=20):
"""
Dumb c command.
In RTOS mode, gdb will resume all harts.
In multi-gdb mode, this command will just go to the current gdb, so
will only resume one hart.
"""
if sync:
sync = ""
else:
sync = "&"
if wait:
output = self.command("c%s" % sync, ops=ops)
if checkOutput:
assert "Continuing" in output
assert "Could not insert hardware" not in output
return output
else:
self.active_child.sendline("c%s" % sync)
self.active_child.expect("Continuing", timeout=ops * self.timeout)
return ""
def c_all(self, wait=True):
"""
Resume every hart.
This function works fine when using multiple gdb sessions, but the
caller must be careful when using it nonetheless. gdb's behavior is to
not set breakpoints until just before the hart is resumed, and then
clears them as soon as the hart halts. That means that you can't set
one software breakpoint, and expect multiple harts to hit it. It's
possible that the first hart completes set/run/halt/clear before the
second hart even gets to resume, so it will never hit the breakpoint.
"""
with PrivateState(self):
for child in self.children:
child.sendline("c")
child.expect("Continuing")
if wait:
for child in self.children:
child.expect(r"\(gdb\)")
def interrupt(self, ops=1):
self.active_child.send("\003")
self.active_child.expect(r"\(gdb\)", timeout=self.timeout * ops)
return self.active_child.before.strip().decode()
def interrupt_all(self):
for child in self.children:
self.select_child(child)
self.interrupt()
def x(self, address, size='w', count=1):
output = self.command("x/%d%s %s" % (count, size, address),
ops=count / 16)
values = []
for line in output.splitlines():
for value in line.split(':')[1].strip().split():
values.append(int(value, 0))
if len(values) == 1:
return values[0]
return values
def p_raw(self, obj):
output = self.command("p %s" % obj)
m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
if m:
raise CannotAccess(int(m.group(1), 0))
return output.split('=', 1)[-1].strip()
def p(self, obj, fmt="/x", ops=1):
output = self.command("p%s %s" % (fmt, obj), ops=ops).splitlines()[-1]
rhs = output.split('=', 1)[-1]
return parse_rhs(rhs)
def p_fpr(self, obj, ops=1):
result = self.p(obj, fmt="", ops=ops)
if isinstance(result, dict):
return result['double']
return result
def p_string(self, obj):
output = self.command("p %s" % obj)
value = shlex.split(output.split('=')[-1].strip())[1]
return value
def info_registers(self, group="", ops=5):
output = self.command("info registers %s" % group, ops=ops)
result = {}
for line in output.splitlines():
m = re.match(r"(\w+)\s+({.*})(?:\s+(\(.*\)))?", line)
if m:
parts = m.groups()
else:
parts = line.split()
name = parts[0]
if "Could not fetch" in line:
result[name] = " ".join(parts[1:])
else:
result[name] = parse_rhs(parts[1])
return result
def stepi(self):
output = self.command("stepi", ops=10)
return output
def load(self):
output = self.system_command("load", ops=1000)
assert "failed" not in output
assert "Transfer rate" in output
output = self.system_command("compare-sections", ops=1000)
assert "matched" in output
assert "MIS" not in output
def b(self, location):
output = self.command("b %s" % location, ops=5)
assert "not defined" not in output
assert "Breakpoint" in output
m = re.search(r"Breakpoint (\d+),? ", output)
assert m, output
return int(m.group(1))
def hbreak(self, location):
output = self.command("hbreak %s" % location, ops=5)
assert "not defined" not in output
assert "Hardware assisted breakpoint" in output
return output
def watch(self, expr):
output = self.command("watch %s" % expr, ops=5)
assert "not defined" not in output
assert "atchpoint" in output
return output
def swatch(self, expr):
self.command("show can-use-hw-watchpoints")
self.command("set can-use-hw-watchpoints 0")
output = self.command("watch %s" % expr, ops=5)
assert "not defined" not in output
assert "atchpoint" in output
self.command("set can-use-hw-watchpoints 1")
return output
def threads(self):
output = self.command("info threads", ops=100)
threads = []
for line in output.splitlines():
m = re.match(
r"[\s\*]*(\d+)\s*"
r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
r"\s*(.*)", line)
if m:
threads.append(Thread(*m.groups()))
assert threads
return threads
def thread(self, thread):
return self.command("thread %s" % thread.id)
def where(self):
return self.command("where 1")
class PrivateState:
def __init__(self, gdb):
self.gdb = gdb
def __enter__(self):
self.gdb.push_state()
def __exit__(self, _type, _value, _traceback):
self.gdb.pop_state()
def run_all_tests(module, target, parsed):
todo = []
for name in dir(module):
definition = getattr(module, name)
if isinstance(definition, type) and hasattr(definition, 'test') and \
(not parsed.test or any(test in name for test in parsed.test)):
todo.append((name, definition, None))
if parsed.list_tests:
for name, definition, hart in todo:
print(name)
return 0
try:
os.makedirs(parsed.logs)
except OSError:
# There's a race where multiple instances of the test program might
# decide to create the logs directory at the same time.
pass
overall_start = time.time()
global gdb_cmd # pylint: disable=global-statement
gdb_cmd = parsed.gdb
global gcc_cmd # pylint: disable=global-statement
gcc_cmd = parsed.gcc
examine_added = False
for hart in target.harts:
if parsed.misaval:
hart.misa = int(parsed.misaval, 16)
print("Using $misa from command line: 0x%x" % hart.misa)
elif hart.misa:
print("Using $misa from hart definition: 0x%x" % hart.misa)
elif not examine_added:
todo.insert(0, ("ExamineTarget", ExamineTarget, None))
examine_added = True
results, count = run_tests(parsed, target, todo)
header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
dash=':')
return print_results(results)
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
results = {}
count = 0
for name, definition, hart in todo:
log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
(time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
log_fd = open(log_name, 'w')
print("[%s] Starting > %s" % (name, log_name))
instance = definition(target, hart)
sys.stdout.flush()
log_fd.write("Test: %s\n" % name)
log_fd.write("Target: %s\n" % type(target).__name__)
start = time.time()
global real_stdout # pylint: disable=global-statement
real_stdout = sys.stdout
sys.stdout = log_fd
try:
result = instance.run()
log_fd.write("Result: %s\n" % result)
log_fd.write("Logfile: %s\n" % log_name)
log_fd.write("Reproduce: %s %s %s\n" % (sys.argv[0], parsed.target,
name))
finally:
sys.stdout = real_stdout
log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
log_fd.flush()
print("[%s] %s in %.2fs" % (name, result, time.time() - start))
if result not in good_results and parsed.print_failures:
sys.stdout.write(open(log_name).read())
sys.stdout.flush()
results.setdefault(result, []).append((name, log_name))
count += 1
if result not in good_results and parsed.fail_fast:
break
return results, count
def print_results(results):
result = 0
for key, value in results.items():
print("%d tests returned %s" % (len(value), key))
if key not in good_results:
result = 1
for name, log_name in value:
print(" %s > %s" % (name, log_name))
return result
def add_test_run_options(parser):
parser.add_argument("--logs", default="logs",
help="Store logs in the specified directory.")
parser.add_argument("--fail-fast", "-f", action="store_true",
help="Exit as soon as any test fails.")
parser.add_argument("--print-failures", action="store_true",
help="When a test fails, print the log file to stdout.")
parser.add_argument("--print-log-names", "--pln", action="store_true",
help="Print names of temporary log files as soon as they are "
"created.")
parser.add_argument("--list-tests", action="store_true",
help="Print out a list of tests, and exit immediately.")
parser.add_argument("test", nargs='*',
help="Run only tests that are named here.")
parser.add_argument("--gcc",
help="The command to use to start gcc.")
parser.add_argument("--gdb",
help="The command to use to start gdb.")
parser.add_argument("--misaval",
help="Don't run ExamineTarget, just assume the misa value which is "
"specified.")
def header(title, dash='-', length=78):
if title:
dashes = dash * (length - 4 - len(title))
before = dashes[:len(dashes)//2]
after = dashes[len(dashes)//2:]
print("%s[ %s ]%s" % (before, title, after))
else:
print(dash * length)
def print_log_handle(name, handle):
header(name)
for l in handle:
sys.stdout.write(l)
print()
def print_log(path):
print_log_handle(path, open(path, "r", errors='ignore'))
class BaseTest:
# pylint: disable=too-many-instance-attributes
compiled = {}
def __init__(self, target, hart=None):
self.target = target
if hart:
self.hart = hart
else:
self.hart = random.choice(target.harts)
self.server = None
self.target_process = None
self.binary = None
self.start = 0
self.logs = []
self.binaries = []
def early_applicable(self):
"""Return a false value if the test has determined it cannot run
without ever needing to talk to the target or server."""
# pylint: disable=no-self-use
return True
def freertos(self):
"""Return a true value if the test is running a FreeRTOS binary where
the debugger should expose FreeRTOS threads to gdb."""
# pylint: disable=no-self-use
return False
def setup(self):
pass
def compile(self):
compile_args = getattr(self, 'compile_args', None)
self.binaries = []
if compile_args:
for hart in self.target.harts:
key = (compile_args, hart.misa)
if key not in BaseTest.compiled:
BaseTest.compiled[key] = \
self.target.compile(hart, *compile_args)
self.binaries.append(BaseTest.compiled.get(key))
def classSetup(self):
self.compile()
self.target_process = self.target.create()
if self.target_process:
self.logs += self.target_process.lognames
try:
self.server = self.target.server(self)
self.logs.append(self.server.logname)
except Exception:
for log in self.logs:
print_log(log)
raise
def classTeardown(self):
del self.server
del self.target_process
def postMortem(self):
pass
def run(self):
"""
If compile_args is set, compile a program and set self.binaries.
Call setup().
Then call test() and return the result, displaying relevant information
if an exception is raised.
"""
sys.stdout.flush()
if self.__class__.__name__ in self.target.skip_tests or \
not self.early_applicable():
return "not_applicable"
self.start = time.time()
try:
self.classSetup()
self.setup()
result = self.test() # pylint: disable=no-member
except TestNotApplicable:
result = "not_applicable"
except Exception as e: # pylint: disable=broad-except
if isinstance(e, TestFailed):
result = "fail"
else:
result = "exception"
if isinstance(e, TestFailed):
# pylint: disable=no-member
header("Message")
print(e.message)
header("Traceback")
traceback.print_exc(file=sys.stdout)
try:
self.postMortem()
except Exception as e: # pylint: disable=broad-except
header("postMortem Exception")
print(e)
traceback.print_exc(file=sys.stdout)
return result
finally:
# Get handles to logs before the files are deleted.
logs = []
for log in self.logs:
logs.append((log, open(log, "r", errors='ignore')))
self.classTeardown()
for name, handle in logs:
print_log_handle(name, handle)
header("End of logs")
if not result:
result = 'pass'
return result
gdb_cmd = None
class GdbTest(BaseTest):
def __init__(self, target, hart=None):
BaseTest.__init__(self, target, hart=hart)
self.gdb = None
def write_nop_program(self, count):
for i in range(count):
# 0x13 is nop
self.gdb.command("p *((int*) 0x%x)=0x13" % (self.hart.ram + i * 4))
self.gdb.p("$pc=0x%x" % self.hart.ram)
def classSetup(self):
BaseTest.classSetup(self)
self.gdb = Gdb(self.target, self.server.gdb_ports, cmd=gdb_cmd,
timeout=self.target.timeout_sec, binaries=self.binaries)
self.logs += self.gdb.lognames()
self.gdb.connect()
self.gdb.global_command("set remotetimeout %d" %
self.target.timeout_sec)
for cmd in self.target.gdb_setup:
self.gdb.command(cmd)
self.gdb.select_hart(self.hart)
# FIXME: OpenOCD doesn't handle PRIV now
#self.gdb.p("$priv=3")
def postMortem(self):
if not self.gdb:
return
self.gdb.interrupt()
self.gdb.command("info breakpoints", reset_delays=None)
self.gdb.command("disassemble", ops=20, reset_delays=None)
self.gdb.command("info registers all", ops=20, reset_delays=None)
self.gdb.command("flush regs", reset_delays=None)
self.gdb.command("info threads", ops=20, reset_delays=None)
def classTeardown(self):
del self.gdb
BaseTest.classTeardown(self)
def parkOtherHarts(self):
"""Park harts besides the currently selected one in loop_forever()."""
for hart in self.target.harts:
# Park all harts that we're not using in a safe place.
if hart != self.hart:
self.gdb.select_hart(hart)
self.gdb.p("$pc=loop_forever")
self.gdb.select_hart(self.hart)
def disable_pmp(self):
# Disable physical memory protection by allowing U mode access to all
# memory.
try:
self.gdb.p("$pmpcfg0=0xf") # TOR, R, W, X
self.gdb.p("$pmpaddr0=0x%x" %
((self.hart.ram + self.hart.ram_size) >> 2))
except CouldNotFetch:
# PMP registers are optional
pass
class GdbSingleHartTest(GdbTest):
def classSetup(self):
GdbTest.classSetup(self)
self.parkOtherHarts()
class ExamineTarget(GdbTest):
def test(self):
for hart in self.target.harts:
self.gdb.select_hart(hart)
hart.misa = self.gdb.p("$misa")
txt = "RV"
misa_xlen = 0
if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
misa_xlen = 32
elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
misa_xlen = 64
elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
misa_xlen = 128
else:
raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
self.hart.misa)
if misa_xlen != hart.xlen:
raise TestFailed("MISA reported XLEN of %d but we were "\
"expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
txt += ("%d" % misa_xlen)
for i in range(26):
if hart.misa & (1<<i):
txt += chr(i + ord('A'))
print(txt, end=" ")
class TestFailed(Exception):
def __init__(self, message, comment=None):
Exception.__init__(self)
self.message = message
if comment:
self.message += ": %s" % comment
class TestNotApplicable(Exception):
def __init__(self, message=""):
Exception.__init__(self)
self.message = message
def assertEqual(a, b):
if a != b:
raise TestFailed("%r != %r" % (a, b))
def assertNotEqual(a, b, comment=None):
if a == b:
raise TestFailed("%r == %r" % (a, b), comment)
def assertIn(a, b):
if a not in b:
raise TestFailed("%r not in %r" % (a, b))
def assertNotIn(a, b, comment=None):
if a in b:
raise TestFailed("%r in %r" % (a, b), comment)
def assertGreater(a, b):
if not a > b:
raise TestFailed("%r not greater than %r" % (a, b))
def assertLess(a, b, comment=None):
if not a < b:
raise TestFailed("%r not less than %r" % (a, b), comment)
def assertTrue(a):
if not a:
raise TestFailed("%r is not True" % a)
def assertRegex(text, regexp):
if not re.search(regexp, text):
raise TestFailed("can't find %r in %r" % (regexp, text))