Distributed Snapshots: Determining Global States of Distributed Systems

Setup: a distributed snapshot algorithm, used in: Apache Flink, Apache Spark (Structured Streaming), Ray

Reference review:

Main idea

  • Problem: detecting global states

    • Which in turn helps solve the "stable property detection" problem, e.g. "computation has terminated", "system is deadlocked", "all tokens in a token ring have disappeared"

    • Can also be used for checkpointing

  • Propose an algorithm by which a process in a distributed system determines a global state of the system during a computation

Problem setup

  • A process can record its own state and the messages it sends and receives, nothing else

    • No shared clocks or memory

  • To determine a global system state, a process p must enlist the cooperation of other processes that must record their own local states and send the recorded local states to p

  • Problem: design algorithms by which processes record their own states and the states of communication channels so that the set of recorded process and channel states forms a global system state

    • MUST run concurrently with, but not alter, the underlying computation

      • Don't stop sending the messages, don't stop the application

  • Example: photographing a sky full of migrating birds; no single photo can capture the whole scene, so several snapshots must be taken and pieced together into a meaningful picture

  • Defines a class of problems

    • $y(S) \in \{\text{true}, \text{false}\}$ for a global state $S$ of $D$ (distributed system)

      • $y$ is a stable property of $D$ if $y(S)$ implies $y(S')$ for all global states $S'$ of $D$ reachable from global state $S$ of $D$

      • i.e. if y is a stable property and y is true at a point in a computation of D, then y is true at all later points in that computation
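
To make "stable property" concrete, here is a minimal Python sketch of the classic termination predicate. It assumes a hypothetical `GlobalState` holding an "active"/"passive" flag per process and the in-flight messages per channel, and assumes a passive process does nothing until it receives a message; under that assumption, once the predicate holds no further event can occur, so it stays true in every later state.

```python
from dataclasses import dataclass, field


@dataclass
class GlobalState:
    # Hypothetical encoding: process -> "active"/"passive", channel -> in-flight messages.
    processes: dict
    channels: dict = field(default_factory=dict)


def terminated(s: GlobalState) -> bool:
    """y(S) for termination: every process is passive and every channel is empty.

    Assuming a passive process does nothing until it receives a message, no event
    can occur once this holds, so y(S) implies y(S') for every S' reachable from S.
    """
    all_passive = all(state == "passive" for state in s.processes.values())
    nothing_in_flight = all(len(msgs) == 0 for msgs in s.channels.values())
    return all_passive and nothing_in_flight


busy = GlobalState({"p": "passive", "q": "passive"}, {"c": ["work"], "c'": []})
done = GlobalState({"p": "passive", "q": "passive"}, {"c": [], "c'": []})
print(terminated(busy), terminated(done))  # False True
```

Note that `busy` is not terminated even though both processes are passive: the message still in c can wake q up.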

  • The problem of initiating the next phase of computation is not considered here

Model of a distributed system

    • A labeled, directed graph

    • Vertices: processes

      • Defined by a set of states, an initial state, and a set of events

      • An event $e$ in a process $p$ is an atomic action that may change the state of $p$ itself and the state of at most one channel $c$ incident on $p$

        • Defined by the 5-tuple $\langle p, s, s', M, c \rangle$:

          • 1) The process p in which the event occurs

          • 2) The state s of p immediately before the event

          • 3) The state s' of p immediately after the event

          • 4) The channel c (if any) whose state is altered by the event

          • 5) The message M, if any, sent along c (if c is a channel directed away from p) or received along c (if c is directed towards p)

        • M and c are a special symbol, null, if the occurrence of e does not change the state of any channel

        • Event $e$ can occur in global state $S$ if and only if

          • 1) The state of process p in global state S is s

          • and 2) if c is a channel directed towards p, then the state of c in global state S is a sequence of messages with M at its head

    • Edges: channels

      • State = sequence of messages sent along the channel, excluding the messages received along the channel

  • Assumption

    • Channels: infinite buffers, error-free, deliver messages in the order sent (FIFO)

  • A global state is a set of component process and channel states

    • Initial global state: state of each process is its initial state and the state of each channel is the empty sequence

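    • $\text{next}(S, e)$ = global state immediately after the occurrence of event $e$ in global state $S$

      • A sequence of events $seq = (e_i : 0 \le i \le n)$ is a computation of the system iff $e_i$ can occur in global state $S_i$ for every $0 \le i \le n$, where $S_0$ is the initial global state and $S_{i+1} = \text{next}(S_i, e_i)$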
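
The model above is small enough to execute directly. A minimal sketch, assuming local states are plain strings, channels are lists of in-flight messages, and a hypothetical `CHANNELS` table gives each channel's (sender, receiver); `None` stands in for the null symbol. The data is the single-token system from the paper's Example 1.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Event:
    # The 5-tuple <p, s, s', M, c>; M and c are None (the "null" symbol) when the
    # event does not change the state of any channel.
    p: str
    s: str
    s_next: str
    M: Optional[str] = None
    c: Optional[str] = None


# Hypothetical topology (Example 1): channel -> (sender, receiver).
CHANNELS = {"c": ("p", "q"), "c'": ("q", "p")}


def can_occur(procs: dict, chans: dict, e: Event) -> bool:
    """e can occur in S iff p is in state s and, for a receive, M is at the head of c."""
    if procs[e.p] != e.s:
        return False
    if e.c is None:
        return True
    sender, receiver = CHANNELS[e.c]
    if e.p == receiver:
        return len(chans[e.c]) > 0 and chans[e.c][0] == e.M
    return e.p == sender  # c must be incident on p


def next_state(procs: dict, chans: dict, e: Event) -> tuple:
    """next(S, e): returns a new global state; the input state is left unchanged."""
    procs = dict(procs)
    chans = {name: list(msgs) for name, msgs in chans.items()}
    procs[e.p] = e.s_next
    if e.c is not None:
        if CHANNELS[e.c][0] == e.p:
            chans[e.c].append(e.M)  # send: M joins the tail of c
        else:
            chans[e.c].pop(0)       # receive: M leaves the head of c
    return procs, chans


def is_computation(procs: dict, chans: dict, seq: list) -> bool:
    """seq is a computation iff each e_i can occur in S_i, where S_{i+1} = next(S_i, e_i)."""
    for e in seq:
        if not can_occur(procs, chans, e):
            return False
        procs, chans = next_state(procs, chans, e)
    return True


# The token travels p -> c -> q -> c' -> p, i.e. in-p, in-c, in-q, in-c', in-p.
S0 = ({"p": "has-token", "q": "no-token"}, {"c": [], "c'": []})
seq = [
    Event("p", "has-token", "no-token", "token", "c"),
    Event("q", "no-token", "has-token", "token", "c"),
    Event("q", "has-token", "no-token", "token", "c'"),
    Event("p", "no-token", "has-token", "token", "c'"),
]
print(is_computation(*S0, seq))      # True
print(is_computation(*S0, seq[1:]))  # False: q cannot receive from an empty c
```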

  • Example 1

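    • Single-token system: processes p and q, channel c from p to q, channel c' from q to p; the four global states in-p, in-c, in-q, in-c' are named by where the token is, and the token moves p → c → q → c' → p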


The algorithm: motivation, outline, termination


  • How do global state recording algorithms work?

    • Each process: record its state

    • The two processes a channel is incident on cooperate in recording the channel's state

    • Requires that the recorded process and channel states form a "meaningful" global system state

  • Examples of inconsistent recordings (token system of Example 1)

    • One scenario: inconsistency arises because the state of p is recorded before p sent a message along c and the state of c is recorded after p sent the message

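      • n = number of msgs sent along c before p's state is recorded

        • i.e., how many messages p has sent into channel c at the moment p's state is recorded

      • n' = number of msgs sent along c before c's state is recorded

        • i.e., how many messages p has sent into channel c at the moment c's state is recorded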

      • the recorded global state may be inconsistent if n < n'

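        • Two-token case: p's state is recorded in global state in-p; the system then moves to in-c, where the states of c, q, and c' are recorded. The recorded global state shows two tokens (one in p, one in c)

        • Here n = 0 (p had not yet sent the token when its state was recorded) and n' = 1, so n < n'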

    • Another scenario: the state of c is recorded in global state in-p; the system then transits to global state in-c, and the states of c', p, and q are recorded in in-c

      • The recorded global state shows no token in the system

      • This example suggests that the recorded global state may be inconsistent if the state of c is recorded before p sends a message along c and the state of p is recorded after p sends it, that is, n > n'

    • Thus, a consistent global state requires (1) $n = n'$
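
The two scenarios can be replayed by counting tokens. A minimal sketch, assuming a hypothetical table of Example 1's four global states that records how many tokens each component (p, c, q, c') holds; a "recording" is modeled as the global state in which each component was observed. Recording every component in the same global state is trivially consistent and gives back the single token.

```python
# Tokens held by each component in each global state of Example 1.
GLOBAL_STATES = {
    "in-p":  {"p": 1, "c": 0, "q": 0, "c'": 0},
    "in-c":  {"p": 0, "c": 1, "q": 0, "c'": 0},
    "in-q":  {"p": 0, "c": 0, "q": 1, "c'": 0},
    "in-c'": {"p": 0, "c": 0, "q": 0, "c'": 1},
}


def recorded_tokens(recorded_at: dict) -> int:
    """Total tokens in a recording whose components were observed in different global states.

    recorded_at maps each component to the global state in which it was recorded.
    """
    return sum(GLOBAL_STATES[g][comp] for comp, g in recorded_at.items())


# n < n' (n = 0, n' = 1): p recorded in in-p, then c, q, c' recorded in in-c.
print(recorded_tokens({"p": "in-p", "c": "in-c", "q": "in-c", "c'": "in-c"}))  # 2
# n > n' (n = 1, n' = 0): c recorded in in-p, then p, q, c' recorded in in-c.
print(recorded_tokens({"c": "in-p", "p": "in-c", "q": "in-c", "c'": "in-c"}))  # 0
# Everything recorded in in-c: consistent, exactly one token.
print(recorded_tokens({"p": "in-c", "c": "in-c", "q": "in-c", "c'": "in-c"}))  # 1
```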

  • Definition

    • Let m be the number of messages received along c by q before q's state is recorded.

    • Let m' be the number of messages received along c by q before c's state is recorded.

    • (2) $m = m'$: by the same argument as above

    • (3) $n' \geq m'$

      • I.e. # of msgs received along a channel cannot exceed the # of msgs sent along that channel in every state

    • (4) $n \geq m$ (follows from (1)-(3))

    • Intuition

      • The state to record for a channel = list of msgs sent along the channel before the sender's state is recorded - list of msgs received along the channel before the receiver's state is recorded

        • Needs a worked example to understand
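
A worked example of the formula, as a minimal Python sketch (the function name `channel_state` and the message names are hypothetical). Because channels are FIFO, the first m messages q receives along c are exactly the first m that p sent, so the difference of the two lists is simply the slice from m to n; condition (4), n ≥ m, keeps the slice well defined.

```python
def channel_state(sent: list, n: int, m: int) -> list:
    """State recorded for channel c from p to q.

    sent: messages p sends along c, in order.
    n:    how many of them p had sent when p recorded its state.
    m:    how many of them q had received when q recorded its state.
    With a FIFO channel q's first m receipts are exactly sent[:m], so
    "sent before p recorded" minus "received before q recorded" is sent[m:n].
    """
    if not 0 <= m <= n <= len(sent):
        raise ValueError("a consistent recording needs 0 <= m <= n (condition (4))")
    return sent[m:n]


sent = ["M1", "M2", "M3", "M4", "M5"]
# p records after sending M3 (n = 3); q records after receiving only M1 (m = 1).
print(channel_state(sent, n=3, m=1))  # ['M2', 'M3']: in flight at the cut
```

M1 is already reflected in q's recorded state, and M4 and M5 are sent after p's recording, so none of them belong in the channel state.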

  • (1)-(4) suggest a simple algorithm by which q can record the state of channel c

    • Process p sends a special message, called a marker, right after the n-th message it sends along c (i.e., right after recording its own state)

    • The state of c is the sequence of messages received by q after q records its own state and before q receives the marker along c

    • To ensure (4), q must record its state, if it has not done so already, after receiving a marker along c and before q receives further messages along c
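
The marker turns this into something q can compute locally, without knowing n or m. A minimal sketch, assuming q sees channel c as a list of received items in which a hypothetical `MARKER` sentinel stands for p's marker: the recorded channel state is whatever arrives between q's own recording and the marker. With the same numbers as in the channel-state formula (n = 3, m = 1) it yields the same two in-flight messages.

```python
MARKER = object()  # hypothetical sentinel, distinct from every application message


def record_channel(incoming: list, recorded_after: int) -> list:
    """Recorded state of channel c at q.

    incoming:       what q receives along c, in order; p put MARKER right after
                    the n-th message it sent.
    recorded_after: how many items q had received along c when it recorded its
                    own state (at the latest, on receiving the marker).
    Returns the messages received after q recorded its state and before the marker.
    """
    marker_at = incoming.index(MARKER)  # raises ValueError if no marker arrived
    if recorded_after > marker_at:
        raise ValueError("q must record its state no later than receiving the marker")
    return incoming[recorded_after:marker_at]


# p sent M1..M3, recorded its state (n = 3), sent the marker, then sent M4.
incoming = ["M1", "M2", "M3", MARKER, "M4"]
# q had received only M1 when it recorded its state (m = 1).
print(record_channel(incoming, recorded_after=1))  # ['M2', 'M3']
```

If q records its state only when the marker arrives (`recorded_after=3`), the channel state is empty, which is exactly the "first marker" case of the algorithm.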


  • Initiation

    • Can be done by one or more processes, each of which records its state spontaneously, without receiving markers from other processes

  • Marker-sending rule for a process p: for each channel c incident on, and directed away from, p

    • p sends one marker along c after p records its state and before p sends further messages along c

    • Equivalently, in the N-process formulation (p is process $P_i$):

      • p records its state and prepares a special marker message (distinct from application messages)

      • p sends the marker on each of its N-1 outgoing channels (in the two-process example, only to q)

      • p starts recording all incoming messages on channels $C_{ji}$ for $j \neq i$

  • Marker-receiving rule for a process q: on receiving a marker along a channel c

    • If this is the first marker q has received (q has not recorded its state yet)

      • q records its state and records the state of c as the empty sequence

      • q follows the marker-sending rule and starts recording messages on its other incoming channels

    • Otherwise (q has already recorded its state)

      • q records the state of c as all messages received along c since q began recording (i.e., after q recorded its state) and before this marker
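
Putting the rules together, here is a minimal single-threaded Python simulation. It assumes a hypothetical token-conservation application: N processes on a complete graph pass integer token counts to each other over FIFO `deque`s, and one or more processes initiate the snapshot spontaneously at a chosen step. All names (`Process`, `simulate`, `MARKER`) are illustrative, not from the paper. Because the recorded state is consistent, the recorded process and channel states should add up to the total number of tokens even though tokens keep moving while the snapshot is being taken.

```python
import random
from collections import deque

MARKER = "MARKER"  # hypothetical marker; application messages are int token counts


class Process:
    def __init__(self, pid: int, n: int, tokens: int):
        self.pid, self.n, self.tokens = pid, n, tokens
        self.recorded_state = None  # token count captured by the snapshot
        self.channel_state = {}     # j -> messages recorded on incoming channel C_ji
        self.recording = set()      # j such that C_ji is still being recorded

    def record(self, net: dict) -> None:
        """Record own state, send a marker on every outgoing C_ij, start recording every C_ji."""
        self.recorded_state = self.tokens
        for j in range(self.n):
            if j != self.pid:
                net[(self.pid, j)].append(MARKER)  # before any further sends on C_ij
                self.channel_state[j] = []
                self.recording.add(j)

    def on_receive(self, j: int, msg, net: dict) -> None:
        if msg == MARKER:
            if self.recorded_state is None:
                self.record(net)       # first marker: record now, so C_ji stays empty
            self.recording.discard(j)  # the marker closes the recording of C_ji
        else:
            self.tokens += msg
            if j in self.recording:    # sent before j's snapshot, received after ours
                self.channel_state[j].append(msg)


def simulate(n=4, tokens_each=10, steps=200, snapshot_at=50, initiators=(0,), seed=1):
    rng = random.Random(seed)
    procs = [Process(i, n, tokens_each) for i in range(n)]
    net = {(i, j): deque() for i in range(n) for j in range(n) if i != j}  # FIFO channel C_ij

    def deliver(i, j):
        procs[j].on_receive(i, net[(i, j)].popleft(), net)

    for step in range(steps):
        if step == snapshot_at:
            for k in initiators:  # one or more processes record spontaneously
                if procs[k].recorded_state is None:
                    procs[k].record(net)
        busy = [ch for ch, q in net.items() if q]
        if busy and rng.random() < 0.5:
            deliver(*rng.choice(busy))  # receive event at the head of some channel
        else:
            i, j = rng.sample(range(n), 2)  # send event: i ships some of its tokens to j
            amount = rng.randint(0, procs[i].tokens)
            procs[i].tokens -= amount
            net[(i, j)].append(amount)
    while any(net.values()):  # stop sending and drain, so every marker is received
        for ch in [ch for ch, q in net.items() if q]:
            deliver(*ch)

    recorded = sum(p.recorded_state for p in procs)
    recorded += sum(sum(msgs) for p in procs for msgs in p.channel_state.values())
    return n * tokens_each, recorded, procs


total, recorded, _ = simulate()
print(total, recorded)  # 40 40
```

The final drain stands in for the termination argument: with no more application sends, every marker is eventually delivered, so every process records its state and closes all N-1 incoming channels.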

  • Termination of the algorithm

    • Need to ensure that

      • (L1) no marker remains forever in an incident input channel

      • (L2) every process records its state within finite time of the initiation of the algorithm

    • The algorithm terminates when

      • All processes have received a marker (and recorded their own state)

      • All processes have received a marker on all N - 1 of their incoming channels (and recorded those channels' states)

    • Afterwards, if needed, a central server can gather the partial states to build a global snapshot
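
A minimal sketch of that last step, assuming each process sends a hypothetical `PartialSnapshot` report to a central collector that includes only incoming channels whose marker has already arrived. The collector refuses to assemble a global state until both termination conditions hold: every process has recorded its state, and every process has closed all N - 1 incoming channels. How reports travel to the collector is left out.

```python
from dataclasses import dataclass, field


@dataclass
class PartialSnapshot:
    """Hypothetical report a process sends to the collector."""
    pid: int
    local_state: object
    # sender j -> messages recorded on incoming channel C_j,pid; only channels whose
    # marker has already arrived are included
    channel_states: dict = field(default_factory=dict)


def assemble(reports: list, n: int) -> dict:
    """Build the global snapshot, refusing until both termination conditions hold."""
    by_pid = {r.pid: r for r in reports}
    missing = set(range(n)) - set(by_pid)
    if missing:
        raise ValueError(f"no recorded state yet from processes {sorted(missing)}")
    for pid, r in by_pid.items():
        if set(r.channel_states) != set(range(n)) - {pid}:
            raise ValueError(f"process {pid} has not received markers on all N-1 incoming channels")
    return {
        "processes": {pid: r.local_state for pid, r in by_pid.items()},
        "channels": {(j, pid): msgs for pid, r in by_pid.items() for j, msgs in r.channel_states.items()},
    }


reports = [PartialSnapshot(0, 5, {1: [2]}), PartialSnapshot(1, 3, {0: []})]
print(assemble(reports, n=2))
# {'processes': {0: 5, 1: 3}, 'channels': {(1, 0): [2], (0, 1): []}}
```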

Proof and properties: see the paper

  • Causal consistency

    • Related to the Lamport happened-before (clock) partial ordering

    • An event is pre-snapshot if it occurs before the local snapshot on its process

    • Post-snapshot if it occurs afterwards

    • If event A happens causally before event B, and B is pre-snapshot, then A is pre-snapshot too
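
The pre-/post-snapshot rule can be checked mechanically as a consistent-cut test. A minimal sketch, assuming a hypothetical encoding in which each process's pre-snapshot events are a prefix of its history (given as a count) and each message is a (sender, send index, receiver, receive index) tuple with 0-based indices. Since happened-before is generated by process order plus send-to-receive edges, and prefixes already respect process order, it suffices to check that no message is received pre-snapshot but sent post-snapshot. With FIFO channels the algorithm's snapshot always passes: a message sent after the sender's snapshot travels behind the marker, so the receiver records its state before receiving it.

```python
def is_causally_closed(cut: dict, messages: list) -> bool:
    """True iff the pre-snapshot events form a causally closed (consistent) cut.

    cut:      process -> number of its events that are pre-snapshot (a prefix).
    messages: (sender, send_index, receiver, recv_index) tuples.
    """
    for sender, send_idx, receiver, recv_idx in messages:
        received_pre = recv_idx < cut[receiver]
        sent_pre = send_idx < cut[sender]
        if received_pre and not sent_pre:  # effect inside the cut, cause outside it
            return False
    return True


# p's event 0 sends m to q, which receives it as q's event 1.
msgs = [("p", 0, "q", 1)]
print(is_causally_closed({"p": 1, "q": 2}, msgs))  # True: send and receive both pre-snapshot
print(is_causally_closed({"p": 1, "q": 0}, msgs))  # True: m is in flight, recorded in the channel
print(is_causally_closed({"p": 0, "q": 2}, msgs))  # False: receive pre-snapshot, send post-snapshot
```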

Some questions

  • Also assumes no channel failures? All messages arrive, no duplicates, in order, and so on?

    • Could such strong assumptions reduce the deployability of the algorithm?

  • What if markers are lost? Retransmission? Out-of-order delivery?

  • What if the topology changes dynamically?

  • How long does it take to form a global state (including the snapshotting, collecting, and assembling of states from different components)?

  • A centralized controller seems easier to implement and reason about
