Dorylus: Affordable, Scalable, and Accurate GNN Training with Distributed CPU Servers and Serverless



  • Two major GNN training obstacles

    • it relies on high-end servers with many GPUs, which are expensive to purchase and maintain

    • limited memory on GPUs cannot scale to today's billion-edge graphs

  • Dorylus: distributed system for training GNNs which takes advantage of serverless computing to increase scalability at a low cost

  • Key: computation separation

    • Construct a deep, bounded-asynchronous pipeline where graph and tensor parallel tasks can fully overlap

    • Effectively hiding the network latency incurred by Lambdas


  • Models in the GNN family: GCN, GRN, GAT

  • Wide range of applications

  • GPUs

    • Benefits: highly-parallel computations

    • Drawbacks:

      • expensive

        • Even in public cloud (show the pricing)

      • limited memory --> hinder scalability

  • Existing approaches to solve the two drawbacks

    • CPU

      • Looser memory restrictions, and operate at lower costs

      • But: far inferior efficiency (speed)

    • Graph sampling techniques

      • Improve scalability by considering less graph data

      • Limitations

        • must be done repeatedly per epoch, incurring time overheads

        • reduce accuracy of trained GNNs

        • no guarantee on convergence

  • Affordable, Scalable, and Accurate GNN Training

    • High efficiency: close to GPUs

    • High accuracy: e.g., higher than sampling

    • Low cost for training billion-edge graphs

  • Turn to: Serverless computing paradigm

    • Large number of parallel threads at an extremely low price

    • Challenges

      • Limited compute resources (e.g. 2 weak vCPUs)

      • Restricted network resources (e.g., 200 Mbps between Lambda servers and standard EC2 servers)

  • Challenges

    • how to make computation fit into Lambda's weak compute profile?

      • Not all operations in GNN training need Lambda's parallelism

      • Divide training pipeline into a set of fine-grained tasks, based on the type of data they process

        • graph-parallel path: CPU instances

        • tensor-parallel path: Lambdas

          • Lightweight linear algebra operation on a data chunk of a small size

          • cheaper than regular CPU instances

    • how to minimize the negative impact of Lambda's network latency?

      • Now, Lambdas can spend one-third of their time on communication

      • To keep communication from becoming the bottleneck: bounded pipeline asynchronous computation (BPAC)

        • Make full use of pipelining where different fine-grained tasks overlap with each other

          • Graph-parallel & tensor-parallel tasks run simultaneously

        • Also incorporates asynchrony into the pipeline to reduce the wait time between tasks

          • Challenge: two computation paths, where exactly should asynchrony be introduced?

          • Use it where staleness can be tolerated

            • parameter updates (tensor-parallel path)

            • data gathering from neighbor vertices (graph-parallel path)

          • To prevent asynchrony from slowing down convergence, bound the degree of asynchrony at each location using

            • weight stashing at parameter updates

            • bounded staleness at data gathering


  • GNN

    • Input: graph-structured data, each vertex is associated with a feature vector

    • Output: a feature vector for each individual vertex or the whole graph

    • Usage: output feature vectors can be used for graph or vertex classification

    • Goal: learn the patterns and relationships among the data, rather than relying solely on the features of a single data point

    • Combine

      • Graph propagation (Gather and Scatter)

      • NN computations

  • Graph-parallel interface

    • E.g., Deep Graph Library (DGL), which unifies a variety of GNN models with a common GAS-like interface

Example: GCN

Forward Pass

  • Forward rule (R1): H_(L+1) = sigma(A_hat H_L W_L)

  • A: adjacency matrix of the input graph; A_bar = A + I_N: adjacency matrix with self-loops, constructed by adding the identity matrix to A

  • D_bar: diagonal degree matrix such that D_bar_ii = sum_j A_bar_ij

  • A_hat = D_bar^(-1/2) A_bar D_bar^(-1/2): normalized adjacency matrix

  • W_L: layer-specific trainable weight matrix

  • sigma: non-linear activation function, such as ReLU

  • H_L: activations matrix of the L-th layer

  • H_0 = X is the input feature matrix for all vertices
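
The forward rule for one GCN layer can be sketched in NumPy as below. This is a minimal illustration on a toy 3-vertex graph, assuming the standard GCN normalization (self-loops added, then symmetric degree scaling); the function names, the toy graph, and the weight values are made up for the example and do not come from Dorylus.

```python
import numpy as np

def normalize_adjacency(A):
    """Return D_bar^(-1/2) (A + I) D_bar^(-1/2), the normalized adjacency matrix."""
    A_bar = A + np.eye(A.shape[0])          # add self-loops
    deg = A_bar.sum(axis=1)                 # D_bar_ii = sum_j A_bar_ij
    d_inv_sqrt = 1.0 / np.sqrt(deg)         # deg >= 1 thanks to the self-loops
    return A_bar * d_inv_sqrt[:, None] * d_inv_sqrt[None, :]

def gcn_layer(A_hat, H, W):
    """One forward layer: H_next = ReLU(A_hat @ H @ W)."""
    return np.maximum(A_hat @ H @ W, 0.0)

# Toy path graph 0 - 1 - 2 with two input features per vertex (illustrative values).
A = np.array([[0, 1, 0],
              [1, 0, 1],
              [0, 1, 0]], dtype=float)
X = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])   # H_0 = X
W0 = np.array([[1.0, -1.0], [0.5, 2.0]])             # layer-0 weights
A_hat = normalize_adjacency(A)
H1 = gcn_layer(A_hat, X, W0)
```

Stacking k such calls gives a k-layer forward pass; only the weight matrix changes between layers.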

Mapping R1 to vertex-centric computation model:

  • Gather (GA)

  • Apply Vertex (AV)

  • Scatter (SC)

  • Apply Edge (AE)

  • One can see layer L's activations matrix H_L as a group of activation vectors, each associated with a vertex

  • Goal: compute a new activations vector for each vertex based on the vertex's previous activations vector (which, initially is its feature vector) and the information received from its in-neighbors

  • Different (!) from traditional graph processing

    • The computation of new activations matrix H_(L+1) is based on computationally intensive NN operations rather than a numerical function

  • Step

    • GA stage retrieves an activation vector from each in-edge of a vertex and aggregates these vectors into a new vector v

      • Applying GA on all vertices is just a matrix multiplication between normalized adjacency matrix and input activations matrix

    • The result from first stage is fed to AV, which performs NN operations to obtain a new activations matrix H_(L+1)

      • For GCN, AV multiplies the result from stage one with a trainable weight matrix W_L and applies non-linear activation function

    • The output of AV goes to SC, which propagates the new activations vector of each vertex among all out-edges of the vertex

      • The same or different?

    • AE: Lastly, new activations vector of each vertex goes into an edge-level NN architecture to compute an activations vector for each edge

      • GCN: edge-level NN is not needed

  • Repeating the process k times (k layers) allows each vertex to consider the features of vertices k hops away
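
To make the GA/AV split concrete, the sketch below runs Gather per vertex over in-edges stored in CSR form and then Apply Vertex as a dense operation. It is only an illustration: the edge weights are arbitrary stand-ins for normalized adjacency entries, and `gather` and `apply_vertex` are hypothetical names rather than the Dorylus or DGL API.

```python
import numpy as np

def gather(indptr, indices, weights, H):
    """GA: for each vertex, sum the weighted activation vectors of its in-edges.
    The in-edges of vertex v are indices[indptr[v]:indptr[v + 1]] (CSR layout)."""
    out = np.zeros_like(H)
    for v in range(len(indptr) - 1):
        for e in range(indptr[v], indptr[v + 1]):
            out[v] += weights[e] * H[indices[e]]
    return out

def apply_vertex(Z, W):
    """AV: tensor-only step on the gathered vectors (for GCN: ReLU(Z @ W))."""
    return np.maximum(Z @ W, 0.0)

# The same 3-vertex weighted graph, once dense and once in CSR (illustrative values).
A_dense = np.array([[0.5, 0.4, 0.0],
                    [0.4, 0.3, 0.4],
                    [0.0, 0.4, 0.5]])
indptr = np.array([0, 2, 5, 7])
indices = np.array([0, 1, 0, 1, 2, 1, 2])
weights = np.array([0.5, 0.4, 0.4, 0.3, 0.4, 0.4, 0.5])
H = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
W = np.array([[1.0, 0.0], [0.0, -1.0]])

Z = gather(indptr, indices, weights, H)   # graph-parallel part
H_next = apply_vertex(Z, W)               # tensor-parallel part
```

The per-vertex loop touches the graph structure, while `apply_vertex` only sees dense tensors; that is the property the computation separation relies on.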

Backward Pass

  • computes the gradients for all trainable weights in the vertex- and edge-level NN architectures (i.e., AV and AE).

Training epochs

  • Forward pass

  • Backward pass

  • Weights update

    • Uses the gradients computed in the backward pass to update the trainable weights in the vertex- and edge-level NN architecture in a GNN

Run repeatedly until reaching acceptable accuracy

Design Overview

Three components:

  • EC2 graph servers (GS)

    • Input graph is partitioned using an edge-cut algorithm that takes care of load balancing across partitions

    • Each partition is hosted by a graph server

      • Vertex data: represented as a 2D array

      • Edge data: represented as a single array

      • Edges are stored in the compressed sparse row (CSR) format

    • Ghost buffer

      • Stores the data that are scattered in from remote servers

    • Communicate

      • With each other: to execute graph computations by sending / receiving data along cross-partition edges

        • Only during scatter, in both passes

          • Forward pass: activation values are propagated along cross-partition edges

          • Backward pass: gradients are propagated along the same edges in the reverse direction

      • With lambda threads: execute tensor computations

  • Lambda threads for tensor computation

    • Tensor operations such as AV and AE interleave with graph operations

    • Employs a high-performance linear algebra kernel for tensor computation

    • Used in both forward and backward pass

    • Communicates frequently with PS

      • Forward: retrieve weights from PSes to compute layer output

      • Backward: compute updated weights

  • EC2 parameter servers

Graph computation is done in a conventional way, breaking a vertex program into stages of

  • vertex parallel (gather)

  • edge parallel (scatter)

Tasks and pipelining

Fine-grained Tasks

  • Challenge: decompose the process into a set of fine-grained tasks that can

    • Overlap with each other

    • Be processed by Lambda's weak compute

  • Decomposition is done based on the data type and computation type

    • Graph operations performed on GSes: computations that involve adjacency matrix of the input graph (i.e. A)

    • Lambda operations: computations that involve only tensor data

Specific tasks over each training epoch

Forward pass

  • computes the output using current weights

  • graph-parallel tasks on GSes

    • Gather (GA)

    • Scatter (SC)

  • tensor-parallel tasks on Lambdas: multiply matrices involving only features and weights and apply activation functions

    • Apply Vertex (AV)

      • Lambda threads retrieve vertex data (H_(L)) from GSes and weight data (W_(L)) from PSes, compute their products, apply ReLU, and send the result back to GSes as the input for scatter

      • AV returns --> SC sends data, across cross-partition edges, to the machines that host their destination vertices

    • Apply Edge (AE)

      • Each Lambda thread retrieves

        • Vertex data from the source and destination vertices of the edge (i.e. activation vectors)

        • Edge data (such as edge weights) from GSes

      • Computes per-edge update by performing model-specific tensor operations

      • Updates are streamed back to GSes and become the inputs of the next layer's GA task

Backward pass

  • uses a loss function to compute weight updates

  • Involves: GSes, Lambdas, and PSes that coordinate to run a graph-augmented SGD algorithm

  • Each task in the forward pass has a corresponding task in the backward pass

    • Either propagates information in the reverse direction of edges on the graph

    • Or computes the gradients of its trainable weights with respect to a given loss function

  • Weight updates (WU), aggregate gradients across PSes

  • Del(GA), Del (SC): propagate information in the reverse direction

  • AE & AV: apply weights to compute the output of the edge and vertex NN

  • Conversely, Del(AE) and Del(AV) compute weight updates for the NNs, which are inputs to WU

    • Tensor-only computation and are executed by Lambdas


  • GSes kick off training by running parallel graph tasks

  • Dorylus divides vertices in each partition into intervals (i.e. minibatches)

    • For each interval, the amount of tensor computations depends on both number of vertices (i.e. AV) and number of edges (i.e. AE) in the interval

    • Amount of graph computation (on a GS) depends primarily on the number of edges (i.e. GA and SC)

  • Balance work across intervals: the division uses a simple algorithm so that different intervals have the same number of vertices and the vertices in each interval have similar numbers of inter-interval edges

    • Edges incur cross-minibatch dependency (async pipeline needs to handle)

  • Each interval is processed by a task

    • When the pipeline is saturated, different tasks will be executed on distinct intervals of vertices

  • Each GS maintains a task queue and enqueues a task once it's ready to execute (i.e. input available)

    • Thread pool: # of threads = # of vCPUs

  • Output of GS task is fed to a Lambda for tensor computation

  • Figure 4

    • Dorylus enqueues a set of GA tasks, each processing a vertex interval

      • Since # of threads on each GS << # of tasks, some tasks finish earlier than others, and their results are pushed immediately to Lambda threads for AV

    • Once they are done, outputs are sent back to GS for Scatter

    • Backward pass

      • Del(AE) and Del(AV) compute gradients and send them to PSes for weight updates
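
The interval-based pipelining above can be sketched with two thread pools, one standing in for the GS thread pool and one for the Lambdas. This is a simplified model under several assumptions: intervals are balanced by vertex count only (the edge balancing is omitted), and `gather_task` and `apply_vertex_task` are placeholder computations, not real graph or tensor kernels.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def split_into_intervals(num_vertices, num_intervals):
    """Divide vertex ids 0..num_vertices-1 into contiguous, near-equal intervals."""
    base, extra = divmod(num_vertices, num_intervals)
    intervals, start = [], 0
    for i in range(num_intervals):
        size = base + (1 if i < extra else 0)
        intervals.append(range(start, start + size))
        start += size
    return intervals

def gather_task(interval):
    # Placeholder for GA on a graph server: just sums the vertex ids.
    return sum(interval)

def apply_vertex_task(gathered):
    # Placeholder for AV on a Lambda: a trivial "tensor" operation.
    return gathered * 2

def run_pipeline(num_vertices, num_intervals, gs_threads=2, num_lambdas=4):
    intervals = split_into_intervals(num_vertices, num_intervals)
    results = {}
    with ThreadPoolExecutor(gs_threads) as gs, ThreadPoolExecutor(num_lambdas) as lam:
        ga = {gs.submit(gather_task, iv): i for i, iv in enumerate(intervals)}
        av = {}
        # As soon as one interval's GA finishes, its AV is pushed to a "Lambda",
        # while GA for other intervals keeps running on the GS threads.
        for fut in as_completed(ga):
            av[lam.submit(apply_vertex_task, fut.result())] = ga[fut]
        for fut in as_completed(av):
            results[av[fut]] = fut.result()
    return results
```

Calling `run_pipeline(10, 3)` processes three intervals; the overlap between the two pools is what hides the slower "Lambda" stage in this toy model.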

Overall, through pipelining, Dorylus overlaps the graph-parallel and tensor-parallel computations so as to hide Lambda's communication latency

  • Contribution: enable pipelining in GNN training

    • Fine-grained tasks

    • Insight of computation separation

Bounded Asynchrony

  • Goal: workers do not need to wait for updates to proceed in most cases

    • Lambdas run in an extremely dynamic environment and stragglers almost always exist

  • Challenge: asynchrony slows down convergence (fast-progressing minibatches may use out-of-date weights, prolonging the training time)

  • Existing solutions: bounded staleness (lightweight sync)

    • But a challenge unique to Dorylus is that there are two sync points

      • weight sync at each WU task

      • sync of (vertex) activations data from neighbors at each GA

Bounded Asynchrony at Weight Updates

  • Full asynchrony:

    • different vertex intervals are at their own training pace; some intervals may use a particular version v0 of weights during a forward pass to compute gradients while applying these gradients on another version v1 of weights on their way back in the backward pass

    • Statistical inefficiency

  • Method: weight stashing (proposed in PipeDream)

    • allows any interval to use the latest version of weights available in a forward pass and stashes (i.e., caches) this version for use during the corresponding backward pass.

  • Challenge:

    • Occurs at PSes (host weight matrices and perform updates)

    • Need to balance loads and bandwidth usage

    • Solution:

      • Multiple PSes

      • Directs a Lambda to a PS that has the lightest load

        • Each Lambda can be at a different stage of an epoch (e.g. forward or backward pass, different layers)

        • Each PS hosts a replica of the weight matrices of all layers --> any Lambda can use any PS in any stage

          • GNNs have few layers, so replicating weights is feasible (little memory overhead)

        • But if any Lambda can use any PS, each PS has to maintain not only the latest weight matrices but also the stashed versions for all intervals, which is practically impossible

          • Solution:

            • each PS has replication of all latest weights (periodically broadcast their latest weights), but weight stashes only for a subset of vertex intervals

            • for each interval in a given epoch, the interval's weight stashes are only maintained on the first PS it interacts with in the epoch
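
A minimal sketch of weight stashing with per-interval pinning might look like the following. The `ParameterServer` and `Router` classes, the scalar weight, and the request-count load metric are all assumptions made for illustration; the periodic broadcast of the latest weights between PSes is left out.

```python
class ParameterServer:
    """Hypothetical PS: latest weights plus stashes for the intervals pinned to it."""
    def __init__(self, weights):
        self.version = 0
        self.weights = weights
        self.stash = {}   # interval id -> (version, weights) seen in its forward pass
        self.load = 0     # requests served so far, used as a crude load metric

    def forward_fetch(self, interval):
        self.load += 1
        self.stash[interval] = (self.version, self.weights)   # stash what was used
        return self.weights

    def backward_fetch(self, interval):
        self.load += 1
        return self.stash.pop(interval)   # same version as in the forward pass

    def apply_update(self, grad, lr=0.1):
        self.weights = self.weights - lr * grad
        self.version += 1


class Router:
    """Sends an interval's first request of an epoch to the least-loaded PS and
    pins the interval there, so its stash lives on exactly one PS."""
    def __init__(self, servers):
        self.servers = servers
        self.home = {}

    def ps_for(self, interval):
        if interval not in self.home:
            self.home[interval] = min(self.servers, key=lambda ps: ps.load)
        return self.home[interval]

    def end_epoch(self):
        self.home.clear()


ps_a, ps_b = ParameterServer(1.0), ParameterServer(1.0)
router = Router([ps_a, ps_b])
home0 = router.ps_for(0)
w_forward = home0.forward_fetch(0)            # interval 0 runs forward with version 0
home0.apply_update(grad=1.0)                  # another interval's update lands meanwhile
stashed_version, w_backward = router.ps_for(0).backward_fetch(0)
home1 = router.ps_for(1)                      # a new interval goes to the idle PS
```

Interval 0 computes its backward pass against the same weights it used going forward, even though the PS has moved on to a newer version in between.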

Bounded Asynchrony at Gather

  • Async gather allows vertex intervals to progress independently, using stale vertex values (i.e., activation vectors) from their neighbors without waiting for their updates

  • Number of layers in a GNN is determined statically and n-layer GNN aims to propagate the impact of a vertex's n-hop neighborhood to the vertex.

  • Question: can asynchrony change the semantics of the GNN?

1) Can vertices eventually receive the effect of their n-hop neighborhood? Yes

2) Is it possible for any vertex to receive the effect of its m-hop neighbor where m > n after many epochs? No

  • Use bounded staleness at Gather (a fast-moving vertex interval is allowed to be at most S epochs away from the slowest-moving interval)
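
The staleness bound can be expressed as a simple admission check. This sketch assumes progress is tracked as the number of epochs each interval has completed; the function name and the dictionary layout are hypothetical.

```python
def may_start_next_epoch(completed, interval, staleness_bound):
    """Allow `interval` to start its next epoch only if it is at most
    `staleness_bound` epochs ahead of the slowest interval.

    completed: dict mapping interval id -> number of epochs finished so far.
    """
    return completed[interval] - min(completed.values()) <= staleness_bound


progress = {"a": 3, "b": 2, "c": 2}
fast_blocked = may_start_next_epoch(progress, "a", staleness_bound=0)
slow_allowed = may_start_next_epoch(progress, "b", staleness_bound=0)
fast_allowed = may_start_next_epoch(progress, "a", staleness_bound=1)
```

With a bound of 0 the fast interval "a" must wait until the others catch up to its epoch, while a bound of 1 lets it run one epoch ahead.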

Lambda Management

  • Each GS runs a Lambda controller, which launches Lambdas, batches data to be sent to each Lambda, monitors each Lambda's health, and routes its result back to the GS

  • Lambdas for a task t are launched by the controller when t's preceding task starts executing

    • E.g. n Lambda threads are launched (prepare for AV) when n GA tasks start to run

  • Setting

    • Each runs with Open-BLAS library

    • Communicate with GS and PS using ZeroMQ

    • Deployed inside virtual private cloud (VPC)

    • AWS reuses a container that already has the code deployed instead of cold-starting a new container

    • Times each execution, relaunches after timeout

Lambda Optimizations

  • Challenge: limited network bandwidth

    • Per-lambda bandwidth goes down as the number of lambdas increases (suspect: Lambdas created by the same user get scheduled on the same machine and share a network link)

  • Three optimizations

    • Task fusion

      • Since AV of the last layer in a forward pass is connected directly to del(AV) of the last layer in the next backward pass, merge them into a single Lambda-based task

        • Reduce invocations of thousands of Lambdas for each epoch

        • Save a round-trip communication between Lambdas and GSes

    • Tensor rematerialization

      • Now: cache intermediate result during the forward pass to be used in the backward pass

      • But: because network communication is a bottleneck, more profitable to rematerialize the intermediate tensors by launching more Lambdas rather than retrieving them from GSes

    • Lambda-internal streaming

      • For each data chunk, a Lambda retrieves the first half and starts computing on it while simultaneously retrieving the second half

      • Can: reduce Lambda response time
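
Lambda-internal streaming can be illustrated with a background fetch that overlaps with computation. In this sketch `fetch` and `compute` are caller-supplied, hypothetical functions (the real system moves data between GSes and Lambdas over ZeroMQ), and a chunk is modelled as two list halves.

```python
from concurrent.futures import ThreadPoolExecutor

def process_chunk_streaming(fetch, compute, chunk_id):
    """Fetch a chunk in two halves; compute on half 0 while half 1 is in flight."""
    with ThreadPoolExecutor(max_workers=1) as io:
        first = fetch(chunk_id, 0)
        pending = io.submit(fetch, chunk_id, 1)   # second half downloads in background
        out_first = compute(first)                # overlaps with the fetch above
        out_second = compute(pending.result())
    return out_first + out_second


# Toy stand-ins: an in-memory "graph server" and an element-wise square.
store = {7: ([1, 2], [3, 4])}
result = process_chunk_streaming(
    fetch=lambda chunk_id, half: store[chunk_id][half],
    compute=lambda rows: [x * x for x in rows],
    chunk_id=7,
)
```

The benefit only shows up when `fetch` is slow relative to `compute`, which is the bandwidth-limited situation described above.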

Autotuning Numbers of Lambdas

  • Performance: effectiveness of Lambdas depends on whether the pipeline can be saturated

    • Too few Lambdas would not generate enough task instances for the graph computations to saturate CPU cores

  • Cost side: we want to avoid over-saturating the pipeline which can generate too many CPU tasks for the GS to handle

  • Optimal number of Lambdas is related to

    • Pace of the graph computation

      • Which depends on the graph structure (i.e. density) and partitioning that are hard to predict before execution

  • Initial number of Lambdas: min(#intervals, 100)

    • number of intervals = number of vertex intervals on each GS

    • Auto-tuner auto-adjusts the number by periodically checking the size of the CPU's task queue

      • If the size of the queue constantly grows: CPU cores have too many tasks to process, scale down

      • If queue quickly shrinks, scale up the number of Lambdas

  • Goal: stabilize the size of the queue s.t. the number of Lambdas matches the pace of graph tasks
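
One way to picture the auto-tuner is as a small feedback rule on recent queue sizes. The 10% adjustment step, the min/max limits, and the strict "monotonically growing or shrinking" test are assumptions chosen for this sketch; the notes only state the direction of the adjustment.

```python
def initial_lambda_count(num_intervals):
    """Starting point: min(#intervals, 100)."""
    return min(num_intervals, 100)

def tune_lambda_count(current, queue_sizes, min_lambdas=1, max_lambdas=1000):
    """Return a new Lambda count from recent CPU task-queue sizes (oldest first)."""
    if len(queue_sizes) < 2:
        return current
    pairs = list(zip(queue_sizes, queue_sizes[1:]))
    growing = all(later > earlier for earlier, later in pairs)
    shrinking = all(later < earlier for earlier, later in pairs)
    delta = max(1, current // 10)   # assumed step: 10% of the current count
    if growing:                     # CPUs cannot keep up: scale Lambdas down
        return max(min_lambdas, current - delta)
    if shrinking:                   # CPUs are starving: scale Lambdas up
        return min(max_lambdas, current + delta)
    return current                  # queue is stable: leave the count alone
```

Calling `tune_lambda_count` periodically with a sliding window of queue sizes nudges the count toward the point where the queue length stays roughly constant.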


Instance Selection

  • c5: compute-optimized instances

  • c5n: compute and network optimized instances

  • Lambda: a container with 0.11 vCPUs and 192MB memory

    • $0.2 per 1M requests

    • Compute cost: $0.01125/h (billed per 100 ms)

    • Able to handle short bursts of massive parallelism much better than CPU instances
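
The Lambda pricing figures above translate into a simple cost estimate. This sketch assumes the $0.01125/h compute rate applies per running Lambda of this size and that each invocation's duration is rounded up to the next 100 ms; the function and constant names are made up for the example.

```python
import math

REQUEST_PRICE = 0.20 / 1_000_000     # dollars per invocation ($0.2 per 1M requests)
COMPUTE_PRICE_PER_HOUR = 0.01125     # dollars per Lambda-hour (assumed per Lambda)
BILLING_UNIT_MS = 100                # durations are billed in 100 ms increments
MS_PER_HOUR = 3_600_000

def lambda_cost(invocations, duration_ms_each):
    """Estimated cost in dollars for `invocations` runs of equal duration."""
    billed_ms = math.ceil(duration_ms_each / BILLING_UNIT_MS) * BILLING_UNIT_MS
    compute = invocations * billed_ms / MS_PER_HOUR * COMPUTE_PRICE_PER_HOUR
    return invocations * REQUEST_PRICE + compute

# Illustrative workload: one million invocations of 250 ms each (billed as 300 ms).
estimate = lambda_cost(1_000_000, 250)
```

Because of the rounding, many very short invocations pay for idle billing time, which is one reason fusing tasks into fewer invocations can reduce cost.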


  • Asynchrony can provide overall performance benefits, but too large a staleness value leads to slow convergence and poor overall performance, even though the per-epoch time decreases

  • Choose s = 0 as default Lambda variant in the following experiments: allow a vertex to use a stale value from a neighbor as long as the neighbor is in the same epoch (e.g. can be in the previous layer)

Effects of Lambdas

  • Compare it with CPU-only servers for computations and GPU-only servers (both without Lambdas)

    • Use computation separation for scalability

  • Value: take the reciprocal of the total time (i.e. the performance or rate of completion) and divide it by the cost

    • Dorylus with Lambdas provides a 2.75x higher value than CPU-only

  • But for small dense graphs, the GPU-only variant is better

    • This suggests that GPUs may be better suited to processing small, dense graphs than large, sparse graphs
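
The value metric defined above is just performance per dollar, and it can be written out directly. The numbers in the example are invented to show the arithmetic and are not measurements from the paper.

```python
def value(total_time_s, total_cost_usd):
    """Value = (1 / total time) / cost, i.e. performance per dollar."""
    return (1.0 / total_time_s) / total_cost_usd

def relative_value(time_a, cost_a, time_b, cost_b):
    """How many times higher the value of configuration A is than that of B."""
    return value(time_a, cost_a) / value(time_b, cost_b)

# Hypothetical runs: A takes 100 s and costs $2, B takes 200 s and costs $4.
ratio = relative_value(100.0, 2.0, 200.0, 4.0)
```

In this made-up case A is twice as fast and half the price, so its value is four times that of B.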

Scaling out

  • Dorylus can gain even more value by scaling out to more servers, due to the burst parallelism provided by Lambdas and deep pipelining.

  • Scales well in terms of both performance and values

  • Dorylus can roughly provide the same value as the CPU-only variant with only half of the number of servers.

Other observations

  1. The more sparse the graph, the more useful Dorylus is

    1. E.g. for Amazon and Friendster, Dorylus even outperforms the GPU-only version for two reasons

      1. The fraction of time on scatter is significantly larger in those two examples

        1. Why? The time depends on a combination of the number of ghost vertices and inter-partition edges. They have many inter-partition edges, but very few ghost vertices

      2. Scatter takes much longer time in GPU clusters

        1. Moving ghost data between GPU memories on different nodes is much slower than data transferring between CPU memories

  2. Lambda threads are more effective in boosting performance for GAT than GCN

    1. GAT includes an additional AE task, which performs per-edge tensor computation and thus benefits significantly from a high degree of parallelism

  3. Dorylus achieves comparable performance with CPU-only variant that uses twice as many servers

Comparison with existing systems

Breakdown of Performance and Costs
