clojure.core.async.flow

Note - Alpha, work-in-progress, names and other details are in flux 

A library for building concurrent, event driven data processing
flows out of communication-free functions, while centralizing
control, reporting, execution and error handling. Built on core.async.

The top-level construct is the flow, comprising:
a set of processes (generally, threads) - concurrent activities
a set of channels flowing data into and out of the processes
a set of channels for centralized control, reporting, error-handling,
  and execution of the processes

A flow is constructed from flow configuration data which defines a
directed graph of processes and the connections between
them. Processes describe their I/O requirements and the
flow (library) itself creates channels and passes them to the
processes that requested them. See 'create-flow' for the
details. The flow configuration provides a centralized place for
policy decisions regarding process settings, threading, buffering etc.

It is expected that applications will rarely define instances of the
process protocol but instead use the API functions here, 'process'
and 'step-process', that implement the process protocol in terms of
calls to ordinary functions that might include no communication or
core.async code. In this way the library helps you achieve a strict
separation of your application logic from its execution,
communication, lifecycle, error handling and monitoring.

Note that at several points the library calls upon the user to
supply ids for processes, inputs, outputs etc. These should be
keywords. When a namespaced keyword is required it is explicitly
stated. This documentation refers to various keywords utilized by
the library itself as ::flow/xyz, where ::flow is an alias for
clojure.core.async.flow
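
For example (a minimal sketch), the ::flow alias can be established in
user code with an ordinary ns alias:

  (require '[clojure.core.async.flow :as flow])
  ::flow/report ;=> :clojure.core.async.flow/report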

A process is represented in the flow definition by an implementation
of spi/ProcLauncher that starts it. See the spi docs for
details.

command-proc

(command-proc g pid cmd-id more-kvs)
synchronously sends a process-specific command with the given id and
additional kvs to the process. The cmd-id must be ns-qualified with
a ns you own.
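
For example (a sketch), assuming a flow 'f', a process id :stats, and a
hypothetical command keyword owned by your namespace, with additional
kvs supplied as a map:

  (command-proc f :stats :my.app/set-threshold {:threshold 0.9})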

create-flow

(create-flow config)
Creates a flow from the supplied configuration: a map containing the
keys :procs and :conns, and optionally :mixed-exec/:io-exec/:compute-exec

:procs - a map of pid->proc-def
where proc-def is a map with keys :proc, :args, :chan-opts

:proc - a function that starts a process
:args - a map of param->val which will be passed to the process ctor
:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
             and xform have their meanings per core.async/chan
             the default is {:buf-or-n 10}

:conns - a collection of [[from-pid outid] [to-pid inid]] tuples.

Inputs and outputs support multiple connections. When an output is
connected multiple times every connection will get every message,
as per a core.async/mult.

:mixed-exec/:io-exec/:compute-exec -> ExecutorService
These can be used to specify the ExecutorService to use for the
corresponding workload, in lieu of the lib defaults.

N.B. The flow is not started. See 'start'
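
As an illustration only, a minimal configuration might look like the
following, where gen-proc and sum-proc are hypothetical vars holding
process launchers created with 'process':

  (def config
    {:procs {:gen {:proc gen-proc
                   :args {:limit 100}}
             :sum {:proc sum-proc
                   :chan-opts {:in {:buf-or-n 50}}}}
     :conns [[[:gen :out] [:sum :in]]]})

  (def f (create-flow config))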

futurize

(futurize f & {:keys [exec], :or {exec :mixed}, :as opts})
Takes a fn f and returns a fn that takes the same arguments as f
and immediately returns a future, having started a thread for the
indicated workload, or via the supplied executor, that invokes f
with those args and completes that future with its return.

futurize accepts kwarg options:
:exec - one of the workloads :mixed, :io, :compute
        or a j.u.c.ExecutorService object,
        default :mixed
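
For example (a sketch), wrapping an ordinary function so that calls run
on the :io workload and return immediately with a future:

  (def fetch! (futurize slurp :exec :io))
  @(fetch! "https://example.com") ;; deref to await the result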

inject

(inject g [pid io-id :as coord] msgs)
asynchronously puts the messages on the channel corresponding to the
input or output of the process, returning a future that will
complete when done.
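
For example (a sketch), assuming a flow 'f' with a process :gen having
an input :in:

  @(inject f [:gen :in] [1 2 3]) ;; deref the future to await completion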

lift*->step

(lift*->step f)
given a fn f taking one arg and returning a collection of non-nil
values, create a 'step' fn as needed by step-process, with one input
and one output (named :in and :out), and no state.

lift1->step

(lift1->step f)
like lift*->step except taking a fn returning one value, which when
nil will yield no output.
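
For example, these sketches lift ordinary functions into step fns
usable with step-process:

  (require '[clojure.string :as str])

  ;; one input value -> possibly many output values
  (def split-words (lift*->step #(str/split % #"\s+")))

  ;; one input value -> at most one output value (nil yields no output)
  (def parse-step (lift1->step parse-long))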

pause

(pause g)
pauses a running flow

pause-proc

(pause-proc g pid)
pauses a process

ping

(ping g & {:keys [timeout-ms], :or {timeout-ms 1000}})
pings all processes, returning a map of pid -> proc status and
state, for those procs that reply within timeout-ms (default 1000)

ping-proc

(ping-proc g pid & {:keys [timeout-ms], :or {timeout-ms 1000}})
like ping, but just pings the specified process

process

(process fn-or-map)
(process fn-or-map {:keys [workload timeout-ms], :or {timeout-ms 5000}, :as opts})
Given a function of four arities (0-3), or a map of functions
corresponding thereto (described below), returns a launcher that
creates a process compliant with the process protocol (see the
spi/ProcLauncher doc).

The possible arities/entries for fn/map are 0 - :describe, 1
- :init, 2 - :transition and 3 - :transform. This is the core
facility for defining the logic for processes via ordinary
functions. Using a var holding a fn as the 'fn' is the preferred
method for defining a proc, as it enables hot-code-reloading of the
proc logic in a flow, and better names in datafy. You can use the
map form to compose the proc logic from disparate functions or to
leverage the optionality of some of the entry points.

arity 0, or :describe - required, () -> description
where description is a map with keys :params :ins and :outs, each of which
in turn is a map of keyword to doc string, and :workload with
possible values of :mixed :io :compute. All entries in the describe
return map are optional.

:params describes the initial arguments used to set up the state for the function.
:ins enumerates the input[s], for which the flow will create channels
:outs enumerates the output[s], for which the flow may create channels.
:workload - describes the nature of the workload, one of :mixed :io or :compute
        an :io workload should not do extended computation
        a :compute workload should never block

No key may be present in both :ins and :outs, allowing for a uniform
channel coordinate system of [:process-id :channel-id]. The
ins/outs/params returned will be the ins/outs/params of the
process. describe may be called by users to understand how to use
the proc. It will also be called by the impl in order to discover
what channels are needed.
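
For illustration, a describe fn for a hypothetical summing proc might
return:

  (defn describe []
    {:params {:init-total "Starting value for the running total"}
     :ins    {:nums "Numbers to be summed"}
     :outs   {:total "Emits the running total after each input"}
     :workload :compute})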

arity 1, or :init - optional, (arg-map) -> initial-state

init will be called once by the process to establish any initial
state. The arg-map will be a map of param->val, as supplied in the
flow def. init must be provided if 'describe' returns :params.

Optionally, a returned init state may contain the
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
of cid -> a core.async channel. The cids must not conflict with the
in/out ids. These channels will become part of the input/output set
of the process, but are not otherwise visible/resolvable within the
flow. Ports are a way to allow data to enter or exit the flow from
outside of it. Use :transition to coordinate the lifecycle of these
external channels.

Optionally, _any_ returned state, whether from :init, :transition
or :transform, may contain the key ::flow/input-filter, a predicate
of cid. Only inputs (including in-ports) satisfying the predicate
will be part of the next channel read set. In the absence of this
predicate all inputs are read.
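
As a sketch, an init for the hypothetical summing proc above, assuming
an extra :external-chan param supplied via :args, that brings an
external channel into the flow as an in-port and restricts the initial
read set:

  (defn init [{:keys [init-total external-chan]}]
    {:total (or init-total 0)
     ::flow/in-ports {:ext external-chan}
     ::flow/input-filter #{:nums :ext}})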

arity 2, or :transition - optional, (state transition) -> state'

transition will be called when the process makes a state transition,
transition being one of ::flow/resume, ::flow/pause or ::flow/stop

With this fn a process impl can track changes and coordinate
resources, especially cleaning up any resources on :stop, since the
process will no longer be used following that. See the SPI for
details. state' will be the state supplied to subsequent calls.
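
For example (a sketch), a transition fn that closes a hypothetical
external resource held in the state under :conn when the process stops:

  (defn transition [state trans]
    (when (= trans ::flow/stop)
      (when-let [c (:conn state)]
        (.close ^java.io.Closeable c)))
    state)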

arity 3, or :transform - required, (state in-name msg) -> [state' output]
where output is a map of outid->[msgs*]

The transform fn will be called every time a message arrives at any
of the inputs. Output can be sent to none, any or all of the :outs
enumerated, and/or an input named by a [pid inid] tuple (e.g. for
reply-to), and/or to the ::flow/report output. A step need not
output at all (output or msgs can be empty/nil), however an output _message_
may never be nil (per core.async channels). state' will be the state
supplied to subsequent calls.
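
For example (a sketch), a transform for the hypothetical summing proc
above, adding each incoming number to the running total and emitting it
on :total:

  (defn transform [state _in msg]
    (let [total (+ (:total state) msg)]
      [(assoc state :total total) {:total [total]}]))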

process accepts an option map with keys:
:workload - one of :mixed, :io or :compute
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
              will be used when getting the return from the future - see below

A :workload supplied as an option to process will override
any :workload returned by the :describe fn of the process. If neither
are provided, the default is :mixed.

In the :workload context of :mixed or :io, this dictates the type of
thread in which the process loop will run, _including its calls to
transform_. 

When :io is specified, transform should not do extensive computation.

When :compute is specified (only allowed for :transform), each call
to transform will be run in a separate thread. The process loop will
run in an :io context (since it no longer directly calls transform,
all it does is I/O) and it will submit transform to the :compute
executor then await (blocking, for compute-timeout-ms) the
completion of the future returned by the executor. If the future
times out it will be reported on ::flow/error.

When :compute is specified transform must not block!
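
Putting the pieces together, a sketch of defining a proc from the map
form (using the hypothetical fns above) with a :compute workload:

  (def sum-proc
    (process {:describe describe
              :init init
              :transition transition
              :transform transform}
             {:workload :compute, :compute-timeout-ms 10000}))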

resume

(resume g)
resumes a paused flow

resume-proc

(resume-proc g pid)
resumes a process

start

(start g)
starts the entire flow from init values. The processes start paused.
Call 'resume' or 'resume-proc' to start the flow. Returns a map with keys:

:report-chan - a core.async chan for reading. 'ping' responses
will show up here, as will any explicit ::flow/report outputs
from :transform

:error-chan - a core.async chan for reading. Any (and only)
exceptions thrown anywhere on any thread inside a flow will appear
in maps sent here. There will at least be a ::flow/ex entry with the
exception, and may be additional keys for pid, state, status etc
depending on the context of the error.

stop

(stop g)
shuts down the flow, stopping all processes and closing the error
and report channels. The flow can be started again.
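
A sketch of the overall lifecycle, assuming the hypothetical config
shown under create-flow above:

  (require '[clojure.core.async :as async])

  (def f (create-flow config))
  (def chans (start f))              ;; procs are created but start paused
  (resume f)                         ;; begin processing
  (ping f)                           ;; map of pid -> status/state for responsive procs
  (async/<!! (:report-chan chans))   ;; blocks until a report (e.g. a ping response) arrives
  (stop f)                           ;; shut down procs, close report/error chans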