hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{
15    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode,
16};
17#[cfg(stageleft_runtime)]
18use crate::forward_handle::{CycleCollection, ReceiverComplete};
19use crate::forward_handle::{ForwardRef, TickCycle};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location where this optional is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this optional. It lives in a `RefCell` so that it can be
    /// swapped out through a shared reference (the `Clone` impl does this).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the `Type`, `Loc` and `Bound` parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
45
46impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
47where
48    L: Location<'a>,
49{
50    fn defer_tick(self) -> Self {
51        Optional::defer_tick(self)
52    }
53}
54
55impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
56where
57    L: Location<'a>,
58{
59    type Location = Tick<L>;
60
61    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62        Optional::new(
63            location.clone(),
64            HydroNode::CycleSource {
65                ident,
66                metadata: location.new_node_metadata(Self::collection_kind()),
67            },
68        )
69    }
70}
71
72impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
73where
74    L: Location<'a>,
75{
76    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
77        assert_eq!(
78            Location::id(&self.location),
79            expected_location,
80            "locations do not match"
81        );
82        self.location
83            .flow_state()
84            .borrow_mut()
85            .push_root(HydroRoot::CycleSink {
86                ident,
87                input: Box::new(self.ir_node.into_inner()),
88                op_metadata: HydroIrOpMetadata::new(),
89            });
90    }
91}
92
93impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
94where
95    L: Location<'a>,
96{
97    type Location = Tick<L>;
98
99    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
100        Optional::new(
101            location.clone(),
102            HydroNode::CycleSource {
103                ident,
104                metadata: location.new_node_metadata(Self::collection_kind()),
105            },
106        )
107    }
108}
109
110impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
111where
112    L: Location<'a>,
113{
114    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
115        assert_eq!(
116            Location::id(&self.location),
117            expected_location,
118            "locations do not match"
119        );
120        self.location
121            .flow_state()
122            .borrow_mut()
123            .push_root(HydroRoot::CycleSink {
124                ident,
125                input: Box::new(self.ir_node.into_inner()),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133    L: Location<'a> + NoTick,
134{
135    type Location = L;
136
137    fn create_source(ident: syn::Ident, location: L) -> Self {
138        Optional::new(
139            location.clone(),
140            HydroNode::CycleSource {
141                ident,
142                metadata: location.new_node_metadata(Self::collection_kind()),
143            },
144        )
145    }
146}
147
148impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
149where
150    L: Location<'a> + NoTick,
151{
152    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
153        assert_eq!(
154            Location::id(&self.location),
155            expected_location,
156            "locations do not match"
157        );
158        self.location
159            .flow_state()
160            .borrow_mut()
161            .push_root(HydroRoot::CycleSink {
162                ident,
163                input: Box::new(self.ir_node.into_inner()),
164                op_metadata: HydroIrOpMetadata::new(),
165            });
166    }
167}
168
169impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
170where
171    L: Location<'a>,
172{
173    fn from(singleton: Optional<T, L, Bounded>) -> Self {
174        Optional::new(singleton.location, singleton.ir_node.into_inner())
175    }
176}
177
178impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
179where
180    L: Location<'a>,
181{
182    fn from(singleton: Singleton<T, L, B>) -> Self {
183        Optional::new(
184            singleton.location.clone(),
185            HydroNode::Cast {
186                inner: Box::new(singleton.ir_node.into_inner()),
187                metadata: singleton
188                    .location
189                    .new_node_metadata(Self::collection_kind()),
190            },
191        )
192    }
193}
194
/// Pairs two optionals at the same location by building a `HydroNode::CrossSingleton` over
/// their IR nodes (`me` on the left, `other` on the right).
///
/// The locations of both inputs are checked with `check_matching_location` first.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());
    let crossed = HydroNode::CrossSingleton {
        left: Box::new(me.ir_node.into_inner()),
        right: Box::new(other.ir_node.into_inner()),
        metadata,
    };

    Optional::new(me.location.clone(), crossed)
}
213
/// Combines two optionals at the same location by building a `HydroNode::ChainFirst` with
/// `me` as the `first` input and `other` as the `second` input.
///
/// The locations of both inputs are checked with `check_matching_location` first.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());
    let chained = HydroNode::ChainFirst {
        first: Box::new(me.ir_node.into_inner()),
        second: Box::new(other.ir_node.into_inner()),
        metadata,
    };

    Optional::new(me.location.clone(), chained)
}
232
233impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
234where
235    T: Clone,
236    L: Location<'a>,
237{
238    fn clone(&self) -> Self {
239        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
240            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
241            *self.ir_node.borrow_mut() = HydroNode::Tee {
242                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
243                metadata: self.location.new_node_metadata(Self::collection_kind()),
244            };
245        }
246
247        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
248            Optional {
249                location: self.location.clone(),
250                ir_node: HydroNode::Tee {
251                    inner: TeeNode(inner.0.clone()),
252                    metadata: metadata.clone(),
253                }
254                .into(),
255                _phantom: PhantomData,
256            }
257        } else {
258            unreachable!()
259        }
260    }
261}
262
263impl<'a, T, L, B: Boundedness> Optional<T, L, B>
264where
265    L: Location<'a>,
266{
267    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
268        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
269        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
270        Optional {
271            location,
272            ir_node: RefCell::new(ir_node),
273            _phantom: PhantomData,
274        }
275    }
276
277    pub(crate) fn collection_kind() -> CollectionKind {
278        CollectionKind::Optional {
279            bound: B::BOUND_KIND,
280            element_type: stageleft::quote_type::<T>().into(),
281        }
282    }
283
284    /// Returns the [`Location`] where this optional is being materialized.
285    pub fn location(&self) -> &L {
286        &self.location
287    }
288
289    /// Transforms the optional value by applying a function `f` to it,
290    /// continuously as the input is updated.
291    ///
292    /// Whenever the optional is empty, the output optional is also empty.
293    ///
294    /// # Example
295    /// ```rust
296    /// # use hydro_lang::prelude::*;
297    /// # use futures::StreamExt;
298    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
299    /// let tick = process.tick();
300    /// let optional = tick.optional_first_tick(q!(1));
301    /// optional.map(q!(|v| v + 1)).all_ticks()
302    /// # }, |mut stream| async move {
303    /// // 2
304    /// # assert_eq!(stream.next().await.unwrap(), 2);
305    /// # }));
306    /// ```
307    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308    where
309        F: Fn(T) -> U + 'a,
310    {
311        let f = f.splice_fn1_ctx(&self.location).into();
312        Optional::new(
313            self.location.clone(),
314            HydroNode::Map {
315                f,
316                input: Box::new(self.ir_node.into_inner()),
317                metadata: self
318                    .location
319                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
320            },
321        )
322    }
323
324    /// Transforms the optional value by applying a function `f` to it and then flattening
325    /// the result into a stream, preserving the order of elements.
326    ///
327    /// If the optional is empty, the output stream is also empty. If the optional contains
328    /// a value, `f` is applied to produce an iterator, and all items from that iterator
329    /// are emitted in the output stream in deterministic order.
330    ///
331    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
332    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
333    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
334    ///
335    /// # Example
336    /// ```rust
337    /// # use hydro_lang::prelude::*;
338    /// # use futures::StreamExt;
339    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
340    /// let tick = process.tick();
341    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
342    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
343    /// # }, |mut stream| async move {
344    /// // 1, 2, 3
345    /// # for w in vec![1, 2, 3] {
346    /// #     assert_eq!(stream.next().await.unwrap(), w);
347    /// # }
348    /// # }));
349    /// ```
350    pub fn flat_map_ordered<U, I, F>(
351        self,
352        f: impl IntoQuotedMut<'a, F, L>,
353    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
354    where
355        I: IntoIterator<Item = U>,
356        F: Fn(T) -> I + 'a,
357    {
358        let f = f.splice_fn1_ctx(&self.location).into();
359        Stream::new(
360            self.location.clone(),
361            HydroNode::FlatMap {
362                f,
363                input: Box::new(self.ir_node.into_inner()),
364                metadata: self.location.new_node_metadata(
365                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
366                ),
367            },
368        )
369    }
370
371    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
372    /// for the output type `I` to produce items in any order.
373    ///
374    /// If the optional is empty, the output stream is also empty. If the optional contains
375    /// a value, `f` is applied to produce an iterator, and all items from that iterator
376    /// are emitted in the output stream in non-deterministic order.
377    ///
378    /// # Example
379    /// ```rust
380    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
381    /// # use futures::StreamExt;
382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
383    /// let tick = process.tick();
384    /// let optional = tick.optional_first_tick(q!(
385    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
386    /// ));
387    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
388    /// # }, |mut stream| async move {
389    /// // 1, 2, 3, but in no particular order
390    /// # let mut results = Vec::new();
391    /// # for _ in 0..3 {
392    /// #     results.push(stream.next().await.unwrap());
393    /// # }
394    /// # results.sort();
395    /// # assert_eq!(results, vec![1, 2, 3]);
396    /// # }));
397    /// ```
398    pub fn flat_map_unordered<U, I, F>(
399        self,
400        f: impl IntoQuotedMut<'a, F, L>,
401    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
402    where
403        I: IntoIterator<Item = U>,
404        F: Fn(T) -> I + 'a,
405    {
406        let f = f.splice_fn1_ctx(&self.location).into();
407        Stream::new(
408            self.location.clone(),
409            HydroNode::FlatMap {
410                f,
411                input: Box::new(self.ir_node.into_inner()),
412                metadata: self
413                    .location
414                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
415            },
416        )
417    }
418
419    /// Flattens the optional value into a stream, preserving the order of elements.
420    ///
421    /// If the optional is empty, the output stream is also empty. If the optional contains
422    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
423    /// in the output stream in deterministic order.
424    ///
425    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
426    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
427    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
428    ///
429    /// # Example
430    /// ```rust
431    /// # use hydro_lang::prelude::*;
432    /// # use futures::StreamExt;
433    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434    /// let tick = process.tick();
435    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
436    /// optional.flatten_ordered().all_ticks()
437    /// # }, |mut stream| async move {
438    /// // 1, 2, 3
439    /// # for w in vec![1, 2, 3] {
440    /// #     assert_eq!(stream.next().await.unwrap(), w);
441    /// # }
442    /// # }));
443    /// ```
444    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
445    where
446        T: IntoIterator<Item = U>,
447    {
448        self.flat_map_ordered(q!(|v| v))
449    }
450
451    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
452    /// for the element type `T` to produce items in any order.
453    ///
454    /// If the optional is empty, the output stream is also empty. If the optional contains
455    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
456    /// in the output stream in non-deterministic order.
457    ///
458    /// # Example
459    /// ```rust
460    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
461    /// # use futures::StreamExt;
462    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
463    /// let tick = process.tick();
464    /// let optional = tick.optional_first_tick(q!(
465    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
466    /// ));
467    /// optional.flatten_unordered().all_ticks()
468    /// # }, |mut stream| async move {
469    /// // 1, 2, 3, but in no particular order
470    /// # let mut results = Vec::new();
471    /// # for _ in 0..3 {
472    /// #     results.push(stream.next().await.unwrap());
473    /// # }
474    /// # results.sort();
475    /// # assert_eq!(results, vec![1, 2, 3]);
476    /// # }));
477    /// ```
478    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
479    where
480        T: IntoIterator<Item = U>,
481    {
482        self.flat_map_unordered(q!(|v| v))
483    }
484
485    /// Creates an optional containing only the value if it satisfies a predicate `f`.
486    ///
487    /// If the optional is empty, the output optional is also empty. If the optional contains
488    /// a value and the predicate returns `true`, the output optional contains the same value.
489    /// If the predicate returns `false`, the output optional is empty.
490    ///
491    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
492    /// not modify or take ownership of the value. If you need to modify the value while filtering
493    /// use [`Optional::filter_map`] instead.
494    ///
495    /// # Example
496    /// ```rust
497    /// # use hydro_lang::prelude::*;
498    /// # use futures::StreamExt;
499    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
500    /// let tick = process.tick();
501    /// let optional = tick.optional_first_tick(q!(5));
502    /// optional.filter(q!(|&x| x > 3)).all_ticks()
503    /// # }, |mut stream| async move {
504    /// // 5
505    /// # assert_eq!(stream.next().await.unwrap(), 5);
506    /// # }));
507    /// ```
508    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
509    where
510        F: Fn(&T) -> bool + 'a,
511    {
512        let f = f.splice_fn1_borrow_ctx(&self.location).into();
513        Optional::new(
514            self.location.clone(),
515            HydroNode::Filter {
516                f,
517                input: Box::new(self.ir_node.into_inner()),
518                metadata: self.location.new_node_metadata(Self::collection_kind()),
519            },
520        )
521    }
522
523    /// An operator that both filters and maps. It yields only the value if the supplied
524    /// closure `f` returns `Some(value)`.
525    ///
526    /// If the optional is empty, the output optional is also empty. If the optional contains
527    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
528    /// If the closure returns `None`, the output optional is empty.
529    ///
530    /// # Example
531    /// ```rust
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let tick = process.tick();
536    /// let optional = tick.optional_first_tick(q!("42"));
537    /// optional
538    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
539    ///     .all_ticks()
540    /// # }, |mut stream| async move {
541    /// // 42
542    /// # assert_eq!(stream.next().await.unwrap(), 42);
543    /// # }));
544    /// ```
545    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
546    where
547        F: Fn(T) -> Option<U> + 'a,
548    {
549        let f = f.splice_fn1_ctx(&self.location).into();
550        Optional::new(
551            self.location.clone(),
552            HydroNode::FilterMap {
553                f,
554                input: Box::new(self.ir_node.into_inner()),
555                metadata: self
556                    .location
557                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
558            },
559        )
560    }
561
562    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
563    ///
564    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
565    /// non-null. This is useful for combining several pieces of state together.
566    ///
567    /// # Example
568    /// ```rust
569    /// # use hydro_lang::prelude::*;
570    /// # use futures::StreamExt;
571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572    /// let tick = process.tick();
573    /// let numbers = process
574    ///   .source_iter(q!(vec![123, 456, 789]))
575    ///   .batch(&tick, nondet!(/** test */));
576    /// let min = numbers.clone().min(); // Optional
577    /// let max = numbers.max(); // Optional
578    /// min.zip(max).all_ticks()
579    /// # }, |mut stream| async move {
580    /// // [(123, 789)]
581    /// # for w in vec![(123, 789)] {
582    /// #     assert_eq!(stream.next().await.unwrap(), w);
583    /// # }
584    /// # }));
585    /// ```
586    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
587    where
588        O: Clone,
589    {
590        let other: Optional<O, L, B> = other.into();
591        check_matching_location(&self.location, &other.location);
592
593        if L::is_top_level()
594            && let Some(tick) = self.location.try_tick()
595        {
596            let out = zip_inside_tick(
597                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
598                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
599            )
600            .latest();
601
602            Optional::new(out.location, out.ir_node.into_inner())
603        } else {
604            zip_inside_tick(self, other)
605        }
606    }
607
608    /// Passes through `self` when it has a value, otherwise passes through `other`.
609    ///
610    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
611    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
612    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
613    ///
614    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
615    /// of the inputs change (including to/from null states).
616    ///
617    /// # Example
618    /// ```rust
619    /// # use hydro_lang::prelude::*;
620    /// # use futures::StreamExt;
621    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
622    /// let tick = process.tick();
623    /// // ticks are lazy by default, forces the second tick to run
624    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
625    ///
626    /// let some_first_tick = tick.optional_first_tick(q!(123));
627    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
628    /// some_first_tick.or(some_second_tick).all_ticks()
629    /// # }, |mut stream| async move {
630    /// // [123 /* first tick */, 456 /* second tick */]
631    /// # for w in vec![123, 456] {
632    /// #     assert_eq!(stream.next().await.unwrap(), w);
633    /// # }
634    /// # }));
635    /// ```
636    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
637        check_matching_location(&self.location, &other.location);
638
639        if L::is_top_level()
640            && let Some(tick) = self.location.try_tick()
641        {
642            let out = or_inside_tick(
643                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
644                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
645            )
646            .latest();
647
648            Optional::new(out.location, out.ir_node.into_inner())
649        } else {
650            Optional::new(
651                self.location.clone(),
652                HydroNode::ChainFirst {
653                    first: Box::new(self.ir_node.into_inner()),
654                    second: Box::new(other.ir_node.into_inner()),
655                    metadata: self.location.new_node_metadata(Self::collection_kind()),
656                },
657            )
658        }
659    }
660
661    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
662    ///
663    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
664    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
665    ///
666    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
667    /// of the inputs change (including to/from null states).
668    ///
669    /// # Example
670    /// ```rust
671    /// # use hydro_lang::prelude::*;
672    /// # use futures::StreamExt;
673    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674    /// let tick = process.tick();
675    /// // ticks are lazy by default, forces the later ticks to run
676    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
677    ///
678    /// let some_first_tick = tick.optional_first_tick(q!(123));
679    /// some_first_tick
680    ///     .unwrap_or(tick.singleton(q!(456)))
681    ///     .all_ticks()
682    /// # }, |mut stream| async move {
683    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
684    /// # for w in vec![123, 456, 456, 456] {
685    /// #     assert_eq!(stream.next().await.unwrap(), w);
686    /// # }
687    /// # }));
688    /// ```
689    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
690        let res_option = self.or(other.into());
691        Singleton::new(
692            res_option.location.clone(),
693            HydroNode::Cast {
694                inner: Box::new(res_option.ir_node.into_inner()),
695                metadata: res_option
696                    .location
697                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
698            },
699        )
700    }
701
702    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
703    ///
704    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
705    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
706    /// so that Hydro can skip any computation on null values.
707    ///
708    /// # Example
709    /// ```rust
710    /// # use hydro_lang::prelude::*;
711    /// # use futures::StreamExt;
712    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
713    /// let tick = process.tick();
714    /// // ticks are lazy by default, forces the later ticks to run
715    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
716    ///
717    /// let some_first_tick = tick.optional_first_tick(q!(123));
718    /// some_first_tick.into_singleton().all_ticks()
719    /// # }, |mut stream| async move {
720    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
721    /// # for w in vec![Some(123), None, None, None] {
722    /// #     assert_eq!(stream.next().await.unwrap(), w);
723    /// # }
724    /// # }));
725    /// ```
726    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
727    where
728        T: Clone,
729    {
730        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
731        let core_ir = HydroNode::Source {
732            source: HydroSource::Iter(none.into()),
733            metadata: self
734                .location
735                .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
736        };
737
738        let none_singleton = if L::is_top_level() {
739            Singleton::new(
740                self.location.clone(),
741                HydroNode::Persist {
742                    inner: Box::new(core_ir),
743                    metadata: self
744                        .location
745                        .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
746                },
747            )
748        } else {
749            Singleton::new(self.location.clone(), core_ir)
750        };
751
752        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
753    }
754
755    /// An operator which allows you to "name" a `HydroNode`.
756    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
757    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
758        {
759            let mut node = self.ir_node.borrow_mut();
760            let metadata = node.metadata_mut();
761            metadata.tag = Some(name.to_string());
762        }
763        self
764    }
765}
766
767impl<'a, T, L> Optional<T, L, Bounded>
768where
769    L: Location<'a>,
770{
771    /// Filters this optional, passing through the optional value if it is non-null **and** the
772    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
773    ///
774    /// Useful for conditionally processing, such as only emitting an optional's value outside
775    /// a tick if some other condition is satisfied.
776    ///
777    /// # Example
778    /// ```rust
779    /// # use hydro_lang::prelude::*;
780    /// # use futures::StreamExt;
781    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
782    /// let tick = process.tick();
783    /// // ticks are lazy by default, forces the second tick to run
784    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
785    ///
786    /// let batch_first_tick = process
787    ///   .source_iter(q!(vec![]))
788    ///   .batch(&tick, nondet!(/** test */));
789    /// let batch_second_tick = process
790    ///   .source_iter(q!(vec![456]))
791    ///   .batch(&tick, nondet!(/** test */))
792    ///   .defer_tick(); // appears on the second tick
793    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
794    /// batch_first_tick.chain(batch_second_tick).first()
795    ///   .filter_if_some(some_on_first_tick)
796    ///   .unwrap_or(tick.singleton(q!(789)))
797    ///   .all_ticks()
798    /// # }, |mut stream| async move {
799    /// // [789, 789]
800    /// # for w in vec![789, 789] {
801    /// #     assert_eq!(stream.next().await.unwrap(), w);
802    /// # }
803    /// # }));
804    /// ```
805    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
806        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
807    }
808
809    /// Filters this optional, passing through the optional value if it is non-null **and** the
810    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
811    ///
812    /// Useful for conditionally processing, such as only emitting an optional's value outside
813    /// a tick if some other condition is satisfied.
814    ///
815    /// # Example
816    /// ```rust
817    /// # use hydro_lang::prelude::*;
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
820    /// let tick = process.tick();
821    /// // ticks are lazy by default, forces the second tick to run
822    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
823    ///
824    /// let batch_first_tick = process
825    ///   .source_iter(q!(vec![]))
826    ///   .batch(&tick, nondet!(/** test */));
827    /// let batch_second_tick = process
828    ///   .source_iter(q!(vec![456]))
829    ///   .batch(&tick, nondet!(/** test */))
830    ///   .defer_tick(); // appears on the second tick
831    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
832    /// batch_first_tick.chain(batch_second_tick).first()
833    ///   .filter_if_none(some_on_first_tick)
834    ///   .unwrap_or(tick.singleton(q!(789)))
835    ///   .all_ticks()
836    /// # }, |mut stream| async move {
837    /// // [789, 789]
838    /// # for w in vec![789, 456] {
839    /// #     assert_eq!(stream.next().await.unwrap(), w);
840    /// # }
841    /// # }));
842    /// ```
843    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
844        self.filter_if_some(
845            other
846                .map(q!(|_| ()))
847                .into_singleton()
848                .filter(q!(|o| o.is_none())),
849        )
850    }
851
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
    ///
    /// The value held by `self` is not part of the output; `self` only acts as the condition
    /// that decides whether `value` is released.
    ///
    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
    /// having a value, such as only releasing a piece of state if the node is the leader.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// some_on_first_tick
    ///     .if_some_then(tick.singleton(q!(456)))
    ///     .unwrap_or(tick.singleton(q!(123)))
    /// # .all_ticks()
    /// # }, |mut stream| async move {
    /// // 456 (first tick) ~> 123 (second tick onwards)
    /// # for w in vec![456, 123, 123] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
        // Same gate seen from the other side: `value` is filtered using `self` as the signal.
        value.filter_if_some(self)
    }
