Module Parallel_scan

* Parallel_scan describes a signature of a solution for the following * problem statement: * * Efficiently compute a periodic scan on an infinite stream pumping at some * target rate, prefer maximizing throughput, then minimizing latency, then * minimizing size of state on an infinite core machine. * * A periodic scan is a scan that only returns incremental results of some fold * every so often, and usually returns None. It requires a way to lift some * data 'd into the space 'a where an associative fold operation exists. * * The actual work of the scan is handled out-of-band, so here we expose an * interface of the intermediate state of some on-going long scan. * * Conceptually, you can imagine having a series of virtual trees where data * starts at the base and works its way up to the top, at which point we emit * the next incremental result. The implementation, for efficiency, does not * actually construct these trees, but succinctly uses a ring buffer. The * state of the scan are these incomplete virtual trees. * * Specifically, the state of this scan is has the following primary operations: * * empty to create the initial state * * update adding raw data that will be lifted and processed later; adding * merges for the completed raw/merged data. This moves us closer to emitting * something from a tree * * next_jobs to get the next work to complete from this data *

module Sequence_number : sig ... end
module Job_status : sig ... end

Each node on the tree is viewed as a job that needs to be completed. When a job is completed, it creates a new "Todo" job and marks the old job as "Done"

module Weight : sig ... end

number of jobs that can be added to this tree. This number corresponding to a specific level of the tree. New jobs received is distributed across the tree based on this number.

module Base : sig ... end

Base Job: Proving new transactions

module Merge : sig ... end

Merge Job: Merging two proofs

module Available_job : sig ... end

An available job is an incomplete job that has enough information for one * to process it into a completed job

module Space_partition : sig ... end

Space available and number of jobs required to enqueue data. first = space on the current tree and number of jobs required to be completed second = If the current-tree space is less than <max_base_jobs> then remaining number of slots on a new tree and the corresponding job count.

module Job_view : sig ... end
module State : sig ... end
val empty : max_base_jobs:int -> delay:int -> ( 'merge, 'base ) State.t

The initial state of the parallel scan at some parallelism

val all_jobs : ( 'merge, 'base ) State.t -> ( 'merge, 'base ) Available_job.t list list

Get all the available jobs

val jobs_for_next_update : ( 'merge, 'base ) State.t -> ( 'merge, 'base ) Available_job.t list list

Get all the available jobs to be done in the next update

val jobs_for_slots : ( 'merge, 'base ) State.t -> slots:int -> ( 'merge, 'base ) Available_job.t list list

Get all the available jobs to be done for the given # slots to be occupied

val free_space : ( 'merge, 'base ) State.t -> int

Compute how much data 'd elements we are allowed to add to the state

val update : data:'base list -> completed_jobs:'merge list -> ( 'merge, 'base ) State.t -> (('merge * 'base list) option * ( 'merge, 'base ) State.t) Core_kernel.Or_error.t

Complete jobs needed at this state -- optionally emits the 'a at the top * of the tree along with the 'd list responsible for emitting the 'a.

val last_emitted_value : ( 'merge, 'base ) State.t -> ('merge * 'base list) option

The last 'a we emitted from the top of the tree and the 'd list * responsible for that 'a.

val partition_if_overflowing : ( 'merge, 'base ) State.t -> Space_partition.t

If there aren't enough slots for max_slots many 'd, then before * continuing onto the next virtual tree, split max_slots = (x,y) such that * x = number of slots till the end of the current tree and y = max_slots - x * (starts from the begining of the next tree)

val current_job_sequence_number : ( 'merge, 'base ) State.t -> int

Get the current job sequence number

val view_jobs_with_position : ( 'merge, 'base ) State.t -> ( 'merge -> 'c ) -> ( 'base -> 'c ) -> 'c Job_view.t list list

Each list corresponds to the jobs on one of the trees

val base_jobs_on_latest_tree : ( 'merge, 'base ) State.t -> 'base list

All the base jobs that are part of the latest tree being filled * i.e., does not include base jobs that are part of previous trees not * promoted to the merge jobs yet

val base_jobs_on_earlier_tree : ( 'merge, 'base ) State.t -> index:int -> 'base list

All the base jobs that are part of a tree before the latest tree index is 0-based, 0 is the next-to-latest tree

val next_on_new_tree : ( 'merge, 'base ) State.t -> bool

Returns true only if the next 'd that could be enqueued is on a new tree

val pending_data : ( 'merge, 'base ) State.t -> 'base list

All the 'ds (in the order in which they were added) for which scan results are yet to computed

val update_metrics : ( 'merge, 'base ) State.t -> unit Core_kernel.Or_error.t

update tree level metrics