package streaming

  1. Overview
  2. Docs

Module with defintions for sinks.

Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can aquire resources that are guaranteed to be terminated when the sink is filled.

Sinks are a great way to define decoupled consumers that can be filled with Stream.into.

Sinks are independent from sources and streams. You can think of them as packed arguments for folding functions with early termination. Formally, they can also be interpreted as Moore machine.

type ('a, 'b) t = ('a, 'b) sink

Type for sinks that consume elements of type 'a and, once done, produce a value of type 'b.

Creating a sink

Implementing custom sinks is useful to create a collection of reusable streaming consumers for your application.

The following example demonstrates a sink that consumes all elements into a list:

let list_sink =
  let init () = [] in
  let push acc x = x :: acc in
  let stop acc = List.rev acc in
  Sink.make ~init ~push ~stop ()

Alternatively, existing list/array/string/queue sinks, or others listed below, can be used.

val fill : 'r -> ('a, 'r) t

fill result use result to fill the sink. This sink will not consume any input and will immediately produce result when used.

val fold : ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t

fold f init is a sink that reduces all input elements with the stepping function f starting with the accumulator value init.

val fold_while : ('r -> bool) -> ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t

fold_while full f init is similar to fold but can terminate early if full returns true.

val make : init:(unit -> 'acc) -> push:('acc -> 'a -> 'acc) -> ?full:('acc -> bool) -> stop:('acc -> 'r) -> unit -> ('a, 'r) t

Creates a sink from a function that initializes a state value, a stepping function to update that state and a stop function that produces the final result value. Optionally a full function can be passed to decide when the sink should terminate early.

Note: The calls to full should be cheap as this function will be called to avoid allocation of unnecessary resources. If the computation required to decide if the sink is full is expensive, consider caching it whenever possible.

Basic sinks

val full : ('a, unit) t

A full sink that will not consume any input and will not produce any results.

val is_empty : ('a, bool) t

is_empty is true if the sink did not consume any elements and false otherwise.

val each : ('a -> unit) -> ('a, unit) t

Applies an effectful action to all input elements producing nothing.

val len : ('a, int) t

Consumes and counts all input elements.

val first : ('a, 'a option) t

The first input element, or None if the sink did not receive enough input.

Equivalent to nth 0.

val last : ('a, 'a option) t

The last input element, or None if the sink did not receive enough input.

val nth : int -> ('a, 'a option) t

The n-th input element, or None if the sink did not receive enough input.

val drain : ('a, unit) t

Consumes all elements producing nothing. Useful for triggering actions in effectful streams.

Finding elements

val contains : where:('a -> bool) -> ('a, bool) t

contains ~where:pred finds the first element that satisfies pred returning None if there is no such element.

val find : where:('a -> bool) -> ('a, 'a option) t

find ~where:pred finds the first element that satisfies pred returning None if there is no such element.

val index : where:('a -> bool) -> ('a, int option) t

Similar to find but returns the index of the element that satisfies the predicate.

val minimum : by:('a -> 'a -> bool) -> ('a, 'a option) t

Finds the minimum element in the sequence, using the given predicate as as the comparison between the input elements.

val maximum : by:('a -> 'a -> bool) -> ('a, 'a option) t

Finds the maximum element in the sequence, using the given predicate as as the comparison between the input elements.

Logical predicates

val all : where:('a -> bool) -> ('a, bool) t

all ~where:pred is true if all input element satisfy pred. Will stop consuming elements when the first element that does not satisfy pred is found. Results in true for empty input.

val any : where:('a -> bool) -> ('a, bool) t

any ~where:pred is true if at least one input element satisfies pred. Will stop consuming elements when such an element is found. Results in false for empty input.

Data sinks

val list : ('a, 'a list) t

Puts all input elements into a list.

val array : ('a, 'a array) t

Puts all input elements into an array.

val buffer : int -> ('a, 'a array) t

Similar to array buf will only consume n elements.

val queue : ('a, 'a Stdlib.Queue.t) t

Puts all input elements into a queue.

val string : (string, string) t

Consumes and concatenates strings.

val bytes : (bytes, bytes) t

Consumes and concatenates bytes.

IO sinks

val print : (string, unit) t

Prints all input string elements to standard output as lines.

val file : string -> (string, unit) t

file path is a sink that writes input strings as lines into a file located at path.

val stdout : (string, unit) t

A sink that writes input strings as lines to STDOUT.

val stderr : (string, unit) t

A sink that writes input strings as lines to STDERR.

Numeric compuations

val sum : (int, int) t

Adds all input integer values.

val product : (int, int) t

Product of input integer values. Stops if any input element is 0.

val mean : (float, float) t

Computes a numerically stable arithmetic mean of all input elements.

Combining sinks

val zip : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

zip left right computes both left and right at the same time with the same input being sent to both sinks. The results of both sinks are produced.

val zip_left : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

zip_left left right similar to zip, but only produces the result of the left sink.

val zip_right : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

zip_left left right similar to zip, but only produces the result of the right sink.

val zip_with : ('r1 -> 'r2 -> 'r) -> ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r) t

zip_with f left right similar to zip, but applies an aggregation function to results produced by left and right.

val (<&>) : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

left <&> right is an operator version of zip left right.

val (<&) : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

left <& right is an operator version of zip_left left right.

val (&>) : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

left &> right is an operator version of zip_right left right.

val unzip : ('a, 'r1) t -> ('b, 'r2) t -> ('a * 'b, 'r1 * 'r2) t

unzip left right is a sink that receives pairs 'a * 'b, sending the first element into left and the second into right. Both sinks are computed at the same time and their results returned as an output pair.

The sink becomes full when either left or right get full.

val unzip_left : ('a, 'r) t -> ('b, _) t -> ('a * 'b, 'r) t

unzip_left left right is similar to unzip, but only produces the result of the left sink.

If right terminates first, left will be forced to terminate.

val unzip_right : ('a, _) t -> ('b, 'r) t -> ('a * 'b, 'r) t

unzip_left left right is similar to unzip, but only produces the result of the right sink.

If left terminates first, right will be forced to terminate.

val unzip_with : ('r1 -> 'r2 -> 'r) -> ('a, 'r1) t -> ('b, 'r2) t -> ('a * 'b, 'r) t

unzip_with f left right similar to unzip, but applies an aggregation function to results produced by left and right.

val (<*>) : ('a, 'r1) t -> ('b, 'r2) t -> ('a * 'b, 'r1 * 'r2) t

left <*> right is an operator version of unzip left right.

val (<*) : ('a, 'r) t -> ('b, _) t -> ('a * 'b, 'r) t

left <* right is an operator version of unzip_left left right.

val (*>) : ('a, _) t -> ('b, 'r) t -> ('a * 'b, 'r) t

left *> right is an operator version of unzip_right left right.

val distribute : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

distribute left right is similar to zip but distributes the consumed elements over left and right alternating in a round robin fashion.

type ('a, 'b) race =
  1. | Left of 'a
  2. | Right of 'b
  3. | Both of 'a * 'b

Type for race result values.

val race : ('a, 'r1) t -> ('a, 'r2) t -> ('a, ('r1, 'r2) race) t

race left right runs both left and right sinks at the same time producing the result for the one that fills first.

If the sink is terminated prematurely, before either left or right are filled, Both of their values are produced.

Examples

let sink = Sink.(race (find ~where:(fun x -> x > 10)) (nth 8)) in
let result = Stream.of_list [1; 9; 0; 8; 30; 4] |> Stream.into sink in
assert (result = Sink.Left (Some 30))
val (<|>) : ('a, 'r1) t -> ('a, 'r2) t -> ('a, ('r1, 'r2) race) t

left <|> right is the operator version of race left right.

val seq : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

seq left right runs left and then right sequentially producing both of their results.

If the resulting sink is stopped before right was started, it will be forced to initialize and terminate.

val seq_left : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

seq_left left right is similar to seq, but only produces the result of the left sink.

val seq_right : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

seq_right left right is similar to seq, but only produces the result of the right sink.

val (<+>) : ('a, 'r1) t -> ('a, 'r2) t -> ('a, 'r1 * 'r2) t

left <+> right is an operator version of seq left right.

val (<+) : ('a, 'r) t -> ('a, _) t -> ('a, 'r) t

left <+ right is an operator version of seq_left left right.

val (+>) : ('a, _) t -> ('a, 'r) t -> ('a, 'r) t

left +> right is an operator version of seq_right left right.

Mapping and filtering sinks

val map : ('r1 -> 'r2) -> ('a, 'r1) t -> ('a, 'r2) t

map f sink is a sink sink with the result transformed with f.

val (<@>) : ('a -> 'b) -> ('c, 'a) t -> ('c, 'b) t

f <@> sink is the operator version of map f sink.

val premap : ('b -> 'a) -> ('a, 'r) t -> ('b, 'r) t

premap f sink is a sink that premaps the input values.

Examples

If sink consumes integers, but we have an input with strings, we can provide a conversion from strings to integers to premap:

let sink = Sink.(premap int_of_string sum) in
let result = Stream.of_list ["1"; "2"; "3"] |> Stream.into sink in
assert (result = 6)
val prefilter : ('a -> bool) -> ('a, 'r) t -> ('a, 'r) t

prefilter predicate sink is a sink that filter the input value for sink.

Resource management

val dispose : ('a, 'r) t -> 'r

Close the sink and produce the currently accumulated result. Any internal state will be terminated.

Syntax definitions

In addition to using the sinks and operations defined above, it is possible to create sinks with a convenient (let) notation.

A common example of a composed sink is the sink that computes the arithmetic mean:

let mean =
  let open Sink.Syntax in
  let+ total = Sink.sum
  and+ count = Sink.len in
  total / count

The resulting sink has type (int, int) sink and will only consume the input once!

module Syntax : sig ... end

Module with syntax definitions for sinks.

Interface implementations

module Functor : sig ... end

Module that implements the "Functor" interface.

module Applicative : sig ... end

Module that implements the "Applicative" interface.