| """ |
| /***************************************************************************** |
| |
| Licensed to Accellera Systems Initiative Inc. (Accellera) under one or |
| more contributor license agreements. See the NOTICE file distributed |
| with this work for additional information regarding copyright ownership. |
| Accellera licenses this file to you under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with the |
| License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| implied. See the License for the specific language governing |
| permissions and limitations under the License. |
| |
| *****************************************************************************/ |
| |
| Python Script to test Endianness Conversion Functions for OSCI TLM-2 |
| |
| There is a simple testbench programme in C++ which runs a single |
| transaction through a single conversion function, to a simple target |
| memory and back. This script will run the testbench many times and test for |
| - incomplete execution, seg-faults, etc |
| - distributability of conversion function: each transaction should have |
| the same effect on initiator/target memory as a set of smaller transactions |
| that sum to it |
| - equivalence: all conversion functions should have the same functional |
| effect as each other |
| |
| The approach is to provide the initial state of the initiator and |
| target memory (as strings) and to capture their final states, so that |
| only the effect of the transaction on the data buffers is measured. |
| |
| Script works out for itself which conversion functions are legal for a |
| given transaction and applies all of them. Note that where data is |
| wider than bus, only one conversion function is legal and testing is |
| somewhat limited. |
| |
| Testing space (select a transaction at random from the space): |
| - with and without byte-enables (generated at random for each data word |
| and can be all-zero) |
| - data widths of (1,2,4,8,16) |
| - bus widths of (1,2,4,8,16), lower priority for those smaller than data |
| width |
| - transaction lengths of (1..32) x data width, higher probability for |
| lower values |
| - base address (0..1023) at bus_width steps |
| - offset address (0..bus width) with a higher priority for 0 |
| - address in initiator buffer uniform random |
| - read or write |
| - byte-enable length may be smaller than transaction length |
| - may be a streaming burst |
| |
| Transaction breakdown |
| - individual words (always) |
| - one random breakdown with each segment containing between 1 and N-1 |
| words, where N is the length in words |
| - one breakdown with two segments interleaved, using (additional) byte |
| enables to suppress the part where the other segment is active |
| |
| Data buffer definition: starts at 0, randomly filled |
| with lower case letters and numbers. Size 2 kB. Addresses are limited to |
| 1 kB. |
| """ |
| |
| |
| import random |
| import string |
| |
| |
class transaction:
    """Attribute bag describing a single bus transaction.

    Every keyword given to the constructor becomes an instance attribute.
    ``str()`` expects read_not_write, address, length, byte_enable,
    bus_width, data_width, data_pointer and stream_width to be present.
    """

    def __init__(self, **a):
        # The keyword mapping itself becomes the attribute dictionary.
        self.__dict__ = a

    def __str__(self):
        direction = "R: " if self.read_not_write else "W: "
        numbers = "addr = %d, len = %d, bus = %d, word = %d, data = %d" % (
            self.address,
            self.length,
            self.bus_width,
            self.data_width,
            self.data_pointer,
        )
        # A falsy byte_enable (no byte enables in use) is rendered as "x".
        enables = self.byte_enable if self.byte_enable else "x"
        tail = ", sw = %d" % (self.stream_width)
        return direction + numbers + ", be = " + enables + tail