881}
882
883impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
884where
885    L: Location<'a> + NoTick,
886{
887    /// Returns an optional value corresponding to the latest snapshot of the optional
888    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
889    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
890    /// all snapshots of this optional into the atomic-associated tick will observe the
891    /// same value each tick.
892    ///
893    /// # Non-Determinism
894    /// Because this picks a snapshot of a optional whose value is continuously changing,
895    /// the output optional has a non-deterministic value since the snapshot can be at an
896    /// arbitrary point in time.
897    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
898        Optional::new(
899            self.location.clone().tick,
900            HydroNode::Batch {
901                inner: Box::new(self.ir_node.into_inner()),
902                metadata: self
903                    .location
904                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
905            },
906        )
907    }
908
909    /// Returns this optional back into a top-level, asynchronous execution context where updates
910    /// to the value will be asynchronously propagated.
911    pub fn end_atomic(self) -> Optional<T, L, B> {
912        Optional::new(
913            self.location.tick.l.clone(),
914            HydroNode::EndAtomic {
915                inner: Box::new(self.ir_node.into_inner()),
916                metadata: self
917                    .location
918                    .tick
919                    .l
920                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
921            },
922        )
923    }
924}
925
926impl<'a, T, L, B: Boundedness> Optional<T, L, B>
927where
928    L: Location<'a>,
929{
930    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
931    /// will observe the same version of the value and will be executed synchronously before any
932    /// outputs are yielded (in [`Optional::end_atomic`]).
933    ///
934    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
935    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
936    /// a different version).
937    ///
938    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
939    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
940    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
941    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
942        let out_location = Atomic { tick: tick.clone() };
943        Optional::new(
944            out_location.clone(),
945            HydroNode::BeginAtomic {
946                inner: Box::new(self.ir_node.into_inner()),
947                metadata: out_location
948                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
949            },
950        )
951    }
952
953    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
954    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
955    /// relevant data that contributed to the snapshot at tick `t`.
956    ///
957    /// # Non-Determinism
958    /// Because this picks a snapshot of a optional whose value is continuously changing,
959    /// the output optional has a non-deterministic value since the snapshot can be at an
960    /// arbitrary point in time.
961    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
962        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
963        Optional::new(
964            tick.clone(),
965            HydroNode::Batch {
966                inner: Box::new(self.ir_node.into_inner()),
967                metadata: tick
968                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
969            },
970        )
971    }
972
973    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
974    /// with order corresponding to increasing prefixes of data contributing to the optional.
975    ///
976    /// # Non-Determinism
977    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
978    /// to non-deterministic batching and arrival of inputs, the output stream is
979    /// non-deterministic.
980    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
981    where
982        L: NoTick,
983    {
984        let tick = self.location.tick();
985        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
986    }
987
988    /// Given a time interval, returns a stream corresponding to snapshots of the optional
989    /// value taken at various points in time. Because the input optional may be
990    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
991    /// represent the value of the optional given some prefix of the streams leading up to
992    /// it.
993    ///
994    /// # Non-Determinism
995    /// The output stream is non-deterministic in which elements are sampled, since this
996    /// is controlled by a clock.
997    pub fn sample_every(
998        self,
999        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1000        nondet: NonDet,
1001    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1002    where
1003        L: NoTick + NoAtomic,
1004    {
1005        let samples = self.location.source_interval(interval, nondet);
1006        let tick = self.location.tick();
1007
1008        self.snapshot(&tick, nondet)
1009            .filter_if_some(samples.batch(&tick, nondet).first())
1010            .all_ticks()
1011            .weakest_retries()
1012    }
1013}
1014
1015impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1016where
1017    L: Location<'a>,
1018{
    /// Asynchronously yields the value of this optional outside the tick as an unbounded stream,
    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
    /// null values).
    ///
    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
    /// producing one element in the output for each (non-null) tick. This is useful for batched
    /// computations, where the results from each tick must be combined together.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![]))
    /// #   .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![1, 2, 3]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick(); // appears on the second tick
    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
    /// input_batch // first tick: [], second tick: [1, 2, 3]
    ///     .max()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [3]
    /// # for w in vec![3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
        // View the optional as a zero-or-one element stream and reuse the stream's `all_ticks`.
        self.into_stream().all_ticks()
    }
