import cocotb from cocotb.triggers import RisingEdge, FallingEdge, Edge from cocotb.triggers import Timer from cocotbext.axi import (AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamMonitor, AxiStreamFrame) from random import randint, randrange from cocotb.regression import TestFactory from itertools import cycle async def reset_target(dut): dut.rst_n.value = 0 dut.out_tready.value = 0 dut.flush_output.value = 0 dut.clear.value = 0 await Timer(10, units='ns') await RisingEdge(dut.clk) dut.rst_n.value = 1 async def clock_driver(dut): while True: dut.clk.value = 0 await Timer(10, units='ns') dut.clk.value = 1 await Timer(10, units='ns') @cocotb.test() async def test_fifo_fill(dut): axi_in = AxiStreamSource(AxiStreamBus.from_prefix(dut, 'in'), dut.clk, dut.rst_n, reset_active_level=False) cocotb.start_soon(clock_driver(dut)) await reset_target(dut) frame = bytearray([randint(0, 2**8-1) for _ in range(128+2)]) await axi_in.send(frame) await axi_in.wait() await RisingEdge(dut.clk) await RisingEdge(dut.clk) assert dut.in_tready.value == 1, 'FIFO full + pipeline full' assert dut.full.value == 1, 'FIFO full?' await axi_in.send(bytearray([randint(0, 255)])) await axi_in.wait() await RisingEdge(dut.clk) await RisingEdge(dut.clk) assert dut.in_tready.value == 0, 'FIFO full + pipeline full' assert dut.full.value == 1, 'FIFO full' assert dut.empty.value == 0, 'FIFO empty' async def stress_test(dut, tx_pause=None, rx_pause=None, count=512): axi_in = AxiStreamSource(AxiStreamBus.from_prefix(dut, 'in'), dut.clk, dut.rst_n, reset_active_level=False) axi_out = AxiStreamSink(AxiStreamBus.from_prefix(dut, "out"), dut.clk, dut.rst_n, reset_active_level=False) cocotb.start_soon(clock_driver(dut)) await reset_target(dut) if tx_pause: axi_in.set_pause_generator(tx_pause) if rx_pause: axi_out.set_pause_generator(rx_pause) frame = bytearray([randint(0, 255) for _ in range(count)]) await axi_in.send(frame) await axi_in.wait() for _ in range(count*100): await RisingEdge(dut.clk) if dut.flacdec_double_mem_fifo_1.fill_level.value == 0: break else: assert False, 'Timeout' for _ in range(100): await RisingEdge(dut.clk) rx_data = await axi_out.read(axi_out.count()) assert len(rx_data) == len(frame), 'RX TX data count mismatch' for tx, rx in zip(frame, rx_data): assert tx == rx, 'TX, RX data mismatch' stress_test_fac = TestFactory(stress_test) stress_test_fac.add_option('count', [1, 5, randint(1000, 2**16)]) def generate_random_wait_seq(percent=50, length=20): seq = [1 if randint(0, 100) >= percent else 0 for _ in range(length)] return seq stress_test_fac.add_option('tx_pause', [None, cycle(generate_random_wait_seq(10, 30)), cycle(generate_random_wait_seq(80, 30))]) stress_test_fac.add_option('rx_pause', [None, cycle(generate_random_wait_seq(10, 30)), cycle(generate_random_wait_seq(80, 30))]) stress_test_fac.generate_tests()