1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::{Bounded, Unbounded};
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
/// Marker trait for locations that are not a [`Tick`].
///
/// Implemented for [`Process`], [`Cluster`], and [`Atomic`] (below), but never
/// for [`Tick`] itself; sealed so only Hydro-provided location types qualify.
#[sealed]
pub trait NoTick {}
#[sealed]
impl<T> NoTick for Process<'_, T> {}
#[sealed]
impl<T> NoTick for Cluster<'_, T> {}
27
/// Marker trait for locations that are not [`Atomic`].
///
/// Implemented for [`Process`], [`Cluster`], and [`Tick`]; sealed so only
/// Hydro-provided location types qualify.
#[sealed]
pub trait NoAtomic {}
#[sealed]
impl<T> NoAtomic for Process<'_, T> {}
#[sealed]
impl<T> NoAtomic for Cluster<'_, T> {}
#[sealed]
impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
/// A location derived from a [`Tick`]; its identity wraps the tick's identity
/// (see [`LocationId::Atomic`]). Presumably marks collections that must be
/// processed atomically with respect to the wrapped tick — confirm against
/// the `atomic`/`end_atomic` collection APIs.
#[derive(Clone)]
pub struct Atomic<Loc> {
    // The tick whose execution this atomic location is tied to.
    pub(crate) tick: Tick<Loc>,
}
41
impl<L: DynLocation> DynLocation for Atomic<L> {
    fn id(&self) -> LocationId {
        // An atomic location is identified by the tick it wraps.
        LocationId::Atomic(Box::new(self.tick.id()))
    }

    fn flow_state(&self) -> &FlowState {
        // Delegate to the wrapped tick's flow-graph state.
        self.tick.flow_state()
    }

    fn is_top_level() -> bool {
        // Top-level-ness is inherited from the underlying location type `L`.
        L::is_top_level()
    }
}
55
impl<'a, L> Location<'a> for Atomic<L>
where
    L: Location<'a>,
{
    // The root of an atomic location is the root of the location its tick
    // was created at.
    type Root = L::Root;

    fn root(&self) -> Self::Root {
        self.tick.root()
    }
}
66
// An `Atomic` wraps a tick but is not itself a `Tick`, so it qualifies.
#[sealed]
impl<L> NoTick for Atomic<L> {}
69
/// Implemented by tick-scoped live collections whose contents can be
/// deferred to the next tick iteration (used by [`Tick::cycle`] to break
/// cyclic dataflow).
pub trait DeferTick {
    fn defer_tick(self) -> Self;
}
73
/// A logical clock scope at location `L`; collections inside a tick are
/// bounded per iteration (see the `Bounded` collections produced below).
#[derive(Clone)]
pub struct Tick<L> {
    // Distinguishes multiple ticks created at the same location.
    pub(crate) id: usize,
    // The underlying location this tick was created at.
    pub(crate) l: L,
}
80
impl<L: DynLocation> DynLocation for Tick<L> {
    fn id(&self) -> LocationId {
        // A tick is identified by its per-location counter plus the
        // identity of the location it was created at.
        LocationId::Tick(self.id, Box::new(self.l.id()))
    }

    fn flow_state(&self) -> &FlowState {
        // Delegate to the underlying location's flow-graph state.
        self.l.flow_state()
    }

