hydro_lang/live_collections/stream/mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use syn::parse_quote;
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, Unbounded};
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, DeferTick, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27
28pub mod networking;
29
30/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
31#[sealed::sealed]
32pub trait Ordering:
33    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
34{
35}
36
37/// Marks the stream as being totally ordered, which means that there are
38/// no sources of non-determinism (other than intentional ones) that will
39/// affect the order of elements.
40pub enum TotalOrder {}
41
42#[sealed::sealed]
43impl Ordering for TotalOrder {}
44
45/// Marks the stream as having no order, which means that the order of
46/// elements may be affected by non-determinism.
47///
48/// This restricts certain operators, such as `fold` and `reduce`, to only
49/// be used with commutative aggregation functions.
50pub enum NoOrder {}
51
52#[sealed::sealed]
53impl Ordering for NoOrder {}
54
55/// Helper trait for determining the weakest of two orderings.
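///
/// # Example
/// A compile-time sketch of how `Min` resolves, using only the public markers in this module:
/// ```rust
/// # use hydro_lang::live_collections::stream::{MinOrder, NoOrder, TotalOrder};
/// // This only type-checks because the weaker of `TotalOrder` and `NoOrder`
/// // resolves to `NoOrder`.
/// fn weaker(min: <TotalOrder as MinOrder<NoOrder>>::Min) -> NoOrder {
///     min
/// }
/// ```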
56#[sealed::sealed]
57pub trait MinOrder<Other: ?Sized> {
58    /// The weaker of the two orderings.
59    type Min: Ordering;
60}
61
62#[sealed::sealed]
63impl MinOrder<NoOrder> for TotalOrder {
64    type Min = NoOrder;
65}
66
67#[sealed::sealed]
68impl MinOrder<TotalOrder> for TotalOrder {
69    type Min = TotalOrder;
70}
71
72#[sealed::sealed]
73impl MinOrder<TotalOrder> for NoOrder {
74    type Min = NoOrder;
75}
76
77#[sealed::sealed]
78impl MinOrder<NoOrder> for NoOrder {
79    type Min = NoOrder;
80}
81
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
83#[sealed::sealed]
84pub trait Retries:
85    MinRetries<Self, Min = Self>
86    + MinRetries<ExactlyOnce, Min = Self>
87    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
88{
89}
90
91/// Marks the stream as having deterministic message cardinality, with no
92/// possibility of duplicates.
93pub enum ExactlyOnce {}
94
95#[sealed::sealed]
96impl Retries for ExactlyOnce {}
97
98/// Marks the stream as having non-deterministic message cardinality, which
99/// means that duplicates may occur, but messages will not be dropped.
100pub enum AtLeastOnce {}
101
102#[sealed::sealed]
103impl Retries for AtLeastOnce {}
104
105/// Helper trait for determining the weakest of two retry guarantees.
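///
/// # Example
/// A compile-time sketch of how `Min` resolves, using only the public markers in this module:
/// ```rust
/// # use hydro_lang::live_collections::stream::{MinRetries, AtLeastOnce, ExactlyOnce};
/// // This only type-checks because the weaker of `ExactlyOnce` and `AtLeastOnce`
/// // resolves to `AtLeastOnce`.
/// fn weaker(min: <ExactlyOnce as MinRetries<AtLeastOnce>>::Min) -> AtLeastOnce {
///     min
/// }
/// ```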
106#[sealed::sealed]
107pub trait MinRetries<Other: ?Sized> {
108    /// The weaker of the two retry guarantees.
109    type Min: Retries;
110}
111
112#[sealed::sealed]
113impl MinRetries<AtLeastOnce> for ExactlyOnce {
114    type Min = AtLeastOnce;
115}
116
117#[sealed::sealed]
118impl MinRetries<ExactlyOnce> for ExactlyOnce {
119    type Min = ExactlyOnce;
120}
121
122#[sealed::sealed]
123impl MinRetries<ExactlyOnce> for AtLeastOnce {
124    type Min = AtLeastOnce;
125}
126
127#[sealed::sealed]
128impl MinRetries<AtLeastOnce> for AtLeastOnce {
129    type Min = AtLeastOnce;
130}
131
132/// Streaming sequence of elements with type `Type`.
133///
134/// This live collection represents a growing sequence of elements, with new elements being
135/// asynchronously appended to the end of the sequence. This can be used to model the arrival
136/// of network input, such as API requests, or streaming ingestion.
137///
138/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
140/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
141/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
142///
143/// Type Parameters:
144/// - `Type`: the type of elements in the stream
145/// - `Loc`: the location where the stream is being materialized
146/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
147/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
148///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
150///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
151pub struct Stream<
152    Type,
153    Loc,
154    Bound: Boundedness,
155    Order: Ordering = TotalOrder,
156    Retry: Retries = ExactlyOnce,
157> {
158    pub(crate) location: Loc,
159    pub(crate) ir_node: RefCell<HydroNode>,
160
161    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
162}
163
164impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
165    for Stream<T, L, Unbounded, O, R>
166where
167    L: Location<'a>,
168{
169    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
170        Stream {
171            location: stream.location,
172            ir_node: stream.ir_node,
173            _phantom: PhantomData,
174        }
175    }
176}
177
178impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
179    for Stream<T, L, B, NoOrder, R>
180where
181    L: Location<'a>,
182{
183    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
184        Stream {
185            location: stream.location,
186            ir_node: stream.ir_node,
187            _phantom: PhantomData,
188        }
189    }
190}
191
192impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
193    for Stream<T, L, B, O, AtLeastOnce>
194where
195    L: Location<'a>,
196{
197    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
198        Stream {
199            location: stream.location,
200            ir_node: stream.ir_node,
201            _phantom: PhantomData,
202        }
203    }
204}
205
206impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
207where
208    L: Location<'a>,
209{
210    fn defer_tick(self) -> Self {
211        Stream::defer_tick(self)
212    }
213}
214
215impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
216    for Stream<T, Tick<L>, Bounded, O, R>
217where
218    L: Location<'a>,
219{
220    type Location = Tick<L>;
221
222    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
223        Stream::new(
224            location.clone(),
225            HydroNode::CycleSource {
226                ident,
227                metadata: location.new_node_metadata::<T>(),
228            },
229        )
230    }
231}
232
233impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
234    for Stream<T, Tick<L>, Bounded, O, R>
235where
236    L: Location<'a>,
237{
238    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
239        assert_eq!(
240            Location::id(&self.location),
241            expected_location,
242            "locations do not match"
243        );
244        self.location
245            .flow_state()
246            .borrow_mut()
247            .push_root(HydroRoot::CycleSink {
248                ident,
249                input: Box::new(self.ir_node.into_inner()),
250                out_location: Location::id(&self.location),
251                op_metadata: HydroIrOpMetadata::new(),
252            });
253    }
254}
255
256impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
257    for Stream<T, L, B, O, R>
258where
259    L: Location<'a> + NoTick,
260{
261    type Location = L;
262
263    fn create_source(ident: syn::Ident, location: L) -> Self {
264        Stream::new(
265            location.clone(),
266            HydroNode::Persist {
267                inner: Box::new(HydroNode::CycleSource {
268                    ident,
269                    metadata: location.new_node_metadata::<T>(),
270                }),
271                metadata: location.new_node_metadata::<T>(),
272            },
273        )
274    }
275}
276
277impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
278    for Stream<T, L, B, O, R>
279where
280    L: Location<'a> + NoTick,
281{
282    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
283        assert_eq!(
284            Location::id(&self.location),
285            expected_location,
286            "locations do not match"
287        );
288        let metadata = self.location.new_node_metadata::<T>();
289        self.location
290            .flow_state()
291            .borrow_mut()
292            .push_root(HydroRoot::CycleSink {
293                ident,
294                input: Box::new(HydroNode::Unpersist {
295                    inner: Box::new(self.ir_node.into_inner()),
296                    metadata: metadata.clone(),
297                }),
298                out_location: Location::id(&self.location),
299                op_metadata: HydroIrOpMetadata::new(),
300            });
301    }
302}
303
304impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
305where
306    T: Clone,
307    L: Location<'a>,
308{
309    fn clone(&self) -> Self {
310        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
311            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
312            *self.ir_node.borrow_mut() = HydroNode::Tee {
313                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
314                metadata: self.location.new_node_metadata::<T>(),
315            };
316        }
317
318        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
319            Stream {
320                location: self.location.clone(),
321                ir_node: HydroNode::Tee {
322                    inner: TeeNode(inner.0.clone()),
323                    metadata: metadata.clone(),
324                }
325                .into(),
326                _phantom: PhantomData,
327            }
328        } else {
329            unreachable!()
330        }
331    }
332}
333
334impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
335where
336    L: Location<'a>,
337{
338    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
339        Stream {
340            location,
341            ir_node: RefCell::new(ir_node),
342            _phantom: PhantomData,
343        }
344    }
345
346    /// Produces a stream based on invoking `f` on each element.
    /// If you do not want to modify the stream and instead only want to view
    /// each item, use [`Stream::inspect`] instead.
349    ///
350    /// # Example
351    /// ```rust
352    /// # use hydro_lang::prelude::*;
353    /// # use futures::StreamExt;
354    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
355    /// let words = process.source_iter(q!(vec!["hello", "world"]));
356    /// words.map(q!(|x| x.to_uppercase()))
357    /// # }, |mut stream| async move {
358    /// # for w in vec!["HELLO", "WORLD"] {
359    /// #     assert_eq!(stream.next().await.unwrap(), w);
360    /// # }
361    /// # }));
362    /// ```
363    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
364    where
365        F: Fn(T) -> U + 'a,
366    {
367        let f = f.splice_fn1_ctx(&self.location).into();
368        Stream::new(
369            self.location.clone(),
370            HydroNode::Map {
371                f,
372                input: Box::new(self.ir_node.into_inner()),
373                metadata: self.location.new_node_metadata::<U>(),
374            },
375        )
376    }
377
378    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
379    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
380    /// for the output type `U` must produce items in a **deterministic** order.
381    ///
382    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
383    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
384    ///
385    /// # Example
386    /// ```rust
387    /// # use hydro_lang::prelude::*;
388    /// # use futures::StreamExt;
389    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
390    /// process
391    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
392    ///     .flat_map_ordered(q!(|x| x))
393    /// # }, |mut stream| async move {
394    /// // 1, 2, 3, 4
395    /// # for w in (1..5) {
396    /// #     assert_eq!(stream.next().await.unwrap(), w);
397    /// # }
398    /// # }));
399    /// ```
400    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
401    where
402        I: IntoIterator<Item = U>,
403        F: Fn(T) -> I + 'a,
404    {
405        let f = f.splice_fn1_ctx(&self.location).into();
406        Stream::new(
407            self.location.clone(),
408            HydroNode::FlatMap {
409                f,
410                input: Box::new(self.ir_node.into_inner()),
411                metadata: self.location.new_node_metadata::<U>(),
412            },
413        )
414    }
415
416    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
417    /// for the output type `U` to produce items in any order.
418    ///
419    /// # Example
420    /// ```rust
421    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
422    /// # use futures::StreamExt;
423    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
424    /// process
425    ///     .source_iter(q!(vec![
426    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
427    ///         std::collections::HashSet::from_iter(vec![3, 4]),
428    ///     ]))
429    ///     .flat_map_unordered(q!(|x| x))
430    /// # }, |mut stream| async move {
431    /// // 1, 2, 3, 4, but in no particular order
432    /// # let mut results = Vec::new();
433    /// # for w in (1..5) {
434    /// #     results.push(stream.next().await.unwrap());
435    /// # }
436    /// # results.sort();
437    /// # assert_eq!(results, vec![1, 2, 3, 4]);
438    /// # }));
439    /// ```
440    pub fn flat_map_unordered<U, I, F>(
441        self,
442        f: impl IntoQuotedMut<'a, F, L>,
443    ) -> Stream<U, L, B, NoOrder, R>
444    where
445        I: IntoIterator<Item = U>,
446        F: Fn(T) -> I + 'a,
447    {
448        let f = f.splice_fn1_ctx(&self.location).into();
449        Stream::new(
450            self.location.clone(),
451            HydroNode::FlatMap {
452                f,
453                input: Box::new(self.ir_node.into_inner()),
454                metadata: self.location.new_node_metadata::<U>(),
455            },
456        )
457    }
458
459    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
460    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
461    ///
462    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
463    /// not deterministic, use [`Stream::flatten_unordered`] instead.
464    ///
465    /// ```rust
466    /// # use hydro_lang::prelude::*;
467    /// # use futures::StreamExt;
468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
469    /// process
470    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
471    ///     .flatten_ordered()
472    /// # }, |mut stream| async move {
473    /// // 1, 2, 3, 4
474    /// # for w in (1..5) {
475    /// #     assert_eq!(stream.next().await.unwrap(), w);
476    /// # }
477    /// # }));
478    /// ```
479    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
480    where
481        T: IntoIterator<Item = U>,
482    {
483        self.flat_map_ordered(q!(|d| d))
484    }
485
486    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
487    /// for the element type `T` to produce items in any order.
488    ///
489    /// # Example
490    /// ```rust
491    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
492    /// # use futures::StreamExt;
493    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
494    /// process
495    ///     .source_iter(q!(vec![
496    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
497    ///         std::collections::HashSet::from_iter(vec![3, 4]),
498    ///     ]))
499    ///     .flatten_unordered()
500    /// # }, |mut stream| async move {
501    /// // 1, 2, 3, 4, but in no particular order
502    /// # let mut results = Vec::new();
503    /// # for w in (1..5) {
504    /// #     results.push(stream.next().await.unwrap());
505    /// # }
506    /// # results.sort();
507    /// # assert_eq!(results, vec![1, 2, 3, 4]);
508    /// # }));
509    /// ```
510    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
511    where
512        T: IntoIterator<Item = U>,
513    {
514        self.flat_map_unordered(q!(|d| d))
515    }
516
517    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
518    /// `f`, preserving the order of the elements.
519    ///
520    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
521    /// not modify or take ownership of the values. If you need to modify the values while filtering
522    /// use [`Stream::filter_map`] instead.
523    ///
524    /// # Example
525    /// ```rust
526    /// # use hydro_lang::prelude::*;
527    /// # use futures::StreamExt;
528    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
529    /// process
530    ///     .source_iter(q!(vec![1, 2, 3, 4]))
531    ///     .filter(q!(|&x| x > 2))
532    /// # }, |mut stream| async move {
533    /// // 3, 4
534    /// # for w in (3..5) {
535    /// #     assert_eq!(stream.next().await.unwrap(), w);
536    /// # }
537    /// # }));
538    /// ```
539    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
540    where
541        F: Fn(&T) -> bool + 'a,
542    {
543        let f = f.splice_fn1_borrow_ctx(&self.location).into();
544        Stream::new(
545            self.location.clone(),
546            HydroNode::Filter {
547                f,
548                input: Box::new(self.ir_node.into_inner()),
549                metadata: self.location.new_node_metadata::<T>(),
550            },
551        )
552    }
553
554    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
555    ///
556    /// # Example
557    /// ```rust
558    /// # use hydro_lang::prelude::*;
559    /// # use futures::StreamExt;
560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561    /// process
562    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
563    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
564    /// # }, |mut stream| async move {
565    /// // 1, 2
566    /// # for w in (1..3) {
567    /// #     assert_eq!(stream.next().await.unwrap(), w);
568    /// # }
569    /// # }));
570    /// ```
571    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
572    where
573        F: Fn(T) -> Option<U> + 'a,
574    {
575        let f = f.splice_fn1_ctx(&self.location).into();
576        Stream::new(
577            self.location.clone(),
578            HydroNode::FilterMap {
579                f,
580                input: Box::new(self.ir_node.into_inner()),
581                metadata: self.location.new_node_metadata::<U>(),
582            },
583        )
584    }
585
586    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
587    /// where `x` is the final value of `other`, a bounded [`Singleton`].
588    ///
589    /// # Example
590    /// ```rust
591    /// # use hydro_lang::prelude::*;
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594    /// let tick = process.tick();
595    /// let batch = process
596    ///   .source_iter(q!(vec![1, 2, 3, 4]))
597    ///   .batch(&tick, nondet!(/** test */));
598    /// let count = batch.clone().count(); // `count()` returns a singleton
599    /// batch.cross_singleton(count).all_ticks()
600    /// # }, |mut stream| async move {
601    /// // (1, 4), (2, 4), (3, 4), (4, 4)
602    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
603    /// #     assert_eq!(stream.next().await.unwrap(), w);
604    /// # }
605    /// # }));
606    /// ```
607    pub fn cross_singleton<O2>(
608        self,
609        other: impl Into<Optional<O2, L, Bounded>>,
610    ) -> Stream<(T, O2), L, B, O, R>
611    where
612        O2: Clone,
613    {
614        let other: Optional<O2, L, Bounded> = other.into();
615        check_matching_location(&self.location, &other.location);
616
617        Stream::new(
618            self.location.clone(),
619            HydroNode::CrossSingleton {
620                left: Box::new(self.ir_node.into_inner()),
621                right: Box::new(other.ir_node.into_inner()),
622                metadata: self.location.new_node_metadata::<(T, O2)>(),
623            },
624        )
625    }
626
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) has a value, otherwise the output is empty.
628    ///
629    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
630    /// leader of a cluster.
631    ///
632    /// # Example
633    /// ```rust
634    /// # use hydro_lang::prelude::*;
635    /// # use futures::StreamExt;
636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637    /// let tick = process.tick();
    /// // ticks are lazy by default; this forces the second tick to run
639    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640    ///
641    /// let batch_first_tick = process
642    ///   .source_iter(q!(vec![1, 2, 3, 4]))
643    ///   .batch(&tick, nondet!(/** test */));
644    /// let batch_second_tick = process
645    ///   .source_iter(q!(vec![5, 6, 7, 8]))
646    ///   .batch(&tick, nondet!(/** test */))
647    ///   .defer_tick(); // appears on the second tick
648    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
649    /// batch_first_tick.chain(batch_second_tick)
650    ///   .filter_if_some(some_on_first_tick)
651    ///   .all_ticks()
652    /// # }, |mut stream| async move {
653    /// // [1, 2, 3, 4]
654    /// # for w in vec![1, 2, 3, 4] {
655    /// #     assert_eq!(stream.next().await.unwrap(), w);
656    /// # }
657    /// # }));
658    /// ```
659    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
660        self.cross_singleton(signal.map(q!(|_u| ())))
661            .map(q!(|(d, _signal)| d))
662    }
663
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) has no value, otherwise the output is empty.
665    ///
666    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
667    /// some local state.
668    ///
669    /// # Example
670    /// ```rust
671    /// # use hydro_lang::prelude::*;
672    /// # use futures::StreamExt;
673    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674    /// let tick = process.tick();
    /// // ticks are lazy by default; this forces the second tick to run
676    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
677    ///
678    /// let batch_first_tick = process
679    ///   .source_iter(q!(vec![1, 2, 3, 4]))
680    ///   .batch(&tick, nondet!(/** test */));
681    /// let batch_second_tick = process
682    ///   .source_iter(q!(vec![5, 6, 7, 8]))
683    ///   .batch(&tick, nondet!(/** test */))
684    ///   .defer_tick(); // appears on the second tick
685    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
686    /// batch_first_tick.chain(batch_second_tick)
687    ///   .filter_if_none(some_on_first_tick)
688    ///   .all_ticks()
689    /// # }, |mut stream| async move {
690    /// // [5, 6, 7, 8]
691    /// # for w in vec![5, 6, 7, 8] {
692    /// #     assert_eq!(stream.next().await.unwrap(), w);
693    /// # }
694    /// # }));
695    /// ```
696    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
697        self.filter_if_some(
698            other
699                .map(q!(|_| ()))
700                .into_singleton()
701                .filter(q!(|o| o.is_none())),
702        )
703    }
704
705    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
706    /// tupled pairs in a non-deterministic order.
707    ///
708    /// # Example
709    /// ```rust
710    /// # use hydro_lang::prelude::*;
711    /// # use std::collections::HashSet;
712    /// # use futures::StreamExt;
713    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714    /// let tick = process.tick();
715    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
716    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
717    /// stream1.cross_product(stream2)
718    /// # }, |mut stream| async move {
719    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # for _ in 0..9 {
    /// #     assert!(expected.contains(&stream.next().await.unwrap()));
    /// # }
721    /// # }));
722    /// ```
723    pub fn cross_product<T2, O2: Ordering>(
724        self,
725        other: Stream<T2, L, B, O2, R>,
726    ) -> Stream<(T, T2), L, B, NoOrder, R>
727    where
728        T: Clone,
729        T2: Clone,
730    {
731        check_matching_location(&self.location, &other.location);
732
733        Stream::new(
734            self.location.clone(),
735            HydroNode::CrossProduct {
736                left: Box::new(self.ir_node.into_inner()),
737                right: Box::new(other.ir_node.into_inner()),
738                metadata: self.location.new_node_metadata::<(T, T2)>(),
739            },
740        )
741    }
742
743    /// Takes one stream as input and filters out any duplicate occurrences. The output
744    /// contains all unique values from the input.
745    ///
746    /// # Example
747    /// ```rust
748    /// # use hydro_lang::prelude::*;
749    /// # use futures::StreamExt;
750    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
751    /// let tick = process.tick();
752    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
753    /// # }, |mut stream| async move {
754    /// # for w in vec![1, 2, 3, 4] {
755    /// #     assert_eq!(stream.next().await.unwrap(), w);
756    /// # }
757    /// # }));
758    /// ```
759    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
760    where
761        T: Eq + Hash,
762    {
763        Stream::new(
764            self.location.clone(),
765            HydroNode::Unique {
766                input: Box::new(self.ir_node.into_inner()),
767                metadata: self.location.new_node_metadata::<T>(),
768            },
769        )
770    }
771
772    /// Outputs everything in this stream that is *not* contained in the `other` stream.
773    ///
774    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    ///
    /// # Example
777    /// ```rust
778    /// # use hydro_lang::prelude::*;
779    /// # use futures::StreamExt;
780    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
781    /// let tick = process.tick();
782    /// let stream = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
784    ///   .batch(&tick, nondet!(/** test */));
785    /// let batch = process
786    ///   .source_iter(q!(vec![1, 2]))
787    ///   .batch(&tick, nondet!(/** test */));
788    /// stream.filter_not_in(batch).all_ticks()
789    /// # }, |mut stream| async move {
790    /// # for w in vec![3, 4] {
791    /// #     assert_eq!(stream.next().await.unwrap(), w);
792    /// # }
793    /// # }));
794    /// ```
795    pub fn filter_not_in<O2: Ordering>(
796        self,
797        other: Stream<T, L, Bounded, O2, R>,
798    ) -> Stream<T, L, Bounded, O, R>
799    where
800        T: Eq + Hash,
801    {
802        check_matching_location(&self.location, &other.location);
803
804        Stream::new(
805            self.location.clone(),
806            HydroNode::Difference {
807                pos: Box::new(self.ir_node.into_inner()),
808                neg: Box::new(other.ir_node.into_inner()),
809                metadata: self.location.new_node_metadata::<T>(),
810            },
811        )
812    }
813
814    /// An operator which allows you to "inspect" each element of a stream without
815    /// modifying it. The closure `f` is called on a reference to each item. This is
816    /// mainly useful for debugging, and should not be used to generate side-effects.
817    ///
818    /// # Example
819    /// ```rust
820    /// # use hydro_lang::prelude::*;
821    /// # use futures::StreamExt;
822    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
823    /// let nums = process.source_iter(q!(vec![1, 2]));
824    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
825    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
826    /// # }, |mut stream| async move {
827    /// # for w in vec![1, 2] {
828    /// #     assert_eq!(stream.next().await.unwrap(), w);
829    /// # }
830    /// # }));
831    /// ```
832    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
833    where
834        F: Fn(&T) + 'a,
835    {
836        let f = f.splice_fn1_borrow_ctx(&self.location).into();
837
838        if L::is_top_level() {
839            Stream::new(
840                self.location.clone(),
841                HydroNode::Persist {
842                    inner: Box::new(HydroNode::Inspect {
843                        f,
844                        input: Box::new(HydroNode::Unpersist {
845                            inner: Box::new(self.ir_node.into_inner()),
846                            metadata: self.location.new_node_metadata::<T>(),
847                        }),
848                        metadata: self.location.new_node_metadata::<T>(),
849                    }),
850                    metadata: self.location.new_node_metadata::<T>(),
851                },
852            )
853        } else {
854            Stream::new(
855                self.location.clone(),
856                HydroNode::Inspect {
857                    f,
858                    input: Box::new(self.ir_node.into_inner()),
859                    metadata: self.location.new_node_metadata::<T>(),
860                },
861            )
862        }
863    }
864
865    /// An operator which allows you to "name" a `HydroNode`.
866    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
867    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
868        {
869            let mut node = self.ir_node.borrow_mut();
870            let metadata = node.metadata_mut();
871            metadata.tag = Some(name.to_string());
872        }
873        self
874    }
875
876    /// Explicitly "casts" the stream to a type with a different ordering
877    /// guarantee. Useful in unsafe code where the ordering cannot be proven
878    /// by the type-system.
879    ///
880    /// # Non-Determinism
881    /// This function is used as an escape hatch, and any mistakes in the
882    /// provided ordering guarantee will propagate into the guarantees
883    /// for the rest of the program.
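    ///
    /// # Example
    /// A minimal sketch, mirroring the doctest harness used elsewhere in this module; the
    /// source is a fixed vector, so re-asserting [`TotalOrder`] does not hide any real
    /// non-determinism:
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::TotalOrder};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .weakest_ordering() // forget the ordering guarantee
    ///     .assume_ordering::<TotalOrder>(nondet!(/** the source is a fixed, in-order vector */))
    /// # }, |mut stream| async move {
    /// # for w in 1..5 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```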
884    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
885        Stream::new(self.location, self.ir_node.into_inner())
886    }
887
888    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
889    /// which is always safe because that is the weakest possible guarantee.
890    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
891        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
892        self.assume_ordering::<NoOrder>(nondet)
893    }
894
895    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
896    /// enforcing that `O2` is weaker than the input ordering guarantee.
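    ///
    /// # Example
    /// A minimal sketch, following the same test harness as the other examples here; a
    /// [`TotalOrder`] stream is explicitly weakened to [`NoOrder`]:
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .weaken_ordering::<NoOrder>()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```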
897    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
898        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
899        self.assume_ordering::<O2>(nondet)
900    }
901
902    /// Explicitly "casts" the stream to a type with a different retries
903    /// guarantee. Useful in unsafe code where the lack of retries cannot
904    /// be proven by the type-system.
905    ///
906    /// # Non-Determinism
907    /// This function is used as an escape hatch, and any mistakes in the
908    /// provided retries guarantee will propagate into the guarantees
909    /// for the rest of the program.
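    ///
    /// # Example
    /// A minimal sketch, mirroring the doctest harness used elsewhere in this module; the
    /// source is a fixed vector, so re-asserting [`ExactlyOnce`] does not hide any real
    /// duplication:
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .weakest_retries() // forget the retry guarantee
    ///     .assume_retries::<ExactlyOnce>(nondet!(/** the source is a fixed vector with no retries */))
    /// # }, |mut stream| async move {
    /// # for w in 1..5 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```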
910    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
911        Stream::new(self.location, self.ir_node.into_inner())
912    }
913
914    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
915    /// which is always safe because that is the weakest possible guarantee.
916    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
917        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
918        self.assume_retries::<AtLeastOnce>(nondet)
919    }
920
921    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
922    /// enforcing that `R2` is weaker than the input retries guarantee.
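    ///
    /// # Example
    /// A minimal sketch, assuming the test harness also accepts [`AtLeastOnce`] streams
    /// (not exercised by the other examples in this module):
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, AtLeastOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, AtLeastOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .weaken_retries::<AtLeastOnce>()
    /// # }, |mut stream| async move {
    /// # for w in 1..5 {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```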
923    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
924        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
925        self.assume_retries::<R2>(nondet)
926    }
927}
928
929impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
930where
931    L: Location<'a>,
932{
933    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`].
935    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
936        self.assume_retries(
937            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
938        )
939    }
940}
941
942impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
943where
944    L: Location<'a>,
945{
946    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
947    ///
948    /// # Example
949    /// ```rust
950    /// # use hydro_lang::prelude::*;
951    /// # use futures::StreamExt;
952    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953    /// process.source_iter(q!(&[1, 2, 3])).cloned()
954    /// # }, |mut stream| async move {
955    /// // 1, 2, 3
956    /// # for w in vec![1, 2, 3] {
957    /// #     assert_eq!(stream.next().await.unwrap(), w);
958    /// # }
959    /// # }));
960    /// ```
961    pub fn cloned(self) -> Stream<T, L, B, O, R>
962    where
963        T: Clone,
964    {
965        self.map(q!(|d| d.clone()))
966    }
967}
968
969impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
970where
971    L: Location<'a>,
972{
973    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
974    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
975    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
976    ///
977    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
978    /// and there may be duplicates.
979    ///
980    /// # Example
981    /// ```rust
982    /// # use hydro_lang::prelude::*;
983    /// # use futures::StreamExt;
984    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
985    /// let tick = process.tick();
986    /// let bools = process.source_iter(q!(vec![false, true, false]));
987    /// let batch = bools.batch(&tick, nondet!(/** test */));
988    /// batch
989    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
990    ///     .all_ticks()
991    /// # }, |mut stream| async move {
992    /// // true
993    /// # assert_eq!(stream.next().await.unwrap(), true);
994    /// # }));
995    /// ```
996    pub fn fold_commutative_idempotent<A, I, F>(
997        self,
998        init: impl IntoQuotedMut<'a, I, L>,
999        comb: impl IntoQuotedMut<'a, F, L>,
1000    ) -> Singleton<A, L, B>
1001    where
1002        I: Fn() -> A + 'a,
1003        F: Fn(&mut A, T),
1004    {
1005        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1006        self.assume_ordering(nondet)
1007            .assume_retries(nondet)
1008            .fold(init, comb)
1009    }
1010
1011    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1012    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1013    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1014    /// reference, so that it can be modified in place.
1015    ///
1016    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1017    /// and there may be duplicates.
1018    ///
1019    /// # Example
1020    /// ```rust
1021    /// # use hydro_lang::prelude::*;
1022    /// # use futures::StreamExt;
1023    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1024    /// let tick = process.tick();
1025    /// let bools = process.source_iter(q!(vec![false, true, false]));
1026    /// let batch = bools.batch(&tick, nondet!(/** test */));
1027    /// batch
1028    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1029    ///     .all_ticks()
1030    /// # }, |mut stream| async move {
1031    /// // true
1032    /// # assert_eq!(stream.next().await.unwrap(), true);
1033    /// # }));
1034    /// ```
1035    pub fn reduce_commutative_idempotent<F>(
1036        self,
1037        comb: impl IntoQuotedMut<'a, F, L>,
1038    ) -> Optional<T, L, B>
1039    where
1040        F: Fn(&mut T, T) + 'a,
1041    {
1042        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1043        self.assume_ordering(nondet)
1044            .assume_retries(nondet)
1045            .reduce(comb)
1046    }
1047
1048    /// Computes the maximum element in the stream as an [`Optional`], which
1049    /// will be empty until the first element in the input arrives.
1050    ///
1051    /// # Example
1052    /// ```rust
1053    /// # use hydro_lang::prelude::*;
1054    /// # use futures::StreamExt;
1055    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1056    /// let tick = process.tick();
1057    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1058    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1059    /// batch.max().all_ticks()
1060    /// # }, |mut stream| async move {
1061    /// // 4
1062    /// # assert_eq!(stream.next().await.unwrap(), 4);
1063    /// # }));
1064    /// ```
1065    pub fn max(self) -> Optional<T, L, B>
1066    where
1067        T: Ord,
1068    {
1069        self.reduce_commutative_idempotent(q!(|curr, new| {
1070            if new > *curr {
1071                *curr = new;
1072            }
1073        }))
1074    }
1075
1076    /// Computes the maximum element in the stream as an [`Optional`], where the
1077    /// maximum is determined according to the `key` function. The [`Optional`] will
1078    /// be empty until the first element in the input arrives.
1079    ///
1080    /// # Example
1081    /// ```rust
1082    /// # use hydro_lang::prelude::*;
1083    /// # use futures::StreamExt;
1084    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1085    /// let tick = process.tick();
1086    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1087    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1088    /// batch.max_by_key(q!(|x| -x)).all_ticks()
1089    /// # }, |mut stream| async move {
1090    /// // 1
1091    /// # assert_eq!(stream.next().await.unwrap(), 1);
1092    /// # }));
1093    /// ```
1094    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1095    where
1096        K: Ord,
1097        F: Fn(&T) -> K + 'a,
1098    {
1099        let f = key.splice_fn1_borrow_ctx(&self.location);
1100
1101        let wrapped: syn::Expr = parse_quote!({
1102            let key_fn = #f;
1103            move |curr, new| {
1104                if key_fn(&new) > key_fn(&*curr) {
1105                    *curr = new;
1106                }
1107            }
1108        });
1109
1110        let mut core = HydroNode::Reduce {
1111            f: wrapped.into(),
1112            input: Box::new(self.ir_node.into_inner()),
1113            metadata: self.location.new_node_metadata::<T>(),
1114        };
1115
1116        if L::is_top_level() {
1117            core = HydroNode::Persist {
1118                inner: Box::new(core),
1119                metadata: self.location.new_node_metadata::<T>(),
1120            };
1121        }
1122
1123        Optional::new(self.location, core)
1124    }
1125
1126    /// Computes the minimum element in the stream as an [`Optional`], which
1127    /// will be empty until the first element in the input arrives.
1128    ///
1129    /// # Example
1130    /// ```rust
1131    /// # use hydro_lang::prelude::*;
1132    /// # use futures::StreamExt;
1133    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1134    /// let tick = process.tick();
1135    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1136    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1137    /// batch.min().all_ticks()
1138    /// # }, |mut stream| async move {
1139    /// // 1
1140    /// # assert_eq!(stream.next().await.unwrap(), 1);
1141    /// # }));
1142    /// ```
1143    pub fn min(self) -> Optional<T, L, B>
1144    where
1145        T: Ord,
1146    {
1147        self.reduce_commutative_idempotent(q!(|curr, new| {
1148            if new < *curr {
1149                *curr = new;
1150            }
1151        }))
1152    }
1153}
1154
1155impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1156where
1157    L: Location<'a>,
1158{
1159    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1160    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1161    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1162    ///
1163    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1164    ///
1165    /// # Example
1166    /// ```rust
1167    /// # use hydro_lang::prelude::*;
1168    /// # use futures::StreamExt;
1169    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1170    /// let tick = process.tick();
1171    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1172    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1173    /// batch
1174    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1175    ///     .all_ticks()
1176    /// # }, |mut stream| async move {
1177    /// // 10
1178    /// # assert_eq!(stream.next().await.unwrap(), 10);
1179    /// # }));
1180    /// ```
1181    pub fn fold_commutative<A, I, F>(
1182        self,
1183        init: impl IntoQuotedMut<'a, I, L>,
1184        comb: impl IntoQuotedMut<'a, F, L>,
1185    ) -> Singleton<A, L, B>
1186    where
1187        I: Fn() -> A + 'a,
1188        F: Fn(&mut A, T),
1189    {
1190        let nondet = nondet!(/** the combinator function is commutative */);
1191        self.assume_ordering(nondet).fold(init, comb)
1192    }
1193
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1195    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1196    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1197    /// reference, so that it can be modified in place.
1198    ///
1199    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1200    ///
1201    /// # Example
1202    /// ```rust
1203    /// # use hydro_lang::prelude::*;
1204    /// # use futures::StreamExt;
1205    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1206    /// let tick = process.tick();
1207    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1208    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1209    /// batch
1210    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1211    ///     .all_ticks()
1212    /// # }, |mut stream| async move {
1213    /// // 10
1214    /// # assert_eq!(stream.next().await.unwrap(), 10);
1215    /// # }));
1216    /// ```
1217    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1218    where
1219        F: Fn(&mut T, T) + 'a,
1220    {
1221        let nondet = nondet!(/** the combinator function is commutative */);
1222        self.assume_ordering(nondet).reduce(comb)
1223    }
1224
1225    /// Computes the number of elements in the stream as a [`Singleton`].
1226    ///
1227    /// # Example
1228    /// ```rust
1229    /// # use hydro_lang::prelude::*;
1230    /// # use futures::StreamExt;
1231    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1232    /// let tick = process.tick();
1233    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1234    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1235    /// batch.count().all_ticks()
1236    /// # }, |mut stream| async move {
1237    /// // 4
1238    /// # assert_eq!(stream.next().await.unwrap(), 4);
1239    /// # }));
1240    /// ```
1241    pub fn count(self) -> Singleton<usize, L, B> {
1242        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1243    }
1244}
1245
1246impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1247where
1248    L: Location<'a>,
1249{
1250    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1251    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1252    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1253    ///
1254    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1255    ///
1256    /// # Example
1257    /// ```rust
1258    /// # use hydro_lang::prelude::*;
1259    /// # use futures::StreamExt;
1260    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1261    /// let tick = process.tick();
1262    /// let bools = process.source_iter(q!(vec![false, true, false]));
1263    /// let batch = bools.batch(&tick, nondet!(/** test */));
1264    /// batch
1265    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1266    ///     .all_ticks()
1267    /// # }, |mut stream| async move {
1268    /// // true
1269    /// # assert_eq!(stream.next().await.unwrap(), true);
1270    /// # }));
1271    /// ```
1272    pub fn fold_idempotent<A, I, F>(
1273        self,
1274        init: impl IntoQuotedMut<'a, I, L>,
1275        comb: impl IntoQuotedMut<'a, F, L>,
1276    ) -> Singleton<A, L, B>
1277    where
1278        I: Fn() -> A + 'a,
1279        F: Fn(&mut A, T),
1280    {
1281        let nondet = nondet!(/** the combinator function is idempotent */);
1282        self.assume_retries(nondet).fold(init, comb)
1283    }
1284
1285    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1286    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1287    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1288    /// reference, so that it can be modified in place.
1289    ///
1290    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1291    ///
1292    /// # Example
1293    /// ```rust
1294    /// # use hydro_lang::prelude::*;
1295    /// # use futures::StreamExt;
1296    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1297    /// let tick = process.tick();
1298    /// let bools = process.source_iter(q!(vec![false, true, false]));
1299    /// let batch = bools.batch(&tick, nondet!(/** test */));
1300    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1301    /// # }, |mut stream| async move {
1302    /// // true
1303    /// # assert_eq!(stream.next().await.unwrap(), true);
1304    /// # }));
1305    /// ```
1306    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1307    where
1308        F: Fn(&mut T, T) + 'a,
1309    {
1310        let nondet = nondet!(/** the combinator function is idempotent */);
1311        self.assume_retries(nondet).reduce(comb)
1312    }
1313
1314    /// Computes the first element in the stream as an [`Optional`], which
1315    /// will be empty until the first element in the input arrives.
1316    ///
1317    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1318    /// re-ordering of elements may cause the first element to change.
1319    ///
1320    /// # Example
1321    /// ```rust
1322    /// # use hydro_lang::prelude::*;
1323    /// # use futures::StreamExt;
1324    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325    /// let tick = process.tick();
1326    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1327    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1328    /// batch.first().all_ticks()
1329    /// # }, |mut stream| async move {
1330    /// // 1
1331    /// # assert_eq!(stream.next().await.unwrap(), 1);
1332    /// # }));
1333    /// ```
1334    pub fn first(self) -> Optional<T, L, B> {
1335        self.reduce_idempotent(q!(|_, _| {}))
1336    }
1337
1338    /// Computes the last element in the stream as an [`Optional`], which
1339    /// will be empty until an element in the input arrives.
1340    ///
1341    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1342    /// re-ordering of elements may cause the last element to change.
1343    ///
1344    /// # Example
1345    /// ```rust
1346    /// # use hydro_lang::prelude::*;
1347    /// # use futures::StreamExt;
1348    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1349    /// let tick = process.tick();
1350    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1351    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1352    /// batch.last().all_ticks()
1353    /// # }, |mut stream| async move {
1354    /// // 4
1355    /// # assert_eq!(stream.next().await.unwrap(), 4);
1356    /// # }));
1357    /// ```
1358    pub fn last(self) -> Optional<T, L, B> {
1359        self.reduce_idempotent(q!(|curr, new| *curr = new))
1360    }
1361}
1362
1363impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1364where
1365    L: Location<'a>,
1366{
    /// Returns a stream where each element is tupled with its index (the number of elements seen before it) in the input stream.
1368    ///
1369    /// # Example
1370    /// ```rust
1371    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1372    /// # use futures::StreamExt;
1373    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1374    /// let tick = process.tick();
1375    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1376    /// numbers.enumerate()
1377    /// # }, |mut stream| async move {
1378    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1379    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1380    /// #     assert_eq!(stream.next().await.unwrap(), w);
1381    /// # }
1382    /// # }));
1383    /// ```
1384    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1385        if L::is_top_level() {
1386            Stream::new(
1387                self.location.clone(),
1388                HydroNode::Persist {
1389                    inner: Box::new(HydroNode::Enumerate {
1390                        is_static: true,
1391                        input: Box::new(HydroNode::Unpersist {
1392                            inner: Box::new(self.ir_node.into_inner()),
1393                            metadata: self.location.new_node_metadata::<T>(),
1394                        }),
1395                        metadata: self.location.new_node_metadata::<(usize, T)>(),
1396                    }),
1397                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1398                },
1399            )
1400        } else {
1401            Stream::new(
1402                self.location.clone(),
1403                HydroNode::Enumerate {
1404                    is_static: false,
1405                    input: Box::new(self.ir_node.into_inner()),
1406                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1407                },
1408            )
1409        }
1410    }
1411
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1413    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1414    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1415    ///
1416    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1417    /// to depend on the order of elements in the stream.
1418    ///
1419    /// # Example
1420    /// ```rust
1421    /// # use hydro_lang::prelude::*;
1422    /// # use futures::StreamExt;
1423    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1424    /// let tick = process.tick();
1425    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1426    /// let batch = words.batch(&tick, nondet!(/** test */));
1427    /// batch
1428    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1429    ///     .all_ticks()
1430    /// # }, |mut stream| async move {
1431    /// // "HELLOWORLD"
1432    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1433    /// # }));
1434    /// ```
1435    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1436        self,
1437        init: impl IntoQuotedMut<'a, I, L>,
1438        comb: impl IntoQuotedMut<'a, F, L>,
1439    ) -> Singleton<A, L, B> {
1440        let init = init.splice_fn0_ctx(&self.location).into();
1441        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1442
1443        let mut core = HydroNode::Fold {
1444            init,
1445            acc: comb,
1446            input: Box::new(self.ir_node.into_inner()),
1447            metadata: self.location.new_node_metadata::<A>(),
1448        };
1449
1450        if L::is_top_level() {
1451            // top-level (possibly unbounded) singletons are represented as
1452            // a stream which produces all values from all ticks every tick,
1453            // so Unpersist will always give the latest aggregation
1454            core = HydroNode::Persist {
1455                inner: Box::new(core),
1456                metadata: self.location.new_node_metadata::<A>(),
1457            };
1458        }
1459
1460        Singleton::new(self.location, core)
1461    }
1462
1463    /// Collects all the elements of this stream into a single [`Vec`] element.
1464    ///
1465    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1466    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1467    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1468    /// the vector at an arbitrary point in time.
1469    ///
1470    /// # Example
1471    /// ```rust
1472    /// # use hydro_lang::prelude::*;
1473    /// # use futures::StreamExt;
1474    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1475    /// let tick = process.tick();
1476    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1477    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1478    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1479    /// # }, |mut stream| async move {
1480    /// // [ vec![1, 2, 3, 4] ]
1481    /// # for w in vec![vec![1, 2, 3, 4]] {
1482    /// #     assert_eq!(stream.next().await.unwrap(), w);
1483    /// # }
1484    /// # }));
1485    /// ```
1486    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1487        self.fold(
1488            q!(|| vec![]),
1489            q!(|acc, v| {
1490                acc.push(v);
1491            }),
1492        )
1493    }
1494
1495    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1496    /// and emitting each intermediate result.
1497    ///
1498    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1499    /// containing all intermediate accumulated values. The scan operation can also terminate early
1500    /// by returning `None`.
1501    ///
1502    /// The function takes a mutable reference to the accumulator and the current element, and returns
1503    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1504    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1505    ///
1506    /// # Examples
1507    ///
1508    /// Basic usage - running sum:
1509    /// ```rust
1510    /// # use hydro_lang::prelude::*;
1511    /// # use futures::StreamExt;
1512    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1514    ///     q!(|| 0),
1515    ///     q!(|acc, x| {
1516    ///         *acc += x;
1517    ///         Some(*acc)
1518    ///     }),
1519    /// )
1520    /// # }, |mut stream| async move {
1521    /// // Output: 1, 3, 6, 10
1522    /// # for w in vec![1, 3, 6, 10] {
1523    /// #     assert_eq!(stream.next().await.unwrap(), w);
1524    /// # }
1525    /// # }));
1526    /// ```
1527    ///
1528    /// Early termination example:
1529    /// ```rust
1530    /// # use hydro_lang::prelude::*;
1531    /// # use futures::StreamExt;
1532    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1533    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1534    ///     q!(|| 1),
1535    ///     q!(|state, x| {
1536    ///         *state = *state * x;
1537    ///         if *state > 6 {
1538    ///             None // Terminate the stream
1539    ///         } else {
1540    ///             Some(-*state)
1541    ///         }
1542    ///     }),
1543    /// )
1544    /// # }, |mut stream| async move {
1545    /// // Output: -1, -2, -6
1546    /// # for w in vec![-1, -2, -6] {
1547    /// #     assert_eq!(stream.next().await.unwrap(), w);
1548    /// # }
1549    /// # }));
1550    /// ```
1551    pub fn scan<A, U, I, F>(
1552        self,
1553        init: impl IntoQuotedMut<'a, I, L>,
1554        f: impl IntoQuotedMut<'a, F, L>,
1555    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1556    where
1557        I: Fn() -> A + 'a,
1558        F: Fn(&mut A, T) -> Option<U> + 'a,
1559    {
1560        let init = init.splice_fn0_ctx(&self.location).into();
1561        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1562
1563        if L::is_top_level() {
1564            Stream::new(
1565                self.location.clone(),
1566                HydroNode::Persist {
1567                    inner: Box::new(HydroNode::Scan {
1568                        init,
1569                        acc: f,
1570                        input: Box::new(self.ir_node.into_inner()),
1571                        metadata: self.location.new_node_metadata::<U>(),
1572                    }),
1573                    metadata: self.location.new_node_metadata::<U>(),
1574                },
1575            )
1576        } else {
1577            Stream::new(
1578                self.location.clone(),
1579                HydroNode::Scan {
1580                    init,
1581                    acc: f,
1582                    input: Box::new(self.ir_node.into_inner()),
1583                    metadata: self.location.new_node_metadata::<U>(),
1584                },
1585            )
1586        }
1587    }
1588
1589    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1590    /// and then applying the `comb` closure to each subsequent element. The [`Optional`] will be empty
1591    /// until the first element in the input arrives.
1592    ///
1593    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1594    /// to depend on the order of elements in the stream.
1595    ///
1596    /// # Example
1597    /// ```rust
1598    /// # use hydro_lang::prelude::*;
1599    /// # use futures::StreamExt;
1600    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1601    /// let tick = process.tick();
1602    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1603    /// let batch = words.batch(&tick, nondet!(/** test */));
1604    /// batch
1605    ///     .map(q!(|x| x.to_string()))
1606    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1607    ///     .all_ticks()
1608    /// # }, |mut stream| async move {
1609    /// // "HELLOWORLD"
1610    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1611    /// # }));
1612    /// ```
1613    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1614        self,
1615        comb: impl IntoQuotedMut<'a, F, L>,
1616    ) -> Optional<T, L, B> {
1617        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1618        let mut core = HydroNode::Reduce {
1619            f,
1620            input: Box::new(self.ir_node.into_inner()),
1621            metadata: self.location.new_node_metadata::<T>(),
1622        };
1623
1624        if L::is_top_level() {
1625            core = HydroNode::Persist {
1626                inner: Box::new(core),
1627                metadata: self.location.new_node_metadata::<T>(),
1628            };
1629        }
1630
1631        Optional::new(self.location, core)
1632    }
1633}
1634
1635impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
1636    Stream<T, L, Unbounded, O, R>
1637{
1638    /// Produces a new stream that interleaves the elements of the two input streams.
1639    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1640    ///
1641    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1642    /// [`Bounded`], you can use [`Stream::chain`] instead.
1643    ///
1644    /// # Example
1645    /// ```rust
1646    /// # use hydro_lang::prelude::*;
1647    /// # use futures::StreamExt;
1648    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1649    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1650    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1651    /// # }, |mut stream| async move {
1652    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1653    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1654    /// #     assert_eq!(stream.next().await.unwrap(), w);
1655    /// # }
1656    /// # }));
1657    /// ```
1658    pub fn interleave<O2: Ordering, R2: Retries>(
1659        self,
1660        other: Stream<T, L, Unbounded, O2, R2>,
1661    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1662    where
1663        R: MinRetries<R2>,
1664    {
1665        let tick = self.location.tick();
1666        // Because the outputs are unordered, we can interleave batches from both streams.
1667        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1668        self.batch(&tick, nondet_batch_interleaving)
1669            .weakest_ordering()
1670            .chain(
1671                other
1672                    .batch(&tick, nondet_batch_interleaving)
1673                    .weakest_ordering(),
1674            )
1675            .all_ticks()
1676    }
1677}
1678
1679impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1680where
1681    L: Location<'a>,
1682{
1683    /// Produces a new stream that emits the input elements in sorted order.
1684    ///
1685    /// The input stream can have any ordering guarantee, but the output stream
1686    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1687    /// elements in the input stream are available, so it requires the input stream
1688    /// to be [`Bounded`].
1689    ///
1690    /// # Example
1691    /// ```rust
1692    /// # use hydro_lang::prelude::*;
1693    /// # use futures::StreamExt;
1694    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1695    /// let tick = process.tick();
1696    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1697    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1698    /// batch.sort().all_ticks()
1699    /// # }, |mut stream| async move {
1700    /// // 1, 2, 3, 4
1701    /// # for w in (1..5) {
1702    /// #     assert_eq!(stream.next().await.unwrap(), w);
1703    /// # }
1704    /// # }));
1705    /// ```
1706    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1707    where
1708        T: Ord,
1709    {
1710        Stream::new(
1711            self.location.clone(),
1712            HydroNode::Sort {
1713                input: Box::new(self.ir_node.into_inner()),
1714                metadata: self.location.new_node_metadata::<T>(),
1715            },
1716        )
1717    }
1718
1719    /// Produces a new stream that first emits the elements of the `self` stream,
1720    /// and then emits the elements of the `other` stream. The output stream has
1721    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1722    /// [`TotalOrder`] guarantee.
1723    ///
1724    /// Currently, both input streams must be [`Bounded`]. This operator will block
1725    /// on the first stream until all its elements are available. In a future version,
1726    /// we will relax the requirement on the `other` stream.
1727    ///
1728    /// # Example
1729    /// ```rust
1730    /// # use hydro_lang::prelude::*;
1731    /// # use futures::StreamExt;
1732    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1733    /// let tick = process.tick();
1734    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1735    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1736    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1737    /// # }, |mut stream| async move {
1738    /// // 2, 3, 4, 5, 1, 2, 3, 4
1739    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1740    /// #     assert_eq!(stream.next().await.unwrap(), w);
1741    /// # }
1742    /// # }));
1743    /// ```
1744    pub fn chain<O2: Ordering, R2: Retries>(
1745        self,
1746        other: Stream<T, L, Bounded, O2, R2>,
1747    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1748    where
1749        O: MinOrder<O2>,
1750        R: MinRetries<R2>,
1751    {
1752        check_matching_location(&self.location, &other.location);
1753
1754        Stream::new(
1755            self.location.clone(),
1756            HydroNode::Chain {
1757                first: Box::new(self.ir_node.into_inner()),
1758                second: Box::new(other.ir_node.into_inner()),
1759                metadata: self.location.new_node_metadata::<T>(),
1760            },
1761        )
1762    }
1763
1764    /// Forms the cross-product (Cartesian product, cross-join) of the items in the two input streams.
1765    /// Unlike [`Stream::cross_product`], the output is totally ordered when both inputs are,
1766    /// because this operator is compiled into a nested loop.
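    ///
    /// # Example
    /// A minimal usage sketch, mirroring the other examples in this module; the assertions only
    /// check membership, since the exact pairing order is not spelled out here.
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let letters = process
    ///     .source_iter(q!(vec!['a', 'b']))
    ///     .batch(&tick, nondet!(/** test */));
    /// numbers.cross_product_nested_loop(letters).all_ticks()
    /// # }, |mut stream| async move {
    /// // all four (number, letter) pairs
    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
    /// # for _ in 0..4 {
    /// #     assert!(expected.contains(&stream.next().await.unwrap()));
    /// # }
    /// # }));
    /// ```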
1767    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1768        self,
1769        other: Stream<T2, L, Bounded, O2, R>,
1770    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1771    where
1772        T: Clone,
1773        T2: Clone,
1774    {
1775        check_matching_location(&self.location, &other.location);
1776
1777        Stream::new(
1778            self.location.clone(),
1779            HydroNode::CrossProduct {
1780                left: Box::new(self.ir_node.into_inner()),
1781                right: Box::new(other.ir_node.into_inner()),
1782                metadata: self.location.new_node_metadata::<(T, T2)>(),
1783            },
1784        )
1785    }
1786}
1787
1788impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1789where
1790    L: Location<'a>,
1791{
1792    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1793    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1794    /// by equi-joining the two streams on the key attribute `K`.
1795    ///
1796    /// # Example
1797    /// ```rust
1798    /// # use hydro_lang::prelude::*;
1799    /// # use std::collections::HashSet;
1800    /// # use futures::StreamExt;
1801    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1802    /// let tick = process.tick();
1803    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1804    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1805    /// stream1.join(stream2)
1806    /// # }, |mut stream| async move {
1807    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1808    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1809    /// # for _ in 0..2 {
    /// #     assert!(expected.contains(&stream.next().await.unwrap()));
    /// # }
1810    /// # }));
    /// ```
1811    pub fn join<V2, O2: Ordering, R2: Retries>(
1812        self,
1813        n: Stream<(K, V2), L, B, O2, R2>,
1814    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1815    where
1816        K: Eq + Hash,
1817        R: MinRetries<R2>,
1818    {
1819        check_matching_location(&self.location, &n.location);
1820
1821        Stream::new(
1822            self.location.clone(),
1823            HydroNode::Join {
1824                left: Box::new(self.ir_node.into_inner()),
1825                right: Box::new(n.ir_node.into_inner()),
1826                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1827            },
1828        )
1829    }
1830
1831    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1832    /// computes the anti-join of the items in the input -- i.e. returns
1833    /// unique items in the first input that do not have a matching key
1834    /// in the second input.
1835    ///
1836    /// # Example
1837    /// ```rust
1838    /// # use hydro_lang::prelude::*;
1839    /// # use futures::StreamExt;
1840    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1841    /// let tick = process.tick();
1842    /// let stream = process
1843    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1844    ///   .batch(&tick, nondet!(/** test */));
1845    /// let batch = process
1846    ///   .source_iter(q!(vec![1, 2]))
1847    ///   .batch(&tick, nondet!(/** test */));
1848    /// stream.anti_join(batch).all_ticks()
1849    /// # }, |mut stream| async move {
1850    /// # for w in vec![(3, 'c'), (4, 'd')] {
1851    /// #     assert_eq!(stream.next().await.unwrap(), w);
1852    /// # }
1853    /// # }));
    /// ```
1854    pub fn anti_join<O2: Ordering, R2: Retries>(
1855        self,
1856        n: Stream<K, L, Bounded, O2, R2>,
1857    ) -> Stream<(K, V1), L, B, O, R>
1858    where
1859        K: Eq + Hash,
1860    {
1861        check_matching_location(&self.location, &n.location);
1862
1863        Stream::new(
1864            self.location.clone(),
1865            HydroNode::AntiJoin {
1866                pos: Box::new(self.ir_node.into_inner()),
1867                neg: Box::new(n.ir_node.into_inner()),
1868                metadata: self.location.new_node_metadata::<(K, V1)>(),
1869            },
1870        )
1871    }
1872}
1873
1874impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1875    Stream<(K, V), L, B, O, R>
1876{
1877    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1878    /// is used as the key and the second element is added to the entries associated with that key.
1879    ///
1880    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1881    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1882    /// performing grouped aggregations, but also for more precise ordering guarantees such as
1883    /// total ordering _within_ each group but no ordering _across_ groups.
1884    ///
1885    /// # Example
1886    /// ```rust
1887    /// # use hydro_lang::prelude::*;
1888    /// # use futures::StreamExt;
1889    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1890    /// process
1891    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1892    ///     .into_keyed()
1893    /// #   .entries()
1894    /// # }, |mut stream| async move {
1895    /// // { 1: [2, 3], 2: [4] }
1896    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1897    /// #     assert_eq!(stream.next().await.unwrap(), w);
1898    /// # }
1899    /// # }));
1900    /// ```
1901    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1902        KeyedStream {
1903            underlying: self.weakest_ordering(),
1904            _phantom_order: Default::default(),
1905        }
1906    }
1907}
1908
1909impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1910where
1911    K: Eq + Hash,
1912    L: Location<'a>,
1913{
1914    #[deprecated = "use .into_keyed().fold(...) instead"]
1915    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1916    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1917    /// in the second element are accumulated via the `comb` closure.
1918    ///
1919    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1920    /// to depend on the order of elements in the stream.
1921    ///
1922    /// If the input and output value types are the same and do not require initialization then use
1923    /// [`Stream::reduce_keyed`].
1924    ///
1925    /// # Example
1926    /// ```rust
1927    /// # use hydro_lang::prelude::*;
1928    /// # use futures::StreamExt;
1929    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1930    /// let tick = process.tick();
1931    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1932    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1933    /// batch
1934    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1935    ///     .all_ticks()
1936    /// # }, |mut stream| async move {
1937    /// // (1, 5), (2, 7)
1938    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1939    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1940    /// # }));
1941    /// ```
1942    pub fn fold_keyed<A, I, F>(
1943        self,
1944        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1945        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1946    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1947    where
1948        I: Fn() -> A + 'a,
1949        F: Fn(&mut A, V) + 'a,
1950    {
1951        self.into_keyed().fold(init, comb).entries()
1952    }
1953
1954    #[deprecated = "use .into_keyed().reduce(...) instead"]
1955    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1956    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1957    /// in the second element are accumulated via the `comb` closure.
1958    ///
1959    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1960    /// to depend on the order of elements in the stream.
1961    ///
1962    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1963    ///
1964    /// # Example
1965    /// ```rust
1966    /// # use hydro_lang::prelude::*;
1967    /// # use futures::StreamExt;
1968    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1969    /// let tick = process.tick();
1970    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1971    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1972    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1973    /// # }, |mut stream| async move {
1974    /// // (1, 5), (2, 7)
1975    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1976    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1977    /// # }));
1978    /// ```
1979    pub fn reduce_keyed<F>(
1980        self,
1981        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1982    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1983    where
1984        F: Fn(&mut V, V) + 'a,
1985    {
1986        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1987
1988        Stream::new(
1989            self.location.clone(),
1990            HydroNode::ReduceKeyed {
1991                f,
1992                input: Box::new(self.ir_node.into_inner()),
1993                metadata: self.location.new_node_metadata::<(K, V)>(),
1994            },
1995        )
1996    }
1997}
1998
1999impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2000where
2001    K: Eq + Hash,
2002    L: Location<'a>,
2003{
2004    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2005    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2006    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2007    /// in the second element are accumulated via the `comb` closure.
2008    ///
2009    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2010    /// as there may be non-deterministic duplicates.
2011    ///
2012    /// If the input and output value types are the same and do not require initialization then use
2013    /// [`Stream::reduce_keyed_commutative_idempotent`].
2014    ///
2015    /// # Example
2016    /// ```rust
2017    /// # use hydro_lang::prelude::*;
2018    /// # use futures::StreamExt;
2019    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2020    /// let tick = process.tick();
2021    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2022    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2023    /// batch
2024    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2025    ///     .all_ticks()
2026    /// # }, |mut stream| async move {
2027    /// // (1, false), (2, true)
2028    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2029    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2030    /// # }));
2031    /// ```
2032    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2033        self,
2034        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2035        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2036    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2037    where
2038        I: Fn() -> A + 'a,
2039        F: Fn(&mut A, V) + 'a,
2040    {
2041        self.into_keyed()
2042            .fold_commutative_idempotent(init, comb)
2043            .entries()
2044    }
2045
2046    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    ///
2047    /// # Example
2048    /// ```rust
2049    /// # use hydro_lang::prelude::*;
2050    /// # use futures::StreamExt;
2051    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2052    /// let tick = process.tick();
2053    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2054    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2055    /// batch.keys().all_ticks()
2056    /// # }, |mut stream| async move {
2057    /// // 1, 2
2058    /// # assert_eq!(stream.next().await.unwrap(), 1);
2059    /// # assert_eq!(stream.next().await.unwrap(), 2);
2060    /// # }));
2061    /// ```
2062    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2063        self.into_keyed()
2064            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2065            .keys()
2066    }
2067
2068    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2069    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2070    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2071    /// in the second element are accumulated via the `comb` closure.
2072    ///
2073    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2074    /// as there may be non-deterministic duplicates.
2075    ///
2076    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2077    ///
2078    /// # Example
2079    /// ```rust
2080    /// # use hydro_lang::prelude::*;
2081    /// # use futures::StreamExt;
2082    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2083    /// let tick = process.tick();
2084    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2085    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2086    /// batch
2087    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2088    ///     .all_ticks()
2089    /// # }, |mut stream| async move {
2090    /// // (1, false), (2, true)
2091    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2092    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2093    /// # }));
2094    /// ```
2095    pub fn reduce_keyed_commutative_idempotent<F>(
2096        self,
2097        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2098    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2099    where
2100        F: Fn(&mut V, V) + 'a,
2101    {
2102        self.into_keyed()
2103            .reduce_commutative_idempotent(comb)
2104            .entries()
2105    }
2106}
2107
2108impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2109where
2110    K: Eq + Hash,
2111    L: Location<'a>,
2112{
2113    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2114    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2115    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2116    /// in the second element are accumulated via the `comb` closure.
2117    ///
2118    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2119    ///
2120    /// If the input and output value types are the same and do not require initialization then use
2121    /// [`Stream::reduce_keyed_commutative`].
2122    ///
2123    /// # Example
2124    /// ```rust
2125    /// # use hydro_lang::prelude::*;
2126    /// # use futures::StreamExt;
2127    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2128    /// let tick = process.tick();
2129    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2130    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2131    /// batch
2132    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2133    ///     .all_ticks()
2134    /// # }, |mut stream| async move {
2135    /// // (1, 5), (2, 7)
2136    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2137    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2138    /// # }));
2139    /// ```
2140    pub fn fold_keyed_commutative<A, I, F>(
2141        self,
2142        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2143        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2144    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2145    where
2146        I: Fn() -> A + 'a,
2147        F: Fn(&mut A, V) + 'a,
2148    {
2149        self.into_keyed().fold_commutative(init, comb).entries()
2150    }
2151
2152    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2153    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2154    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2155    /// in the second element are accumulated via the `comb` closure.
2156    ///
2157    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2158    ///
2159    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2160    ///
2161    /// # Example
2162    /// ```rust
2163    /// # use hydro_lang::prelude::*;
2164    /// # use futures::StreamExt;
2165    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2166    /// let tick = process.tick();
2167    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2168    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2169    /// batch
2170    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2171    ///     .all_ticks()
2172    /// # }, |mut stream| async move {
2173    /// // (1, 5), (2, 7)
2174    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2175    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2176    /// # }));
2177    /// ```
2178    pub fn reduce_keyed_commutative<F>(
2179        self,
2180        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2181    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2182    where
2183        F: Fn(&mut V, V) + 'a,
2184    {
2185        self.into_keyed().reduce_commutative(comb).entries()
2186    }
2187}
2188
2189impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2190where
2191    K: Eq + Hash,
2192    L: Location<'a>,
2193{
2194    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2195    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2196    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2197    /// in the second element are accumulated via the `comb` closure.
2198    ///
2199    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2200    ///
2201    /// If the input and output value types are the same and do not require initialization then use
2202    /// [`Stream::reduce_keyed_idempotent`].
2203    ///
2204    /// # Example
2205    /// ```rust
2206    /// # use hydro_lang::prelude::*;
2207    /// # use futures::StreamExt;
2208    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2209    /// let tick = process.tick();
2210    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2211    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2212    /// batch
2213    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2214    ///     .all_ticks()
2215    /// # }, |mut stream| async move {
2216    /// // (1, false), (2, true)
2217    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2218    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2219    /// # }));
2220    /// ```
2221    pub fn fold_keyed_idempotent<A, I, F>(
2222        self,
2223        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2224        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2225    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2226    where
2227        I: Fn() -> A + 'a,
2228        F: Fn(&mut A, V) + 'a,
2229    {
2230        self.into_keyed().fold_idempotent(init, comb).entries()
2231    }
2232
2233    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2234    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2235    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2236    /// in the second element are accumulated via the `comb` closure.
2237    ///
2238    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2239    ///
2240    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2241    ///
2242    /// # Example
2243    /// ```rust
2244    /// # use hydro_lang::prelude::*;
2245    /// # use futures::StreamExt;
2246    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2247    /// let tick = process.tick();
2248    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2249    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2250    /// batch
2251    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2252    ///     .all_ticks()
2253    /// # }, |mut stream| async move {
2254    /// // (1, false), (2, true)
2255    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2256    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2257    /// # }));
2258    /// ```
2259    pub fn reduce_keyed_idempotent<F>(
2260        self,
2261        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2262    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2263    where
2264        F: Fn(&mut V, V) + 'a,
2265    {
2266        self.into_keyed().reduce_idempotent(comb).entries()
2267    }
2268}
2269
2270impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2271where
2272    L: Location<'a> + NoTick,
2273{
2274    /// Returns a stream corresponding to the latest batch of elements being atomically
2275    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2276    /// the order of the input.
2277    ///
2278    /// # Non-Determinism
2279    /// The batch boundaries are non-deterministic and may change across executions.
2280    pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2281        Stream::new(
2282            self.location.clone().tick,
2283            HydroNode::Unpersist {
2284                inner: Box::new(self.ir_node.into_inner()),
2285                metadata: self.location.new_node_metadata::<T>(),
2286            },
2287        )
2288    }
2289
2290    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2291    /// See [`Stream::atomic`] for more details.
2292    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2293        Stream::new(self.location.tick.l, self.ir_node.into_inner())
2294    }
2295
2296    /// Gets the [`Tick`] inside which this stream is synchronously processed. See [`Stream::atomic`].
2297    pub fn atomic_source(&self) -> Tick<L> {
2298        self.location.tick.clone()
2299    }
2300}
2301
2302impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2303where
2304    L: Location<'a> + NoTick + NoAtomic,
2305{
2306    /// Shifts this stream into an atomic context, which guarantees that all downstream logic
2307    /// will be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2308    ///
2309    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2310    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2311    /// argument that declares where the stream will be atomically processed. Batching a stream into
2312    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2313    /// [`Tick`] will introduce asynchrony.
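    ///
    /// # Example
    /// A minimal usage sketch that enters an atomic section, batches within it, and yields the
    /// elements back out:
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .atomic(&tick)
    ///     .batch(nondet!(/** test */))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```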
2314    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2315        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2316    }
2317
2318    /// Consumes a stream of `Future<T>`, producing a new stream of the resulting `T` outputs.
2319    /// Future outputs are produced as available, regardless of input arrival order.
2320    ///
2321    /// # Example
2322    /// ```rust
2323    /// # use std::collections::HashSet;
2324    /// # use futures::StreamExt;
2325    /// # use hydro_lang::prelude::*;
2326    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2327    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2328    ///     .map(q!(|x| async move {
2329    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2330    ///         x
2331    ///     }))
2332    ///     .resolve_futures()
2333    /// #   },
2334    /// #   |mut stream| async move {
2335    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2336    /// #       let mut output = HashSet::new();
2337    /// #       for _ in 1..10 {
2338    /// #           output.insert(stream.next().await.unwrap());
2339    /// #       }
2340    /// #       assert_eq!(
2341    /// #           output,
2342    /// #           HashSet::<i32>::from_iter(1..10)
2343    /// #       );
2344    /// #   },
2345    /// # ));
    /// ```
2346    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2347    where
2348        T: Future<Output = T2>,
2349    {
2350        Stream::new(
2351            self.location.clone(),
2352            HydroNode::ResolveFutures {
2353                input: Box::new(self.ir_node.into_inner()),
2354                metadata: self.location.new_node_metadata::<T2>(),
2355            },
2356        )
2357    }
2358
2359    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2360    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2361    /// the order of the input. The output stream will execute within the provided [`Tick`].
2363    ///
2364    /// # Non-Determinism
2365    /// The batch boundaries are non-deterministic and may change across executions.
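    ///
    /// # Example
    /// A minimal usage sketch: batch the input into a tick, transform the batch, and emit the
    /// results back out as an unbounded stream.
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .map(q!(|x| x * 2))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 4, 6, 8
    /// # for w in vec![2, 4, 6, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```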
2366    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2367        self.atomic(tick).batch(nondet)
2368    }
2369
2370    /// Given a time interval, returns a stream corresponding to samples taken from the
2371    /// stream roughly at that interval. The output will have elements in the same order
2372    /// as the input, but with arbitrary elements skipped between samples. There is also
2373    /// no guarantee on the exact timing of the samples.
2374    ///
2375    /// # Non-Determinism
2376    /// The output stream is non-deterministic in which elements are sampled, since this
2377    /// is controlled by a clock.
2378    pub fn sample_every(
2379        self,
2380        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2381        nondet: NonDet,
2382    ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2383        let samples = self.location.source_interval(interval, nondet);
2384
2385        let tick = self.location.tick();
2386        self.batch(&tick, nondet)
2387            .filter_if_some(samples.batch(&tick, nondet).first())
2388            .all_ticks()
2389            .weakest_retries()
2390    }
2391
2392    /// Given a timeout duration, returns an [`Optional`] which will hold a value if the
2393    /// stream has not emitted an element for at least that duration.
2394    ///
2395    /// # Non-Determinism
2396    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2397    /// samples take place, timeouts may be non-deterministically generated or missed,
2398    /// and the notification of the timeout may be delayed as well. There is also no
2399    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2400    /// detected based on when the next sample is taken.
2401    pub fn timeout(
2402        self,
2403        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2404        nondet: NonDet,
2405    ) -> Optional<(), L, Unbounded> {
2406        let tick = self.location.tick();
2407
2408        let latest_received = self.assume_retries(nondet).fold_commutative(
2409            q!(|| None),
2410            q!(|latest, _| {
2411                *latest = Some(Instant::now());
2412            }),
2413        );
2414
2415        latest_received
2416            .snapshot(&tick, nondet)
2417            .filter_map(q!(move |latest_received| {
2418                if let Some(latest_received) = latest_received {
2419                    if Instant::now().duration_since(latest_received) > duration {
2420                        Some(())
2421                    } else {
2422                        None
2423                    }
2424                } else {
2425                    Some(())
2426                }
2427            }))
2428            .latest()
2429    }
2430}
2431
2432impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2433where
2434    L: Location<'a> + NoTick + NoAtomic,
2435    F: Future<Output = T>,
2436{
2437    /// Consumes a stream of `Future<T>`, producing a new stream of the resulting `T` outputs.
2438    /// Future outputs are produced in the same order as the input stream.
2439    ///
2440    /// # Example
2441    /// ```rust
2442    /// # use std::collections::HashSet;
2443    /// # use futures::StreamExt;
2444    /// # use hydro_lang::prelude::*;
2445    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2446    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2447    ///     .map(q!(|x| async move {
2448    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2449    ///         x
2450    ///     }))
2451    ///     .resolve_futures_ordered()
2452    /// #   },
2453    /// #   |mut stream| async move {
2454    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2455    /// #       let mut output = Vec::new();
2456    /// #       for _ in 1..10 {
2457    /// #           output.push(stream.next().await.unwrap());
2458    /// #       }
2459    /// #       assert_eq!(
2460    /// #           output,
2461    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2462    /// #       );
2463    /// #   },
2464    /// # ));
    /// ```
2465    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2466        Stream::new(
2467            self.location.clone(),
2468            HydroNode::ResolveFuturesOrdered {
2469                input: Box::new(self.ir_node.into_inner()),
2470                metadata: self.location.new_node_metadata::<T>(),
2471            },
2472        )
2473    }
2474}
2475
2476impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2477where
2478    L: Location<'a> + NoTick,
2479{
2480    /// Executes the provided closure for every element in this stream.
2481    ///
2482    /// Because the closure may have side effects, the stream must have deterministic order
2483    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2484    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2485    /// [`Stream::assume_retries`] with an explanation for why this is the case.
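    ///
    /// # Example
    /// A minimal sketch: the side effect here is just a `println!`, and a clone of the original
    /// stream is passed through so the surrounding test harness has an output to observe.
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.clone().for_each(q!(|x| println!("got: {}", x)));
    /// numbers
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```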
2486    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2487        let f = f.splice_fn1_ctx(&self.location).into();
2488        let metadata = self.location.new_node_metadata::<T>();
2489        self.location
2490            .flow_state()
2491            .borrow_mut()
2492            .push_root(HydroRoot::ForEach {
2493                input: Box::new(HydroNode::Unpersist {
2494                    inner: Box::new(self.ir_node.into_inner()),
2495                    metadata: metadata.clone(),
2496                }),
2497                f,
2498                op_metadata: HydroIrOpMetadata::new(),
2499            });
2500    }
2501
2502    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2503    /// TCP socket to some other server. You should _not_ use this API for interacting with
2504    /// external clients; instead, see [`Location::bidi_external_many_bytes`] and
2505    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2506    /// interaction with asynchronous sinks.
2507    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2508    where
2509        S: 'a + futures::Sink<T> + Unpin,
2510    {
2511        self.location
2512            .flow_state()
2513            .borrow_mut()
2514            .push_root(HydroRoot::DestSink {
2515                sink: sink.splice_typed_ctx(&self.location).into(),
2516                input: Box::new(self.ir_node.into_inner()),
2517                op_metadata: HydroIrOpMetadata::new(),
2518            });
2519    }
2520}
2521
2522impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2523where
2524    L: Location<'a>,
2525{
2526    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2527    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
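    ///
    /// # Example
    /// A minimal usage sketch, mirroring the other examples in this module:
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// batch.all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```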
2528    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2529        Stream::new(
2530            self.location.outer().clone(),
2531            HydroNode::Persist {
2532                inner: Box::new(self.ir_node.into_inner()),
2533                metadata: self.location.new_node_metadata::<T>(),
2534            },
2535        )
2536    }
2537
2538    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2539    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2540    ///
2541    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2542    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2543    /// stream's [`Tick`] context.
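    ///
    /// # Example
    /// A minimal usage sketch; the atomic output is converted back to a plain stream with
    /// [`Stream::end_atomic`] so the harness can observe it.
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// batch.all_ticks_atomic().end_atomic()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```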
2544    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2545        Stream::new(
2546            Atomic {
2547                tick: self.location.clone(),
2548            },
2549            HydroNode::Persist {
2550                inner: Box::new(self.ir_node.into_inner()),
2551                metadata: self.location.new_node_metadata::<T>(),
2552            },
2553        )
2554    }
2555
2556    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2557    ///
2558    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2559    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2560    /// careful when using this operator, as its memory usage will grow linearly over time since it
2561    /// must store its inputs indefinitely.
2562    ///
2563    /// # Example
2564    /// ```rust
2565    /// # use hydro_lang::prelude::*;
2566    /// # use futures::StreamExt;
2567    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2568    /// let tick = process.tick();
2569    /// // ticks are lazy by default; this forces the second tick to run
2570    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2571    ///
2572    /// let batch_first_tick = process
2573    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2574    ///   .batch(&tick, nondet!(/** test */));
2575    /// let batch_second_tick = process
2576    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2577    ///   .batch(&tick, nondet!(/** test */))
2578    ///   .defer_tick(); // appears on the second tick
2579    /// batch_first_tick.chain(batch_second_tick)
2580    ///   .persist()
2581    ///   .all_ticks()
2582    /// # }, |mut stream| async move {
2583    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2584    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2585    /// #     assert_eq!(stream.next().await.unwrap(), w);
2586    /// # }
2587    /// # }));
2588    /// ```
2589    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2590    where
2591        T: Clone,
2592    {
2593        Stream::new(
2594            self.location.clone(),
2595            HydroNode::Persist {
2596                inner: Box::new(self.ir_node.into_inner()),
2597                metadata: self.location.new_node_metadata::<T>(),
2598            },
2599        )
2600    }
2601
2602    /// Delays this batch of elements until the next tick iteration, where the output stream
    /// will emit them instead. This is useful for feeding the results of one tick back into
    /// the next; see the example on [`Stream::persist`] for one usage.
2603    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2604        Stream::new(
2605            self.location.clone(),
2606            HydroNode::DeferTick {
2607                input: Box::new(self.ir_node.into_inner()),
2608                metadata: self.location.new_node_metadata::<T>(),
2609            },
2610        )
2611    }
2612}
2613
2614#[cfg(test)]
2615mod tests {
2616    use futures::{SinkExt, StreamExt};
2617    use hydro_deploy::Deployment;
2618    use serde::{Deserialize, Serialize};
2619    use stageleft::q;
2620
2621    use crate::compile::builder::FlowBuilder;
2622    use crate::location::Location;
2623
2624    mod backtrace_chained_ops;
2625
2626    struct P1 {}
2627    struct P2 {}
2628
2629    #[derive(Serialize, Deserialize, Debug)]
2630    struct SendOverNetwork {
2631        n: u32,
2632    }
2633
2634    #[tokio::test]
2635    async fn first_ten_distributed() {
2636        let mut deployment = Deployment::new();
2637
2638        let flow = FlowBuilder::new();
2639        let first_node = flow.process::<P1>();
2640        let second_node = flow.process::<P2>();
2641        let external = flow.external::<P2>();
2642
2643        let numbers = first_node.source_iter(q!(0..10));
2644        let out_port = numbers
2645            .map(q!(|n| SendOverNetwork { n }))
2646            .send_bincode(&second_node)
2647            .send_bincode_external(&external);
2648
2649        let nodes = flow
2650            .with_process(&first_node, deployment.Localhost())
2651            .with_process(&second_node, deployment.Localhost())
2652            .with_external(&external, deployment.Localhost())
2653            .deploy(&mut deployment);
2654
2655        deployment.deploy().await.unwrap();
2656
2657        let mut external_out = nodes.connect_source_bincode(out_port).await;
2658
2659        deployment.start().await.unwrap();
2660
2661        for i in 0..10 {
2662            assert_eq!(external_out.next().await.unwrap().n, i);
2663        }
2664    }
2665
2666    #[tokio::test]
2667    async fn first_cardinality() {
2668        let mut deployment = Deployment::new();
2669
2670        let flow = FlowBuilder::new();
2671        let node = flow.process::<()>();
2672        let external = flow.external::<()>();
2673
2674        let node_tick = node.tick();
2675        let count = node_tick
2676            .singleton(q!([1, 2, 3]))
2677            .into_stream()
2678            .flatten_ordered()
2679            .first()
2680            .into_stream()
2681            .count()
2682            .all_ticks()
2683            .send_bincode_external(&external);
2684
2685        let nodes = flow
2686            .with_process(&node, deployment.Localhost())
2687            .with_external(&external, deployment.Localhost())
2688            .deploy(&mut deployment);
2689
2690        deployment.deploy().await.unwrap();
2691
2692        let mut external_out = nodes.connect_source_bincode(count).await;
2693
2694        deployment.start().await.unwrap();
2695
2696        assert_eq!(external_out.next().await.unwrap(), 1);
2697    }
2698
2699    #[tokio::test]
2700    async fn unbounded_scan_remembers_state() {
2701        let mut deployment = Deployment::new();
2702
2703        let flow = FlowBuilder::new();
2704        let node = flow.process::<()>();
2705        let external = flow.external::<()>();
2706
2707        let (input_port, input) = node.source_external_bincode(&external);
2708        let out = input
2709            .scan(
2710                q!(|| 0),
2711                q!(|acc, v| {
2712                    *acc += v;
2713                    Some(*acc)
2714                }),
2715            )
2716            .send_bincode_external(&external);
2717
2718        let nodes = flow
2719            .with_process(&node, deployment.Localhost())
2720            .with_external(&external, deployment.Localhost())
2721            .deploy(&mut deployment);
2722
2723        deployment.deploy().await.unwrap();
2724
2725        let mut external_in = nodes.connect_sink_bincode(input_port).await;
2726        let mut external_out = nodes.connect_source_bincode(out).await;
2727
2728        deployment.start().await.unwrap();
2729
2730        external_in.send(1).await.unwrap();
2731        assert_eq!(external_out.next().await.unwrap(), 1);
2732
2733        external_in.send(2).await.unwrap();
2734        assert_eq!(external_out.next().await.unwrap(), 3);
2735    }
2736}