Skip to content

sim

Simulation helpers for streams.

recv_packet(ctx, stream, *, domain='sync', context=None) async

Receive a packet from a packetized stream.

Note

Does not support streams with FIRST semantics, as we can't tell the end of the current packet before receiving the first token of the next packet, and we have nowhere to store that token until the next recv_packet() call.

Parameters:

Name Type Description Default
ctx SimulatorContext

Simulator context.

required
stream Interface

Target stream.

required
domain

Clock domain.

'sync'
context

Context for looking up the clock domain.

None

Returns:

Type Description
list[Any]

Packet contents.

Source code in katsuo/stream/sim.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def recv_packet(ctx: SimulatorContext, stream: stream.Interface, *, domain = 'sync', context = None) -> list[Any]:
    '''Receive a packet from a packetized stream.

    Note:
        Does not support streams with FIRST semantics, as we can't tell the end of the current packet before receiving
        the first token of the next packet, and we have nowhere to store that token until the next `recv_packet()` call.

    Args:
        ctx: Simulator context.
        stream: Target stream.
        domain: Clock domain.
        context: Context for looking up the clock domain.

    Returns:
        Packet contents.
    '''

    shape = stream.payload.shape()
    if not isinstance(shape, Packet):
        raise TypeError('recv_packet() can only be used on streams with Packet payloads')
    if shape.semantics == Packet.Semantics.FIRST:
        raise TypeError('recv_packet() does not support FIRST semantics')

    buf = []

    ctx.set(stream.ready, 1)

    while True:
        payload, = await ctx.tick(domain = domain, context = context).sample(stream.payload).until(stream.valid == 1)

        if shape.semantics.has_first and payload.first:
            buf.clear()

        if shape.semantics.has_end and payload.end:
            break

        buf.append(payload.data)

        if shape.semantics.has_last and payload.last:
            break

    ctx.set(stream.ready, 0)

    return buf

send_packet(ctx, stream, packet, *, domain='sync', context=None) async

Send a packet to a packetized stream.

Parameters:

Name Type Description Default
ctx SimulatorContext

Simulator context.

required
stream Interface

Target stream.

required
packet Iterable[Any]

Packet contents.

required
domain

Clock domain.

'sync'
context

Context for looking up the clock domain.

None
Source code in katsuo/stream/sim.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
async def send_packet(ctx: SimulatorContext, stream: stream.Interface, packet: Iterable[Any], *, domain = 'sync', context = None):
    '''Send a packet to a packetized stream.

    Args:
        ctx: Simulator context.
        stream: Target stream.
        packet: Packet contents.
        domain: Clock domain.
        context: Context for looking up the clock domain.
    '''

    shape = stream.payload.shape()
    if not isinstance(shape, Packet):
        raise TypeError('send_packet() can only be used on streams with Packet payloads')

    ctx.set(stream.valid, 1)

    for i, e in enumerate(packet):
        ctx.set(stream.payload.data, e)
        if shape.semantics.has_first:
            ctx.set(stream.payload.first, i == 0)
        if shape.semantics.has_last:
            ctx.set(stream.payload.last, i == len(packet) - 1)
        if shape.semantics.has_end:
            ctx.set(stream.payload.end, 0)

        await ctx.tick(domain = domain, context = context).until(stream.ready == 1)

    if shape.semantics.has_end:
        ctx.set(stream.payload.end, 1)
        await ctx.tick(domain = domain, context = context).until(stream.ready == 1)
        ctx.set(stream.payload.end, 0)

    ctx.set(stream.valid, 0)

stream_get(ctx, stream, *, domain='sync', context=None) async

Get a payload token from a stream.

Parameters:

Name Type Description Default
ctx SimulatorContext

Simulator context.

required
stream Interface

Target stream.

required
domain

Clock domain.

'sync'
context

Context for looking up the clock domain.

None

Returns:

Type Description

Payload token.

Source code in katsuo/stream/sim.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
async def stream_get(ctx: SimulatorContext, stream: stream.Interface, *, domain = 'sync', context = None):
    '''Get a payload token from a stream.

    Args:
        ctx: Simulator context.
        stream: Target stream.
        domain: Clock domain.
        context: Context for looking up the clock domain.

    Returns:
        Payload token.
    '''

    ctx.set(stream.ready, 1)

    payload, = await ctx.tick(domain = domain, context = context).sample(stream.payload).until(stream.valid == 1)

    ctx.set(stream.ready, 0)
    return payload

stream_put(ctx, stream, payload, *, domain='sync', context=None) async

Put a payload token into a stream.

Parameters:

Name Type Description Default
ctx SimulatorContext

Simulator context.

required
stream Interface

Target stream.

required
payload

Payload token.

required
domain

Clock domain.

'sync'
context

Context for looking up the clock domain.

None
Source code in katsuo/stream/sim.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def stream_put(ctx: SimulatorContext, stream: stream.Interface, payload, *, domain = 'sync', context = None):
    '''Put a payload token into a stream.

    Args:
        ctx: Simulator context.
        stream: Target stream.
        payload: Payload token.
        domain: Clock domain.
        context: Context for looking up the clock domain.
    '''

    ctx.set(stream.valid, 1)
    ctx.set(stream.payload, payload)

    await ctx.tick(domain = domain, context = context).until(stream.ready == 1)

    ctx.set(stream.valid, 0)