Programmable Pregel Algorithms
This feature is experimental and under active development. The naming and interfaces may change at any time. Execution times are not representative of the final product.
Pregel is a system for large-scale graph processing. It is already implemented in ArangoDB and can be used with predefined algorithms, e.g. PageRank, Single-Source Shortest Path and Connected Components.
Programmable Pregel Algorithms (PPA) are based on the existing ArangoDB Pregel engine. The big change is the possibility to write and execute your own algorithms.
The best part: You can write, develop and execute your custom algorithms without having to plug C++ code into ArangoDB (and re-compile). Algorithms can be defined and executed in a running ArangoDB instance without restarting it.
Requirements
PPAs can be run on a single-server instance, but as they are designed to run in parallel in a distributed environment, you can only add computing power in a clustered environment. PPAs also require proper graph sharding to be efficient. Using SmartGraphs is the recommended way to run Pregel algorithms.
As this is an extension of the native Pregel framework, the same prerequisites and requirements apply.
Basics
A Pregel computation consists of a sequence of iterations, each of which is called a superstep. During a superstep, the custom algorithm is executed for each vertex. This happens in parallel, because vertices communicate via messages and not via shared memory.
The basic methods are (assuming we are in superstep S):
- Read messages which were sent to the vertex V in the previous superstep (S-1)
- Send messages to other vertices that will be received in the next superstep (S+1)
- Modify the state of the vertex V
- Vote to halt (mark a vertex V as "done"; V will be inactive in S+1, but it is possible to re-activate it)
Definition of a custom algorithm
Currently, a custom algorithm is defined as a JSON object with the following structure.
Algorithm skeleton
{
  "resultField": "<string>",
  "maxGSS": "<number>",
  "dataAccess": {
    "writeVertex": "<program>",
    "readVertex": "<array>",
    "readEdge": "<array>"
  },
  "vertexAccumulators": "<object>",
  "globalAccumulators": "<object>",
  "customAccumulators": "<object>",
  "phases": "<array>"
}
Algorithm parameters
- resultField (string, optional): Name of the document attribute to store the result in. The system replaces the attribute's value with an object mapping accumulator names to their values.
- maxGSS (number, required): The maximum number of global supersteps. After this number of supersteps is reached, the Pregel execution stops.
- dataAccess (object, optional): Allows to define `writeVertex`, `readVertex` and `readEdge` (see the sketch after this list).
  - writeVertex: An AIR program that is used to write the results into vertices. If `writeVertex` is used, `resultField` must not be set.
  - readVertex: An `array` that consists of `strings` and/or additional `arrays` (each representing a path).
    - `string`: Represents a single attribute at the top level.
    - `array of strings`: Represents a nested path.
  - readEdge: An `array` that consists of `strings` and/or additional `arrays` (each representing a path).
    - `string`: Represents a single attribute at the top level.
    - `array of strings`: Represents a nested path.
  - `readVertex` and `readEdge` define which parts of the document data are available for a vertex or edge. If not provided, the default behavior is to load the whole document.
- vertexAccumulators (object, optional): Definition of all used vertex accumulators.
- globalAccumulators (object, optional): Definition of all used global accumulators. Global accumulators are able to access variables at a shared, global level.
- customAccumulators (object, optional): Definition of all used custom accumulators.
- phases (array): Array of one or multiple phase definitions.
- debug (optional): See Debugging.
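As referenced above, a minimal sketch of a `dataAccess` section with hypothetical attribute names: `"value"` selects a top-level vertex attribute, `["properties", "weight"]` selects the nested path `properties.weight`, and `"weight"` selects a top-level edge attribute.

{
  "dataAccess": {
    "readVertex": ["value", ["properties", "weight"]],
    "readEdge": ["weight"]
  }
}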
Phases
Phases run sequentially during your Pregel computation. Defining multiple phases is allowed. Each phase requires instructions based on the operations you want to perform. The initialization program (1) runs a single time in the very first round. All subsequent rounds execute the update program (2).
In each phase, the Pregel program execution will follow the order:
Step 1: Initialization
- `onPreStep` (Coordinator, executed on Coordinator instances)
- `initProgram` (Worker, executed on DB-Server instances)
- `onPostStep` (Coordinator)

Step 2 (+n): Computation
- `onPreStep` (Coordinator)
- `updateProgram` (Worker)
- `onPostStep` (Coordinator)
Phase parameters
- name (string, required): Phase name. The given name of the defined phase.
- onPreStep: Program to be executed. The onPreStep program runs once before each Pregel execution round.
- initProgram: Program to be executed. The init program runs once for every vertex of your graph in the very first round.
- updateProgram: Program to be executed. The updateProgram is executed once per vertex in every subsequent Pregel execution round.
- onPostStep: Program to be executed. The onPostStep program runs once after each Pregel execution round.
All programs are specified as AIR programs.
The return value of `initProgram` resp. `updateProgram` is inspected. It must be one of the following:
- `"vote-halt"` or `false`: indicates that this vertex voted halt.
- `"vote-active"` or `true`: indicates that this vertex voted active and is active in the next round.
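A hedged sketch of a minimal phase definition, assuming a `sum` vertex accumulator named `counter` has been declared under `vertexAccumulators` (the accumulator functions used here are described in the AIR specification below):

{
  "name": "main",
  "initProgram": ["seq",
    ["accum-set!", "counter", 0],
    "vote-active"
  ],
  "updateProgram": ["seq",
    ["accum-set!", "counter", ["+", ["accum-ref", "counter"], 1]],
    "vote-halt"
  ]
}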
Debugging
Using the `debug` field in the algorithm specification you can instruct the Pregel
system to generate additional tracing information for debugging purposes.
Currently, only sent messages can be traced, but in the future this will be expanded
as needed.
{
  debug: {
    traceMessages: {
      "my-vertex-id": {}
    }
  }
}
This will generate a report for every message that is sent to the vertex
`my-vertex-id`. Additionally you can specify a filter by adding a `filter` field.
{
  debug: {
    traceMessages: {
      "my-vertex-id": {
        filter: {
          bySender: ["my-sender-vertex"],
          byAccumulator: ["some-accumulator"]
        }
      }
    }
  }
}
This, for example, only generates trace reports for messages that were sent by
`my-sender-vertex` and use the `some-accumulator` accumulator. You can add more
than one vertex or accumulator to these lists. The filters are combined using
`and` semantics, i.e. only those messages that pass all filters are traced.
Specification
- `traceMessages` (optional): a mapping from `vertex-id` to an object described below
  - `filter` (optional)
    - `bySender` (optional): a list of vertex document ids. Only messages sent by those vertices are traced.
    - `byAccumulator` (optional): a list of accumulator names. Only messages sent to those accumulators are traced.
AIR Program
As the name already indicates, the AIR program is the part where the actual algorithmic action takes place. AIR programs are written in the Arango Intermediate Representation (AIR).
Arango Intermediate Representation
We developed a Lisp-like intermediate representation to be able to transport programs into the existing Pregel implementation in ArangoDB. These programs are executed by the interpreter inside the AIR Pregel algorithm.
At the moment this interpreter is a prototype and hence not optimized and (probably) slow. It is very flexible in terms of what we can implement, provide and test: we can provide any function as a primitive in the language, and all basic operations are available as is customary in the Lisp tradition.
The intention is not that this language is presented to users as is; it is only the representation we use at this early, experimental stage of the feature.
It is merely an intermediate representation which is very flexible and well suited for prototyping. A surface syntax is subject to development, and we may even provide more than one. In particular, this way we get a better feeling for which functionality is needed by clients and users of graph analytics.
A surface language / syntax will be available later.
AIR specification
The following list of functions and special forms is available in all contexts. AIR is based on Lisp, but represented in JSON and supports its data types.
Strings, numeric constants, booleans and objects (dicts) are self-evaluating, i.e. the result of evaluating such a value is the value itself. Arrays are not self-evaluating. In general you should read an array like a function call:
["foo", 1, 2, "bar"] // read as foo(1, 2, "bar")
The first element of a list specifies the function. This can either be a string containing the function name, or a lambda object.
To prevent unwanted evaluation, or to actually write down a list, there are multiple options:
- `list`
- `quote`
- `quasi-quote`
["list", 1, 2, ["foo", "bar"]] // evaluates to [1, 2, foo("bar")] -- evaluated parameters
["quote", 1, 2, ["foo", "bar"]] // evaluates to [1, 2, ["foo", "bar"]] -- unevaluated parameters
They are described in more detail below.
The documentation refers to an array of length two as a pair. The first entry is
called `first` and the second entry `second`.
Truthiness of values
A value is considered false if it is boolean `false` or absent (`null`).
All other values are considered true.
Special forms
A special form is special in the sense that it does not necessarily evaluate its parameters.
`let` statement
binding values to variables
["let", [[name, value]...], expr...]
Expects as first parameter a list of name-value pairs. Both members of each
pair are evaluated. `first` has to evaluate to a string. Declared names become
visible at the first `expr`. The following expressions are then evaluated in a
context where the named variables are assigned to their given values. When
evaluating the expressions, `let` behaves like `seq`.
Variables can be dereferenced using `var-ref`.
> ["let", [["x", 12], ["y", 5]], ["+", ["var-ref", "x"], ["var-ref", "y"]]]
= 17
`seq` statement
sequence of commands
["seq", expr ...]
`seq` evaluates the `expr`s in order. The result value is the result value of the
last expression. An empty `seq` evaluates to `null`.
> ["seq", ["report", "Hello World!"], 2, 3]
Hello World!
= 3
`if` statement
classical if-elseif-else statement
["if", [cond, body], ...]
Takes pairs `[cond, body]` of conditions `cond` and expressions `body` and
evaluates the first `body` for which `cond` evaluates to a value that is
considered true. It does not evaluate the remaining `cond`s. If no condition
matches, it evaluates to `null`. To simulate an `else` statement, set the
last condition to `true`.
> ["if", [
["lt?", ["var-ref", "x"], 0],
["-", 0, ["var-ref", "x"]]
], [
true, // else
["var-ref", "x"]
]]
= 5
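A self-contained variant of the example above, binding `x` via `let` first so the result is reproducible:

> ["let", [["x", 5]],
    ["if", [
      ["lt?", ["var-ref", "x"], 0],
      ["-", 0, ["var-ref", "x"]]
    ], [
      true,
      ["var-ref", "x"]
    ]]]
= 5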
`match` statement
not-so-classical switch statement
["match", proto, [c, body]...]
First evaluates `proto`, then evaluates each `c` until `["eq?", val, c]` is
considered true. Then the corresponding `body` is evaluated and its return
value is returned. If no branch matches, `null` is returned. This is like a C
`switch` statement, except that its `case` values are not treated as constants.
> ["match", 5,
[1, ["A"]],
[2, ["B"]],
[3, ["C"]],
[4, ["D"]],
[5, ["E"]]
]
= "E"
`for-each` statement
["for-each", [[var, list]...] expr...]
Behaves similar to let
but expects a list as value
for each variable.
It then produces the cartesian product of all lists and evaluates its
expression for each n-tuple. The return value is always null
. The order
is guaranteed to be lexicographic order. If the list of variables is empty,
the expressions are evaluated once. If one list is empty, nothing is evaluated.
> ["for-each", [["x", ["list", 1, 2]], ["y", ["list", 3, 4]]], ["report", ["var-ref", "x"], ["var-ref", "y"]]]
1 3
1 4
2 3
2 4
(no value)
`quote` and `quote-splice` statements
escape sequences for lisp
["quote", expr]
["quote-splice", list]
`quote`/`quote-splice` copies/splices its parameter verbatim into the output,
without evaluating it.
`quote-splice` fails if it is called in a context where it cannot splice into
something, for example at top level.
> ["quote", ["foo"]]
= ["foo"]
> ["list", "foo", ["quote-splice", ["bar"]] ]
= ["foo","bar"]
`quasi-quote`, `unquote` and `unquote-splice` statements
like `quote` but can be unquoted
["quasi-quote", expr]
["unquote", expr]
["unquote-splice", expr]
`quasi-quote` is like `quote` but can be unquoted using
`unquote`/`unquote-splice`.
Unlike `quote`, `quasi-quote` scans all the unevaluated values passed as
parameters while copying them. When it finds an `unquote` or `unquote-splice` it
evaluates its parameter and copies/splices the resulting value into the output.
["quasi-quote", [
["this", "list", "is", "copied"], // this is not evaluated as call
["this", "is", // this neither
["unquote-splice", ["f", 2]] // this will splice f(2) into the result
]
]]
= [["this", "list", "is", "copied"], ["this", "is", f(2)]]
> ["quasi-quote", [["foo"], ["unquote", ["list", 1, 2]], ["unquote-splice", ["list", 1, 2]]]]
= [["foo"],[1,2],1,2]
`cons` statement
constructor for lists
["cons", value, list]
Classical Lisp instruction that prepends `value` to the list `list`.
> ["cons", 1, [2, 3]]
= [1, 2, 3]
`and` and `or` statements
basic logical operations
["and", expr...]
["or", expr...]
Computes the logical `and`/`or` of the given expressions. As they are
special forms, these expressions short-circuit, i.e. `and`/`or` terminates the
evaluation on the first value considered false/true. The empty list
evaluates to `true`/`false`. The rules for truthiness are applied.
There is also a `not`, but it is not a special form.
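Two illustrative calls, assuming the comparison operators described below (which return booleans):

> ["and", ["gt?", 5, 3], ["lt?", 1, 2]]
= true
> ["or", false, false]
= false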
Language Primitives
Language primitives are functions that can be used in any context. As they are functions, all parameters are evaluated before being passed to the function.
Basic Algebraic Operators
left-fold with algebraic operators and the first value as initial value
["+", ...]
["-", ...]
["*", ...]
["/", ...]
All operators accept multiple parameters. The commutative operators `+`/`*`
calculate the sum/product of all values passed. The empty list evaluates to
`0`/`1`. The operator `-` subtracts the remaining operands from the first,
while `/` divides the first operand by the remaining ones. Again, empty lists
evaluate to `0`/`1`.
> ["+", 1, 2, 3]
= 6
> ["-", 5, 3, 2]
= 0
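Two more examples following the semantics above (product of all operands; first operand divided by the remaining ones):

> ["*", 2, 3, 4]
= 24
> ["/", 12, 2, 3]
= 2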
Logical operators
convert values to booleans according to their truthiness
["true?", expr]
["false?", expr]
["not", expr]
- `true?` returns true if `expr` is considered true, and false otherwise.
- `false?` returns true if `expr` is considered false, and false otherwise.
- `not` is an alias for `false?`.
> ["true?", 5]
= true
> ["true?", 0]
= true
> ["true?", false]
= false
> ["true?", "Hello world!"]
= true
> ["false?", 5]
= false
Comparison operators
compares one value to other values
["eq?", proto, expr...]
["gt?", proto, expr...]
["ge?", proto, expr...]
["le?", proto, expr...]
["lt?", proto, expr...]
["ne?", proto, expr...]
Compares `proto` to all other expressions according to the selected operator.
Returns true if all comparisons are true. Returns true for the empty list.
Relational operators are only available for numeric values. If `proto` is a
boolean value, the other values are first converted to booleans using `true?`,
i.e. you compare their truthiness.
The operator names translate as follows:
- `eq?` -- `["eq?", left, right]` evaluates to `true` if `left` is equal to `right`
- `gt?` -- `["gt?", left, right]` evaluates to `true` if `left` is greater than `right`
- `ge?` -- `["ge?", left, right]` evaluates to `true` if `left` is greater than or equal to `right`
- `le?` -- `["le?", left, right]` evaluates to `true` if `left` is less than or equal to `right`
- `lt?` -- `["lt?", left, right]` evaluates to `true` if `left` is less than `right`
- `ne?` -- `["ne?", left, right]` evaluates to `true` if `left` is not equal to `right`
Given more than two parameters,
[<op>, proto, expr_1, expr_2, ...]
is equivalent to
["and", [<op>, proto, expr_1], [<op>, proto, expr_2], ...]
except that `proto` is only evaluated once.
> ["eq?", "foo", "foo"]
= true
> ["lt?", 1, 2, 3]
= true
> ["lt?", 1, 3, 0]
= false
> ["ne?", "foo", "bar"]
= true
Lists
sequential container of inhomogeneous values
["list", expr...]
["list-cat", lists...]
["list-append", list, expr...]
["list-ref", list, index]
["list-set", arr, index, value]
["list-empty?", value]
["list-length", list]
`list` constructs a new list from the evaluated `expr`s. `list-cat`
concatenates the given lists. `list-append` returns a new list, consisting of the
old list and the evaluated `expr`s. `list-ref` returns the value at `index`.
Accessing out of bounds is an error. Offsets are zero-based. `list-set`
returns a copy of the old list in which the entry at index `index` is replaced
by `value`. Writing to an index that is out of bounds is an error. `list-empty?`
returns true if and only if the given value is an empty list. `list-length`
returns the length of the list.
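Some examples following the semantics above (offsets are zero-based):

> ["list-cat", ["list", 1, 2], ["list", 3]]
= [1, 2, 3]
> ["list-ref", ["list", "a", "b", "c"], 1]
= "b"
> ["list-set", ["list", 1, 2, 3], 0, 9]
= [9, 2, 3]
> ["list-length", ["list", 1, 2, 3]]
= 3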
Sort
sort a list
["sort", compare, list]
`sort` sorts a list in ascending order using the compare function. `compare`
is called with two parameters `a` and `b`; `a` is considered less than `b` if
the return value of this call is considered true. The sort is **not** stable.
> ["sort", "lt?", ["list", 3, 1, 2]]
= [1, 2, 3]
Dicts
["dict", [key, value]...]
["dict-merge", dict...]
["dict-keys", dict]
["dict-directory", dict]
["attrib-ref", dict, key]
["attrib-ref", dict, path]
["attrib-ref-or", dict, key, default]
["attrib-ref-or", dict, path, default]
["attrib-ref-or-fail", dict, key]
["attrib-ref-or-fail", dict, path]
["attrib-set", dict, key, value]
["attrib-set", dict, path, value]
`dict` creates a new dict from the specified key-value pairs. It is undefined
behavior to specify a key more than once. `dict-merge` merges two or more
dicts, keeping the latest occurrence of each key. `dict-keys` returns a list of
all top-level keys. `dict-directory` returns a list of all available paths in
preorder, intended to be used with nested dicts.

`attrib-ref` returns the value of `key` in `dict`. If `key` is not present,
`null` is returned. `attrib-set` returns a copy of `dict` but with `key` set to
`value`. Both functions have a variant that accepts a path. A path is a list of
strings; the function recurses into the dict using that path. `attrib-set`
returns the whole dict but with the updated sub-dict.

`attrib-ref-or` is similar to `attrib-ref` except that it returns `default` if
the key is not present. `attrib-ref-or-fail` raises an error instead.
> ["attrib-ref", {"foo": "bar"}, "foo"]
= "bar"
> ["dict", ["quote", "foo", "bar"], ["quote", "x", 2]]
= {"foo":"bar", "x": 2}
> ["attrib-ref-or", {"foo": "bar"}, "baz", 5]
= 5
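Two more examples following the semantics above; note that a path is a list of strings, so it has to be quoted:

> ["dict-merge", {"a": 1}, {"a": 2, "b": 3}]
= {"a": 2, "b": 3}
> ["attrib-ref", {"a": {"b": 42}}, ["quote", "a", "b"]]
= 42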
Lambdas
["lambda", captures, parameters, body]
`lambda` creates a new function object. `captures` is a list of variables that
are copied into the lambda at creation time. `parameters` is a list of names
that the arguments are bound to. Both can be accessed by name via
`var-ref`. `body` is evaluated when the lambda is called.
Lambdas can be used wherever a function is expected.
> [["lambda", ["quote", []], ["quote", ["x"]], ["quote", ["+", ["var-ref", "x"], 4]]], 6]
= 10
Reduce
["reduce", value, lambda, initialValue]
The reduce method executes a reducer function (`lambda`, required) on each element of an array (in natural order) or object (in unspecified order). In general it is used to produce a single output value, but it can produce any supported type.
The lambda function accepts three parameters: the current index (the position in the array, or the current key in case of an object), the current value, and the current accumulated value.
Example:
Addition of all array elements, start value set to 100.
["reduce",
["list", 1, 2, 3],
["lambda",
["quote", []],
["quote", ["key", "value", "accum" ]],
["quote",
["+", ["var-ref", "value"], ["var-ref", "accum"] ]
]
],
100
]
Will produce:
=> 106
Explanation:
- Iteration 1:
- Take 100 as the initial accumulator value
- Calculate and return the sum of 100 and 1
- Iteration 2:
- Take result of the first iteration as accumulator value
- Calculate and return the sum of 101 and 2
- Iteration 3:
- Take result of the second iteration as accumulator value
- Calculate and return the sum of 103 and 3
- Return 106 as we’ve reached the end of our array
Advanced example:
Calculate the sum of all available object values
["reduce",
{"a": 1, "b": 2, "c": 3},
["lambda",
["quote", []],
["quote", ["key", "value", "accum" ]],
["seq",
["quote",
[
"attrib-set",
["var-ref", "accum"],
["var-ref", "key"],
["+", ["var-ref", "value"], ["attrib-ref", ["var-ref", "accum"], ["var-ref", "key"]] ]
]
]
]
],
{"a": 1, "b": 2, "c": 3, "d": 4}
]
Will produce:
=> {"a":2, "b":4, "c":6, "d":4}
Utilities
random functions that fit no other category
["string-cat", strings...]
["int-to-str", int]
["min", numbers...]
["max", numbers...]
["avg", numbers...]
["rand"]
["rand-range", min, max]
`string-cat` concatenates the given strings. `int-to-str` converts an integer
to its decimal representation.
`min`/`max`/`avg` compute the minimum/maximum/average of their values.
`rand`/`rand-range` produce a pseudo-random number uniformly distributed in
`[0,1]`/`[min,max]`.
> ["string-cat", "hello", " ", "world"]
= "hello world"
> ["min", 1, 2, 3]
= 1
> ["max", 1, 2, 3]
= 3
> ["avg", 1, 2, 3]
= 2
> ["rand"]
= 0.8401877171547095
> ["rand-range", 5, 7]
= 5.788765853638186
Functional
["id", value]
["apply", func, list]
["map", func, list]
["map", func, dict]
["filter", func, list]
["filter", func, dict]
`id` returns its argument. `apply` invokes `func` using the values from `list`
as arguments. `map` invokes `func` for every value/key-value pair in the
`list`/`dict`. `func` should accept two parameters `(index, value)`/`(key, value)`.
`filter` returns a new `list`/`dict` that contains all entries for which the
return value of `func` invoked with `(index, value)`/`(key, value)` is
considered true.
> ["id", 12]
= 12
> ["apply", "min", ["quote", 1, 2, 3]]
= 1
> ["map", ["lambda", ["list"], ["list", "idx", "value"], ["quote", ["int-to-str", ["var-ref", "value"]]]], ["list", 1, 2, 3, 4]]
= ["1", "2", "3", "4"]
> ["filter", ["lambda",
["list"],
["list", "idx", "value"],
["quote", ["gt?", ["var-ref", "value"], 3]]
], ["list", 1, 2, 3, 4, 5, 6]]
= [4,5,6]
Variables
["var-ref", name]
["bind-ref", name]
`var-ref` evaluates to the current value of the variable with name `name`.
It is an error to reference a variable that is not defined in the current
context. `bind-ref` is an alias of `var-ref`.
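A small example combining `let` and `var-ref`:

> ["let", [["x", 10]], ["var-ref", "x"]]
= 10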
Debug operators
["report", values...]
["error", msg...]
["assert", cond, msg...]
`report` prints, in a context-dependent way, the string representation of its
arguments joined by spaces. Strings represent themselves, numbers are converted
to decimal representation, booleans are represented as `true` or `false`.
Dicts and lists are converted to JSON.
This function is not supported in all contexts yet.

`error` creates an error and aborts execution immediately. Errors are reported
in a context-dependent way. The error message is constructed from the remaining
parameters like in `report`, except that it is not printed but associated with the
error. This is like a panic or an uncaught exception.

`assert` checks whether `cond` is considered true; if it is not, an error with the
remaining parameters as message is raised. It is equivalent to
`["if", [["not", cond], ["error", msg...]]]`.
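For example, combined with `seq` (compare the `seq` example above):

> ["seq", ["report", "value is", 42], "done"]
value is 42
= "done"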
Math Library
The following mathematical functions are available in all contexts. They all
interpret the data as a double
and directly forward their input to the
respective C/C++ library implementation.
`abs`, `acos`, `acosh`, `asin`, `asinh`, `atan`, `atan2`, `atanh`, `cbrt`, `ceil`, `cos`, `cosh`, `exp`, `exp2`, `expm1`, `floor`, `fmod`, `hypot`, `log`, `log10`, `log1p`, `log2`, `pow`, `round`, `sin`, `sinh`, `sqrt`, `tan`, `tanh`, `trunc`
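Like any other AIR function, they are called in prefix notation, for example:

> ["pow", 2, 10]
= 1024
> ["sqrt", 16]
= 4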
Foreign calls in Vertex Computation context
The following functions are only available when running as a vertex computation
(i.e. as an `initProgram`, `updateProgram`, …). "this" usually refers to the
vertex we are attached to.
Vertex Accumulators
["accum-ref", name]
["accum-set!", name, value]
["accum-clear!", name]
- `accum-ref` evaluates to the current value of the accumulator `name`.
- `accum-set!` sets the current value of the accumulator `name` to `value`.
- `accum-clear!` resets the current value of the accumulator `name` to a well-known one. Currently this is the numeric limit for `max` and `min` accumulators, `0` for `sum`, `false` for `or`, `true` for `and`, and empty for `list` and VelocyPack.
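A small sketch of how these calls might appear inside an `updateProgram`, assuming two hypothetical vertex accumulators `rank` (a `store` accumulator) and `tmpSum` (a `sum` accumulator): it reads the aggregated sum, stores a damped value into `rank`, and clears the sum for the next round.

["seq",
  ["accum-set!", "rank", ["+", 0.15, ["*", 0.85, ["accum-ref", "tmpSum"]]]],
  ["accum-clear!", "tmpSum"],
  "vote-active"
]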
Global Accumulators
["global-accum-ref", name]
["send-to-global-accum", name, value]
- `global-accum-ref` evaluates to the current value of the global accumulator `name`.
- `send-to-global-accum` sends `value` to the global accumulator `name`.
Also see the remarks about update visibility.
Message Passing
["send-to-accum", name, to-pregel-id, value]
["send-to-all-neighbors", name, value]
- `send-to-accum` sends the value `value` to the accumulator `name` at the vertex with pregel-id `to-pregel-id`. No edge is required between the sender and the receiver.
- `send-to-all-neighbors` sends the value `value` to the accumulator `name` in all neighbors reachable by an edge, i.e. along outbound edges. Note that if there are multiple edges from the sender to a neighbor, the value is sent multiple times.
This Vertex
["this-doc"]
["this-vertex-id"]
["this-unique-id"]
["this-pregel-id"]
["this-outdegree"]
["this-outbound-edges-count"]
["this-outbound-edges"]
- `this-doc` returns the data associated with the vertex.
- `this-outdegree` returns the number of outgoing edges.
- `this-outbound-edges-count` is an alias for `this-outdegree`.
- `this-outbound-edges` returns a list of outbound edges of the form `{ "document": <edge-document>, "to-pregel-id": <to-vertex-pregel-id> }`.
- `this-vertex-id` returns the vertex document identifier.
- `this-unique-id` returns a unique but opaque numeric value associated with this vertex.
- `this-pregel-id` returns an identifier used by Pregel to send messages.
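A hedged sketch combining `this-outbound-edges` with `for-each` and `send-to-accum`. For a hypothetical accumulator `someAccum` the effect is the same as `send-to-all-neighbors`, but the loop also gives access to each edge document:

["for-each", [["edge", ["this-outbound-edges"]]],
  ["send-to-accum", "someAccum",
    ["attrib-ref", ["var-ref", "edge"], "to-pregel-id"],
    1]
]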
Miscellaneous
["vertex-count"]
returns the total number of vertices in the graph under consideration.["global-superstep"]
the current superstep the algorithm is in.["phase-superstep"]
the current superstep the current phase is in.["current-phase"]
the current phase name.
Foreign calls in Coordinator context
The following functions are only available when running in the Coordinator context to coordinate phases and phase changes and to access and modify global accumulators.
Phase Management
["goto-phase", phase]
["finish"]
`goto-phase` sets the current phase to `phase`. `finish` finishes the Pregel
computation.
Global Accumulators
["global-accum-ref", name]
["global-accum-set!", name, value]
["global-accum-clear!", name]
`global-accum-ref`, `global-accum-set!` and `global-accum-clear!` behave like their
vertex accumulator counterparts, but operate on global accumulators.
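A hedged sketch of an `onPostStep` program that stops the whole computation once a hypothetical global accumulator `activeCount` has dropped to zero:

["if",
  [["eq?", ["global-accum-ref", "activeCount"], 0],
    ["finish"]]
]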
Foreign calls in Custom Accumulator context
The following functions are only available when running inside a custom accumulator.
["parameters"]
returns the object passed as parameter to the accumulator definition["current-value"]
returns the current value of the accumulator["get-current-value"]
returns the current value but calls thegetProgram
to do so.["input-value"]
returns the input value. This is the value received as update inupdateProgram
. Or the value the accumulator is set to insetProgram
.["input-sender"]
returns the vertex-id of the sending vertex. This is only available inupdateProgram
.["input-state"]
return the input state for a merge operation. This is only available inaggregateStateProgram
.["this-set!", value]
set the new value of the accumulator tovalue
.["this-set-value!", value]
set the new value of the accumulator but calls thesetProgram
to do so.
Accumulators
In PPAs there are special types called Accumulators. There are two
types of Accumulators:
- VertexAccumulators: one instance per vertex.
- GlobalAccumulators: a single instance globally.

Accumulators are used to consume and process messages which are sent to
them during the computational steps (`initProgram`, `updateProgram`,
`onPreStep`, `onPostStep`) of a superstep. After a superstep is done, all
messages will be processed.
The manner in which they are processed depends on their `accumulatorType`.
Vertex Accumulators
Vertex Accumulators follow the general definition of an Accumulator, with one exception: a vertex is able to modify its own local accumulators directly during the computational steps, but only its own.
In short: modifications done via messages become visible in the next superstep round; changes done locally are visible directly, but cannot be done from one vertex to another.
Example
Imagine a simple part of a graph like this:
    B ← E
  ↗
A → C
  ↘
    D
The vertex `A` has edges pointing to the vertices `B`, `C` and `D`.
Additionally, the vertex `E` points to the vertex `B`. If we now want to
calculate how many incoming edges `B`, `C` and `D` have, we need to send
a message with the value `1`, which represents one incoming edge, along all
outgoing edges of our vertices. As only `A` and `E` have outgoing edges,
only those two vertices will send messages:
- Phase - Computation (Superstep S)
  - Vertex A: sends 1 to B, 1 to C and 1 to D.
  - Vertex E: sends 1 to B.
  - As we want to sum up all received values, the `sum` accumulator type needs to be used. It automatically computes the value from all received messages.
- Phase - Aggregation
  - Vertex B receives two messages. Result: 2 (1+1).
  - Vertex C receives one message. Result: 1.
  - Vertex D receives one message. Result: 1.
- Phase - onPostStep (Superstep S)
  - Aggregated accumulators are visible now. Additional modifications can be implemented here.
- Phase - onPreStep (Superstep S+1)
  - Aggregated accumulators are visible now. They could have been modified in the previous `onPostStep` routine; those latest changes are visible here as well. Further modifications can be done here.
- Phase - Computation (Superstep S+1)
  - The latest accumulator states are visible. New messages can be sent. They will be visible in the next round.
Vertex Accumulator Definition
Each vertex accumulator requires a name as string
:
{
  "<name>": {
    "accumulatorType": "<accumulator-type>",
    "valueType": "<valueType>",
    "customType": "<custom-accumulator-type>"
  }
}
Vertex Accumulator Parameters
- accumulatorType (string, required): The name of the used accumulator type. Valid values are:
  - `max`: stores the maximum of all messages received.
  - `min`: stores the minimum of all messages received.
  - `sum`: sums up all messages received.
  - `and`: computes `and` on all messages received.
  - `or`: computes `or` on all messages received.
  - `store`: holds the last received value (non-deterministic).
  - `list`: stores all received values in a list (order is non-deterministic).
  - `custom`: see below.
- valueType (string, required): The name of the value type. Valid value types are:
  - `any`: JSON data
  - `int`: Integer type
  - `double`: Double type
  - `bool`: Boolean type
  - `string`: String type
- customType (string, optional): The name of the used custom accumulator type. Has to be set if and only if `accumulatorType == custom`.
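For example, a hypothetical vertex accumulator `distance` that keeps the minimum of all received values (as used in shortest-path style computations) could be declared like this:

{
  "distance": {
    "accumulatorType": "min",
    "valueType": "double"
  }
}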
Global Accumulator
Global Accumulators follow the general definition of an Accumulator.
In contrast to Vertex Accumulators, there is no direct local access to them.
Changes can only take place by sending messages or in pre-step and post-step
programs, and therefore only become visible in the next superstep round
(or in the `onPostStep` routine of the current round).
Custom Accumulator
Because the above list of accumulators is limited and may not suit your case
best, you can create your own custom accumulator. You define a custom
accumulator in the `customAccumulators` field of the algorithm, which is an
object mapping the name of the custom accumulator to its definition.
To use it, set the `accumulatorType` to `custom` and the `valueType` to `any`.
In `customType` put the name of the custom accumulator.
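For example, a vertex accumulator `total` referring to a hypothetical custom accumulator named `mySum` (whose definition would live under `customAccumulators`; see the sum accumulator example below) would be declared as:

{
  "vertexAccumulators": {
    "total": {
      "accumulatorType": "custom",
      "valueType": "any",
      "customType": "mySum"
    }
  }
}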
The definition of a custom vertex accumulator contains the following fields:
- `updateProgram`: this code is executed whenever the accumulator receives a message. The `input-value` and `input-sender` functions are available here. This program should either return `"hot"` when the accumulator changed, i.e. its vertex will be activated in the next step, or `"cold"` if not.
- `clearProgram`: this code is executed whenever the accumulator is cleared, for example when `accum-clear!` is called.
- `setProgram`: this code is executed whenever the accumulator is set to a specific value, for example when `accum-set!` is called. The `input-value` function is available to receive the new value. By default this program replaces the internal state of the accumulator with the given value.
- `getProgram`: this code is executed whenever the accumulator is read from. Its return value is the actual value that will be returned by, for example, `accum-ref`. By default this program returns the internal state of the accumulator.
- `finalizeProgram`: this code is executed when the value of the accumulator is written back into the vertex document. It defaults to `getProgram`.
Each custom accumulator has an internal buffer. You can access this buffer using
the `current-value` function. To set a new value use `this-set!`. Note that
`this-set!` will not invoke the `setProgram` but instead copies the provided value
into the internal buffer.
A simple sum accumulator could look like this:
{
  "updateProgram": ["if",
    [["eq?", ["input-value"], 0],
      "cold"],
    [true,
      ["seq",
        ["this-set!", ["+", ["current-value"], ["input-value"]]],
        "hot"]]],
  "clearProgram": ["this-set!", 0],
  "getProgram": ["current-value"],
  "setProgram": ["this-set!", ["input-value"]],
  "finalizeProgram": ["current-value"]
}
Global Custom Accumulators
You can upgrade a custom vertex accumulator to a global accumulator as follows.
Before a new superstep begins the global accumulators are distributed to the
DB-Servers by the Coordinator. During the superstep, vertex programs can read from
those accumulators and send messages to them. Those messages are then
accumulated per DB-Server in a cleared version of the accumulator,
i.e. sending a message does call updateProgram
but the write accumulator
is cleared when the superstep begins.
After the superstep the accumulated values are collected by the Coordinator and
then aggregated. Finally the new value of the global accumulator is available
in the onPostStep
program.
There are more fields, some of them required, involved when using an accumulator as a global accumulator.
- `setStateProgram`: this code is executed when the DB-Server receives a new value for the global accumulator. `input-state` is available in this context. The default implementation replaces the internal state of the accumulator with `input-state`.
- `getStateProgram`: this code is executed when the Coordinator serializes the value of the global accumulator before distributing it to the DB-Servers. The default implementation just copies the internal state.
- `getStateUpdateProgram`: this code is executed when the DB-Server serializes the value of the accumulator during the collect phase, sending its result back to the Coordinator. The default implementation is to call `getStateProgram`.
- `aggregateStateProgram`: this code is executed on the Coordinator after it received the updated states. This code merges the different aggregates.
Coming back to our sum accumulator we would expand it like so:
{
  updateProgram: ["if",
    [["eq?", ["input-value"], 0],
      "cold"],
    [true,
      ["seq",
        ["this-set!", ["+", ["current-value"], ["input-value"]]],
        "hot"]]],
  clearProgram: ["this-set!", 0],
  getProgram: ["current-value"],
  setProgram: ["this-set!", ["input-value"]],
  finalizeProgram: ["current-value"],
  aggregateStateProgram: ["seq",
    ["this-set!", ["+", ["current-value"], ["input-state"]]],
    "hot"
  ]
}
Execute a PPA
Apart from the precondition of having your custom algorithm defined, the execution of a PPA follows the basic Pregel implementation. To start a PPA, you need to require the Pregel module in arangosh.
const pregel = require("@arangodb/pregel");
return pregel.start("air", "<graphName>", "<custom-algorithm>");
Status of a PPA
Executing a PPA using the `pregel.start()` method returns a unique ID that
identifies the algorithm execution and can be used to query its status.
let pregelID = pregel.start("air", graphName, "<custom-algorithm>");
var status = pregel.status(pregelID);
The result tells you the current status of the algorithm execution: the current state of the execution, the current global superstep, the runtime, the global aggregator values, as well as the number of sent and received messages. Also see Status of an algorithm execution.
Additionally, the status object for custom algorithms is extended and contains more information than the general Pregel one. More details in the next section.
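A minimal arangosh sketch that polls the status until the execution leaves the "running" state (the exact field names and values of the status object are an assumption here):

let pregelID = pregel.start("air", graphName, customAlgorithm);
// poll until the run is no longer in the "running" state
while (pregel.status(pregelID).state === "running") {
  require("internal").sleep(0.5);   // wait half a second between polls
}
print(pregel.status(pregelID));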
Error reporting
Before the execution of a PPA starts, it is validated and checked for
potential errors. This helps a lot during development. If a PPA fails, the
status will be "fatal error". In that case there will be an additional field
called `reports`, where all debugging messages and errors are listed.
You also get detailed information about when, where and why the error occurred.
Example:
{
  "reports": [{
    "msg": "in phase `init` init-program failed: pregel program returned \"vote-halts\", expecting one of `true`, `false`, `\"vote-halt\", or `\"vote-active\"`\n",
    "level": "error",
    "annotations": {
      "vertex": "LineGraph10_V/0:4020479",
      "pregel-id": {
        "key": "0:4020479",
        "shard": 1
      },
      "phase-step": 0,
      "phase": "init",
      "global-superstep": 0
    }
  }]
}
We have also added a few debugging primitives to help you increase your
development speed. For example, there is the possibility to add "prints"
to your program. Furthermore, have a look at the documentation of the `debug`
field of the algorithm. See Debug operators.
Developing a PPA
There are two ways of developing your PPA: you can either run and develop in the ArangoShell (as shown above), or you can use the Foxx service "Pregelator". The Pregelator can be installed separately and provides a nice UI to write a PPA, execute it and get direct feedback in both the "success" and "error" cases.
Pregelator
The Pregelator Service is available on GitHub: https://github.com/arangodb-foxx/pregelator
The bundled ZIP files are kept in the directory `zippedBuilds` and can be
installed via `foxx-cli`, the standard web UI or via `arangosh`.
Examples
As there are almost no limits regarding the definition of a PPA, we provide a basic example of the "vertex degree" algorithm here and demonstrate what the implementation looks like.
Note: We have also implemented more complex algorithms in PPA to demonstrate advanced usage. As those are complex algorithms, they are not included as examples in the documentation. But for the curious ones, they can be found here:
Vertex Degree
The algorithm calculates the vertex degree for incoming and outgoing edges. First, take a look at the complete vertex degree implementation. Afterwards we will split things up and go into more detail for each individual section.
{
  "maxGSS": 2,
  "vertexAccumulators": {
    "outDegree": {
      "accumulatorType": "store",
      "valueType": "ints"
    },
    "inDegree": {
      "accumulatorType": "sum",
      "valueType": "ints"
    }
  },
  "phases": [{
    "name": "main",
    "initProgram": ["seq",
      ["accum-set!", "outDegree", ["this-outbound-edges-count"]],
      ["accum-set!", "inDegree", 0],
      ["send-to-all-neighbors", "inDegree", 1]
    ],
    "updateProgram": ["seq",
      "vote-halt"]
  }],
  "dataAccess": {
    "writeVertex": [
      "attrib-set", ["attrib-set", ["dict"], "inDegree", ["accum-ref", "inDegree"]],
      "outDegree", ["accum-ref", "outDegree"]
    ]
  }
}
Used Accumulators
In the example, we are using two vertex accumulators: `outDegree` and `inDegree`,
as we want to calculate and store two values.
outDegree
"outDegree": {
"accumulatorType": "store",
"valueType": "ints"
}
A vertex knows by definition exactly how many outgoing edges it has. Therefore
we only have to set that amount in an accumulator once, and not multiple times.
With that knowledge it makes sense to set the `accumulatorType` to `store`, as
no further calculations need to take place. As the amount of
outgoing edges is integral, we set the `valueType` to `ints`.
inDegree
What a vertex does not know is how many incoming edges it has. Therefore we need to communicate that value to it.
"inDegree": {
  "accumulatorType": "sum",
  "valueType": "ints"
}
The choice of `valueType` is the same as for `outDegree`, for the same reason.
But as you can see, the `accumulatorType` is now set to `sum`. As our vertices
do not know how many incoming edges they have, each vertex needs to send a
message along all of its outgoing edges. In our program, a message is sent to
every neighbor. That means a vertex with n neighbors will send n messages.
As in our case a single message represents a single incoming edge, we need to
add 1 to our accumulator per message, and therefore set the `accumulatorType`
to `sum`.
Program
initProgram
initProgram: [
  "seq",
  // Set our outDegree accumulator to ["this-outbound-edges-count"]
  ["accum-set!", "outDegree", ["this-outbound-edges-count"]],
  // Initialize our inDegree (sum) accumulator to 0
  ["accum-set!", "inDegree", 0],
  // Send the value 1 to all neighbors, so their inDegree can be raised in the next round
  ["send-to-all-neighbors", "inDegree", 1]
]
updateProgram
updateProgram: ["seq",
"vote-halt"]
}]
In our case, we do not actually need an update program; we just insert a dummy
(void) program (this will be improved in a future state). Currently it is necessary,
because the update program needs to run once so that the inDegree values
sent out in our `initProgram` are accumulated. Therefore `maxGSS` is set to `2`.
Storing the result
To be able to store the result, we either need to define a `resultField`
(which will store all accumulators in the given resultField attribute)
or create a `<program>` which takes care of the store procedure.
The next code snippet demonstrates how such a store program could look:
"dataAccess": {
"writeVertex": [
"dict",
["list", "inDegree", ["accum-ref", "inDegree"]],
["list", "outDegree", ["accum-ref", "outDegree"]]
]
}
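To run the example, the complete specification from above can be passed to `pregel.start()` in arangosh (the graph name is a placeholder):

const pregel = require("@arangodb/pregel");
const vertexDegree = { /* the vertex degree specification shown above */ };
const pregelID = pregel.start("air", "<graphName>", vertexDegree);
pregel.status(pregelID);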