hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The [`MinOrder`] supertrait bounds encode how an ordering combines with others:
/// combining it with itself or with [`TotalOrder`] leaves it unchanged, while combining
/// it with [`NoOrder`] always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
40
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it can never be instantiated; it is only used as a
/// type-level marker.
pub enum TotalOrder {}

// Ties the type-level marker to its IR representation.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
50
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants, so it can never be instantiated; it is only used as a
/// type-level marker.
pub enum NoOrder {}

// Ties the type-level marker to its IR representation.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
62
/// Helper trait for determining the weakest of two orderings.
///
/// `Self` and `Other` are the two orderings being compared; the result is exposed as the
/// associated type [`MinOrder::Min`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}
69
// The four impls below enumerate every pair of ordering markers. The result is
// `TotalOrder` only when both sides are `TotalOrder`; otherwise it is `NoOrder`.

// TotalOrder + NoOrder => NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// TotalOrder + TotalOrder => TotalOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// NoOrder + TotalOrder => NoOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// NoOrder + NoOrder => NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
89
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The [`MinRetries`] supertrait bounds encode how a retry guarantee combines with others:
/// combining it with itself or with [`ExactlyOnce`] leaves it unchanged, while combining
/// it with [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
100
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it can never be instantiated; it is only used as a
/// type-level marker.
pub enum ExactlyOnce {}

// Ties the type-level marker to its IR representation.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
109
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants, so it can never be instantiated; it is only used as a
/// type-level marker.
pub enum AtLeastOnce {}

// Ties the type-level marker to its IR representation.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
118
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `Self` and `Other` are the two guarantees being compared; the result is exposed as the
/// associated type [`MinRetries::Min`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}
125
// The four impls below enumerate every pair of retry markers. The result is
// `ExactlyOnce` only when both sides are `ExactlyOnce`; otherwise it is `AtLeastOnce`.

// ExactlyOnce + AtLeastOnce => AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// ExactlyOnce + ExactlyOnce => ExactlyOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// AtLeastOnce + ExactlyOnce => AtLeastOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// AtLeastOnce + AtLeastOnce => AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this stream is being materialized.
    pub(crate) location: Loc,
    // The IR node that produces this stream. It lives in a `RefCell` so that it can be
    // swapped out through a shared reference (the `Clone` impl does this).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker that records the type parameters, none of which are otherwise
    // stored except `Loc`.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179    for Stream<T, L, Unbounded, O, R>
180where
181    L: Location<'a>,
182{
183    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184        Stream {
185            location: stream.location,
186            ir_node: stream.ir_node,
187            _phantom: PhantomData,
188        }
189    }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193    for Stream<T, L, B, NoOrder, R>
194where
195    L: Location<'a>,
196{
197    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198        Stream {
199            location: stream.location,
200            ir_node: stream.ir_node,
201            _phantom: PhantomData,
202        }
203    }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207    for Stream<T, L, B, O, AtLeastOnce>
208where
209    L: Location<'a>,
210{
211    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212        Stream {
213            location: stream.location,
214            ir_node: stream.ir_node,
215            _phantom: PhantomData,
216        }
217    }
218}
219
/// Implements [`DeferTick`] for bounded streams that live inside a [`Tick`].
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): the `Stream::` path presumably resolves to an inherent
        // `Stream::defer_tick` defined outside this view, not to this trait method
        // (which would recurse forever) — confirm.
        Stream::defer_tick(self)
    }
}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230    for Stream<T, Tick<L>, Bounded, O, R>
231where
232    L: Location<'a>,
233{
234    type Location = Tick<L>;
235
236    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237        Stream::new(
238            location.clone(),
239            HydroNode::CycleSource {
240                ident,
241                metadata: location.new_node_metadata(Self::collection_kind()),
242            },
243        )
244    }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248    for Stream<T, Tick<L>, Bounded, O, R>
249where
250    L: Location<'a>,
251{
252    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253        assert_eq!(
254            Location::id(&self.location),
255            expected_location,
256            "locations do not match"
257        );
258        self.location
259            .flow_state()
260            .borrow_mut()
261            .push_root(HydroRoot::CycleSink {
262                ident,
263                input: Box::new(self.ir_node.into_inner()),
264                op_metadata: HydroIrOpMetadata::new(),
265            });
266    }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270    for Stream<T, L, B, O, R>
271where
272    L: Location<'a> + NoTick,
273{
274    type Location = L;
275
276    fn create_source(ident: syn::Ident, location: L) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                ident,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288    for Stream<T, L, B, O, R>
289where
290    L: Location<'a> + NoTick,
291{
292    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293        assert_eq!(
294            Location::id(&self.location),
295            expected_location,
296            "locations do not match"
297        );
298        self.location
299            .flow_state()
300            .borrow_mut()
301            .push_root(HydroRoot::CycleSink {
302                ident,
303                input: Box::new(self.ir_node.into_inner()),
304                op_metadata: HydroIrOpMetadata::new(),
305            });
306    }
307}
308
/// Cloning a stream does not duplicate its IR subtree. Instead, the original handle and the
/// clone each hold a [`HydroNode::Tee`] that points at the same shared inner node.
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // First clone of this handle: move the current node behind a shared
        // `Rc<RefCell<_>>` and turn this handle into a `Tee` over it. `Placeholder` is only
        // a temporary stand-in while the original node is moved out of the `RefCell`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // This handle is now guaranteed to be a `Tee`. The clone gets its own `Tee` node
        // that shares the same inner `Rc` and carries a copy of the metadata.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The block above always leaves a `Tee` in `ir_node`.
            unreachable!()
        }
    }
}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341    L: Location<'a>,
342{
343    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347        Stream {
348            location,
349            ir_node: RefCell::new(ir_node),
350            _phantom: PhantomData,
351        }
352    }
353
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The location is returned by reference; the stream itself is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
358
359    pub(crate) fn collection_kind() -> CollectionKind {
360        CollectionKind::Stream {
361            bound: B::BOUND_KIND,
362            order: O::ORDERING_KIND,
363            retry: R::RETRIES_KIND,
364            element_type: stageleft::quote_type::<T>().into(),
365        }
366    }
367
368    /// Produces a stream based on invoking `f` on each element.
369    /// If you do not want to modify the stream and instead only want to view
370    /// each item use [`Stream::inspect`] instead.
371    ///
372    /// # Example
373    /// ```rust
374    /// # use hydro_lang::prelude::*;
375    /// # use futures::StreamExt;
376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377    /// let words = process.source_iter(q!(vec!["hello", "world"]));
378    /// words.map(q!(|x| x.to_uppercase()))
379    /// # }, |mut stream| async move {
380    /// # for w in vec!["HELLO", "WORLD"] {
381    /// #     assert_eq!(stream.next().await.unwrap(), w);
382    /// # }
383    /// # }));
384    /// ```
385    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386    where
387        F: Fn(T) -> U + 'a,
388    {
389        let f = f.splice_fn1_ctx(&self.location).into();
390        Stream::new(
391            self.location.clone(),
392            HydroNode::Map {
393                f,
394                input: Box::new(self.ir_node.into_inner()),
395                metadata: self
396                    .location
397                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398            },
399        )
400    }
401
402    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404    /// for the output type `U` must produce items in a **deterministic** order.
405    ///
406    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408    ///
409    /// # Example
410    /// ```rust
411    /// # use hydro_lang::prelude::*;
412    /// # use futures::StreamExt;
413    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
414    /// process
415    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
416    ///     .flat_map_ordered(q!(|x| x))
417    /// # }, |mut stream| async move {
418    /// // 1, 2, 3, 4
419    /// # for w in (1..5) {
420    /// #     assert_eq!(stream.next().await.unwrap(), w);
421    /// # }
422    /// # }));
423    /// ```
424    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
425    where
426        I: IntoIterator<Item = U>,
427        F: Fn(T) -> I + 'a,
428    {
429        let f = f.splice_fn1_ctx(&self.location).into();
430        Stream::new(
431            self.location.clone(),
432            HydroNode::FlatMap {
433                f,
434                input: Box::new(self.ir_node.into_inner()),
435                metadata: self
436                    .location
437                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
438            },
439        )
440    }
441
442    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
443    /// for the output type `U` to produce items in any order.
444    ///
445    /// # Example
446    /// ```rust
447    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
448    /// # use futures::StreamExt;
449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
450    /// process
451    ///     .source_iter(q!(vec![
452    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
453    ///         std::collections::HashSet::from_iter(vec![3, 4]),
454    ///     ]))
455    ///     .flat_map_unordered(q!(|x| x))
456    /// # }, |mut stream| async move {
457    /// // 1, 2, 3, 4, but in no particular order
458    /// # let mut results = Vec::new();
459    /// # for w in (1..5) {
460    /// #     results.push(stream.next().await.unwrap());
461    /// # }
462    /// # results.sort();
463    /// # assert_eq!(results, vec![1, 2, 3, 4]);
464    /// # }));
465    /// ```
466    pub fn flat_map_unordered<U, I, F>(
467        self,
468        f: impl IntoQuotedMut<'a, F, L>,
469    ) -> Stream<U, L, B, NoOrder, R>
470    where
471        I: IntoIterator<Item = U>,
472        F: Fn(T) -> I + 'a,
473    {
474        let f = f.splice_fn1_ctx(&self.location).into();
475        Stream::new(
476            self.location.clone(),
477            HydroNode::FlatMap {
478                f,
479                input: Box::new(self.ir_node.into_inner()),
480                metadata: self
481                    .location
482                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
483            },
484        )
485    }
486
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity function.
        self.flat_map_ordered(q!(|d| d))
    }
513
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// The resulting stream is always [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity function.
        self.flat_map_unordered(q!(|d| d))
    }
