blob: b9e10ad94d5ffc23d60b555a5aac94417643f8e2 [file] [log] [blame]
"""
/*****************************************************************************
Licensed to Accellera Systems Initiative Inc. (Accellera) under one or
more contributor license agreements. See the NOTICE file distributed
with this work for additional information regarding copyright ownership.
Accellera licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with the
License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License.
*****************************************************************************/
Python Script to test Endianness Conversion Functions for OSCI TLM-2
There is a simple testbench programme in C++ which runs a single
transaction through a single conversion function, to a simple target
memory and back. This script will run the testbench many times and test for
- incomplete execution, seg-faults, etc
- distributability of conversion function: each transaction should have
the same effect on initiator/target memory as a set of smaller transactions
that sum to it
- equivalence: all conversion functions should have the same functional
effect as each other
The approach is to provide the initial state of the initiator and
target memory (as strings) and to capture their final states, so that
only the effect of the transaction on the data buffers is measured.
Script works out for itself which conversion functions are legal for a
given transaction and applies all of them. Note that where data is
wider than bus, only one conversion function is legal and testing is
somewhat limited.
Testing space (select a transaction at random from the space):
- with and without byte-enables (generated at random for each data word
and can be all-zero)
- data widths of (1,2,4,8,16)
- bus widths of (1,2,4,8,16), lower priority for those smaller than data
width
- transaction lengths of (1..32) x data width, higher probability for
lower values
- base address (0..1023) at bus_width steps
- offset address (0..bus width) with a higher priority for 0
- address in initiator buffer uniform random
- read or write
- byte-enable length may be smaller than transasction length
- may be a streaming burst
Transaction breakdown
- individual words (always)
- one random breakdown with each segment containing between 1 and N-1
words, where N is the length in words
- one breakdown with two segments interleaved, using (additional) byte
enables to suppress the part where the other segment is active
Data buffer definition: starts at 0, randomly filled
with lower case letters and numbers. Size 2 kB. Addresses are limited to
1 kB.
"""
import random
import string
class transaction:
""" contains read_not_write, address, length, byte_enable,
bus_width, data_width, data_pointer, stream_width """
def __init__(self, **a):
self.__dict__ = a
def __str__(self):
if self.read_not_write:
a = "R: "
else:
a = "W: "
a += "addr = %d, len = %d, bus = %d, word = %d, data = %d" % (
self.address,
self.length,
self.bus_width,
self.data_width,
self.data_pointer,
)
if self.byte_enable:
a += ", be = " + self.byte_enable
else:
a += ", be = x"
a += ", sw = %d" % (self.stream_width)
return a
def txn_generator(nr):
pr_read = 0.5
pr_byte_enable = 0.5
pr_enabled = 0.5
bus_widths = [1, 2, 4, 8, 16]
data_widths = [1, 2, 4, 8, 16] + [1, 2, 4, 8] + [1, 2, 4] + [1, 2]
lengths = (
list(range(1, 33))
+ list(range(1, 17))
+ list(range(1, 9))
+ list(range(1, 5))
+ list(range(1, 3))
)
pr_short_be = 0.2
pr_stream = 0.1
nr_generated = 0
while nr_generated < nr:
# create a random transaction
bus_width = random.choice(bus_widths)
while True:
data_width = random.choice(data_widths)
if data_width <= bus_width:
break
if random.random() < 0.25:
break
length = random.choice(lengths)
addr_base = random.choice(list(range(0, 1024, bus_width)))
addr_offset = random.choice(
list(range(bus_width)) + [0] * int(bus_width // 2)
)
txn = transaction(
bus_width=bus_width,
data_width=data_width,
read_not_write=random.random() < pr_read,
length=length * data_width,
address=addr_base + addr_offset,
byte_enable=False,
stream_width=length * data_width,
data_pointer=random.randint(0, 1023),
)
if random.random() < pr_byte_enable:
belen = length
if random.random() < pr_short_be:
belen = min(random.choice(lengths), length)
bep = ["0" * data_width, "1" * data_width]
txn.byte_enable = "".join(
[random.choice(bep) for x in range(belen)]
)
if random.random() < pr_stream and length > 1:
strlen = length
while True:
strlen -= 1
if strlen == 1 or (
random.random() < 0.5
and (length / strlen) * strlen == length
):
break
txn.stream_width = strlen * data_width
nr_generated += 1
yield txn
# test code for transaction generator
if False:
for t in txn_generator(20):
print(t)
raise Exception
# end test code
class memory_state_cl:
buffer_size = 2048
repeats = 10 * buffer_size // 36
population = (string.ascii_lowercase + string.digits) * int(repeats)
def __init__(self):
self.initiator = "".join(
random.sample(
memory_state_cl.population, memory_state_cl.buffer_size
)
)
self.target = "".join(
random.sample(
memory_state_cl.population, memory_state_cl.buffer_size
)
)
def copy(self):
r = memory_state_cl()
r.initiator = self.initiator
r.target = self.target
return r
def __eq__(self, golden):
return (
self.initiator == golden.initiator and self.target == golden.target
)
def __ne__(self, golden):
return (
self.initiator != golden.initiator or self.target != golden.target
)
def __str__(self):
return (
"initiator = " + self.initiator + "\n" + "target = " + self.target
)
# all fragmentation generators
def __FRAG__null(txn):
yield txn
def __FRAG__word(txn):
curr_address = txn.address
reset_address = curr_address + txn.stream_width
if txn.byte_enable:
full_byte_enable = txn.byte_enable * (
1 + txn.length / len(txn.byte_enable)
)
be_pos = 0
d_pos = txn.data_pointer
end = txn.length + d_pos
while d_pos < end:
new_txn = transaction(
bus_width=txn.bus_width,
data_width=txn.data_width,
read_not_write=txn.read_not_write,
length=txn.data_width,
address=curr_address,
byte_enable=False,
stream_width=txn.data_width,
data_pointer=d_pos,
)
curr_address += txn.data_width
if curr_address == reset_address:
curr_address = txn.address
d_pos += txn.data_width
if txn.byte_enable:
new_txn.byte_enable = full_byte_enable[
be_pos : be_pos + txn.data_width
]
be_pos += txn.data_width
yield new_txn
def __FRAG__stream(txn):
if txn.byte_enable:
full_byte_enable = txn.byte_enable * (
1 + txn.length / len(txn.byte_enable)
)
be_pos = 0
bytes_done = 0
while bytes_done < txn.length:
new_txn = transaction(
bus_width=txn.bus_width,
data_width=txn.data_width,
read_not_write=txn.read_not_write,
length=txn.stream_width,
address=txn.address,
byte_enable=False,
stream_width=txn.stream_width,
data_pointer=bytes_done + txn.data_pointer,
)
if txn.byte_enable:
new_txn.byte_enable = full_byte_enable[
be_pos : be_pos + txn.stream_width
]
be_pos += txn.stream_width
yield new_txn
bytes_done += txn.stream_width
def __FRAG__random(stream_txn):
for txn in __FRAG__stream(stream_txn):
# txn has full byte enables and no stream feature guaranteed
pr_nofrag = 0.5
end_address = txn.address + txn.length
curr_address = txn.address
be_pos = 0
d_pos = txn.data_pointer
while curr_address < end_address:
new_txn = transaction(
bus_width=txn.bus_width,
data_width=txn.data_width,
read_not_write=txn.read_not_write,
length=txn.data_width,
address=curr_address,
byte_enable=txn.byte_enable,
stream_width=txn.data_width,
data_pointer=d_pos,
)
curr_address += txn.data_width
d_pos += txn.data_width
if txn.byte_enable:
new_txn.byte_enable = txn.byte_enable[
be_pos : be_pos + txn.data_width
]
be_pos += txn.data_width
while random.random() < pr_nofrag and curr_address < end_address:
new_txn.length += txn.data_width
new_txn.stream_width += txn.data_width
curr_address += txn.data_width
d_pos += txn.data_width
if txn.byte_enable:
new_txn.byte_enable += txn.byte_enable[
be_pos : be_pos + txn.data_width
]
be_pos += txn.data_width
yield new_txn
def __FRAG__randinterleave(stream_txn):
for txn in __FRAG__stream(stream_txn):
# txn has full byte enables and no stream feature guaranteed
pr_frag = 0.5
txns = [
transaction(
bus_width=txn.bus_width,
data_width=txn.data_width,
read_not_write=txn.read_not_write,
length=txn.length,
address=txn.address,
byte_enable="",
stream_width=txn.length,
data_pointer=txn.data_pointer,
),
transaction(
bus_width=txn.bus_width,
data_width=txn.data_width,
read_not_write=txn.read_not_write,
length=txn.length,
address=txn.address,
byte_enable="",
stream_width=txn.length,
data_pointer=txn.data_pointer,
),
]
curr = 0
be_pos = 0
on = "1" * txn.data_width
off = "0" * txn.data_width
while be_pos < txn.length:
if txn.byte_enable:
bew = txn.byte_enable[be_pos : be_pos + txn.data_width]
else:
bew = on
txns[curr].byte_enable += bew
txns[1 - curr].byte_enable += off
be_pos += txn.data_width
if random.random() < pr_frag:
curr = 1 - curr
yield txns[0]
yield txns[1]
fragmenters = [globals()[n] for n in globals().keys() if n[:8] == "__FRAG__"]
# test code for fragmenters
if False:
for t in txn_generator(1):
print(t)
print()
for u in fragmenters[4](t):
print(u)
raise Exception
# end test code
# conversion functions are determined by an index (shared with C++) and
# a function that tests if they can be applied to a transaction
def __CHCK__generic(txn):
__CHCK__generic.nr = 0
return True
def __CHCK__word(txn):
__CHCK__word.nr = 1
if txn.data_width > txn.bus_width:
return False
if txn.stream_width < txn.length:
return False
if txn.byte_enable and len(txn.byte_enable) < txn.length:
return False
return True
def __CHCK__aligned(txn):
__CHCK__aligned.nr = 2
if txn.data_width > txn.bus_width:
return False
if txn.stream_width < txn.length:
return False
if txn.byte_enable and len(txn.byte_enable) < txn.length:
return False
base_addr = txn.address / txn.bus_width
if base_addr * txn.bus_width != txn.address:
return False
nr_bus_words = txn.length / txn.bus_width
if nr_bus_words * txn.bus_width != txn.length:
return False
return True
def __CHCK__single(txn):
__CHCK__single.nr = 3
if txn.length != txn.data_width:
return False
base_addr = txn.address / txn.bus_width
end_base_addr = (txn.address + txn.length - 1) / txn.bus_width
if base_addr != end_base_addr:
return False
return True
def __CHCK__local_single(txn):
__CHCK__local_single.nr = 4
if txn.length != txn.data_width:
return False
return True
all_converters = [
globals()[n] for n in globals().keys() if n[:8] == "__CHCK__"
]
for x in all_converters:
x.usage = 0
class TesterFailure(Exception):
pass
class SystemCFailure(Exception):
pass
class ConverterDifference(Exception):
pass
class FragmenterDifference(Exception):
pass
from subprocess import Popen, PIPE
# test a single fragment in multiple ways
def test_a_fragment(f, ms):
# f is the (fragment of a) transaction
# ms is the memory state to use at start of test
# run the same fragment through all applicable conversion functions
# and check they all do the same thing
# use the same sub-process for all of them
# build complete stdin
convs = [c for c in all_converters if c(f)]
if len(convs) == 0:
raise TesterFailure(f.str())
txtin = "\n".join(
[("%s\n%s\nconverter = %d\n" % (f, ms, c.nr)) for c in convs]
)
# run and get stdout
txtout = "no output"
try:
sp = Popen(
"../build-unix/test_endian_conv.exe", stdin=PIPE, stdout=PIPE
)
txtout = sp.communicate(txtin)[0]
tmp = txtout.splitlines()
initiators = [l.split()[-1] for l in tmp if l[:14] == " initiator = "]
targets = [l.split()[-1] for l in tmp if l[:11] == " target = "]
except:
raise SystemCFailure("\n" + txtin + txtout)
if sp.returncode != 0:
raise SystemCFailure("\n" + txtin + txtout)
if len(initiators) != len(convs):
raise SystemCFailure("\n" + txtin + txtout)
if len(targets) != len(convs):
raise SystemCFailure("\n" + txtin + txtout)
for c in convs:
c.usage += 1
ms_out = memory_state_cl()
ms_out.initiator = initiators[0]
ms_out.target = targets[0]
for i in range(1, len(convs)):
if initiators[i] != ms_out.initiator or targets[i] != ms_out.target:
raise ConverterDifference(
"""
%s
start memory:
%s
converter = %d
golden memory:
%s
actual memory:
%s"""
% (f, ms, i, golden_ms, ms_out)
)
return ms_out
# main loop
from sys import argv
print("Testing Endianness Conversion Functions")
print("March 2008")
print("OSCI TLM-2")
try:
nr_txns_to_test = int(argv[1])
except:
print("No command line input for number of tests, using default")
nr_txns_to_test = 1000
print("Number to test:", nr_txns_to_test)
# generate and test a number of transactions
for txn in txn_generator(nr_txns_to_test):
# each transaction has a random initial memory state
initial_memory = memory_state_cl()
# iterate over all defined fragmentation functions
first_time = True
for fragmenter in fragmenters:
# all versions of the transaction start in the same place
memory_state = initial_memory.copy()
# now iterate over the fragments of the transaction, accumulating
# the memory state
for partial_txn in fragmenter(txn):
memory_state = test_a_fragment(partial_txn, memory_state)
if first_time:
golden_memory_state = memory_state.copy()
first_time = False
else:
if memory_state != golden_memory_state:
raise FragmenterDifference(
"""
fragmenter: %s
transaction:
%s
start memory:
%s
golden memory:
%s
actual memory:
%s"""
% (
fragmenter,
txn,
initial_memory,
golden_memory_state,
memory_state,
)
)
print("."),
print()
print("Conversion functions usage frequency:")
for c in all_converters:
print(c.nr, c.__name__, c.usage)