hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds fix how orderings combine through [`MinOrder`]: combining an
/// ordering with itself or with [`TotalOrder`] yields that same ordering, while combining
/// it with [`NoOrder`] always yields [`NoOrder`]. The trait is sealed, so the marker types
/// in this module are its only implementors.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type; it is what ends up in the
    /// [`CollectionKind`] recorded in IR node metadata.
    const ORDERING_KIND: StreamOrder;
}
40
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants and can never be constructed; it exists only as a
/// type-level marker for the `Order` parameter of [`Stream`].
pub enum TotalOrder {}

// Maps the `TotalOrder` marker to its IR-level representation.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
50
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// Like [`TotalOrder`], this enum has no variants and is used purely at the type level.
pub enum NoOrder {}

// Maps the `NoOrder` marker to its IR-level representation.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
62
/// Helper trait for determining the weakest of two orderings.
///
/// For orderings `A` and `B`, `<A as MinOrder<B>>::Min` is [`NoOrder`] if either side is
/// [`NoOrder`], and [`TotalOrder`] only when both sides are [`TotalOrder`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}
69
// Truth table for `MinOrder`: the result is `TotalOrder` only when both operands are
// `TotalOrder`; any `NoOrder` operand weakens the result to `NoOrder`.

// min(TotalOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// min(TotalOrder, TotalOrder) = TotalOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// min(NoOrder, TotalOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// min(NoOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
89
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds fix how retry guarantees combine through [`MinRetries`]:
/// combining a guarantee with itself or with [`ExactlyOnce`] yields that same guarantee,
/// while combining it with [`AtLeastOnce`] always yields [`AtLeastOnce`]. The trait is
/// sealed, so the marker types in this module are its only implementors.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type; it is what ends up in the
    /// [`CollectionKind`] recorded in IR node metadata.
    const RETRIES_KIND: StreamRetry;
}
100
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants and is used purely as a type-level marker for the
/// `Retry` parameter of [`Stream`].
pub enum ExactlyOnce {}

// Maps the `ExactlyOnce` marker to its IR-level representation.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
109
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants and is used purely as a type-level marker for the
/// `Retry` parameter of [`Stream`].
pub enum AtLeastOnce {}

// Maps the `AtLeastOnce` marker to its IR-level representation.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
118
/// Helper trait for determining the weakest of two retry guarantees.
///
/// For guarantees `A` and `B`, `<A as MinRetries<B>>::Min` is [`AtLeastOnce`] if either
/// side is [`AtLeastOnce`], and [`ExactlyOnce`] only when both sides are [`ExactlyOnce`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}
125
// Truth table for `MinRetries`: the result is `ExactlyOnce` only when both operands are
// `ExactlyOnce`; any `AtLeastOnce` operand weakens the result to `AtLeastOnce`.

// min(ExactlyOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// min(ExactlyOnce, ExactlyOnce) = ExactlyOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// min(AtLeastOnce, ExactlyOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// min(AtLeastOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this stream. It sits in a `RefCell` so that `clone` can swap
    /// it for a shared `Tee` node while only holding `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries the type-level parameters, which have no runtime representation.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179 for Stream<T, L, Unbounded, O, R>