1056
    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
    /// which will stream the value computed in _each_ tick as a separate stream element.
    ///
    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// optional's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
        // View the optional as a zero-or-one element stream and reuse the stream's
        // `all_ticks_atomic`.
        self.into_stream().all_ticks_atomic()
    }
1066
1067    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1068    /// be asynchronously updated with the latest value of the optional inside the tick, including
1069    /// whether the optional is null or not.
1070    ///
1071    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1072    /// tick that tracks the inner value. This is useful for getting the value as of the
1073    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1074    ///
1075    /// # Example
1076    /// ```rust
1077    /// # use hydro_lang::prelude::*;
1078    /// # use futures::StreamExt;
1079    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080    /// # let tick = process.tick();
1081    /// # // ticks are lazy by default, forces the second tick to run
1082    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1083    /// # let batch_first_tick = process
1084    /// #   .source_iter(q!(vec![]))
1085    /// #   .batch(&tick, nondet!(/** test */));
1086    /// # let batch_second_tick = process
1087    /// #   .source_iter(q!(vec![1, 2, 3]))
1088    /// #   .batch(&tick, nondet!(/** test */))
1089    /// #   .defer_tick(); // appears on the second tick
1090    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1091    /// input_batch // first tick: [], second tick: [1, 2, 3]
1092    ///     .max()
1093    ///     .latest()
1094    /// # .into_singleton()
1095    /// # .sample_eager(nondet!(/** test */))
1096    /// # }, |mut stream| async move {
1097    /// // asynchronously changes from None ~> 3
1098    /// # for w in vec![None, Some(3)] {
1099    /// #     assert_eq!(stream.next().await.unwrap(), w);
1100    /// # }
1101    /// # }));
1102    /// ```
1103    pub fn latest(self) -> Optional<T, L, Unbounded> {
1104        Optional::new(
1105            self.location.outer().clone(),
1106            HydroNode::YieldConcat {
1107                inner: Box::new(self.ir_node.into_inner()),
1108                metadata: self
1109                    .location
1110                    .outer()
1111                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1112            },
1113        )
1114    }
1115
1116    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1117    /// be updated with the latest value of the optional inside the tick.
1118    ///
1119    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1120    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1121    /// optional's [`Tick`] context.
1122    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1123        let out_location = Atomic {
1124            tick: self.location.clone(),
1125        };
1126
1127        Optional::new(
1128            out_location.clone(),
1129            HydroNode::YieldConcat {
1130                inner: Box::new(self.ir_node.into_inner()),
1131                metadata: out_location
1132                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1133            },
1134        )
1135    }
1136
1137    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1138    /// always has the state of `self` at tick `T - 1`.
1139    ///
1140    /// At tick `0`, the output optional is null, since there is no previous tick.
1141    ///
1142    /// This operator enables stateful iterative processing with ticks, by sending data from one
1143    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1144    ///
1145    /// # Example
1146    /// ```rust
1147    /// # use hydro_lang::prelude::*;
1148    /// # use futures::StreamExt;
1149    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1150    /// let tick = process.tick();
1151    /// // ticks are lazy by default, forces the second tick to run
1152    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1153    ///
1154    /// let batch_first_tick = process
1155    ///   .source_iter(q!(vec![1, 2]))
1156    ///   .batch(&tick, nondet!(/** test */));
1157    /// let batch_second_tick = process
1158    ///   .source_iter(q!(vec![3, 4]))
1159    ///   .batch(&tick, nondet!(/** test */))
1160    ///   .defer_tick(); // appears on the second tick
1161    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1162    ///   .reduce(q!(|state, v| *state += v));
1163    ///
1164    /// current_tick_sum.clone().into_singleton().zip(
1165    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1166    /// ).all_ticks()
1167    /// # }, |mut stream| async move {
1168    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1169    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1170    /// #     assert_eq!(stream.next().await.unwrap(), w);
1171    /// # }
1172    /// # }));
1173    /// ```
1174    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1175        Optional::new(
1176            self.location.clone(),
1177            HydroNode::DeferTick {
1178                input: Box::new(self.ir_node.into_inner()),
1179                metadata: self.location.new_node_metadata(Self::collection_kind()),
1180            },
1181        )
1182    }
1183
    // Deprecated convenience wrapper: forwards to `.into_stream().persist()`, exactly as the
    // deprecation note suggests. It deliberately carries no `///` doc comment, because the
    // `missing_docs` expectation below would otherwise be unfulfilled.
    #[deprecated(note = "use .into_stream().persist()")]
    #[expect(missing_docs, reason = "deprecated")]
    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
    where
        T: Clone,
    {
        self.into_stream().persist()
    }