    fn is_top_level() -> bool {
        // Ticks are never top-level locations.
        false
    }
}
94
impl<'a, L> Location<'a> for Tick<L>
where
    L: Location<'a>,
{
    // The root of a tick is the root of the location it was created at.
    type Root = L::Root;

    fn root(&self) -> Self::Root {
        self.l.root()
    }
}
105
106impl<'a, L> Tick<Atomic<L>>
107where
108 L: Location<'a>,
109{
110 pub fn as_regular_tick(&self) -> Tick<L> {
111 self.l.tick.clone()
112 }
113}
114
impl<'a, L> Tick<L>
where
    L: Location<'a>,
{
    /// Returns a reference to the location this tick was created at.
    pub fn outer(&self) -> &L {
        &self.l
    }

    /// Produces a bounded stream of `()` values inside this tick, with
    /// `batch_size` units emitted per element of the underlying location's
    /// `spin()` source (presumably one batch per tick iteration — confirm
    /// against `spin()`'s semantics).
    pub fn spin_batch(
        &self,
        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
    where
        L: NoTick,
    {
        // Expand each spin element into `batch_size` placeholder values,
        // then discard the counter so only unit markers remain.
        let out = self
            .l
            .spin()
            .flat_map_ordered(q!(move |_| 0..batch_size))
            .map(q!(|_| ()));

        // Pull the top-level stream into this tick as a bounded batch.
        out.batch(self, nondet!())
    }

    /// Creates a bounded [`Singleton`] inside this tick whose value is the
    /// quoted expression `e`, spliced in this tick's context.
    pub fn singleton<T>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Singleton<T, Self, Bounded>
    where
        T: Clone,
    {
        // Splice the quoted expression with this tick as its context.
        let e = e.splice_untyped_ctx(self);

        Singleton::new(
            self.clone(),
            HydroNode::SingletonSource {
                value: e.into(),
                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
            },
        )
    }

    /// Creates an [`Optional`] of type `T` that is always empty within this
    /// tick. Built as an empty-array iterator source of `()`, then mapped
    /// with `unreachable!()` — the closure can never run because the source
    /// yields no elements.
    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
        // An empty literal array; its element type is pinned to `()` by the
        // typed splice below.
        let e = q!([]);
        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);

        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Iter(e.into()),
                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
            },
        );

        // Cast the empty `Optional<()>` to `Optional<T>`; never executed.
        unit_optional.map(q!(|_| unreachable!())) }

    /// Creates an [`Optional`] holding `e`, built by batching a one-shot
    /// top-level iterator source into this tick. Per the method name, only
    /// the first tick presumably observes the value — confirm against the
    /// `HydroNode::Batch` / `HydroSource::Iter` execution semantics.
    pub fn optional_first_tick<T: Clone>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Optional<T, Self, Bounded> {
        // Wrap the value in a single-element array so it can be emitted as
        // an iterator source.
        let e_arr = q!([e]);
        let e = e_arr.splice_untyped_ctx(self);

        Optional::new(
            self.clone(),
            HydroNode::Batch {
                // The inner source lives at the outer (top-level, unbounded)
                // location; the Batch node pulls it into this tick.
                inner: Box::new(HydroNode::Source {
                    source: HydroSource::Iter(e.into()),
                    metadata: self
                        .outer()
                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
                }),
                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
            },
        )
    }

    /// Creates a forward reference inside this tick: a placeholder
    /// collection `S` plus a [`ForwardHandle`] used to later supply its
    /// definition, enabling forward-declared dataflow.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
    where
        S: CycleCollection<'a, ForwardRef, Location = Self>,
        L: NoTick,
    {
        // Allocate a fresh identifier for this cycle from the flow state.
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            ForwardHandle {
                completed: false,
                ident: ident.clone(),
                // Recorded so completion can be checked against the
                // location where the source was created.
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source(ident, self.clone()),
        )
    }

    /// Like [`Self::forward_ref`], but for a cycle across tick iterations:
    /// the placeholder collection is passed through `defer_tick()`, so the
    /// cycle observes the value from the previous iteration.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
        L: NoTick,
    {
        // Allocate a fresh identifier for this cycle from the flow state.
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            TickCycleHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            // Defer by one tick so the cycle reads the prior iteration.
            S::create_source(ident, self.clone()).defer_tick(),
        )
    }

    /// Like [`Self::cycle`], but seeds the cycle with `initial` so the
    /// collection has a value available before the cycle is completed.
    /// Note: unlike `cycle`, the deferral is handled by
    /// `create_source_with_initial` rather than an explicit `defer_tick()`.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
    {
        // Allocate a fresh identifier for this cycle from the flow state.
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            TickCycleHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source_with_initial(ident, initial, self.clone()),
        )
    }
}
301
#[cfg(test)]
mod tests {
    use stageleft::q;

    use crate::live_collections::sliced::sliced;
    use crate::location::Location;
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    // Exercises atomic reads: writes fold into a running sum inside a tick,
    // and reads observe that sum atomically, so an acked write must be
    // visible to a subsequent read.
    #[test]
    fn sim_atomic_stream() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        // External handles for injecting writes and read requests.
        let (input_write, write_req) = node.source_external_bincode(&external);
        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);

        let tick = node.tick();
        // Writes are placed in the atomic region of `tick` before folding.
        let atomic_write = write_req.atomic(&tick);
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        // Acks leave the atomic region before being sent back out.
        let write_ack = atomic_write.end_atomic().send_bincode_external(&external);
        // Each read request is joined with the atomically-read current sum.
        let read_response = sliced! {
            let batch_of_req = use(read_req, nondet!());
            let latest_singleton = use::atomic(current_state, nondet!());
            batch_of_req.cross_singleton(latest_singleton)
        }
        .send_bincode_external(&external);

        let sim_compiled = flow.sim().compiled();
        // Write, wait for the ack, then read: the read must see at least the
        // acked value in every interleaving the simulator explores.
        let instances = sim_compiled.exhaustive(async |mut compiled| {
            let write_send = compiled.connect(&input_write);
            let read_send = compiled.connect(&input_read);
            let mut write_ack_recv = compiled.connect(&write_ack);
            let mut read_response_recv = compiled.connect(&read_response);
            compiled.launch();

            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
        });

        // Exactly one interleaving is possible when the read follows the ack.
        assert_eq!(instances, 1);

        // Sending the read before awaiting the ack allows more orderings;
        // the simulator should explore three interleavings.
        let instances_read_before_write = sim_compiled.exhaustive(async |mut compiled| {
            let write_send = compiled.connect(&input_write);
            let read_send = compiled.connect(&input_read);
            let mut write_ack_recv = compiled.connect(&write_ack);
            let mut read_response_recv = compiled.connect(&read_response);
            compiled.launch();

            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        assert_eq!(instances_read_before_write, 3); }

    // Same scenario but WITHOUT the atomic region: some interleaving lets a
    // read observe the pre-write state, so the inner assertion fails and the
    // exhaustive search panics (hence `should_panic`).
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_write, write_req) = node.source_external_bincode(&external);
        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);

        // Fold the writes directly — no atomicity with respect to reads.
        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack = write_req.send_bincode_external(&external);

        // Non-atomic `use` of the folded state in the sliced block.
        let read_response = sliced! {
            let batch_of_req = use(read_req, nondet!());
            let latest_singleton = use(current_state, nondet!());
            batch_of_req.cross_singleton(latest_singleton)
        }
        .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let write_send = compiled.connect(&input_write);
            let read_send = compiled.connect(&input_read);
            let mut write_ack_recv = compiled.connect(&write_ack);
            let mut read_response_recv = compiled.connect(&read_response);
            compiled.launch();

            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            // In some interleaving this sees a stale (non-1) value, which
            // makes the exhaustive search panic as expected.
            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}