# Dorylus: Affordable, Scalable, and Accurate GNN Training with Distributed CPU Servers and Serverless Threads

https://arxiv.org/pdf/2105.11118.pdf

## Paper

### Abstract

Two major obstacles to GNN training:

it relies on high-end servers with many GPUs, which are expensive to purchase and maintain

limited memory on GPUs cannot scale to today's billion-edge graphs

Dorylus: distributed system for training GNNs which takes advantage of serverless computing to increase scalability at a low cost

Key: computation separation

Construct a *deep, bounded-asynchronous* pipeline where graph and tensor parallel tasks can fully overlap, effectively hiding the network latency incurred by Lambdas

### Introduction

Models in the GNN family: GCN, GRN, GAT

Wide range of applications

GPUs

Benefits: highly-parallel computations

Drawbacks:

expensive

Even in public cloud (show the pricing)

limited memory --> hinder scalability

Existing approaches to solve the two drawbacks

CPU

Looser memory restrictions, and operate at lower **costs**

But: far inferior efficiency (speed)

Graph sampling techniques

Improve **scalability** by considering less graph data

Limitations

must be done repeatedly per epoch, incurring time overheads

reduce **accuracy** of trained GNNs: no guarantee on convergence

**Affordable, Scalable, and Accurate GNN Training**

High efficiency: close to GPUs

High accuracy: e.g., higher than sampling

Low cost for training billion-edge graphs

Turn to: Serverless computing paradigm

Large number of parallel threads at an extremely low price

Challenges

Limited compute resources (e.g. 2 weak vCPUs)

Restricted network resources (e.g., 200 Mbps between Lambda servers and standard EC2 servers)

Challenges

*How to make computation fit into Lambda's weak compute profile?*

Not all operations in GNN training need Lambda's parallelism

Divide the training pipeline into a set of **fine-grained tasks,** based on the type of data they process

graph-parallel path: CPU instances

tensor-parallel path: Lambdas

Lightweight linear algebra operation on a data chunk of a small size

cheaper than regular CPU instances

*How to minimize the negative impact of Lambda's network latency?*

Lambdas can spend one-third of their time on communication

To keep communication from becoming the bottleneck:

**bounded pipeline asynchronous computation (BPAC)**

Makes full use of pipelining, where different fine-grained tasks overlap with each other

Graph-parallel & tensor-parallel tasks run simultaneously

Also, to reduce the wait time between tasks, incorporates asynchrony into the pipeline

Challenge: two computation paths, where exactly should asynchrony be introduced?

Use it where staleness can be tolerated

**parameter updates** (tensor-parallel path)

**data gathering** from neighbor vertices (graph-parallel path)

To prevent asynchrony from slowing down convergence, Dorylus bounds the degree of asynchrony at each location using

**weight stashing** at parameter updates

**bounded staleness** at data gathering

### Background

GNN

Input: graph-structured data, each vertex is associated with a feature vector

Output: a feature vector for each individual vertex or the whole graph

Usage: output feature vectors can be used for graph or vertex classification

Goal: learn the patterns and relationships among the data, rather than relying solely on the features of a single data point

Combine

Graph propagation (Gather and Scatter)

NN computations

Graph-parallel interface

E.g., deep graph library (DGL), which unifies a variety of GNN models with a common GAS-like interface

#### Example: GCN

#### Forward Pass

A: adjacency matrix of the input graph

A_bar = A + I_N: adjacency matrix with self-loops, constructed by adding the identity matrix to A

D_bar: diagonal degree matrix such that D_bar_ii = sum_j A_bar_ij

Normalized adjacency matrix: D_bar^(-1/2) A_bar D_bar^(-1/2)

W_L: layer-specific trainable weight matrix

Non-linear activation function, such as ReLU

H_L: activations matrix of the L-th layer

H_0 = X is the input feature matrix for all vertices

Forward rule (R1): H_(L+1) = ReLU(D_bar^(-1/2) A_bar D_bar^(-1/2) H_L W_L)

Mapping R1 to vertex-centric computation model:

Gather (GA)

Apply Vertex (AV)

Scatter (SC)

Apply Edge (AE)

One can see layer L's activations matrix H_L as a group of activation vectors, each associated with a vertex

Goal: compute a new activations vector for each vertex based on the vertex's previous activations vector (which, initially is its feature vector) and the information received from its in-neighbors

**Different (!) from traditional graph processing**

The computation of the new activations matrix H_(L+1) is based on computationally intensive NN operations rather than a simple numerical function

Steps

The **GA** stage retrieves an (activation) vector from each in-edge of a vertex and aggregates these vectors into a new vector v

Applying GA on all vertices is just a matrix multiplication between the normalized adjacency matrix and the input activations matrix

The result from the first stage is fed to **AV**, which performs NN operations to obtain a new activations matrix H_(L+1)

For GCN, AV multiplies the result from stage one with a trainable weight matrix W_L and applies a non-linear activation function

The output of AV goes to **SC**, which propagates the new activations vector of each vertex along all out-edges of the vertex

The same or different?

**AE:** Lastly, the new activations vector of each vertex goes into an edge-level NN architecture to compute an activations vector for each edge

GCN: an edge-level NN is not needed

Repeating the process k times (k layers) allows each vertex to consider features of vertices k hops away
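The GA-then-AV mapping above can be sketched in a few lines of NumPy for GCN. This is an illustrative toy (dense matrices, no SC/AE since GCN needs no edge-level NN), not the paper's implementation:

```python
import numpy as np

def gcn_layer(A, H, W):
    # GA: aggregate neighbor activations via the normalized adjacency matrix
    A_bar = A + np.eye(A.shape[0])                 # self-loops: A_bar = A + I_N
    d_inv_sqrt = 1.0 / np.sqrt(A_bar.sum(axis=1))  # diagonal of D_bar^(-1/2)
    S = A_bar * d_inv_sqrt[:, None] * d_inv_sqrt[None, :]
    gathered = S @ H                               # GA is just a matrix multiplication
    # AV: apply the trainable weight matrix and the ReLU activation
    return np.maximum(gathered @ W, 0.0)
```

Stacking k such calls corresponds to a k-layer GCN reaching k-hop neighborhoods.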

#### Backward Pass

computes the gradients for all trainable weights in the vertex- and edge-level NN architectures (i.e., AV and AE).

#### Training epochs

Forward pass

Backward pass

Weights update

Uses the gradients computed in the backward pass to update the trainable weights in the vertex- and edge-level NN architecture in a GNN

Run repeatedly until reaching acceptable accuracy

### Design Overview

Three components:

**EC2 graph servers (GS)**

The input graph is partitioned using an edge-cut algorithm that takes care of load balancing across partitions

Each partition is hosted by a graph server

Vertex data: represented as a 2D array

Edge data: represented as a single array

Edges are stored in the compressed sparse row (CSR) format
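A minimal sketch of the CSR layout used for edges (the `to_csr` helper and its edge-list input are illustrative assumptions, not Dorylus code):

```python
def to_csr(num_vertices, edges):
    # indptr[v] .. indptr[v+1] delimit vertex v's out-edges in the indices array
    indptr = [0] * (num_vertices + 1)
    for src, _ in edges:
        indptr[src + 1] += 1
    for v in range(num_vertices):          # prefix sum over out-degrees
        indptr[v + 1] += indptr[v]
    indices = [0] * len(edges)
    fill = list(indptr[:-1])               # next free slot per vertex
    for src, dst in edges:
        indices[fill[src]] = dst
        fill[src] += 1
    return indptr, indices
```

CSR keeps each vertex's edges contiguous, which is why edge data can live in a single array.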

Ghost buffer

Stores the data that are scattered in from remote servers

Communicate

With each other: to execute graph computations by sending/receiving data along cross-partition edges

Only during scatter, in both passes:

Forward pass: activation values are propagated along cross-partition edges

Backward pass: gradients are propagated along the same edges in the reverse direction

With lambda threads: execute tensor computations

**Lambda threads for tensor computation**

Tensor operations such as AV and AE interleave with graph operations

Employs a high-performance linear algebra kernel for tensor computation

Used in both forward and backward pass

Communicates frequently with PS

Forward: retrieve weights from PSes to compute layer output

Backward: compute updated weights

**EC2 parameter servers**

Graph computation is done in a conventional way, breaking a vertex program into stages of

vertex parallel (gather)

edge parallel (scatter)

### Tasks and pipelining

#### Fine-grained Tasks

Challenge: decompose the process into a set of fine-grained tasks that can

Overlap with each other

Be processed by Lambda's weak compute

Decomposition is done based on the *data type* and *computation type*

Graph operations performed on GSes: computations that involve the adjacency matrix of the input graph (i.e., A)

Lambda operations: computations that involve only tensor data

#### Specific tasks over each training epoch

#### Forward pass

computes the output using current weights

graph-parallel tasks on GSes

Gather (GA)

Scatter (SC)

tensor-parallel tasks on Lambdas: multiply matrices involving only features and weights and apply activation functions

Apply Vertex (AV)

Lambda threads retrieve vertex data (H_L) from GSes and weight data (W_L) from PSes, compute their products, apply ReLU, and send the result back to GSes as the input for scatter

AV returns --> SC sends data, across cross-partition edges, to the machines that host their destination vertices

Apply Edge (AE)

Each Lambda thread retrieves

Vertex data from the source and destination vertices of the edge (i.e. activation vectors)

Edge data (such as edge weights) from GSes

Computes per-edge update by performing model-specific tensor operations

Updates are streamed back to GSes and become the inputs of the next layer's GA task

#### Backward pass

uses a loss function to compute weight updates

Involves: GSes, Lambdas, and PSes that coordinate to run a graph-augmented SGD algorithm

Each task in the forward pass --> a corresponding task in the backward pass

Either propagates information in the reverse direction of edges on the graph

Or computes the gradients of its trainable weights with respect to a given loss function

Weight updates (WU): aggregate gradients across PSes

Del(GA), Del(SC): propagate information in the reverse direction

AE & AV: apply weights to compute the output of the edge and vertex NN

Conversely, Del(AE) and Del(AV) compute weight updates for the NNs, which are inputs to WU

These are tensor-only computations and are executed by Lambdas

#### Pipelining

GSes kick off training by running parallel graph tasks

Dorylus

**Divides vertices in each partition into intervals** (i.e., minibatches)

For each interval, the amount of tensor computation depends on both the number of vertices (i.e., AV) and the number of edges (i.e., AE) in the interval

Amount of graph computation (on a GS) depends primarily on the number of edges (i.e. GA and SC)

Balance work across intervals: the division uses a simple algorithm to ensure that different intervals have the same number of vertices and that vertices in each interval have similar numbers of inter-interval edges

Edges incur cross-minibatch dependencies (which the async pipeline needs to handle)
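The paper's balancing algorithm also equalizes inter-interval edges; as a simplified sketch under that caveat, an equal-vertex-count split might look like:

```python
def split_intervals(num_vertices, num_intervals):
    # Contiguous intervals whose sizes differ by at most one vertex
    base, extra = divmod(num_vertices, num_intervals)
    intervals, start = [], 0
    for i in range(num_intervals):
        size = base + (1 if i < extra else 0)
        intervals.append(range(start, start + size))
        start += size
    return intervals
```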

Each interval is processed by a task

When the pipeline is saturated, different tasks will be executed on distinct intervals of vertices

Each GS maintains a task queue and enqueues a task once it's ready to execute (i.e. input available)

Thread pool: # of threads = # of vCPUs

Output of GS task is fed to a Lambda for tensor computation

Figure 4

Dorylus enqueues a set of GA tasks, each processing a vertex interval

Since the # of threads on each GS << the # of tasks, some tasks finish earlier than others and their results are pushed immediately to Lambda threads for AV

Once they are done, outputs are sent back to GS for Scatter

Backward pass

Del(AE) and Del(AV) compute gradients and send them to PSes for weight updates

Overall, through pipelining, Dorylus overlaps the graph-parallel and tensor-parallel computations so as to hide Lambda's communication latency

Contribution: enable pipelining in GNN training

Fine-grained tasks

Insight of computation separation

### Bounded Asynchrony

Goal: workers do not need to wait for updates to proceed in most cases

Lambdas run in an extremely dynamic environment and stragglers almost always exist

Challenge: asynchrony slows down convergence (fast-progressing minibatches may use out-of-date weights, prolonging the training time)

Existing solutions: bounded staleness (lightweight sync)

But a challenge unique to Dorylus is the two sync points

weight sync at each WU task

sync of (vertex) activations data from neighbors at each GA

#### Bounded Asynchrony at Weight Updates

Full asynchrony:

different vertex intervals are at their own training pace; some intervals may use a particular version v0 of weights during a forward pass to compute gradients while applying these gradients on another version v1 of weights on their way back in the backward pass

Statistical inefficiency

Method: weight stashing (proposed in PipeDream)

allows any interval to use the latest version of weights available in a forward pass and stashes (i.e., caches) this version for use during the corresponding backward pass.
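A toy sketch of weight stashing on a parameter server (the `ParameterStash` class and its methods are hypothetical, for illustration only):

```python
class ParameterStash:
    """The forward pass records the weight version it used, so the backward
    pass applies gradients against that same version (weight stashing)."""
    def __init__(self, weights):
        self.latest = weights        # latest weights, replicated on every PS
        self.stash = {}              # interval id -> weights used in its forward pass

    def forward_weights(self, interval):
        self.stash[interval] = self.latest   # stash the version this interval uses
        return self.latest

    def backward_weights(self, interval):
        return self.stash.pop(interval)      # same version seen in the forward pass
```

Even if another interval updates `latest` in between, the backward pass still sees the stashed version.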

Challenge:

Occurs at PSes (host weight matrices and perform updates)

Need to balance loads and bandwidth usage

Solution:

Multiple PSes

Directs a Lambda to a PS that has the lightest load

Each Lambda can be at different stages of an epoch (e.g., forward & backward pass, different layers)

Each PS hosts a replica of the weight matrices of all layers --> any Lambda can use any PS at any stage

GNNs have few layers, so replicating weights is feasible (not much memory overhead)

But if we allow any Lambda to use any PS, each PS has to maintain not only the latest weight matrices but also their stashed versions for all intervals, which is practically impossible

Solution:

each PS holds a replica of all the latest weights (PSes periodically broadcast their latest weights) but keeps weight stashes for only a subset of vertex intervals

for each interval in a given epoch, the interval's weight stashes are only maintained on the first PS it interacts with in the epoch

#### Bounded Asynchrony at Gather

Async gather allows vertex intervals to progress independently, using stale vertex values (i.e., activation vectors) from their neighbors without waiting for their updates

Number of layers in a GNN is determined statically and n-layer GNN aims to propagate the impact of a vertex's n-hop neighborhood to the vertex.

Question: can asynchrony change the semantics of the GNN?

1) Can vertices eventually receive the effect of their n-hop neighborhood? Yes

2) Is it possible for any vertex to receive the effect of its m-hop neighbor where m > n after many epochs? No

Use bounded staleness at Gather (a fast-moving vertex interval is allowed to be at most S epochs away from the slowest-moving interval)
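The staleness bound can be expressed as a simple gate. This sketch (the name `can_advance` and the per-epoch check are assumptions; it is one plausible reading, not the paper's code) lets an interval enter its next epoch only if the gap to the slowest interval stays within S:

```python
def can_advance(my_epoch, all_epochs, S):
    # A fast-moving interval may be at most S epochs ahead of the slowest one,
    # so entering epoch my_epoch + 1 is allowed only if the gap stays within S
    return (my_epoch + 1) - min(all_epochs) <= S
```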

### Lambda Management

Each GS runs a Lambda controller, which launches Lambdas, batches data to be sent to each Lambda, monitors each Lambda's health, and routes its result back to the GS

Lambdas for a task t are launched by the controller when t's previous task starts executing

E.g., n Lambda threads are launched (to prepare for AV) when n GA tasks start to run

Setting

Each Lambda runs with the OpenBLAS library

Communicate with GS and PS using ZeroMQ

Deployed inside virtual private cloud (VPC)

AWS reuses a container that already has the code deployed instead of cold-starting a new container

The controller times each execution and relaunches a Lambda after a timeout

#### Lambda Optimizations

Challenge: limited network bandwidth

Per-lambda bandwidth goes down as the number of lambdas increases (suspect: Lambdas created by the same user get scheduled on the same machine and share a network link)

Three optimizations

**Task fusion**

Since AV of the last layer in a forward pass is connected directly to Del(AV) of the last layer in the next backward pass, merge them into a single Lambda-based task

Reduce invocations of thousands of Lambdas for each epoch

Save a round-trip communication between Lambdas and GSes

**Tensor rematerialization**

Normally: cache intermediate results during the forward pass to be used in the backward pass

But: because network communication is the bottleneck, it is more profitable to rematerialize the intermediate tensors by launching more Lambdas than to retrieve them from GSes

**Lambda-internal streaming**

For a data chunk: retrieve the first half of the data, compute on it while simultaneously retrieving the second half

Can reduce Lambda response time
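A minimal illustration of the overlap, using a Python thread in place of Lambda-internal I/O (the `fetch_half`/`compute` callables are stand-ins, not Dorylus APIs):

```python
from concurrent.futures import ThreadPoolExecutor

def process_streamed(fetch_half, compute):
    # Kick off the download of the second half, compute on the first half
    # while it is in flight, then compute on the second half when it lands
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending = pool.submit(fetch_half, 1)
        first = compute(fetch_half(0))
        second = compute(pending.result())
    return first + second
```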

#### Autotuning Numbers of Lambdas

Performance side: the effectiveness of Lambdas depends on whether the pipeline can be saturated

Too few Lambdas would not generate enough task instances for the graph computations to saturate CPU cores

Cost side: we want to avoid over-saturating the pipeline which can generate too many CPU tasks for the GS to handle

The optimal number of Lambdas is related to

Pace of the graph computation

Which depends on the graph structure (i.e. density) and partitioning that are hard to predict before execution

Initial number of Lambdas: min(#intervals, 100)

number of intervals = number of vertex intervals on each GS

Auto-tuner auto-adjusts the number by periodically checking the size of the CPU's task queue

If the size of the queue constantly grows: CPU cores have too many tasks to process, scale down

If queue quickly shrinks, scale up the number of Lambdas

Goal: stabilize the size of the queue s.t. the number of Lambdas matches the pace of graph tasks
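The queue-watching heuristic could be sketched as follows (the function name, step size of 1, and bounds are assumptions; the notes give no exact rule):

```python
def tune_lambdas(num_lambdas, queue_sizes, min_lambdas=1, max_lambdas=100):
    # queue_sizes: recent samples of the GS task queue length
    if all(b > a for a, b in zip(queue_sizes, queue_sizes[1:])):
        num_lambdas -= 1   # queue constantly grows: CPUs are oversubscribed, scale down
    elif all(b < a for a, b in zip(queue_sizes, queue_sizes[1:])):
        num_lambdas += 1   # queue quickly shrinks: pipeline is starved, scale up
    return max(min_lambdas, min(num_lambdas, max_lambdas))
```

A stable queue leaves the Lambda count unchanged, matching the pace of graph tasks.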

### Evaluation

#### Instance Selection

c5: compute-optimized instances

c5n: compute and network optimized instances

Lambda: a container with 0.11 vCPUs and 192MB memory

$0.2 per 1M requests

Compute cost: $0.01125/h (billed per 100 ms)

Able to handle short bursts of massive parallelism much better than CPU instances

#### Asynchrony

Can provide overall performance benefits in general but too large a staleness value leads to slow convergence and poor performance, although the per-epoch time reduces

Choose s = 0 as the default for the Lambda variant in the following experiments: s = 0 still allows a vertex to use a stale value from a neighbor as long as the neighbor is in the same epoch (e.g., it can be in the previous layer)

#### Effects of Lambdas

Compare it with CPU-only servers for computations and GPU-only servers (both without Lambdas)

Use computation separation for scalability

Value: take the reciprocal of the total time (i.e. the performance or rate of completion) and divide it by the cost

Dorylus with Lambdas provides a 2.75x higher value than CPU-only

But for small, dense graphs, the GPU-only variant is better

This suggests that GPUs may be better suited to processing small, dense graphs rather than large graphs
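The value metric from above, as a tiny helper (names assumed):

```python
def value(total_time_s, total_cost):
    # Rate of completion (1 / total time) per unit cost
    return (1.0 / total_time_s) / total_cost
```

By this metric, a run that is both faster and cheaper always scores higher.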

**Scaling out **

Dorylus can gain even more value by scaling out to more servers, due to the burst parallelism provided by Lambdas and deep pipelining.

Scales well in terms of both performance and value

Dorylus can roughly provide the same value as the CPU-only variant with only half of the number of servers.

**Other observations **

The sparser the graph, the more useful Dorylus is

E.g. for Amazon and Friendster, Dorylus even outperforms the GPU-only version for two reasons

The fraction of time on scatter is significantly larger in those two examples

Why? The time depends on a combination of the number of ghost vertices and inter-partition edges. They have many inter-partition edges, but very few ghost vertices

Scatter takes much longer time in GPU clusters

Moving ghost data between GPU memories on different nodes is much slower than data transferring between CPU memories

Lambda threads are more effective in boosting performance for GAT than GCN

GAT includes an additional AE task, which performs per-edge tensor computation and thus benefits significantly from a high degree of parallelism

Dorylus achieves comparable performance with CPU-only variant that uses twice as many servers

#### Comparison with existing systems

#### Breakdown of Performance and Costs
