Reader
is Async's main API for buffered input from a file descriptor. It is the analog of Stdio.In_channel
.
Each reader has an internal buffer, which is filled via read()
system calls when data is needed to satisfy a Reader.read*
call.
Each of the read functions returns a deferred that will become determined when the read completes. It is an error to have two simultaneous reads. That is, if you call a read function, you should not call another read function until the first one completes.
If the file descriptor underlying a reader is closed, the reader will return EOF (after all the buffered bytes have been read).
Any Reader.read*
call could, rather than determine its result, send an exception to the monitor in effect when read
was called. Such exceptions can be handled in the usual way by using try_with
, e.g.:
try_with (fun () -> Reader.read reader ...)
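As a slightly fuller sketch of the pattern above, the following wraps a single read in try_with and folds the three possible outcomes (a line, EOF, an exception) into one value. The helper name read_line_or_error is hypothetical and not part of Reader; the sketch assumes the Core and Async libraries are linked.

```ocaml
open Core
open Async

(* Hypothetical helper (not part of Reader): read one line, turning any
   exception raised during the read into an [Error]. *)
let read_line_or_error (reader : Reader.t) : string option Or_error.t Deferred.t =
  try_with (fun () -> Reader.read_line reader)
  >>| function
  | Ok (`Ok line) -> Ok (Some line)
  | Ok `Eof -> Ok None
  | Error exn -> Error (Error.of_exn exn)
```

Because the result is an ordinary deferred, the caller can wait for it before issuing the next read, which keeps to the rule that two reads must not be in flight at once.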
module Read_result : sig ... end
module Id : Core.Unique_id
val sexp_of_t : t -> Sexplib0.Sexp.t
include Async_kernel.Invariant.S with type t := t
val invariant : t -> unit
val io_stats : Io_stats.t
Overall IO statistics for all readers.
val last_read_time : t -> Time_unix.t
Returns the time of the most recent read
system call that returned data.
val stdin : t Core.Lazy.t
stdin
is a reader for file descriptor 0. It is lazy because we don't want to create it in all programs that happen to link with Async.
val open_file : ?buf_len:int -> string -> t Async_kernel.Deferred.t
open_file file
opens file
for reading and returns a reader reading from it.
This may raise an exception for the typical reasons that an open(2)
system call may fail. If it does raise, it's guaranteed to be a Unix_error
variant.
val transfer :
t ->
string Async_kernel.Pipe.Writer.t ->
unit Async_kernel.Deferred.t
transfer t pipe_w
transfers data from t
into pipe_w
one chunk at a time (whatever is read from the underlying file descriptor without post-processing). The result becomes determined after reaching EOF on t
and the final bytes have been transferred, or if pipe_w
is closed.
This function will normally not be needed (see pipe
).
val pipe : t -> string Async_kernel.Pipe.Reader.t
pipe t
returns the reader end of a pipe that will continually be filled with chunks of data from the underlying Reader.t
. When the reader reaches EOF or the pipe is closed, pipe
closes the reader, and then after the reader close is finished, closes the pipe.
val of_pipe :
Core.Info.t ->
string Async_kernel.Pipe.Reader.t ->
t Async_kernel.Deferred.t
of_pipe info pipe_r
returns a reader t
that receives all the data from pipe_r
. If pipe_r
is closed, t
will see an EOF (but will not be automatically closed). If t
is closed, then pipe_r
will stop being drained.
of_pipe
is implemented by shuttling bytes from pipe_r
to the write-end of a Unix pipe, with t
being attached to the read end of the Unix pipe.
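One place where of_pipe can be handy is feeding in-memory data to code that expects a Reader.t, for example in tests. The sketch below is illustrative only: contents_of_chunks is a hypothetical name, and the Info.t label is arbitrary.

```ocaml
open Core
open Async

(* Hypothetical helper: build a reader over a list of in-memory chunks and
   return everything it yields. [Pipe.of_list] produces a pipe that is closed
   once the list is exhausted, so the reader sees EOF after the last chunk. *)
let contents_of_chunks (chunks : string list) : string Deferred.t =
  Reader.of_pipe (Info.of_string "in-memory chunks") (Pipe.of_list chunks)
  >>= fun reader ->
  (* [Reader.contents] closes the reader before returning the string. *)
  Reader.contents reader
```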
create ~buf_len fd
creates a new reader that is reading from fd
.
val of_in_channel : Core.In_channel.t -> Fd.Kind.t -> t
val with_file :
?buf_len:int ->
?exclusive:bool ->
string ->
f:(t -> 'a Async_kernel.Deferred.t) ->
'a Async_kernel.Deferred.t
with_file file f
opens file
, creates a reader with it, and passes the reader to f
. It closes the reader when the result of f
becomes determined, and returns f
's result.
This may raise an exception for the typical reasons that an open(2)
system call may fail. If it does raise before f
is called, it's guaranteed to be a Unix_error
variant.
Note: You need to be careful that all your IO is done when the deferred you return becomes determined. If for example you use with_file
and call lines
, make sure you return a deferred that becomes determined when the EOF is reached on the pipe, not when you get the pipe (because you get it straight away).
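The note above can be sketched as follows. The function count_lines is a hypothetical example; the point is that the deferred returned by f is the result of consuming the whole pipe, not the pipe itself.

```ocaml
open Core
open Async

(* Hypothetical example: count the lines of [file]. The deferred returned by
   [f] is the result of folding over the entire pipe, so it becomes determined
   only after EOF has been reached on the pipe. *)
let count_lines (file : string) : int Deferred.t =
  Reader.with_file file ~f:(fun reader ->
    Pipe.fold_without_pushback
      (Reader.lines reader)
      ~init:0
      ~f:(fun count _line -> count + 1))
```

Returning `return (Reader.lines reader)` from f instead would hand back a pipe whose reader is about to be closed by with_file.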
exclusive = true
uses a filesystem lock to try to make sure that the file is not read while it's being modified. This is an advisory lock, which means that the writer must cooperate by taking a relevant lock when writing (see Writer.with_file
). This is unrelated to, and should not be confused with, the O_EXCL
flag of the open
system call. Note that the implementation uses Unix.lockf
, which has known pitfalls. It's recommended that you avoid the exclusive
flag in favor of using a library dedicated to dealing with file locks where the pitfalls can be documented in detail.
val close : t -> unit Async_kernel.Deferred.t
close t
prevents further use of t
and closes t
's underlying file descriptor. The result of close
becomes determined once the underlying file descriptor has been closed. It is an error to call other operations on t
after close t
has been called, except that calls of close
subsequent to the original call to close
will return the same deferred as the original call.
close_finished t
becomes determined after t
's underlying file descriptor has been closed, i.e., it is the same as the result of close
. close_finished
differs from close
in that it does not have the side effect of initiating a close.
is_closed t
returns true
iff close t
has been called.
with_close t ~f
runs f ()
, and closes t
after f
finishes or raises.
val close_finished : t -> unit Async_kernel.Deferred.t
val is_closed : t -> bool
val with_close :
t ->
f:(unit -> 'a Async_kernel.Deferred.t) ->
'a Async_kernel.Deferred.t
id
returns a name for this reader that is unique across all instances of the reader module.
val read :
t ->
?pos:int ->
?len:int ->
Core.Bytes.t ->
int Read_result.t Async_kernel.Deferred.t
read t ?pos ?len buf
reads up to len
bytes into buf
, blocking until some data is available or EOF is reached. The resulting i
satisfies 0 < i <= len
.
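A minimal sketch of a read loop built on read, assuming a 4096-byte scratch buffer (an arbitrary choice). The name count_bytes is hypothetical. Each call to read is issued only after the previous one has completed, as required above.

```ocaml
open Core
open Async

(* Hypothetical example: count how many bytes [reader] yields before EOF. *)
let count_bytes (reader : Reader.t) : int Deferred.t =
  let buf = Bytes.create 4096 in
  let rec loop total =
    Reader.read reader buf
    >>= function
    | `Eof -> return total
    | `Ok n ->
      (* [n] bytes are now in [buf] at positions 0 .. n-1. *)
      loop (total + n)
  in
  loop 0
```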
val peek : t -> len:int -> string Read_result.t Async_kernel.Deferred.t
peek t ~len
peeks exactly len
bytes from t
's buffer. It blocks until len
bytes are available or EOF is reached.
val bytes_available : t -> int
Reports how many bytes of data are currently in the reader's buffer.
val read_available : t -> ?pos:int -> ?len:int -> Core.Bytes.t -> int
Consumes data from the reader's buffer without performing any additional I/O.
val peek_available : t -> len:int -> string
Reads up to len
bytes from the reader's buffer without consuming it and without performing any additional I/O.
val drain : t -> unit Async_kernel.Deferred.t
drain t
reads and ignores all data from t
until it hits EOF, and then closes t
.
type 'a read_one_chunk_at_a_time_result = [
| `Eof
| `Stopped of 'a
| `Eof_with_unconsumed_data of string
(* `Eof_with_unconsumed_data s means that handle_chunk returned `Consumed (c, _) and left data in the reader's buffer (i.e., c < len), and that the reader reached EOF without reading any more data into the buffer; hence the data in the buffer was never consumed (and never will be, since the reader is at EOF). *)
]
val sexp_of_read_one_chunk_at_a_time_result :
('a -> Sexplib0.Sexp.t) ->
'a read_one_chunk_at_a_time_result ->
Sexplib0.Sexp.t
type 'a handle_chunk_result = [
| `Stop of 'a
(* `Stop a means that handle_chunk consumed all len bytes, and that read_one_chunk_at_a_time should stop reading and return `Stopped a. *)
| `Stop_consumed of 'a * int
(* `Stop_consumed (a, n) means that handle_chunk consumed n bytes, and that read_one_chunk_at_a_time should stop reading and return `Stopped a. *)
| `Continue
(* `Continue means that handle_chunk has consumed all len bytes. *)
| `Consumed of int * [ `Need of int | `Need_unknown ]
(* `Consumed (c, need) means that c bytes were consumed and need says how many bytes are needed (including the data remaining in the buffer after the c were already consumed). It is an error if c < 0 || c > len. For `Need n, it is an error if n < 0 || c + n <= len. *)
]
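The error conditions stated for `Consumed can be written out as a small predicate. This is only an illustration of the rule as documented, not code from the library; consumed_is_valid is a hypothetical name and the sketch uses nothing beyond the OCaml standard library.

```ocaml
(* Hypothetical predicate restating the documented rule for
   `Consumed (c, need), given a chunk of [len] bytes. *)
let consumed_is_valid ~len ~c ~need =
  if c < 0 || c > len
  then false
  else (
    match need with
    | `Need_unknown -> true
    (* A `Need n must ask for more than what remains after consuming [c]. *)
    | `Need n -> not (n < 0 || c + n <= len))
```

For instance, with len = 10 and c = 4, six bytes remain in the buffer, so `Need 6 is rejected (4 + 6 <= 10) while `Need 7 is accepted.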
val sexp_of_handle_chunk_result :
('a -> Sexplib0.Sexp.t) ->
'a handle_chunk_result ->
Sexplib0.Sexp.t
val read_one_chunk_at_a_time :
t ->
handle_chunk:
(Core.Bigstring.t ->
pos:int ->
len:int ->
'a handle_chunk_result Async_kernel.Deferred.t) ->
'a read_one_chunk_at_a_time_result Async_kernel.Deferred.t
read_one_chunk_at_a_time t ~handle_chunk
reads into t
's internal buffer, and whenever bytes are available, applies handle_chunk
to them. It waits to read again until the deferred returned by handle_chunk
becomes determined. If handle_chunk
returns `Consumed
, then read_one_chunk_at_a_time
will wait for additional data to arrive before calling handle_chunk
again. Thus, handle_chunk
should consume as much as possible.
read_one_chunk_at_a_time
continues reading until it reaches `Eof
or handle_chunk
returns `Stop
or `Stop_consumed
. In the case of `Stop
and `Stop_consumed
, one may read from t
after read_one_chunk_at_a_time
returns.
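A minimal sketch of a handle_chunk callback, assuming the simplest policy of consuming every chunk in full. The name count_bytes_chunked is hypothetical. Because the callback always returns `Continue, the call finishes only when the reader reaches EOF.

```ocaml
open Core
open Async

(* Hypothetical example: total up the bytes seen by [handle_chunk]. The
   callback looks only at [len]; a real handler would read
   [buf] between [pos] and [pos + len - 1]. *)
let count_bytes_chunked (reader : Reader.t) : int Deferred.t =
  let total = ref 0 in
  Reader.read_one_chunk_at_a_time reader ~handle_chunk:(fun _buf ~pos:_ ~len ->
    total := !total + len;
    (* `Continue says that all [len] bytes were consumed. *)
    return `Continue)
  >>| fun (_ : unit Reader.read_one_chunk_at_a_time_result) -> !total
```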
`Stop a
or `Continue
respects the usual Iobuf
semantics where data up to the Iobuf.Lo_bound
is considered consumed.
val sexp_of_handle_iobuf_result :
('a -> Sexplib0.Sexp.t) ->
'a handle_iobuf_result ->
Sexplib0.Sexp.t
val read_one_iobuf_at_a_time :
t ->
handle_chunk:
((Core.read_write, Iobuf.seek) Iobuf.t ->
'a handle_iobuf_result Async_kernel.Deferred.t) ->
'a read_one_chunk_at_a_time_result Async_kernel.Deferred.t
read_one_iobuf_at_a_time
is like read_one_chunk_at_a_time
, except that the user-supplied handle_chunk
function receives its data in an Iobuf.t
, and uses the Iobuf
position to communicate how much data was consumed. read_one_iobuf_at_a_time
is implemented as a wrapper around read_one_chunk_at_a_time
.
val read_substring :
t ->
Core.Substring.t ->
int Read_result.t Async_kernel.Deferred.t
read_substring t ss
reads up to Substring.length ss
bytes into ss
, blocking until some data is available or EOF is reached. The resulting i
satisfies 0 < i <=
Substring.length ss
.
val read_bigsubstring :
t ->
Core.Bigsubstring.t ->
int Read_result.t Async_kernel.Deferred.t
val read_char : t -> char Read_result.t Async_kernel.Deferred.t
val really_read :
t ->
?pos:int ->
?len:int ->
Core.Bytes.t ->
[ `Ok | `Eof of int ] Async_kernel.Deferred.t
really_read t buf ?pos ?len
reads until it fills len
bytes of buf
starting at pos
, or runs out of input. In the former case it returns `Ok
. In the latter, it returns `Eof n
where n
is the number of bytes that were read before end of input, and 0 <= n < len
.
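A sketch of using really_read to pull a fixed-size block, for example a header of known length. The helper read_exactly and its error message are hypothetical.

```ocaml
open Core
open Async

(* Hypothetical helper: read exactly [len] bytes, or report how many bytes
   arrived before end of input. *)
let read_exactly (reader : Reader.t) ~(len : int) : (string, string) Result.t Deferred.t =
  let buf = Bytes.create len in
  Reader.really_read reader buf
  >>| function
  | `Ok -> Ok (Bytes.to_string buf)
  | `Eof n -> Error (sprintf "expected %d bytes, got only %d" len n)
```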
val really_read_substring :
t ->
Core.Substring.t ->
[ `Ok | `Eof of int ] Async_kernel.Deferred.t
val really_read_bigsubstring :
t ->
Core.Bigsubstring.t ->
[ `Ok | `Eof of int ] Async_kernel.Deferred.t
val read_until :
t ->
[ `Pred of char -> bool | `Char of char ] ->
keep_delim:bool ->
[ `Ok of string | `Eof_without_delim of string | `Eof ]
Async_kernel.Deferred.t
read_until t pred ~keep_delim
reads until it hits a delimiter c
such that:
- if
pred = `Char c'
thenc = c'
- if
pred = `Pred p
thenp c
`Char c'
is equivalent to `Pred (fun c -> c = c')
but the underlying implementation is more efficient, in particular it will not call a function on every input character.
read_until
returns a freshly-allocated string consisting of all the characters read and optionally including the delimiter as per keep_delim
.
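As an illustration of the three possible results, the sketch below reads one comma-terminated field. The helper read_field is hypothetical, and treating trailing data without a delimiter as a final field is a design choice of this example, not something read_until prescribes.

```ocaml
open Core
open Async

(* Hypothetical helper: read the next comma-separated field, if any. *)
let read_field (reader : Reader.t) : string option Deferred.t =
  Reader.read_until reader (`Char ',') ~keep_delim:false
  >>| function
  | `Ok field -> Some field
  (* Input ended before a delimiter was seen; keep what was read. *)
  | `Eof_without_delim rest -> Some rest
  | `Eof -> None
```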
val read_until_bounded :
t ->
[ `Pred of char -> bool | `Char of char ] ->
keep_delim:bool ->
max:int ->
[ `Ok of string
| `Eof_without_delim of string
| `Eof
| `Max_exceeded of string ]
Async_kernel.Deferred.t
read_until_bounded
is just like read_until
, except you have the option of specifying a maximum number of chars to read (not including the separator).
When `Max_exceeded str
is returned, the length of str
will be equal to max + 1
.
val read_line : t -> string Read_result.t Async_kernel.Deferred.t
read_line t
reads up to and including the next newline (\n
) character (or \r\n
) and returns a freshly-allocated string containing everything up to but not including the newline character. If read_line
encounters EOF before the newline char then everything read up to but not including EOF will be returned as a line.
val really_read_line :
wait_time:Time_unix.Span.t ->
t ->
string option Async_kernel.Deferred.t
really_read_line ~wait_time t
reads up to and including the next newline (\n
) character and returns an optional, freshly-allocated string containing everything up to but not including the newline character. If really_read_line
encounters EOF before the newline char, then a time span of wait_time
will be used before the input operation is retried. If the descriptor is closed, None
will be returned.
type 'a read = ?parse_pos:Core.Sexp.Parse_pos.t -> 'a
val read_sexp : (t -> Core.Sexp.t Read_result.t Async_kernel.Deferred.t) read
read_sexp t
reads the next sexp.
val read_sexps : (t -> Core.Sexp.t Async_kernel.Pipe.Reader.t) read
read_sexps t
reads all the sexps and returns them as a pipe. When the reader reaches EOF or the pipe is closed, read_sexps
closes the reader, and then after the reader close is finished, closes the pipe.
val read_annotated_sexps :
(t -> Core.Sexp.Annotated.t Async_kernel.Pipe.Reader.t) read
val read_bin_prot :
?max_len:int ->
t ->
'a Core.Bin_prot.Type_class.reader ->
'a Read_result.t Async_kernel.Deferred.t
read_bin_prot ?max_len t bp_reader
reads the next binary protocol message using binary protocol reader bp_reader
. The format is the "size-prefixed binary protocol", in which the length of the data is prefixed as a 64-bit integer to the data. This is the format that Writer.write_bin_prot
writes.
For higher performance, consider Unpack_sequence.unpack_bin_prot_from_reader
.
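A round-trip sketch pairing Writer.write_bin_prot with read_bin_prot, assuming a scratch file path supplied by the caller. The function name round_trip is hypothetical; String.bin_writer_t and String.bin_reader_t are the bin_prot type-class values that Core provides for strings.

```ocaml
open Core
open Async

(* Hypothetical example: write one size-prefixed message to [file], then read
   it back with the matching bin_prot reader. *)
let round_trip (file : string) (msg : string) : string Reader.Read_result.t Deferred.t =
  Writer.with_file file ~f:(fun writer ->
    Writer.write_bin_prot writer String.bin_writer_t msg;
    return ())
  >>= fun () ->
  Reader.with_file file ~f:(fun reader ->
    Reader.read_bin_prot reader String.bin_reader_t)
```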
val peek_bin_prot :
?max_len:int ->
t ->
'a Core.Bin_prot.Type_class.reader ->
'a Read_result.t Async_kernel.Deferred.t
Similar to read_bin_prot
, but doesn't consume any bytes from t
.
val read_marshal_raw : t -> Core.Bytes.t Read_result.t Async_kernel.Deferred.t
read_marshal_raw
reads and returns a buffer containing one marshaled value, but doesn't unmarshal it. You can just call Marshal.from_string
on the string, and cast it to the desired type (preferably the actual type). Similar to Marshal.from_channel
, but suffers from the String-length limitation (16MB) on 32-bit platforms.
val read_marshal : t -> _ Read_result.t Async_kernel.Deferred.t
read_marshal
is like read_marshal_raw
, but unmarshals the value after reading it.
val recv : t -> Core.Bytes.t Read_result.t Async_kernel.Deferred.t
recv
returns a string that was written with Writer.send
.
val read_all :
t ->
(t -> 'a Read_result.t Async_kernel.Deferred.t) ->
'a Async_kernel.Pipe.Reader.t
read_all t read_one
returns a pipe that receives all values read from t
by repeatedly using read_one t
. When the reader reaches EOF, it closes the reader, and then after the reader close is finished, closes the pipe.
val lseek :
t ->
int64 ->
mode:[< `Set | `End ] ->
int64 Async_kernel.Deferred.t
lseek t offset ~mode
clears t
's buffer and calls Unix.lseek
on t
's file descriptor. The `Cur
mode is not exposed because seeking relative to the current position of the file descriptor is not the same as seeking relative to the current position of the reader.
val ltell : t -> int64 Async_kernel.Deferred.t
ltell t
returns the file position of t
from the perspective of a consumer of t
. It uses Unix.lseek
to find the file position of t
's underlying file descriptor, and then subtracts the number of bytes in t
's buffer that have been read from the OS but not from t
.
val lines : t -> string Async_kernel.Pipe.Reader.t
lines t
reads all the lines from t
and puts them in the pipe, one line per pipe element. The lines do not contain the trailing newline. When the reader reaches EOF or the pipe is closed, lines
closes the reader, and then after the reader close is finished, closes the pipe.
val contents : t -> string Async_kernel.Deferred.t
contents t
returns the string corresponding to the full contents (up to EOF) of the reader. contents
closes t
before returning the string.
val file_contents : string -> string Async_kernel.Deferred.t
file_contents file
returns the string with the full contents of the file.
val file_lines : string -> string list Async_kernel.Deferred.t
file_lines file
returns a list of the lines in the file. The lines do not contain the trailing newline.
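A short usage sketch for file_lines; the helper name non_empty_lines and the filtering step are illustrative only.

```ocaml
open Core
open Async

(* Hypothetical example: load a file and keep only its non-empty lines. *)
let non_empty_lines (file : string) : string list Deferred.t =
  Reader.file_lines file
  >>| List.filter ~f:(fun line -> not (String.is_empty line))
```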
load_sexp file conv
loads a sexp from file
and converts it to a value using conv
. This function provides an accurate error location if conv
raises Of_sexp_error
.
load_sexps
is similar, but converts a sequence of sexps.
type ('sexp, 'a, 'b) load =
?exclusive:bool ->
string ->
('sexp -> 'a) ->
'b Async_kernel.Deferred.t
val load_sexp : (Core.Sexp.t, 'a, 'a Core.Or_error.t) load
val load_sexp_exn : (Core.Sexp.t, 'a, 'a) load
val load_sexps : (Core.Sexp.t, 'a, 'a list Core.Or_error.t) load
val load_sexps_exn : (Core.Sexp.t, 'a, 'a list) load
val load_annotated_sexp : (Core.Sexp.Annotated.t, 'a, 'a Core.Or_error.t) load
val load_annotated_sexp_exn : (Core.Sexp.Annotated.t, 'a, 'a) load
val load_annotated_sexps :
(Core.Sexp.Annotated.t, 'a, 'a list Core.Or_error.t) load
val load_annotated_sexps_exn : (Core.Sexp.Annotated.t, 'a, 'a list) load
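A sketch of load_sexp with a hand-written converter, assuming the file holds a single sexp such as (1 2 3). The name load_ints is hypothetical; the converter is built from List.t_of_sexp and Int.t_of_sexp so that no preprocessor is needed.

```ocaml
open Core
open Async

(* Hypothetical example: load one sexp from [file] and convert it to an
   [int list]. A conversion failure is reported through the [Or_error.t]. *)
let load_ints (file : string) : int list Or_error.t Deferred.t =
  Reader.load_sexp file (List.t_of_sexp Int.t_of_sexp)
```

The _exn variants raise instead of returning an Or_error.t, which may suit short scripts where a failed load should abort the program.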
type ('a, 'b) load_bin_prot =
?exclusive:bool ->
?max_len:int ->
string ->
'a Core.Bin_prot.Type_class.reader ->
'b Async_kernel.Deferred.t
val load_bin_prot : ('a, 'a Core.Or_error.t) load_bin_prot
val load_bin_prot_exn : ('a, 'a) load_bin_prot
module For_testing : sig ... end