package async_kernel

  1. Overview
  2. Docs
Legend:
Library
Module
Module type
Parameter
Class
Class type
module T1 : sig ... end

A time source has a phantom read-write parameter, where write gives permission to call advance and fire_past_alarms.

module Read_write : sig ... end
type t = Core.read T1.t
val sexp_of_t : t -> Sexplib0.Sexp.t
val id : _ T1.t -> Id.t

id t returns a unique, consistent identifier which can be used e.g. as a map or hash table key.

include Core.Invariant.S with type t := t
val invariant : t Base__Invariant_intf.inv
val invariant_with_jobs : job: (Async_kernel__.Types.Execution_context.t, Stdlib.Obj.t -> unit, Stdlib.Obj.t) Tuple_pool.Slots.t3 Tuple_pool.Pointer.t Core.Invariant.t -> t Core.Invariant.t
val read_only : [> Core.read ] T1.t -> t
val create : ?timing_wheel_config:Timing_wheel.Config.t -> now:Core.Time_ns.t -> unit -> _ T1.t

Creates a new simulated time source.

val wall_clock : unit -> t

A time source with now t given by wall-clock time (i.e., Time_ns.now) and that is advanced automatically as time passes (specifically, at the start of each Async cycle). There is only one wall-clock time source; every call to wall_clock () returns the same value. The behavior of now is special for wall_clock (); it always calls Time_ns.now (), so it can return times that the time source has not yet been advanced to.

Accessors. now (wall_clock ()) behaves specially; see wall_clock above.

val is_wall_clock : [> Core.read ] T1.t -> bool
val next_alarm_fires_at : [> Core.read ] T1.t -> Core.Time_ns.t option
val now : [> Core.read ] T1.t -> Core.Time_ns.t
val timing_wheel_now : [> Core.read ] T1.t -> Core.Time_ns.t

Removes the special behavior of now for wall_clock; it always returns the timing_wheel's notion of now.

val advance_directly : [> Core.write ] T1.t -> to_:Core.Time_ns.t -> unit

Instead of advance_directly, you probably should use advance_by_alarms. advance_directly t ~to_ advances the clock directly to to_, whereas advance_by_alarms advances the clock in steps, to each intervening alarm. advance_directly approximately determines the set of events to fire, up to timing-wheel alarm precision, whereas advance_by_alarms fires all alarms whose time is <= to_. With advance_directly, you must call fire_past_alarms if you want that behavior (see docs for Timing_wheel.advance_clock vs. Timing_wheel.fire_past_alarms).

val advance : [> Core.write ] T1.t -> to_:Core.Time_ns.t -> unit
  • deprecated [since 2019-06] Use [advance_directly] (to preserve behavior) or [advance_by_alarms]
val advance_directly_by : [> Core.write ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> unit
  • deprecated [since 2019-06] Use [advance_directly_by] (to preserve behavior) or [advance_by_alarms_by]
val fire_past_alarms : [> Core.write ] T1.t -> unit
val advance_by_alarms : ?wait_for:(unit -> unit Deferred.t) -> [> Core.write ] T1.t -> to_:Core.Time_ns.t -> unit Deferred.t

advance_by_alarms t repeatedly calls advance t to drive the time forward in steps, where each step is the minimum of to_ and the next alarm time. After each step, advance_by_alarms waits for the result of wait_for to become determined before advancing. By default, wait_for will be Scheduler.yield () to allow the triggered timers to execute and potentially rearm for subsequent steps. The returned deferred is filled when to_ is reached.

advance_by_alarms is useful in simulation when one wants to efficiently advance to a time in the future while giving periodic timers (e.g., resulting from every) a chance to fire with approximately the same timing as they would live.

Note: an alarm is anything that's scheduled to happen at a particular time using this time source, so e.g. any scheduled Event, something scheduled by run_*, or any deferred returned by at/after.

val advance_by_max_alarms_in_each_timing_wheel_interval : ?wait_for:(unit -> unit Deferred.t) -> [> Core.write ] T1.t -> to_:Core.Time_ns.t -> unit Deferred.t
  • deprecated [since 2021-12] This is the old implementation of [advance_by_alarms], kept in case the new implementation causes problems.
val advance_by_alarms_by : ?wait_for:(unit -> unit Deferred.t) -> [> Core.write ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> unit Deferred.t

advance_by_alarms_by ?wait_for t by is equivalent to: advance_by_alarms ?wait_for t ~to_:(Time_ns.add (now t) by)

module Continue : sig ... end
val run_repeatedly : ?start:unit Deferred.t -> ?stop:unit Deferred.t -> ?continue_on_error:bool -> ?finished:unit Ivar.t -> [> Core.read ] T1.t -> f:(unit -> unit Deferred.t) -> continue:Continue.t -> unit

run_repeatedly is the same as every', with the delay between jobs controlled by continue. When continue is Continue.immediately (the only value currently exposed in this interface), a new execution of f will be scheduled immediately after the deferred returned by f is resolved.

The functions below here are the same as in clock_intf.ml, except they take an explicit t argument. See Clock_intf for documentation.

val run_at : [> Core.read ] T1.t -> Core.Time_ns.t -> ('a -> unit) -> 'a -> unit
val run_after : [> Core.read ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> ('a -> unit) -> 'a -> unit
val at : [> Core.read ] T1.t -> Core.Time_ns.t -> unit Deferred.t
val with_timeout : [> Core.read ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> 'a Deferred.t -> [ `Timeout | `Result of 'a ] Deferred.t
module Event : sig ... end
val at_varying_intervals : ?stop:unit Deferred.t -> [> Core.read ] T1.t -> (unit -> Core.Core_private.Time_ns_alternate_sexp.Span.t) -> unit Tail.Stream.t
val every' : ?start:unit Deferred.t -> ?stop:unit Deferred.t -> ?continue_on_error:bool -> ?finished:unit Ivar.t -> [> Core.read ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> (unit -> unit Deferred.t) -> unit

See Clock.every' for documentation.

val every : ?start:unit Deferred.t -> ?stop:unit Deferred.t -> ?continue_on_error:bool -> [> Core.read ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> (unit -> unit) -> unit
val run_at_intervals' : ?start:Core.Time_ns.t -> ?stop:unit Deferred.t -> ?continue_on_error:bool -> [> Core.read ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> (unit -> unit Deferred.t) -> unit
val run_at_intervals : ?start:Core.Time_ns.t -> ?stop:unit Deferred.t -> ?continue_on_error:bool -> [> Core.read ] T1.t -> Core.Core_private.Time_ns_alternate_sexp.Span.t -> (unit -> unit) -> unit
val of_synchronous : 'a Async_kernel__.Types.Time_source.t1 -> 'a T1.t

Time_source and Synchronous_time_source are the same data structure and use the same underlying timing wheel. The types are freely interchangeable.

val to_synchronous : 'a T1.t -> 'a Async_kernel__.Types.Time_source.t1
val advance_directly_if_quiescent : [> Core.write ] T1.t -> to_:Core.Time_ns.t -> bool

Advance iff:

  • no alarms are scheduled up (and including) to that time point
  • no jobs are runnable (which could cause events to happen in the time range)

Returns true if we advanced, false if we were unable to.

This is an optimisation relative to (for instance) Time_source.advance_by_alarms or other methods that will rely on running async cycles to produce quiescence.

OCaml

Innovation. Community. Security.