hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::{Bounded, Unbounded};
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
21#[sealed]
22pub trait NoTick {}
23#[sealed]
24impl<T> NoTick for Process<'_, T> {}
25#[sealed]
26impl<T> NoTick for Cluster<'_, T> {}
27
28#[sealed]
29pub trait NoAtomic {}
30#[sealed]
31impl<T> NoAtomic for Process<'_, T> {}
32#[sealed]
33impl<T> NoAtomic for Cluster<'_, T> {}
34#[sealed]
35impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
37#[derive(Clone)]
38pub struct Atomic<Loc> {
39    pub(crate) tick: Tick<Loc>,
40}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43    fn id(&self) -> LocationId {
44        LocationId::Atomic(Box::new(self.tick.id()))
45    }
46
47    fn flow_state(&self) -> &FlowState {
48        self.tick.flow_state()
49    }
50
51    fn is_top_level() -> bool {
52        L::is_top_level()
53    }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58    L: Location<'a>,
59{
60    type Root = L::Root;
61
62    fn root(&self) -> Self::Root {
63        self.tick.root()
64    }
65}
66
67#[sealed]
68impl<L> NoTick for Atomic<L> {}
69
70pub trait DeferTick {
71    fn defer_tick(self) -> Self;
72}
73
74/// Marks the stream as being inside the single global clock domain.
75#[derive(Clone)]
76pub struct Tick<L> {
77    pub(crate) id: usize,
78    pub(crate) l: L,
79}
80
81impl<L: DynLocation> DynLocation for Tick<L> {
82    fn id(&self) -> LocationId {
83        LocationId::Tick(self.id, Box::new(self.l.id()))
84    }
85
86    fn flow_state(&self) -> &FlowState {
87        self.l.flow_state()
88    }
89
90    fn is_top_level() -> bool {
91        false
92    }
93}
94
95impl<'a, L> Location<'a> for Tick<L>
96where
97    L: Location<'a>,
98{
99    type Root = L::Root;
100
101    fn root(&self) -> Self::Root {
102        self.l.root()
103    }
104}
105
106impl<'a, L> Tick<Atomic<L>>
107where
108    L: Location<'a>,
109{
110    pub fn as_regular_tick(&self) -> Tick<L> {
111        self.l.tick.clone()
112    }
113}
114
115impl<'a, L> Tick<L>
116where
117    L: Location<'a>,
118{
119    pub fn outer(&self) -> &L {
120        &self.l
121    }
122
123    pub fn spin_batch(
124        &self,
125        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
126    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
127    where
128        L: NoTick,
129    {
130        let out = self
131            .l
132            .spin()
133            .flat_map_ordered(q!(move |_| 0..batch_size))
134            .map(q!(|_| ()));
135
136        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
137    }
138
139    pub fn singleton<T>(
140        &self,
141        e: impl QuotedWithContext<'a, T, Tick<L>>,
142    ) -> Singleton<T, Self, Bounded>
143    where
144        T: Clone,
145    {
146        let e = e.splice_untyped_ctx(self);
147
148        Singleton::new(
149            self.clone(),
150            HydroNode::SingletonSource {
151                value: e.into(),
152                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
153            },
154        )
155    }
156
157    /// Creates an [`Optional`] which has a null value on every tick.
158    ///
159    /// # Example
160    /// ```rust
161    /// # use hydro_lang::prelude::*;
162    /// # use futures::StreamExt;
163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
164    /// let tick = process.tick();
165    /// let optional = tick.none::<i32>();
166    /// optional.unwrap_or(tick.singleton(q!(123)))
167    /// # .all_ticks()
168    /// # }, |mut stream| async move {
169    /// // 123
170    /// # assert_eq!(stream.next().await.unwrap(), 123);
171    /// # }));
172    /// ```
173    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
174        let e = q!([]);
175        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
176
177        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
178            self.clone(),
179            HydroNode::Source {
180                source: HydroSource::Iter(e.into()),
181                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
182            },
183        );
184
185        unit_optional.map(q!(|_| unreachable!())) // always empty
186    }
187
188    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
189    /// null on all subsequent ticks.
190    ///
191    /// This is useful for bootstrapping stateful computations which need an initial value.
192    ///
193    /// # Example
194    /// ```rust
195    /// # use hydro_lang::prelude::*;
196    /// # use futures::StreamExt;
197    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
198    /// let tick = process.tick();
199    /// // ticks are lazy by default, forces the second tick to run
200    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
201    /// let optional = tick.optional_first_tick(q!(5));
202    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
203    /// # }, |mut stream| async move {
204    /// // 5, 123, 123, 123, ...
205    /// # assert_eq!(stream.next().await.unwrap(), 5);
206    /// # assert_eq!(stream.next().await.unwrap(), 123);
207    /// # assert_eq!(stream.next().await.unwrap(), 123);
208    /// # assert_eq!(stream.next().await.unwrap(), 123);
209    /// # }));
210    /// ```
211    pub fn optional_first_tick<T: Clone>(
212        &self,
213        e: impl QuotedWithContext<'a, T, Tick<L>>,
214    ) -> Optional<T, Self, Bounded> {
215        let e_arr = q!([e]);
216        let e = e_arr.splice_untyped_ctx(self);
217
218        Optional::new(
219            self.clone(),
220            HydroNode::Batch {
221                inner: Box::new(HydroNode::Source {
222                    source: HydroSource::Iter(e.into()),
223                    metadata: self
224                        .outer()
225                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
226                }),
227                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
228            },
229        )
230    }
231
232    #[expect(
233        private_bounds,
234        reason = "only Hydro collections can implement ReceiverComplete"
235    )]
236    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
237    where
238        S: CycleCollection<'a, ForwardRef, Location = Self>,
239        L: NoTick,
240    {
241        let next_id = self.flow_state().borrow_mut().next_cycle_id();
242        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
243
244        (
245            ForwardHandle {
246                completed: false,
247                ident: ident.clone(),
248                expected_location: Location::id(self),
249                _phantom: PhantomData,
250            },
251            S::create_source(ident, self.clone()),
252        )
253    }
254
255    #[expect(
256        private_bounds,
257        reason = "only Hydro collections can implement ReceiverComplete"
258    )]
259    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
260    where
261        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
262        L: NoTick,
263    {
264        let next_id = self.flow_state().borrow_mut().next_cycle_id();
265        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
266
267        (
268            TickCycleHandle {
269                completed: false,
270                ident: ident.clone(),
271                expected_location: Location::id(self),
272                _phantom: PhantomData,
273            },
274            S::create_source(ident, self.clone()).defer_tick(),
275        )
276    }
277
278    #[expect(
279        private_bounds,
280        reason = "only Hydro collections can implement ReceiverComplete"
281    )]
282    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
283    where
284        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
285    {
286        let next_id = self.flow_state().borrow_mut().next_cycle_id();
287        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
288
289        (
290            TickCycleHandle {
291                completed: false,
292                ident: ident.clone(),
293                expected_location: Location::id(self),
294                _phantom: PhantomData,
295            },
296            // no need to defer_tick, create_source_with_initial does it for us
297            S::create_source_with_initial(ident, initial, self.clone()),
298        )
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use stageleft::q;
305
306    use crate::live_collections::sliced::sliced;
307    use crate::location::Location;
308    use crate::nondet::nondet;
309    use crate::prelude::FlowBuilder;
310
311    #[test]
312    fn sim_atomic_stream() {
313        let flow = FlowBuilder::new();
314        let node = flow.process::<()>();
315        let external = flow.external::<()>();
316
317        let (input_write, write_req) = node.source_external_bincode(&external);
318        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);
319
320        let tick = node.tick();
321        let atomic_write = write_req.atomic(&tick);
322        let current_state = atomic_write.clone().fold(
323            q!(|| 0),
324            q!(|state: &mut i32, v: i32| {
325                *state += v;
326            }),
327        );
328
329        let write_ack = atomic_write.end_atomic().send_bincode_external(&external);
330        let read_response = sliced! {
331            let batch_of_req = use(read_req, nondet!(/** test */));
332            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
333            batch_of_req.cross_singleton(latest_singleton)
334        }
335        .send_bincode_external(&external);
336
337        let sim_compiled = flow.sim().compiled();
338        let instances = sim_compiled.exhaustive(async |mut compiled| {
339            let write_send = compiled.connect(&input_write);
340            let read_send = compiled.connect(&input_read);
341            let mut write_ack_recv = compiled.connect(&write_ack);
342            let mut read_response_recv = compiled.connect(&read_response);
343            compiled.launch();
344
345            write_send.send(1);
346            write_ack_recv.assert_yields([1]).await;
347            read_send.send(());
348            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
349        });
350
351        assert_eq!(instances, 1);
352
353        let instances_read_before_write = sim_compiled.exhaustive(async |mut compiled| {
354            let write_send = compiled.connect(&input_write);
355            let read_send = compiled.connect(&input_read);
356            let mut write_ack_recv = compiled.connect(&write_ack);
357            let mut read_response_recv = compiled.connect(&read_response);
358            compiled.launch();
359
360            write_send.send(1);
361            read_send.send(());
362            write_ack_recv.assert_yields([1]).await;
363            let _ = read_response_recv.next().await;
364        });
365
366        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
367    }
368
369    #[test]
370    #[should_panic]
371    fn sim_non_atomic_stream() {
372        // shows that atomic is necessary
373        let flow = FlowBuilder::new();
374        let node = flow.process::<()>();
375        let external = flow.external::<()>();
376
377        let (input_write, write_req) = node.source_external_bincode(&external);
378        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);
379
380        let current_state = write_req.clone().fold(
381            q!(|| 0),
382            q!(|state: &mut i32, v: i32| {
383                *state += v;
384            }),
385        );
386
387        let write_ack = write_req.send_bincode_external(&external);
388
389        let read_response = sliced! {
390            let batch_of_req = use(read_req, nondet!(/** test */));
391            let latest_singleton = use(current_state, nondet!(/** test */));
392            batch_of_req.cross_singleton(latest_singleton)
393        }
394        .send_bincode_external(&external);
395
396        flow.sim().exhaustive(async |mut compiled| {
397            let write_send = compiled.connect(&input_write);
398            let read_send = compiled.connect(&input_read);
399            let mut write_ack_recv = compiled.connect(&write_ack);
400            let mut read_response_recv = compiled.connect(&read_response);
401            compiled.launch();
402
403            write_send.send(1);
404            write_ack_recv.assert_yields([1]).await;
405            read_send.send(());
406
407            if let Some((_, v)) = read_response_recv.next().await {
408                assert_eq!(v, 1);
409            }
410        });
411    }
412}