1192
1193    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1194    /// non-null. Otherwise, the stream is empty.
1195    ///
1196    /// # Example
1197    /// ```rust
1198    /// # use hydro_lang::prelude::*;
1199    /// # use futures::StreamExt;
1200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1201    /// # let tick = process.tick();
1202    /// # // ticks are lazy by default, forces the second tick to run
1203    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1204    /// # let batch_first_tick = process
1205    /// #   .source_iter(q!(vec![]))
1206    /// #   .batch(&tick, nondet!(/** test */));
1207    /// # let batch_second_tick = process
1208    /// #   .source_iter(q!(vec![123, 456]))
1209    /// #   .batch(&tick, nondet!(/** test */))
1210    /// #   .defer_tick(); // appears on the second tick
1211    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1212    /// input_batch // first tick: [], second tick: [123, 456]
1213    ///     .clone()
1214    ///     .max()
1215    ///     .into_stream()
1216    ///     .chain(input_batch)
1217    ///     .all_ticks()
1218    /// # }, |mut stream| async move {
1219    /// // [456, 123, 456]
1220    /// # for w in vec![456, 123, 456] {
1221    /// #     assert_eq!(stream.next().await.unwrap(), w);
1222    /// # }
1223    /// # }));
1224    /// ```
1225    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1226        Stream::new(
1227            self.location.clone(),
1228            HydroNode::Cast {
1229                inner: Box::new(self.ir_node.into_inner()),
1230                metadata: self.location.new_node_metadata(Stream::<
1231                    T,
1232                    Tick<L>,
1233                    Bounded,
1234                    TotalOrder,
1235                    ExactlyOnce,
1236                >::collection_kind()),
1237            },
1238        )
1239    }
1240}
1241
#[cfg(feature = "deploy")]
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    // `or`-ing an inhabited optional with itself must still hold a single value: counting the
    // elements of the resulting stream is expected to report exactly 1.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external = builder.external::<()>();

        // Build an optional that is always inhabited by converting a singleton.
        let tick = process.tick();
        let inhabited: Optional<_, _, _> = tick.singleton(q!(123)).into();
        let or_count = inhabited
            .clone()
            .or(inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting the deployment, as in the original test.
        let mut received = deployed.connect(or_count).await;

        deployment.start().await.unwrap();

        assert_eq!(received.next().await.unwrap(), 1);
    }
}