hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use syn::parse_quote;
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, Unbounded};
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, DeferTick, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27
28pub mod networking;
29
30/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
31#[sealed::sealed]
32pub trait Ordering:
33 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
34{
35}
36
37/// Marks the stream as being totally ordered, which means that there are
38/// no sources of non-determinism (other than intentional ones) that will
39/// affect the order of elements.
40pub enum TotalOrder {}
41
42#[sealed::sealed]
43impl Ordering for TotalOrder {}
44
45/// Marks the stream as having no order, which means that the order of
46/// elements may be affected by non-determinism.
47///
48/// This restricts certain operators, such as `fold` and `reduce`, to only
49/// be used with commutative aggregation functions.
50pub enum NoOrder {}
51
52#[sealed::sealed]
53impl Ordering for NoOrder {}
54
55/// Helper trait for determining the weakest of two orderings.
56#[sealed::sealed]
57pub trait MinOrder<Other: ?Sized> {
58 /// The weaker of the two orderings.
59 type Min: Ordering;
60}
61
62#[sealed::sealed]
63impl MinOrder<NoOrder> for TotalOrder {
64 type Min = NoOrder;
65}
66
67#[sealed::sealed]
68impl MinOrder<TotalOrder> for TotalOrder {
69 type Min = TotalOrder;
70}
71
72#[sealed::sealed]
73impl MinOrder<TotalOrder> for NoOrder {
74 type Min = NoOrder;
75}
76
77#[sealed::sealed]
78impl MinOrder<NoOrder> for NoOrder {
79 type Min = NoOrder;
80}
81
82/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
83#[sealed::sealed]
84pub trait Retries:
85 MinRetries<Self, Min = Self>
86 + MinRetries<ExactlyOnce, Min = Self>
87 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
88{
89}
90
91/// Marks the stream as having deterministic message cardinality, with no
92/// possibility of duplicates.
93pub enum ExactlyOnce {}
94
95#[sealed::sealed]
96impl Retries for ExactlyOnce {}
97
98/// Marks the stream as having non-deterministic message cardinality, which
99/// means that duplicates may occur, but messages will not be dropped.
100pub enum AtLeastOnce {}
101
102#[sealed::sealed]
103impl Retries for AtLeastOnce {}
104
105/// Helper trait for determining the weakest of two retry guarantees.
106#[sealed::sealed]
107pub trait MinRetries<Other: ?Sized> {
108 /// The weaker of the two retry guarantees.
109 type Min: Retries;
110}
111
112#[sealed::sealed]
113impl MinRetries<AtLeastOnce> for ExactlyOnce {
114 type Min = AtLeastOnce;
115}
116
117#[sealed::sealed]
118impl MinRetries<ExactlyOnce> for ExactlyOnce {
119 type Min = ExactlyOnce;
120}
121
122#[sealed::sealed]
123impl MinRetries<ExactlyOnce> for AtLeastOnce {
124 type Min = AtLeastOnce;
125}
126
127#[sealed::sealed]
128impl MinRetries<AtLeastOnce> for AtLeastOnce {
129 type Min = AtLeastOnce;
130}
131
132/// Streaming sequence of elements with type `Type`.
133///
134/// This live collection represents a growing sequence of elements, with new elements being
135/// asynchronously appended to the end of the sequence. This can be used to model the arrival
136/// of network input, such as API requests, or streaming ingestion.
137///
138/// By default, all streams have deterministic ordering and each element is materialized exactly
139/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
140/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
141/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
142///
143/// Type Parameters:
144/// - `Type`: the type of elements in the stream
145/// - `Loc`: the location where the stream is being materialized
146/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
147/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
148/// (default is [`TotalOrder`])
149/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
150/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
151pub struct Stream<
152 Type,
153 Loc,
154 Bound: Boundedness,
155 Order: Ordering = TotalOrder,
156 Retry: Retries = ExactlyOnce,
157> {
158 pub(crate) location: Loc,
159 pub(crate) ir_node: RefCell<HydroNode>,
160
161 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
162}
163
164impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
165 for Stream<T, L, Unbounded, O, R>
166where
167 L: Location<'a>,
168{
169 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
170 Stream {
171 location: stream.location,
172 ir_node: stream.ir_node,
173 _phantom: PhantomData,
174 }
175 }
176}
177
178impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
179 for Stream<T, L, B, NoOrder, R>
180where
181 L: Location<'a>,
182{
183 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
184 Stream {
185 location: stream.location,
186 ir_node: stream.ir_node,
187 _phantom: PhantomData,
188 }
189 }
190}
191
192impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
193 for Stream<T, L, B, O, AtLeastOnce>
194where
195 L: Location<'a>,
196{
197 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
198 Stream {
199 location: stream.location,
200 ir_node: stream.ir_node,
201 _phantom: PhantomData,
202 }
203 }
204}
205
206impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
207where
208 L: Location<'a>,
209{
210 fn defer_tick(self) -> Self {
211 Stream::defer_tick(self)
212 }
213}
214
215impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
216 for Stream<T, Tick<L>, Bounded, O, R>
217where
218 L: Location<'a>,
219{
220 type Location = Tick<L>;
221
222 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
223 Stream::new(
224 location.clone(),
225 HydroNode::CycleSource {
226 ident,
227 metadata: location.new_node_metadata::<T>(),
228 },
229 )
230 }
231}
232
233impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
234 for Stream<T, Tick<L>, Bounded, O, R>
235where
236 L: Location<'a>,
237{
238 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
239 assert_eq!(
240 Location::id(&self.location),
241 expected_location,
242 "locations do not match"
243 );
244 self.location
245 .flow_state()
246 .borrow_mut()
247 .push_root(HydroRoot::CycleSink {
248 ident,
249 input: Box::new(self.ir_node.into_inner()),
250 out_location: Location::id(&self.location),
251 op_metadata: HydroIrOpMetadata::new(),
252 });
253 }
254}
255
256impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
257 for Stream<T, L, B, O, R>
258where
259 L: Location<'a> + NoTick,
260{
261 type Location = L;
262
263 fn create_source(ident: syn::Ident, location: L) -> Self {
264 Stream::new(
265 location.clone(),
266 HydroNode::Persist {
267 inner: Box::new(HydroNode::CycleSource {
268 ident,
269 metadata: location.new_node_metadata::<T>(),
270 }),
271 metadata: location.new_node_metadata::<T>(),
272 },
273 )
274 }
275}
276
277impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
278 for Stream<T, L, B, O, R>
279where
280 L: Location<'a> + NoTick,
281{
282 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
283 assert_eq!(
284 Location::id(&self.location),
285 expected_location,
286 "locations do not match"
287 );
288 let metadata = self.location.new_node_metadata::<T>();
289 self.location
290 .flow_state()
291 .borrow_mut()
292 .push_root(HydroRoot::CycleSink {
293 ident,
294 input: Box::new(HydroNode::Unpersist {
295 inner: Box::new(self.ir_node.into_inner()),
296 metadata: metadata.clone(),
297 }),
298 out_location: Location::id(&self.location),
299 op_metadata: HydroIrOpMetadata::new(),
300 });
301 }
302}
303
304impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
305where
306 T: Clone,
307 L: Location<'a>,
308{
309 fn clone(&self) -> Self {
310 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
311 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
312 *self.ir_node.borrow_mut() = HydroNode::Tee {
313 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
314 metadata: self.location.new_node_metadata::<T>(),
315 };
316 }
317
318 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
319 Stream {
320 location: self.location.clone(),
321 ir_node: HydroNode::Tee {
322 inner: TeeNode(inner.0.clone()),
323 metadata: metadata.clone(),
324 }
325 .into(),
326 _phantom: PhantomData,
327 }
328 } else {
329 unreachable!()
330 }
331 }
332}
333
334impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
335where
336 L: Location<'a>,
337{
338 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
339 Stream {
340 location,
341 ir_node: RefCell::new(ir_node),
342 _phantom: PhantomData,
343 }
344 }
345
346 /// Produces a stream based on invoking `f` on each element.
347 /// If you do not want to modify the stream and instead only want to view
348 /// each item use [`Stream::inspect`] instead.
349 ///
350 /// # Example
351 /// ```rust
352 /// # use hydro_lang::prelude::*;
353 /// # use futures::StreamExt;
354 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
355 /// let words = process.source_iter(q!(vec!["hello", "world"]));
356 /// words.map(q!(|x| x.to_uppercase()))
357 /// # }, |mut stream| async move {
358 /// # for w in vec!["HELLO", "WORLD"] {
359 /// # assert_eq!(stream.next().await.unwrap(), w);
360 /// # }
361 /// # }));
362 /// ```
363 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
364 where
365 F: Fn(T) -> U + 'a,
366 {
367 let f = f.splice_fn1_ctx(&self.location).into();
368 Stream::new(
369 self.location.clone(),
370 HydroNode::Map {
371 f,
372 input: Box::new(self.ir_node.into_inner()),
373 metadata: self.location.new_node_metadata::<U>(),
374 },
375 )
376 }
377
378 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
379 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
380 /// for the output type `U` must produce items in a **deterministic** order.
381 ///
382 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
383 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
384 ///
385 /// # Example
386 /// ```rust
387 /// # use hydro_lang::prelude::*;
388 /// # use futures::StreamExt;
389 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
390 /// process
391 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
392 /// .flat_map_ordered(q!(|x| x))
393 /// # }, |mut stream| async move {
394 /// // 1, 2, 3, 4
395 /// # for w in (1..5) {
396 /// # assert_eq!(stream.next().await.unwrap(), w);
397 /// # }
398 /// # }));
399 /// ```
400 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
401 where
402 I: IntoIterator<Item = U>,
403 F: Fn(T) -> I + 'a,
404 {
405 let f = f.splice_fn1_ctx(&self.location).into();
406 Stream::new(
407 self.location.clone(),
408 HydroNode::FlatMap {
409 f,
410 input: Box::new(self.ir_node.into_inner()),
411 metadata: self.location.new_node_metadata::<U>(),
412 },
413 )
414 }
415
416 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
417 /// for the output type `U` to produce items in any order.
418 ///
419 /// # Example
420 /// ```rust
421 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
422 /// # use futures::StreamExt;
423 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
424 /// process
425 /// .source_iter(q!(vec![
426 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
427 /// std::collections::HashSet::from_iter(vec![3, 4]),
428 /// ]))
429 /// .flat_map_unordered(q!(|x| x))
430 /// # }, |mut stream| async move {
431 /// // 1, 2, 3, 4, but in no particular order
432 /// # let mut results = Vec::new();
433 /// # for w in (1..5) {
434 /// # results.push(stream.next().await.unwrap());
435 /// # }
436 /// # results.sort();
437 /// # assert_eq!(results, vec![1, 2, 3, 4]);
438 /// # }));
439 /// ```
440 pub fn flat_map_unordered<U, I, F>(
441 self,
442 f: impl IntoQuotedMut<'a, F, L>,
443 ) -> Stream<U, L, B, NoOrder, R>
444 where
445 I: IntoIterator<Item = U>,
446 F: Fn(T) -> I + 'a,
447 {
448 let f = f.splice_fn1_ctx(&self.location).into();
449 Stream::new(
450 self.location.clone(),
451 HydroNode::FlatMap {
452 f,
453 input: Box::new(self.ir_node.into_inner()),
454 metadata: self.location.new_node_metadata::<U>(),
455 },
456 )
457 }
458
459 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
460 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
461 ///
462 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
463 /// not deterministic, use [`Stream::flatten_unordered`] instead.
464 ///
465 /// ```rust
466 /// # use hydro_lang::prelude::*;
467 /// # use futures::StreamExt;
468 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
469 /// process
470 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
471 /// .flatten_ordered()
472 /// # }, |mut stream| async move {
473 /// // 1, 2, 3, 4
474 /// # for w in (1..5) {
475 /// # assert_eq!(stream.next().await.unwrap(), w);
476 /// # }
477 /// # }));
478 /// ```
479 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
480 where
481 T: IntoIterator<Item = U>,
482 {
483 self.flat_map_ordered(q!(|d| d))
484 }
485
486 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
487 /// for the element type `T` to produce items in any order.
488 ///
489 /// # Example
490 /// ```rust
491 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
492 /// # use futures::StreamExt;
493 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
494 /// process
495 /// .source_iter(q!(vec![
496 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
497 /// std::collections::HashSet::from_iter(vec![3, 4]),
498 /// ]))
499 /// .flatten_unordered()
500 /// # }, |mut stream| async move {
501 /// // 1, 2, 3, 4, but in no particular order
502 /// # let mut results = Vec::new();
503 /// # for w in (1..5) {
504 /// # results.push(stream.next().await.unwrap());
505 /// # }
506 /// # results.sort();
507 /// # assert_eq!(results, vec![1, 2, 3, 4]);
508 /// # }));
509 /// ```
510 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
511 where
512 T: IntoIterator<Item = U>,
513 {
514 self.flat_map_unordered(q!(|d| d))
515 }
516
517 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
518 /// `f`, preserving the order of the elements.
519 ///
520 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
521 /// not modify or take ownership of the values. If you need to modify the values while filtering
522 /// use [`Stream::filter_map`] instead.
523 ///
524 /// # Example
525 /// ```rust
526 /// # use hydro_lang::prelude::*;
527 /// # use futures::StreamExt;
528 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
529 /// process
530 /// .source_iter(q!(vec![1, 2, 3, 4]))
531 /// .filter(q!(|&x| x > 2))
532 /// # }, |mut stream| async move {
533 /// // 3, 4
534 /// # for w in (3..5) {
535 /// # assert_eq!(stream.next().await.unwrap(), w);
536 /// # }
537 /// # }));
538 /// ```
539 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
540 where
541 F: Fn(&T) -> bool + 'a,
542 {
543 let f = f.splice_fn1_borrow_ctx(&self.location).into();
544 Stream::new(
545 self.location.clone(),
546 HydroNode::Filter {
547 f,
548 input: Box::new(self.ir_node.into_inner()),
549 metadata: self.location.new_node_metadata::<T>(),
550 },
551 )
552 }
553
554 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
555 ///
556 /// # Example
557 /// ```rust
558 /// # use hydro_lang::prelude::*;
559 /// # use futures::StreamExt;
560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561 /// process
562 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
563 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
564 /// # }, |mut stream| async move {
565 /// // 1, 2
566 /// # for w in (1..3) {
567 /// # assert_eq!(stream.next().await.unwrap(), w);
568 /// # }
569 /// # }));
570 /// ```
571 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
572 where
573 F: Fn(T) -> Option<U> + 'a,
574 {
575 let f = f.splice_fn1_ctx(&self.location).into();
576 Stream::new(
577 self.location.clone(),
578 HydroNode::FilterMap {
579 f,
580 input: Box::new(self.ir_node.into_inner()),
581 metadata: self.location.new_node_metadata::<U>(),
582 },
583 )
584 }
585
586 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
587 /// where `x` is the final value of `other`, a bounded [`Singleton`].
588 ///
589 /// # Example
590 /// ```rust
591 /// # use hydro_lang::prelude::*;
592 /// # use futures::StreamExt;
593 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594 /// let tick = process.tick();
595 /// let batch = process
596 /// .source_iter(q!(vec![1, 2, 3, 4]))
597 /// .batch(&tick, nondet!(/** test */));
598 /// let count = batch.clone().count(); // `count()` returns a singleton
599 /// batch.cross_singleton(count).all_ticks()
600 /// # }, |mut stream| async move {
601 /// // (1, 4), (2, 4), (3, 4), (4, 4)
602 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
603 /// # assert_eq!(stream.next().await.unwrap(), w);
604 /// # }
605 /// # }));
606 /// ```
607 pub fn cross_singleton<O2>(
608 self,
609 other: impl Into<Optional<O2, L, Bounded>>,
610 ) -> Stream<(T, O2), L, B, O, R>
611 where
612 O2: Clone,
613 {
614 let other: Optional<O2, L, Bounded> = other.into();
615 check_matching_location(&self.location, &other.location);
616
617 Stream::new(
618 self.location.clone(),
619 HydroNode::CrossSingleton {
620 left: Box::new(self.ir_node.into_inner()),
621 right: Box::new(other.ir_node.into_inner()),
622 metadata: self.location.new_node_metadata::<(T, O2)>(),
623 },
624 )
625 }
626
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
628 ///
629 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
630 /// leader of a cluster.
631 ///
632 /// # Example
633 /// ```rust
634 /// # use hydro_lang::prelude::*;
635 /// # use futures::StreamExt;
636 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637 /// let tick = process.tick();
638 /// // ticks are lazy by default, forces the second tick to run
639 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640 ///
641 /// let batch_first_tick = process
642 /// .source_iter(q!(vec![1, 2, 3, 4]))
643 /// .batch(&tick, nondet!(/** test */));
644 /// let batch_second_tick = process
645 /// .source_iter(q!(vec![5, 6, 7, 8]))
646 /// .batch(&tick, nondet!(/** test */))
647 /// .defer_tick(); // appears on the second tick
648 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
649 /// batch_first_tick.chain(batch_second_tick)
650 /// .filter_if_some(some_on_first_tick)
651 /// .all_ticks()
652 /// # }, |mut stream| async move {
653 /// // [1, 2, 3, 4]
654 /// # for w in vec![1, 2, 3, 4] {
655 /// # assert_eq!(stream.next().await.unwrap(), w);
656 /// # }
657 /// # }));
658 /// ```
659 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
660 self.cross_singleton(signal.map(q!(|_u| ())))
661 .map(q!(|(d, _signal)| d))
662 }
663
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
665 ///
666 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
667 /// some local state.
668 ///
669 /// # Example
670 /// ```rust
671 /// # use hydro_lang::prelude::*;
672 /// # use futures::StreamExt;
673 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674 /// let tick = process.tick();
675 /// // ticks are lazy by default, forces the second tick to run
676 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
677 ///
678 /// let batch_first_tick = process
679 /// .source_iter(q!(vec![1, 2, 3, 4]))
680 /// .batch(&tick, nondet!(/** test */));
681 /// let batch_second_tick = process
682 /// .source_iter(q!(vec![5, 6, 7, 8]))
683 /// .batch(&tick, nondet!(/** test */))
684 /// .defer_tick(); // appears on the second tick
685 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
686 /// batch_first_tick.chain(batch_second_tick)
687 /// .filter_if_none(some_on_first_tick)
688 /// .all_ticks()
689 /// # }, |mut stream| async move {
690 /// // [5, 6, 7, 8]
691 /// # for w in vec![5, 6, 7, 8] {
692 /// # assert_eq!(stream.next().await.unwrap(), w);
693 /// # }
694 /// # }));
695 /// ```
696 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
697 self.filter_if_some(
698 other
699 .map(q!(|_| ()))
700 .into_singleton()
701 .filter(q!(|o| o.is_none())),
702 )
703 }
704
705 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
706 /// tupled pairs in a non-deterministic order.
707 ///
708 /// # Example
709 /// ```rust
710 /// # use hydro_lang::prelude::*;
711 /// # use std::collections::HashSet;
712 /// # use futures::StreamExt;
713 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714 /// let tick = process.tick();
715 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
716 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
717 /// stream1.cross_product(stream2)
718 /// # }, |mut stream| async move {
719 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
720 /// # stream.map(|i| assert!(expected.contains(&i)));
721 /// # }));
722 /// ```
723 pub fn cross_product<T2, O2: Ordering>(
724 self,
725 other: Stream<T2, L, B, O2, R>,
726 ) -> Stream<(T, T2), L, B, NoOrder, R>
727 where
728 T: Clone,
729 T2: Clone,
730 {
731 check_matching_location(&self.location, &other.location);
732
733 Stream::new(
734 self.location.clone(),
735 HydroNode::CrossProduct {
736 left: Box::new(self.ir_node.into_inner()),
737 right: Box::new(other.ir_node.into_inner()),
738 metadata: self.location.new_node_metadata::<(T, T2)>(),
739 },
740 )
741 }
742
743 /// Takes one stream as input and filters out any duplicate occurrences. The output
744 /// contains all unique values from the input.
745 ///
746 /// # Example
747 /// ```rust
748 /// # use hydro_lang::prelude::*;
749 /// # use futures::StreamExt;
750 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
751 /// let tick = process.tick();
752 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
753 /// # }, |mut stream| async move {
754 /// # for w in vec![1, 2, 3, 4] {
755 /// # assert_eq!(stream.next().await.unwrap(), w);
756 /// # }
757 /// # }));
758 /// ```
759 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
760 where
761 T: Eq + Hash,
762 {
763 Stream::new(
764 self.location.clone(),
765 HydroNode::Unique {
766 input: Box::new(self.ir_node.into_inner()),
767 metadata: self.location.new_node_metadata::<T>(),
768 },
769 )
770 }
771
772 /// Outputs everything in this stream that is *not* contained in the `other` stream.
773 ///
774 /// The `other` stream must be [`Bounded`], since this function will wait until
775 /// all its elements are available before producing any output.
776 /// # Example
777 /// ```rust
778 /// # use hydro_lang::prelude::*;
779 /// # use futures::StreamExt;
780 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
781 /// let tick = process.tick();
782 /// let stream = process
783 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
784 /// .batch(&tick, nondet!(/** test */));
785 /// let batch = process
786 /// .source_iter(q!(vec![1, 2]))
787 /// .batch(&tick, nondet!(/** test */));
788 /// stream.filter_not_in(batch).all_ticks()
789 /// # }, |mut stream| async move {
790 /// # for w in vec![3, 4] {
791 /// # assert_eq!(stream.next().await.unwrap(), w);
792 /// # }
793 /// # }));
794 /// ```
795 pub fn filter_not_in<O2: Ordering>(
796 self,
797 other: Stream<T, L, Bounded, O2, R>,
798 ) -> Stream<T, L, Bounded, O, R>
799 where
800 T: Eq + Hash,
801 {
802 check_matching_location(&self.location, &other.location);
803
804 Stream::new(
805 self.location.clone(),
806 HydroNode::Difference {
807 pos: Box::new(self.ir_node.into_inner()),
808 neg: Box::new(other.ir_node.into_inner()),
809 metadata: self.location.new_node_metadata::<T>(),
810 },
811 )
812 }
813
814 /// An operator which allows you to "inspect" each element of a stream without
815 /// modifying it. The closure `f` is called on a reference to each item. This is
816 /// mainly useful for debugging, and should not be used to generate side-effects.
817 ///
818 /// # Example
819 /// ```rust
820 /// # use hydro_lang::prelude::*;
821 /// # use futures::StreamExt;
822 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
823 /// let nums = process.source_iter(q!(vec![1, 2]));
824 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
825 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
826 /// # }, |mut stream| async move {
827 /// # for w in vec![1, 2] {
828 /// # assert_eq!(stream.next().await.unwrap(), w);
829 /// # }
830 /// # }));
831 /// ```
832 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
833 where
834 F: Fn(&T) + 'a,
835 {
836 let f = f.splice_fn1_borrow_ctx(&self.location).into();
837
838 if L::is_top_level() {
839 Stream::new(
840 self.location.clone(),
841 HydroNode::Persist {
842 inner: Box::new(HydroNode::Inspect {
843 f,
844 input: Box::new(HydroNode::Unpersist {
845 inner: Box::new(self.ir_node.into_inner()),
846 metadata: self.location.new_node_metadata::<T>(),
847 }),
848 metadata: self.location.new_node_metadata::<T>(),
849 }),
850 metadata: self.location.new_node_metadata::<T>(),
851 },
852 )
853 } else {
854 Stream::new(
855 self.location.clone(),
856 HydroNode::Inspect {
857 f,
858 input: Box::new(self.ir_node.into_inner()),
859 metadata: self.location.new_node_metadata::<T>(),
860 },
861 )
862 }
863 }
864
865 /// An operator which allows you to "name" a `HydroNode`.
866 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
867 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
868 {
869 let mut node = self.ir_node.borrow_mut();
870 let metadata = node.metadata_mut();
871 metadata.tag = Some(name.to_string());
872 }
873 self
874 }
875
876 /// Explicitly "casts" the stream to a type with a different ordering
877 /// guarantee. Useful in unsafe code where the ordering cannot be proven
878 /// by the type-system.
879 ///
880 /// # Non-Determinism
881 /// This function is used as an escape hatch, and any mistakes in the
882 /// provided ordering guarantee will propagate into the guarantees
883 /// for the rest of the program.
884 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
885 Stream::new(self.location, self.ir_node.into_inner())
886 }
887
888 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
889 /// which is always safe because that is the weakest possible guarantee.
890 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
891 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
892 self.assume_ordering::<NoOrder>(nondet)
893 }
894
895 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
896 /// enforcing that `O2` is weaker than the input ordering guarantee.
897 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
898 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
899 self.assume_ordering::<O2>(nondet)
900 }
901
902 /// Explicitly "casts" the stream to a type with a different retries
903 /// guarantee. Useful in unsafe code where the lack of retries cannot
904 /// be proven by the type-system.
905 ///
906 /// # Non-Determinism
907 /// This function is used as an escape hatch, and any mistakes in the
908 /// provided retries guarantee will propagate into the guarantees
909 /// for the rest of the program.
910 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
911 Stream::new(self.location, self.ir_node.into_inner())
912 }
913
914 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
915 /// which is always safe because that is the weakest possible guarantee.
916 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
917 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
918 self.assume_retries::<AtLeastOnce>(nondet)
919 }
920
921 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
922 /// enforcing that `R2` is weaker than the input retries guarantee.
923 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
924 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
925 self.assume_retries::<R2>(nondet)
926 }
927}
928
929impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
930where
931 L: Location<'a>,
932{
933 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
934 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
935 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
936 self.assume_retries(
937 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
938 )
939 }
940}
941
942impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
943where
944 L: Location<'a>,
945{
946 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
947 ///
948 /// # Example
949 /// ```rust
950 /// # use hydro_lang::prelude::*;
951 /// # use futures::StreamExt;
952 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953 /// process.source_iter(q!(&[1, 2, 3])).cloned()
954 /// # }, |mut stream| async move {
955 /// // 1, 2, 3
956 /// # for w in vec![1, 2, 3] {
957 /// # assert_eq!(stream.next().await.unwrap(), w);
958 /// # }
959 /// # }));
960 /// ```
961 pub fn cloned(self) -> Stream<T, L, B, O, R>
962 where
963 T: Clone,
964 {
965 self.map(q!(|d| d.clone()))
966 }
967}
968
969impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
970where
971 L: Location<'a>,
972{
973 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
974 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
975 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
976 ///
977 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
978 /// and there may be duplicates.
979 ///
980 /// # Example
981 /// ```rust
982 /// # use hydro_lang::prelude::*;
983 /// # use futures::StreamExt;
984 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
985 /// let tick = process.tick();
986 /// let bools = process.source_iter(q!(vec![false, true, false]));
987 /// let batch = bools.batch(&tick, nondet!(/** test */));
988 /// batch
989 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
990 /// .all_ticks()
991 /// # }, |mut stream| async move {
992 /// // true
993 /// # assert_eq!(stream.next().await.unwrap(), true);
994 /// # }));
995 /// ```
996 pub fn fold_commutative_idempotent<A, I, F>(
997 self,
998 init: impl IntoQuotedMut<'a, I, L>,
999 comb: impl IntoQuotedMut<'a, F, L>,
1000 ) -> Singleton<A, L, B>
1001 where
1002 I: Fn() -> A + 'a,
1003 F: Fn(&mut A, T),
1004 {
1005 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1006 self.assume_ordering(nondet)
1007 .assume_retries(nondet)
1008 .fold(init, comb)
1009 }
1010
1011 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1012 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1013 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1014 /// reference, so that it can be modified in place.
1015 ///
1016 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1017 /// and there may be duplicates.
1018 ///
1019 /// # Example
1020 /// ```rust
1021 /// # use hydro_lang::prelude::*;
1022 /// # use futures::StreamExt;
1023 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1024 /// let tick = process.tick();
1025 /// let bools = process.source_iter(q!(vec![false, true, false]));
1026 /// let batch = bools.batch(&tick, nondet!(/** test */));
1027 /// batch
1028 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1029 /// .all_ticks()
1030 /// # }, |mut stream| async move {
1031 /// // true
1032 /// # assert_eq!(stream.next().await.unwrap(), true);
1033 /// # }));
1034 /// ```
1035 pub fn reduce_commutative_idempotent<F>(
1036 self,
1037 comb: impl IntoQuotedMut<'a, F, L>,
1038 ) -> Optional<T, L, B>
1039 where
1040 F: Fn(&mut T, T) + 'a,
1041 {
1042 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1043 self.assume_ordering(nondet)
1044 .assume_retries(nondet)
1045 .reduce(comb)
1046 }
1047
1048 /// Computes the maximum element in the stream as an [`Optional`], which
1049 /// will be empty until the first element in the input arrives.
1050 ///
1051 /// # Example
1052 /// ```rust
1053 /// # use hydro_lang::prelude::*;
1054 /// # use futures::StreamExt;
1055 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1056 /// let tick = process.tick();
1057 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1058 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1059 /// batch.max().all_ticks()
1060 /// # }, |mut stream| async move {
1061 /// // 4
1062 /// # assert_eq!(stream.next().await.unwrap(), 4);
1063 /// # }));
1064 /// ```
1065 pub fn max(self) -> Optional<T, L, B>
1066 where
1067 T: Ord,
1068 {
1069 self.reduce_commutative_idempotent(q!(|curr, new| {
1070 if new > *curr {
1071 *curr = new;
1072 }
1073 }))
1074 }
1075
/// Computes the maximum element in the stream as an [`Optional`], where the
/// maximum is determined according to the `key` function. The [`Optional`] will
/// be empty until the first element in the input arrives.
///
/// Keys are compared with a strict `>`, so an incoming element whose key is equal to
/// the key of the current maximum does not replace it.
///
/// # Example
/// ```rust
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
/// let batch = numbers.batch(&tick, nondet!(/** test */));
/// batch.max_by_key(q!(|x| -x)).all_ticks()
/// # }, |mut stream| async move {
/// // 1
/// # assert_eq!(stream.next().await.unwrap(), 1);
/// # }));
/// ```
pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
where
    K: Ord,
    F: Fn(&T) -> K + 'a,
{
    // Splice the quoted key function so it can be interpolated into the reducer below.
    let f = key.splice_fn1_borrow_ctx(&self.location);

    // Generated reducer: binds the key function once, then overwrites the accumulator
    // only when the new element's key is strictly greater. The key function is invoked
    // on both the new element and the current accumulator for every comparison.
    // NOTE(review): with equal keys the surviving element depends on arrival order,
    // which looks observable on streams without a total order -- confirm this is intended.
    let wrapped: syn::Expr = parse_quote!({
        let key_fn = #f;
        move |curr, new| {
            if key_fn(&new) > key_fn(&*curr) {
                *curr = new;
            }
        }
    });

    // The `Reduce` node is built by hand: this impl is generic over ordering and retries,
    // whereas `reduce` is only defined for `TotalOrder` + `ExactlyOnce` streams.
    let mut core = HydroNode::Reduce {
        f: wrapped.into(),
        input: Box::new(self.ir_node.into_inner()),
        metadata: self.location.new_node_metadata::<T>(),
    };

    // At the top level the reduction is wrapped in `Persist`, the same way `reduce` does it.
    if L::is_top_level() {
        core = HydroNode::Persist {
            inner: Box::new(core),
            metadata: self.location.new_node_metadata::<T>(),
        };
    }

    Optional::new(self.location, core)
}
1125
1126 /// Computes the minimum element in the stream as an [`Optional`], which
1127 /// will be empty until the first element in the input arrives.
1128 ///
1129 /// # Example
1130 /// ```rust
1131 /// # use hydro_lang::prelude::*;
1132 /// # use futures::StreamExt;
1133 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1134 /// let tick = process.tick();
1135 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1136 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1137 /// batch.min().all_ticks()
1138 /// # }, |mut stream| async move {
1139 /// // 1
1140 /// # assert_eq!(stream.next().await.unwrap(), 1);
1141 /// # }));
1142 /// ```
1143 pub fn min(self) -> Optional<T, L, B>
1144 where
1145 T: Ord,
1146 {
1147 self.reduce_commutative_idempotent(q!(|curr, new| {
1148 if new < *curr {
1149 *curr = new;
1150 }
1151 }))
1152 }
1153}
1154
1155impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1156where
1157 L: Location<'a>,
1158{
1159 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1160 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1161 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1162 ///
1163 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1164 ///
1165 /// # Example
1166 /// ```rust
1167 /// # use hydro_lang::prelude::*;
1168 /// # use futures::StreamExt;
1169 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1170 /// let tick = process.tick();
1171 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1172 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1173 /// batch
1174 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1175 /// .all_ticks()
1176 /// # }, |mut stream| async move {
1177 /// // 10
1178 /// # assert_eq!(stream.next().await.unwrap(), 10);
1179 /// # }));
1180 /// ```
1181 pub fn fold_commutative<A, I, F>(
1182 self,
1183 init: impl IntoQuotedMut<'a, I, L>,
1184 comb: impl IntoQuotedMut<'a, F, L>,
1185 ) -> Singleton<A, L, B>
1186 where
1187 I: Fn() -> A + 'a,
1188 F: Fn(&mut A, T),
1189 {
1190 let nondet = nondet!(/** the combinator function is commutative */);
1191 self.assume_ordering(nondet).fold(init, comb)
1192 }
1193
1194 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1195 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1196 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1197 /// reference, so that it can be modified in place.
1198 ///
1199 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1200 ///
1201 /// # Example
1202 /// ```rust
1203 /// # use hydro_lang::prelude::*;
1204 /// # use futures::StreamExt;
1205 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1206 /// let tick = process.tick();
1207 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1208 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1209 /// batch
1210 /// .reduce_commutative(q!(|curr, new| *curr += new))
1211 /// .all_ticks()
1212 /// # }, |mut stream| async move {
1213 /// // 10
1214 /// # assert_eq!(stream.next().await.unwrap(), 10);
1215 /// # }));
1216 /// ```
1217 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1218 where
1219 F: Fn(&mut T, T) + 'a,
1220 {
1221 let nondet = nondet!(/** the combinator function is commutative */);
1222 self.assume_ordering(nondet).reduce(comb)
1223 }
1224
1225 /// Computes the number of elements in the stream as a [`Singleton`].
1226 ///
1227 /// # Example
1228 /// ```rust
1229 /// # use hydro_lang::prelude::*;
1230 /// # use futures::StreamExt;
1231 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1232 /// let tick = process.tick();
1233 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1234 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1235 /// batch.count().all_ticks()
1236 /// # }, |mut stream| async move {
1237 /// // 4
1238 /// # assert_eq!(stream.next().await.unwrap(), 4);
1239 /// # }));
1240 /// ```
1241 pub fn count(self) -> Singleton<usize, L, B> {
1242 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1243 }
1244}
1245
1246impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1247where
1248 L: Location<'a>,
1249{
1250 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1251 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1252 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1253 ///
1254 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1255 ///
1256 /// # Example
1257 /// ```rust
1258 /// # use hydro_lang::prelude::*;
1259 /// # use futures::StreamExt;
1260 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1261 /// let tick = process.tick();
1262 /// let bools = process.source_iter(q!(vec![false, true, false]));
1263 /// let batch = bools.batch(&tick, nondet!(/** test */));
1264 /// batch
1265 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1266 /// .all_ticks()
1267 /// # }, |mut stream| async move {
1268 /// // true
1269 /// # assert_eq!(stream.next().await.unwrap(), true);
1270 /// # }));
1271 /// ```
1272 pub fn fold_idempotent<A, I, F>(
1273 self,
1274 init: impl IntoQuotedMut<'a, I, L>,
1275 comb: impl IntoQuotedMut<'a, F, L>,
1276 ) -> Singleton<A, L, B>
1277 where
1278 I: Fn() -> A + 'a,
1279 F: Fn(&mut A, T),
1280 {
1281 let nondet = nondet!(/** the combinator function is idempotent */);
1282 self.assume_retries(nondet).fold(init, comb)
1283 }
1284
1285 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1286 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1287 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1288 /// reference, so that it can be modified in place.
1289 ///
1290 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1291 ///
1292 /// # Example
1293 /// ```rust
1294 /// # use hydro_lang::prelude::*;
1295 /// # use futures::StreamExt;
1296 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1297 /// let tick = process.tick();
1298 /// let bools = process.source_iter(q!(vec![false, true, false]));
1299 /// let batch = bools.batch(&tick, nondet!(/** test */));
1300 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1301 /// # }, |mut stream| async move {
1302 /// // true
1303 /// # assert_eq!(stream.next().await.unwrap(), true);
1304 /// # }));
1305 /// ```
1306 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1307 where
1308 F: Fn(&mut T, T) + 'a,
1309 {
1310 let nondet = nondet!(/** the combinator function is idempotent */);
1311 self.assume_retries(nondet).reduce(comb)
1312 }
1313
1314 /// Computes the first element in the stream as an [`Optional`], which
1315 /// will be empty until the first element in the input arrives.
1316 ///
1317 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1318 /// re-ordering of elements may cause the first element to change.
1319 ///
1320 /// # Example
1321 /// ```rust
1322 /// # use hydro_lang::prelude::*;
1323 /// # use futures::StreamExt;
1324 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325 /// let tick = process.tick();
1326 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1327 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1328 /// batch.first().all_ticks()
1329 /// # }, |mut stream| async move {
1330 /// // 1
1331 /// # assert_eq!(stream.next().await.unwrap(), 1);
1332 /// # }));
1333 /// ```
1334 pub fn first(self) -> Optional<T, L, B> {
1335 self.reduce_idempotent(q!(|_, _| {}))
1336 }
1337
1338 /// Computes the last element in the stream as an [`Optional`], which
1339 /// will be empty until an element in the input arrives.
1340 ///
1341 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1342 /// re-ordering of elements may cause the last element to change.
1343 ///
1344 /// # Example
1345 /// ```rust
1346 /// # use hydro_lang::prelude::*;
1347 /// # use futures::StreamExt;
1348 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1349 /// let tick = process.tick();
1350 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1351 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1352 /// batch.last().all_ticks()
1353 /// # }, |mut stream| async move {
1354 /// // 4
1355 /// # assert_eq!(stream.next().await.unwrap(), 4);
1356 /// # }));
1357 /// ```
1358 pub fn last(self) -> Optional<T, L, B> {
1359 self.reduce_idempotent(q!(|curr, new| *curr = new))
1360 }
1361}
1362
1363impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1364where
1365 L: Location<'a>,
1366{
1367 /// Returns a stream with the current count tupled with each element in the input stream.
1368 ///
1369 /// # Example
1370 /// ```rust
1371 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1372 /// # use futures::StreamExt;
1373 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1374 /// let tick = process.tick();
1375 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1376 /// numbers.enumerate()
1377 /// # }, |mut stream| async move {
1378 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1379 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1380 /// # assert_eq!(stream.next().await.unwrap(), w);
1381 /// # }
1382 /// # }));
1383 /// ```
1384 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1385 if L::is_top_level() {
1386 Stream::new(
1387 self.location.clone(),
1388 HydroNode::Persist {
1389 inner: Box::new(HydroNode::Enumerate {
1390 is_static: true,
1391 input: Box::new(HydroNode::Unpersist {
1392 inner: Box::new(self.ir_node.into_inner()),
1393 metadata: self.location.new_node_metadata::<T>(),
1394 }),
1395 metadata: self.location.new_node_metadata::<(usize, T)>(),
1396 }),
1397 metadata: self.location.new_node_metadata::<(usize, T)>(),
1398 },
1399 )
1400 } else {
1401 Stream::new(
1402 self.location.clone(),
1403 HydroNode::Enumerate {
1404 is_static: false,
1405 input: Box::new(self.ir_node.into_inner()),
1406 metadata: self.location.new_node_metadata::<(usize, T)>(),
1407 },
1408 )
1409 }
1410 }
1411
1412 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1413 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1414 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1415 ///
1416 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1417 /// to depend on the order of elements in the stream.
1418 ///
1419 /// # Example
1420 /// ```rust
1421 /// # use hydro_lang::prelude::*;
1422 /// # use futures::StreamExt;
1423 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1424 /// let tick = process.tick();
1425 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1426 /// let batch = words.batch(&tick, nondet!(/** test */));
1427 /// batch
1428 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1429 /// .all_ticks()
1430 /// # }, |mut stream| async move {
1431 /// // "HELLOWORLD"
1432 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1433 /// # }));
1434 /// ```
1435 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1436 self,
1437 init: impl IntoQuotedMut<'a, I, L>,
1438 comb: impl IntoQuotedMut<'a, F, L>,
1439 ) -> Singleton<A, L, B> {
1440 let init = init.splice_fn0_ctx(&self.location).into();
1441 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1442
1443 let mut core = HydroNode::Fold {
1444 init,
1445 acc: comb,
1446 input: Box::new(self.ir_node.into_inner()),
1447 metadata: self.location.new_node_metadata::<A>(),
1448 };
1449
1450 if L::is_top_level() {
1451 // top-level (possibly unbounded) singletons are represented as
1452 // a stream which produces all values from all ticks every tick,
1453 // so Unpersist will always give the lastest aggregation
1454 core = HydroNode::Persist {
1455 inner: Box::new(core),
1456 metadata: self.location.new_node_metadata::<A>(),
1457 };
1458 }
1459
1460 Singleton::new(self.location, core)
1461 }
1462
1463 /// Collects all the elements of this stream into a single [`Vec`] element.
1464 ///
1465 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1466 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1467 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1468 /// the vector at an arbitrary point in time.
1469 ///
1470 /// # Example
1471 /// ```rust
1472 /// # use hydro_lang::prelude::*;
1473 /// # use futures::StreamExt;
1474 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1475 /// let tick = process.tick();
1476 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1477 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1478 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1479 /// # }, |mut stream| async move {
1480 /// // [ vec![1, 2, 3, 4] ]
1481 /// # for w in vec![vec![1, 2, 3, 4]] {
1482 /// # assert_eq!(stream.next().await.unwrap(), w);
1483 /// # }
1484 /// # }));
1485 /// ```
1486 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1487 self.fold(
1488 q!(|| vec![]),
1489 q!(|acc, v| {
1490 acc.push(v);
1491 }),
1492 )
1493 }
1494
1495 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1496 /// and emitting each intermediate result.
1497 ///
1498 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1499 /// containing all intermediate accumulated values. The scan operation can also terminate early
1500 /// by returning `None`.
1501 ///
1502 /// The function takes a mutable reference to the accumulator and the current element, and returns
1503 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1504 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1505 ///
1506 /// # Examples
1507 ///
1508 /// Basic usage - running sum:
1509 /// ```rust
1510 /// # use hydro_lang::prelude::*;
1511 /// # use futures::StreamExt;
1512 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1514 /// q!(|| 0),
1515 /// q!(|acc, x| {
1516 /// *acc += x;
1517 /// Some(*acc)
1518 /// }),
1519 /// )
1520 /// # }, |mut stream| async move {
1521 /// // Output: 1, 3, 6, 10
1522 /// # for w in vec![1, 3, 6, 10] {
1523 /// # assert_eq!(stream.next().await.unwrap(), w);
1524 /// # }
1525 /// # }));
1526 /// ```
1527 ///
1528 /// Early termination example:
1529 /// ```rust
1530 /// # use hydro_lang::prelude::*;
1531 /// # use futures::StreamExt;
1532 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1533 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1534 /// q!(|| 1),
1535 /// q!(|state, x| {
1536 /// *state = *state * x;
1537 /// if *state > 6 {
1538 /// None // Terminate the stream
1539 /// } else {
1540 /// Some(-*state)
1541 /// }
1542 /// }),
1543 /// )
1544 /// # }, |mut stream| async move {
1545 /// // Output: -1, -2, -6
1546 /// # for w in vec![-1, -2, -6] {
1547 /// # assert_eq!(stream.next().await.unwrap(), w);
1548 /// # }
1549 /// # }));
1550 /// ```
1551 pub fn scan<A, U, I, F>(
1552 self,
1553 init: impl IntoQuotedMut<'a, I, L>,
1554 f: impl IntoQuotedMut<'a, F, L>,
1555 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1556 where
1557 I: Fn() -> A + 'a,
1558 F: Fn(&mut A, T) -> Option<U> + 'a,
1559 {
1560 let init = init.splice_fn0_ctx(&self.location).into();
1561 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1562
1563 if L::is_top_level() {
1564 Stream::new(
1565 self.location.clone(),
1566 HydroNode::Persist {
1567 inner: Box::new(HydroNode::Scan {
1568 init,
1569 acc: f,
1570 input: Box::new(self.ir_node.into_inner()),
1571 metadata: self.location.new_node_metadata::<U>(),
1572 }),
1573 metadata: self.location.new_node_metadata::<U>(),
1574 },
1575 )
1576 } else {
1577 Stream::new(
1578 self.location.clone(),
1579 HydroNode::Scan {
1580 init,
1581 acc: f,
1582 input: Box::new(self.ir_node.into_inner()),
1583 metadata: self.location.new_node_metadata::<U>(),
1584 },
1585 )
1586 }
1587 }
1588
1589 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1590 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1591 /// until the first element in the input arrives.
1592 ///
1593 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1594 /// to depend on the order of elements in the stream.
1595 ///
1596 /// # Example
1597 /// ```rust
1598 /// # use hydro_lang::prelude::*;
1599 /// # use futures::StreamExt;
1600 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1601 /// let tick = process.tick();
1602 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1603 /// let batch = words.batch(&tick, nondet!(/** test */));
1604 /// batch
1605 /// .map(q!(|x| x.to_string()))
1606 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1607 /// .all_ticks()
1608 /// # }, |mut stream| async move {
1609 /// // "HELLOWORLD"
1610 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1611 /// # }));
1612 /// ```
1613 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1614 self,
1615 comb: impl IntoQuotedMut<'a, F, L>,
1616 ) -> Optional<T, L, B> {
1617 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1618 let mut core = HydroNode::Reduce {
1619 f,
1620 input: Box::new(self.ir_node.into_inner()),
1621 metadata: self.location.new_node_metadata::<T>(),
1622 };
1623
1624 if L::is_top_level() {
1625 core = HydroNode::Persist {
1626 inner: Box::new(core),
1627 metadata: self.location.new_node_metadata::<T>(),
1628 };
1629 }
1630
1631 Optional::new(self.location, core)
1632 }
1633}
1634
1635impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
1636 Stream<T, L, Unbounded, O, R>
1637{
1638 /// Produces a new stream that interleaves the elements of the two input streams.
1639 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1640 ///
1641 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1642 /// [`Bounded`], you can use [`Stream::chain`] instead.
1643 ///
1644 /// # Example
1645 /// ```rust
1646 /// # use hydro_lang::prelude::*;
1647 /// # use futures::StreamExt;
1648 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1649 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1650 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1651 /// # }, |mut stream| async move {
1652 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1653 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1654 /// # assert_eq!(stream.next().await.unwrap(), w);
1655 /// # }
1656 /// # }));
1657 /// ```
1658 pub fn interleave<O2: Ordering, R2: Retries>(
1659 self,
1660 other: Stream<T, L, Unbounded, O2, R2>,
1661 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1662 where
1663 R: MinRetries<R2>,
1664 {
1665 let tick = self.location.tick();
1666 // Because the outputs are unordered, we can interleave batches from both streams.
1667 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1668 self.batch(&tick, nondet_batch_interleaving)
1669 .weakest_ordering()
1670 .chain(
1671 other
1672 .batch(&tick, nondet_batch_interleaving)
1673 .weakest_ordering(),
1674 )
1675 .all_ticks()
1676 }
1677}
1678
1679impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1680where
1681 L: Location<'a>,
1682{
1683 /// Produces a new stream that emits the input elements in sorted order.
1684 ///
1685 /// The input stream can have any ordering guarantee, but the output stream
1686 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1687 /// elements in the input stream are available, so it requires the input stream
1688 /// to be [`Bounded`].
1689 ///
1690 /// # Example
1691 /// ```rust
1692 /// # use hydro_lang::prelude::*;
1693 /// # use futures::StreamExt;
1694 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1695 /// let tick = process.tick();
1696 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1697 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1698 /// batch.sort().all_ticks()
1699 /// # }, |mut stream| async move {
1700 /// // 1, 2, 3, 4
1701 /// # for w in (1..5) {
1702 /// # assert_eq!(stream.next().await.unwrap(), w);
1703 /// # }
1704 /// # }));
1705 /// ```
1706 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1707 where
1708 T: Ord,
1709 {
1710 Stream::new(
1711 self.location.clone(),
1712 HydroNode::Sort {
1713 input: Box::new(self.ir_node.into_inner()),
1714 metadata: self.location.new_node_metadata::<T>(),
1715 },
1716 )
1717 }
1718
1719 /// Produces a new stream that first emits the elements of the `self` stream,
1720 /// and then emits the elements of the `other` stream. The output stream has
1721 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1722 /// [`TotalOrder`] guarantee.
1723 ///
1724 /// Currently, both input streams must be [`Bounded`]. This operator will block
1725 /// on the first stream until all its elements are available. In a future version,
1726 /// we will relax the requirement on the `other` stream.
1727 ///
1728 /// # Example
1729 /// ```rust
1730 /// # use hydro_lang::prelude::*;
1731 /// # use futures::StreamExt;
1732 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1733 /// let tick = process.tick();
1734 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1735 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1736 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1737 /// # }, |mut stream| async move {
1738 /// // 2, 3, 4, 5, 1, 2, 3, 4
1739 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1740 /// # assert_eq!(stream.next().await.unwrap(), w);
1741 /// # }
1742 /// # }));
1743 /// ```
1744 pub fn chain<O2: Ordering, R2: Retries>(
1745 self,
1746 other: Stream<T, L, Bounded, O2, R2>,
1747 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1748 where
1749 O: MinOrder<O2>,
1750 R: MinRetries<R2>,
1751 {
1752 check_matching_location(&self.location, &other.location);
1753
1754 Stream::new(
1755 self.location.clone(),
1756 HydroNode::Chain {
1757 first: Box::new(self.ir_node.into_inner()),
1758 second: Box::new(other.ir_node.into_inner()),
1759 metadata: self.location.new_node_metadata::<T>(),
1760 },
1761 )
1762 }
1763
1764 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1765 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1766 /// because this is compiled into a nested loop.
1767 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1768 self,
1769 other: Stream<T2, L, Bounded, O2, R>,
1770 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1771 where
1772 T: Clone,
1773 T2: Clone,
1774 {
1775 check_matching_location(&self.location, &other.location);
1776
1777 Stream::new(
1778 self.location.clone(),
1779 HydroNode::CrossProduct {
1780 left: Box::new(self.ir_node.into_inner()),
1781 right: Box::new(other.ir_node.into_inner()),
1782 metadata: self.location.new_node_metadata::<(T, T2)>(),
1783 },
1784 )
1785 }
1786}
1787
1788impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1789where
1790 L: Location<'a>,
1791{
1792 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1793 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1794 /// by equi-joining the two streams on the key attribute `K`.
1795 ///
1796 /// # Example
1797 /// ```rust
1798 /// # use hydro_lang::prelude::*;
1799 /// # use std::collections::HashSet;
1800 /// # use futures::StreamExt;
1801 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1802 /// let tick = process.tick();
1803 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1804 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1805 /// stream1.join(stream2)
1806 /// # }, |mut stream| async move {
1807 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1808 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1809 /// # stream.map(|i| assert!(expected.contains(&i)));
1810 /// # }));
1811 pub fn join<V2, O2: Ordering, R2: Retries>(
1812 self,
1813 n: Stream<(K, V2), L, B, O2, R2>,
1814 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1815 where
1816 K: Eq + Hash,
1817 R: MinRetries<R2>,
1818 {
1819 check_matching_location(&self.location, &n.location);
1820
1821 Stream::new(
1822 self.location.clone(),
1823 HydroNode::Join {
1824 left: Box::new(self.ir_node.into_inner()),
1825 right: Box::new(n.ir_node.into_inner()),
1826 metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1827 },
1828 )
1829 }
1830
1831 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1832 /// computes the anti-join of the items in the input -- i.e. returns
1833 /// unique items in the first input that do not have a matching key
1834 /// in the second input.
1835 ///
1836 /// # Example
1837 /// ```rust
1838 /// # use hydro_lang::prelude::*;
1839 /// # use futures::StreamExt;
1840 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1841 /// let tick = process.tick();
1842 /// let stream = process
1843 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1844 /// .batch(&tick, nondet!(/** test */));
1845 /// let batch = process
1846 /// .source_iter(q!(vec![1, 2]))
1847 /// .batch(&tick, nondet!(/** test */));
1848 /// stream.anti_join(batch).all_ticks()
1849 /// # }, |mut stream| async move {
1850 /// # for w in vec![(3, 'c'), (4, 'd')] {
1851 /// # assert_eq!(stream.next().await.unwrap(), w);
1852 /// # }
1853 /// # }));
1854 pub fn anti_join<O2: Ordering, R2: Retries>(
1855 self,
1856 n: Stream<K, L, Bounded, O2, R2>,
1857 ) -> Stream<(K, V1), L, B, O, R>
1858 where
1859 K: Eq + Hash,
1860 {
1861 check_matching_location(&self.location, &n.location);
1862
1863 Stream::new(
1864 self.location.clone(),
1865 HydroNode::AntiJoin {
1866 pos: Box::new(self.ir_node.into_inner()),
1867 neg: Box::new(n.ir_node.into_inner()),
1868 metadata: self.location.new_node_metadata::<(K, V1)>(),
1869 },
1870 )
1871 }
1872}
1873
1874impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1875 Stream<(K, V), L, B, O, R>
1876{
1877 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1878 /// is used as the key and the second element is added to the entries associated with that key.
1879 ///
1880 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1881 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1882 /// performing grouped aggregations, but also for more precise ordering guarantees such as
1883 /// total ordering _within_ each group but no ordering _across_ groups.
1884 ///
1885 /// # Example
1886 /// ```rust
1887 /// # use hydro_lang::prelude::*;
1888 /// # use futures::StreamExt;
1889 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1890 /// process
1891 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1892 /// .into_keyed()
1893 /// # .entries()
1894 /// # }, |mut stream| async move {
1895 /// // { 1: [2, 3], 2: [4] }
1896 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1897 /// # assert_eq!(stream.next().await.unwrap(), w);
1898 /// # }
1899 /// # }));
1900 /// ```
1901 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1902 KeyedStream {
1903 underlying: self.weakest_ordering(),
1904 _phantom_order: Default::default(),
1905 }
1906 }
1907}
1908
1909impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1910where
1911 K: Eq + Hash,
1912 L: Location<'a>,
1913{
1914 #[deprecated = "use .into_keyed().fold(...) instead"]
1915 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1916 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1917 /// in the second element are accumulated via the `comb` closure.
1918 ///
1919 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1920 /// to depend on the order of elements in the stream.
1921 ///
1922 /// If the input and output value types are the same and do not require initialization then use
1923 /// [`Stream::reduce_keyed`].
1924 ///
1925 /// # Example
1926 /// ```rust
1927 /// # use hydro_lang::prelude::*;
1928 /// # use futures::StreamExt;
1929 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1930 /// let tick = process.tick();
1931 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1932 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1933 /// batch
1934 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1935 /// .all_ticks()
1936 /// # }, |mut stream| async move {
1937 /// // (1, 5), (2, 7)
1938 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1939 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1940 /// # }));
1941 /// ```
1942 pub fn fold_keyed<A, I, F>(
1943 self,
1944 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1945 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1946 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1947 where
1948 I: Fn() -> A + 'a,
1949 F: Fn(&mut A, V) + 'a,
1950 {
1951 self.into_keyed().fold(init, comb).entries()
1952 }
1953
1954 #[deprecated = "use .into_keyed().reduce(...) instead"]
1955 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1956 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1957 /// in the second element are accumulated via the `comb` closure.
1958 ///
1959 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1960 /// to depend on the order of elements in the stream.
1961 ///
1962 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1963 ///
1964 /// # Example
1965 /// ```rust
1966 /// # use hydro_lang::prelude::*;
1967 /// # use futures::StreamExt;
1968 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1969 /// let tick = process.tick();
1970 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1971 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1972 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1973 /// # }, |mut stream| async move {
1974 /// // (1, 5), (2, 7)
1975 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1976 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1977 /// # }));
1978 /// ```
1979 pub fn reduce_keyed<F>(
1980 self,
1981 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1982 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1983 where
1984 F: Fn(&mut V, V) + 'a,
1985 {
1986 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1987
1988 Stream::new(
1989 self.location.clone(),
1990 HydroNode::ReduceKeyed {
1991 f,
1992 input: Box::new(self.ir_node.into_inner()),
1993 metadata: self.location.new_node_metadata::<(K, V)>(),
1994 },
1995 )
1996 }
1997}
1998
1999impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2000where
2001 K: Eq + Hash,
2002 L: Location<'a>,
2003{
2004 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2005 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2006 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2007 /// in the second element are accumulated via the `comb` closure.
2008 ///
2009 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2010 /// as there may be non-deterministic duplicates.
2011 ///
2012 /// If the input and output value types are the same and do not require initialization then use
2013 /// [`Stream::reduce_keyed_commutative_idempotent`].
2014 ///
2015 /// # Example
2016 /// ```rust
2017 /// # use hydro_lang::prelude::*;
2018 /// # use futures::StreamExt;
2019 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2020 /// let tick = process.tick();
2021 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2022 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2023 /// batch
2024 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2025 /// .all_ticks()
2026 /// # }, |mut stream| async move {
2027 /// // (1, false), (2, true)
2028 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2029 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2030 /// # }));
2031 /// ```
2032 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2033 self,
2034 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2035 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2036 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2037 where
2038 I: Fn() -> A + 'a,
2039 F: Fn(&mut A, V) + 'a,
2040 {
2041 self.into_keyed()
2042 .fold_commutative_idempotent(init, comb)
2043 .entries()
2044 }
2045
2046 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2047 /// # Example
2048 /// ```rust
2049 /// # use hydro_lang::prelude::*;
2050 /// # use futures::StreamExt;
2051 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2052 /// let tick = process.tick();
2053 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2054 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2055 /// batch.keys().all_ticks()
2056 /// # }, |mut stream| async move {
2057 /// // 1, 2
2058 /// # assert_eq!(stream.next().await.unwrap(), 1);
2059 /// # assert_eq!(stream.next().await.unwrap(), 2);
2060 /// # }));
2061 /// ```
2062 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2063 self.into_keyed()
2064 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2065 .keys()
2066 }
2067
2068 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2069 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2070 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2071 /// in the second element are accumulated via the `comb` closure.
2072 ///
2073 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2074 /// as there may be non-deterministic duplicates.
2075 ///
2076 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2077 ///
2078 /// # Example
2079 /// ```rust
2080 /// # use hydro_lang::prelude::*;
2081 /// # use futures::StreamExt;
2082 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2083 /// let tick = process.tick();
2084 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2085 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2086 /// batch
2087 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2088 /// .all_ticks()
2089 /// # }, |mut stream| async move {
2090 /// // (1, false), (2, true)
2091 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2092 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2093 /// # }));
2094 /// ```
2095 pub fn reduce_keyed_commutative_idempotent<F>(
2096 self,
2097 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2098 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2099 where
2100 F: Fn(&mut V, V) + 'a,
2101 {
2102 self.into_keyed()
2103 .reduce_commutative_idempotent(comb)
2104 .entries()
2105 }
2106}
2107
2108impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2109where
2110 K: Eq + Hash,
2111 L: Location<'a>,
2112{
2113 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2114 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2115 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2116 /// in the second element are accumulated via the `comb` closure.
2117 ///
2118 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2119 ///
2120 /// If the input and output value types are the same and do not require initialization then use
2121 /// [`Stream::reduce_keyed_commutative`].
2122 ///
2123 /// # Example
2124 /// ```rust
2125 /// # use hydro_lang::prelude::*;
2126 /// # use futures::StreamExt;
2127 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2128 /// let tick = process.tick();
2129 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2130 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2131 /// batch
2132 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2133 /// .all_ticks()
2134 /// # }, |mut stream| async move {
2135 /// // (1, 5), (2, 7)
2136 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2137 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2138 /// # }));
2139 /// ```
2140 pub fn fold_keyed_commutative<A, I, F>(
2141 self,
2142 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2143 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2144 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2145 where
2146 I: Fn() -> A + 'a,
2147 F: Fn(&mut A, V) + 'a,
2148 {
2149 self.into_keyed().fold_commutative(init, comb).entries()
2150 }
2151
2152 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2153 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2154 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2155 /// in the second element are accumulated via the `comb` closure.
2156 ///
2157 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2158 ///
2159 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2160 ///
2161 /// # Example
2162 /// ```rust
2163 /// # use hydro_lang::prelude::*;
2164 /// # use futures::StreamExt;
2165 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2166 /// let tick = process.tick();
2167 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2168 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2169 /// batch
2170 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2171 /// .all_ticks()
2172 /// # }, |mut stream| async move {
2173 /// // (1, 5), (2, 7)
2174 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2175 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2176 /// # }));
2177 /// ```
2178 pub fn reduce_keyed_commutative<F>(
2179 self,
2180 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2181 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2182 where
2183 F: Fn(&mut V, V) + 'a,
2184 {
2185 self.into_keyed().reduce_commutative(comb).entries()
2186 }
2187}
2188
2189impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2190where
2191 K: Eq + Hash,
2192 L: Location<'a>,
2193{
2194 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2195 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2196 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2197 /// in the second element are accumulated via the `comb` closure.
2198 ///
2199 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2200 ///
2201 /// If the input and output value types are the same and do not require initialization then use
2202 /// [`Stream::reduce_keyed_idempotent`].
2203 ///
2204 /// # Example
2205 /// ```rust
2206 /// # use hydro_lang::prelude::*;
2207 /// # use futures::StreamExt;
2208 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2209 /// let tick = process.tick();
2210 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2211 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2212 /// batch
2213 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2214 /// .all_ticks()
2215 /// # }, |mut stream| async move {
2216 /// // (1, false), (2, true)
2217 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2218 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2219 /// # }));
2220 /// ```
2221 pub fn fold_keyed_idempotent<A, I, F>(
2222 self,
2223 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2224 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2225 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2226 where
2227 I: Fn() -> A + 'a,
2228 F: Fn(&mut A, V) + 'a,
2229 {
2230 self.into_keyed().fold_idempotent(init, comb).entries()
2231 }
2232
2233 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2234 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2235 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2236 /// in the second element are accumulated via the `comb` closure.
2237 ///
2238 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2239 ///
2240 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2241 ///
2242 /// # Example
2243 /// ```rust
2244 /// # use hydro_lang::prelude::*;
2245 /// # use futures::StreamExt;
2246 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2247 /// let tick = process.tick();
2248 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2249 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2250 /// batch
2251 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2252 /// .all_ticks()
2253 /// # }, |mut stream| async move {
2254 /// // (1, false), (2, true)
2255 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2256 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2257 /// # }));
2258 /// ```
2259 pub fn reduce_keyed_idempotent<F>(
2260 self,
2261 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2262 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2263 where
2264 F: Fn(&mut V, V) + 'a,
2265 {
2266 self.into_keyed().reduce_idempotent(comb).entries()
2267 }
2268}
2269
2270impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2271where
2272 L: Location<'a> + NoTick,
2273{
2274 /// Returns a stream corresponding to the latest batch of elements being atomically
2275 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2276 /// the order of the input.
2277 ///
2278 /// # Non-Determinism
2279 /// The batch boundaries are non-deterministic and may change across executions.
2280 pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2281 Stream::new(
2282 self.location.clone().tick,
2283 HydroNode::Unpersist {
2284 inner: Box::new(self.ir_node.into_inner()),
2285 metadata: self.location.new_node_metadata::<T>(),
2286 },
2287 )
2288 }
2289
2290 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2291 /// See [`Stream::atomic`] for more details.
2292 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2293 Stream::new(self.location.tick.l, self.ir_node.into_inner())
2294 }
2295
2296 /// Gets the [`Tick`] inside which this stream is synchronously processed. See [`Stream::atomic`].
2297 pub fn atomic_source(&self) -> Tick<L> {
2298 self.location.tick.clone()
2299 }
2300}
2301
2302impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2303where
2304 L: Location<'a> + NoTick + NoAtomic,
2305{
2306 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2307 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2308 ///
2309 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2310 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2311 /// argument that declares where the stream will be atomically processed. Batching a stream into
2312 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2313 /// [`Tick`] will introduce asynchrony.
2314 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2315 Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2316 }
2317
2318 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2319 /// Future outputs are produced as available, regardless of input arrival order.
2320 ///
2321 /// # Example
2322 /// ```rust
2323 /// # use std::collections::HashSet;
2324 /// # use futures::StreamExt;
2325 /// # use hydro_lang::prelude::*;
2326 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2327 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2328 /// .map(q!(|x| async move {
2329 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2330 /// x
2331 /// }))
2332 /// .resolve_futures()
2333 /// # },
2334 /// # |mut stream| async move {
2335 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2336 /// # let mut output = HashSet::new();
2337 /// # for _ in 1..10 {
2338 /// # output.insert(stream.next().await.unwrap());
2339 /// # }
2340 /// # assert_eq!(
2341 /// # output,
2342 /// # HashSet::<i32>::from_iter(1..10)
2343 /// # );
2344 /// # },
2345 /// # ));
2346 pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2347 where
2348 T: Future<Output = T2>,
2349 {
2350 Stream::new(
2351 self.location.clone(),
2352 HydroNode::ResolveFutures {
2353 input: Box::new(self.ir_node.into_inner()),
2354 metadata: self.location.new_node_metadata::<T2>(),
2355 },
2356 )
2357 }
2358
2359 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2360 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2361 /// the order of the input. The output stream will execute in the [`Tick`] that was
2362 /// used to create the atomic section.
2363 ///
2364 /// # Non-Determinism
2365 /// The batch boundaries are non-deterministic and may change across executions.
2366 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2367 self.atomic(tick).batch(nondet)
2368 }
2369
2370 /// Given a time interval, returns a stream corresponding to samples taken from the
2371 /// stream roughly at that interval. The output will have elements in the same order
2372 /// as the input, but with arbitrary elements skipped between samples. There is also
2373 /// no guarantee on the exact timing of the samples.
2374 ///
2375 /// # Non-Determinism
2376 /// The output stream is non-deterministic in which elements are sampled, since this
2377 /// is controlled by a clock.
2378 pub fn sample_every(
2379 self,
2380 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2381 nondet: NonDet,
2382 ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2383 let samples = self.location.source_interval(interval, nondet);
2384
2385 let tick = self.location.tick();
2386 self.batch(&tick, nondet)
2387 .filter_if_some(samples.batch(&tick, nondet).first())
2388 .all_ticks()
2389 .weakest_retries()
2390 }
2391
2392 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2393 /// stream has not emitted a value since that duration.
2394 ///
2395 /// # Non-Determinism
2396 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2397 /// samples take place, timeouts may be non-deterministically generated or missed,
2398 /// and the notification of the timeout may be delayed as well. There is also no
2399 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2400 /// detected based on when the next sample is taken.
2401 pub fn timeout(
2402 self,
2403 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2404 nondet: NonDet,
2405 ) -> Optional<(), L, Unbounded> {
2406 let tick = self.location.tick();
2407
2408 let latest_received = self.assume_retries(nondet).fold_commutative(
2409 q!(|| None),
2410 q!(|latest, _| {
2411 *latest = Some(Instant::now());
2412 }),
2413 );
2414
2415 latest_received
2416 .snapshot(&tick, nondet)
2417 .filter_map(q!(move |latest_received| {
2418 if let Some(latest_received) = latest_received {
2419 if Instant::now().duration_since(latest_received) > duration {
2420 Some(())
2421 } else {
2422 None
2423 }
2424 } else {
2425 Some(())
2426 }
2427 }))
2428 .latest()
2429 }
2430}
2431
2432impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2433where
2434 L: Location<'a> + NoTick + NoAtomic,
2435 F: Future<Output = T>,
2436{
2437 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2438 /// Future outputs are produced in the same order as the input stream.
2439 ///
2440 /// # Example
2441 /// ```rust
2442 /// # use std::collections::HashSet;
2443 /// # use futures::StreamExt;
2444 /// # use hydro_lang::prelude::*;
2445 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2446 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2447 /// .map(q!(|x| async move {
2448 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2449 /// x
2450 /// }))
2451 /// .resolve_futures_ordered()
2452 /// # },
2453 /// # |mut stream| async move {
2454 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2455 /// # let mut output = Vec::new();
2456 /// # for _ in 1..10 {
2457 /// # output.push(stream.next().await.unwrap());
2458 /// # }
2459 /// # assert_eq!(
2460 /// # output,
2461 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2462 /// # );
2463 /// # },
2464 /// # ));
2465 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2466 Stream::new(
2467 self.location.clone(),
2468 HydroNode::ResolveFuturesOrdered {
2469 input: Box::new(self.ir_node.into_inner()),
2470 metadata: self.location.new_node_metadata::<T>(),
2471 },
2472 )
2473 }
2474}
2475
2476impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2477where
2478 L: Location<'a> + NoTick,
2479{
2480 /// Executes the provided closure for every element in this stream.
2481 ///
2482 /// Because the closure may have side effects, the stream must have deterministic order
2483 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2484 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2485 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2486 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2487 let f = f.splice_fn1_ctx(&self.location).into();
2488 let metadata = self.location.new_node_metadata::<T>();
2489 self.location
2490 .flow_state()
2491 .borrow_mut()
2492 .push_root(HydroRoot::ForEach {
2493 input: Box::new(HydroNode::Unpersist {
2494 inner: Box::new(self.ir_node.into_inner()),
2495 metadata: metadata.clone(),
2496 }),
2497 f,
2498 op_metadata: HydroIrOpMetadata::new(),
2499 });
2500 }
2501
2502 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2503 /// TCP socket to some other server. You should _not_ use this API for interacting with
2504 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2505 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2506 /// interaction with asynchronous sinks.
2507 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2508 where
2509 S: 'a + futures::Sink<T> + Unpin,
2510 {
2511 self.location
2512 .flow_state()
2513 .borrow_mut()
2514 .push_root(HydroRoot::DestSink {
2515 sink: sink.splice_typed_ctx(&self.location).into(),
2516 input: Box::new(self.ir_node.into_inner()),
2517 op_metadata: HydroIrOpMetadata::new(),
2518 });
2519 }
2520}
2521
2522impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2523where
2524 L: Location<'a>,
2525{
2526 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2527 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2528 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2529 Stream::new(
2530 self.location.outer().clone(),
2531 HydroNode::Persist {
2532 inner: Box::new(self.ir_node.into_inner()),
2533 metadata: self.location.new_node_metadata::<T>(),
2534 },
2535 )
2536 }
2537
2538 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2539 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2540 ///
2541 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2542 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2543 /// stream's [`Tick`] context.
2544 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2545 Stream::new(
2546 Atomic {
2547 tick: self.location.clone(),
2548 },
2549 HydroNode::Persist {
2550 inner: Box::new(self.ir_node.into_inner()),
2551 metadata: self.location.new_node_metadata::<T>(),
2552 },
2553 )
2554 }
2555
2556 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2557 ///
2558 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2559 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2560 /// careful when using this operator, as its memory usage will grow linearly over time since it
2561 /// must store its inputs indefinitely.
2562 ///
2563 /// # Example
2564 /// ```rust
2565 /// # use hydro_lang::prelude::*;
2566 /// # use futures::StreamExt;
2567 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2568 /// let tick = process.tick();
2569 /// // ticks are lazy by default, forces the second tick to run
2570 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2571 ///
2572 /// let batch_first_tick = process
2573 /// .source_iter(q!(vec![1, 2, 3, 4]))
2574 /// .batch(&tick, nondet!(/** test */));
2575 /// let batch_second_tick = process
2576 /// .source_iter(q!(vec![5, 6, 7, 8]))
2577 /// .batch(&tick, nondet!(/** test */))
2578 /// .defer_tick(); // appears on the second tick
2579 /// batch_first_tick.chain(batch_second_tick)
2580 /// .persist()
2581 /// .all_ticks()
2582 /// # }, |mut stream| async move {
2583 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2584 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2585 /// # assert_eq!(stream.next().await.unwrap(), w);
2586 /// # }
2587 /// # }));
2588 /// ```
2589 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2590 where
2591 T: Clone,
2592 {
2593 Stream::new(
2594 self.location.clone(),
2595 HydroNode::Persist {
2596 inner: Box::new(self.ir_node.into_inner()),
2597 metadata: self.location.new_node_metadata::<T>(),
2598 },
2599 )
2600 }
2601
2602 #[expect(missing_docs, reason = "TODO")]
2603 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2604 Stream::new(
2605 self.location.clone(),
2606 HydroNode::DeferTick {
2607 input: Box::new(self.ir_node.into_inner()),
2608 metadata: self.location.new_node_metadata::<T>(),
2609 },
2610 )
2611 }
2612}
2613
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    mod backtrace_chained_ops;

    /// Type tag for the first process in `first_ten_distributed`.
    struct P1 {}
    /// Type tag for the second process in `first_ten_distributed`; that test also reuses it as
    /// the tag of the external endpoint.
    struct P2 {}

    /// Serializable wrapper around a number, used as the payload sent between locations.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process through a second process to an external endpoint and
    /// checks that the values are received in ascending order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let producer = builder.process::<P1>();
        let relay = builder.process::<P2>();
        let observer = builder.external::<P2>();

        let relayed_port = producer
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&relay)
            .send_bincode_external(&observer);

        let deployed = builder
            .with_process(&producer, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output port before starting the deployment.
        let mut received = deployed.connect_source_bincode(relayed_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let message = received.next().await.unwrap();
            assert_eq!(message.n, expected);
        }
    }

    /// Takes `first()` of a flattened three-element singleton inside a tick, counts the result,
    /// and checks that the first count observed externally is 1.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let observer = builder.external::<()>();

        let tick = process.tick();
        let count_port = tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&observer);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut counts = deployed.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = counts.next().await.unwrap();
        assert_eq!(first_count, 1);
    }

    /// Feeds values one at a time into a running-sum `scan` on an externally fed stream and
    /// checks that the accumulator carries over between inputs (1, then 1 + 2 = 3).
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let observer = builder.external::<()>();

        let (sink_port, incoming) = process.source_external_bincode(&observer);
        let sums_port = incoming
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&observer);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = deployed.connect_sink_bincode(sink_port).await;
        let mut running_sums = deployed.connect_source_bincode(sums_port).await;

        deployment.start().await.unwrap();

        to_process.send(1).await.unwrap();
        assert_eq!(running_sums.next().await.unwrap(), 1);

        to_process.send(2).await.unwrap();
        assert_eq!(running_sums.next().await.unwrap(), 3);
    }
}