package octez-libs
include Distributed.Process with type 'a io = 'a Lwt.t
exception Init_more_than_once
Exception that is raised if run_node is called more than once.
exception InvalidNode of Distributed.Node_id.t
exception Local_only_mode
Exception that is raised when add_remote_node or remove_remote_node is called on a node that is operating in local only mode.
type 'a io = 'a Lwt.t
Abstract type for monadic concurrent IO returning 'a.
type 'a matcher_list
The abstract type representing a non-empty list of matchers to be used with the receive function.
type monitor_ref
The abstract type representing a monitor_ref, which is returned when a process is monitored and can be used to unmonitor it.
type monitor_reason =
| Normal of Distributed.Process_id.t (* Process terminated normally. *)
| Exception of Distributed.Process_id.t * exn (* Process terminated with exception. *)
| UnkownNodeId of Distributed.Process_id.t * Distributed.Node_id.t (* An operation failed because the remote node id is unknown. *)
| NoProcess of Distributed.Process_id.t (* Attempted to monitor a process that does not exist. *)
Reason for process termination.
module Remote_config : sig ... end
The configuration of a node to be run as a remote node, i.e., one that can both send messages to and receive messages from other nodes.
module Local_config : sig ... end
The configuration of a node to be run as a local node, i.e., one that cannot send messages to or receive messages from other nodes.
The configuration of a node. Can be one of node_config.Local or node_config.Remote.
val return : 'a -> 'a t
return v creates a computation returning v.
c >>= f is a computation which first waits for the computation c to terminate and then, if the computation succeeds, behaves as the application of function f to the return value of c. If the computation c fails, c >>= f also fails, with the same exception.
val register : string -> (Distributed.Process_id.t -> unit -> unit t) -> unit t
val fail : exn -> 'a t
fail e is a process that fails with the exception e.
catch p f is a process that behaves as the process p () if this process succeeds. If the process p () fails with some exception, catch p f behaves as the application of f to this exception.
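The failure-propagation rules described for >>=, fail and catch can be exercised with a small self-contained sketch. It does not use the real library: MONAD_SUBSET copies the signatures of return and fail from this page and assumes the usual shapes for (>>=) and catch, while Toy is a hypothetical stand-in in which a computation is just a thunk returning a result. Example, checked_double, safe_quadruple and Negative are illustrative names only.

```ocaml
(* Subset of the interface on this page. The signatures of (>>=) and catch
   are assumptions inferred from their prose descriptions. *)
module type MONAD_SUBSET = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
  val fail : exn -> 'a t
  val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t
end

(* Hypothetical stand-in so the sketch runs without the real library:
   a computation is a thunk that yields either a value or an exception. *)
module Toy : sig
  include MONAD_SUBSET
  val run : 'a t -> ('a, exn) result
end = struct
  type 'a t = unit -> ('a, exn) result
  let return v () = Ok v
  let fail e () = Error e
  let ( >>= ) c f () = match c () with Ok v -> f v () | Error e -> Error e
  let catch p f () = match p () () with Ok v -> Ok v | Error e -> f e ()
  let run c = c ()
end

module Example (P : MONAD_SUBSET) = struct
  open P

  exception Negative of int

  (* Fails with [Negative n] for negative input, otherwise doubles it. *)
  let checked_double n =
    if n < 0 then fail (Negative n) else return (2 * n)

  (* A failure in the first [checked_double] skips the second bind and
     reaches the handler passed to [catch]. *)
  let safe_quadruple n =
    catch
      (fun () -> checked_double n >>= fun d -> checked_double d)
      (function Negative _ -> return 0 | e -> fail e)
end

module E = Example (Toy)

let () =
  match Toy.run (E.safe_quadruple (-1)) with
  | Ok n -> Printf.printf "recovered with %d\n" n
  | Error e -> Printf.printf "failed: %s\n" (Printexc.to_string e)
```

Writing the example as a functor over the signature keeps the process code independent of the stand-in, so the same body should type-check against any module that offers these five values with the assumed signatures.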
val spawn :
?monitor:bool ->
Distributed.Node_id.t ->
proc_rep ->
Distributed.Process_id.t ->
(Distributed.Process_id.t * monitor_ref option) t
spawn monitor name node_id process will spawn process on node_id, returning the Process_id.t associated with the newly spawned process. If monitor is true (default value is false) then the spawned process will also be monitored and the associated monitor_ref will be returned.
If node_id is an unknown node then an InvalidNode exception is raised.
val case : (message_type -> (unit -> 'a t) option) -> 'a matcher_list
case match_fn will create a matcher_list which will use match_fn to match on potential messages. match_fn should return None to indicate no match or Some handler where handler is the function that should be called to handle the matching message.
val termination_case : (monitor_reason -> 'a t) -> 'a matcher_list
termination_case handler will create a matcher_list which can be used to match against the monitor_reason for a process that is being monitored. If this process is monitoring another process then providing this matcher in the list of matchers to receive will allow this process to act on the termination of the monitored process.
NOTE: when a remote process (i.e., one running on another node) raises an exception you will not be able to pattern match on the exception. This is a limitation of the Marshal OCaml module: "Values of extensible variant types, for example exceptions (of extensible type exn), returned by the unmarshaller should not be pattern-matched over through match ... with or try ... with, because unmarshalling does not preserve the information required for matching their constructors. Structural equalities with other extensible variant values does not work either. Most other uses such as Printexc.to_string, will still work as expected."
See http://caml.inria.fr/pub/docs/manual-ocaml/libref/Marshal.html.
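A handler given to termination_case usually needs to report why the monitored process stopped. The sketch below shows one way to do that without matching on the exception constructor, in line with the Marshal limitation quoted above. It is self-contained: monitor_reason is a local copy of the type on this page, with pid and node_id as hypothetical stand-ins (int and string) for Distributed.Process_id.t and Distributed.Node_id.t, and describe is an illustrative name.

```ocaml
(* Hypothetical stand-ins for Distributed.Process_id.t and
   Distributed.Node_id.t, so the sketch compiles on its own. *)
type pid = int
type node_id = string

(* Local copy of the variant documented on this page. *)
type monitor_reason =
  | Normal of pid
  | Exception of pid * exn
  | UnkownNodeId of pid * node_id
  | NoProcess of pid

(* Render a reason as text. The exception is only passed to
   Printexc.to_string and never matched on by constructor. *)
let describe = function
  | Normal p -> Printf.sprintf "process %d terminated normally" p
  | Exception (p, e) ->
      Printf.sprintf "process %d raised %s" p (Printexc.to_string e)
  | UnkownNodeId (p, n) -> Printf.sprintf "process %d: unknown node %s" p n
  | NoProcess p -> Printf.sprintf "process %d does not exist" p

let () = print_endline (describe (Exception (1, Not_found)))
```

With the real interface, a function of this shape could be wrapped as termination_case (fun reason -> return (describe reason)), assuming the handler runs in the monitoring process.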
val (|.) : 'a matcher_list -> 'a matcher_list -> 'a matcher_list
a_matcher |. b_matcher is a matcher_list consisting of the matchers in a_matcher followed by the matchers in b_matcher.
val receive : ?timeout_duration:float -> 'a matcher_list -> 'a option t
receive ~timeout_duration matchers will wait for a message to be sent to this process which matches one of the matchers provided in matchers. The first matching matcher in matchers will be used to process the matching message, returning Some result where result is the result of the matcher processing the matched message. All the other non-matching messages are left in the same order they came in.
If a timeout is provided and no matching message has arrived within the timeout period then None will be returned.
If matchers is empty then an Empty_matchers exception is raised.
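The matching rules for receive (first matching matcher wins, unmatched messages keep their order) can be illustrated with a toy model over an in-memory mailbox. This is not the library's implementation: the msg type, the list-based mailbox, plain functions in place of matcher_list, handlers that return plain values instead of 'a t, and the scan order (oldest message first) are all assumptions made for the sketch. The toy raises Invalid_argument where the real function is documented to raise Empty_matchers.

```ocaml
(* Illustrative message type; the real message_type is defined elsewhere. *)
type msg = Ping of int | Pong of int | Stop

(* A matcher inspects a message and optionally returns a handler. *)
type 'a matcher = msg -> (unit -> 'a) option

(* Toy selective receive. The mailbox lists the oldest message first.
   Returns the handler's result together with the mailbox minus the
   consumed message, or None if no message matches. *)
let receive (matchers : 'a matcher list) (mailbox : msg list) :
    ('a * msg list) option =
  let first_handler m = List.find_map (fun matcher -> matcher m) matchers in
  let rec scan skipped = function
    | [] -> None
    | m :: rest -> (
        match first_handler m with
        | Some handler -> Some (handler (), List.rev_append skipped rest)
        | None -> scan (m :: skipped) rest)
  in
  match matchers with
  | [] -> invalid_arg "receive: empty matcher list"
  | _ -> scan [] mailbox

let on_pong : string matcher = function
  | Pong n -> Some (fun () -> Printf.sprintf "pong %d" n)
  | _ -> None

let on_stop : string matcher = function
  | Stop -> Some (fun () -> "stop")
  | _ -> None

let () =
  match receive [ on_pong; on_stop ] [ Ping 1; Stop; Pong 2 ] with
  | Some (result, remaining) ->
      Printf.printf "handled %s, %d message(s) left\n" result
        (List.length remaining)
  | None -> print_endline "no match"
```

In this model the unmatched Ping 1 stays at the front of the mailbox, which mirrors the statement that non-matching messages are left in the order they came in.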
val receive_loop : ?timeout_duration:float -> bool matcher_list -> unit t
receive_loop ~timeout_duration matchers is a convenience function which will loop until a matcher in matchers returns false.
val send : Distributed.Process_id.t -> message_type -> unit t
send process_id msg will send, asynchronously, message msg to the process with id process_id (possibly running on a remote node).
If process_id resides on an unknown node then an InvalidNode exception is raised.
If process_id is an unknown process but the node on which it resides is known then send will still succeed (i.e., will not raise any exceptions).
val (>!) : Distributed.Process_id.t -> message_type -> unit t
pid >! msg is equivalent to send pid msg. >! is an infix alias for send.
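To show how case, |., receive_loop and >! fit together, here is a sketch of a process that sends itself a few messages and then sums them until it sees a stop message. PROCESS_SUBSET copies signatures from this page with three assumptions: msg replaces message_type, pid replaces Distributed.Process_id.t, and (>>=) has the usual bind signature. Toy is a hypothetical single-process stand-in so the sketch runs without the real library; Counter, on_add, on_stop and total are illustrative names.

```ocaml
type msg = Add of int | Stop

(* Subset of the interface on this page (see the assumptions above). *)
module type PROCESS_SUBSET = sig
  type 'a t
  type pid
  type 'a matcher_list
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
  val case : (msg -> (unit -> 'a t) option) -> 'a matcher_list
  val ( |. ) : 'a matcher_list -> 'a matcher_list -> 'a matcher_list
  val receive_loop : ?timeout_duration:float -> bool matcher_list -> unit t
  val ( >! ) : pid -> msg -> unit t
  val get_self_pid : pid t
end

(* Sums [Add] payloads until [Stop] arrives. *)
module Counter (P : PROCESS_SUBSET) = struct
  open P
  let total = ref 0
  let on_add = case (function
    | Add n -> Some (fun () -> total := !total + n; return true)
    | _ -> None)
  let on_stop = case (function
    | Stop -> Some (fun () -> return false)
    | _ -> None)
  let run =
    get_self_pid >>= fun self ->
    self >! Add 1 >>= fun () ->
    self >! Add 2 >>= fun () ->
    self >! Stop >>= fun () ->
    receive_loop (on_add |. on_stop)
end

(* Hypothetical stand-in: one process, one global mailbox. *)
module Toy = struct
  type 'a t = unit -> 'a
  type pid = unit
  type 'a matcher_list = (msg -> (unit -> 'a t) option) list
  let mailbox : msg Queue.t = Queue.create ()
  let return v () = v
  let ( >>= ) c f () = f (c ()) ()
  let case f = [ f ]
  let ( |. ) a b = a @ b
  let ( >! ) () m () = Queue.add m mailbox
  let get_self_pid () = ()
  (* Unlike the documented function, this stops instead of blocking when
     the mailbox is empty, and it discards unmatched messages. *)
  let rec receive_loop ?timeout_duration matchers () =
    match Queue.take_opt mailbox with
    | None -> ()
    | Some m -> (
        match List.find_map (fun f -> f m) matchers with
        | Some handler ->
            if handler () () then receive_loop ?timeout_duration matchers ()
        | None -> receive_loop ?timeout_duration matchers ())
end

module C = Counter (Toy)

let () =
  C.run ();
  Printf.printf "total = %d\n" !C.total
```

Each handler returns true to keep looping and false to stop, which is the contract receive_loop documents for a bool matcher_list.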
val broadcast : Distributed.Node_id.t -> message_type -> unit t
broadcast node_id msg will send, asynchronously, message msg to all the processes on node_id.
If node_id is an unknown node then an InvalidNode exception is raised.
val monitor : Distributed.Process_id.t -> monitor_ref t
monitor pid allows the calling process to monitor pid. When pid terminates (normally or abnormally) this monitoring process will receive a monitor_reason message, which can be matched in receive using termination_case. A single process can be monitored by multiple processes.
If pid resides on an unknown node then an InvalidNode exception is raised.
val unmonitor : monitor_ref -> unit t
unmonitor mref will cause this process to stop monitoring the process which is referenced by mref. If the current process is not monitoring the process referenced by mref then unmonitor is a no-op.
If the process being unmonitored, as indicated by mref, resides on an unknown node then an InvalidNode exception is raised.
val get_self_pid : Distributed.Process_id.t t
get_self_pid will return the process id associated with the calling process.
val get_self_node : Distributed.Node_id.t t
get_self_node will return the node id associated with the calling process.
val get_remote_node : string -> Distributed.Node_id.t option t
get_remote_node node_name will return the node id associated with node_name. If there is no record of a node with that name at this time then None is returned.
val get_remote_nodes : Distributed.Node_id.t list t
The list of all nodes currently active and inactive.
val add_remote_node : string -> int -> string -> Distributed.Node_id.t t
add_remote_node ip port name will connect to the remote node at ip:port with name name and add it to the current node's list of connected remote nodes. The newly added node id is returned as the result. Adding a remote node that already exists is a no-op.
If the node is operating in local only mode then Local_only_mode is raised.
val remove_remote_node : Distributed.Node_id.t -> unit t
remove_remote_node node_id will remove node_id from the list of connected remote nodes.
If the node is operating in local only mode then Local_only_mode is raised.
val run_node : ?process:(unit -> unit t) -> node_config -> unit io
run_node ?process node_config performs the necessary bootstrapping to start this node according to node_config. If provided, runs the initial process, returning the resulting io.
If it is called more than once then an Init_more_than_once exception is raised.
module M :
Communication.Distributed_wrapper.Enriched_message_type
with type 'a step = 'a Msg.step
with type 'a request = 'a Msg.request
with type 'a reply = 'a Msg.reply
Additional monadic interface
val dmap :
pids:Distributed.Process_id.t list ->
request:('a -> index:int -> 'step M.request) ->
reply:('step M.reply -> (unit -> 'b t) option) ->
'a list ->
'b list t
dmap ~pids ~request ~reply l sends requests built by applying request to the elements of l to the workers pids and waits to receive a valid reply from each worker.
val handle_request :
Distributed.Process_id.t ->
step:'step M.step ->
handler:('step M.request -> (unit -> ('step M.reply * 'b) t) option) ->
'b t
handle_request master_pid ~step ~handler waits to receive a request for a given step, processes it through handler and sends the reply to master_pid. The handler might also return some additional data ('b) that isn't meant to be sent back to the master, but rather kept by the worker for future computation.