hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// Streaming elements of type `V` grouped by a key of type `K`.
33///
34/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
35/// order of keys is non-deterministic but the order *within* each group may be deterministic.
36///
37/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
38/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
39/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
40///
41/// Type Parameters:
42/// - `K`: the type of the key for each group
43/// - `V`: the type of the elements inside each group
44/// - `Loc`: the [`Location`] where the keyed stream is materialized
45/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
46/// - `Order`: tracks whether the elements within each group have deterministic order
47/// ([`TotalOrder`]) or not ([`NoOrder`])
48/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
49/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
50pub struct KeyedStream<
51 K,
52 V,
53 Loc,
54 Bound: Boundedness = Unbounded,
55 Order: Ordering = TotalOrder,
56 Retry: Retries = ExactlyOnce,
57> {
58 pub(crate) location: Loc,
59 pub(crate) ir_node: RefCell<HydroNode>,
60
61 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
62}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65 for KeyedStream<K, V, L, B, NoOrder, R>
66where
67 L: Location<'a>,
68{
69 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70 KeyedStream {
71 location: stream.location,
72 ir_node: stream.ir_node,
73 _phantom: PhantomData,
74 }
75 }
76}
77
78impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
79where
80 L: Location<'a>,
81{
82 fn defer_tick(self) -> Self {
83 KeyedStream::defer_tick(self)
84 }
85}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90 L: Location<'a>,
91{
92 type Location = Tick<L>;
93
94 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95 KeyedStream {
96 location: location.clone(),
97 ir_node: RefCell::new(HydroNode::CycleSource {
98 ident,
99 metadata: location.new_node_metadata(
100 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101 ),
102 }),
103 _phantom: PhantomData,
104 }
105 }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132 for KeyedStream<K, V, L, B, O, R>
133where
134 L: Location<'a> + NoTick,
135{
136 type Location = L;
137
138 fn create_source(ident: syn::Ident, location: L) -> Self {
139 KeyedStream {
140 location: location.clone(),
141 ir_node: RefCell::new(HydroNode::CycleSource {
142 ident,
143 metadata: location
144 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152 for KeyedStream<K, V, L, B, O, R>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176 fn clone(&self) -> Self {
177 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179 *self.ir_node.borrow_mut() = HydroNode::Tee {
180 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181 metadata: self.location.new_node_metadata(Self::collection_kind()),
182 };
183 }
184
185 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186 KeyedStream {
187 location: self.location.clone(),
188 ir_node: HydroNode::Tee {
189 inner: TeeNode(inner.0.clone()),
190 metadata: metadata.clone(),
191 }
192 .into(),
193 _phantom: PhantomData,
194 }
195 } else {
196 unreachable!()
197 }
198 }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202 KeyedStream<K, V, L, B, O, R>
203{
204 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208 KeyedStream {
209 location,
210 ir_node: RefCell::new(ir_node),
211 _phantom: PhantomData,
212 }
213 }
214
215 /// Returns the [`CollectionKind`] corresponding to this type.
216 pub fn collection_kind() -> CollectionKind {
217 CollectionKind::KeyedStream {
218 bound: B::BOUND_KIND,
219 value_order: O::ORDERING_KIND,
220 value_retry: R::RETRIES_KIND,
221 key_type: stageleft::quote_type::<K>().into(),
222 value_type: stageleft::quote_type::<V>().into(),
223 }
224 }
225
226 /// Returns the [`Location`] where this keyed stream is being materialized.
227 pub fn location(&self) -> &L {
228 &self.location
229 }
230
231 /// Explicitly "casts" the keyed stream to a type with a different ordering
232 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233 /// by the type-system.
234 ///
235 /// # Non-Determinism
236 /// This function is used as an escape hatch, and any mistakes in the
237 /// provided ordering guarantee will propagate into the guarantees
238 /// for the rest of the program.
239 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240 if O::ORDERING_KIND == O2::ORDERING_KIND {
241 KeyedStream::new(self.location, self.ir_node.into_inner())
242 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243 // We can always weaken the ordering guarantee
244 KeyedStream::new(
245 self.location.clone(),
246 HydroNode::Cast {
247 inner: Box::new(self.ir_node.into_inner()),
248 metadata: self
249 .location
250 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251 },
252 )
253 } else {
254 KeyedStream::new(
255 self.location.clone(),
256 HydroNode::ObserveNonDet {
257 inner: Box::new(self.ir_node.into_inner()),
258 trusted: false,
259 metadata: self
260 .location
261 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262 },
263 )
264 }
265 }
266
267 fn assume_ordering_trusted<O2: Ordering>(
268 self,
269 _nondet: NonDet,
270 ) -> KeyedStream<K, V, L, B, O2, R> {
271 if O::ORDERING_KIND == O2::ORDERING_KIND {
272 KeyedStream::new(self.location, self.ir_node.into_inner())
273 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
274 // We can always weaken the ordering guarantee
275 KeyedStream::new(
276 self.location.clone(),
277 HydroNode::Cast {
278 inner: Box::new(self.ir_node.into_inner()),
279 metadata: self
280 .location
281 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
282 },
283 )
284 } else {
285 KeyedStream::new(
286 self.location.clone(),
287 HydroNode::ObserveNonDet {
288 inner: Box::new(self.ir_node.into_inner()),
289 trusted: true,
290 metadata: self
291 .location
292 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
293 },
294 )
295 }
296 }
297
298 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
299 /// which is always safe because that is the weakest possible guarantee.
300 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
301 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
302 self.assume_ordering::<NoOrder>(nondet)
303 }
304
305 /// Explicitly "casts" the keyed stream to a type with a different retries
306 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
307 /// be proven by the type-system.
308 ///
309 /// # Non-Determinism
310 /// This function is used as an escape hatch, and any mistakes in the
311 /// provided retries guarantee will propagate into the guarantees
312 /// for the rest of the program.
313 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
314 if R::RETRIES_KIND == R2::RETRIES_KIND {
315 KeyedStream::new(self.location, self.ir_node.into_inner())
316 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
317 // We can always weaken the retries guarantee
318 KeyedStream::new(
319 self.location.clone(),
320 HydroNode::Cast {
321 inner: Box::new(self.ir_node.into_inner()),
322 metadata: self
323 .location
324 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
325 },
326 )
327 } else {
328 KeyedStream::new(
329 self.location.clone(),
330 HydroNode::ObserveNonDet {
331 inner: Box::new(self.ir_node.into_inner()),
332 trusted: false,
333 metadata: self
334 .location
335 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
336 },
337 )
338 }
339 }
340
341 /// Flattens the keyed stream into an unordered stream of key-value pairs.
342 ///
343 /// # Example
344 /// ```rust
345 /// # use hydro_lang::prelude::*;
346 /// # use futures::StreamExt;
347 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
348 /// process
349 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
350 /// .into_keyed()
351 /// .entries()
352 /// # }, |mut stream| async move {
353 /// // (1, 2), (1, 3), (2, 4) in any order
354 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
355 /// # assert_eq!(stream.next().await.unwrap(), w);
356 /// # }
357 /// # }));
358 /// ```
359 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
360 Stream::new(
361 self.location.clone(),
362 HydroNode::Cast {
363 inner: Box::new(self.ir_node.into_inner()),
364 metadata: self
365 .location
366 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
367 },
368 )
369 }
370
371 /// Flattens the keyed stream into an unordered stream of only the values.
372 ///
373 /// # Example
374 /// ```rust
375 /// # use hydro_lang::prelude::*;
376 /// # use futures::StreamExt;
377 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
378 /// process
379 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
380 /// .into_keyed()
381 /// .values()
382 /// # }, |mut stream| async move {
383 /// // 2, 3, 4 in any order
384 /// # for w in vec![2, 3, 4] {
385 /// # assert_eq!(stream.next().await.unwrap(), w);
386 /// # }
387 /// # }));
388 /// ```
389 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
390 self.entries().map(q!(|(_, v)| v))
391 }
392
393 /// Transforms each value by invoking `f` on each element, with keys staying the same
394 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
395 ///
396 /// If you do not want to modify the stream and instead only want to view
397 /// each item use [`KeyedStream::inspect`] instead.
398 ///
399 /// # Example
400 /// ```rust
401 /// # use hydro_lang::prelude::*;
402 /// # use futures::StreamExt;
403 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
404 /// process
405 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
406 /// .into_keyed()
407 /// .map(q!(|v| v + 1))
408 /// # .entries()
409 /// # }, |mut stream| async move {
410 /// // { 1: [3, 4], 2: [5] }
411 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
412 /// # assert_eq!(stream.next().await.unwrap(), w);
413 /// # }
414 /// # }));
415 /// ```
416 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
417 where
418 F: Fn(V) -> U + 'a,
419 {
420 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
421 let map_f = q!({
422 let orig = f;
423 move |(k, v)| (k, orig(v))
424 })
425 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
426 .into();
427
428 KeyedStream::new(
429 self.location.clone(),
430 HydroNode::Map {
431 f: map_f,
432 input: Box::new(self.ir_node.into_inner()),
433 metadata: self
434 .location
435 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
436 },
437 )
438 }
439
440 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
441 /// re-grouped even they are tuples; instead they will be grouped under the original key.
442 ///
443 /// If you do not want to modify the stream and instead only want to view
444 /// each item use [`KeyedStream::inspect_with_key`] instead.
445 ///
446 /// # Example
447 /// ```rust
448 /// # use hydro_lang::prelude::*;
449 /// # use futures::StreamExt;
450 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
451 /// process
452 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
453 /// .into_keyed()
454 /// .map_with_key(q!(|(k, v)| k + v))
455 /// # .entries()
456 /// # }, |mut stream| async move {
457 /// // { 1: [3, 4], 2: [6] }
458 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
459 /// # assert_eq!(stream.next().await.unwrap(), w);
460 /// # }
461 /// # }));
462 /// ```
463 pub fn map_with_key<U, F>(
464 self,
465 f: impl IntoQuotedMut<'a, F, L> + Copy,
466 ) -> KeyedStream<K, U, L, B, O, R>
467 where
468 F: Fn((K, V)) -> U + 'a,
469 K: Clone,
470 {
471 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
472 let map_f = q!({
473 let orig = f;
474 move |(k, v)| {
475 let out = orig((Clone::clone(&k), v));
476 (k, out)
477 }
478 })
479 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
480 .into();
481
482 KeyedStream::new(
483 self.location.clone(),
484 HydroNode::Map {
485 f: map_f,
486 input: Box::new(self.ir_node.into_inner()),
487 metadata: self
488 .location
489 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
490 },
491 )
492 }
493
494 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
495 /// `f`, preserving the order of the elements within the group.
496 ///
497 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
498 /// not modify or take ownership of the values. If you need to modify the values while filtering
499 /// use [`KeyedStream::filter_map`] instead.
500 ///
501 /// # Example
502 /// ```rust
503 /// # use hydro_lang::prelude::*;
504 /// # use futures::StreamExt;
505 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
506 /// process
507 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
508 /// .into_keyed()
509 /// .filter(q!(|&x| x > 2))
510 /// # .entries()
511 /// # }, |mut stream| async move {
512 /// // { 1: [3], 2: [4] }
513 /// # for w in vec![(1, 3), (2, 4)] {
514 /// # assert_eq!(stream.next().await.unwrap(), w);
515 /// # }
516 /// # }));
517 /// ```
518 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
519 where
520 F: Fn(&V) -> bool + 'a,
521 {
522 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
523 let filter_f = q!({
524 let orig = f;
525 move |t: &(_, _)| orig(&t.1)
526 })
527 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
528 .into();
529
530 KeyedStream::new(
531 self.location.clone(),
532 HydroNode::Filter {
533 f: filter_f,
534 input: Box::new(self.ir_node.into_inner()),
535 metadata: self.location.new_node_metadata(Self::collection_kind()),
536 },
537 )
538 }
539
540 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
541 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
542 ///
543 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
544 /// not modify or take ownership of the values. If you need to modify the values while filtering
545 /// use [`KeyedStream::filter_map_with_key`] instead.
546 ///
547 /// # Example
548 /// ```rust
549 /// # use hydro_lang::prelude::*;
550 /// # use futures::StreamExt;
551 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
552 /// process
553 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
554 /// .into_keyed()
555 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
556 /// # .entries()
557 /// # }, |mut stream| async move {
558 /// // { 1: [3], 2: [4] }
559 /// # for w in vec![(1, 3), (2, 4)] {
560 /// # assert_eq!(stream.next().await.unwrap(), w);
561 /// # }
562 /// # }));
563 /// ```
564 pub fn filter_with_key<F>(
565 self,
566 f: impl IntoQuotedMut<'a, F, L> + Copy,
567 ) -> KeyedStream<K, V, L, B, O, R>
568 where
569 F: Fn(&(K, V)) -> bool + 'a,
570 {
571 let filter_f = f
572 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
573 .into();
574
575 KeyedStream::new(
576 self.location.clone(),
577 HydroNode::Filter {
578 f: filter_f,
579 input: Box::new(self.ir_node.into_inner()),
580 metadata: self.location.new_node_metadata(Self::collection_kind()),
581 },
582 )
583 }
584
585 /// An operator that both filters and maps each value, with keys staying the same.
586 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
587 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
588 ///
589 /// # Example
590 /// ```rust
591 /// # use hydro_lang::prelude::*;
592 /// # use futures::StreamExt;
593 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594 /// process
595 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
596 /// .into_keyed()
597 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
598 /// # .entries()
599 /// # }, |mut stream| async move {
600 /// // { 1: [2], 2: [4] }
601 /// # for w in vec![(1, 2), (2, 4)] {
602 /// # assert_eq!(stream.next().await.unwrap(), w);
603 /// # }
604 /// # }));
605 /// ```
606 pub fn filter_map<U, F>(
607 self,
608 f: impl IntoQuotedMut<'a, F, L> + Copy,
609 ) -> KeyedStream<K, U, L, B, O, R>
610 where
611 F: Fn(V) -> Option<U> + 'a,
612 {
613 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
614 let filter_map_f = q!({
615 let orig = f;
616 move |(k, v)| orig(v).map(|o| (k, o))
617 })
618 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
619 .into();
620
621 KeyedStream::new(
622 self.location.clone(),
623 HydroNode::FilterMap {
624 f: filter_map_f,
625 input: Box::new(self.ir_node.into_inner()),
626 metadata: self
627 .location
628 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
629 },
630 )
631 }
632
633 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
634 /// re-grouped even they are tuples; instead they will be grouped under the original key.
635 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
636 ///
637 /// # Example
638 /// ```rust
639 /// # use hydro_lang::prelude::*;
640 /// # use futures::StreamExt;
641 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
642 /// process
643 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
644 /// .into_keyed()
645 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
646 /// # .entries()
647 /// # }, |mut stream| async move {
648 /// // { 2: [2] }
649 /// # for w in vec![(2, 2)] {
650 /// # assert_eq!(stream.next().await.unwrap(), w);
651 /// # }
652 /// # }));
653 /// ```
654 pub fn filter_map_with_key<U, F>(
655 self,
656 f: impl IntoQuotedMut<'a, F, L> + Copy,
657 ) -> KeyedStream<K, U, L, B, O, R>
658 where
659 F: Fn((K, V)) -> Option<U> + 'a,
660 K: Clone,
661 {
662 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
663 let filter_map_f = q!({
664 let orig = f;
665 move |(k, v)| {
666 let out = orig((Clone::clone(&k), v));
667 out.map(|o| (k, o))
668 }
669 })
670 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
671 .into();
672
673 KeyedStream::new(
674 self.location.clone(),
675 HydroNode::FilterMap {
676 f: filter_map_f,
677 input: Box::new(self.ir_node.into_inner()),
678 metadata: self
679 .location
680 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
681 },
682 )
683 }
684
685 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
686 /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
687 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
688 ///
689 /// # Example
690 /// ```rust
691 /// # use hydro_lang::prelude::*;
692 /// # use futures::StreamExt;
693 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
694 /// let tick = process.tick();
695 /// let batch = process
696 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
697 /// .into_keyed()
698 /// .batch(&tick, nondet!(/** test */));
699 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
700 /// batch.cross_singleton(count).all_ticks().entries()
701 /// # }, |mut stream| async move {
702 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
703 /// # for w in vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))] {
704 /// # assert_eq!(stream.next().await.unwrap(), w);
705 /// # }
706 /// # }));
707 /// ```
708 pub fn cross_singleton<O2>(
709 self,
710 other: impl Into<Optional<O2, L, Bounded>>,
711 ) -> KeyedStream<K, (V, O2), L, B, O, R>
712 where
713 O2: Clone,
714 {
715 let other: Optional<O2, L, Bounded> = other.into();
716 check_matching_location(&self.location, &other.location);
717
718 Stream::new(
719 self.location.clone(),
720 HydroNode::CrossSingleton {
721 left: Box::new(self.ir_node.into_inner()),
722 right: Box::new(other.ir_node.into_inner()),
723 metadata: self
724 .location
725 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
726 },
727 )
728 .map(q!(|((k, v), o2)| (k, (v, o2))))
729 .into_keyed()
730 }
731
732 /// For each value `v` in each group, transform `v` using `f` and then treat the
733 /// result as an [`Iterator`] to produce values one by one within the same group.
734 /// The implementation for [`Iterator`] for the output type `I` must produce items
735 /// in a **deterministic** order.
736 ///
737 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
738 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
739 ///
740 /// # Example
741 /// ```rust
742 /// # use hydro_lang::prelude::*;
743 /// # use futures::StreamExt;
744 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
745 /// process
746 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
747 /// .into_keyed()
748 /// .flat_map_ordered(q!(|x| x))
749 /// # .entries()
750 /// # }, |mut stream| async move {
751 /// // { 1: [2, 3, 4], 2: [5, 6] }
752 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
753 /// # assert_eq!(stream.next().await.unwrap(), w);
754 /// # }
755 /// # }));
756 /// ```
757 pub fn flat_map_ordered<U, I, F>(
758 self,
759 f: impl IntoQuotedMut<'a, F, L> + Copy,
760 ) -> KeyedStream<K, U, L, B, O, R>
761 where
762 I: IntoIterator<Item = U>,
763 F: Fn(V) -> I + 'a,
764 K: Clone,
765 {
766 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
767 let flat_map_f = q!({
768 let orig = f;
769 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
770 })
771 .splice_fn1_ctx::<(K, V), _>(&self.location)
772 .into();
773
774 KeyedStream::new(
775 self.location.clone(),
776 HydroNode::FlatMap {
777 f: flat_map_f,
778 input: Box::new(self.ir_node.into_inner()),
779 metadata: self
780 .location
781 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
782 },
783 )
784 }
785
786 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
787 /// for the output type `I` to produce items in any order.
788 ///
789 /// # Example
790 /// ```rust
791 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
792 /// # use futures::StreamExt;
793 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
794 /// process
795 /// .source_iter(q!(vec![
796 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
797 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
798 /// ]))
799 /// .into_keyed()
800 /// .flat_map_unordered(q!(|x| x))
801 /// # .entries()
802 /// # }, |mut stream| async move {
803 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
804 /// # let mut results = Vec::new();
805 /// # for _ in 0..4 {
806 /// # results.push(stream.next().await.unwrap());
807 /// # }
808 /// # results.sort();
809 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
810 /// # }));
811 /// ```
812 pub fn flat_map_unordered<U, I, F>(
813 self,
814 f: impl IntoQuotedMut<'a, F, L> + Copy,
815 ) -> KeyedStream<K, U, L, B, NoOrder, R>
816 where
817 I: IntoIterator<Item = U>,
818 F: Fn(V) -> I + 'a,
819 K: Clone,
820 {
821 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
822 let flat_map_f = q!({
823 let orig = f;
824 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
825 })
826 .splice_fn1_ctx::<(K, V), _>(&self.location)
827 .into();
828
829 KeyedStream::new(
830 self.location.clone(),
831 HydroNode::FlatMap {
832 f: flat_map_f,
833 input: Box::new(self.ir_node.into_inner()),
834 metadata: self
835 .location
836 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
837 },
838 )
839 }
840
841 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
842 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
843 /// items in a **deterministic** order.
844 ///
845 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
846 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
847 ///
848 /// # Example
849 /// ```rust
850 /// # use hydro_lang::prelude::*;
851 /// # use futures::StreamExt;
852 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
853 /// process
854 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
855 /// .into_keyed()
856 /// .flatten_ordered()
857 /// # .entries()
858 /// # }, |mut stream| async move {
859 /// // { 1: [2, 3, 4], 2: [5, 6] }
860 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
861 /// # assert_eq!(stream.next().await.unwrap(), w);
862 /// # }
863 /// # }));
864 /// ```
865 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
866 where
867 V: IntoIterator<Item = U>,
868 K: Clone,
869 {
870 self.flat_map_ordered(q!(|d| d))
871 }
872
873 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
874 /// for the value type `V` to produce items in any order.
875 ///
876 /// # Example
877 /// ```rust
878 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
879 /// # use futures::StreamExt;
880 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
881 /// process
882 /// .source_iter(q!(vec![
883 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
884 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
885 /// ]))
886 /// .into_keyed()
887 /// .flatten_unordered()
888 /// # .entries()
889 /// # }, |mut stream| async move {
890 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
891 /// # let mut results = Vec::new();
892 /// # for _ in 0..4 {
893 /// # results.push(stream.next().await.unwrap());
894 /// # }
895 /// # results.sort();
896 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
897 /// # }));
898 /// ```
899 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
900 where
901 V: IntoIterator<Item = U>,
902 K: Clone,
903 {
904 self.flat_map_unordered(q!(|d| d))
905 }
906
907 /// An operator which allows you to "inspect" each element of a stream without
908 /// modifying it. The closure `f` is called on a reference to each value. This is
909 /// mainly useful for debugging, and should not be used to generate side-effects.
910 ///
911 /// # Example
912 /// ```rust
913 /// # use hydro_lang::prelude::*;
914 /// # use futures::StreamExt;
915 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
916 /// process
917 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
918 /// .into_keyed()
919 /// .inspect(q!(|v| println!("{}", v)))
920 /// # .entries()
921 /// # }, |mut stream| async move {
922 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
923 /// # assert_eq!(stream.next().await.unwrap(), w);
924 /// # }
925 /// # }));
926 /// ```
927 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
928 where
929 F: Fn(&V) + 'a,
930 {
931 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
932 let inspect_f = q!({
933 let orig = f;
934 move |t: &(_, _)| orig(&t.1)
935 })
936 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
937 .into();
938
939 KeyedStream::new(
940 self.location.clone(),
941 HydroNode::Inspect {
942 f: inspect_f,
943 input: Box::new(self.ir_node.into_inner()),
944 metadata: self.location.new_node_metadata(Self::collection_kind()),
945 },
946 )
947 }
948
949 /// An operator which allows you to "inspect" each element of a stream without
950 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
951 /// mainly useful for debugging, and should not be used to generate side-effects.
952 ///
953 /// # Example
954 /// ```rust
955 /// # use hydro_lang::prelude::*;
956 /// # use futures::StreamExt;
957 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
958 /// process
959 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
960 /// .into_keyed()
961 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
962 /// # .entries()
963 /// # }, |mut stream| async move {
964 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
965 /// # assert_eq!(stream.next().await.unwrap(), w);
966 /// # }
967 /// # }));
968 /// ```
969 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
970 where
971 F: Fn(&(K, V)) + 'a,
972 {
973 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
974
975 KeyedStream::new(
976 self.location.clone(),
977 HydroNode::Inspect {
978 f: inspect_f,
979 input: Box::new(self.ir_node.into_inner()),
980 metadata: self.location.new_node_metadata(Self::collection_kind()),
981 },
982 )
983 }
984
985 /// An operator which allows you to "name" a `HydroNode`.
986 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
987 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
988 {
989 let mut node = self.ir_node.borrow_mut();
990 let metadata = node.metadata_mut();
991 metadata.tag = Some(name.to_string());
992 }
993 self
994 }
995}
996
997impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
998 KeyedStream<K, V, L, Unbounded, O, R>
999{
1000 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1001 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
1002 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
1003 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
1004 ///
1005 /// Currently, both input streams must be [`Unbounded`].
1006 ///
1007 /// # Example
1008 /// ```rust
1009 /// # use hydro_lang::prelude::*;
1010 /// # use futures::StreamExt;
1011 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1012 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
1013 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
1014 /// numbers1.interleave(numbers2)
1015 /// # .entries()
1016 /// # }, |mut stream| async move {
1017 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1018 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
1019 /// # assert_eq!(stream.next().await.unwrap(), w);
1020 /// # }
1021 /// # }));
1022 /// ```
1023 pub fn interleave<O2: Ordering, R2: Retries>(
1024 self,
1025 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1026 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1027 where
1028 R: MinRetries<R2>,
1029 {
1030 let tick = self.location.tick();
1031 // Because the outputs are unordered, we can interleave batches from both streams.
1032 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1033 self.batch(&tick, nondet_batch_interleaving)
1034 .weakest_ordering()
1035 .chain(
1036 other
1037 .batch(&tick, nondet_batch_interleaving)
1038 .weakest_ordering(),
1039 )
1040 .all_ticks()
1041 }
1042}
1043
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Once a generator produces [`Generate::Return`] or [`Generate::Break`] for a key, all later
/// values of that key are ignored.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1056
1057impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1058where
1059 K: Eq + Hash,
1060 L: Location<'a>,
1061{
1062 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1063 ///
1064 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
1065 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1066 /// early by returning `None`.
1067 ///
1068 /// The function takes a mutable reference to the accumulator and the current element, and returns
1069 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1070 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1071 ///
1072 /// # Example
1073 /// ```rust
1074 /// # use hydro_lang::prelude::*;
1075 /// # use futures::StreamExt;
1076 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1077 /// process
1078 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1079 /// .into_keyed()
1080 /// .scan(
1081 /// q!(|| 0),
1082 /// q!(|acc, x| {
1083 /// *acc += x;
1084 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1085 /// }),
1086 /// )
1087 /// # .entries()
1088 /// # }, |mut stream| async move {
1089 /// // Output: { 0: [1], 1: [3, 7] }
1090 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1091 /// # assert_eq!(stream.next().await.unwrap(), w);
1092 /// # }
1093 /// # }));
1094 /// ```
1095 pub fn scan<A, U, I, F>(
1096 self,
1097 init: impl IntoQuotedMut<'a, I, L> + Copy,
1098 f: impl IntoQuotedMut<'a, F, L> + Copy,
1099 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1100 where
1101 K: Clone,
1102 I: Fn() -> A + 'a,
1103 F: Fn(&mut A, V) -> Option<U> + 'a,
1104 {
1105 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1106 self.generator(
1107 init,
1108 q!({
1109 let orig = f;
1110 move |state, v| {
1111 if let Some(out) = orig(state, v) {
1112 Generate::Yield(out)
1113 } else {
1114 Generate::Break
1115 }
1116 }
1117 }),
1118 )
1119 }
1120
1121 /// Iteratively processes the elements in each group using a state machine that can yield
1122 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1123 /// syntax in Rust, without requiring special syntax.
1124 ///
1125 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1126 /// state for each group. The second argument defines the processing logic, taking in a
1127 /// mutable reference to the group's state and the value to be processed. It emits a
1128 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1129 /// should be processed.
1130 ///
1131 /// # Example
1132 /// ```rust
1133 /// # use hydro_lang::prelude::*;
1134 /// # use futures::StreamExt;
1135 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1136 /// process
1137 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1138 /// .into_keyed()
1139 /// .generator(
1140 /// q!(|| 0),
1141 /// q!(|acc, x| {
1142 /// *acc += x;
1143 /// if *acc > 100 {
1144 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1145 /// "done!".to_string()
1146 /// )
1147 /// } else if *acc % 2 == 0 {
1148 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1149 /// "even".to_string()
1150 /// )
1151 /// } else {
1152 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1153 /// }
1154 /// }),
1155 /// )
1156 /// # .entries()
1157 /// # }, |mut stream| async move {
1158 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1159 /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1160 /// # assert_eq!(stream.next().await.unwrap(), w);
1161 /// # }
1162 /// # }));
1163 /// ```
1164 pub fn generator<A, U, I, F>(
1165 self,
1166 init: impl IntoQuotedMut<'a, I, L> + Copy,
1167 f: impl IntoQuotedMut<'a, F, L> + Copy,
1168 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1169 where
1170 K: Clone,
1171 I: Fn() -> A + 'a,
1172 F: Fn(&mut A, V) -> Generate<U> + 'a,
1173 {
1174 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1175 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1176
1177 let scan_init = q!(|| HashMap::new())
1178 .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1179 .into();
1180 let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1181 let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1182 if let Some(existing_state_value) = existing_state {
1183 match f(existing_state_value, v) {
1184 Generate::Yield(out) => Some(Some((k, out))),
1185 Generate::Return(out) => {
1186 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1187 Some(Some((k, out)))
1188 }
1189 Generate::Break => {
1190 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1191 Some(None)
1192 }
1193 Generate::Continue => Some(None),
1194 }
1195 } else {
1196 Some(None)
1197 }
1198 })
1199 .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1200 .into();
1201
1202 let scan_node = HydroNode::Scan {
1203 init: scan_init,
1204 acc: scan_f,
1205 input: Box::new(self.ir_node.into_inner()),
1206 metadata: self.location.new_node_metadata(Stream::<
1207 Option<(K, U)>,
1208 L,
1209 B,
1210 TotalOrder,
1211 ExactlyOnce,
1212 >::collection_kind()),
1213 };
1214
1215 let flatten_f = q!(|d| d)
1216 .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1217 .into();
1218 let flatten_node = HydroNode::FlatMap {
1219 f: flatten_f,
1220 input: Box::new(scan_node),
1221 metadata: self.location.new_node_metadata(KeyedStream::<
1222 K,
1223 U,
1224 L,
1225 B,
1226 TotalOrder,
1227 ExactlyOnce,
1228 >::collection_kind()),
1229 };
1230
1231 KeyedStream::new(self.location.clone(), flatten_node)
1232 }
1233
1234 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1235 /// in-order across the values in each group. But the aggregation function returns a boolean,
1236 /// which when true indicates that the aggregated result is complete and can be released to
1237 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1238 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1239 /// normal stream elements.
1240 ///
1241 /// # Example
1242 /// ```rust
1243 /// # use hydro_lang::prelude::*;
1244 /// # use futures::StreamExt;
1245 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1246 /// process
1247 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1248 /// .into_keyed()
1249 /// .fold_early_stop(
1250 /// q!(|| 0),
1251 /// q!(|acc, x| {
1252 /// *acc += x;
1253 /// x % 2 == 0
1254 /// }),
1255 /// )
1256 /// # .entries()
1257 /// # }, |mut stream| async move {
1258 /// // Output: { 0: 2, 1: 9 }
1259 /// # for w in vec![(0, 2), (1, 9)] {
1260 /// # assert_eq!(stream.next().await.unwrap(), w);
1261 /// # }
1262 /// # }));
1263 /// ```
1264 pub fn fold_early_stop<A, I, F>(
1265 self,
1266 init: impl IntoQuotedMut<'a, I, L> + Copy,
1267 f: impl IntoQuotedMut<'a, F, L> + Copy,
1268 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1269 where
1270 K: Clone,
1271 I: Fn() -> A + 'a,
1272 F: Fn(&mut A, V) -> bool + 'a,
1273 {
1274 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1275 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1276 let out_without_bound_cast = self.generator(
1277 q!(move || Some(init())),
1278 q!(move |key_state, v| {
1279 if let Some(key_state_value) = key_state.as_mut() {
1280 if f(key_state_value, v) {
1281 Generate::Return(key_state.take().unwrap())
1282 } else {
1283 Generate::Continue
1284 }
1285 } else {
1286 unreachable!()
1287 }
1288 }),
1289 );
1290
1291 KeyedSingleton::new(
1292 out_without_bound_cast.location.clone(),
1293 HydroNode::Cast {
1294 inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1295 metadata: out_without_bound_cast
1296 .location
1297 .new_node_metadata(
1298 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1299 ),
1300 },
1301 )
1302 }
1303
1304 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1305 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1306 /// otherwise the first element would be non-deterministic.
1307 ///
1308 /// # Example
1309 /// ```rust
1310 /// # use hydro_lang::prelude::*;
1311 /// # use futures::StreamExt;
1312 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1313 /// process
1314 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1315 /// .into_keyed()
1316 /// .first()
1317 /// # .entries()
1318 /// # }, |mut stream| async move {
1319 /// // Output: { 0: 2, 1: 3 }
1320 /// # for w in vec![(0, 2), (1, 3)] {
1321 /// # assert_eq!(stream.next().await.unwrap(), w);
1322 /// # }
1323 /// # }));
1324 /// ```
1325 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1326 where
1327 K: Clone,
1328 {
1329 self.fold_early_stop(
1330 q!(|| None),
1331 q!(|acc, v| {
1332 *acc = Some(v);
1333 true
1334 }),
1335 )
1336 .map(q!(|v| v.unwrap()))
1337 }
1338
1339 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1340 ///
1341 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1342 /// to depend on the order of elements in the group.
1343 ///
1344 /// If the input and output value types are the same and do not require initialization then use
1345 /// [`KeyedStream::reduce`].
1346 ///
1347 /// # Example
1348 /// ```rust
1349 /// # use hydro_lang::prelude::*;
1350 /// # use futures::StreamExt;
1351 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1352 /// let tick = process.tick();
1353 /// let numbers = process
1354 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1355 /// .into_keyed();
1356 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1357 /// batch
1358 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
1359 /// .entries()
1360 /// .all_ticks()
1361 /// # }, |mut stream| async move {
1362 /// // (1, 5), (2, 7)
1363 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1364 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1365 /// # }));
1366 /// ```
1367 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1368 self,
1369 init: impl IntoQuotedMut<'a, I, L>,
1370 comb: impl IntoQuotedMut<'a, F, L>,
1371 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1372 let init = init.splice_fn0_ctx(&self.location).into();
1373 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1374
1375 KeyedSingleton::new(
1376 self.location.clone(),
1377 HydroNode::FoldKeyed {
1378 init,
1379 acc: comb,
1380 input: Box::new(self.ir_node.into_inner()),
1381 metadata: self.location.new_node_metadata(KeyedSingleton::<
1382 K,
1383 A,
1384 L,
1385 B::WhenValueUnbounded,
1386 >::collection_kind()),
1387 },
1388 )
1389 }
1390
1391 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1392 ///
1393 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1394 /// to depend on the order of elements in the stream.
1395 ///
1396 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1397 ///
1398 /// # Example
1399 /// ```rust
1400 /// # use hydro_lang::prelude::*;
1401 /// # use futures::StreamExt;
1402 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1403 /// let tick = process.tick();
1404 /// let numbers = process
1405 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1406 /// .into_keyed();
1407 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1408 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1409 /// # }, |mut stream| async move {
1410 /// // (1, 5), (2, 7)
1411 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1412 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1413 /// # }));
1414 /// ```
1415 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1416 self,
1417 comb: impl IntoQuotedMut<'a, F, L>,
1418 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1419 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1420
1421 KeyedSingleton::new(
1422 self.location.clone(),
1423 HydroNode::ReduceKeyed {
1424 f,
1425 input: Box::new(self.ir_node.into_inner()),
1426 metadata: self.location.new_node_metadata(KeyedSingleton::<
1427 K,
1428 V,
1429 L,
1430 B::WhenValueUnbounded,
1431 >::collection_kind()),
1432 },
1433 )
1434 }
1435
1436 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1437 ///
1438 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1439 /// to depend on the order of elements in the stream.
1440 ///
1441 /// # Example
1442 /// ```rust
1443 /// # use hydro_lang::prelude::*;
1444 /// # use futures::StreamExt;
1445 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1446 /// let tick = process.tick();
1447 /// let watermark = tick.singleton(q!(1));
1448 /// let numbers = process
1449 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1450 /// .into_keyed();
1451 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1452 /// batch
1453 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1454 /// .entries()
1455 /// .all_ticks()
1456 /// # }, |mut stream| async move {
1457 /// // (2, 204)
1458 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1459 /// # }));
1460 /// ```
1461 pub fn reduce_watermark<O, F>(
1462 self,
1463 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1464 comb: impl IntoQuotedMut<'a, F, L>,
1465 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1466 where
1467 O: Clone,
1468 F: Fn(&mut V, V) + 'a,
1469 {
1470 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1471 check_matching_location(&self.location.root(), other.location.outer());
1472 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1473
1474 KeyedSingleton::new(
1475 self.location.clone(),
1476 HydroNode::ReduceKeyedWatermark {
1477 f,
1478 input: Box::new(self.ir_node.into_inner()),
1479 watermark: Box::new(other.ir_node.into_inner()),
1480 metadata: self.location.new_node_metadata(KeyedSingleton::<
1481 K,
1482 V,
1483 L,
1484 B::WhenValueUnbounded,
1485 >::collection_kind()),
1486 },
1487 )
1488 }
1489}
1490
1491impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1492where
1493 K: Eq + Hash,
1494 L: Location<'a>,
1495{
1496 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1497 ///
1498 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1499 ///
1500 /// If the input and output value types are the same and do not require initialization then use
1501 /// [`KeyedStream::reduce_commutative`].
1502 ///
1503 /// # Example
1504 /// ```rust
1505 /// # use hydro_lang::prelude::*;
1506 /// # use futures::StreamExt;
1507 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1508 /// let tick = process.tick();
1509 /// let numbers = process
1510 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1511 /// .into_keyed();
1512 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1513 /// batch
1514 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1515 /// .entries()
1516 /// .all_ticks()
1517 /// # }, |mut stream| async move {
1518 /// // (1, 5), (2, 7)
1519 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1520 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1521 /// # }));
1522 /// ```
1523 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1524 self,
1525 init: impl IntoQuotedMut<'a, I, L>,
1526 comb: impl IntoQuotedMut<'a, F, L>,
1527 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1528 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1529 .fold(init, comb)
1530 }
1531
1532 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1533 ///
1534 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1535 ///
1536 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1537 ///
1538 /// # Example
1539 /// ```rust
1540 /// # use hydro_lang::prelude::*;
1541 /// # use futures::StreamExt;
1542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1543 /// let tick = process.tick();
1544 /// let numbers = process
1545 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1546 /// .into_keyed();
1547 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1548 /// batch
1549 /// .reduce_commutative(q!(|acc, x| *acc += x))
1550 /// .entries()
1551 /// .all_ticks()
1552 /// # }, |mut stream| async move {
1553 /// // (1, 5), (2, 7)
1554 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1555 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1556 /// # }));
1557 /// ```
1558 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1559 self,
1560 comb: impl IntoQuotedMut<'a, F, L>,
1561 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1562 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1563 .reduce(comb)
1564 }
1565
1566 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1567 ///
1568 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1569 ///
1570 /// # Example
1571 /// ```rust
1572 /// # use hydro_lang::prelude::*;
1573 /// # use futures::StreamExt;
1574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1575 /// let tick = process.tick();
1576 /// let watermark = tick.singleton(q!(1));
1577 /// let numbers = process
1578 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1579 /// .into_keyed();
1580 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1581 /// batch
1582 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1583 /// .entries()
1584 /// .all_ticks()
1585 /// # }, |mut stream| async move {
1586 /// // (2, 204)
1587 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1588 /// # }));
1589 /// ```
1590 pub fn reduce_watermark_commutative<O2, F>(
1591 self,
1592 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1593 comb: impl IntoQuotedMut<'a, F, L>,
1594 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1595 where
1596 O2: Clone,
1597 F: Fn(&mut V, V) + 'a,
1598 {
1599 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1600 .reduce_watermark(other, comb)
1601 }
1602
1603 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1604 ///
1605 /// # Example
1606 /// ```rust
1607 /// # use hydro_lang::prelude::*;
1608 /// # use futures::StreamExt;
1609 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1610 /// let tick = process.tick();
1611 /// let numbers = process
1612 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1613 /// .into_keyed();
1614 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1615 /// batch
1616 /// .value_counts()
1617 /// .entries()
1618 /// .all_ticks()
1619 /// # }, |mut stream| async move {
1620 /// // (1, 3), (2, 2)
1621 /// # assert_eq!(stream.next().await.unwrap(), (1, 3));
1622 /// # assert_eq!(stream.next().await.unwrap(), (2, 2));
1623 /// # }));
1624 /// ```
1625 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded> {
1626 self.assume_ordering_trusted(
1627 nondet!(/** ordering within each group affects neither result nor intermediates */),
1628 )
1629 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1630 }
1631}
1632
1633impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1634where
1635 K: Eq + Hash,
1636 L: Location<'a>,
1637{
1638 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1639 ///
1640 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1641 ///
1642 /// If the input and output value types are the same and do not require initialization then use
1643 /// [`KeyedStream::reduce_idempotent`].
1644 ///
1645 /// # Example
1646 /// ```rust
1647 /// # use hydro_lang::prelude::*;
1648 /// # use futures::StreamExt;
1649 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1650 /// let tick = process.tick();
1651 /// let numbers = process
1652 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1653 /// .into_keyed();
1654 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1655 /// batch
1656 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1657 /// .entries()
1658 /// .all_ticks()
1659 /// # }, |mut stream| async move {
1660 /// // (1, false), (2, true)
1661 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1662 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1663 /// # }));
1664 /// ```
1665 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1666 self,
1667 init: impl IntoQuotedMut<'a, I, L>,
1668 comb: impl IntoQuotedMut<'a, F, L>,
1669 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1670 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1671 .fold(init, comb)
1672 }
1673
1674 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1675 ///
1676 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1677 ///
1678 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1679 ///
1680 /// # Example
1681 /// ```rust
1682 /// # use hydro_lang::prelude::*;
1683 /// # use futures::StreamExt;
1684 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1685 /// let tick = process.tick();
1686 /// let numbers = process
1687 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1688 /// .into_keyed();
1689 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1690 /// batch
1691 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1692 /// .entries()
1693 /// .all_ticks()
1694 /// # }, |mut stream| async move {
1695 /// // (1, false), (2, true)
1696 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1697 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1698 /// # }));
1699 /// ```
1700 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1701 self,
1702 comb: impl IntoQuotedMut<'a, F, L>,
1703 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1704 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1705 .reduce(comb)
1706 }
1707
1708 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1709 ///
1710 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1711 ///
1712 /// # Example
1713 /// ```rust
1714 /// # use hydro_lang::prelude::*;
1715 /// # use futures::StreamExt;
1716 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1717 /// let tick = process.tick();
1718 /// let watermark = tick.singleton(q!(1));
1719 /// let numbers = process
1720 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1721 /// .into_keyed();
1722 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1723 /// batch
1724 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1725 /// .entries()
1726 /// .all_ticks()
1727 /// # }, |mut stream| async move {
1728 /// // (2, true)
1729 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1730 /// # }));
1731 /// ```
1732 pub fn reduce_watermark_idempotent<O2, F>(
1733 self,
1734 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1735 comb: impl IntoQuotedMut<'a, F, L>,
1736 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1737 where
1738 O2: Clone,
1739 F: Fn(&mut V, V) + 'a,
1740 {
1741 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1742 .reduce_watermark(other, comb)
1743 }
1744}
1745
1746impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1747where
1748 K: Eq + Hash,
1749 L: Location<'a>,
1750{
1751 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1752 ///
1753 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1754 /// as there may be non-deterministic duplicates.
1755 ///
1756 /// If the input and output value types are the same and do not require initialization then use
1757 /// [`KeyedStream::reduce_commutative_idempotent`].
1758 ///
1759 /// # Example
1760 /// ```rust
1761 /// # use hydro_lang::prelude::*;
1762 /// # use futures::StreamExt;
1763 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1764 /// let tick = process.tick();
1765 /// let numbers = process
1766 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1767 /// .into_keyed();
1768 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1769 /// batch
1770 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1771 /// .entries()
1772 /// .all_ticks()
1773 /// # }, |mut stream| async move {
1774 /// // (1, false), (2, true)
1775 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1776 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1777 /// # }));
1778 /// ```
1779 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1780 self,
1781 init: impl IntoQuotedMut<'a, I, L>,
1782 comb: impl IntoQuotedMut<'a, F, L>,
1783 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1784 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1785 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1786 .fold(init, comb)
1787 }
1788
1789 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1790 ///
1791 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1792 /// as there may be non-deterministic duplicates.
1793 ///
1794 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1795 ///
1796 /// # Example
1797 /// ```rust
1798 /// # use hydro_lang::prelude::*;
1799 /// # use futures::StreamExt;
1800 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1801 /// let tick = process.tick();
1802 /// let numbers = process
1803 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1804 /// .into_keyed();
1805 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1806 /// batch
1807 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1808 /// .entries()
1809 /// .all_ticks()
1810 /// # }, |mut stream| async move {
1811 /// // (1, false), (2, true)
1812 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1813 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1814 /// # }));
1815 /// ```
1816 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1817 self,
1818 comb: impl IntoQuotedMut<'a, F, L>,
1819 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1820 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1821 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1822 .reduce(comb)
1823 }
1824
1825 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1826 ///
1827 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1828 /// as there may be non-deterministic duplicates.
1829 ///
1830 /// # Example
1831 /// ```rust
1832 /// # use hydro_lang::prelude::*;
1833 /// # use futures::StreamExt;
1834 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1835 /// let tick = process.tick();
1836 /// let watermark = tick.singleton(q!(1));
1837 /// let numbers = process
1838 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1839 /// .into_keyed();
1840 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1841 /// batch
1842 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1843 /// .entries()
1844 /// .all_ticks()
1845 /// # }, |mut stream| async move {
1846 /// // (2, true)
1847 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1848 /// # }));
1849 /// ```
1850 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1851 self,
1852 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1853 comb: impl IntoQuotedMut<'a, F, L>,
1854 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1855 where
1856 O2: Clone,
1857 F: Fn(&mut V, V) + 'a,
1858 {
1859 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1860 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1861 .reduce_watermark(other, comb)
1862 }
1863
1864 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1865 /// whose keys are not in the bounded stream.
1866 ///
1867 /// # Example
1868 /// ```rust
1869 /// # use hydro_lang::prelude::*;
1870 /// # use futures::StreamExt;
1871 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1872 /// let tick = process.tick();
1873 /// let keyed_stream = process
1874 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1875 /// .batch(&tick, nondet!(/** test */))
1876 /// .into_keyed();
1877 /// let keys_to_remove = process
1878 /// .source_iter(q!(vec![1, 2]))
1879 /// .batch(&tick, nondet!(/** test */));
1880 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1881 /// # .entries()
1882 /// # }, |mut stream| async move {
1883 /// // { 3: ['c'], 4: ['d'] }
1884 /// # for w in vec![(3, 'c'), (4, 'd')] {
1885 /// # assert_eq!(stream.next().await.unwrap(), w);
1886 /// # }
1887 /// # }));
1888 /// ```
1889 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1890 self,
1891 other: Stream<K, L, Bounded, O2, R2>,
1892 ) -> Self {
1893 check_matching_location(&self.location, &other.location);
1894
1895 KeyedStream::new(
1896 self.location.clone(),
1897 HydroNode::AntiJoin {
1898 pos: Box::new(self.ir_node.into_inner()),
1899 neg: Box::new(other.ir_node.into_inner()),
1900 metadata: self.location.new_node_metadata(Self::collection_kind()),
1901 },
1902 )
1903 }
1904}
1905
1906impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1907where
1908 L: Location<'a>,
1909{
1910 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1911 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1912 ///
1913 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1914 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1915 /// argument that declares where the stream will be atomically processed. Batching a stream into
1916 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1917 /// [`Tick`] will introduce asynchrony.
1918 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1919 let out_location = Atomic { tick: tick.clone() };
1920 KeyedStream::new(
1921 out_location.clone(),
1922 HydroNode::BeginAtomic {
1923 inner: Box::new(self.ir_node.into_inner()),
1924 metadata: out_location
1925 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1926 },
1927 )
1928 }
1929
1930 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1931 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1932 /// the order of the input.
1933 ///
1934 /// # Non-Determinism
1935 /// The batch boundaries are non-deterministic and may change across executions.
1936 pub fn batch(
1937 self,
1938 tick: &Tick<L>,
1939 nondet: NonDet,
1940 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1941 let _ = nondet;
1942 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1943 KeyedStream::new(
1944 tick.clone(),
1945 HydroNode::Batch {
1946 inner: Box::new(self.ir_node.into_inner()),
1947 metadata: tick.new_node_metadata(
1948 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1949 ),
1950 },
1951 )
1952 }
1953}
1954
1955impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1956where
1957 L: Location<'a> + NoTick,
1958{
1959 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1960 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1961 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1962 /// used to create the atomic section.
1963 ///
1964 /// # Non-Determinism
1965 /// The batch boundaries are non-deterministic and may change across executions.
1966 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1967 let _ = nondet;
1968 KeyedStream::new(
1969 self.location.clone().tick,
1970 HydroNode::Batch {
1971 inner: Box::new(self.ir_node.into_inner()),
1972 metadata: self.location.tick.new_node_metadata(KeyedStream::<
1973 K,
1974 V,
1975 Tick<L>,
1976 Bounded,
1977 O,
1978 R,
1979 >::collection_kind(
1980 )),
1981 },
1982 )
1983 }
1984
1985 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1986 /// See [`KeyedStream::atomic`] for more details.
1987 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1988 KeyedStream::new(
1989 self.location.tick.l.clone(),
1990 HydroNode::EndAtomic {
1991 inner: Box::new(self.ir_node.into_inner()),
1992 metadata: self
1993 .location
1994 .tick
1995 .l
1996 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1997 },
1998 )
1999 }
2000}
2001
2002impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
2003where
2004 L: Location<'a>,
2005{
2006 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2007 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2008 /// is only present in one of the inputs, its values are passed through as-is). The output has
2009 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2010 ///
2011 /// Currently, both input streams must be [`Bounded`]. This operator will block
2012 /// on the first stream until all its elements are available. In a future version,
2013 /// we will relax the requirement on the `other` stream.
2014 ///
2015 /// # Example
2016 /// ```rust
2017 /// # use hydro_lang::prelude::*;
2018 /// # use futures::StreamExt;
2019 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2020 /// let tick = process.tick();
2021 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2022 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2023 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2024 /// # .entries()
2025 /// # }, |mut stream| async move {
2026 /// // { 0: [2, 1], 1: [4, 3] }
2027 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
2028 /// # assert_eq!(stream.next().await.unwrap(), w);
2029 /// # }
2030 /// # }));
2031 /// ```
2032 pub fn chain<O2: Ordering, R2: Retries>(
2033 self,
2034 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2035 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2036 where
2037 O: MinOrder<O2>,
2038 R: MinRetries<R2>,
2039 {
2040 check_matching_location(&self.location, &other.location);
2041
2042 KeyedStream::new(
2043 self.location.clone(),
2044 HydroNode::Chain {
2045 first: Box::new(self.ir_node.into_inner()),
2046 second: Box::new(other.ir_node.into_inner()),
2047 metadata: self.location.new_node_metadata(KeyedStream::<
2048 K,
2049 V,
2050 L,
2051 Bounded,
2052 <O as MinOrder<O2>>::Min,
2053 <R as MinRetries<R2>>::Min,
2054 >::collection_kind()),
2055 },
2056 )
2057 }
2058}
2059
2060impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2061where
2062 L: Location<'a>,
2063{
2064 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2065 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2066 /// each key.
2067 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2068 KeyedStream::new(
2069 self.location.outer().clone(),
2070 HydroNode::YieldConcat {
2071 inner: Box::new(self.ir_node.into_inner()),
2072 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2073 K,
2074 V,
2075 L,
2076 Unbounded,
2077 O,
2078 R,
2079 >::collection_kind(
2080 )),
2081 },
2082 )
2083 }
2084
2085 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2086 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2087 /// each key.
2088 ///
2089 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2090 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2091 /// stream's [`Tick`] context.
2092 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2093 let out_location = Atomic {
2094 tick: self.location.clone(),
2095 };
2096
2097 KeyedStream::new(
2098 out_location.clone(),
2099 HydroNode::YieldConcat {
2100 inner: Box::new(self.ir_node.into_inner()),
2101 metadata: out_location.new_node_metadata(KeyedStream::<
2102 K,
2103 V,
2104 Atomic<L>,
2105 Unbounded,
2106 O,
2107 R,
2108 >::collection_kind()),
2109 },
2110 )
2111 }
2112
2113 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2114 /// tick `T` always has the entries of `self` at tick `T - 1`.
2115 ///
2116 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2117 ///
2118 /// This operator enables stateful iterative processing with ticks, by sending data from one
2119 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2120 ///
2121 /// # Example
2122 /// ```rust
2123 /// # use hydro_lang::prelude::*;
2124 /// # use futures::StreamExt;
2125 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2126 /// let tick = process.tick();
2127 /// # // ticks are lazy by default, forces the second tick to run
2128 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2129 /// # let batch_first_tick = process
2130 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2131 /// # .batch(&tick, nondet!(/** test */))
2132 /// # .into_keyed();
2133 /// # let batch_second_tick = process
2134 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2135 /// # .batch(&tick, nondet!(/** test */))
2136 /// # .defer_tick()
2137 /// # .into_keyed(); // appears on the second tick
2138 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2139 /// # batch_first_tick.chain(batch_second_tick);
2140 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2141 /// changes_across_ticks // from the current tick
2142 /// )
2143 /// # .entries().all_ticks()
2144 /// # }, |mut stream| async move {
2145 /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2146 /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2147 /// # assert_eq!(stream.next().await.unwrap(), w);
2148 /// # }
2149 /// # }));
2150 /// ```
2151 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2152 KeyedStream::new(
2153 self.location.clone(),
2154 HydroNode::DeferTick {
2155 input: Box::new(self.ir_node.into_inner()),
2156 metadata: self.location.new_node_metadata(KeyedStream::<
2157 K,
2158 V,
2159 Tick<L>,
2160 Bounded,
2161 O,
2162 R,
2163 >::collection_kind()),
2164 },
2165 )
2166 }
2167}
2168
2169#[cfg(test)]
2170mod tests {
2171 #[cfg(feature = "deploy")]
2172 use futures::{SinkExt, StreamExt};
2173 #[cfg(feature = "deploy")]
2174 use hydro_deploy::Deployment;
2175 use stageleft::q;
2176
2177 use crate::compile::builder::FlowBuilder;
2178 #[cfg(feature = "deploy")]
2179 use crate::live_collections::stream::ExactlyOnce;
2180 use crate::location::Location;
2181 use crate::nondet::nondet;
2182
2183 #[cfg(feature = "deploy")]
2184 #[tokio::test]
2185 async fn reduce_watermark_filter() {
2186 let mut deployment = Deployment::new();
2187
2188 let flow = FlowBuilder::new();
2189 let node = flow.process::<()>();
2190 let external = flow.external::<()>();
2191
2192 let node_tick = node.tick();
2193 let watermark = node_tick.singleton(q!(1));
2194
2195 let sum = node
2196 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2197 .into_keyed()
2198 .reduce_watermark(
2199 watermark,
2200 q!(|acc, v| {
2201 *acc += v;
2202 }),
2203 )
2204 .snapshot(&node_tick, nondet!(/** test */))
2205 .entries()
2206 .all_ticks()
2207 .send_bincode_external(&external);
2208
2209 let nodes = flow
2210 .with_process(&node, deployment.Localhost())
2211 .with_external(&external, deployment.Localhost())
2212 .deploy(&mut deployment);
2213
2214 deployment.deploy().await.unwrap();
2215
2216 let mut out = nodes.connect(sum).await;
2217
2218 deployment.start().await.unwrap();
2219
2220 assert_eq!(out.next().await.unwrap(), (2, 204));
2221 }
2222
2223 #[cfg(feature = "deploy")]
2224 #[tokio::test]
2225 async fn reduce_watermark_garbage_collect() {
2226 let mut deployment = Deployment::new();
2227
2228 let flow = FlowBuilder::new();
2229 let node = flow.process::<()>();
2230 let external = flow.external::<()>();
2231 let (tick_send, tick_trigger) =
2232 node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2233
2234 let node_tick = node.tick();
2235 let (watermark_complete_cycle, watermark) =
2236 node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
2237 let next_watermark = watermark.clone().map(q!(|v| v + 1));
2238 watermark_complete_cycle.complete_next_tick(next_watermark);
2239
2240 let tick_triggered_input = node
2241 .source_iter(q!([(3, 103)]))
2242 .batch(&node_tick, nondet!(/** test */))
2243 .filter_if_some(
2244 tick_trigger
2245 .clone()
2246 .batch(&node_tick, nondet!(/** test */))
2247 .first(),
2248 )
2249 .all_ticks();
2250
2251 let sum = node
2252 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2253 .interleave(tick_triggered_input)
2254 .into_keyed()
2255 .reduce_watermark_commutative(
2256 watermark,
2257 q!(|acc, v| {
2258 *acc += v;
2259 }),
2260 )
2261 .snapshot(&node_tick, nondet!(/** test */))
2262 .entries()
2263 .all_ticks()
2264 .send_bincode_external(&external);
2265
2266 let nodes = flow
2267 .with_default_optimize()
2268 .with_process(&node, deployment.Localhost())
2269 .with_external(&external, deployment.Localhost())
2270 .deploy(&mut deployment);
2271
2272 deployment.deploy().await.unwrap();
2273
2274 let mut tick_send = nodes.connect(tick_send).await;
2275 let mut out_recv = nodes.connect(sum).await;
2276
2277 deployment.start().await.unwrap();
2278
2279 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2280
2281 tick_send.send(()).await.unwrap();
2282
2283 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2284 }
2285
2286 #[test]
2287 #[should_panic]
2288 fn sim_batch_nondet_size() {
2289 let flow = FlowBuilder::new();
2290 let external = flow.external::<()>();
2291 let node = flow.process::<()>();
2292
2293 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2294
2295 let tick = node.tick();
2296 let out_port = input
2297 .batch(&tick, nondet!(/** test */))
2298 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2299 .entries()
2300 .all_ticks()
2301 .send_bincode_external(&external);
2302
2303 flow.sim().exhaustive(async |mut compiled| {
2304 let out_recv = compiled.connect(&out_port);
2305 compiled.launch();
2306
2307 out_recv
2308 .assert_yields_only_unordered([(1, vec![1, 2])])
2309 .await;
2310 });
2311 }
2312
2313 #[test]
2314 fn sim_batch_preserves_group_order() {
2315 let flow = FlowBuilder::new();
2316 let external = flow.external::<()>();
2317 let node = flow.process::<()>();
2318
2319 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2320
2321 let tick = node.tick();
2322 let out_port = input
2323 .batch(&tick, nondet!(/** test */))
2324 .all_ticks()
2325 .fold_early_stop(
2326 q!(|| 0),
2327 q!(|acc, v| {
2328 *acc = std::cmp::max(v, *acc);
2329 *acc >= 2
2330 }),
2331 )
2332 .entries()
2333 .send_bincode_external(&external);
2334
2335 let instances = flow.sim().exhaustive(async |mut compiled| {
2336 let out_recv = compiled.connect(&out_port);
2337 compiled.launch();
2338
2339 out_recv
2340 .assert_yields_only_unordered([(1, 2), (2, 3)])
2341 .await;
2342 });
2343
2344 assert_eq!(instances, 8);
2345 // - three cases: all three in a separate tick (pick where (2, 3) is)
2346 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
2347 // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
2348 // - one case: all three together
2349 }
2350
2351 #[test]
2352 fn sim_batch_unordered_shuffles() {
2353 let flow = FlowBuilder::new();
2354 let external = flow.external::<()>();
2355 let node = flow.process::<()>();
2356
2357 let input = node
2358 .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
2359 .into_keyed()
2360 .weakest_ordering();
2361
2362 let tick = node.tick();
2363 let out_port = input
2364 .batch(&tick, nondet!(/** test */))
2365 .all_ticks()
2366 .entries()
2367 .send_bincode_external(&external);
2368
2369 let instances = flow.sim().exhaustive(async |mut compiled| {
2370 let out_recv = compiled.connect(&out_port);
2371 compiled.launch();
2372
2373 out_recv
2374 .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
2375 .await;
2376 });
2377
2378 assert_eq!(instances, 13);
2379 // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
2380 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2381 // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
2382 // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2383 }
2384}