| #!/usr/bin/python3 |
| |
| import argparse |
| import sys |
| import socket |
| |
| # https://github.com/ntfreak/openocd/blob/master/doc/manual/jtag/drivers/remote_bitbang.txt |
| |
class Tap:
    """Client connection to one remote_bitbang TAP server on localhost."""

    def __init__(self, port):
        """Open a TCP connection to the TAP listening on localhost:port.

        Raises:
            OSError: if the connection cannot be established.
        """
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.connect(("localhost", port))
        except OSError:
            # Do not leak the descriptor when the TAP is unreachable.
            self.socket.close()
            raise

    def execute(self, commands):
        """Send a batch of commands and collect the replies.

        Args:
            commands: bytes-like sequence of remote_bitbang command bytes.

        Returns:
            bytes holding exactly one reply byte per 'R' command that was
            sent, in order.

        Raises:
            ConnectionError: if the TAP closes the connection before all
                expected reply bytes have arrived.
            OSError: on any other socket failure.
        """
        # sendall() keeps writing until the whole buffer is out; a plain
        # send() may legitimately transmit only a prefix.
        self.socket.sendall(commands)

        # Only 'R' commands produce a reply byte.
        read_marker = ord("R")
        remaining = sum(1 for byte in commands if byte == read_marker)

        chunks = []
        while remaining:
            chunk = self.socket.recv(remaining)
            if not chunk:
                # recv() returning b"" means the peer closed the stream;
                # without this check the loop would spin forever.
                raise ConnectionError(
                    "TAP on port %r closed the connection with %d reply "
                    "byte(s) outstanding" % (getattr(self, "port", None),
                                             remaining))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)
| |
class Chain:
    """Combine several TAPs into one scan chain.

    Each command batch is replayed on every TAP in order; the TDO values
    read from one TAP are substituted as the TDI bit of the write commands
    sent to the next TAP.
    """

    def __init__(self, debug=False):
        """Create an empty chain; with debug set, per-TAP traffic is printed."""
        self.debug = debug
        self.taps = []

    def append(self, tap):
        """Add a TAP (any object with an execute(bytes) method) to the end."""
        self.taps.append(tap)

    def execute(self, commands):
        """Run a batch of remote_bitbang commands through the whole chain.

        Args:
            commands: iterable of command bytes as integers (e.g. bytes).

        Returns:
            bytes returned by the last TAP in the chain (empty when the
            chain has no TAPs).

        Raises:
            IndexError: if a TAP returned fewer values than the number of
                write commands ('0'..'7') in the batch.
        """
        read_cmd = ord("R")
        first_write, last_write = ord("0"), ord("7")
        last_index = len(self.taps) - 1

        values = b""
        for i, tap in enumerate(self.taps):
            tmp_commands = []
            # Cursor into the previous TAP's output; indexing instead of
            # pop(0) keeps the whole pass linear in the batch size.
            consumed = 0
            for command in commands:
                is_write = first_write <= command <= last_write
                if i > 0 and is_write:
                    # Replace TDI (bit 0) with the value from the previous TAP.
                    v = values[consumed]
                    consumed += 1
                    command &= 0xfe
                    if v == ord("1"):
                        command |= 1

                if i < last_index:
                    # Intermediate TAPs ignore the client's own reads ...
                    if command != read_cmd:
                        tmp_commands.append(command)
                    # ... and get a TDO read queued right after every write,
                    # so the next TAP receives one value per write.
                    if is_write:
                        tmp_commands.append(read_cmd)
                else:
                    # The last TAP sees the client's reads unchanged.
                    tmp_commands.append(command)

            # Every value produced by the previous TAP must have been used.
            assert consumed == len(values)
            values = bytes(tap.execute(bytes(tmp_commands)))
            if self.debug:
                sys.stdout.write(" %d %r -> %r\n" % (i, bytes(tmp_commands),
                                                    bytes(values)))
        return bytes(values)
| |
def _serve_client(client, chain, debug):
    """Relay command batches from one client to the chain until it leaves.

    Returns when the client closes the connection or a socket error occurs
    while receiving from or replying to it. The caller owns `client` and is
    responsible for closing it.
    """
    while True:
        try:
            commands = client.recv(4096)
        except OSError:
            # ConnectionResetError is a subclass of OSError.
            sys.stdout.write("Client disconnected due to exception.\n")
            return

        if not commands:
            # An empty read means the peer closed the connection.
            sys.stdout.write("Client disconnected.\n")
            return

        if debug:
            sys.stdout.write("%r\n" % commands)
        result = chain.execute(commands)
        if debug:
            sys.stdout.write(" -> %r\n" % result)

        try:
            # sendall() so a short write cannot silently drop reply bytes.
            client.sendall(result)
        except OSError:
            sys.stdout.write("Client disconnected due to exception.\n")
            return


def main():
    """Listen for a remote_bitbang client and relay it to a chain of TAPs.

    Parses the command line (listen port, one or more TAP ports, --debug),
    connects to every TAP, then serves clients one at a time forever.
    """
    parser = argparse.ArgumentParser(
        description='Combine multiple remote_bitbang processes into a '
                    'single scan-chain.')
    parser.add_argument("listen_port", type=int,
                        help="port to listen on")
    parser.add_argument("tap_port", nargs="+", type=int,
                        help="port of a remote_bitbang TAP to connect to")
    parser.add_argument("--debug", action='store_true',
                        help="Print out debug messages.")
    args = parser.parse_args()

    # The context manager closes the listening socket on any exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("localhost", args.listen_port))
        server.listen(1)

        chain = Chain(args.debug)
        for port in args.tap_port:
            chain.append(Tap(port))

        # getsockname() reports the real port, which matters when
        # listen_port is 0 and the OS picks one.
        sys.stdout.write("Listening on port %d.\n" % server.getsockname()[1])
        sys.stdout.flush()

        while True:
            client, _ = server.accept()
            try:
                _serve_client(client, chain, args.debug)
            finally:
                client.close()
                sys.stdout.flush()
| |
# Run the server only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())