Dorylus: Affordable, Scalable, and Accurate GNN Training with Distributed CPU Servers and Serverless
https://arxiv.org/pdf/2105.11118.pdf
Paper
Abstract
Two major GNN training obstacles
it relies on high-end servers with many GPUs, which are expensive to purchase and maintain
limited memory on GPUs cannot scale to today's billion-edge graphs
Dorylus: distributed system for training GNNs which takes advantage of serverless computing to increase scalability at a low cost
Key: computation separation
Construct a deep, bounded-asynchronous pipeline where graph and tensor parallel tasks can fully overlap
Effectively hiding the network latency incurred by Lambdas
Introduction
Models in the GNN family: GCN, GRN, GAT
Wide range of applications
GPUs
Benefits: highly-parallel computations
Drawbacks:
expensive
Expensive even in the public cloud (the paper compares instance pricing)
limited memory --> hinders scalability
Existing approaches to solve the two drawbacks
CPU
Looser memory restrictions, and operate at lower costs
But: far inferior efficiency (speed)
Graph sampling techniques
Improve scalability by considering less graph data
Limitations
must be done repeatedly per epoch, incurring time overheads
reduce accuracy of trained GNNs
no guarantee on convergence
Affordable, Scalable, and Accurate GNN Training
High efficiency: close to GPUs
High accuracy: e.g., higher than sampling-based approaches
Low cost for training billion-edge graphs
Turn to: Serverless computing paradigm
Large number of parallel threads at an extremely low price
Challenges
Limited compute resources (e.g. 2 weak vCPUs)
Restricted network resources (e.g., 200 Mbps between Lambda servers and standard EC2 servers)
Challenges
how to make computation fit into Lambda's weak compute profile?
Not all operations in GNN training need Lambda's parallelism
Divide training pipeline into a set of fine-grained tasks, based on the type of data they process
graph-parallel path: CPU instances
tensor-parallel path: Lambdas
Lightweight linear algebra operations on small data chunks
cheaper than regular CPU instances
how to minimize the negative impact of Lambda's network latency?
Lambdas can spend one-third of their time on communication
To keep communication from becoming the bottleneck: bounded pipeline asynchronous computation (BPAC)
Make full use of pipelining where different fine-grained tasks overlap with each other
Graph-parallel & tensor-parallel tasks run simultaneously
Also, to reduce the wait time between tasks, Dorylus incorporates asynchrony into the pipeline
Challenge: with two computation paths, where exactly should asynchrony be introduced?
Use it where staleness can be tolerated
parameter updates (tensor-parallel path)
data gathering from neighbor vertices (graph-parallel path)
To prevent asynchrony from slowing down convergence, Dorylus bounds the degree of asynchrony at each location using
weight stashing at parameter updates
bounded staleness at data gathering
Background
GNN
Input: graph-structured data, each vertex is associated with a feature vector
Output: a feature vector for each individual vertex or the whole graph
Usage: output feature vectors can be used for graph or vertex classification
Goal: learn the patterns and relationships among the data, rather than relying solely on the features of a single data point
Combine
Graph propagation (Gather and Scatter)
NN computations
Graph-parallel interface
E.g., Deep Graph Library (DGL), which unifies a variety of GNN models with a common GAS-like interface
Example: GCN
Forward Pass
Layer rule (R1): H_(L+1) = sigma(D_bar^(-1/2) A_bar D_bar^(-1/2) H_L W_L)
A: adjacency matrix of the input graph; A_bar = A + I_N: adjacency matrix with self-loops, constructed by adding the identity matrix to A
D_bar: diagonal degree matrix with D_bar_ii = sum_j A_bar_ij
D_bar^(-1/2) A_bar D_bar^(-1/2): normalized adjacency matrix
W_L: layer-specific trainable weight matrix
sigma: non-linear activation function, such as ReLU
H_L: activations matrix of the L-th layer
H_0 = X is the input feature matrix for all vertices
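A minimal sketch of rule R1 for a single GCN layer using scipy sparse matrices; the variable names and toy graph are illustrative, not from Dorylus's implementation:

```python
import numpy as np
import scipy.sparse as sp

def gcn_layer(A, H, W):
    """One GCN forward layer: H_next = ReLU(D_bar^(-1/2) A_bar D_bar^(-1/2) H W)."""
    n = A.shape[0]
    A_bar = A + sp.identity(n, format="csr")            # add self-loops
    d = np.asarray(A_bar.sum(axis=1)).ravel()           # degrees of A_bar
    D_inv_sqrt = sp.diags(1.0 / np.sqrt(d))             # D_bar^(-1/2)
    A_norm = D_inv_sqrt @ A_bar @ D_inv_sqrt            # normalized adjacency matrix
    return np.maximum(np.asarray(A_norm @ H) @ W, 0.0)  # sparse matmul (GA), then dense matmul + ReLU (AV)

# Toy example: a 3-vertex path graph, 4-dim input features, 2-dim output
A = sp.csr_matrix(np.array([[0, 1, 0],
                            [1, 0, 1],
                            [0, 1, 0]], dtype=float))
H0 = np.random.rand(3, 4)   # H_0 = X, the input feature matrix
W0 = np.random.rand(4, 2)   # trainable weights of layer 0
H1 = gcn_layer(A, H0, W0)
```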
Mapping R1 to vertex-centric computation model:
Gather (GA)
Apply Vertex (AV)
Scatter (SC)
Apply Edge (AE)
One can see layer L's activations matrix H_L as a group of activation vectors, each associated with a vertex
Goal: compute a new activations vector for each vertex based on the vertex's previous activations vector (which, initially is its feature vector) and the information received from its in-neighbors
Different (!) from traditional graph processing
The computation of new activations matrix H_(L+1) is based on computationally intensive NN operations rather than a numerical function
Step
GA stage retrieves an (activation) vector from each in-edge of a vertex and aggregates these vectors into a new vector v
Applying GA on all vertices is just a matrix multiplication between the normalized adjacency matrix and the input activations matrix
The result from the first stage is fed to AV, which performs NN operations to obtain a new activations matrix H_(L+1)
For GCN, AV multiplies the result from stage one with a trainable weight matrix W_L and applies the non-linear activation function
The output of AV goes to SC, which propagates the new activations vector of each vertex along all out-edges of the vertex
The same or different?
AE: lastly, the new activations vector scattered onto each edge goes into an edge-level NN architecture to compute an activations vector for the edge
GCN: edge-level NN is not needed
Repeating the process k times (k layers) allows each vertex to consider features of vertices k hops away
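A toy vertex-centric version of the same layer, assuming the graph is given as a dict of in-neighbor lists; GCN's normalization is omitted for brevity, and SC/AE are not shown since GCN needs no edge-level NN:

```python
import numpy as np

def gather(in_neighbors, H):
    """GA: for each vertex, aggregate the activation vectors received from its
    in-neighbors (plus its own vector, for the self-loop)."""
    Z = np.zeros_like(H)
    for v, nbrs in in_neighbors.items():
        Z[v] = H[v] + sum(H[u] for u in nbrs)
    return Z

def apply_vertex(Z, W):
    """AV: per-vertex NN computation, i.e. multiply by the trainable weights and apply ReLU."""
    return np.maximum(Z @ W, 0.0)

# SC would then make each vertex's new vector available along its out-edges
# as input to the next layer's GA.
```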
Backward Pass
computes the gradients for all trainable weights in the vertex- and edge-level NN architectures (i.e., AV and AE).
Training epochs
Forward pass
Backward pass
Weights update
Uses the gradients computed in the backward pass to update the trainable weights in the vertex- and edge-level NN architectures of the GNN
Run repeatedly until reaching acceptable accuracy
Design Overview
Three components:
EC2 graph servers (GS)
Input graph is partitioned using an edge-cut algorithm that takes care of load balancing across partitions
Each partition is hosted by a graph server
Vertex data: represented as a 2D array
Edge data: represented as a single array
Edges are stored in the compressed sparse row (CSR) format
Ghost buffer
Stores the data that are scattered in from remote servers
Communicate
With each other: to execute graph computations by sending / receiving data along cross-partitions edges
Only during Scatter, in both passes
Forward pass: activation values are propagated along cross-partition edges
Backward pass: gradients are propagated along the same edges in the reverse direction
With lambda threads: execute tensor computations
Lambda threads for tensor computation
Tensor operations such as AV and AE interleave with graph operations
Employs a high-performance linear algebra kernel for tensor computation
Used in both forward and backward pass
Communicate frequently with PSes
Forward: retrieve weights from PSes to compute the layer output
Backward: compute weight gradients and send them to PSes for the weight updates
EC2 parameter servers (PS): maintain the trainable weights and perform weight updates using the gradients sent by Lambdas
Graph computation (on GSes) is done in a conventional way, breaking a vertex program into stages of
vertex parallel (gather)
edge parallel (scatter)
Tasks and pipelining
Fine-grained Tasks
Challenge: decompose the process into a set of fine-grained tasks that can
Overlap with each other
Be processed by Lambda's weak compute
Decomposition is done based on the data type and computation type
Graph operations performed on GSes: computations that involve adjacency matrix of the input graph (i.e. A)
Lambda operations: computations that involve only tensor data
Specific tasks over each training epoch
Forward pass
computes the output using current weights
graph-parallel tasks on GSes
Gather (GA)
Scatter (SC)
tensor-parallel tasks on Lambdas: multiply matrices involving only features and weights and apply activation functions
Apply Vertex (AV)
Lambda threads retrieve vertex data (H_(L)) from GSes and weight data (W_(L)) from PSes, compute their products, apply ReLU, and send the result back to GSes as the input for Scatter (see the sketch after this list)
Once AV returns, SC sends data across cross-partition edges to the machines that host their destination vertices
Apply Edge (AE)
Each Lambda thread retrieves
Vertex data from the source and destination vertices of the edge (i.e. activation vectors)
Edge data (such as edge weights) from GSes
Computes per-edge update by performing model-specific tensor operations
Updates are streamed back to GSes and become the inputs of the next layer's GA task
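A sketch of what a single AV task might look like inside a Lambda; the fetch_*/send_* callables are hypothetical stand-ins for the ZeroMQ communication with GSes and PSes:

```python
import numpy as np

def apply_vertex_task(fetch_vertex_chunk, fetch_weights, send_to_gs, layer):
    """AV on a Lambda: fetch one vertex interval's gathered activations from a GS and
    the layer weights from a PS, compute ReLU(H @ W), and stream the result back."""
    H = fetch_vertex_chunk(layer)     # gathered activations H_L for one vertex interval (from a GS)
    W = fetch_weights(layer)          # current weights W_L (from a PS)
    Z = np.maximum(H @ W, 0.0)        # dense matmul + ReLU
    send_to_gs(layer, Z)              # becomes the input of the Scatter (SC) task on the GS
    return Z
```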
Backward pass
uses a loss function to compute weight updates
Involves: GSes, Lambdas, and PSes that coordinate to run a graph-augmented SGD algorithm
Each task in the forward pass has a corresponding backward-pass task
Either propagates information in the reverse direction of edges on the graph
Or computes the gradients of its trainable weights with respect to a given loss function
Weight updates (WU): aggregate gradients across PSes (sketched below)
Del(GA), Del(SC): propagate information in the reverse direction of the edges
AE & AV: apply weights to compute the outputs of the edge- and vertex-level NNs
Conversely, Del(AE) and Del(AV) compute the weight gradients for these NNs, which are the inputs to WU
These are tensor-only computations and are executed by Lambdas
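A minimal sketch of a WU step on a parameter server, using a plain SGD update for illustration (the paper's actual optimizer and aggregation details are not assumed here):

```python
import numpy as np

def weight_update(W, interval_gradients, lr=0.01):
    """WU: aggregate the gradients produced by Del(AV)/Del(AE) for several vertex
    intervals and apply one gradient-descent step to a layer's weight matrix."""
    grad = np.mean(interval_gradients, axis=0)   # aggregate across intervals
    return W - lr * grad
```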
Pipelining
GSes kick off training by running parallel graph tasks
Dorylus divides vertices in each partition into intervals (i.e. minibatches)
For each interval, the amount of tensor computations depends on both number of vertices (i.e. AV) and number of edges (i.e. AE) in the interval
Amount of graph computation (on a GS) depends primarily on the number of edges (i.e. GA and SC)
Balance work across intervals: the division uses a simple algorithm to ensure that different intervals have the same number of vertices and that vertices in each interval have similar numbers of inter-interval edges (a simplified version is sketched below)
Edges incur cross-minibatch dependencies (which the async pipeline needs to handle)
Each interval is processed by a task
When the pipeline is saturated, different tasks will be executed on distinct intervals of vertices
Each GS maintains a task queue and enqueues a task once it's ready to execute (i.e. input available)
Thread pool: # of threads = # of vCPUs
Output of GS task is fed to a Lambda for tensor computation
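A simplified sketch of how a GS might split its partition into vertex intervals and enqueue the initial GA tasks; interval_size and the task tuple format are illustrative, and the edge-balancing step is omitted:

```python
from queue import Queue

def make_intervals(num_vertices, interval_size):
    """Split a partition's vertices into equally sized intervals (minibatches)."""
    return [range(start, min(start + interval_size, num_vertices))
            for start in range(0, num_vertices, interval_size)]

task_queue = Queue()                                # one task queue per GS
for interval in make_intervals(num_vertices=10_000, interval_size=1_000):
    task_queue.put(("GA", 0, interval))             # layer-0 Gather tasks kick off the pipeline
# Worker threads (one per vCPU) pop tasks as their inputs become available;
# finished GA outputs are handed to Lambdas for AV.
```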
Figure 4
Dorylus enqueues a set of GA tasks, each processing a vertex interval
Since the # of threads on each GS << # of tasks, some tasks finish earlier than others, and their results are pushed immediately to Lambda threads for AV
Once they are done, outputs are sent back to GS for Scatter
Backward pass
Del(AE) and Del(AV) compute gradients and send them to PSes for weight updates
Overall, through pipelining, Dorylus overlaps the graph-parallel and tensor-parallel computations so as to hide Lambda's communication latency
Contribution: enable pipelining in GNN training
Fine-grained tasks
Insight of computation separation
Bounded Asynchrony
Goal: workers do not need to wait for updates to proceed in most cases
Lambdas run in an extremely dynamic environment and stragglers almost always exist
Challenge: asynchrony slows down convergence (fast-progressing minibatches may use out-of-date weights, prolonging the training time)
Existing solutions: bounded staleness (lightweight sync)
But the challenge unique to Dorylus is that there are two sync points
weight sync at each WU task
sync of (vertex) activations data from neighbors at each GA
Bounded Asynchrony at Weight Updates
Full asynchrony:
different vertex intervals are at their own training pace; some intervals may use a particular version v0 of weights during a forward pass to compute gradients while applying these gradients on another version v1 of weights on their way back in the backward pass
Statistical inefficiency
Method: weight stashing (proposed in PipeDream)
allows any interval to use the latest version of weights available in a forward pass and stashes (i.e., caches) this version for use during the corresponding backward pass.
Challenge:
Occurs at PSes (host weight matrices and perform updates)
Need to balance loads and bandwidth usage
Solution:
Multiple PSes
Directs a Lambda to a PS that has the lightest load
Each Lambda can be at a different stage of an epoch (e.g., forward or backward pass, different layers)
Each PS hosts a replica of the weight matrices of all layers --> any Lambda can use any PS at any stage
GNNs have few layers, so replicating weights is feasible (not much memory overhead)
But if we allow any Lambda to use any PS, each PS would have to maintain not only the latest weight matrices but also their stashed versions for all intervals, which is practically impossible
Solution:
each PS has a replica of all the latest weights (PSes periodically broadcast their latest weights), but weight stashes for only a subset of vertex intervals
for each interval in a given epoch, the interval's weight stashes are only maintained on the first PS it interacts with in the epoch
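A toy sketch of the weight-stashing scheme described above, for a single PS replica; the class and method names are illustrative:

```python
class ParameterServer:
    """Keeps the latest weights per layer plus per-interval stashes (PipeDream-style)."""

    def __init__(self, weights):
        self.latest = weights   # layer -> latest weight matrix
        self.stash = {}         # (interval, layer) -> weights used in the forward pass

    def get_forward_weights(self, interval, layer):
        W = self.latest[layer]
        self.stash[(interval, layer)] = W   # stash this version for the matching backward pass
        return W

    def get_backward_weights(self, interval, layer):
        return self.stash.pop((interval, layer))
```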
Bounded Asynchrony at Gather
Async gather allows vertex intervals to progress independently using stale vertex values (i.e activation vectors) from their neighbors without waiting for their updates
Number of layers in a GNN is determined statically and n-layer GNN aims to propagate the impact of a vertex's n-hop neighborhood to the vertex.
Question: can asynchrony change the semantics of the GNN?
1) Can vertices eventually receive the effect of their n-hop neighborhood? Yes
2) Is it possible for any vertex to receive the effect of its m-hop neighbor where m > n after many epochs? No
Use bounded staleness at Gather (a fast-moving vertex interval is allowed to be at most S epochs away from the slowest-moving interval)
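A minimal check implementing the staleness bound S described above (names are illustrative):

```python
def may_proceed(interval_epoch, slowest_epoch, S):
    """Bounded staleness at Gather: a fast-moving interval may run at most S epochs
    ahead of the slowest-moving interval; otherwise it must wait for updates."""
    return interval_epoch - slowest_epoch <= S
```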
Lambda Management
Each GS runs a Lambda controller, which launches Lambdas, batches data to be sent to each Lambda, monitors each Lambda's health, and routes its result back to the GS
Lambdas for a task t are launched by the controller at the time t's previous task starts executing
E.g., n Lambda threads are launched (to prepare for AV) when n GA tasks start to run
Setting
Each Lambda runs with the OpenBLAS library
Communicates with GSes and PSes using ZeroMQ
Deployed inside virtual private cloud (VPC)
AWS reuses a container that already has the code deployed instead of cold-starting a new container
The controller times each execution and relaunches a Lambda after a timeout
Lambda Optimizations
Challenge: limited network bandwidth
Per-lambda bandwidth goes down as the number of lambdas increases (suspect: Lambdas created by the same user get scheduled on the same machine and share a network link)
Three optimizations
Task fusion
Since AV of the last layer in a forward pass is connected directly to del(AV) of the last layer in the next backward pass, merge them into a single Lambda-based task
Reduce invocations of thousands of Lambdas for each epoch
Save a round-trip communication between Lambdas and GSes
Tensor rematerialization
Conventional approach: cache intermediate results during the forward pass to be reused in the backward pass
But: because network communication is a bottleneck, more profitable to rematerialize the intermediate tensors by launching more Lambdas rather than retrieving them from GSes
Lambda-internal streaming
For a data chunk: a Lambda retrieves the first half of the data and starts computing on it while simultaneously retrieving the second half
Effect: reduces Lambda response time
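A sketch of Lambda-internal streaming using a background thread; fetch_rows and compute are hypothetical callables standing in for the chunk transfer and the tensor computation:

```python
import threading

def streamed_task(fetch_rows, compute, n_rows):
    """Retrieve the first half of the chunk, then compute on it while the
    second half is fetched concurrently."""
    mid = n_rows // 2
    first = fetch_rows(0, mid)
    result = {}
    t = threading.Thread(target=lambda: result.update(second=fetch_rows(mid, n_rows)))
    t.start()                       # fetch the second half in the background
    out_first = compute(first)      # overlap computation with the remaining transfer
    t.join()
    return out_first, compute(result["second"])
```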
Autotuning Numbers of Lambdas
Performance: effectiveness of Lambdas depends on whether the pipeline can be saturated
Too few Lambdas would not generate enough task instances for the graph computations to saturate CPU cores
Cost side: we want to avoid over-saturating the pipeline which can generate too many CPU tasks for the GS to handle
Opt number of Lambdas is related to
Pace of the graph computation
Which depends on the graph structure (i.e. density) and partitioning that are hard to predict before execution
Initially launches min(#intervals, 100) Lambdas
#intervals = the number of vertex intervals on each GS
Auto-tuner auto-adjusts the number by periodically checking the size of the CPU's task queue
If the size of the queue constantly grows: CPU cores have too many tasks to process, scale down
If queue quickly shrinks, scale up the number of Lambdas
Goal: stabilize the size of the queue s.t. the number of Lambdas matches the pace of graph tasks
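A sketch of the auto-tuning rule described above; the scaling step size and sampling scheme are illustrative:

```python
def autotune_lambdas(num_lambdas, queue_sizes, max_lambdas):
    """Scale the number of Lambdas down if the GS task queue keeps growing (CPUs overloaded),
    up if it keeps shrinking (pipeline undersaturated). queue_sizes holds recent periodic
    samples of the task-queue length, oldest first."""
    if len(queue_sizes) < 2:
        return num_lambdas
    if all(b > a for a, b in zip(queue_sizes, queue_sizes[1:])):
        return max(1, num_lambdas - 1)             # queue constantly grows: fewer Lambdas
    if all(b < a for a, b in zip(queue_sizes, queue_sizes[1:])):
        return min(max_lambdas, num_lambdas + 1)   # queue quickly shrinks: more Lambdas
    return num_lambdas                             # roughly stable: keep the current count
```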
Evaluation
Instance Selection
c5: compute-optimized instances
c5n: compute and network optimized instances
Lambda: a container with 0.11 vCPUs and 192MB memory
$0.2 per 1M requests
Compute cost: $0.01125/h (billed per 100 ms)
Able to handle short bursts of massive parallelism much better than CPU instances
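A quick back-of-the-envelope check on the compute price above, assuming the standard AWS Lambda duration rate of $0.0000166667 per GB-second:

```python
rate_per_gb_second = 0.0000166667      # assumed standard AWS Lambda duration price
memory_gb = 192 / 1024                 # 192 MB container = 0.1875 GB
cost_per_hour = rate_per_gb_second * memory_gb * 3600
print(f"${cost_per_hour:.5f}/h")       # ~ $0.01125/h, matching the figure above
```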
Asynchrony
Asynchrony provides overall performance benefits in general, but too large a staleness value leads to slow convergence and poor overall performance, even though the per-epoch time decreases
Choose S = 0 as the default for the Lambda variant in the following experiments: a vertex may use a stale value from a neighbor as long as the neighbor is in the same epoch (e.g., it can be at a previous layer)
Effects of Lambdas
Compare it with CPU-only servers for computations and GPU-only servers (both without Lambdas)
Use computation separation for scalability
Value metric: the reciprocal of the total training time (i.e., the rate of completion) divided by the cost (see the sketch after this list)
Dorylus with Lambdas provides a 2.75x higher value than CPU-only
But for small, dense graphs, the GPU-only variant is better
This suggests that GPUs may be better suited to processing small, dense graphs rather than large graphs
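A minimal helper capturing the value metric as defined above (the function name is illustrative):

```python
def value(total_time_seconds, total_cost_dollars):
    """Value = performance per dollar, where performance is the reciprocal of total training time."""
    return (1.0 / total_time_seconds) / total_cost_dollars

# Example: finishing in half the time at the same cost doubles the value.
```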
Scaling out
Dorylus can gain even more value by scaling out to more servers, due to the burst parallelism provided by Lambdas and deep pipelining.
Scales well in terms of both performance and values
Dorylus can roughly provide the same value as the CPU-only variant with only half of the number of servers.
Other observations
The sparser the graph, the more useful Dorylus is
E.g. for Amazon and Friendster, Dorylus even outperforms the GPU-only version for two reasons
The fraction of time on scatter is significantly larger in those two examples
Why? The time depends on a combination of the number of ghost vertices and inter-partition edges. They have many inter-partition edges, but very few ghost vertices
Scatter takes much longer time in GPU clusters
Moving ghost data between GPU memories on different nodes is much slower than data transferring between CPU memories
Lambda threads are more effective in boosting performance for GAT than GCN
GAT includes an additional AE task, which performs per-edge tensor computation and thus benefits significantly from a high degree of parallelism
Dorylus achieves comparable performance with CPU-only variant that uses twice as many servers
Comparison with existing systems
Breakdown of Performance and Costs