544
545    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
546    /// `f`, preserving the order of the elements.
547    ///
548    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
549    /// not modify or take ownership of the values. If you need to modify the values while filtering
550    /// use [`Stream::filter_map`] instead.
551    ///
552    /// # Example
553    /// ```rust
554    /// # use hydro_lang::prelude::*;
555    /// # use futures::StreamExt;
556    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
557    /// process
558    ///     .source_iter(q!(vec![1, 2, 3, 4]))
559    ///     .filter(q!(|&x| x > 2))
560    /// # }, |mut stream| async move {
561    /// // 3, 4
562    /// # for w in (3..5) {
563    /// #     assert_eq!(stream.next().await.unwrap(), w);
564    /// # }
565    /// # }));
566    /// ```
567    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
568    where
569        F: Fn(&T) -> bool + 'a,
570    {
571        let f = f.splice_fn1_borrow_ctx(&self.location).into();
572        Stream::new(
573            self.location.clone(),
574            HydroNode::Filter {
575                f,
576                input: Box::new(self.ir_node.into_inner()),
577                metadata: self.location.new_node_metadata(Self::collection_kind()),
578            },
579        )
580    }
581
582    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
583    ///
584    /// # Example
585    /// ```rust
586    /// # use hydro_lang::prelude::*;
587    /// # use futures::StreamExt;
588    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589    /// process
590    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
591    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
592    /// # }, |mut stream| async move {
593    /// // 1, 2
594    /// # for w in (1..3) {
595    /// #     assert_eq!(stream.next().await.unwrap(), w);
596    /// # }
597    /// # }));
598    /// ```
599    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
600    where
601        F: Fn(T) -> Option<U> + 'a,
602    {
603        let f = f.splice_fn1_ctx(&self.location).into();
604        Stream::new(
605            self.location.clone(),
606            HydroNode::FilterMap {
607                f,
608                input: Box::new(self.ir_node.into_inner()),
609                metadata: self
610                    .location
611                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
612            },
613        )
614    }
615
616    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
617    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
618    /// If `other` is an empty [`Optional`], no values will be produced.
619    ///
620    /// # Example
621    /// ```rust
622    /// # use hydro_lang::prelude::*;
623    /// # use futures::StreamExt;
624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625    /// let tick = process.tick();
626    /// let batch = process
627    ///   .source_iter(q!(vec![1, 2, 3, 4]))
628    ///   .batch(&tick, nondet!(/** test */));
629    /// let count = batch.clone().count(); // `count()` returns a singleton
630    /// batch.cross_singleton(count).all_ticks()
631    /// # }, |mut stream| async move {
632    /// // (1, 4), (2, 4), (3, 4), (4, 4)
633    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
634    /// #     assert_eq!(stream.next().await.unwrap(), w);
635    /// # }
636    /// # }));
637    /// ```
638    pub fn cross_singleton<O2>(
639        self,
640        other: impl Into<Optional<O2, L, Bounded>>,
641    ) -> Stream<(T, O2), L, B, O, R>
642    where
643        O2: Clone,
644    {
645        let other: Optional<O2, L, Bounded> = other.into();
646        check_matching_location(&self.location, &other.location);
647
648        Stream::new(
649            self.location.clone(),
650            HydroNode::CrossSingleton {
651                left: Box::new(self.ir_node.into_inner()),
652                right: Box::new(other.ir_node.into_inner()),
653                metadata: self
654                    .location
655                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
656            },
657        )
658    }
659
660    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
661    ///
662    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
663    /// leader of a cluster.
664    ///
665    /// # Example
666    /// ```rust
667    /// # use hydro_lang::prelude::*;
668    /// # use futures::StreamExt;
669    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
670    /// let tick = process.tick();
671    /// // ticks are lazy by default, forces the second tick to run
672    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
673    ///
674    /// let batch_first_tick = process
675    ///   .source_iter(q!(vec![1, 2, 3, 4]))
676    ///   .batch(&tick, nondet!(/** test */));
677    /// let batch_second_tick = process
678    ///   .source_iter(q!(vec![5, 6, 7, 8]))
679    ///   .batch(&tick, nondet!(/** test */))
680    ///   .defer_tick(); // appears on the second tick
681    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
682    /// batch_first_tick.chain(batch_second_tick)
683    ///   .filter_if_some(some_on_first_tick)
684    ///   .all_ticks()
685    /// # }, |mut stream| async move {
686    /// // [1, 2, 3, 4]
687    /// # for w in vec![1, 2, 3, 4] {
688    /// #     assert_eq!(stream.next().await.unwrap(), w);
689    /// # }
690    /// # }));
691    /// ```
692    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
693        self.cross_singleton(signal.map(q!(|_u| ())))
694            .map(q!(|(d, _signal)| d))
695    }
696
697    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
698    ///
699    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
700    /// some local state.
701    ///
702    /// # Example
703    /// ```rust
704    /// # use hydro_lang::prelude::*;
705    /// # use futures::StreamExt;
706    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707    /// let tick = process.tick();
708    /// // ticks are lazy by default, forces the second tick to run
709    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
710    ///
711    /// let batch_first_tick = process
712    ///   .source_iter(q!(vec![1, 2, 3, 4]))
713    ///   .batch(&tick, nondet!(/** test */));
714    /// let batch_second_tick = process
715    ///   .source_iter(q!(vec![5, 6, 7, 8]))
716    ///   .batch(&tick, nondet!(/** test */))
717    ///   .defer_tick(); // appears on the second tick
718    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
719    /// batch_first_tick.chain(batch_second_tick)
720    ///   .filter_if_none(some_on_first_tick)
721    ///   .all_ticks()
722    /// # }, |mut stream| async move {
723    /// // [5, 6, 7, 8]
724    /// # for w in vec![5, 6, 7, 8] {
725    /// #     assert_eq!(stream.next().await.unwrap(), w);
726    /// # }
727    /// # }));
728    /// ```
729    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
730        self.filter_if_some(
731            other
732                .map(q!(|_| ()))
733                .into_singleton()
734                .filter(q!(|o| o.is_none())),
735        )
736    }
737
738    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
739    /// tupled pairs in a non-deterministic order.
740    ///
741    /// # Example
742    /// ```rust
743    /// # use hydro_lang::prelude::*;
744    /// # use std::collections::HashSet;
745    /// # use futures::StreamExt;
746    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
747    /// let tick = process.tick();
748    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
749    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
750    /// stream1.cross_product(stream2)
751    /// # }, |mut stream| async move {
752    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
753    /// # stream.map(|i| assert!(expected.contains(&i)));
754    /// # }));
755    /// ```
756    pub fn cross_product<T2, O2: Ordering>(
757        self,
758        other: Stream<T2, L, B, O2, R>,
759    ) -> Stream<(T, T2), L, B, NoOrder, R>
760    where
761        T: Clone,
762        T2: Clone,
763    {
764        check_matching_location(&self.location, &other.location);
765
766        Stream::new(
767            self.location.clone(),
768            HydroNode::CrossProduct {
769                left: Box::new(self.ir_node.into_inner()),
770                right: Box::new(other.ir_node.into_inner()),
771                metadata: self
772                    .location
773                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
774            },
775        )
776    }
777
778    /// Takes one stream as input and filters out any duplicate occurrences. The output
779    /// contains all unique values from the input.
780    ///
781    /// # Example
782    /// ```rust
783    /// # use hydro_lang::prelude::*;
784    /// # use futures::StreamExt;
785    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
786    /// let tick = process.tick();
787    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
788    /// # }, |mut stream| async move {
789    /// # for w in vec![1, 2, 3, 4] {
790    /// #     assert_eq!(stream.next().await.unwrap(), w);
791    /// # }
792    /// # }));
793    /// ```
794    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
795    where
796        T: Eq + Hash,
797    {
798        Stream::new(
799            self.location.clone(),
800            HydroNode::Unique {
801                input: Box::new(self.ir_node.into_inner()),
802                metadata: self
803                    .location
804                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
805            },
806        )
807    }
808
809    /// Outputs everything in this stream that is *not* contained in the `other` stream.
810    ///
811    /// The `other` stream must be [`Bounded`], since this function will wait until
812    /// all its elements are available before producing any output.
813    /// # Example
814    /// ```rust
815    /// # use hydro_lang::prelude::*;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818    /// let tick = process.tick();
819    /// let stream = process
820    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
821    ///   .batch(&tick, nondet!(/** test */));
822    /// let batch = process
823    ///   .source_iter(q!(vec![1, 2]))
824    ///   .batch(&tick, nondet!(/** test */));
825    /// stream.filter_not_in(batch).all_ticks()
826    /// # }, |mut stream| async move {
827    /// # for w in vec![3, 4] {
828    /// #     assert_eq!(stream.next().await.unwrap(), w);
829    /// # }
830    /// # }));
831    /// ```
832    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
833    where
834        T: Eq + Hash,
835    {
836        check_matching_location(&self.location, &other.location);
837
838        Stream::new(
839            self.location.clone(),
840            HydroNode::Difference {
841                pos: Box::new(self.ir_node.into_inner()),
842                neg: Box::new(other.ir_node.into_inner()),
843                metadata: self
844                    .location
845                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
846            },
847        )
848    }
849
850    /// An operator which allows you to "inspect" each element of a stream without
851    /// modifying it. The closure `f` is called on a reference to each item. This is
852    /// mainly useful for debugging, and should not be used to generate side-effects.
853    ///
854    /// # Example
855    /// ```rust
856    /// # use hydro_lang::prelude::*;
857    /// # use futures::StreamExt;
858    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
859    /// let nums = process.source_iter(q!(vec![1, 2]));
860    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
861    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
862    /// # }, |mut stream| async move {
863    /// # for w in vec![1, 2] {
864    /// #     assert_eq!(stream.next().await.unwrap(), w);
865    /// # }
866    /// # }));
867    /// ```
868    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
869    where
870        F: Fn(&T) + 'a,
871    {
872        let f = f.splice_fn1_borrow_ctx(&self.location).into();
873
874        Stream::new(
875            self.location.clone(),
876            HydroNode::Inspect {
877                f,
878                input: Box::new(self.ir_node.into_inner()),
879                metadata: self.location.new_node_metadata(Self::collection_kind()),
880            },
881        )
882    }
883
884    /// An operator which allows you to "name" a `HydroNode`.
885    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
886    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
887        {
888            let mut node = self.ir_node.borrow_mut();
889            let metadata = node.metadata_mut();
890            metadata.tag = Some(name.to_string());
891        }
892        self
893    }
894
895    /// Explicitly "casts" the stream to a type with a different ordering
896    /// guarantee. Useful in unsafe code where the ordering cannot be proven
897    /// by the type-system.
898    ///
899    /// # Non-Determinism
900    /// This function is used as an escape hatch, and any mistakes in the
901    /// provided ordering guarantee will propagate into the guarantees
902    /// for the rest of the program.
903    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
904        if O::ORDERING_KIND == O2::ORDERING_KIND {
905            Stream::new(self.location, self.ir_node.into_inner())
906        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
907            // We can always weaken the ordering guarantee
908            Stream::new(
909                self.location.clone(),
910                HydroNode::Cast {
911                    inner: Box::new(self.ir_node.into_inner()),
912                    metadata: self
913                        .location
914                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
915                },
916            )
917        } else {
918            Stream::new(
919                self.location.clone(),
920                HydroNode::ObserveNonDet {
921                    inner: Box::new(self.ir_node.into_inner()),
922                    trusted: false,
923                    metadata: self
924                        .location
925                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
926                },
927            )
928        }
929    }
930
931    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
932    // is not observable
933    fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
934        if O::ORDERING_KIND == O2::ORDERING_KIND {
935            Stream::new(self.location, self.ir_node.into_inner())
936        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
937            // We can always weaken the ordering guarantee
938            Stream::new(
939                self.location.clone(),
940                HydroNode::Cast {
941                    inner: Box::new(self.ir_node.into_inner()),
942                    metadata: self
943                        .location
944                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
945                },
946            )
947        } else {
948            Stream::new(
949                self.location.clone(),
950                HydroNode::ObserveNonDet {
951                    inner: Box::new(self.ir_node.into_inner()),
952                    trusted: true,
953                    metadata: self
954                        .location
955                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
956                },
957            )
958        }
959    }
960
961    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
962    /// which is always safe because that is the weakest possible guarantee.
963    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
964        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
965        self.assume_ordering::<NoOrder>(nondet)
966    }
967
968    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
969    /// enforcing that `O2` is weaker than the input ordering guarantee.
970    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
971        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
972        self.assume_ordering::<O2>(nondet)
973    }
974
975    /// Explicitly "casts" the stream to a type with a different retries
976    /// guarantee. Useful in unsafe code where the lack of retries cannot
977    /// be proven by the type-system.
978    ///
979    /// # Non-Determinism
980    /// This function is used as an escape hatch, and any mistakes in the
981    /// provided retries guarantee will propagate into the guarantees
982    /// for the rest of the program.
983    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
984        if R::RETRIES_KIND == R2::RETRIES_KIND {
985            Stream::new(self.location, self.ir_node.into_inner())
986        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
987            // We can always weaken the retries guarantee
988            Stream::new(
989                self.location.clone(),
990                HydroNode::Cast {
991                    inner: Box::new(self.ir_node.into_inner()),
992                    metadata: self
993                        .location
994                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
995                },
996            )
997        } else {
998            Stream::new(
999                self.location.clone(),
1000                HydroNode::ObserveNonDet {
1001                    inner: Box::new(self.ir_node.into_inner()),
1002                    trusted: false,
1003                    metadata: self
1004                        .location
1005                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1006                },
1007            )
1008        }
1009    }
1010
1011    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1012    // is not observable
1013    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1014        if R::RETRIES_KIND == R2::RETRIES_KIND {
1015            Stream::new(self.location, self.ir_node.into_inner())
1016        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1017            // We can always weaken the retries guarantee
1018            Stream::new(
1019                self.location.clone(),
1020                HydroNode::Cast {
1021                    inner: Box::new(self.ir_node.into_inner()),
1022                    metadata: self
1023                        .location
1024                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1025                },
1026            )
1027        } else {
1028            Stream::new(
1029                self.location.clone(),
1030                HydroNode::ObserveNonDet {
1031                    inner: Box::new(self.ir_node.into_inner()),
1032                    trusted: true,
1033                    metadata: self
1034                        .location
1035                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1036                },
1037            )
1038        }
1039    }
1040
1041    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1042    /// which is always safe because that is the weakest possible guarantee.
1043    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1044        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1045        self.assume_retries::<AtLeastOnce>(nondet)
1046    }
1047
1048    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1049    /// enforcing that `R2` is weaker than the input retries guarantee.
1050    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1051        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1052        self.assume_retries::<R2>(nondet)
1053    }
1054}
1055
1056impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1057where
1058    L: Location<'a>,
1059{
1060    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1061    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1062    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1063        self.assume_retries(
1064            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1065        )
1066    }
1067}
1068
1069impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1070where
1071    L: Location<'a>,
1072{
1073    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1074    ///
1075    /// # Example
1076    /// ```rust
1077    /// # use hydro_lang::prelude::*;
1078    /// # use futures::StreamExt;
1079    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1081    /// # }, |mut stream| async move {
1082    /// // 1, 2, 3
1083    /// # for w in vec![1, 2, 3] {
1084    /// #     assert_eq!(stream.next().await.unwrap(), w);
1085    /// # }
1086    /// # }));
1087    /// ```
1088    pub fn cloned(self) -> Stream<T, L, B, O, R>
1089    where
1090        T: Clone,
1091    {
1092        self.map(q!(|d| d.clone()))
1093    }
1094}
1095
1096impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1097where
1098    L: Location<'a>,
1099{
1100    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1101    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1102    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1103    ///
1104    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1105    /// and there may be duplicates.
1106    ///
1107    /// # Example
1108    /// ```rust
1109    /// # use hydro_lang::prelude::*;
1110    /// # use futures::StreamExt;
1111    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1112    /// let tick = process.tick();
1113    /// let bools = process.source_iter(q!(vec![false, true, false]));
1114    /// let batch = bools.batch(&tick, nondet!(/** test */));
1115    /// batch
1116    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1117    ///     .all_ticks()
1118    /// # }, |mut stream| async move {
1119    /// // true
1120    /// # assert_eq!(stream.next().await.unwrap(), true);
1121    /// # }));
1122    /// ```
1123    pub fn fold_commutative_idempotent<A, I, F>(
1124        self,
1125        init: impl IntoQuotedMut<'a, I, L>,
1126        comb: impl IntoQuotedMut<'a, F, L>,
1127    ) -> Singleton<A, L, B>
1128    where
1129        I: Fn() -> A + 'a,
1130        F: Fn(&mut A, T),
1131    {
1132        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1133        self.assume_ordering(nondet)
1134            .assume_retries(nondet)
1135            .fold(init, comb)
1136    }
1137
1138    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1139    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1140    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1141    /// reference, so that it can be modified in place.
1142    ///
1143    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1144    /// and there may be duplicates.
1145    ///
1146    /// # Example
1147    /// ```rust
1148    /// # use hydro_lang::prelude::*;
1149    /// # use futures::StreamExt;
1150    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1151    /// let tick = process.tick();
1152    /// let bools = process.source_iter(q!(vec![false, true, false]));
1153    /// let batch = bools.batch(&tick, nondet!(/** test */));
1154    /// batch
1155    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1156    ///     .all_ticks()
1157    /// # }, |mut stream| async move {
1158    /// // true
1159    /// # assert_eq!(stream.next().await.unwrap(), true);
1160    /// # }));
1161    /// ```
1162    pub fn reduce_commutative_idempotent<F>(
1163        self,
1164        comb: impl IntoQuotedMut<'a, F, L>,
1165    ) -> Optional<T, L, B>
1166    where
1167        F: Fn(&mut T, T) + 'a,
1168    {
1169        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1170        self.assume_ordering(nondet)
1171            .assume_retries(nondet)
1172            .reduce(comb)
1173    }
1174
1175    // only for internal APIs that have been carefully vetted, will eventually be removed once we
1176    // have algebraic verification of these properties
1177    fn reduce_commutative_idempotent_trusted<F>(
1178        self,
1179        comb: impl IntoQuotedMut<'a, F, L>,
1180    ) -> Optional<T, L, B>
1181    where
1182        F: Fn(&mut T, T) + 'a,
1183    {
1184        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1185
1186        let ordered = if B::BOUNDED {
1187            self.assume_ordering_trusted(nondet)
1188        } else {
1189            self.assume_ordering(nondet) // if unbounded, ordering affects intermediate states
1190        };
1191
1192        ordered
1193            .assume_retries_trusted(nondet) // retries never affect intermediate states
1194            .reduce(comb)
1195    }
1196
1197    /// Computes the maximum element in the stream as an [`Optional`], which
1198    /// will be empty until the first element in the input arrives.
1199    ///
1200    /// # Example
1201    /// ```rust
1202    /// # use hydro_lang::prelude::*;
1203    /// # use futures::StreamExt;
1204    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1205    /// let tick = process.tick();
1206    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1207    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1208    /// batch.max().all_ticks()
1209    /// # }, |mut stream| async move {
1210    /// // 4
1211    /// # assert_eq!(stream.next().await.unwrap(), 4);
1212    /// # }));
1213    /// ```
1214    pub fn max(self) -> Optional<T, L, B>
1215    where
1216        T: Ord,
1217    {
1218        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1219            if new > *curr {
1220                *curr = new;
1221            }
1222        }))
1223    }
1224
1225    /// Computes the minimum element in the stream as an [`Optional`], which
1226    /// will be empty until the first element in the input arrives.
1227    ///
1228    /// # Example
1229    /// ```rust
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// let tick = process.tick();
1234    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1235    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1236    /// batch.min().all_ticks()
1237    /// # }, |mut stream| async move {
1238    /// // 1
1239    /// # assert_eq!(stream.next().await.unwrap(), 1);
1240    /// # }));
1241    /// ```
1242    pub fn min(self) -> Optional<T, L, B>
1243    where
1244        T: Ord,
1245    {
1246        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1247            if new < *curr {
1248                *curr = new;
1249            }
1250        }))
1251    }
1252}
1253
1254impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1255where
1256    L: Location<'a>,
1257{
1258    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1259    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1260    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1261    ///
1262    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1263    ///
1264    /// # Example
1265    /// ```rust
1266    /// # use hydro_lang::prelude::*;
1267    /// # use futures::StreamExt;
1268    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1269    /// let tick = process.tick();
1270    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1271    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1272    /// batch
1273    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1274    ///     .all_ticks()
1275    /// # }, |mut stream| async move {
1276    /// // 10
1277    /// # assert_eq!(stream.next().await.unwrap(), 10);
1278    /// # }));
1279    /// ```
1280    pub fn fold_commutative<A, I, F>(
1281        self,
1282        init: impl IntoQuotedMut<'a, I, L>,
1283        comb: impl IntoQuotedMut<'a, F, L>,
1284    ) -> Singleton<A, L, B>
1285    where
1286        I: Fn() -> A + 'a,
1287        F: Fn(&mut A, T),
1288    {
1289        let nondet = nondet!(/** the combinator function is commutative */);
1290        self.assume_ordering(nondet).fold(init, comb)
1291    }
1292
1293    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1294    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1295    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1296    /// reference, so that it can be modified in place.
1297    ///
1298    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1299    ///
1300    /// # Example
1301    /// ```rust
1302    /// # use hydro_lang::prelude::*;
1303    /// # use futures::StreamExt;
1304    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1305    /// let tick = process.tick();
1306    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1307    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1308    /// batch
1309    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1310    ///     .all_ticks()
1311    /// # }, |mut stream| async move {
1312    /// // 10
1313    /// # assert_eq!(stream.next().await.unwrap(), 10);
1314    /// # }));
1315    /// ```
1316    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1317    where
1318        F: Fn(&mut T, T) + 'a,
1319    {
1320        let nondet = nondet!(/** the combinator function is commutative */);
1321        self.assume_ordering(nondet).reduce(comb)
1322    }
1323
1324    /// Computes the number of elements in the stream as a [`Singleton`].
1325    ///
1326    /// # Example
1327    /// ```rust
1328    /// # use hydro_lang::prelude::*;
1329    /// # use futures::StreamExt;
1330    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1331    /// let tick = process.tick();
1332    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1333    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1334    /// batch.count().all_ticks()
1335    /// # }, |mut stream| async move {
1336    /// // 4
1337    /// # assert_eq!(stream.next().await.unwrap(), 4);
1338    /// # }));
1339    /// ```
1340    pub fn count(self) -> Singleton<usize, L, B> {
1341        self.assume_ordering_trusted(nondet!(
1342            /// Order does not affect eventual count, and also does not affect intermediate states.
1343        ))
1344        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1345    }
1346}
1347
1348impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1349where
1350    L: Location<'a>,
1351{
1352    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1353    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1354    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1355    ///
1356    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1357    ///
1358    /// # Example
1359    /// ```rust
1360    /// # use hydro_lang::prelude::*;
1361    /// # use futures::StreamExt;
1362    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1363    /// let tick = process.tick();
1364    /// let bools = process.source_iter(q!(vec![false, true, false]));
1365    /// let batch = bools.batch(&tick, nondet!(/** test */));
1366    /// batch
1367    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1368    ///     .all_ticks()
1369    /// # }, |mut stream| async move {
1370    /// // true
1371    /// # assert_eq!(stream.next().await.unwrap(), true);
1372    /// # }));
1373    /// ```
1374    pub fn fold_idempotent<A, I, F>(
1375        self,
1376        init: impl IntoQuotedMut<'a, I, L>,
1377        comb: impl IntoQuotedMut<'a, F, L>,
1378    ) -> Singleton<A, L, B>
1379    where
1380        I: Fn() -> A + 'a,
1381        F: Fn(&mut A, T),
1382    {
1383        let nondet = nondet!(/** the combinator function is idempotent */);
1384        self.assume_retries(nondet).fold(init, comb)
1385    }
1386
1387    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1388    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1389    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1390    /// reference, so that it can be modified in place.
1391    ///
1392    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1393    ///
1394    /// # Example
1395    /// ```rust
1396    /// # use hydro_lang::prelude::*;
1397    /// # use futures::StreamExt;
1398    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1399    /// let tick = process.tick();
1400    /// let bools = process.source_iter(q!(vec![false, true, false]));
1401    /// let batch = bools.batch(&tick, nondet!(/** test */));
1402    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1403    /// # }, |mut stream| async move {
1404    /// // true
1405    /// # assert_eq!(stream.next().await.unwrap(), true);
1406    /// # }));
1407    /// ```
1408    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1409    where
1410        F: Fn(&mut T, T) + 'a,
1411    {
1412        let nondet = nondet!(/** the combinator function is idempotent */);
1413        self.assume_retries(nondet).reduce(comb)
1414    }
1415
1416    /// Computes the first element in the stream as an [`Optional`], which
1417    /// will be empty until the first element in the input arrives.
1418    ///
1419    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1420    /// re-ordering of elements may cause the first element to change.
1421    ///
1422    /// # Example
1423    /// ```rust
1424    /// # use hydro_lang::prelude::*;
1425    /// # use futures::StreamExt;
1426    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1427    /// let tick = process.tick();
1428    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1429    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1430    /// batch.first().all_ticks()
1431    /// # }, |mut stream| async move {
1432    /// // 1
1433    /// # assert_eq!(stream.next().await.unwrap(), 1);
1434    /// # }));
1435    /// ```
1436    pub fn first(self) -> Optional<T, L, B> {
1437        self.reduce_idempotent(q!(|_, _| {}))
1438    }
1439
1440    /// Computes the last element in the stream as an [`Optional`], which
1441    /// will be empty until an element in the input arrives.
1442    ///
1443    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1444    /// re-ordering of elements may cause the last element to change.
1445    ///
1446    /// # Example
1447    /// ```rust
1448    /// # use hydro_lang::prelude::*;
1449    /// # use futures::StreamExt;
1450    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1451    /// let tick = process.tick();
1452    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1453    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1454    /// batch.last().all_ticks()
1455    /// # }, |mut stream| async move {
1456    /// // 4
1457    /// # assert_eq!(stream.next().await.unwrap(), 4);
1458    /// # }));
1459    /// ```
1460    pub fn last(self) -> Optional<T, L, B> {
1461        self.reduce_idempotent(q!(|curr, new| *curr = new))
1462    }
1463}
1464
1465impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1466where
1467    L: Location<'a>,
1468{
1469    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1470    ///
1471    /// # Example
1472    /// ```rust
1473    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1474    /// # use futures::StreamExt;
1475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1476    /// let tick = process.tick();
1477    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1478    /// numbers.enumerate()
1479    /// # }, |mut stream| async move {
1480    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1481    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1482    /// #     assert_eq!(stream.next().await.unwrap(), w);
1483    /// # }
1484    /// # }));
1485    /// ```
1486    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1487        Stream::new(
1488            self.location.clone(),
1489            HydroNode::Enumerate {
1490                input: Box::new(self.ir_node.into_inner()),
1491                metadata: self.location.new_node_metadata(Stream::<
1492                    (usize, T),
1493                    L,
1494                    B,
1495                    TotalOrder,
1496                    ExactlyOnce,
1497                >::collection_kind()),
1498            },
1499        )
1500    }
1501
1502    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1503    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1504    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1505    ///
1506    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1507    /// to depend on the order of elements in the stream.
1508    ///
1509    /// # Example
1510    /// ```rust
1511    /// # use hydro_lang::prelude::*;
1512    /// # use futures::StreamExt;
1513    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1514    /// let tick = process.tick();
1515    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1516    /// let batch = words.batch(&tick, nondet!(/** test */));
1517    /// batch
1518    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1519    ///     .all_ticks()
1520    /// # }, |mut stream| async move {
1521    /// // "HELLOWORLD"
1522    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1523    /// # }));
1524    /// ```
1525    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1526        self,
1527        init: impl IntoQuotedMut<'a, I, L>,
1528        comb: impl IntoQuotedMut<'a, F, L>,
1529    ) -> Singleton<A, L, B> {
1530        let init = init.splice_fn0_ctx(&self.location).into();
1531        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1532
1533        let core = HydroNode::Fold {
1534            init,
1535            acc: comb,
1536            input: Box::new(self.ir_node.into_inner()),
1537            metadata: self
1538                .location
1539                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1540        };
1541
1542        Singleton::new(self.location, core)
1543    }
1544
1545    /// Collects all the elements of this stream into a single [`Vec`] element.
1546    ///
1547    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1548    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1549    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1550    /// the vector at an arbitrary point in time.
1551    ///
1552    /// # Example
1553    /// ```rust
1554    /// # use hydro_lang::prelude::*;
1555    /// # use futures::StreamExt;
1556    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1557    /// let tick = process.tick();
1558    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1559    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1560    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1561    /// # }, |mut stream| async move {
1562    /// // [ vec![1, 2, 3, 4] ]
1563    /// # for w in vec![vec![1, 2, 3, 4]] {
1564    /// #     assert_eq!(stream.next().await.unwrap(), w);
1565    /// # }
1566    /// # }));
1567    /// ```
1568    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1569        self.fold(
1570            q!(|| vec![]),
1571            q!(|acc, v| {
1572                acc.push(v);
1573            }),
1574        )
1575    }
1576
1577    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1578    /// and emitting each intermediate result.
1579    ///
1580    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1581    /// containing all intermediate accumulated values. The scan operation can also terminate early
1582    /// by returning `None`.
1583    ///
1584    /// The function takes a mutable reference to the accumulator and the current element, and returns
1585    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1586    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1587    ///
1588    /// # Examples
1589    ///
1590    /// Basic usage - running sum:
1591    /// ```rust
1592    /// # use hydro_lang::prelude::*;
1593    /// # use futures::StreamExt;
1594    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1595    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1596    ///     q!(|| 0),
1597    ///     q!(|acc, x| {
1598    ///         *acc += x;
1599    ///         Some(*acc)
1600    ///     }),
1601    /// )
1602    /// # }, |mut stream| async move {
1603    /// // Output: 1, 3, 6, 10
1604    /// # for w in vec![1, 3, 6, 10] {
1605    /// #     assert_eq!(stream.next().await.unwrap(), w);
1606    /// # }
1607    /// # }));
1608    /// ```
1609    ///
1610    /// Early termination example:
1611    /// ```rust
1612    /// # use hydro_lang::prelude::*;
1613    /// # use futures::StreamExt;
1614    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1615    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1616    ///     q!(|| 1),
1617    ///     q!(|state, x| {
1618    ///         *state = *state * x;
1619    ///         if *state > 6 {
1620    ///             None // Terminate the stream
1621    ///         } else {
1622    ///             Some(-*state)
1623    ///         }
1624    ///     }),
1625    /// )
1626    /// # }, |mut stream| async move {
1627    /// // Output: -1, -2, -6
1628    /// # for w in vec![-1, -2, -6] {
1629    /// #     assert_eq!(stream.next().await.unwrap(), w);
1630    /// # }
1631    /// # }));
1632    /// ```
1633    pub fn scan<A, U, I, F>(
1634        self,
1635        init: impl IntoQuotedMut<'a, I, L>,
1636        f: impl IntoQuotedMut<'a, F, L>,
1637    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1638    where
1639        I: Fn() -> A + 'a,
1640        F: Fn(&mut A, T) -> Option<U> + 'a,
1641    {
1642        let init = init.splice_fn0_ctx(&self.location).into();
1643        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1644
1645        Stream::new(
1646            self.location.clone(),
1647            HydroNode::Scan {
1648                init,
1649                acc: f,
1650                input: Box::new(self.ir_node.into_inner()),
1651                metadata: self.location.new_node_metadata(
1652                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1653                ),
1654            },
1655        )
1656    }
1657
1658    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1659    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1660    /// until the first element in the input arrives.
1661    ///
1662    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1663    /// to depend on the order of elements in the stream.
1664    ///
1665    /// # Example
1666    /// ```rust
1667    /// # use hydro_lang::prelude::*;
1668    /// # use futures::StreamExt;
1669    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1670    /// let tick = process.tick();
1671    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1672    /// let batch = words.batch(&tick, nondet!(/** test */));
1673    /// batch
1674    ///     .map(q!(|x| x.to_string()))
1675    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1676    ///     .all_ticks()
1677    /// # }, |mut stream| async move {
1678    /// // "HELLOWORLD"
1679    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1680    /// # }));
1681    /// ```
1682    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1683        self,
1684        comb: impl IntoQuotedMut<'a, F, L>,
1685    ) -> Optional<T, L, B> {
1686        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1687        let core = HydroNode::Reduce {
1688            f,
1689            input: Box::new(self.ir_node.into_inner()),
1690            metadata: self
1691                .location
1692                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1693        };
1694
1695        Optional::new(self.location, core)
1696    }
1697}
1698
1699impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1700    /// Produces a new stream that interleaves the elements of the two input streams.
1701    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1702    ///
1703    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1704    /// [`Bounded`], you can use [`Stream::chain`] instead.
1705    ///
1706    /// # Example
1707    /// ```rust
1708    /// # use hydro_lang::prelude::*;
1709    /// # use futures::StreamExt;
1710    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1711    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1712    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1713    /// # }, |mut stream| async move {
1714    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1715    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1716    /// #     assert_eq!(stream.next().await.unwrap(), w);
1717    /// # }
1718    /// # }));
1719    /// ```
1720    pub fn interleave<O2: Ordering, R2: Retries>(
1721        self,
1722        other: Stream<T, L, Unbounded, O2, R2>,
1723    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1724    where
1725        R: MinRetries<R2>,
1726    {
1727        Stream::new(
1728            self.location.clone(),
1729            HydroNode::Chain {
1730                first: Box::new(self.ir_node.into_inner()),
1731                second: Box::new(other.ir_node.into_inner()),
1732                metadata: self.location.new_node_metadata(Stream::<
1733                    T,
1734                    L,
1735                    Unbounded,
1736                    NoOrder,
1737                    <R as MinRetries<R2>>::Min,
1738                >::collection_kind()),
1739            },
1740        )
1741    }
1742}
1743
1744impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1745where
1746    L: Location<'a>,
1747{
1748    /// Produces a new stream that emits the input elements in sorted order.
1749    ///
1750    /// The input stream can have any ordering guarantee, but the output stream
1751    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1752    /// elements in the input stream are available, so it requires the input stream
1753    /// to be [`Bounded`].
1754    ///
1755    /// # Example
1756    /// ```rust
1757    /// # use hydro_lang::prelude::*;
1758    /// # use futures::StreamExt;
1759    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1760    /// let tick = process.tick();
1761    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1762    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1763    /// batch.sort().all_ticks()
1764    /// # }, |mut stream| async move {
1765    /// // 1, 2, 3, 4
1766    /// # for w in (1..5) {
1767    /// #     assert_eq!(stream.next().await.unwrap(), w);
1768    /// # }
1769    /// # }));
1770    /// ```
1771    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1772    where
1773        T: Ord,
1774    {
1775        Stream::new(
1776            self.location.clone(),
1777            HydroNode::Sort {
1778                input: Box::new(self.ir_node.into_inner()),
1779                metadata: self
1780                    .location
1781                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1782            },
1783        )
1784    }
1785
1786    /// Produces a new stream that first emits the elements of the `self` stream,
1787    /// and then emits the elements of the `other` stream. The output stream has
1788    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1789    /// [`TotalOrder`] guarantee.
1790    ///
1791    /// Currently, both input streams must be [`Bounded`]. This operator will block
1792    /// on the first stream until all its elements are available. In a future version,
1793    /// we will relax the requirement on the `other` stream.
1794    ///
1795    /// # Example
1796    /// ```rust
1797    /// # use hydro_lang::prelude::*;
1798    /// # use futures::StreamExt;
1799    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1800    /// let tick = process.tick();
1801    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1802    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1803    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1804    /// # }, |mut stream| async move {
1805    /// // 2, 3, 4, 5, 1, 2, 3, 4
1806    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1807    /// #     assert_eq!(stream.next().await.unwrap(), w);
1808    /// # }
1809    /// # }));
1810    /// ```
1811    pub fn chain<O2: Ordering, R2: Retries>(
1812        self,
1813        other: Stream<T, L, Bounded, O2, R2>,
1814    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1815    where
1816        O: MinOrder<O2>,
1817        R: MinRetries<R2>,
1818    {
1819        check_matching_location(&self.location, &other.location);
1820
1821        Stream::new(
1822            self.location.clone(),
1823            HydroNode::Chain {
1824                first: Box::new(self.ir_node.into_inner()),
1825                second: Box::new(other.ir_node.into_inner()),
1826                metadata: self.location.new_node_metadata(Stream::<
1827                    T,
1828                    L,
1829                    Bounded,
1830                    <O as MinOrder<O2>>::Min,
1831                    <R as MinRetries<R2>>::Min,
1832                >::collection_kind()),
1833            },
1834        )
1835    }
1836
1837    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1838    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1839    /// because this is compiled into a nested loop.
1840    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1841        self,
1842        other: Stream<T2, L, Bounded, O2, R>,
1843    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1844    where
1845        T: Clone,
1846        T2: Clone,
1847    {
1848        check_matching_location(&self.location, &other.location);
1849
1850        Stream::new(
1851            self.location.clone(),
1852            HydroNode::CrossProduct {
1853                left: Box::new(self.ir_node.into_inner()),
1854                right: Box::new(other.ir_node.into_inner()),
1855                metadata: self.location.new_node_metadata(Stream::<
1856                    (T, T2),
1857                    L,
1858                    Bounded,
1859                    <O2 as MinOrder<O>>::Min,
1860                    R,
1861                >::collection_kind()),
1862            },
1863        )
1864    }
1865
1866    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1867    /// `self` used as the values for *each* key.
1868    ///
1869    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1870    /// values. For example, it can be used to send the same set of elements to several cluster
1871    /// members, if the membership information is available as a [`KeyedSingleton`].
1872    ///
1873    /// # Example
1874    /// ```rust
1875    /// # use hydro_lang::prelude::*;
1876    /// # use futures::StreamExt;
1877    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1878    /// # let tick = process.tick();
1879    /// let keyed_singleton = // { 1: (), 2: () }
1880    /// # process
1881    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1882    /// #     .into_keyed()
1883    /// #     .batch(&tick, nondet!(/** test */))
1884    /// #     .first();
1885    /// let stream = // [ "a", "b" ]
1886    /// # process
1887    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1888    /// #     .batch(&tick, nondet!(/** test */));
1889    /// stream.repeat_with_keys(keyed_singleton)
1890    /// # .entries().all_ticks()
1891    /// # }, |mut stream| async move {
1892    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1893    /// # let mut results = Vec::new();
1894    /// # for _ in 0..4 {
1895    /// #     results.push(stream.next().await.unwrap());
1896    /// # }
1897    /// # results.sort();
1898    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1899    /// # }));
1900    /// ```
1901    pub fn repeat_with_keys<K, V2>(
1902        self,
1903        keys: KeyedSingleton<K, V2, L, Bounded>,
1904    ) -> KeyedStream<K, T, L, Bounded, O, R>
1905    where
1906        K: Clone,
1907        T: Clone,
1908    {
1909        keys.keys()
1910            .weaken_retries()
1911            .assume_ordering_trusted::<TotalOrder>(
1912                nondet!(/** keyed stream does not depend on ordering of keys */),
1913            )
1914            .cross_product_nested_loop(self)
1915            .into_keyed()
1916    }
1917}
1918
1919impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1920where
1921    L: Location<'a>,
1922{
1923    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1924    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1925    /// by equi-joining the two streams on the key attribute `K`.
1926    ///
1927    /// # Example
1928    /// ```rust
1929    /// # use hydro_lang::prelude::*;
1930    /// # use std::collections::HashSet;
1931    /// # use futures::StreamExt;
1932    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1933    /// let tick = process.tick();
1934    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1935    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1936    /// stream1.join(stream2)
1937    /// # }, |mut stream| async move {
1938    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1939    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1940    /// # stream.map(|i| assert!(expected.contains(&i)));
1941    /// # }));
1942    pub fn join<V2, O2: Ordering, R2: Retries>(
1943        self,
1944        n: Stream<(K, V2), L, B, O2, R2>,
1945    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1946    where
1947        K: Eq + Hash,
1948        R: MinRetries<R2>,
1949    {
1950        check_matching_location(&self.location, &n.location);
1951
1952        Stream::new(
1953            self.location.clone(),
1954            HydroNode::Join {
1955                left: Box::new(self.ir_node.into_inner()),
1956                right: Box::new(n.ir_node.into_inner()),
1957                metadata: self.location.new_node_metadata(Stream::<
1958                    (K, (V1, V2)),
1959                    L,
1960                    B,
1961                    NoOrder,
1962                    <R as MinRetries<R2>>::Min,
1963                >::collection_kind()),
1964            },
1965        )
1966    }
1967
1968    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1969    /// computes the anti-join of the items in the input -- i.e. returns
1970    /// unique items in the first input that do not have a matching key
1971    /// in the second input.
1972    ///
1973    /// # Example
1974    /// ```rust
1975    /// # use hydro_lang::prelude::*;
1976    /// # use futures::StreamExt;
1977    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1978    /// let tick = process.tick();
1979    /// let stream = process
1980    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1981    ///   .batch(&tick, nondet!(/** test */));
1982    /// let batch = process
1983    ///   .source_iter(q!(vec![1, 2]))
1984    ///   .batch(&tick, nondet!(/** test */));
1985    /// stream.anti_join(batch).all_ticks()
1986    /// # }, |mut stream| async move {
1987    /// # for w in vec![(3, 'c'), (4, 'd')] {
1988    /// #     assert_eq!(stream.next().await.unwrap(), w);
1989    /// # }
1990    /// # }));
1991    pub fn anti_join<O2: Ordering, R2: Retries>(
1992        self,
1993        n: Stream<K, L, Bounded, O2, R2>,
1994    ) -> Stream<(K, V1), L, B, O, R>
1995    where
1996        K: Eq + Hash,
1997    {
1998        check_matching_location(&self.location, &n.location);
1999
2000        Stream::new(
2001            self.location.clone(),
2002            HydroNode::AntiJoin {
2003                pos: Box::new(self.ir_node.into_inner()),
2004                neg: Box::new(n.ir_node.into_inner()),
2005                metadata: self
2006                    .location
2007                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2008            },
2009        )
2010    }
2011}
2012
2013impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2014    Stream<(K, V), L, B, O, R>
2015{
2016    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2017    /// is used as the key and the second element is added to the entries associated with that key.
2018    ///
2019    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2020    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2021    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2022    /// total ordering _within_ each group but no ordering _across_ groups.
2023    ///
2024    /// # Example
2025    /// ```rust
2026    /// # use hydro_lang::prelude::*;
2027    /// # use futures::StreamExt;
2028    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2029    /// process
2030    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2031    ///     .into_keyed()
2032    /// #   .entries()
2033    /// # }, |mut stream| async move {
2034    /// // { 1: [2, 3], 2: [4] }
2035    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2036    /// #     assert_eq!(stream.next().await.unwrap(), w);
2037    /// # }
2038    /// # }));
2039    /// ```
2040    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2041        KeyedStream::new(
2042            self.location.clone(),
2043            HydroNode::Cast {
2044                inner: Box::new(self.ir_node.into_inner()),
2045                metadata: self
2046                    .location
2047                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2048            },
2049        )
2050    }
2051}
2052
2053impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2054where
2055    K: Eq + Hash,
2056    L: Location<'a>,
2057{
2058    #[deprecated = "use .into_keyed().fold(...) instead"]
2059    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2060    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2061    /// in the second element are accumulated via the `comb` closure.
2062    ///
2063    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2064    /// to depend on the order of elements in the stream.
2065    ///
2066    /// If the input and output value types are the same and do not require initialization then use
2067    /// [`Stream::reduce_keyed`].
2068    ///
2069    /// # Example
2070    /// ```rust
2071    /// # use hydro_lang::prelude::*;
2072    /// # use futures::StreamExt;
2073    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2074    /// let tick = process.tick();
2075    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2076    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2077    /// batch
2078    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2079    ///     .all_ticks()
2080    /// # }, |mut stream| async move {
2081    /// // (1, 5), (2, 7)
2082    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2083    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2084    /// # }));
2085    /// ```
2086    pub fn fold_keyed<A, I, F>(
2087        self,
2088        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2089        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2090    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2091    where
2092        I: Fn() -> A + 'a,
2093        F: Fn(&mut A, V) + 'a,
2094    {
2095        self.into_keyed().fold(init, comb).entries()
2096    }
2097
2098    #[deprecated = "use .into_keyed().reduce(...) instead"]
2099    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2100    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2101    /// in the second element are accumulated via the `comb` closure.
2102    ///
2103    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2104    /// to depend on the order of elements in the stream.
2105    ///
2106    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2107    ///
2108    /// # Example
2109    /// ```rust
2110    /// # use hydro_lang::prelude::*;
2111    /// # use futures::StreamExt;
2112    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2113    /// let tick = process.tick();
2114    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2115    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2116    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2117    /// # }, |mut stream| async move {
2118    /// // (1, 5), (2, 7)
2119    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2120    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2121    /// # }));
2122    /// ```
2123    pub fn reduce_keyed<F>(
2124        self,
2125        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2126    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2127    where
2128        F: Fn(&mut V, V) + 'a,
2129    {
2130        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2131
2132        Stream::new(
2133            self.location.clone(),
2134            HydroNode::ReduceKeyed {
2135                f,
2136                input: Box::new(self.ir_node.into_inner()),
2137                metadata: self.location.new_node_metadata(Stream::<
2138                    (K, V),
2139                    Tick<L>,
2140                    Bounded,
2141                    NoOrder,
2142                    ExactlyOnce,
2143                >::collection_kind()),
2144            },
2145        )
2146    }
2147}
2148
2149impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2150where
2151    K: Eq + Hash,
2152    L: Location<'a>,
2153{
2154    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2155    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2156    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2157    /// in the second element are accumulated via the `comb` closure.
2158    ///
2159    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2160    /// as there may be non-deterministic duplicates.
2161    ///
2162    /// If the input and output value types are the same and do not require initialization then use
2163    /// [`Stream::reduce_keyed_commutative_idempotent`].
2164    ///
2165    /// # Example
2166    /// ```rust
2167    /// # use hydro_lang::prelude::*;
2168    /// # use futures::StreamExt;
2169    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2170    /// let tick = process.tick();
2171    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2172    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2173    /// batch
2174    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2175    ///     .all_ticks()
2176    /// # }, |mut stream| async move {
2177    /// // (1, false), (2, true)
2178    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2179    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2180    /// # }));
2181    /// ```
2182    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2183        self,
2184        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2185        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2186    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2187    where
2188        I: Fn() -> A + 'a,
2189        F: Fn(&mut A, V) + 'a,
2190    {
2191        self.into_keyed()
2192            .fold_commutative_idempotent(init, comb)
2193            .entries()
2194    }
2195
2196    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2197    /// # Example
2198    /// ```rust
2199    /// # use hydro_lang::prelude::*;
2200    /// # use futures::StreamExt;
2201    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2202    /// let tick = process.tick();
2203    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2204    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2205    /// batch.keys().all_ticks()
2206    /// # }, |mut stream| async move {
2207    /// // 1, 2
2208    /// # assert_eq!(stream.next().await.unwrap(), 1);
2209    /// # assert_eq!(stream.next().await.unwrap(), 2);
2210    /// # }));
2211    /// ```
2212    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2213        self.into_keyed()
2214            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2215            .keys()
2216    }
2217
2218    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2219    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2220    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2221    /// in the second element are accumulated via the `comb` closure.
2222    ///
2223    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2224    /// as there may be non-deterministic duplicates.
2225    ///
2226    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2227    ///
2228    /// # Example
2229    /// ```rust
2230    /// # use hydro_lang::prelude::*;
2231    /// # use futures::StreamExt;
2232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2233    /// let tick = process.tick();
2234    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2235    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2236    /// batch
2237    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2238    ///     .all_ticks()
2239    /// # }, |mut stream| async move {
2240    /// // (1, false), (2, true)
2241    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2242    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2243    /// # }));
2244    /// ```
2245    pub fn reduce_keyed_commutative_idempotent<F>(
2246        self,
2247        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2248    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2249    where
2250        F: Fn(&mut V, V) + 'a,
2251    {
2252        self.into_keyed()
2253            .reduce_commutative_idempotent(comb)
2254            .entries()
2255    }
2256}
2257
2258impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2259where
2260    K: Eq + Hash,
2261    L: Location<'a>,
2262{
2263    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2264    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2265    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2266    /// in the second element are accumulated via the `comb` closure.
2267    ///
2268    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2269    ///
2270    /// If the input and output value types are the same and do not require initialization then use
2271    /// [`Stream::reduce_keyed_commutative`].
2272    ///
2273    /// # Example
2274    /// ```rust
2275    /// # use hydro_lang::prelude::*;
2276    /// # use futures::StreamExt;
2277    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2278    /// let tick = process.tick();
2279    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2280    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2281    /// batch
2282    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2283    ///     .all_ticks()
2284    /// # }, |mut stream| async move {
2285    /// // (1, 5), (2, 7)
2286    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2287    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2288    /// # }));
2289    /// ```
2290    pub fn fold_keyed_commutative<A, I, F>(
2291        self,
2292        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2293        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2294    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2295    where
2296        I: Fn() -> A + 'a,
2297        F: Fn(&mut A, V) + 'a,
2298    {
2299        self.into_keyed().fold_commutative(init, comb).entries()
2300    }
2301
2302    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2303    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2304    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2305    /// in the second element are accumulated via the `comb` closure.
2306    ///
2307    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2308    ///
2309    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2310    ///
2311    /// # Example
2312    /// ```rust
2313    /// # use hydro_lang::prelude::*;
2314    /// # use futures::StreamExt;
2315    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2316    /// let tick = process.tick();
2317    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2318    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2319    /// batch
2320    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2321    ///     .all_ticks()
2322    /// # }, |mut stream| async move {
2323    /// // (1, 5), (2, 7)
2324    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2325    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2326    /// # }));
2327    /// ```
2328    pub fn reduce_keyed_commutative<F>(
2329        self,
2330        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2331    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2332    where
2333        F: Fn(&mut V, V) + 'a,
2334    {
2335        self.into_keyed().reduce_commutative(comb).entries()
2336    }
2337}
2338
2339impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2340where
2341    K: Eq + Hash,
2342    L: Location<'a>,
2343{
2344    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2345    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2346    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2347    /// in the second element are accumulated via the `comb` closure.
2348    ///
2349    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2350    ///
2351    /// If the input and output value types are the same and do not require initialization then use
2352    /// [`Stream::reduce_keyed_idempotent`].
2353    ///
2354    /// # Example
2355    /// ```rust
2356    /// # use hydro_lang::prelude::*;
2357    /// # use futures::StreamExt;
2358    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2359    /// let tick = process.tick();
2360    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2361    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2362    /// batch
2363    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2364    ///     .all_ticks()
2365    /// # }, |mut stream| async move {
2366    /// // (1, false), (2, true)
2367    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2368    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2369    /// # }));
2370    /// ```
2371    pub fn fold_keyed_idempotent<A, I, F>(
2372        self,
2373        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2374        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2375    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2376    where
2377        I: Fn() -> A + 'a,
2378        F: Fn(&mut A, V) + 'a,
2379    {
2380        self.into_keyed().fold_idempotent(init, comb).entries()
2381    }
2382
2383    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2384    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2385    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2386    /// in the second element are accumulated via the `comb` closure.
2387    ///
2388    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2389    ///
2390    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2391    ///
2392    /// # Example
2393    /// ```rust
2394    /// # use hydro_lang::prelude::*;
2395    /// # use futures::StreamExt;
2396    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2397    /// let tick = process.tick();
2398    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2399    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2400    /// batch
2401    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2402    ///     .all_ticks()
2403    /// # }, |mut stream| async move {
2404    /// // (1, false), (2, true)
2405    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2406    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2407    /// # }));
2408    /// ```
2409    pub fn reduce_keyed_idempotent<F>(
2410        self,
2411        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2412    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2413    where
2414        F: Fn(&mut V, V) + 'a,
2415    {
2416        self.into_keyed().reduce_idempotent(comb).entries()
2417    }
2418}
2419
2420impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2421where
2422    L: Location<'a> + NoTick,
2423{
2424    /// Returns a stream corresponding to the latest batch of elements being atomically
2425    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2426    /// the order of the input.
2427    ///
2428    /// # Non-Determinism
2429    /// The batch boundaries are non-deterministic and may change across executions.
2430    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2431        Stream::new(
2432            self.location.clone().tick,
2433            HydroNode::Batch {
2434                inner: Box::new(self.ir_node.into_inner()),
2435                metadata: self
2436                    .location
2437                    .tick
2438                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2439            },
2440        )
2441    }
2442
2443    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2444    /// See [`Stream::atomic`] for more details.
2445    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2446        Stream::new(
2447            self.location.tick.l.clone(),
2448            HydroNode::EndAtomic {
2449                inner: Box::new(self.ir_node.into_inner()),
2450                metadata: self
2451                    .location
2452                    .tick
2453                    .l
2454                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2455            },
2456        )
2457    }
2458}
2459
2460impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2461where
2462    L: Location<'a>,
2463{
2464    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2465    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2466    ///
2467    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2468    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2469    /// argument that declares where the stream will be atomically processed. Batching a stream into
2470    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2471    /// [`Tick`] will introduce asynchrony.
2472    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2473        let out_location = Atomic { tick: tick.clone() };
2474        Stream::new(
2475            out_location.clone(),
2476            HydroNode::BeginAtomic {
2477                inner: Box::new(self.ir_node.into_inner()),
2478                metadata: out_location
2479                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2480            },
2481        )
2482    }
2483
2484    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2485    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2486    /// the order of the input. The output stream will execute in the [`Tick`] that was
2487    /// used to create the atomic section.
2488    ///
2489    /// # Non-Determinism
2490    /// The batch boundaries are non-deterministic and may change across executions.
2491    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2492        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2493        Stream::new(
2494            tick.clone(),
2495            HydroNode::Batch {
2496                inner: Box::new(self.ir_node.into_inner()),
2497                metadata: tick
2498                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2499            },
2500        )
2501    }
2502
2503    /// Given a time interval, returns a stream corresponding to samples taken from the
2504    /// stream roughly at that interval. The output will have elements in the same order
2505    /// as the input, but with arbitrary elements skipped between samples. There is also
2506    /// no guarantee on the exact timing of the samples.
2507    ///
2508    /// # Non-Determinism
2509    /// The output stream is non-deterministic in which elements are sampled, since this
2510    /// is controlled by a clock.
2511    pub fn sample_every(
2512        self,
2513        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2514        nondet: NonDet,
2515    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2516    where
2517        L: NoTick + NoAtomic,
2518    {
2519        let samples = self.location.source_interval(interval, nondet);
2520
2521        let tick = self.location.tick();
2522        self.batch(&tick, nondet)
2523            .filter_if_some(samples.batch(&tick, nondet).first())
2524            .all_ticks()
2525            .weakest_retries()
2526    }
2527
2528    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2529    /// stream has not emitted a value since that duration.
2530    ///
2531    /// # Non-Determinism
2532    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2533    /// samples take place, timeouts may be non-deterministically generated or missed,
2534    /// and the notification of the timeout may be delayed as well. There is also no
2535    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2536    /// detected based on when the next sample is taken.
2537    pub fn timeout(
2538        self,
2539        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2540        nondet: NonDet,
2541    ) -> Optional<(), L, Unbounded>
2542    where
2543        L: NoTick + NoAtomic,
2544    {
2545        let tick = self.location.tick();
2546
2547        let latest_received = self.assume_retries(nondet).fold_commutative(
2548            q!(|| None),
2549            q!(|latest, _| {
2550                *latest = Some(Instant::now());
2551            }),
2552        );
2553
2554        latest_received
2555            .snapshot(&tick, nondet)
2556            .filter_map(q!(move |latest_received| {
2557                if let Some(latest_received) = latest_received {
2558                    if Instant::now().duration_since(latest_received) > duration {
2559                        Some(())
2560                    } else {
2561                        None
2562                    }
2563                } else {
2564                    Some(())
2565                }
2566            }))
2567            .latest()
2568    }
2569}
2570
2571impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2572where
2573    L: Location<'a> + NoTick + NoAtomic,
2574    F: Future<Output = T>,
2575{
2576    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2577    /// Future outputs are produced as available, regardless of input arrival order.
2578    ///
2579    /// # Example
2580    /// ```rust
2581    /// # use std::collections::HashSet;
2582    /// # use futures::StreamExt;
2583    /// # use hydro_lang::prelude::*;
2584    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2585    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2586    ///     .map(q!(|x| async move {
2587    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2588    ///         x
2589    ///     }))
2590    ///     .resolve_futures()
2591    /// #   },
2592    /// #   |mut stream| async move {
2593    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2594    /// #       let mut output = HashSet::new();
2595    /// #       for _ in 1..10 {
2596    /// #           output.insert(stream.next().await.unwrap());
2597    /// #       }
2598    /// #       assert_eq!(
2599    /// #           output,
2600    /// #           HashSet::<i32>::from_iter(1..10)
2601    /// #       );
2602    /// #   },
2603    /// # ));
2604    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2605        Stream::new(
2606            self.location.clone(),
2607            HydroNode::ResolveFutures {
2608                input: Box::new(self.ir_node.into_inner()),
2609                metadata: self
2610                    .location
2611                    .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2612            },
2613        )
2614    }
2615
2616    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2617    /// Future outputs are produced in the same order as the input stream.
2618    ///
2619    /// # Example
2620    /// ```rust
2621    /// # use std::collections::HashSet;
2622    /// # use futures::StreamExt;
2623    /// # use hydro_lang::prelude::*;
2624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2625    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2626    ///     .map(q!(|x| async move {
2627    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2628    ///         x
2629    ///     }))
2630    ///     .resolve_futures_ordered()
2631    /// #   },
2632    /// #   |mut stream| async move {
2633    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2634    /// #       let mut output = Vec::new();
2635    /// #       for _ in 1..10 {
2636    /// #           output.push(stream.next().await.unwrap());
2637    /// #       }
2638    /// #       assert_eq!(
2639    /// #           output,
2640    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2641    /// #       );
2642    /// #   },
2643    /// # ));
2644    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2645        Stream::new(
2646            self.location.clone(),
2647            HydroNode::ResolveFuturesOrdered {
2648                input: Box::new(self.ir_node.into_inner()),
2649                metadata: self
2650                    .location
2651                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2652            },
2653        )
2654    }
2655}
2656
2657impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2658where
2659    L: Location<'a> + NoTick,
2660{
2661    /// Executes the provided closure for every element in this stream.
2662    ///
2663    /// Because the closure may have side effects, the stream must have deterministic order
2664    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2665    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2666    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2667    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2668        let f = f.splice_fn1_ctx(&self.location).into();
2669        self.location
2670            .flow_state()
2671            .borrow_mut()
2672            .push_root(HydroRoot::ForEach {
2673                input: Box::new(self.ir_node.into_inner()),
2674                f,
2675                op_metadata: HydroIrOpMetadata::new(),
2676            });
2677    }
2678
2679    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2680    /// TCP socket to some other server. You should _not_ use this API for interacting with
2681    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2682    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2683    /// interaction with asynchronous sinks.
2684    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2685    where
2686        S: 'a + futures::Sink<T> + Unpin,
2687    {
2688        self.location
2689            .flow_state()
2690            .borrow_mut()
2691            .push_root(HydroRoot::DestSink {
2692                sink: sink.splice_typed_ctx(&self.location).into(),
2693                input: Box::new(self.ir_node.into_inner()),
2694                op_metadata: HydroIrOpMetadata::new(),
2695            });
2696    }
2697}
2698
2699impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2700where
2701    L: Location<'a>,
2702{
2703    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2704    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2705    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2706        Stream::new(
2707            self.location.outer().clone(),
2708            HydroNode::YieldConcat {
2709                inner: Box::new(self.ir_node.into_inner()),
2710                metadata: self
2711                    .location
2712                    .outer()
2713                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2714            },
2715        )
2716    }
2717
2718    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2719    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2720    ///
2721    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2722    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2723    /// stream's [`Tick`] context.
2724    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2725        let out_location = Atomic {
2726            tick: self.location.clone(),
2727        };
2728
2729        Stream::new(
2730            out_location.clone(),
2731            HydroNode::YieldConcat {
2732                inner: Box::new(self.ir_node.into_inner()),
2733                metadata: out_location
2734                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2735            },
2736        )
2737    }
2738
2739    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2740    ///
2741    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2742    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2743    /// careful when using this operator, as its memory usage will grow linearly over time since it
2744    /// must store its inputs indefinitely.
2745    ///
2746    /// # Example
2747    /// ```rust
2748    /// # use hydro_lang::prelude::*;
2749    /// # use futures::StreamExt;
2750    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2751    /// let tick = process.tick();
2752    /// // ticks are lazy by default, forces the second tick to run
2753    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2754    ///
2755    /// let batch_first_tick = process
2756    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2757    ///   .batch(&tick, nondet!(/** test */));
2758    /// let batch_second_tick = process
2759    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2760    ///   .batch(&tick, nondet!(/** test */))
2761    ///   .defer_tick(); // appears on the second tick
2762    /// batch_first_tick.chain(batch_second_tick)
2763    ///   .persist()
2764    ///   .all_ticks()
2765    /// # }, |mut stream| async move {
2766    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2767    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2768    /// #     assert_eq!(stream.next().await.unwrap(), w);
2769    /// # }
2770    /// # }));
2771    /// ```
2772    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2773    where
2774        T: Clone,
2775    {
2776        Stream::new(
2777            self.location.clone(),
2778            HydroNode::Persist {
2779                inner: Box::new(self.ir_node.into_inner()),
2780                metadata: self
2781                    .location
2782                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2783            },
2784        )
2785    }
2786
2787    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2788    /// always has the elements of `self` at tick `T - 1`.
2789    ///
2790    /// At tick `0`, the output stream is empty, since there is no previous tick.
2791    ///
2792    /// This operator enables stateful iterative processing with ticks, by sending data from one
2793    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2794    ///
2795    /// # Example
2796    /// ```rust
2797    /// # use hydro_lang::prelude::*;
2798    /// # use futures::StreamExt;
2799    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2800    /// let tick = process.tick();
2801    /// // ticks are lazy by default, forces the second tick to run
2802    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2803    ///
2804    /// let batch_first_tick = process
2805    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2806    ///   .batch(&tick, nondet!(/** test */));
2807    /// let batch_second_tick = process
2808    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2809    ///   .batch(&tick, nondet!(/** test */))
2810    ///   .defer_tick(); // appears on the second tick
2811    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2812    ///
2813    /// changes_across_ticks.clone().filter_not_in(
2814    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2815    /// ).all_ticks()
2816    /// # }, |mut stream| async move {
2817    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2818    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2819    /// #     assert_eq!(stream.next().await.unwrap(), w);
2820    /// # }
2821    /// # }));
2822    /// ```
2823    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2824        Stream::new(
2825            self.location.clone(),
2826            HydroNode::DeferTick {
2827                input: Box::new(self.ir_node.into_inner()),
2828                metadata: self
2829                    .location
2830                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2831            },
2832        )
2833    }
2834}
2835
2836#[cfg(test)]
2837mod tests {
2838    #[cfg(feature = "deploy")]
2839    use futures::{SinkExt, StreamExt};
2840    #[cfg(feature = "deploy")]
2841    use hydro_deploy::Deployment;
2842    #[cfg(feature = "deploy")]
2843    use serde::{Deserialize, Serialize};
2844    #[cfg(feature = "deploy")]
2845    use stageleft::q;
2846
2847    use crate::compile::builder::FlowBuilder;
2848    #[cfg(feature = "deploy")]
2849    use crate::live_collections::stream::ExactlyOnce;
2850    use crate::live_collections::stream::{NoOrder, TotalOrder};
2851    use crate::location::Location;
2852    use crate::nondet::nondet;
2853
2854    mod backtrace_chained_ops;
2855
    // Marker type used as the tag of the first process in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    // Marker type used as the tag of the second process (and of the external) in
    // `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P2 {}
2860
    // Serializable payload sent between processes in `first_ten_distributed`; wraps a
    // single number.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2866
2867    #[cfg(feature = "deploy")]
2868    #[tokio::test]
2869    async fn first_ten_distributed() {
2870        let mut deployment = Deployment::new();
2871
2872        let flow = FlowBuilder::new();
2873        let first_node = flow.process::<P1>();
2874        let second_node = flow.process::<P2>();
2875        let external = flow.external::<P2>();
2876
2877        let numbers = first_node.source_iter(q!(0..10));
2878        let out_port = numbers
2879            .map(q!(|n| SendOverNetwork { n }))
2880            .send_bincode(&second_node)
2881            .send_bincode_external(&external);
2882
2883        let nodes = flow
2884            .with_process(&first_node, deployment.Localhost())
2885            .with_process(&second_node, deployment.Localhost())
2886            .with_external(&external, deployment.Localhost())
2887            .deploy(&mut deployment);
2888
2889        deployment.deploy().await.unwrap();
2890
2891        let mut external_out = nodes.connect(out_port).await;
2892
2893        deployment.start().await.unwrap();
2894
2895        for i in 0..10 {
2896            assert_eq!(external_out.next().await.unwrap().n, i);
2897        }
2898    }
2899
2900    #[cfg(feature = "deploy")]
2901    #[tokio::test]
2902    async fn first_cardinality() {
2903        let mut deployment = Deployment::new();
2904
2905        let flow = FlowBuilder::new();
2906        let node = flow.process::<()>();
2907        let external = flow.external::<()>();
2908
2909        let node_tick = node.tick();
2910        let count = node_tick
2911            .singleton(q!([1, 2, 3]))
2912            .into_stream()
2913            .flatten_ordered()
2914            .first()
2915            .into_stream()
2916            .count()
2917            .all_ticks()
2918            .send_bincode_external(&external);
2919
2920        let nodes = flow
2921            .with_process(&node, deployment.Localhost())
2922            .with_external(&external, deployment.Localhost())
2923            .deploy(&mut deployment);
2924
2925        deployment.deploy().await.unwrap();
2926
2927        let mut external_out = nodes.connect(count).await;
2928
2929        deployment.start().await.unwrap();
2930
2931        assert_eq!(external_out.next().await.unwrap(), 1);
2932    }
2933
2934    #[cfg(feature = "deploy")]
2935    #[tokio::test]
2936    async fn unbounded_reduce_remembers_state() {
2937        let mut deployment = Deployment::new();
2938
2939        let flow = FlowBuilder::new();
2940        let node = flow.process::<()>();
2941        let external = flow.external::<()>();
2942
2943        let (input_port, input) = node.source_external_bincode(&external);
2944        let out = input
2945            .reduce(q!(|acc, v| *acc += v))
2946            .sample_eager(nondet!(/** test */))
2947            .send_bincode_external(&external);
2948
2949        let nodes = flow
2950            .with_process(&node, deployment.Localhost())
2951            .with_external(&external, deployment.Localhost())
2952            .deploy(&mut deployment);
2953
2954        deployment.deploy().await.unwrap();
2955
2956        let mut external_in = nodes.connect(input_port).await;
2957        let mut external_out = nodes.connect(out).await;
2958
2959        deployment.start().await.unwrap();
2960
2961        external_in.send(1).await.unwrap();
2962        assert_eq!(external_out.next().await.unwrap(), 1);
2963
2964        external_in.send(2).await.unwrap();
2965        assert_eq!(external_out.next().await.unwrap(), 3);
2966    }
2967
2968    #[cfg(feature = "deploy")]
2969    #[tokio::test]
2970    async fn atomic_fold_replays_each_tick() {
2971        let mut deployment = Deployment::new();
2972
2973        let flow = FlowBuilder::new();
2974        let node = flow.process::<()>();
2975        let external = flow.external::<()>();
2976
2977        let (input_port, input) =
2978            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2979        let tick = node.tick();
2980
2981        let out = input
2982            .batch(&tick, nondet!(/** test */))
2983            .cross_singleton(
2984                node.source_iter(q!(vec![1, 2, 3]))
2985                    .atomic(&tick)
2986                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2987                    .snapshot_atomic(nondet!(/** test */)),
2988            )
2989            .all_ticks()
2990            .send_bincode_external(&external);
2991
2992        let nodes = flow
2993            .with_process(&node, deployment.Localhost())
2994            .with_external(&external, deployment.Localhost())
2995            .deploy(&mut deployment);
2996
2997        deployment.deploy().await.unwrap();
2998
2999        let mut external_in = nodes.connect(input_port).await;
3000        let mut external_out = nodes.connect(out).await;
3001
3002        deployment.start().await.unwrap();
3003
3004        external_in.send(1).await.unwrap();
3005        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3006
3007        external_in.send(2).await.unwrap();
3008        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3009    }
3010
3011    #[cfg(feature = "deploy")]
3012    #[tokio::test]
3013    async fn unbounded_scan_remembers_state() {
3014        let mut deployment = Deployment::new();
3015
3016        let flow = FlowBuilder::new();
3017        let node = flow.process::<()>();
3018        let external = flow.external::<()>();
3019
3020        let (input_port, input) = node.source_external_bincode(&external);
3021        let out = input
3022            .scan(
3023                q!(|| 0),
3024                q!(|acc, v| {
3025                    *acc += v;
3026                    Some(*acc)
3027                }),
3028            )
3029            .send_bincode_external(&external);
3030
3031        let nodes = flow
3032            .with_process(&node, deployment.Localhost())
3033            .with_external(&external, deployment.Localhost())
3034            .deploy(&mut deployment);
3035
3036        deployment.deploy().await.unwrap();
3037
3038        let mut external_in = nodes.connect(input_port).await;
3039        let mut external_out = nodes.connect(out).await;
3040
3041        deployment.start().await.unwrap();
3042
3043        external_in.send(1).await.unwrap();
3044        assert_eq!(external_out.next().await.unwrap(), 1);
3045
3046        external_in.send(2).await.unwrap();
3047        assert_eq!(external_out.next().await.unwrap(), 3);
3048    }
3049
3050    #[cfg(feature = "deploy")]
3051    #[tokio::test]
3052    async fn unbounded_enumerate_remembers_state() {
3053        let mut deployment = Deployment::new();
3054
3055        let flow = FlowBuilder::new();
3056        let node = flow.process::<()>();
3057        let external = flow.external::<()>();
3058
3059        let (input_port, input) = node.source_external_bincode(&external);
3060        let out = input.enumerate().send_bincode_external(&external);
3061
3062        let nodes = flow
3063            .with_process(&node, deployment.Localhost())
3064            .with_external(&external, deployment.Localhost())
3065            .deploy(&mut deployment);
3066
3067        deployment.deploy().await.unwrap();
3068
3069        let mut external_in = nodes.connect(input_port).await;
3070        let mut external_out = nodes.connect(out).await;
3071
3072        deployment.start().await.unwrap();
3073
3074        external_in.send(1).await.unwrap();
3075        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3076
3077        external_in.send(2).await.unwrap();
3078        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3079    }
3080
3081    #[cfg(feature = "deploy")]
3082    #[tokio::test]
3083    async fn unbounded_unique_remembers_state() {
3084        let mut deployment = Deployment::new();
3085
3086        let flow = FlowBuilder::new();
3087        let node = flow.process::<()>();
3088        let external = flow.external::<()>();
3089
3090        let (input_port, input) =
3091            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3092        let out = input.unique().send_bincode_external(&external);
3093
3094        let nodes = flow
3095            .with_process(&node, deployment.Localhost())
3096            .with_external(&external, deployment.Localhost())
3097            .deploy(&mut deployment);
3098
3099        deployment.deploy().await.unwrap();
3100
3101        let mut external_in = nodes.connect(input_port).await;
3102        let mut external_out = nodes.connect(out).await;
3103
3104        deployment.start().await.unwrap();
3105
3106        external_in.send(1).await.unwrap();
3107        assert_eq!(external_out.next().await.unwrap(), 1);
3108
3109        external_in.send(2).await.unwrap();
3110        assert_eq!(external_out.next().await.unwrap(), 2);
3111
3112        external_in.send(1).await.unwrap();
3113        external_in.send(3).await.unwrap();
3114        assert_eq!(external_out.next().await.unwrap(), 3);
3115    }
3116
3117    #[test]
3118    #[should_panic]
3119    fn sim_batch_nondet_size() {
3120        let flow = FlowBuilder::new();
3121        let external = flow.external::<()>();
3122        let node = flow.process::<()>();
3123
3124        let (port, input) = node.source_external_bincode::<_, _, TotalOrder, _>(&external);
3125
3126        let tick = node.tick();
3127        let out_port = input
3128            .batch(&tick, nondet!(/** test */))
3129            .count()
3130            .all_ticks()
3131            .send_bincode_external(&external);
3132
3133        flow.sim().exhaustive(async |mut compiled| {
3134            let in_send = compiled.connect(&port);
3135            let mut out_recv = compiled.connect(&out_port);
3136            compiled.launch();
3137
3138            in_send.send(());
3139            in_send.send(());
3140            in_send.send(());
3141
3142            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3143        });
3144    }
3145
3146    #[test]
3147    fn sim_batch_preserves_order() {
3148        let flow = FlowBuilder::new();
3149        let external = flow.external::<()>();
3150        let node = flow.process::<()>();
3151
3152        let (port, input) = node.source_external_bincode(&external);
3153
3154        let tick = node.tick();
3155        let out_port = input
3156            .batch(&tick, nondet!(/** test */))
3157            .all_ticks()
3158            .send_bincode_external(&external);
3159
3160        flow.sim().exhaustive(async |mut compiled| {
3161            let in_send = compiled.connect(&port);
3162            let out_recv = compiled.connect(&out_port);
3163            compiled.launch();
3164
3165            in_send.send(1);
3166            in_send.send(2);
3167            in_send.send(3);
3168
3169            out_recv.assert_yields_only([1, 2, 3]).await;
3170        });
3171    }
3172
3173    #[test]
3174    #[should_panic]
3175    fn sim_batch_unordered_shuffles() {
3176        let flow = FlowBuilder::new();
3177        let external = flow.external::<()>();
3178        let node = flow.process::<()>();
3179
3180        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3181
3182        let tick = node.tick();
3183        let batch = input.batch(&tick, nondet!(/** test */));
3184        let out_port = batch
3185            .clone()
3186            .min()
3187            .zip(batch.max())
3188            .all_ticks()
3189            .send_bincode_external(&external);
3190
3191        flow.sim().exhaustive(async |mut compiled| {
3192            let in_send = compiled.connect(&port);
3193            let out_recv = compiled.connect(&out_port);
3194            compiled.launch();
3195
3196            in_send.send_many_unordered([1, 2, 3]).unwrap();
3197
3198            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3199                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3200            }
3201        });
3202    }
3203
3204    #[test]
3205    fn sim_batch_unordered_shuffles_count() {
3206        let flow = FlowBuilder::new();
3207        let external = flow.external::<()>();
3208        let node = flow.process::<()>();
3209
3210        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3211
3212        let tick = node.tick();
3213        let batch = input.batch(&tick, nondet!(/** test */));
3214        let out_port = batch.all_ticks().send_bincode_external(&external);
3215
3216        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3217            let in_send = compiled.connect(&port);
3218            let out_recv = compiled.connect(&out_port);
3219            compiled.launch();
3220
3221            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3222            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3223        });
3224
3225        assert_eq!(
3226            instance_count,
3227            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3228        )
3229    }
3230
3231    #[test]
3232    #[ignore = "assume_ordering not yet supported on bounded collections"]
3233    fn sim_observe_order_batched_count() {
3234        let flow = FlowBuilder::new();
3235        let external = flow.external::<()>();
3236        let node = flow.process::<()>();
3237
3238        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3239
3240        let tick = node.tick();
3241        let batch = input.batch(&tick, nondet!(/** test */));
3242        let out_port = batch
3243            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3244            .all_ticks()
3245            .send_bincode_external(&external);
3246
3247        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3248            let in_send = compiled.connect(&port);
3249            let out_recv = compiled.connect(&out_port);
3250            compiled.launch();
3251
3252            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3253            let _ = out_recv.collect::<Vec<_>>().await;
3254        });
3255
3256        assert_eq!(
3257            instance_count,
3258            192 // 4! * 2^{4 - 1}
3259        )
3260    }
3261
3262    #[test]
3263    fn sim_unordered_count_instance_count() {
3264        let flow = FlowBuilder::new();
3265        let external = flow.external::<()>();
3266        let node = flow.process::<()>();
3267
3268        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3269
3270        let tick = node.tick();
3271        let out_port = input
3272            .count()
3273            .snapshot(&tick, nondet!(/** test */))
3274            .all_ticks()
3275            .send_bincode_external(&external);
3276
3277        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3278            let in_send = compiled.connect(&port);
3279            let out_recv = compiled.connect(&out_port);
3280            compiled.launch();
3281
3282            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3283            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3284        });
3285
3286        assert_eq!(
3287            instance_count,
3288            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3289        )
3290    }
3291}