hydro_lang/live_collections/
sliced.rs

//! Utilities for transforming live collections via slicing.
2
3use super::boundedness::{Bounded, Unbounded};
4use crate::live_collections::keyed_singleton::BoundedValue;
5use crate::live_collections::stream::{Ordering, Retries};
6use crate::location::{Location, NoTick, Tick};
7use crate::nondet::NonDet;
8
9#[doc(hidden)]
10pub fn __sliced_wrap_invoke<A, B, O: Unslicable>(
11    a: A,
12    b: B,
13    f: impl FnOnce(A, B) -> O,
14) -> O::Unsliced {
15    let o_slice = f(a, b);
16    o_slice.unslice()
17}
18
/// Implementation detail of the `sliced!` macro.
///
/// Works as a token muncher: the leading `let name = use(...);` statements are accumulated one at
/// a time into the `@uses [...]` list, and once no further `use` statement is found at the front
/// of the input the slicing code is emitted around the remaining tokens (the body).
#[doc(hidden)]
#[macro_export]
macro_rules! __sliced_parse_uses__ {
    // Peel one leading `let <name> = use[::<style>](<collection>, <nondet>);` statement off the
    // input, record it in the accumulator, and recurse on whatever follows.
    (
        @uses [$($uses:tt)*]
        let $name:ident = use $(::$style:ident)?($expr:expr, $nondet:expr); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, ($($style)?), $expr, $nondet }]
            $($rest)*
        )
    };

    // At least one `use` has been collected and the input no longer starts with one: slice all
    // collections at a common tick, bind the slices to the requested names, run the body, and
    // unslice its value.
    (
        @uses [{ $first_name:ident, ($($first_style:ident)?), $first:expr, $nondet_first:expr } $({ $rest_name:ident, ($($rest_style:ident)?), $rest:expr, $nondet_expl:expr })*]
        $($body:tt)*
    ) => {
        {
            // Bind the first explanation exactly once so the caller's expression is not
            // expanded (and evaluated) a second time when it is handed to `slice` below.
            let __nondet_first = $nondet_first;
            // The remaining explanations are evaluated and discarded; only the first one is
            // passed to `slice`.
            $(let _ = $nondet_expl;)*

            // Apply the optional style wrapper (a function in `sliced::style`) to each collection;
            // without a style this is just the parenthesized expression.
            let __styled = (
                $($crate::live_collections::sliced::style::$first_style)?($first),
                $($($crate::live_collections::sliced::style::$rest_style)?($rest),)*
            );

            // Use the tick preferred by the styled collections if there is one; otherwise derive
            // a tick from the location of the first collection.
            let __tick = $crate::live_collections::sliced::Slicable::preferred_tick(&__styled).unwrap_or_else(|| $crate::live_collections::sliced::Slicable::get_location(&__styled.0).tick());
            // One backtrace per collection, captured under the span of that collection's
            // expression.
            let __backtraces = {
                use $crate::compile::ir::backtrace::__macro_get_backtrace;
                (
                    $crate::macro_support::copy_span::copy_span!($first, {
                        __macro_get_backtrace(1)
                    }),
                    $($crate::macro_support::copy_span::copy_span!($rest, {
                        __macro_get_backtrace(1)
                    }),)*
                )
            };
            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces, __nondet_first);
            let (
                $first_name,
                $($rest_name,)*
            ) = __sliced;

            $crate::live_collections::sliced::Unslicable::unslice({
                $($body)*
            })
        }
    };
}
69
#[macro_export]
/// Runs a computation over slices of one or more live collections and yields the unsliced result.
///
/// This is useful for reading a snapshot of an asynchronously updated collection while processing
/// another collection, such as joining a stream with the latest values from a singleton.
///
/// # Syntax
/// The macro input is written like the inside of a block. It starts with one or more `use`
/// statements, each naming a live collection to slice together with a non-determinism
/// explanation. A style may be given as `use::style(...)` to control how that collection is
/// sliced (e.g., atomically). Every `use` statement must come before the rest of the body. The
/// remaining statements form the body, whose value is passed through `Unslicable::unslice` to
/// produce the result of the macro.
///
/// ```rust,ignore
/// let stream = sliced! {
///     let name1 = use(collection1, nondet!(/** explanation */));
///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
///
///     // arbitrary statements can follow
///     let intermediate = name1.map(...);
///     intermediate.cross_singleton(name2)
/// };
/// ```
///
/// # Example with two collections
/// ```rust
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let singleton = process.singleton(q!(5));
/// let stream = process.source_iter(q!(vec![1, 2, 3]));
/// let out: Stream<(i32, i32), _> = sliced! {
///     let batch_of_req = use(stream, nondet!(/** test */));
///     let latest_singleton = use(singleton, nondet!(/** test */));
///
///     let mapped = batch_of_req.map(q!(|x| x * 2));
///     mapped.cross_singleton(latest_singleton)
/// };
/// # out
/// # }, |mut stream| async move {
/// # assert_eq!(stream.next().await.unwrap(), (2, 5));
/// # assert_eq!(stream.next().await.unwrap(), (4, 5));
/// # assert_eq!(stream.next().await.unwrap(), (6, 5));
/// # }));
/// ```
macro_rules! __sliced__ {
    // Hand the raw input to the parser macro with an empty list of collected `use` statements.
    ($($input:tt)*) => {
        $crate::__sliced_parse_uses__!(
            @uses []
            $($input)*
        )
    };
}
121
122pub use crate::__sliced__ as sliced;
123
124/// Styles for use with the `sliced!` macro.
125pub mod style {
126    use super::Slicable;
127    use crate::live_collections::boundedness::{Bounded, Unbounded};
128    use crate::live_collections::keyed_singleton::BoundedValue;
129    use crate::live_collections::stream::{Ordering, Retries, Stream};
130    use crate::location::{Location, NoTick, Tick};
131    use crate::nondet::NonDet;
132
133    /// Marks a live collection to be treated atomically during slicing.
134    pub struct Atomic<T>(pub T);
135
136    /// Wraps a live collection to be treated atomically during slicing.
137    pub fn atomic<T>(t: T) -> Atomic<T> {
138        Atomic(t)
139    }
140
141    impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Slicable<'a, L>
142        for Atomic<Stream<T, crate::location::Atomic<L>, Unbounded, O, R>>
143    {
144        type Slice = Stream<T, Tick<L>, Bounded, O, R>;
145        type Backtrace = crate::compile::ir::backtrace::Backtrace;
146
147        fn preferred_tick(&self) -> Option<Tick<L>> {
148            Some(self.0.location().tick().as_regular_tick())
149        }
150
151        fn get_location(&self) -> &L {
152            panic!("Atomic location has no accessible inner location")
153        }
154
155        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
156            assert_eq!(
157                self.0.location().tick().as_regular_tick().id(),
158                tick.id(),
159                "Mismatched tick for atomic slicing"
160            );
161
162            let out = self.0.batch_atomic(nondet);
163            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
164            out
165        }
166    }
167
168    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
169        for Atomic<crate::live_collections::Singleton<T, crate::location::Atomic<L>, Unbounded>>
170    {
171        type Slice = crate::live_collections::Singleton<T, Tick<L>, Bounded>;
172        type Backtrace = crate::compile::ir::backtrace::Backtrace;
173
174        fn preferred_tick(&self) -> Option<Tick<L>> {
175            Some(self.0.location().tick().as_regular_tick())
176        }
177
178        fn get_location(&self) -> &L {
179            panic!("Atomic location has no accessible inner location")
180        }
181
182        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
183            assert_eq!(
184                self.0.location().tick().as_regular_tick().id(),
185                tick.id(),
186                "Mismatched tick for atomic slicing"
187            );
188
189            let out = self.0.snapshot_atomic(nondet);
190            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
191            out
192        }
193    }
194
195    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
196        for Atomic<crate::live_collections::Optional<T, crate::location::Atomic<L>, Unbounded>>
197    {
198        type Slice = crate::live_collections::Optional<T, Tick<L>, Bounded>;
199        type Backtrace = crate::compile::ir::backtrace::Backtrace;
200
201        fn preferred_tick(&self) -> Option<Tick<L>> {
202            Some(self.0.location().tick().as_regular_tick())
203        }
204
205        fn get_location(&self) -> &L {
206            panic!("Atomic location has no accessible inner location")
207        }
208
209        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
210            assert_eq!(
211                self.0.location().tick().as_regular_tick().id(),
212                tick.id(),
213                "Mismatched tick for atomic slicing"
214            );
215
216            let out = self.0.snapshot_atomic(nondet);
217            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
218            out
219        }
220    }
221
222    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
223        for Atomic<
224            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, Unbounded>,
225        >
226    {
227        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
228        type Backtrace = crate::compile::ir::backtrace::Backtrace;
229
230        fn preferred_tick(&self) -> Option<Tick<L>> {
231            Some(self.0.location().tick().as_regular_tick())
232        }
233
234        fn get_location(&self) -> &L {
235            panic!("Atomic location has no accessible inner location")
236        }
237
238        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
239            assert_eq!(
240                self.0.location().tick().as_regular_tick().id(),
241                tick.id(),
242                "Mismatched tick for atomic slicing"
243            );
244
245            let out = self.0.snapshot_atomic(nondet);
246            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
247            out
248        }
249    }
250
251    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
252        for Atomic<
253            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, BoundedValue>,
254        >
255    {
256        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
257        type Backtrace = crate::compile::ir::backtrace::Backtrace;
258
259        fn preferred_tick(&self) -> Option<Tick<L>> {
260            Some(self.0.location().tick().as_regular_tick())
261        }
262
263        fn get_location(&self) -> &L {
264            panic!("Atomic location has no accessible inner location")
265        }
266
267        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
268            assert_eq!(
269                self.0.location().tick().as_regular_tick().id(),
270                tick.id(),
271                "Mismatched tick for atomic slicing"
272            );
273
274            let out = self.0.batch_atomic(nondet);
275            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
276            out
277        }
278    }
279}
280
281/// A trait for live collections which can be sliced into bounded versions at a tick.
282pub trait Slicable<'a, L: Location<'a>> {
283    /// The sliced version of this live collection.
284    type Slice;
285
286    /// The type of backtrace associated with this slice.
287    type Backtrace;
288
289    /// Gets the preferred tick to slice at. Used for atomic slicing.
290    fn preferred_tick(&self) -> Option<Tick<L>>;
291
292    /// Gets the location associated with this live collection.
293    fn get_location(&self) -> &L;
294
295    /// Slices this live collection at the given tick.
296    ///
297    /// # Non-Determinism
298    /// Slicing a live collection may involve non-determinism, such as choosing which messages
299    /// to include in a batch. The provided `nondet` parameter should be used to explain the impact
300    /// of this non-determinism on the program's behavior.
301    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice;
302}
303
/// Implemented by sliced live collections that can be yielded out of a slice and converted back
/// into their original, unsliced form.
pub trait Unslicable {
    /// The type this collection has once it is outside of the slice.
    type Unsliced;