| |
| |
def txn_generator(nr):
    """Yield ``nr`` randomly generated ``transaction`` objects.

    Each transaction gets a random bus width, data width, length (a whole
    number of data words), address, direction and initiator data pointer.
    With probability ``pr_byte_enable`` it also gets a byte-enable string
    (one all-"0" or all-"1" group per data word, possibly shorter than the
    transaction), and with probability ``pr_stream`` a stream width smaller
    than the transaction length.

    Args:
        nr: number of transactions to produce.

    Yields:
        transaction: the next random transaction.
    """
    pr_read = 0.5
    pr_byte_enable = 0.5
    bus_widths = [1, 2, 4, 8, 16]
    # Small values are listed several times so that random.choice picks them
    # more often.
    data_widths = [1, 2, 4, 8, 16] + [1, 2, 4, 8] + [1, 2, 4] + [1, 2]
    lengths = (
        list(range(1, 33))
        + list(range(1, 17))
        + list(range(1, 9))
        + list(range(1, 5))
        + list(range(1, 3))
    )
    pr_short_be = 0.2
    pr_stream = 0.1
    nr_generated = 0
    while nr_generated < nr:
        # create a random transaction
        bus_width = random.choice(bus_widths)
        while True:
            data_width = random.choice(data_widths)
            if data_width <= bus_width:
                break
            # Data wider than the bus is accepted only a quarter of the time.
            if random.random() < 0.25:
                break
        length = random.choice(lengths)
        addr_base = random.randrange(0, 1024, bus_width)
        # The extra zeros bias the offset towards a bus-aligned address.
        addr_offset = random.choice(
            list(range(bus_width)) + [0] * (bus_width // 2)
        )
        txn = transaction(
            bus_width=bus_width,
            data_width=data_width,
            read_not_write=random.random() < pr_read,
            length=length * data_width,
            address=addr_base + addr_offset,
            byte_enable=False,
            stream_width=length * data_width,
            data_pointer=random.randint(0, 1023),
        )
        if random.random() < pr_byte_enable:
            belen = length
            if random.random() < pr_short_be:
                belen = min(random.choice(lengths), length)
            bep = ["0" * data_width, "1" * data_width]
            txn.byte_enable = "".join(
                random.choice(bep) for _ in range(belen)
            )
        if random.random() < pr_stream and length > 1:
            # Walk down from length - 1 and stop, with probability 0.5, at a
            # word count that divides the length exactly; 1 always divides.
            # The modulo test replaces "(length / strlen) * strlen == length",
            # which is almost always true under Python 3 true division.
            strlen = length
            while True:
                strlen -= 1
                if strlen == 1 or (
                    random.random() < 0.5 and length % strlen == 0
                ):
                    break
            txn.stream_width = strlen * data_width
        nr_generated += 1
        yield txn
| |
| |
# test code for transaction generator
# Disabled manual check: switch the guard to True to print a handful of
# generated transactions and then abort the script.
if False:
    for sample_txn in txn_generator(20):
        print(sample_txn)
    raise Exception
# end test code
| |
| |
class memory_state_cl:
    """Contents of the initiator and target buffers, held as two strings.

    A default-constructed instance holds two independent random buffers of
    ``buffer_size`` characters drawn from lower-case letters and digits.
    """

    buffer_size = 2048
    repeats = 10 * buffer_size // 36
    # Characters to sample from; repeated so a sample of buffer_size
    # characters can contain the same character many times.
    population = (string.ascii_lowercase + string.digits) * int(repeats)

    # Instances compare by value and are mutable, so they are not hashable.
    __hash__ = None

    def __init__(self, initiator=None, target=None):
        """Create a memory state.

        Args:
            initiator: initiator buffer contents; random when omitted.
            target: target buffer contents; random when omitted.
        """
        if initiator is None:
            initiator = self._random_buffer()
        if target is None:
            target = self._random_buffer()
        self.initiator = initiator
        self.target = target

    @staticmethod
    def _random_buffer():
        """Return a fresh random buffer string of ``buffer_size`` characters."""
        return "".join(
            random.sample(
                memory_state_cl.population, memory_state_cl.buffer_size
            )
        )

    def copy(self):
        """Return a new instance with the same buffer contents.

        The buffers are passed straight to the constructor so that no random
        data is generated just to be thrown away.
        """
        return memory_state_cl(self.initiator, self.target)

    def __eq__(self, golden):
        try:
            return (
                self.initiator == golden.initiator
                and self.target == golden.target
            )
        except AttributeError:
            # Not a memory-state-like object: defer to Python's fallback.
            return NotImplemented

    def __ne__(self, golden):
        try:
            return (
                self.initiator != golden.initiator
                or self.target != golden.target
            )
        except AttributeError:
            return NotImplemented

    def __str__(self):
        return (
            "initiator = " + self.initiator + "\n" + "target = " + self.target
        )
| |
| |
# all fragmentation generators
def __FRAG__null(txn):
    """Identity fragmenter: the transaction itself is its only fragment."""
    yield from (txn,)
| |
| |
def __FRAG__word(txn):
    """Split a transaction into one fragment per data word.

    Each fragment is ``txn.data_width`` bytes long. Fragment addresses
    advance word by word and return to ``txn.address`` whenever they reach
    ``txn.address + txn.stream_width``. If the transaction has byte enables,
    each fragment receives the matching ``data_width`` characters of the
    pattern, repeated as often as needed to cover the transaction.

    Args:
        txn: the transaction to split.

    Yields:
        transaction: the single-word fragments, in data-pointer order.
    """
    curr_address = txn.address
    reset_address = curr_address + txn.stream_width
    if txn.byte_enable:
        # Repeat the pattern until it covers the whole transaction. Floor
        # division is required: a str cannot be multiplied by the float that
        # "/" produces in Python 3.
        full_byte_enable = txn.byte_enable * (
            1 + txn.length // len(txn.byte_enable)
        )
    be_pos = 0
    d_pos = txn.data_pointer
    end = txn.length + d_pos
    while d_pos < end:
        new_txn = transaction(
            bus_width=txn.bus_width,
            data_width=txn.data_width,
            read_not_write=txn.read_not_write,
            length=txn.data_width,
            address=curr_address,
            byte_enable=False,
            stream_width=txn.data_width,
            data_pointer=d_pos,
        )
        curr_address += txn.data_width
        if curr_address == reset_address:
            # End of the streaming window: start again at the base address.
            curr_address = txn.address
        d_pos += txn.data_width
        if txn.byte_enable:
            new_txn.byte_enable = full_byte_enable[
                be_pos : be_pos + txn.data_width
            ]
            be_pos += txn.data_width
        yield new_txn
| |
| |
def __FRAG__stream(txn):
    """Split a streaming transaction into fragments of one stream width each.

    Every fragment starts at ``txn.address``, is ``txn.stream_width`` bytes
    long and uses the next ``stream_width`` bytes of initiator data. If the
    transaction has byte enables, each fragment receives the matching
    ``stream_width`` characters of the pattern, repeated as often as needed
    to cover the transaction.

    Args:
        txn: the transaction to split.

    Yields:
        transaction: fragments whose stream width equals their length.
    """
    if txn.byte_enable:
        # Repeat the pattern until it covers the whole transaction. Floor
        # division is required: a str cannot be multiplied by the float that
        # "/" produces in Python 3.
        full_byte_enable = txn.byte_enable * (
            1 + txn.length // len(txn.byte_enable)
        )
    be_pos = 0
    bytes_done = 0
    while bytes_done < txn.length:
        new_txn = transaction(
            bus_width=txn.bus_width,
            data_width=txn.data_width,
            read_not_write=txn.read_not_write,
            length=txn.stream_width,
            address=txn.address,
            byte_enable=False,
            stream_width=txn.stream_width,
            data_pointer=bytes_done + txn.data_pointer,
        )
        if txn.byte_enable:
            new_txn.byte_enable = full_byte_enable[
                be_pos : be_pos + txn.stream_width
            ]
            be_pos += txn.stream_width
        yield new_txn
        bytes_done += txn.stream_width
| |
| |
def __FRAG__random(stream_txn):
    """Cut a transaction into consecutive fragments of random word counts.

    The transaction is first passed through ``__FRAG__stream``; each
    resulting piece is then cut into runs of whole data words. A run starts
    with one word and, while words remain, is extended by another word with
    probability 0.5.
    """
    extend_probability = 0.5
    for piece in __FRAG__stream(stream_txn):
        # piece has full byte enables and no stream feature guaranteed
        word = piece.data_width
        enables = piece.byte_enable
        limit = piece.address + piece.length
        address = piece.address
        offset = 0  # bytes of this piece already handed out
        while address < limit:
            run_start = offset
            address += word
            offset += word
            # The random draw comes first so that it happens even when the
            # piece is already exhausted.
            while random.random() < extend_probability and address < limit:
                address += word
                offset += word
            run_length = offset - run_start
            yield transaction(
                bus_width=piece.bus_width,
                data_width=word,
                read_not_write=piece.read_not_write,
                length=run_length,
                address=address - run_length,
                # Without byte enables the falsy value is passed through.
                byte_enable=enables[run_start:offset] if enables else enables,
                stream_width=run_length,
                data_pointer=piece.data_pointer + run_start,
            )
| |
| |
def __FRAG__randinterleave(stream_txn):
    """Cover a transaction with two interleaved, byte-enabled transactions.

    The transaction is first passed through ``__FRAG__stream``. For each
    resulting piece, two full-length transactions are produced. Every data
    word is assigned to exactly one of them, which receives the word's
    original byte enables (or all ones if there were none); the other
    receives zeros for that word. After each word the assignment flips with
    probability 0.5.
    """
    flip_probability = 0.5
    for piece in __FRAG__stream(stream_txn):
        # piece has full byte enables and no stream feature guaranteed
        word = piece.data_width
        enabled_word = "1" * word
        masked_word = "0" * word
        masks = ["", ""]
        active = 0
        offset = 0
        while offset < piece.length:
            if piece.byte_enable:
                chunk = piece.byte_enable[offset : offset + word]
            else:
                chunk = enabled_word
            masks[active] += chunk
            masks[1 - active] += masked_word
            offset += word
            if random.random() < flip_probability:
                active = 1 - active
        for mask in masks:
            yield transaction(
                bus_width=piece.bus_width,
                data_width=word,
                read_not_write=piece.read_not_write,
                length=piece.length,
                address=piece.address,
                byte_enable=mask,
                stream_width=piece.length,
                data_pointer=piece.data_pointer,
            )
| |
| |
# Collect every module-level object whose name starts with "__FRAG__",
# in the iteration order of globals().
fragmenters = [
    func
    for name, func in list(globals().items())
    if name.startswith("__FRAG__")
]

# test code for fragmenters
# Disabled manual check: switch the guard to True to print one generated
# transaction and its fragments, then abort the script.
if False:
    for sample_txn in txn_generator(1):
        print(sample_txn)
        print()
        for fragment in fragmenters[4](sample_txn):
            print(fragment)
    raise Exception
# end test code
| |
| |
# conversion functions are determined by an index (shared with C++) and
# a function that tests if they can be applied to a transaction
def __CHCK__generic(txn):
    """Applicability test for converter index 0: accepts any transaction."""
    # The index is stored on the function object on every call.
    __CHCK__generic.nr = 0
    return True
| |
| |
def __CHCK__word(txn):
    """Applicability test for converter index 1.

    Rejects transactions whose data is wider than the bus, whose stream
    width is shorter than their length, or whose byte-enable string is
    shorter than their length.
    """
    # The index is stored on the function object on every call.
    __CHCK__word.nr = 1
    return not (
        txn.data_width > txn.bus_width
        or txn.stream_width < txn.length
        or (txn.byte_enable and len(txn.byte_enable) < txn.length)
    )
| |
| |
def __CHCK__aligned(txn):
    """Applicability test for converter index 2.

    In addition to the conditions of the word converter (data no wider than
    the bus, no streaming, byte enables covering the whole transaction), the
    address and the length must both be whole multiples of the bus width.

    Args:
        txn: the transaction to examine.

    Returns:
        bool: True if the converter may be applied to ``txn``.
    """
    # The index is stored on the function object on every call.
    __CHCK__aligned.nr = 2
    if txn.data_width > txn.bus_width:
        return False
    if txn.stream_width < txn.length:
        return False
    if txn.byte_enable and len(txn.byte_enable) < txn.length:
        return False
    # Use the remainder: dividing with "/" and multiplying back always
    # reproduces the operand in Python 3, so it cannot detect misalignment.
    if txn.address % txn.bus_width != 0:
        return False
    if txn.length % txn.bus_width != 0:
        return False
    return True
| |
| |
def __CHCK__single(txn):
    """Applicability test for converter index 3.

    The transaction must be exactly one data word long and must lie entirely
    within one bus word, i.e. its first and last byte share the same bus-word
    index.

    Args:
        txn: the transaction to examine.

    Returns:
        bool: True if the converter may be applied to ``txn``.
    """
    # The index is stored on the function object on every call.
    __CHCK__single.nr = 3
    if txn.length != txn.data_width:
        return False
    # Floor division yields bus-word indices; with "/" the two quotients
    # would differ for every transaction longer than one byte.
    first_bus_word = txn.address // txn.bus_width
    last_bus_word = (txn.address + txn.length - 1) // txn.bus_width
    return first_bus_word == last_bus_word
| |
| |
def __CHCK__local_single(txn):
    """Applicability test for converter index 4: exactly one data word."""
    # The index is stored on the function object on every call.
    __CHCK__local_single.nr = 4
    return txn.length == txn.data_width
| |
| |
# Collect every module-level object whose name starts with "__CHCK__",
# in the iteration order of globals().
all_converters = [
    func
    for name, func in list(globals().items())
    if name.startswith("__CHCK__")
]
# Start each converter's usage counter at zero.
for x in all_converters:
    x.usage = 0
| |
| |
class TesterFailure(Exception):
    """Raised when no conversion function applies to a fragment."""


class SystemCFailure(Exception):
    """Raised when the testbench run fails or its output is incomplete."""


class ConverterDifference(Exception):
    """Raised when two conversion functions leave different memory states."""


class FragmenterDifference(Exception):
    """Raised when two fragmentations leave different memory states."""
| |
| |
| from subprocess import Popen, PIPE |
| |
# test a single fragment in multiple ways
def test_a_fragment(f, ms):
    """Run one fragment through every applicable conversion function.

    All applicable converters are exercised in a single run of the C++
    testbench, each starting from the memory state ``ms``. The resulting
    memory states must all be identical.

    Args:
        f: the (fragment of a) transaction.
        ms: the memory state to use at the start of the test.

    Returns:
        memory_state_cl: the memory state after the fragment.

    Raises:
        TesterFailure: no conversion function applies to ``f``.
        SystemCFailure: the testbench could not be run, exited with a
            non-zero status, or did not report one initiator and one target
            buffer per converter.
        ConverterDifference: a converter's result differs from the first
            converter's result.
    """

    def has_label(line, label):
        # NOTE(review): the original compared a fixed-width slice that was
        # one character longer than the visible label, so it could never
        # match; the label literal has probably lost leading spaces. Accept
        # any non-empty indentation followed by the label - confirm against
        # the testbench output format.
        return line[:1].isspace() and line.lstrip().startswith(label)

    # build complete stdin
    convs = [c for c in all_converters if c(f)]
    if not convs:
        raise TesterFailure(str(f))
    txtin = "\n".join(
        "%s\n%s\nconverter = %d\n" % (f, ms, c.nr) for c in convs
    )

    # run and get stdout
    txtout = "no output"
    try:
        # Text-mode pipes: stdin and stdout are exchanged as str, matching
        # the str input built above and the str parsing below.
        sp = Popen(
            "../build-unix/test_endian_conv.exe",
            stdin=PIPE,
            stdout=PIPE,
            universal_newlines=True,
        )
        txtout = sp.communicate(txtin)[0]
        out_lines = txtout.splitlines()
        initiators = [
            line.split()[-1]
            for line in out_lines
            if has_label(line, "initiator = ")
        ]
        targets = [
            line.split()[-1]
            for line in out_lines
            if has_label(line, "target = ")
        ]
    except Exception as err:
        raise SystemCFailure("\n" + txtin + txtout) from err
    if sp.returncode != 0:
        raise SystemCFailure("\n" + txtin + txtout)
    if len(initiators) != len(convs):
        raise SystemCFailure("\n" + txtin + txtout)
    if len(targets) != len(convs):
        raise SystemCFailure("\n" + txtin + txtout)
    for c in convs:
        c.usage += 1

    # The first converter's result is the reference for all the others.
    ms_out = memory_state_cl()
    ms_out.initiator = initiators[0]
    ms_out.target = targets[0]
    for conv, initiator, target in zip(
        convs[1:], initiators[1:], targets[1:]
    ):
        if initiator != ms_out.initiator or target != ms_out.target:
            actual_ms = memory_state_cl()
            actual_ms.initiator = initiator
            actual_ms.target = target
            # Report the converter's own index, as sent to the testbench.
            raise ConverterDifference(
                """
%s
start memory:
%s
converter = %d
golden memory:
%s
actual memory:
%s"""
                % (f, ms, conv.nr, ms_out, actual_ms)
            )

    return ms_out
| |
| |
# main loop
# Generates random transactions, applies every fragmenter to each one and
# checks that all fragmentations leave memory in the same final state.

from sys import argv

print("Testing Endianness Conversion Functions")
print("March 2008")
print("OSCI TLM-2")

try:
    nr_txns_to_test = int(argv[1])
except (IndexError, ValueError):
    # Argument missing or not an integer.
    print("No command line input for number of tests, using default")
    nr_txns_to_test = 1000

print("Number to test:", nr_txns_to_test)

# generate and test a number of transactions
for txn in txn_generator(nr_txns_to_test):

    # each transaction has a random initial memory state
    initial_memory = memory_state_cl()

    # iterate over all defined fragmentation functions
    first_time = True
    for fragmenter in fragmenters:

        # all versions of the transaction start in the same place
        memory_state = initial_memory.copy()

        # now iterate over the fragments of the transaction, accumulating
        # the memory state
        for partial_txn in fragmenter(txn):
            memory_state = test_a_fragment(partial_txn, memory_state)

        if first_time:
            # The first fragmenter's result is the reference for the rest.
            golden_memory_state = memory_state.copy()
            first_time = False
        elif memory_state != golden_memory_state:
            raise FragmenterDifference(
                """
fragmenter: %s
transaction:
%s
start memory:
%s
golden memory:
%s
actual memory:
%s"""
                % (
                    fragmenter,
                    txn,
                    initial_memory,
                    golden_memory_state,
                    memory_state,
                )
            )

    # One progress mark per transaction, all on the same line; flushed so
    # progress is visible while the run is still going.
    print(".", end=" ", flush=True)
print()


print("Conversion functions usage frequency:")
for c in all_converters:
    # A converter's index is only set once its check has been called, which
    # never happens when no transactions were tested.
    print(getattr(c, "nr", "?"), c.__name__, c.usage)