180where
181 L: Location<'a>,
182{
183 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184 Stream {
185 location: stream.location,
186 ir_node: stream.ir_node,
187 _phantom: PhantomData,
188 }
189 }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193 for Stream<T, L, B, NoOrder, R>
194where
195 L: Location<'a>,
196{
197 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198 Stream {
199 location: stream.location,
200 ir_node: stream.ir_node,
201 _phantom: PhantomData,
202 }
203 }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207 for Stream<T, L, B, O, AtLeastOnce>
208where
209 L: Location<'a>,
210{
211 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212 Stream {
213 location: stream.location,
214 ir_node: stream.ir_node,
215 _phantom: PhantomData,
216 }
217 }
218}
219
/// Lets bounded, tick-scoped streams be used wherever a [`DeferTick`] collection is
/// expected, i.e. have their contents appear on the next tick instead of the current one.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): this delegates to an inherent `Stream::defer_tick` that is not visible
        // in this chunk. It relies on inherent associated functions taking precedence over this
        // trait method during path resolution; without that inherent method it would recurse.
        Stream::defer_tick(self)
    }
}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230 for Stream<T, Tick<L>, Bounded, O, R>
231where
232 L: Location<'a>,
233{
234 type Location = Tick<L>;
235
236 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237 Stream::new(
238 location.clone(),
239 HydroNode::CycleSource {
240 ident,
241 metadata: location.new_node_metadata(Self::collection_kind()),
242 },
243 )
244 }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248 for Stream<T, Tick<L>, Bounded, O, R>
249where
250 L: Location<'a>,
251{
252 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253 assert_eq!(
254 Location::id(&self.location),
255 expected_location,
256 "locations do not match"
257 );
258 self.location
259 .flow_state()
260 .borrow_mut()
261 .push_root(HydroRoot::CycleSink {
262 ident,
263 input: Box::new(self.ir_node.into_inner()),
264 op_metadata: HydroIrOpMetadata::new(),
265 });
266 }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270 for Stream<T, L, B, O, R>
271where
272 L: Location<'a> + NoTick,
273{
274 type Location = L;
275
276 fn create_source(ident: syn::Ident, location: L) -> Self {
277 Stream::new(
278 location.clone(),
279 HydroNode::CycleSource {
280 ident,
281 metadata: location.new_node_metadata(Self::collection_kind()),
282 },
283 )
284 }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288 for Stream<T, L, B, O, R>
289where
290 L: Location<'a> + NoTick,
291{
292 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293 assert_eq!(
294 Location::id(&self.location),
295 expected_location,
296 "locations do not match"
297 );
298 self.location
299 .flow_state()
300 .borrow_mut()
301 .push_root(HydroRoot::CycleSink {
302 ident,
303 input: Box::new(self.ir_node.into_inner()),
304 op_metadata: HydroIrOpMetadata::new(),
305 });
306 }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311 T: Clone,
312 L: Location<'a>,
313{
314 fn clone(&self) -> Self {
315 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317 *self.ir_node.borrow_mut() = HydroNode::Tee {
318 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319 metadata: self.location.new_node_metadata(Self::collection_kind()),
320 };
321 }
322
323 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324 Stream {
325 location: self.location.clone(),
326 ir_node: HydroNode::Tee {
327 inner: TeeNode(inner.0.clone()),
328 metadata: metadata.clone(),
329 }
330 .into(),
331 _phantom: PhantomData,
332 }
333 } else {
334 unreachable!()
335 }
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341 L: Location<'a>,
342{
343 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347 Stream {
348 location,
349 ir_node: RefCell::new(ir_node),
350 _phantom: PhantomData,
351 }
352 }
353
354 /// Returns the [`Location`] where this stream is being materialized.
355 pub fn location(&self) -> &L {
356 &self.location
357 }
358
359 pub(crate) fn collection_kind() -> CollectionKind {
360 CollectionKind::Stream {
361 bound: B::BOUND_KIND,
362 order: O::ORDERING_KIND,
363 retry: R::RETRIES_KIND,
364 element_type: stageleft::quote_type::<T>().into(),
365 }
366 }
367
368 /// Produces a stream based on invoking `f` on each element.
369 /// If you do not want to modify the stream and instead only want to view
370 /// each item use [`Stream::inspect`] instead.
371 ///
372 /// # Example
373 /// ```rust
374 /// # use hydro_lang::prelude::*;
375 /// # use futures::StreamExt;
376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377 /// let words = process.source_iter(q!(vec!["hello", "world"]));
378 /// words.map(q!(|x| x.to_uppercase()))
379 /// # }, |mut stream| async move {
380 /// # for w in vec!["HELLO", "WORLD"] {
381 /// # assert_eq!(stream.next().await.unwrap(), w);
382 /// # }
383 /// # }));
384 /// ```
385 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386 where
387 F: Fn(T) -> U + 'a,
388 {
389 let f = f.splice_fn1_ctx(&self.location).into();
390 Stream::new(
391 self.location.clone(),
392 HydroNode::Map {
393 f,
394 input: Box::new(self.ir_node.into_inner()),
395 metadata: self
396 .location
397 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398 },
399 )
400 }
401
402 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404 /// for the output type `U` must produce items in a **deterministic** order.
405 ///
406 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408 ///
409 /// # Example
410 /// ```rust
411 /// # use hydro_lang::prelude::*;
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
414 /// process
415 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
416 /// .flat_map_ordered(q!(|x| x))
417 /// # }, |mut stream| async move {
418 /// // 1, 2, 3, 4
419 /// # for w in (1..5) {
420 /// # assert_eq!(stream.next().await.unwrap(), w);
421 /// # }
422 /// # }));
423 /// ```
424 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
425 where
426 I: IntoIterator<Item = U>,
427 F: Fn(T) -> I + 'a,
428 {
429 let f = f.splice_fn1_ctx(&self.location).into();
430 Stream::new(
431 self.location.clone(),
432 HydroNode::FlatMap {
433 f,
434 input: Box::new(self.ir_node.into_inner()),
435 metadata: self
436 .location
437 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
438 },
439 )
440 }
441
442 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
443 /// for the output type `U` to produce items in any order.
444 ///
445 /// # Example
446 /// ```rust
447 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
448 /// # use futures::StreamExt;
449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
450 /// process
451 /// .source_iter(q!(vec![
452 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
453 /// std::collections::HashSet::from_iter(vec![3, 4]),
454 /// ]))
455 /// .flat_map_unordered(q!(|x| x))
456 /// # }, |mut stream| async move {
457 /// // 1, 2, 3, 4, but in no particular order
458 /// # let mut results = Vec::new();
459 /// # for w in (1..5) {
460 /// # results.push(stream.next().await.unwrap());
461 /// # }
462 /// # results.sort();
463 /// # assert_eq!(results, vec![1, 2, 3, 4]);
464 /// # }));
465 /// ```
466 pub fn flat_map_unordered<U, I, F>(
467 self,
468 f: impl IntoQuotedMut<'a, F, L>,
469 ) -> Stream<U, L, B, NoOrder, R>
470 where
471 I: IntoIterator<Item = U>,
472 F: Fn(T) -> I + 'a,
473 {
474 let f = f.splice_fn1_ctx(&self.location).into();
475 Stream::new(
476 self.location.clone(),
477 HydroNode::FlatMap {
478 f,
479 input: Box::new(self.ir_node.into_inner()),
480 metadata: self
481 .location
482 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
483 },
484 )
485 }
486
487 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
488 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
489 ///
490 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
491 /// not deterministic, use [`Stream::flatten_unordered`] instead.
492 ///
493 /// ```rust
494 /// # use hydro_lang::prelude::*;
495 /// # use futures::StreamExt;
496 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
497 /// process
498 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
499 /// .flatten_ordered()
500 /// # }, |mut stream| async move {
501 /// // 1, 2, 3, 4
502 /// # for w in (1..5) {
503 /// # assert_eq!(stream.next().await.unwrap(), w);
504 /// # }
505 /// # }));
506 /// ```
507 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
508 where
509 T: IntoIterator<Item = U>,
510 {
511 self.flat_map_ordered(q!(|d| d))
512 }
513
514 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
515 /// for the element type `T` to produce items in any order.
516 ///
517 /// # Example
518 /// ```rust
519 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520 /// # use futures::StreamExt;
521 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
522 /// process
523 /// .source_iter(q!(vec![
524 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
525 /// std::collections::HashSet::from_iter(vec![3, 4]),
526 /// ]))
527 /// .flatten_unordered()
528 /// # }, |mut stream| async move {
529 /// // 1, 2, 3, 4, but in no particular order
530 /// # let mut results = Vec::new();
531 /// # for w in (1..5) {
532 /// # results.push(stream.next().await.unwrap());
533 /// # }
534 /// # results.sort();
535 /// # assert_eq!(results, vec![1, 2, 3, 4]);
536 /// # }));
537 /// ```
538 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
539 where
540 T: IntoIterator<Item = U>,
541 {
542 self.flat_map_unordered(q!(|d| d))
543 }
544
545 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
546 /// `f`, preserving the order of the elements.
547 ///
548 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
549 /// not modify or take ownership of the values. If you need to modify the values while filtering
550 /// use [`Stream::filter_map`] instead.
551 ///
552 /// # Example
553 /// ```rust
554 /// # use hydro_lang::prelude::*;
555 /// # use futures::StreamExt;
556 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
557 /// process
558 /// .source_iter(q!(vec![1, 2, 3, 4]))
559 /// .filter(q!(|&x| x > 2))
560 /// # }, |mut stream| async move {
561 /// // 3, 4
562 /// # for w in (3..5) {
563 /// # assert_eq!(stream.next().await.unwrap(), w);
564 /// # }
565 /// # }));
566 /// ```
567 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
568 where
569 F: Fn(&T) -> bool + 'a,
570 {
571 let f = f.splice_fn1_borrow_ctx(&self.location).into();
572 Stream::new(
573 self.location.clone(),
574 HydroNode::Filter {
575 f,
576 input: Box::new(self.ir_node.into_inner()),
577 metadata: self.location.new_node_metadata(Self::collection_kind()),
578 },
579 )
580 }
581
582 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
583 ///
584 /// # Example
585 /// ```rust
586 /// # use hydro_lang::prelude::*;
587 /// # use futures::StreamExt;
588 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589 /// process
590 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
591 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
592 /// # }, |mut stream| async move {
593 /// // 1, 2
594 /// # for w in (1..3) {
595 /// # assert_eq!(stream.next().await.unwrap(), w);
596 /// # }
597 /// # }));
598 /// ```
599 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
600 where
601 F: Fn(T) -> Option<U> + 'a,
602 {
603 let f = f.splice_fn1_ctx(&self.location).into();
604 Stream::new(
605 self.location.clone(),
606 HydroNode::FilterMap {
607 f,
608 input: Box::new(self.ir_node.into_inner()),
609 metadata: self
610 .location
611 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
612 },
613 )
614 }
615
616 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
617 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
618 /// If `other` is an empty [`Optional`], no values will be produced.
619 ///
620 /// # Example
621 /// ```rust
622 /// # use hydro_lang::prelude::*;
623 /// # use futures::StreamExt;
624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625 /// let tick = process.tick();
626 /// let batch = process
627 /// .source_iter(q!(vec![1, 2, 3, 4]))
628 /// .batch(&tick, nondet!(/** test */));
629 /// let count = batch.clone().count(); // `count()` returns a singleton
630 /// batch.cross_singleton(count).all_ticks()
631 /// # }, |mut stream| async move {
632 /// // (1, 4), (2, 4), (3, 4), (4, 4)
633 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
634 /// # assert_eq!(stream.next().await.unwrap(), w);
635 /// # }
636 /// # }));
637 /// ```
638 pub fn cross_singleton<O2>(
639 self,
640 other: impl Into<Optional<O2, L, Bounded>>,
641 ) -> Stream<(T, O2), L, B, O, R>
642 where
643 O2: Clone,
644 {
645 let other: Optional<O2, L, Bounded> = other.into();
646 check_matching_location(&self.location, &other.location);
647
648 Stream::new(
649 self.location.clone(),
650 HydroNode::CrossSingleton {
651 left: Box::new(self.ir_node.into_inner()),
652 right: Box::new(other.ir_node.into_inner()),
653 metadata: self
654 .location
655 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
656 },
657 )
658 }
659
660 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
661 ///
662 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
663 /// leader of a cluster.
664 ///
665 /// # Example
666 /// ```rust
667 /// # use hydro_lang::prelude::*;
668 /// # use futures::StreamExt;
669 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
670 /// let tick = process.tick();
671 /// // ticks are lazy by default, forces the second tick to run
672 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
673 ///
674 /// let batch_first_tick = process
675 /// .source_iter(q!(vec![1, 2, 3, 4]))
676 /// .batch(&tick, nondet!(/** test */));
677 /// let batch_second_tick = process
678 /// .source_iter(q!(vec![5, 6, 7, 8]))
679 /// .batch(&tick, nondet!(/** test */))
680 /// .defer_tick(); // appears on the second tick
681 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
682 /// batch_first_tick.chain(batch_second_tick)
683 /// .filter_if_some(some_on_first_tick)
684 /// .all_ticks()
685 /// # }, |mut stream| async move {
686 /// // [1, 2, 3, 4]
687 /// # for w in vec![1, 2, 3, 4] {
688 /// # assert_eq!(stream.next().await.unwrap(), w);
689 /// # }
690 /// # }));
691 /// ```
692 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
693 self.cross_singleton(signal.map(q!(|_u| ())))
694 .map(q!(|(d, _signal)| d))
695 }
696
697 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
698 ///
699 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
700 /// some local state.
701 ///
702 /// # Example
703 /// ```rust
704 /// # use hydro_lang::prelude::*;
705 /// # use futures::StreamExt;
706 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707 /// let tick = process.tick();
708 /// // ticks are lazy by default, forces the second tick to run
709 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
710 ///
711 /// let batch_first_tick = process
712 /// .source_iter(q!(vec![1, 2, 3, 4]))
713 /// .batch(&tick, nondet!(/** test */));
714 /// let batch_second_tick = process
715 /// .source_iter(q!(vec![5, 6, 7, 8]))
716 /// .batch(&tick, nondet!(/** test */))
717 /// .defer_tick(); // appears on the second tick
718 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
719 /// batch_first_tick.chain(batch_second_tick)
720 /// .filter_if_none(some_on_first_tick)
721 /// .all_ticks()
722 /// # }, |mut stream| async move {
723 /// // [5, 6, 7, 8]
724 /// # for w in vec![5, 6, 7, 8] {
725 /// # assert_eq!(stream.next().await.unwrap(), w);
726 /// # }
727 /// # }));
728 /// ```
729 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
730 self.filter_if_some(
731 other
732 .map(q!(|_| ()))
733 .into_singleton()
734 .filter(q!(|o| o.is_none())),
735 )
736 }
737
738 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
739 /// tupled pairs in a non-deterministic order.
740 ///
741 /// # Example
742 /// ```rust
743 /// # use hydro_lang::prelude::*;
744 /// # use std::collections::HashSet;
745 /// # use futures::StreamExt;
746 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
747 /// let tick = process.tick();
748 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
749 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
750 /// stream1.cross_product(stream2)
751 /// # }, |mut stream| async move {
752 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
753 /// # stream.map(|i| assert!(expected.contains(&i)));
754 /// # }));
755 /// ```
756 pub fn cross_product<T2, O2: Ordering>(
757 self,
758 other: Stream<T2, L, B, O2, R>,
759 ) -> Stream<(T, T2), L, B, NoOrder, R>
760 where
761 T: Clone,
762 T2: Clone,
763 {
764 check_matching_location(&self.location, &other.location);
765
766 Stream::new(
767 self.location.clone(),
768 HydroNode::CrossProduct {
769 left: Box::new(self.ir_node.into_inner()),
770 right: Box::new(other.ir_node.into_inner()),
771 metadata: self
772 .location
773 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
774 },
775 )
776 }
777
778 /// Takes one stream as input and filters out any duplicate occurrences. The output
779 /// contains all unique values from the input.
780 ///
781 /// # Example
782 /// ```rust
783 /// # use hydro_lang::prelude::*;
784 /// # use futures::StreamExt;
785 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
786 /// let tick = process.tick();
787 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
788 /// # }, |mut stream| async move {
789 /// # for w in vec![1, 2, 3, 4] {
790 /// # assert_eq!(stream.next().await.unwrap(), w);
791 /// # }
792 /// # }));
793 /// ```
794 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
795 where
796 T: Eq + Hash,
797 {
798 Stream::new(
799 self.location.clone(),
800 HydroNode::Unique {
801 input: Box::new(self.ir_node.into_inner()),
802 metadata: self
803 .location
804 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
805 },
806 )
807 }
808
809 /// Outputs everything in this stream that is *not* contained in the `other` stream.
810 ///
811 /// The `other` stream must be [`Bounded`], since this function will wait until
812 /// all its elements are available before producing any output.
813 /// # Example
814 /// ```rust
815 /// # use hydro_lang::prelude::*;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818 /// let tick = process.tick();
819 /// let stream = process
820 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
821 /// .batch(&tick, nondet!(/** test */));
822 /// let batch = process
823 /// .source_iter(q!(vec![1, 2]))
824 /// .batch(&tick, nondet!(/** test */));
825 /// stream.filter_not_in(batch).all_ticks()
826 /// # }, |mut stream| async move {
827 /// # for w in vec![3, 4] {
828 /// # assert_eq!(stream.next().await.unwrap(), w);
829 /// # }
830 /// # }));
831 /// ```
832 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
833 where
834 T: Eq + Hash,
835 {
836 check_matching_location(&self.location, &other.location);
837
838 Stream::new(
839 self.location.clone(),
840 HydroNode::Difference {
841 pos: Box::new(self.ir_node.into_inner()),
842 neg: Box::new(other.ir_node.into_inner()),
843 metadata: self
844 .location
845 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
846 },
847 )
848 }
849
850 /// An operator which allows you to "inspect" each element of a stream without
851 /// modifying it. The closure `f` is called on a reference to each item. This is
852 /// mainly useful for debugging, and should not be used to generate side-effects.
853 ///
854 /// # Example
855 /// ```rust
856 /// # use hydro_lang::prelude::*;
857 /// # use futures::StreamExt;
858 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
859 /// let nums = process.source_iter(q!(vec![1, 2]));
860 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
861 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
862 /// # }, |mut stream| async move {
863 /// # for w in vec![1, 2] {
864 /// # assert_eq!(stream.next().await.unwrap(), w);
865 /// # }
866 /// # }));
867 /// ```
868 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
869 where
870 F: Fn(&T) + 'a,
871 {
872 let f = f.splice_fn1_borrow_ctx(&self.location).into();
873
874 Stream::new(
875 self.location.clone(),
876 HydroNode::Inspect {
877 f,
878 input: Box::new(self.ir_node.into_inner()),
879 metadata: self.location.new_node_metadata(Self::collection_kind()),
880 },
881 )
882 }
883
884 /// An operator which allows you to "name" a `HydroNode`.
885 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
886 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
887 {
888 let mut node = self.ir_node.borrow_mut();
889 let metadata = node.metadata_mut();
890 metadata.tag = Some(name.to_string());
891 }
892 self
893 }
894
895 /// Explicitly "casts" the stream to a type with a different ordering
896 /// guarantee. Useful in unsafe code where the ordering cannot be proven
897 /// by the type-system.
898 ///
899 /// # Non-Determinism
900 /// This function is used as an escape hatch, and any mistakes in the
901 /// provided ordering guarantee will propagate into the guarantees
902 /// for the rest of the program.
903 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
904 if O::ORDERING_KIND == O2::ORDERING_KIND {
905 Stream::new(self.location, self.ir_node.into_inner())
906 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
907 // We can always weaken the ordering guarantee
908 Stream::new(
909 self.location.clone(),
910 HydroNode::Cast {
911 inner: Box::new(self.ir_node.into_inner()),
912 metadata: self
913 .location
914 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
915 },
916 )
917 } else {
918 Stream::new(
919 self.location.clone(),
920 HydroNode::ObserveNonDet {
921 inner: Box::new(self.ir_node.into_inner()),
922 trusted: false,
923 metadata: self
924 .location
925 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
926 },
927 )
928 }
929 }
930
931 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
932 // is not observable
933 fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
934 if O::ORDERING_KIND == O2::ORDERING_KIND {
935 Stream::new(self.location, self.ir_node.into_inner())
936 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
937 // We can always weaken the ordering guarantee
938 Stream::new(
939 self.location.clone(),
940 HydroNode::Cast {
941 inner: Box::new(self.ir_node.into_inner()),
942 metadata: self
943 .location
944 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
945 },
946 )
947 } else {
948 Stream::new(
949 self.location.clone(),
950 HydroNode::ObserveNonDet {
951 inner: Box::new(self.ir_node.into_inner()),
952 trusted: true,
953 metadata: self
954 .location
955 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
956 },
957 )
958 }
959 }
960
961 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
962 /// which is always safe because that is the weakest possible guarantee.
963 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
964 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
965 self.assume_ordering::<NoOrder>(nondet)
966 }
967
968 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
969 /// enforcing that `O2` is weaker than the input ordering guarantee.
970 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
971 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
972 self.assume_ordering::<O2>(nondet)
973 }
974
975 /// Explicitly "casts" the stream to a type with a different retries
976 /// guarantee. Useful in unsafe code where the lack of retries cannot
977 /// be proven by the type-system.
978 ///
979 /// # Non-Determinism
980 /// This function is used as an escape hatch, and any mistakes in the
981 /// provided retries guarantee will propagate into the guarantees
982 /// for the rest of the program.
983 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
984 if R::RETRIES_KIND == R2::RETRIES_KIND {
985 Stream::new(self.location, self.ir_node.into_inner())
986 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
987 // We can always weaken the retries guarantee
988 Stream::new(
989 self.location.clone(),
990 HydroNode::Cast {
991 inner: Box::new(self.ir_node.into_inner()),
992 metadata: self
993 .location
994 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
995 },
996 )
997 } else {
998 Stream::new(
999 self.location.clone(),
1000 HydroNode::ObserveNonDet {
1001 inner: Box::new(self.ir_node.into_inner()),
1002 trusted: false,
1003 metadata: self
1004 .location
1005 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1006 },
1007 )
1008 }
1009 }
1010
1011 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1012 // is not observable
1013 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1014 if R::RETRIES_KIND == R2::RETRIES_KIND {
1015 Stream::new(self.location, self.ir_node.into_inner())
1016 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1017 // We can always weaken the retries guarantee
1018 Stream::new(
1019 self.location.clone(),
1020 HydroNode::Cast {
1021 inner: Box::new(self.ir_node.into_inner()),
1022 metadata: self
1023 .location
1024 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1025 },
1026 )
1027 } else {
1028 Stream::new(
1029 self.location.clone(),
1030 HydroNode::ObserveNonDet {
1031 inner: Box::new(self.ir_node.into_inner()),
1032 trusted: true,
1033 metadata: self
1034 .location
1035 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1036 },
1037 )
1038 }
1039 }
1040
1041 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1042 /// which is always safe because that is the weakest possible guarantee.
1043 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1044 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1045 self.assume_retries::<AtLeastOnce>(nondet)
1046 }
1047
1048 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1049 /// enforcing that `R2` is weaker than the input retries guarantee.
1050 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1051 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1052 self.assume_retries::<R2>(nondet)
1053 }
1054}
1055
1056impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1057where
1058 L: Location<'a>,
1059{
1060 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1061 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1062 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1063 self.assume_retries(
1064 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1065 )
1066 }
1067}
1068
1069impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1070where
1071 L: Location<'a>,
1072{
1073 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1074 ///
1075 /// # Example
1076 /// ```rust
1077 /// # use hydro_lang::prelude::*;
1078 /// # use futures::StreamExt;
1079 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1081 /// # }, |mut stream| async move {
1082 /// // 1, 2, 3
1083 /// # for w in vec![1, 2, 3] {
1084 /// # assert_eq!(stream.next().await.unwrap(), w);
1085 /// # }
1086 /// # }));
1087 /// ```
1088 pub fn cloned(self) -> Stream<T, L, B, O, R>
1089 where
1090 T: Clone,
1091 {
1092 self.map(q!(|d| d.clone()))
1093 }
1094}
1095
1096impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1097where
1098 L: Location<'a>,
1099{
1100 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1101 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1102 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1103 ///
1104 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1105 /// and there may be duplicates.
1106 ///
1107 /// # Example
1108 /// ```rust
1109 /// # use hydro_lang::prelude::*;
1110 /// # use futures::StreamExt;
1111 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1112 /// let tick = process.tick();
1113 /// let bools = process.source_iter(q!(vec![false, true, false]));
1114 /// let batch = bools.batch(&tick, nondet!(/** test */));
1115 /// batch
1116 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1117 /// .all_ticks()
1118 /// # }, |mut stream| async move {
1119 /// // true
1120 /// # assert_eq!(stream.next().await.unwrap(), true);
1121 /// # }));
1122 /// ```
1123 pub fn fold_commutative_idempotent<A, I, F>(
1124 self,
1125 init: impl IntoQuotedMut<'a, I, L>,
1126 comb: impl IntoQuotedMut<'a, F, L>,
1127 ) -> Singleton<A, L, B>
1128 where
1129 I: Fn() -> A + 'a,
1130 F: Fn(&mut A, T),
1131 {
1132 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1133 self.assume_ordering(nondet)
1134 .assume_retries(nondet)
1135 .fold(init, comb)
1136 }
1137
1138 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1139 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1140 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1141 /// reference, so that it can be modified in place.
1142 ///
1143 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1144 /// and there may be duplicates.
1145 ///
1146 /// # Example
1147 /// ```rust
1148 /// # use hydro_lang::prelude::*;
1149 /// # use futures::StreamExt;
1150 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1151 /// let tick = process.tick();
1152 /// let bools = process.source_iter(q!(vec![false, true, false]));
1153 /// let batch = bools.batch(&tick, nondet!(/** test */));
1154 /// batch
1155 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1156 /// .all_ticks()
1157 /// # }, |mut stream| async move {
1158 /// // true
1159 /// # assert_eq!(stream.next().await.unwrap(), true);
1160 /// # }));
1161 /// ```
1162 pub fn reduce_commutative_idempotent<F>(
1163 self,
1164 comb: impl IntoQuotedMut<'a, F, L>,
1165 ) -> Optional<T, L, B>
1166 where
1167 F: Fn(&mut T, T) + 'a,
1168 {
1169 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1170 self.assume_ordering(nondet)
1171 .assume_retries(nondet)
1172 .reduce(comb)
1173 }
1174
1175 // only for internal APIs that have been carefully vetted, will eventually be removed once we
1176 // have algebraic verification of these properties
1177 fn reduce_commutative_idempotent_trusted<F>(
1178 self,
1179 comb: impl IntoQuotedMut<'a, F, L>,
1180 ) -> Optional<T, L, B>
1181 where
1182 F: Fn(&mut T, T) + 'a,
1183 {
1184 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1185
1186 let ordered = if B::BOUNDED {
1187 self.assume_ordering_trusted(nondet)
1188 } else {
1189 self.assume_ordering(nondet) // if unbounded, ordering affects intermediate states
1190 };
1191
1192 ordered
1193 .assume_retries_trusted(nondet) // retries never affect intermediate states
1194 .reduce(comb)
1195 }
1196
1197 /// Computes the maximum element in the stream as an [`Optional`], which
1198 /// will be empty until the first element in the input arrives.
1199 ///
1200 /// # Example
1201 /// ```rust
1202 /// # use hydro_lang::prelude::*;
1203 /// # use futures::StreamExt;
1204 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1205 /// let tick = process.tick();
1206 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1207 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1208 /// batch.max().all_ticks()
1209 /// # }, |mut stream| async move {
1210 /// // 4
1211 /// # assert_eq!(stream.next().await.unwrap(), 4);
1212 /// # }));
1213 /// ```
1214 pub fn max(self) -> Optional<T, L, B>
1215 where
1216 T: Ord,
1217 {
1218 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1219 if new > *curr {
1220 *curr = new;
1221 }
1222 }))
1223 }
1224
1225 /// Computes the minimum element in the stream as an [`Optional`], which
1226 /// will be empty until the first element in the input arrives.
1227 ///
1228 /// # Example
1229 /// ```rust
1230 /// # use hydro_lang::prelude::*;
1231 /// # use futures::StreamExt;
1232 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233 /// let tick = process.tick();
1234 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1235 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1236 /// batch.min().all_ticks()
1237 /// # }, |mut stream| async move {
1238 /// // 1
1239 /// # assert_eq!(stream.next().await.unwrap(), 1);
1240 /// # }));
1241 /// ```
1242 pub fn min(self) -> Optional<T, L, B>
1243 where
1244 T: Ord,
1245 {
1246 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1247 if new < *curr {
1248 *curr = new;
1249 }
1250 }))
1251 }
1252}
1253
1254impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1255where
1256 L: Location<'a>,
1257{
1258 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1259 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1260 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1261 ///
1262 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1263 ///
1264 /// # Example
1265 /// ```rust
1266 /// # use hydro_lang::prelude::*;
1267 /// # use futures::StreamExt;
1268 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1269 /// let tick = process.tick();
1270 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1271 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1272 /// batch
1273 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1274 /// .all_ticks()
1275 /// # }, |mut stream| async move {
1276 /// // 10
1277 /// # assert_eq!(stream.next().await.unwrap(), 10);
1278 /// # }));
1279 /// ```
1280 pub fn fold_commutative<A, I, F>(
1281 self,
1282 init: impl IntoQuotedMut<'a, I, L>,
1283 comb: impl IntoQuotedMut<'a, F, L>,
1284 ) -> Singleton<A, L, B>
1285 where
1286 I: Fn() -> A + 'a,
1287 F: Fn(&mut A, T),
1288 {
1289 let nondet = nondet!(/** the combinator function is commutative */);
1290 self.assume_ordering(nondet).fold(init, comb)
1291 }
1292
1293 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1294 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1295 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1296 /// reference, so that it can be modified in place.
1297 ///
1298 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1299 ///
1300 /// # Example
1301 /// ```rust
1302 /// # use hydro_lang::prelude::*;
1303 /// # use futures::StreamExt;
1304 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1305 /// let tick = process.tick();
1306 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1307 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1308 /// batch
1309 /// .reduce_commutative(q!(|curr, new| *curr += new))
1310 /// .all_ticks()
1311 /// # }, |mut stream| async move {
1312 /// // 10
1313 /// # assert_eq!(stream.next().await.unwrap(), 10);
1314 /// # }));
1315 /// ```
1316 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1317 where
1318 F: Fn(&mut T, T) + 'a,
1319 {
1320 let nondet = nondet!(/** the combinator function is commutative */);
1321 self.assume_ordering(nondet).reduce(comb)
1322 }
1323
1324 /// Computes the number of elements in the stream as a [`Singleton`].
1325 ///
1326 /// # Example
1327 /// ```rust
1328 /// # use hydro_lang::prelude::*;
1329 /// # use futures::StreamExt;
1330 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1331 /// let tick = process.tick();
1332 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1333 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1334 /// batch.count().all_ticks()
1335 /// # }, |mut stream| async move {
1336 /// // 4
1337 /// # assert_eq!(stream.next().await.unwrap(), 4);
1338 /// # }));
1339 /// ```
1340 pub fn count(self) -> Singleton<usize, L, B> {
1341 self.assume_ordering_trusted(nondet!(
1342 /// Order does not affect eventual count, and also does not affect intermediate states.
1343 ))
1344 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1345 }
1346}
1347
1348impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1349where
1350 L: Location<'a>,
1351{
1352 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1353 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1354 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1355 ///
1356 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1357 ///
1358 /// # Example
1359 /// ```rust
1360 /// # use hydro_lang::prelude::*;
1361 /// # use futures::StreamExt;
1362 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1363 /// let tick = process.tick();
1364 /// let bools = process.source_iter(q!(vec![false, true, false]));
1365 /// let batch = bools.batch(&tick, nondet!(/** test */));
1366 /// batch
1367 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1368 /// .all_ticks()
1369 /// # }, |mut stream| async move {
1370 /// // true
1371 /// # assert_eq!(stream.next().await.unwrap(), true);
1372 /// # }));
1373 /// ```
1374 pub fn fold_idempotent<A, I, F>(
1375 self,
1376 init: impl IntoQuotedMut<'a, I, L>,
1377 comb: impl IntoQuotedMut<'a, F, L>,
1378 ) -> Singleton<A, L, B>
1379 where
1380 I: Fn() -> A + 'a,
1381 F: Fn(&mut A, T),
1382 {
1383 let nondet = nondet!(/** the combinator function is idempotent */);
1384 self.assume_retries(nondet).fold(init, comb)
1385 }
1386
1387 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1388 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1389 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1390 /// reference, so that it can be modified in place.
1391 ///
1392 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1393 ///
1394 /// # Example
1395 /// ```rust
1396 /// # use hydro_lang::prelude::*;
1397 /// # use futures::StreamExt;
1398 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1399 /// let tick = process.tick();
1400 /// let bools = process.source_iter(q!(vec![false, true, false]));
1401 /// let batch = bools.batch(&tick, nondet!(/** test */));
1402 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1403 /// # }, |mut stream| async move {
1404 /// // true
1405 /// # assert_eq!(stream.next().await.unwrap(), true);
1406 /// # }));
1407 /// ```
1408 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1409 where
1410 F: Fn(&mut T, T) + 'a,
1411 {
1412 let nondet = nondet!(/** the combinator function is idempotent */);
1413 self.assume_retries(nondet).reduce(comb)
1414 }
1415
1416 /// Computes the first element in the stream as an [`Optional`], which
1417 /// will be empty until the first element in the input arrives.
1418 ///
1419 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1420 /// re-ordering of elements may cause the first element to change.
1421 ///
1422 /// # Example
1423 /// ```rust
1424 /// # use hydro_lang::prelude::*;
1425 /// # use futures::StreamExt;
1426 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1427 /// let tick = process.tick();
1428 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1429 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1430 /// batch.first().all_ticks()
1431 /// # }, |mut stream| async move {
1432 /// // 1
1433 /// # assert_eq!(stream.next().await.unwrap(), 1);
1434 /// # }));
1435 /// ```
1436 pub fn first(self) -> Optional<T, L, B> {
1437 self.reduce_idempotent(q!(|_, _| {}))
1438 }
1439
1440 /// Computes the last element in the stream as an [`Optional`], which
1441 /// will be empty until an element in the input arrives.
1442 ///
1443 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1444 /// re-ordering of elements may cause the last element to change.
1445 ///
1446 /// # Example
1447 /// ```rust
1448 /// # use hydro_lang::prelude::*;
1449 /// # use futures::StreamExt;
1450 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1451 /// let tick = process.tick();
1452 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1453 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1454 /// batch.last().all_ticks()
1455 /// # }, |mut stream| async move {
1456 /// // 4
1457 /// # assert_eq!(stream.next().await.unwrap(), 4);
1458 /// # }));
1459 /// ```
1460 pub fn last(self) -> Optional<T, L, B> {
1461 self.reduce_idempotent(q!(|curr, new| *curr = new))
1462 }
1463}
1464
1465impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1466where
1467 L: Location<'a>,
1468{
1469 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1470 ///
1471 /// # Example
1472 /// ```rust
1473 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1474 /// # use futures::StreamExt;
1475 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1476 /// let tick = process.tick();
1477 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1478 /// numbers.enumerate()
1479 /// # }, |mut stream| async move {
1480 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1481 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1482 /// # assert_eq!(stream.next().await.unwrap(), w);
1483 /// # }
1484 /// # }));
1485 /// ```
1486 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1487 Stream::new(
1488 self.location.clone(),
1489 HydroNode::Enumerate {
1490 input: Box::new(self.ir_node.into_inner()),
1491 metadata: self.location.new_node_metadata(Stream::<
1492 (usize, T),
1493 L,
1494 B,
1495 TotalOrder,
1496 ExactlyOnce,
1497 >::collection_kind()),
1498 },
1499 )
1500 }
1501
1502 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1503 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1504 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1505 ///
1506 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1507 /// to depend on the order of elements in the stream.
1508 ///
1509 /// # Example
1510 /// ```rust
1511 /// # use hydro_lang::prelude::*;
1512 /// # use futures::StreamExt;
1513 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1514 /// let tick = process.tick();
1515 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1516 /// let batch = words.batch(&tick, nondet!(/** test */));
1517 /// batch
1518 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1519 /// .all_ticks()
1520 /// # }, |mut stream| async move {
1521 /// // "HELLOWORLD"
1522 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1523 /// # }));
1524 /// ```
1525 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1526 self,
1527 init: impl IntoQuotedMut<'a, I, L>,
1528 comb: impl IntoQuotedMut<'a, F, L>,
1529 ) -> Singleton<A, L, B> {
1530 let init = init.splice_fn0_ctx(&self.location).into();
1531 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1532
1533 let core = HydroNode::Fold {
1534 init,
1535 acc: comb,
1536 input: Box::new(self.ir_node.into_inner()),
1537 metadata: self
1538 .location
1539 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1540 };
1541
1542 Singleton::new(self.location, core)
1543 }
1544
1545 /// Collects all the elements of this stream into a single [`Vec`] element.
1546 ///
1547 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1548 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1549 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1550 /// the vector at an arbitrary point in time.
1551 ///
1552 /// # Example
1553 /// ```rust
1554 /// # use hydro_lang::prelude::*;
1555 /// # use futures::StreamExt;
1556 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1557 /// let tick = process.tick();
1558 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1559 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1560 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1561 /// # }, |mut stream| async move {
1562 /// // [ vec![1, 2, 3, 4] ]
1563 /// # for w in vec![vec![1, 2, 3, 4]] {
1564 /// # assert_eq!(stream.next().await.unwrap(), w);
1565 /// # }
1566 /// # }));
1567 /// ```
1568 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1569 self.fold(
1570 q!(|| vec![]),
1571 q!(|acc, v| {
1572 acc.push(v);
1573 }),
1574 )
1575 }
1576
1577 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1578 /// and emitting each intermediate result.
1579 ///
1580 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1581 /// containing all intermediate accumulated values. The scan operation can also terminate early
1582 /// by returning `None`.
1583 ///
1584 /// The function takes a mutable reference to the accumulator and the current element, and returns
1585 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1586 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1587 ///
1588 /// # Examples
1589 ///
1590 /// Basic usage - running sum:
1591 /// ```rust
1592 /// # use hydro_lang::prelude::*;
1593 /// # use futures::StreamExt;
1594 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1595 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1596 /// q!(|| 0),
1597 /// q!(|acc, x| {
1598 /// *acc += x;
1599 /// Some(*acc)
1600 /// }),
1601 /// )
1602 /// # }, |mut stream| async move {
1603 /// // Output: 1, 3, 6, 10
1604 /// # for w in vec![1, 3, 6, 10] {
1605 /// # assert_eq!(stream.next().await.unwrap(), w);
1606 /// # }
1607 /// # }));
1608 /// ```
1609 ///
1610 /// Early termination example:
1611 /// ```rust
1612 /// # use hydro_lang::prelude::*;
1613 /// # use futures::StreamExt;
1614 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1615 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1616 /// q!(|| 1),
1617 /// q!(|state, x| {
1618 /// *state = *state * x;
1619 /// if *state > 6 {
1620 /// None // Terminate the stream
1621 /// } else {
1622 /// Some(-*state)
1623 /// }
1624 /// }),
1625 /// )
1626 /// # }, |mut stream| async move {
1627 /// // Output: -1, -2, -6
1628 /// # for w in vec![-1, -2, -6] {
1629 /// # assert_eq!(stream.next().await.unwrap(), w);
1630 /// # }
1631 /// # }));
1632 /// ```
1633 pub fn scan<A, U, I, F>(
1634 self,
1635 init: impl IntoQuotedMut<'a, I, L>,
1636 f: impl IntoQuotedMut<'a, F, L>,
1637 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1638 where
1639 I: Fn() -> A + 'a,
1640 F: Fn(&mut A, T) -> Option<U> + 'a,
1641 {
1642 let init = init.splice_fn0_ctx(&self.location).into();
1643 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1644
1645 Stream::new(
1646 self.location.clone(),
1647 HydroNode::Scan {
1648 init,
1649 acc: f,
1650 input: Box::new(self.ir_node.into_inner()),
1651 metadata: self.location.new_node_metadata(
1652 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1653 ),
1654 },
1655 )
1656 }
1657
1658 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1659 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1660 /// until the first element in the input arrives.
1661 ///
1662 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1663 /// to depend on the order of elements in the stream.
1664 ///
1665 /// # Example
1666 /// ```rust
1667 /// # use hydro_lang::prelude::*;
1668 /// # use futures::StreamExt;
1669 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1670 /// let tick = process.tick();
1671 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1672 /// let batch = words.batch(&tick, nondet!(/** test */));
1673 /// batch
1674 /// .map(q!(|x| x.to_string()))
1675 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1676 /// .all_ticks()
1677 /// # }, |mut stream| async move {
1678 /// // "HELLOWORLD"
1679 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1680 /// # }));
1681 /// ```
1682 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1683 self,
1684 comb: impl IntoQuotedMut<'a, F, L>,
1685 ) -> Optional<T, L, B> {
1686 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1687 let core = HydroNode::Reduce {
1688 f,
1689 input: Box::new(self.ir_node.into_inner()),
1690 metadata: self
1691 .location
1692 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1693 };
1694
1695 Optional::new(self.location, core)
1696 }
1697}
1698
1699impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1700 /// Produces a new stream that interleaves the elements of the two input streams.
1701 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1702 ///
1703 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1704 /// [`Bounded`], you can use [`Stream::chain`] instead.
1705 ///
1706 /// # Example
1707 /// ```rust
1708 /// # use hydro_lang::prelude::*;
1709 /// # use futures::StreamExt;
1710 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1711 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1712 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1713 /// # }, |mut stream| async move {
1714 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1715 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1716 /// # assert_eq!(stream.next().await.unwrap(), w);
1717 /// # }
1718 /// # }));
1719 /// ```
1720 pub fn interleave<O2: Ordering, R2: Retries>(
1721 self,
1722 other: Stream<T, L, Unbounded, O2, R2>,
1723 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1724 where
1725 R: MinRetries<R2>,
1726 {
1727 Stream::new(
1728 self.location.clone(),
1729 HydroNode::Chain {
1730 first: Box::new(self.ir_node.into_inner()),
1731 second: Box::new(other.ir_node.into_inner()),
1732 metadata: self.location.new_node_metadata(Stream::<
1733 T,
1734 L,
1735 Unbounded,
1736 NoOrder,
1737 <R as MinRetries<R2>>::Min,
1738 >::collection_kind()),
1739 },
1740 )
1741 }
1742}
1743
1744impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1745where
1746 L: Location<'a>,
1747{
1748 /// Produces a new stream that emits the input elements in sorted order.
1749 ///
1750 /// The input stream can have any ordering guarantee, but the output stream
1751 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1752 /// elements in the input stream are available, so it requires the input stream
1753 /// to be [`Bounded`].
1754 ///
1755 /// # Example
1756 /// ```rust
1757 /// # use hydro_lang::prelude::*;
1758 /// # use futures::StreamExt;
1759 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1760 /// let tick = process.tick();
1761 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1762 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1763 /// batch.sort().all_ticks()
1764 /// # }, |mut stream| async move {
1765 /// // 1, 2, 3, 4
1766 /// # for w in (1..5) {
1767 /// # assert_eq!(stream.next().await.unwrap(), w);
1768 /// # }
1769 /// # }));
1770 /// ```
1771 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1772 where
1773 T: Ord,
1774 {
1775 Stream::new(
1776 self.location.clone(),
1777 HydroNode::Sort {
1778 input: Box::new(self.ir_node.into_inner()),
1779 metadata: self
1780 .location
1781 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1782 },
1783 )
1784 }
1785
1786 /// Produces a new stream that first emits the elements of the `self` stream,
1787 /// and then emits the elements of the `other` stream. The output stream has
1788 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1789 /// [`TotalOrder`] guarantee.
1790 ///
1791 /// Currently, both input streams must be [`Bounded`]. This operator will block
1792 /// on the first stream until all its elements are available. In a future version,
1793 /// we will relax the requirement on the `other` stream.
1794 ///
1795 /// # Example
1796 /// ```rust
1797 /// # use hydro_lang::prelude::*;
1798 /// # use futures::StreamExt;
1799 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1800 /// let tick = process.tick();
1801 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1802 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1803 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1804 /// # }, |mut stream| async move {
1805 /// // 2, 3, 4, 5, 1, 2, 3, 4
1806 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1807 /// # assert_eq!(stream.next().await.unwrap(), w);
1808 /// # }
1809 /// # }));
1810 /// ```
1811 pub fn chain<O2: Ordering, R2: Retries>(
1812 self,
1813 other: Stream<T, L, Bounded, O2, R2>,
1814 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1815 where
1816 O: MinOrder<O2>,
1817 R: MinRetries<R2>,
1818 {
1819 check_matching_location(&self.location, &other.location);
1820
1821 Stream::new(
1822 self.location.clone(),
1823 HydroNode::Chain {
1824 first: Box::new(self.ir_node.into_inner()),
1825 second: Box::new(other.ir_node.into_inner()),
1826 metadata: self.location.new_node_metadata(Stream::<
1827 T,
1828 L,
1829 Bounded,
1830 <O as MinOrder<O2>>::Min,
1831 <R as MinRetries<R2>>::Min,
1832 >::collection_kind()),
1833 },
1834 )
1835 }
1836
1837 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1838 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1839 /// because this is compiled into a nested loop.
1840 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1841 self,
1842 other: Stream<T2, L, Bounded, O2, R>,
1843 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1844 where
1845 T: Clone,
1846 T2: Clone,
1847 {
1848 check_matching_location(&self.location, &other.location);
1849
1850 Stream::new(
1851 self.location.clone(),
1852 HydroNode::CrossProduct {
1853 left: Box::new(self.ir_node.into_inner()),
1854 right: Box::new(other.ir_node.into_inner()),
1855 metadata: self.location.new_node_metadata(Stream::<
1856 (T, T2),
1857 L,
1858 Bounded,
1859 <O2 as MinOrder<O>>::Min,
1860 R,
1861 >::collection_kind()),
1862 },
1863 )
1864 }
1865
1866 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1867 /// `self` used as the values for *each* key.
1868 ///
1869 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1870 /// values. For example, it can be used to send the same set of elements to several cluster
1871 /// members, if the membership information is available as a [`KeyedSingleton`].
1872 ///
1873 /// # Example
1874 /// ```rust
1875 /// # use hydro_lang::prelude::*;
1876 /// # use futures::StreamExt;
1877 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1878 /// # let tick = process.tick();
1879 /// let keyed_singleton = // { 1: (), 2: () }
1880 /// # process
1881 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1882 /// # .into_keyed()
1883 /// # .batch(&tick, nondet!(/** test */))
1884 /// # .first();
1885 /// let stream = // [ "a", "b" ]
1886 /// # process
1887 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1888 /// # .batch(&tick, nondet!(/** test */));
1889 /// stream.repeat_with_keys(keyed_singleton)
1890 /// # .entries().all_ticks()
1891 /// # }, |mut stream| async move {
1892 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1893 /// # let mut results = Vec::new();
1894 /// # for _ in 0..4 {
1895 /// # results.push(stream.next().await.unwrap());
1896 /// # }
1897 /// # results.sort();
1898 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1899 /// # }));
1900 /// ```
1901 pub fn repeat_with_keys<K, V2>(
1902 self,
1903 keys: KeyedSingleton<K, V2, L, Bounded>,
1904 ) -> KeyedStream<K, T, L, Bounded, O, R>
1905 where
1906 K: Clone,
1907 T: Clone,
1908 {
1909 keys.keys()
1910 .weaken_retries()
1911 .assume_ordering_trusted::<TotalOrder>(
1912 nondet!(/** keyed stream does not depend on ordering of keys */),
1913 )
1914 .cross_product_nested_loop(self)
1915 .into_keyed()
1916 }
1917}
1918
1919impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1920where
1921 L: Location<'a>,
1922{
1923 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1924 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1925 /// by equi-joining the two streams on the key attribute `K`.
1926 ///
1927 /// # Example
1928 /// ```rust
1929 /// # use hydro_lang::prelude::*;
1930 /// # use std::collections::HashSet;
1931 /// # use futures::StreamExt;
1932 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1933 /// let tick = process.tick();
1934 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1935 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1936 /// stream1.join(stream2)
1937 /// # }, |mut stream| async move {
1938 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1939 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1940 /// # stream.map(|i| assert!(expected.contains(&i)));
1941 /// # }));
1942 pub fn join<V2, O2: Ordering, R2: Retries>(
1943 self,
1944 n: Stream<(K, V2), L, B, O2, R2>,
1945 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1946 where
1947 K: Eq + Hash,
1948 R: MinRetries<R2>,
1949 {
1950 check_matching_location(&self.location, &n.location);
1951
1952 Stream::new(
1953 self.location.clone(),
1954 HydroNode::Join {
1955 left: Box::new(self.ir_node.into_inner()),
1956 right: Box::new(n.ir_node.into_inner()),
1957 metadata: self.location.new_node_metadata(Stream::<
1958 (K, (V1, V2)),
1959 L,
1960 B,
1961 NoOrder,
1962 <R as MinRetries<R2>>::Min,
1963 >::collection_kind()),
1964 },
1965 )
1966 }
1967
1968 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1969 /// computes the anti-join of the items in the input -- i.e. returns
1970 /// unique items in the first input that do not have a matching key
1971 /// in the second input.
1972 ///
1973 /// # Example
1974 /// ```rust
1975 /// # use hydro_lang::prelude::*;
1976 /// # use futures::StreamExt;
1977 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1978 /// let tick = process.tick();
1979 /// let stream = process
1980 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1981 /// .batch(&tick, nondet!(/** test */));
1982 /// let batch = process
1983 /// .source_iter(q!(vec![1, 2]))
1984 /// .batch(&tick, nondet!(/** test */));
1985 /// stream.anti_join(batch).all_ticks()
1986 /// # }, |mut stream| async move {
1987 /// # for w in vec![(3, 'c'), (4, 'd')] {
1988 /// # assert_eq!(stream.next().await.unwrap(), w);
1989 /// # }
1990 /// # }));
1991 pub fn anti_join<O2: Ordering, R2: Retries>(
1992 self,
1993 n: Stream<K, L, Bounded, O2, R2>,
1994 ) -> Stream<(K, V1), L, B, O, R>
1995 where
1996 K: Eq + Hash,
1997 {
1998 check_matching_location(&self.location, &n.location);
1999
2000 Stream::new(
2001 self.location.clone(),
2002 HydroNode::AntiJoin {
2003 pos: Box::new(self.ir_node.into_inner()),
2004 neg: Box::new(n.ir_node.into_inner()),
2005 metadata: self
2006 .location
2007 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2008 },
2009 )
2010 }
2011}
2012
2013impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2014 Stream<(K, V), L, B, O, R>
2015{
2016 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2017 /// is used as the key and the second element is added to the entries associated with that key.
2018 ///
2019 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2020 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2021 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2022 /// total ordering _within_ each group but no ordering _across_ groups.
2023 ///
2024 /// # Example
2025 /// ```rust
2026 /// # use hydro_lang::prelude::*;
2027 /// # use futures::StreamExt;
2028 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2029 /// process
2030 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2031 /// .into_keyed()
2032 /// # .entries()
2033 /// # }, |mut stream| async move {
2034 /// // { 1: [2, 3], 2: [4] }
2035 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2036 /// # assert_eq!(stream.next().await.unwrap(), w);
2037 /// # }
2038 /// # }));
2039 /// ```
2040 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2041 KeyedStream::new(
2042 self.location.clone(),
2043 HydroNode::Cast {
2044 inner: Box::new(self.ir_node.into_inner()),
2045 metadata: self
2046 .location
2047 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2048 },
2049 )
2050 }
2051}
2052
2053impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2054where
2055 K: Eq + Hash,
2056 L: Location<'a>,
2057{
2058 #[deprecated = "use .into_keyed().fold(...) instead"]
2059 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2060 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2061 /// in the second element are accumulated via the `comb` closure.
2062 ///
2063 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2064 /// to depend on the order of elements in the stream.
2065 ///
2066 /// If the input and output value types are the same and do not require initialization then use
2067 /// [`Stream::reduce_keyed`].
2068 ///
2069 /// # Example
2070 /// ```rust
2071 /// # use hydro_lang::prelude::*;
2072 /// # use futures::StreamExt;
2073 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2074 /// let tick = process.tick();
2075 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2076 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2077 /// batch
2078 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2079 /// .all_ticks()
2080 /// # }, |mut stream| async move {
2081 /// // (1, 5), (2, 7)
2082 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2083 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2084 /// # }));
2085 /// ```
2086 pub fn fold_keyed<A, I, F>(
2087 self,
2088 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2089 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2090 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2091 where
2092 I: Fn() -> A + 'a,
2093 F: Fn(&mut A, V) + 'a,
2094 {
2095 self.into_keyed().fold(init, comb).entries()
2096 }
2097
2098 #[deprecated = "use .into_keyed().reduce(...) instead"]
2099 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2100 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2101 /// in the second element are accumulated via the `comb` closure.
2102 ///
2103 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2104 /// to depend on the order of elements in the stream.
2105 ///
2106 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2107 ///
2108 /// # Example
2109 /// ```rust
2110 /// # use hydro_lang::prelude::*;
2111 /// # use futures::StreamExt;
2112 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2113 /// let tick = process.tick();
2114 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2115 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2116 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2117 /// # }, |mut stream| async move {
2118 /// // (1, 5), (2, 7)
2119 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2120 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2121 /// # }));
2122 /// ```
2123 pub fn reduce_keyed<F>(
2124 self,
2125 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2126 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2127 where
2128 F: Fn(&mut V, V) + 'a,
2129 {
2130 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2131
2132 Stream::new(
2133 self.location.clone(),
2134 HydroNode::ReduceKeyed {
2135 f,
2136 input: Box::new(self.ir_node.into_inner()),
2137 metadata: self.location.new_node_metadata(Stream::<
2138 (K, V),
2139 Tick<L>,
2140 Bounded,
2141 NoOrder,
2142 ExactlyOnce,
2143 >::collection_kind()),
2144 },
2145 )
2146 }
2147}
2148
2149impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2150where
2151 K: Eq + Hash,
2152 L: Location<'a>,
2153{
2154 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2155 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2156 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2157 /// in the second element are accumulated via the `comb` closure.
2158 ///
2159 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2160 /// as there may be non-deterministic duplicates.
2161 ///
2162 /// If the input and output value types are the same and do not require initialization then use
2163 /// [`Stream::reduce_keyed_commutative_idempotent`].
2164 ///
2165 /// # Example
2166 /// ```rust
2167 /// # use hydro_lang::prelude::*;
2168 /// # use futures::StreamExt;
2169 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2170 /// let tick = process.tick();
2171 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2172 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2173 /// batch
2174 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2175 /// .all_ticks()
2176 /// # }, |mut stream| async move {
2177 /// // (1, false), (2, true)
2178 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2179 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2180 /// # }));
2181 /// ```
2182 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2183 self,
2184 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2185 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2186 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2187 where
2188 I: Fn() -> A + 'a,
2189 F: Fn(&mut A, V) + 'a,
2190 {
2191 self.into_keyed()
2192 .fold_commutative_idempotent(init, comb)
2193 .entries()
2194 }
2195
2196 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2197 /// # Example
2198 /// ```rust
2199 /// # use hydro_lang::prelude::*;
2200 /// # use futures::StreamExt;
2201 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2202 /// let tick = process.tick();
2203 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2204 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2205 /// batch.keys().all_ticks()
2206 /// # }, |mut stream| async move {
2207 /// // 1, 2
2208 /// # assert_eq!(stream.next().await.unwrap(), 1);
2209 /// # assert_eq!(stream.next().await.unwrap(), 2);
2210 /// # }));
2211 /// ```
2212 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2213 self.into_keyed()
2214 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2215 .keys()
2216 }
2217
2218 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2219 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2220 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2221 /// in the second element are accumulated via the `comb` closure.
2222 ///
2223 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2224 /// as there may be non-deterministic duplicates.
2225 ///
2226 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2227 ///
2228 /// # Example
2229 /// ```rust
2230 /// # use hydro_lang::prelude::*;
2231 /// # use futures::StreamExt;
2232 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2233 /// let tick = process.tick();
2234 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2235 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2236 /// batch
2237 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2238 /// .all_ticks()
2239 /// # }, |mut stream| async move {
2240 /// // (1, false), (2, true)
2241 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2242 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2243 /// # }));
2244 /// ```
2245 pub fn reduce_keyed_commutative_idempotent<F>(
2246 self,
2247 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2248 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2249 where
2250 F: Fn(&mut V, V) + 'a,
2251 {
2252 self.into_keyed()
2253 .reduce_commutative_idempotent(comb)
2254 .entries()
2255 }
2256}
2257
2258impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2259where
2260 K: Eq + Hash,
2261 L: Location<'a>,
2262{
2263 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2264 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2265 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2266 /// in the second element are accumulated via the `comb` closure.
2267 ///
2268 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2269 ///
2270 /// If the input and output value types are the same and do not require initialization then use
2271 /// [`Stream::reduce_keyed_commutative`].
2272 ///
2273 /// # Example
2274 /// ```rust
2275 /// # use hydro_lang::prelude::*;
2276 /// # use futures::StreamExt;
2277 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2278 /// let tick = process.tick();
2279 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2280 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2281 /// batch
2282 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2283 /// .all_ticks()
2284 /// # }, |mut stream| async move {
2285 /// // (1, 5), (2, 7)
2286 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2287 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2288 /// # }));
2289 /// ```
2290 pub fn fold_keyed_commutative<A, I, F>(
2291 self,
2292 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2293 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2294 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2295 where
2296 I: Fn() -> A + 'a,
2297 F: Fn(&mut A, V) + 'a,
2298 {
2299 self.into_keyed().fold_commutative(init, comb).entries()
2300 }
2301
2302 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2303 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2304 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2305 /// in the second element are accumulated via the `comb` closure.
2306 ///
2307 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2308 ///
2309 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2310 ///
2311 /// # Example
2312 /// ```rust
2313 /// # use hydro_lang::prelude::*;
2314 /// # use futures::StreamExt;
2315 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2316 /// let tick = process.tick();
2317 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2318 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2319 /// batch
2320 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2321 /// .all_ticks()
2322 /// # }, |mut stream| async move {
2323 /// // (1, 5), (2, 7)
2324 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2325 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2326 /// # }));
2327 /// ```
2328 pub fn reduce_keyed_commutative<F>(
2329 self,
2330 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2331 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2332 where
2333 F: Fn(&mut V, V) + 'a,
2334 {
2335 self.into_keyed().reduce_commutative(comb).entries()
2336 }
2337}
2338
2339impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2340where
2341 K: Eq + Hash,
2342 L: Location<'a>,
2343{
2344 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2345 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2346 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2347 /// in the second element are accumulated via the `comb` closure.
2348 ///
2349 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2350 ///
2351 /// If the input and output value types are the same and do not require initialization then use
2352 /// [`Stream::reduce_keyed_idempotent`].
2353 ///
2354 /// # Example
2355 /// ```rust
2356 /// # use hydro_lang::prelude::*;
2357 /// # use futures::StreamExt;
2358 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2359 /// let tick = process.tick();
2360 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2361 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2362 /// batch
2363 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2364 /// .all_ticks()
2365 /// # }, |mut stream| async move {
2366 /// // (1, false), (2, true)
2367 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2368 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2369 /// # }));
2370 /// ```
2371 pub fn fold_keyed_idempotent<A, I, F>(
2372 self,
2373 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2374 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2375 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2376 where
2377 I: Fn() -> A + 'a,
2378 F: Fn(&mut A, V) + 'a,
2379 {
2380 self.into_keyed().fold_idempotent(init, comb).entries()
2381 }
2382
2383 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2384 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2385 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2386 /// in the second element are accumulated via the `comb` closure.
2387 ///
2388 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2389 ///
2390 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2391 ///
2392 /// # Example
2393 /// ```rust
2394 /// # use hydro_lang::prelude::*;
2395 /// # use futures::StreamExt;
2396 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2397 /// let tick = process.tick();
2398 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2399 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2400 /// batch
2401 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2402 /// .all_ticks()
2403 /// # }, |mut stream| async move {
2404 /// // (1, false), (2, true)
2405 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2406 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2407 /// # }));
2408 /// ```
2409 pub fn reduce_keyed_idempotent<F>(
2410 self,
2411 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2412 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2413 where
2414 F: Fn(&mut V, V) + 'a,
2415 {
2416 self.into_keyed().reduce_idempotent(comb).entries()
2417 }
2418}
2419
2420impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2421where
2422 L: Location<'a> + NoTick,
2423{
2424 /// Returns a stream corresponding to the latest batch of elements being atomically
2425 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2426 /// the order of the input.
2427 ///
2428 /// # Non-Determinism
2429 /// The batch boundaries are non-deterministic and may change across executions.
2430 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2431 Stream::new(
2432 self.location.clone().tick,
2433 HydroNode::Batch {
2434 inner: Box::new(self.ir_node.into_inner()),
2435 metadata: self
2436 .location
2437 .tick
2438 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2439 },
2440 )
2441 }
2442
2443 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2444 /// See [`Stream::atomic`] for more details.
2445 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2446 Stream::new(
2447 self.location.tick.l.clone(),
2448 HydroNode::EndAtomic {
2449 inner: Box::new(self.ir_node.into_inner()),
2450 metadata: self
2451 .location
2452 .tick
2453 .l
2454 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2455 },
2456 )
2457 }
2458}
2459
2460impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2461where
2462 L: Location<'a>,
2463{
2464 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2465 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2466 ///
2467 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2468 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2469 /// argument that declares where the stream will be atomically processed. Batching a stream into
2470 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2471 /// [`Tick`] will introduce asynchrony.
2472 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2473 let out_location = Atomic { tick: tick.clone() };
2474 Stream::new(
2475 out_location.clone(),
2476 HydroNode::BeginAtomic {
2477 inner: Box::new(self.ir_node.into_inner()),
2478 metadata: out_location
2479 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2480 },
2481 )
2482 }
2483
2484 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2485 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2486 /// the order of the input. The output stream will execute in the [`Tick`] that was
2487 /// used to create the atomic section.
2488 ///
2489 /// # Non-Determinism
2490 /// The batch boundaries are non-deterministic and may change across executions.
2491 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2492 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2493 Stream::new(
2494 tick.clone(),
2495 HydroNode::Batch {
2496 inner: Box::new(self.ir_node.into_inner()),
2497 metadata: tick
2498 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2499 },
2500 )
2501 }
2502
2503 /// Given a time interval, returns a stream corresponding to samples taken from the
2504 /// stream roughly at that interval. The output will have elements in the same order
2505 /// as the input, but with arbitrary elements skipped between samples. There is also
2506 /// no guarantee on the exact timing of the samples.
2507 ///
2508 /// # Non-Determinism
2509 /// The output stream is non-deterministic in which elements are sampled, since this
2510 /// is controlled by a clock.
2511 pub fn sample_every(
2512 self,
2513 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2514 nondet: NonDet,
2515 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2516 where
2517 L: NoTick + NoAtomic,
2518 {
2519 let samples = self.location.source_interval(interval, nondet);
2520
2521 let tick = self.location.tick();
2522 self.batch(&tick, nondet)
2523 .filter_if_some(samples.batch(&tick, nondet).first())
2524 .all_ticks()
2525 .weakest_retries()
2526 }
2527
2528 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2529 /// stream has not emitted a value since that duration.
2530 ///
2531 /// # Non-Determinism
2532 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2533 /// samples take place, timeouts may be non-deterministically generated or missed,
2534 /// and the notification of the timeout may be delayed as well. There is also no
2535 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2536 /// detected based on when the next sample is taken.
2537 pub fn timeout(
2538 self,
2539 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2540 nondet: NonDet,
2541 ) -> Optional<(), L, Unbounded>
2542 where
2543 L: NoTick + NoAtomic,
2544 {
2545 let tick = self.location.tick();
2546
2547 let latest_received = self.assume_retries(nondet).fold_commutative(
2548 q!(|| None),
2549 q!(|latest, _| {
2550 *latest = Some(Instant::now());
2551 }),
2552 );
2553
2554 latest_received
2555 .snapshot(&tick, nondet)
2556 .filter_map(q!(move |latest_received| {
2557 if let Some(latest_received) = latest_received {
2558 if Instant::now().duration_since(latest_received) > duration {
2559 Some(())
2560 } else {
2561 None
2562 }
2563 } else {
2564 Some(())
2565 }
2566 }))
2567 .latest()
2568 }
2569}
2570
2571impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2572where
2573 L: Location<'a> + NoTick + NoAtomic,
2574 F: Future<Output = T>,
2575{
2576 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2577 /// Future outputs are produced as available, regardless of input arrival order.
2578 ///
2579 /// # Example
2580 /// ```rust
2581 /// # use std::collections::HashSet;
2582 /// # use futures::StreamExt;
2583 /// # use hydro_lang::prelude::*;
2584 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2585 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2586 /// .map(q!(|x| async move {
2587 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2588 /// x
2589 /// }))
2590 /// .resolve_futures()
2591 /// # },
2592 /// # |mut stream| async move {
2593 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2594 /// # let mut output = HashSet::new();
2595 /// # for _ in 1..10 {
2596 /// # output.insert(stream.next().await.unwrap());
2597 /// # }
2598 /// # assert_eq!(
2599 /// # output,
2600 /// # HashSet::<i32>::from_iter(1..10)
2601 /// # );
2602 /// # },
2603 /// # ));
2604 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2605 Stream::new(
2606 self.location.clone(),
2607 HydroNode::ResolveFutures {
2608 input: Box::new(self.ir_node.into_inner()),
2609 metadata: self
2610 .location
2611 .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2612 },
2613 )
2614 }
2615
2616 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2617 /// Future outputs are produced in the same order as the input stream.
2618 ///
2619 /// # Example
2620 /// ```rust
2621 /// # use std::collections::HashSet;
2622 /// # use futures::StreamExt;
2623 /// # use hydro_lang::prelude::*;
2624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2625 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2626 /// .map(q!(|x| async move {
2627 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2628 /// x
2629 /// }))
2630 /// .resolve_futures_ordered()
2631 /// # },
2632 /// # |mut stream| async move {
2633 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2634 /// # let mut output = Vec::new();
2635 /// # for _ in 1..10 {
2636 /// # output.push(stream.next().await.unwrap());
2637 /// # }
2638 /// # assert_eq!(
2639 /// # output,
2640 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2641 /// # );
2642 /// # },
2643 /// # ));
2644 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2645 Stream::new(
2646 self.location.clone(),
2647 HydroNode::ResolveFuturesOrdered {
2648 input: Box::new(self.ir_node.into_inner()),
2649 metadata: self
2650 .location
2651 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2652 },
2653 )
2654 }
2655}
2656
2657impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2658where
2659 L: Location<'a> + NoTick,
2660{
2661 /// Executes the provided closure for every element in this stream.
2662 ///
2663 /// Because the closure may have side effects, the stream must have deterministic order
2664 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2665 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2666 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2667 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2668 let f = f.splice_fn1_ctx(&self.location).into();
2669 self.location
2670 .flow_state()
2671 .borrow_mut()
2672 .push_root(HydroRoot::ForEach {
2673 input: Box::new(self.ir_node.into_inner()),
2674 f,
2675 op_metadata: HydroIrOpMetadata::new(),
2676 });
2677 }
2678
2679 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2680 /// TCP socket to some other server. You should _not_ use this API for interacting with
2681 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2682 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2683 /// interaction with asynchronous sinks.
2684 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2685 where
2686 S: 'a + futures::Sink<T> + Unpin,
2687 {
2688 self.location
2689 .flow_state()
2690 .borrow_mut()
2691 .push_root(HydroRoot::DestSink {
2692 sink: sink.splice_typed_ctx(&self.location).into(),
2693 input: Box::new(self.ir_node.into_inner()),
2694 op_metadata: HydroIrOpMetadata::new(),
2695 });
2696 }
2697}
2698
2699impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2700where
2701 L: Location<'a>,
2702{
2703 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2704 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2705 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2706 Stream::new(
2707 self.location.outer().clone(),
2708 HydroNode::YieldConcat {
2709 inner: Box::new(self.ir_node.into_inner()),
2710 metadata: self
2711 .location
2712 .outer()
2713 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2714 },
2715 )
2716 }
2717
2718 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2719 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2720 ///
2721 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2722 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2723 /// stream's [`Tick`] context.
2724 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2725 let out_location = Atomic {
2726 tick: self.location.clone(),
2727 };
2728
2729 Stream::new(
2730 out_location.clone(),
2731 HydroNode::YieldConcat {
2732 inner: Box::new(self.ir_node.into_inner()),
2733 metadata: out_location
2734 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2735 },
2736 )
2737 }
2738
2739 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2740 ///
2741 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2742 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2743 /// careful when using this operator, as its memory usage will grow linearly over time since it
2744 /// must store its inputs indefinitely.
2745 ///
2746 /// # Example
2747 /// ```rust
2748 /// # use hydro_lang::prelude::*;
2749 /// # use futures::StreamExt;
2750 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2751 /// let tick = process.tick();
2752 /// // ticks are lazy by default, forces the second tick to run
2753 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2754 ///
2755 /// let batch_first_tick = process
2756 /// .source_iter(q!(vec![1, 2, 3, 4]))
2757 /// .batch(&tick, nondet!(/** test */));
2758 /// let batch_second_tick = process
2759 /// .source_iter(q!(vec![5, 6, 7, 8]))
2760 /// .batch(&tick, nondet!(/** test */))
2761 /// .defer_tick(); // appears on the second tick
2762 /// batch_first_tick.chain(batch_second_tick)
2763 /// .persist()
2764 /// .all_ticks()
2765 /// # }, |mut stream| async move {
2766 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2767 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2768 /// # assert_eq!(stream.next().await.unwrap(), w);
2769 /// # }
2770 /// # }));
2771 /// ```
2772 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2773 where
2774 T: Clone,
2775 {
2776 Stream::new(
2777 self.location.clone(),
2778 HydroNode::Persist {
2779 inner: Box::new(self.ir_node.into_inner()),
2780 metadata: self
2781 .location
2782 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2783 },
2784 )
2785 }
2786
2787 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2788 /// always has the elements of `self` at tick `T - 1`.
2789 ///
2790 /// At tick `0`, the output stream is empty, since there is no previous tick.
2791 ///
2792 /// This operator enables stateful iterative processing with ticks, by sending data from one
2793 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2794 ///
2795 /// # Example
2796 /// ```rust
2797 /// # use hydro_lang::prelude::*;
2798 /// # use futures::StreamExt;
2799 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2800 /// let tick = process.tick();
2801 /// // ticks are lazy by default, forces the second tick to run
2802 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2803 ///
2804 /// let batch_first_tick = process
2805 /// .source_iter(q!(vec![1, 2, 3, 4]))
2806 /// .batch(&tick, nondet!(/** test */));
2807 /// let batch_second_tick = process
2808 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2809 /// .batch(&tick, nondet!(/** test */))
2810 /// .defer_tick(); // appears on the second tick
2811 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2812 ///
2813 /// changes_across_ticks.clone().filter_not_in(
2814 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2815 /// ).all_ticks()
2816 /// # }, |mut stream| async move {
2817 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2818 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2819 /// # assert_eq!(stream.next().await.unwrap(), w);
2820 /// # }
2821 /// # }));
2822 /// ```
2823 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2824 Stream::new(
2825 self.location.clone(),
2826 HydroNode::DeferTick {
2827 input: Box::new(self.ir_node.into_inner()),
2828 metadata: self
2829 .location
2830 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2831 },
2832 )
2833 }
2834}
2835
// Tests for `Stream` operators.
//
// Tests gated on the `deploy` feature launch real localhost processes through
// `hydro_deploy::Deployment`; the `sim_*` tests instead run the flow under the simulator via
// `flow.sim().exhaustive(..)`, whose return value some tests use as an instance count.
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(feature = "deploy")]
    use serde::{Deserialize, Serialize};
    #[cfg(feature = "deploy")]
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    use crate::location::Location;
    use crate::nondet::nondet;

    // Declared here; the module body lives in a separate file not shown in this chunk.
    mod backtrace_chained_ops;

    // Type tags distinguishing the two processes in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload sent across the network (and out to the external client) in
    // `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    // Sends 0..10 from the `P1` process to the `P2` process with bincode, forwards them to an
    // external client, and checks that the values arrive in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }

    // `first()` on the flattened `[1, 2, 3]` keeps a single element, so the per-tick count
    // sent to the external client is 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // `reduce` over an unbounded input keeps its accumulator across inputs: after sending 1 the
    // sampled sum is 1, and after sending 2 it is 3 (not 2).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // The atomic fold over `[1, 2, 3]` (sum 6) is snapshotted inside the tick, and each batched
    // external input is paired with that same value 6 in every tick it appears in.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    // `scan` over an unbounded input carries its running sum across inputs: 1, then 1 + 2 = 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // `enumerate` over an unbounded input keeps counting across inputs: the second element
    // sent gets index 1, not a restarted 0.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }

    // `unique` over an unbounded input remembers earlier values: the repeated `1` is
    // suppressed, so the next output after `2` is `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // Expected to panic: the three inputs may be split across several batches, so for some
    // simulated execution the first count observed is not 3 and the assertion fails.
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, TotalOrder, _>(&external);

        let tick = node.tick();
        let out_port = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let mut out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }

    // Batching a totally ordered input must not reorder it: in every simulated execution the
    // output is exactly `[1, 2, 3]`.
    #[test]
    fn sim_batch_preserves_order() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode(&external);

        let tick = node.tick();
        let out_port = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

    // Expected to panic: with `NoOrder` input, the simulator should reach an execution that
    // batches `{1, 3}` together and `2` separately (per-batch min/max `(1, 3)` then `(2, 2)`),
    // which is only possible if batching may reorder the input.
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_port = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3]).unwrap();

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }

    // Counts the simulated executions when batching 4 unordered inputs. The expected value is
    // the number of ordered partitions of the 4 inputs into non-empty batches.
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_port = batch.all_ticks().send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }

    // Currently ignored. Once supported, observing an order via `assume_ordering` should yield
    // 4! orderings times 2^(4 - 1) ways to cut each ordered sequence into contiguous batches.
    #[test]
    #[ignore = "assume_ordering not yet supported on bounded collections"]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_port = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }

    // Snapshots of the running count of 4 unordered inputs: the last snapshot emitted is
    // always 4, and the expected instance count reflects which earlier counts get observed.
    #[test]
    fn sim_unordered_count_instance_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let out_port = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
}