    /// Consumes the sliced collection and returns its unsliced counterpart.
    fn unslice(self) -> Self::Unsliced;
}
312
313impl<'a, L: Location<'a>> Slicable<'a, L> for () {
314    type Slice = ();
315    type Backtrace = ();
316
317    fn get_location(&self) -> &L {
318        unreachable!()
319    }
320
321    fn preferred_tick(&self) -> Option<Tick<L>> {
322        None
323    }
324
325    fn slice(self, _tick: &Tick<L>, __backtrace: Self::Backtrace, _nondet: NonDet) -> Self::Slice {}
326}
327
328impl Unslicable for () {
329    type Unsliced = ();
330
331    fn unslice(self) -> Self::Unsliced {}
332}
333
// Generates a `Slicable` impl for a tuple. Each element is described by three comma-separated
// tokens: its type parameter, the identifier used to bind its backtrace, and its tuple index.
macro_rules! impl_slicable_for_tuple {
    ($($T:ident, $T_bt:ident, $idx:tt),*) => {
        impl<'a, L: Location<'a>, $($T: Slicable<'a, L>),*> Slicable<'a, L> for ($($T,)*) {
            type Slice = ($($T::Slice,)*);
            type Backtrace = ($($T::Backtrace,)*);

            /// The location of a tuple is the location of its first element.
            fn get_location(&self) -> &L {
                self.0.get_location()
            }

            /// Returns the first tick preferred by any element, panicking if a later element
            /// prefers a tick with a different id.
            fn preferred_tick(&self) -> Option<Tick<L>> {
                let mut chosen: Option<Tick<L>> = None;
                $(
                    if let Some(candidate) = self.$idx.preferred_tick() {
                        match chosen.as_ref() {
                            Some(existing)
                                if $crate::location::Location::id(existing)
                                    != $crate::location::Location::id(&candidate) =>
                            {
                                panic!("Mismatched preferred ticks for sliced collections")
                            }
                            // Same tick as the one already chosen: keep the earlier value.
                            Some(_) => {}
                            None => chosen = Some(candidate),
                        }
                    }
                )*
                chosen
            }

            /// Slices every element at the same `tick` with the same `nondet`, pairing each
            /// element with its own backtrace.
            #[expect(non_snake_case, reason = "macro codegen")]
            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
                // The type-parameter identifiers double as the names of the destructured values.
                let (($($T,)*), ($($T_bt,)*)) = (self, backtrace);
                ($($T.slice(tick, $T_bt, nondet),)*)
            }
        }
    };
}
372
// Tuple implementations of `Slicable` for arities 1 through 5. Each invocation is compiled only
// when the `stageleft_runtime` cfg is set.
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
); // 5 slices ought to be enough for anyone
385
386impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
387    for super::Stream<T, L, Unbounded, O, R>
388{
389    type Slice = super::Stream<T, Tick<L>, Bounded, O, R>;
390    type Backtrace = crate::compile::ir::backtrace::Backtrace;
391
392    fn get_location(&self) -> &L {
393        self.location()
394    }
395
396    fn preferred_tick(&self) -> Option<Tick<L>> {
397        None
398    }
399
400    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
401        let out = self.batch(tick, nondet);
402        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
403        out
404    }
405}
406
407impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
408    for super::Stream<T, Tick<L>, Bounded, O, R>
409{
410    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
411
412    fn unslice(self) -> Self::Unsliced {
413        self.all_ticks()
414    }
415}
416
417impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Singleton<T, L, Unbounded> {
418    type Slice = super::Singleton<T, Tick<L>, Bounded>;
419    type Backtrace = crate::compile::ir::backtrace::Backtrace;
420
421    fn get_location(&self) -> &L {
422        self.location()
423    }
424
425    fn preferred_tick(&self) -> Option<Tick<L>> {
426        None
427    }
428
429    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
430        let out = self.snapshot(tick, nondet);
431        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
432        out
433    }
434}
435
436impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
437    type Unsliced = super::Singleton<T, L, Unbounded>;
438
439    fn unslice(self) -> Self::Unsliced {
440        self.latest()
441    }
442}
443
444impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Optional<T, L, Unbounded> {
445    type Slice = super::Optional<T, Tick<L>, Bounded>;
446    type Backtrace = crate::compile::ir::backtrace::Backtrace;
447
448    fn get_location(&self) -> &L {
449        self.location()
450    }
451
452    fn preferred_tick(&self) -> Option<Tick<L>> {
453        None
454    }
455
456    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
457        let out = self.snapshot(tick, nondet);
458        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
459        out
460    }
461}
462
463impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
464    type Unsliced = super::Optional<T, L, Unbounded>;
465
466    fn unslice(self) -> Self::Unsliced {
467        self.latest()
468    }
469}
470
471impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
472    for super::KeyedStream<K, V, L, Unbounded, O, R>
473{
474    type Slice = super::KeyedStream<K, V, Tick<L>, Bounded, O, R>;
475    type Backtrace = crate::compile::ir::backtrace::Backtrace;
476
477    fn get_location(&self) -> &L {
478        self.location()
479    }
480
481    fn preferred_tick(&self) -> Option<Tick<L>> {
482        None
483    }
484
485    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
486        let out = self.batch(tick, nondet);
487        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
488        out
489    }
490}
491
492impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
493    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
494{
495    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
496
497    fn unslice(self) -> Self::Unsliced {
498        self.all_ticks()
499    }
500}
501
502impl<'a, K, V, L: Location<'a>> Slicable<'a, L> for super::KeyedSingleton<K, V, L, Unbounded> {
503    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
504    type Backtrace = crate::compile::ir::backtrace::Backtrace;
505
506    fn get_location(&self) -> &L {
507        self.location()
508    }
509
510    fn preferred_tick(&self) -> Option<Tick<L>> {
511        None
512    }
513
514    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
515        let out = self.snapshot(tick, nondet);
516        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
517        out
518    }
519}
520
521impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
522    for super::KeyedSingleton<K, V, L, BoundedValue>
523{
524    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
525    type Backtrace = crate::compile::ir::backtrace::Backtrace;
526
527    fn get_location(&self) -> &L {
528        self.location()
529    }
530
531    fn preferred_tick(&self) -> Option<Tick<L>> {
532        None
533    }
534
535    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
536        let out = self.batch(tick, nondet);
537        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
538        out
539    }
540}