include module type of struct include Async.Reader end
module Read_result = Async.Reader.Read_result
module Id = Async.Reader.Id
type t = Async_unix.Reader.t
include sig ... end
val sexp_of_t : t -> Ppx_sexp_conv_lib.Sexp.t
include Async_kernel.Invariant.S with type t := t
val invariant : t -> unit
val io_stats : Async_unix.Io_stats.t
Overall IO statistics for all readers.
val last_read_time : t -> Core.Time.t
last_read_time t returns the time of the most recent read system call that returned data.
val stdin : t Core.Lazy.t
stdin is a reader for file descriptor 0. It is lazy because we don't want to create it in all programs that happen to link with Async.
val open_file : ?buf_len:int -> string -> t Async_kernel.Deferred.t
open_file file opens
file for reading and returns a reader reading from it.
val transfer : t -> string Async_kernel.Pipe.Writer.t -> unit Async_kernel.Deferred.t
transfer t pipe_w transfers data from t into pipe_w one chunk at a time (whatever is read from the underlying file descriptor without post-processing). The result becomes determined after reaching EOF on t and the final bytes have been transferred, or if pipe_w is closed.
This function will normally not be needed (see pipe).
val pipe : t -> string Async_kernel.Pipe.Reader.t
pipe t returns the reader end of a pipe that will continually be filled with chunks of data from the underlying
Reader.t. When the reader reaches EOF or the pipe is closed,
pipe closes the reader, and then after the reader close is finished, closes the pipe.
val of_pipe : Core.Info.t -> string Async_kernel.Pipe.Reader.t -> t Async_kernel.Deferred.t
of_pipe info pipe_r returns a reader t that receives all the data from pipe_r. If pipe_r is closed, t will see an EOF (but will not be automatically closed). If t is closed, then pipe_r will stop being drained.
of_pipe is implemented by shuttling bytes from
pipe_r to the write-end of a Unix pipe, with
t being attached to the read end of the Unix pipe.
val create : ?buf_len:int -> Async_unix.Fd.t -> t
create ~buf_len fd creates a new reader that is reading from fd.
val of_in_channel : Core.In_channel.t -> Async_unix.Fd.Kind.t -> t
val with_file : ?buf_len:int -> ?exclusive:bool -> string -> f:(t -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
with_file file f opens file, creates a reader with it, and passes the reader to f. It closes the reader when the result of f becomes determined, and returns f's result.
Note: You need to be careful that all your IO is done when the deferred you return becomes determined. If for example you use
with_file and call
lines, make sure you return a deferred that becomes determined when the EOF is reached on the pipe, not when you get the pipe (because you get it straight away).
val close : t -> unit Async_kernel.Deferred.t
close t prevents further use of t and closes t's underlying file descriptor. The result of close becomes determined once the underlying file descriptor has been closed. It is an error to call other operations on t after close t has been called, except that calls of close subsequent to the original call to close will return the same deferred as the original call.
close_finished t becomes determined after t's underlying file descriptor has been closed, i.e., it is the same as the result of close. close_finished differs from close in that it does not have the side effect of initiating a close.
is_closed t returns true iff close t has been called.
with_close t ~f runs f (), and closes t after f finishes or raises.
val close_finished : t -> unit Async_kernel.Deferred.t
val is_closed : t -> bool
val with_close : t -> f:(unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
id returns a name for this reader that is unique across all instances of the reader module.
val fd : t -> Async_unix.Fd.t
fd returns the
Fd.t used to create this reader.
val read : t -> ?pos:int -> ?len:int -> Core.Bytes.t -> int Read_result.t Async_kernel.Deferred.t
read t ?pos ?len buf reads up to len bytes into buf, blocking until some data is available or EOF is reached. The resulting i satisfies 0 < i <= len.
val peek : t -> len:int -> string Read_result.t Async_kernel.Deferred.t
peek t ~len peeks exactly
len bytes from
t's buffer. It blocks until
len bytes are available or EOF is reached.
val drain : t -> unit Async_kernel.Deferred.t
drain t reads and ignores all data from t until it hits EOF, and then closes t.
type 'a read_one_chunk_at_a_time_result = [
`Stopped of 'a(*
`Eof_with_unconsumed_data s means that handle_chunk returned `Consumed (c, _) and left data in the reader's buffer (i.e., c < len), and that the reader reached EOF without reading any more data into the buffer; hence the data in the buffer was never consumed (and never will be, since the reader is at EOF). *)
`Eof_with_unconsumed_data of string
include sig ... end
val sexp_of_read_one_chunk_at_a_time_result : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a read_one_chunk_at_a_time_result -> Ppx_sexp_conv_lib.Sexp.t
type 'a handle_chunk_result = [
`Stop of 'a(*
`Stop a means that handle_chunk consumed all len bytes, and that read_one_chunk_at_a_time should stop reading and return `Stopped a. *)
`Stop_consumed of 'a * int(*
`Stop_consumed (a, n) means that handle_chunk consumed n bytes, and that read_one_chunk_at_a_time should stop reading and return `Stopped a. *)
(* `Continue means that handle_chunk has consumed all len bytes. *)
`Consumed of int * [ `Need of int | `Need_unknown ](*
`Consumed (c, need) means that c bytes were consumed and need says how many bytes are needed (including the data remaining in the buffer after the c were already consumed). It is an error if c < 0 || c > len. For `Need n, it is an error if n < 0 || c + n <= len. *)
include sig ... end
val sexp_of_handle_chunk_result : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a handle_chunk_result -> Ppx_sexp_conv_lib.Sexp.t
val read_one_chunk_at_a_time : t -> handle_chunk: (Core.Bigstring.t -> pos:int -> len:int -> 'a handle_chunk_result Async_kernel.Deferred.t) -> 'a read_one_chunk_at_a_time_result Async_kernel.Deferred.t
read_one_chunk_at_a_time t ~handle_chunk reads into t's internal buffer, and whenever bytes are available, applies handle_chunk to them. It waits to read again until the deferred returned by handle_chunk becomes determined. If handle_chunk returns `Consumed, then read_one_chunk_at_a_time will wait for additional data to arrive before calling handle_chunk again. Thus, handle_chunk should consume as much as possible.
read_one_chunk_at_a_time continues reading until it reaches `Eof or handle_chunk returns `Stop or `Stop_consumed. In the case of `Stop and `Stop_consumed, one may read from t after read_one_chunk_at_a_time returns.
`Stop a or
`Continue respects the usual
Iobuf semantics where data up to the
Iobuf.Lo_bound is considered consumed.
include sig ... end
val sexp_of_handle_iobuf_result : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a handle_iobuf_result -> Ppx_sexp_conv_lib.Sexp.t
val read_one_iobuf_at_a_time : t -> handle_chunk: ((Core.read_write, Core.Iobuf.seek) Core.Iobuf.t -> 'a handle_iobuf_result Async_kernel.Deferred.t) -> 'a read_one_chunk_at_a_time_result Async_kernel.Deferred.t
read_one_iobuf_at_a_time is like
read_one_chunk_at_a_time, except that the user-supplied
handle_chunk function receives its data in an
Iobuf.t, and uses the
Iobuf position to communicate how much data was consumed.
read_one_iobuf_at_a_time is implemented as a wrapper around read_one_chunk_at_a_time.
val read_substring : t -> Core.Substring.t -> int Read_result.t Async_kernel.Deferred.t
read_substring t ss reads up to Substring.length ss bytes into ss, blocking until some data is available or EOF is reached. The resulting i satisfies 0 < i <= Substring.length ss.
val read_bigsubstring : t -> Core.Bigsubstring.t -> int Read_result.t Async_kernel.Deferred.t
val read_char : t -> char Read_result.t Async_kernel.Deferred.t
val really_read : t -> ?pos:int -> ?len:int -> Core.Bytes.t -> [ `Ok | `Eof of int ] Async_kernel.Deferred.t
really_read t buf ?pos ?len reads until it fills
len bytes of
buf starting at
pos, or runs out of input. In the former case it returns
`Ok. In the latter, it returns
`Eof n where
n is the number of bytes that were read before end of input, and 0 <= n < len.
val really_read_substring : t -> Core.Substring.t -> [ `Ok | `Eof of int ] Async_kernel.Deferred.t
val really_read_bigsubstring : t -> Core.Bigsubstring.t -> [ `Ok | `Eof of int ] Async_kernel.Deferred.t
val read_until : t -> [ `Pred of char -> bool | `Char of char ] -> keep_delim:bool -> [ `Ok of string | `Eof_without_delim of string | `Eof ] Async_kernel.Deferred.t
read_until t pred ~keep_delim reads until it hits a delimiter c such that:
- in case pred = `Char c' then c = c'
- in case pred = `Pred p then p c = true
`Char c' is equivalent to
`Pred (fun c -> c = c') but the underlying implementation is more efficient, in particular it will not call a function on every input character.
read_until returns a freshly-allocated string consisting of all the characters read and optionally including the delimiter as per keep_delim.
val read_until_max : t -> [ `Pred of char -> bool | `Char of char ] -> keep_delim:bool -> max:int -> [ `Ok of string | `Eof_without_delim of string | `Eof | `Max_exceeded of string ] Async_kernel.Deferred.t
read_until_max is just like
read_until, except you have the option of specifying a maximum number of chars to read.
val read_line : t -> string Read_result.t Async_kernel.Deferred.t
read_line t reads up to and including the next newline (
\n) character (or
\r\n) and returns a freshly-allocated string containing everything up to but not including the newline character. If
read_line encounters EOF before the newline char then everything read up to but not including EOF will be returned as a line.
val really_read_line : wait_time:Core.Time.Span.t -> t -> string option Async_kernel.Deferred.t
really_read_line ~wait_time t reads up to and including the next newline (
\n) character and returns an optional, freshly-allocated string containing everything up to but not including the newline character. If
really_read_line encounters EOF before the newline char, then a time span of
wait_time will be used before the input operation is retried. If the descriptor is closed,
None will be returned.
type 'a read = ?parse_pos:Core.Sexp.Parse_pos.t -> 'a
val read_sexp : (t -> Core.Sexp.t Read_result.t Async_kernel.Deferred.t) read
read_sexp t reads the next sexp.
val read_sexps : (t -> Core.Sexp.t Async_kernel.Pipe.Reader.t) read
read_sexps t reads all the sexps and returns them as a pipe. When the reader reaches EOF or the pipe is closed,
read_sexps closes the reader, and then after the reader close is finished, closes the pipe.
val read_bin_prot : ?max_len:int -> t -> 'a Core.Bin_prot.Type_class.reader -> 'a Read_result.t Async_kernel.Deferred.t
read_bin_prot ?max_len t bp_reader reads the next binary protocol message using binary protocol reader bp_reader. The format is the "size-prefixed binary protocol", in which the length of the data is prefixed as a 64-bit integer to the data. This is the format that Writer.write_bin_prot writes.
For higher performance, consider Unpack_sequence.unpack_bin_prot_from_reader.
val peek_bin_prot : ?max_len:int -> t -> 'a Core.Bin_prot.Type_class.reader -> 'a Read_result.t Async_kernel.Deferred.t
Similar to read_bin_prot, but doesn't consume any bytes from t.
val read_marshal_raw : t -> Core.Bytes.t Read_result.t Async_kernel.Deferred.t
read_marshal_raw reads and returns a buffer containing one marshaled value, but doesn't unmarshal it. You can just call
Marshal.from_string on the string, and cast it to the desired type (preferably the actual type). Similar to
Marshal.from_channel, but suffers from the String-length limitation (16MB) on 32-bit platforms.
val read_marshal : t -> _ Read_result.t Async_kernel.Deferred.t
read_marshal is like
read_marshal_raw, but unmarshals the value after reading it.
val recv : t -> Core.Bytes.t Read_result.t Async_kernel.Deferred.t
recv returns a string that was written with Writer.send.
val read_all : t -> (t -> 'a Read_result.t Async_kernel.Deferred.t) -> 'a Async_kernel.Pipe.Reader.t
read_all t read_one returns a pipe that receives all values read from
t by repeatedly using
read_one t. When the reader reaches EOF, it closes the reader, and then after the reader close is finished, closes the pipe.
val lseek : t -> int64 -> mode:[< `Set | `End ] -> int64 Async_kernel.Deferred.t
lseek t offset ~mode clears t's buffer and calls Unix.lseek on t's file descriptor. The `Cur mode is not exposed because seeking relative to the current position of the file descriptor is not the same as seeking relative to the current position of the reader.
val ltell : t -> int64 Async_kernel.Deferred.t
ltell t returns the file position of t from the perspective of a consumer of t. It uses Unix.lseek to find the file position of t's underlying file descriptor, and then subtracts the number of bytes in t's buffer that have been read from the OS but not from t.
val lines : t -> string Async_kernel.Pipe.Reader.t
lines t reads all the lines from
t and puts them in the pipe, one line per pipe element. The lines do not contain the trailing newline. When the reader reaches EOF or the pipe is closed,
lines closes the reader, and then after the reader close is finished, closes the pipe.
val contents : t -> string Async_kernel.Deferred.t
contents t returns the string corresponding to the full contents (up to EOF) of the reader. contents closes t before returning the string.
val file_contents : string -> string Async_kernel.Deferred.t
file_contents file returns the string with the full contents of the file.
val file_lines : string -> string list Async_kernel.Deferred.t
file_lines file returns a list of the lines in the file. The lines do not contain the trailing newline.
type ('sexp, 'a, 'b) load = ?exclusive:bool -> ?expand_macros:bool -> string -> ('sexp -> 'a) -> 'b Async_kernel.Deferred.t
load_sexp file conv loads a sexp from file and converts it to a value using conv. This function provides an accurate error location if conv raises Of_sexp_error.
load_sexps is similar, but converts a sequence of sexps.
Using ~expand_macros:true expands macros as defined in Sexplib.Macro. If ~expand_macros:true then the exclusive flag is ignored. Also, load_annotated* don't support ~expand_macros:true, and will raise.
val load_sexp : (Core.Sexp.t, 'a, 'a Core.Or_error.t) load
val load_sexp_exn : (Core.Sexp.t, 'a, 'a) load
val load_sexps : (Core.Sexp.t, 'a, 'a list Core.Or_error.t) load
val load_sexps_exn : (Core.Sexp.t, 'a, 'a list) load
val load_annotated_sexp : (Core.Sexp.Annotated.t, 'a, 'a Core.Or_error.t) load
val load_annotated_sexp_exn : (Core.Sexp.Annotated.t, 'a, 'a) load
val load_annotated_sexps : (Core.Sexp.Annotated.t, 'a, 'a list Core.Or_error.t) load
val load_annotated_sexps_exn : (Core.Sexp.Annotated.t, 'a, 'a list) load
type ('a, 'b) load_bin_prot = ?exclusive:bool -> ?max_len:int -> string -> 'a Core.Bin_prot.Type_class.reader -> 'b Async_kernel.Deferred.t
val load_bin_prot : ('a, 'a Core.Or_error.t) load_bin_prot
val load_bin_prot_exn : ('a, 'a) load_bin_prot
module Macro_loader = Async.Reader.Macro_loader
include module type of struct include Reader_ext end
val input_sexps : Async.Reader.t -> Async.Sexp.t list Async.Deferred.t
val open_gzip_file : string -> Async.Reader.t Async.Deferred.t
val with_input_from_process : prog:string -> args:string list -> f:(Async.Reader.t -> 'a Async.Deferred.t) -> 'a Async.Deferred.t
Start a process and read its stdout as input from a Reader.t.
If the process writes anything to stderr it will be thrown as an exception after reading is finished.
val with_gzip_file : string -> f:(Async.Reader.t -> 'a Async.Deferred.t) -> 'a Async.Deferred.t
val with_hadoop_gzip_file : hadoop_file:string -> (Async.Reader.t -> 'a Async.Deferred.t) -> 'a Async.Deferred.t
val with_xzip_file : string -> f:(Async.Reader.t -> 'a Async.Deferred.t) -> 'a Async.Deferred.t