Parallel_scan
Parallel_scan describes the signature of a solution to the following problem statement:

Efficiently compute a periodic scan on an infinite stream pumping at some target rate, preferring to maximize throughput, then to minimize latency, then to minimize the size of the state, on an infinite-core machine.

A periodic scan is a scan that returns incremental results of some fold only every so often, and usually returns None. It requires a way to lift some data 'd into the space 'a where an associative fold operation exists. (In the signatures below, 'd appears as 'base and 'a as 'merge.)

The actual work of the scan is handled out-of-band, so here we expose an interface to the intermediate state of some ongoing long scan.

Conceptually, you can imagine a series of virtual trees where data starts at the base and works its way up to the top, at which point we emit the next incremental result. For efficiency, the implementation does not actually construct these trees but succinctly uses a ring buffer. The state of the scan is these incomplete virtual trees.

Specifically, the state of this scan has the following primary operations:

- empty: to create the initial state
- update: adding raw data that will be lifted and processed later, and adding merges for the completed raw/merged data. This moves us closer to emitting something from a tree.
- next_jobs: to get the next work to complete from this data
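To make the problem statement concrete, here is a minimal sequential sketch of a periodic scan. It assumes a fixed batch size that stands in for the base width of one virtual tree and processes a finite list in a single pass; the names periodic_scan and reduce_pairwise are hypothetical and are not part of this module, which does the work incrementally and out-of-band over a ring buffer.

```ocaml
(* Hypothetical sketch, NOT the Parallel_scan implementation: it processes a
   finite list in one pass instead of doing the work out-of-band. *)

(* Reduce one level of 'a values pairwise until a single root remains,
   the way data works its way up one virtual tree. *)
let rec reduce_pairwise (merge : 'a -> 'a -> 'a) (xs : 'a list) : 'a =
  (* Merge adjacent pairs to form the next level up; an odd trailing
     element is carried up unchanged. *)
  let rec pairs = function
    | a :: b :: rest -> merge a b :: pairs rest
    | rest -> rest
  in
  match xs with
  | [] -> invalid_arg "reduce_pairwise: empty level"
  | [ x ] -> x
  | _ -> reduce_pairwise merge (pairs xs)

(* Emit [Some root] after every [batch] inputs and [None] otherwise. *)
let periodic_scan ~batch ~(lift : 'd -> 'a) ~(merge : 'a -> 'a -> 'a)
    (stream : 'd list) : 'a option list =
  let rec go pending count acc = function
    | [] -> List.rev acc
    | d :: rest ->
        let pending = lift d :: pending in
        let count = count + 1 in
        if count < batch then go pending count (None :: acc) rest
        else
          (* A full batch plays the role of one completed virtual tree;
             [pending] is newest-first, so reverse it to keep input order. *)
          let root = reduce_pairwise merge (List.rev pending) in
          go [] 0 (Some root :: acc) rest
  in
  go [] 0 [] stream
```

Because merge is only required to be associative, the pairwise (tree-shaped) reduction gives the same root as a left fold, which is what makes it safe to spread the merges across many workers.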
module Sequence_number : sig ... end
module Job_status : sig ... end
Each node on the tree is viewed as a job that needs to be completed. When a job is completed, it creates a new "Todo" job and marks the old job as "Done"
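The Todo/Done life cycle can be pictured with a small sketch. The status and job types and the complete function below are hypothetical illustrations, not the real Job_status module, and they assume that the fresh Todo job belongs to the parent of the completed node.

```ocaml
(* Hypothetical sketch of the Todo/Done life cycle; the type and function
   names are illustrative, not the real Job_status module. *)
type status = Todo | Done

type 'a job = { status : status; value : 'a option }

(* Completing a Todo job marks it Done and produces a fresh Todo job
   (assumed here to belong to the parent node) that carries the result. *)
let complete (job : 'a job) (result : 'a) : 'a job * 'a job =
  match job.status with
  | Done -> invalid_arg "complete: job is already Done"
  | Todo -> ({ job with status = Done }, { status = Todo; value = Some result })
```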
module Weight : sig ... end
The number of jobs that can be added to this tree. This number corresponds to a specific level of the tree. New jobs received are distributed across the tree based on this number.
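As a rough illustration of distributing new jobs by weight, the sketch below takes a node's weight to be the number of free base slots beneath it and routes incoming jobs left-first. The tree type, weight and split_by_weight are hypothetical and only mirror the description above; the real Weight module may track more than one count per node.

```ocaml
(* Hypothetical sketch: a node's weight is the number of free base slots
   beneath it. Not the real Weight module. *)
type tree = Leaf of bool (* true when the slot is still free *) | Node of tree * tree

let rec weight = function
  | Leaf free -> if free then 1 else 0
  | Node (l, r) -> weight l + weight r

(* Route [n] incoming jobs at a node: the left child takes as many as its
   weight allows, the right child takes the remainder (capped by its weight),
   and whatever is left over does not fit under this node. *)
let split_by_weight (t : tree) (n : int) : int * int * int =
  match t with
  | Leaf _ -> invalid_arg "split_by_weight: expected a node"
  | Node (l, r) ->
      let to_left = min n (weight l) in
      let to_right = min (n - to_left) (weight r) in
      (to_left, to_right, n - to_left - to_right)
```

Keeping a weight per node lets new jobs be routed from the top without scanning every leaf, which is the point of storing it.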
module Base : sig ... end
Base Job: Proving new transactions
module Merge : sig ... end
Merge Job: Merging two proofs
module Available_job : sig ... end
An available job is an incomplete job that has enough information for one to process it into a completed job.
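Combining the Base and Merge job descriptions, an available job can be read as either a raw 'd waiting to be lifted or two 'a values waiting to be merged. The available_job type and process function below are a hypothetical sketch under that reading, not the real Available_job.t.

```ocaml
(* Hypothetical sketch: an available job either lifts a raw 'd or merges
   two 'a values. Not the real Available_job.t. *)
type ('a, 'd) available_job = Base of 'd | Merge of 'a * 'a

(* Turn an available job into a completed 'a, given the lift and the
   associative merge. *)
let process ~(lift : 'd -> 'a) ~(merge : 'a -> 'a -> 'a) = function
  | Base d -> lift d
  | Merge (l, r) -> merge l r
```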
module Space_partition : sig ... end
Space available and number of jobs required to enqueue data. first: the space on the current tree and the number of jobs required to be completed. second: if the current-tree space is less than max_base_jobs, the remaining number of slots on a new tree and the corresponding job count.
module Job_view : sig ... end
module State : sig ... end
val empty : max_base_jobs:int -> delay:int -> ( 'merge, 'base ) State.t
The initial state of the parallel scan at some parallelism
val all_jobs :
( 'merge, 'base ) State.t ->
( 'merge, 'base ) Available_job.t list list
Get all the available jobs
val jobs_for_next_update :
( 'merge, 'base ) State.t ->
( 'merge, 'base ) Available_job.t list list
Get all the available jobs to be done in the next update
val jobs_for_slots :
( 'merge, 'base ) State.t ->
slots:int ->
( 'merge, 'base ) Available_job.t list list
Get all the available jobs to be done for the given number of slots to be occupied.
val free_space : ( 'merge, 'base ) State.t -> int
Compute how many data ('d) elements we are allowed to add to the state.
val update :
data:'base list ->
completed_jobs:'merge list ->
( 'merge, 'base ) State.t ->
(('merge * 'base list) option * ( 'merge, 'base ) State.t)
Core_kernel.Or_error.t
Complete the jobs needed at this state. Optionally emits the 'a at the top of the tree along with the 'd list responsible for emitting that 'a.
val last_emitted_value :
( 'merge, 'base ) State.t ->
('merge * 'base list) option
The last 'a we emitted from the top of the tree and the 'd list responsible for that 'a.
val partition_if_overflowing : ( 'merge, 'base ) State.t -> Space_partition.t
If there aren't enough slots for max_slots many 'd, then, before continuing onto the next virtual tree, split max_slots = (x, y) such that x = the number of slots until the end of the current tree and y = max_slots - x (starting from the beginning of the next tree).
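The split arithmetic can be sketched directly. In this hypothetical version, max_slots and the free space on the current tree are passed in explicitly (the real function reads them from the state), and the split record is illustrative only: it carries slot counts but omits the job counts that Space_partition.t also reports.

```ocaml
(* Hypothetical sketch of the (x, y) split; [split] is illustrative and is
   not Space_partition.t (it omits the job counts). *)
type split = { on_current_tree : int; on_next_tree : int option }

(* [free_on_current] is the number of slots left before the end of the
   current virtual tree. *)
let split_slots ~free_on_current ~max_slots =
  if max_slots <= free_on_current then
    (* Everything fits on the current tree: no split needed. *)
    { on_current_tree = max_slots; on_next_tree = None }
  else
    (* x slots until the end of the current tree, y = max_slots - x
       starting from the beginning of the next tree. *)
    let x = free_on_current in
    { on_current_tree = x; on_next_tree = Some (max_slots - x) }
```

For example, with 3 slots left on the current tree and max_slots = 8, the split is x = 3 and y = 5.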
val current_job_sequence_number : ( 'merge, 'base ) State.t -> int
Get the current job sequence number
val view_jobs_with_position :
( 'merge, 'base ) State.t ->
( 'merge -> 'c ) ->
( 'base -> 'c ) ->
'c Job_view.t list list
Each list corresponds to the jobs on one of the trees
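view_jobs_with_position takes one function for 'merge values and one for 'base values so that both job kinds can be shown as a single type 'c. The sketch below imitates that shape on a hypothetical slot type, pairing each value with its position and returning one inner list per tree; slot and view_trees are stand-ins, not Job_view.t or the real function.

```ocaml
(* Hypothetical stand-in for a tree of jobs: each slot holds either a merge
   value or a base value. *)
type ('merge, 'base) slot = M of 'merge | B of 'base

(* Two projections map both job kinds into one type 'c; the result has one
   inner list per tree, each entry paired with its position in that tree. *)
let view_trees (trees : ('merge, 'base) slot list list)
    (f_merge : 'merge -> 'c) (f_base : 'base -> 'c) : (int * 'c) list list =
  List.map
    (fun tree ->
      List.mapi
        (fun pos slot ->
          match slot with
          | M m -> (pos, f_merge m)
          | B b -> (pos, f_base b))
        tree)
    trees
```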
val base_jobs_on_latest_tree : ( 'merge, 'base ) State.t -> 'base list
All the base jobs that are part of the latest tree being filled, i.e., excluding base jobs that are part of previous trees and have not yet been promoted to merge jobs.
val base_jobs_on_earlier_tree :
( 'merge, 'base ) State.t ->
index:int ->
'base list
All the base jobs that are part of a tree before the latest tree. index is 0-based; 0 is the next-to-latest tree.
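The indexing convention is easy to get wrong by one. Assuming, purely for illustration, that trees are kept newest-first as lists of base jobs, index 0 of the earlier trees is the second element of that list. latest_tree_jobs and earlier_tree_jobs are hypothetical names working on plain lists rather than State.t.

```ocaml
(* Hypothetical sketch: trees stored newest-first, each as its list of base
   jobs. Not the real State.t representation. *)
let latest_tree_jobs (trees : 'base list list) : 'base list =
  match trees with [] -> [] | latest :: _ -> latest

(* index 0 is the next-to-latest tree, so skip the latest one.
   List.nth_opt raises Invalid_argument if index + 1 is negative. *)
let earlier_tree_jobs (trees : 'base list list) ~(index : int) : 'base list =
  match List.nth_opt trees (index + 1) with Some jobs -> jobs | None -> []
```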
val next_on_new_tree : ( 'merge, 'base ) State.t -> bool
Returns true only if the next 'd that could be enqueued is on a new tree
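One way to picture next_on_new_tree: if base slots are filled strictly in order and each virtual tree has max_base_jobs base slots (the parameter given to empty), the next 'd starts a new tree exactly when the current tree still has all of its slots free. The counter filled and both functions below are hypothetical and ignore the ring-buffer details of the real state.

```ocaml
(* Hypothetical sketch, assuming base slots are filled strictly in order and
   [filled] counts every base slot filled since the first tree was started. *)
let free_on_current_tree ~max_base_jobs ~filled =
  max_base_jobs - (filled mod max_base_jobs)

(* The next 'd lands on a new tree exactly when no slot of the current tree
   has been filled yet, i.e. all max_base_jobs slots are free. *)
let starts_new_tree ~max_base_jobs ~filled =
  free_on_current_tree ~max_base_jobs ~filled = max_base_jobs
```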
val pending_data : ( 'merge, 'base ) State.t -> 'base list
All the 'ds (in the order in which they were added) for which scan results are yet to be computed.
val update_metrics : ( 'merge, 'base ) State.t -> unit Core_kernel.Or_error.t
Update tree-level metrics.