MIT 6.824(6.5840) Distributed Systems

tiny_star Lv3

本科就打算学的课程一直拖到现在,这次一定要学完🤔 6.5840 is a core 12-unit graduate subject with lectures, readings, programming labs, an optional project, a mid-term exam, and a final exam. It will present abstractions and implementation techniques for engineering distributed systems. Major topics include fault tolerance, replication, and consistency. Much of the class consists of studying and discussing case studies of distributed systems

Lecture 1: Introduction

A “distributed system”:
a group of computers cooperating to provide a service

Examples:
popular apps’ back-ends, e.g. for messaging
big web sites
cloud providers

Focus here is distributed infrastructure:
storage
transaction systems
“big data” processing frameworks

Hard to build:
concurrency
complex interactions
performance bottlenecks
partial failure

Why useful?
to increase capacity via parallel processing
to tolerate faults via replication
to match distribution of physical devices e.g. sensors
to increase security via isolation

Why take this course?
interesting – hard problems, powerful solutions
big demand – driven by the rise of big Web sites
active research area – important unsolved problems
challenging – the labs

Lectures:
paper discussion, context, lab guidance

Papers:
one per lecture
research papers, some classic, some new
ideas, problems, implementation details, evaluation
please read papers before class!
web site has a question about each paper

Labs:
goal: deeper grasp of some important techniques
goal: experience with distributed programming
first lab is due a week from Friday
one per week after that for a while

Lab 1: distributed big-data framework (like MapReduce)
Lab 2: client/server vs unreliable network
Lab 3: fault tolerance using replication (Raft)
Lab 4: a fault-tolerant database
Lab 5: scalable database performance via sharding

MAIN TOPICS

This is a course about infrastructure
Storage
Communication
Computation

A big goal: hide the complexity of distribution from applications

Topic: fault tolerance
1000s of servers, big network -> constant failures
We’d like to hide these failures
“High availability”: service continues despite failures
Big idea: replication
If one server crashes, can proceed using the other(s)

Topic: consistency
General-purpose infrastructure needs well-defined behavior
E.g. “read(x) yields the value from the most recent write(x)”
Guaranteeing specified behavior is hard!
e.g. “replica” servers are hard to keep identical

Topic: performance
A common goal: scalable throughput
Nx servers -> Nx total throughput via parallel CPU, RAM, disk, net
Scaling gets harder as N grows
e.g. load imbalance

Topic: tradeoffs
Fault-tolerance, consistency, and performance are enemies
Fault tolerance and consistency require communication
e.g., send data to backup server
e.g., check if cached data is up-to-date
but communication is often slow and hard to scale up
Many designs sacrifice consistency to gain speed
e.g. read(x) might not yield the latest write(x)!
Painful for application programmers (or users)
We’ll see many consistency/performance design points

Topic: implementation
RPC, threads, concurrency control
The labs..

CASE STUDY: MapReduce

Let’s talk about MapReduce (MR)
a good illustration of 6.5840’s main topics
hugely influential
the focus of Lab 1

Context: multi-hour computations on multi-terabyte data-sets
e.g. build search index, or sort, or analyze structure of web
only practical with 1000s of computers

A big goal: easy for non-specialist programmers
programmer just defines Map and Reduce functions
often simple sequential code
MR manages, and hides, all aspects of distribution!
MR is a framework / library; “application” is just Map()/Reduce()

Abstract view of a MapReduce job – word count
Input1 -> Map -> a,1 b,1
Input2 -> Map -> b,1
Input3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,1
| ——> Reduce -> b,2
———–> Reduce -> a,2
1) input is (already) split into M pieces
2) MR calls Map() for each input split, produces list of k,v pairs
“intermediate” data
each Map() call is a “task”
3) when Maps are done,
MR gathers all intermediate v’s for each k,
and passes each key + values to a Reduce call
4) final output is set of <k,v> pairs from Reduce()s

Word-count code
Map(d)
chop d into words
for each word w
emit(w, “1”)
Reduce(k, v[])
emit(len(v[]))

MapReduce scales well:
N “worker” computers (might) get you Nx throughput
Maps()s can run in parallel, since they don’t interact
Same for Reduce()s
Thus more computers -> more throughput – very nice!

MapReduce hides much complexity:
sending map+reduce code to servers
tracking which tasks have finished
“shuffling” intermediate data from Maps to Reduces
balancing load over servers
recovering from crashed servers

To get these benefits, MapReduce restricts applications:
Only one pattern (Map -> shuffle -> Reduce)
No interaction or state (other than via intermediate output)
Only batch: no real-time or streaming processing

Some details (paper’s Figure 1)

Input and output are stored on the GFS cluster file system
MR needs huge parallel input and output throughput
GFS splits files over many servers, many disks, in 64 MB chunks
Maps read in parallel
Reduces write in parallel
GFS replicates data on 2 or 3 servers, for fault tolerance
GFS is a big win for MapReduce

MR writes Map() output to local disk
MR splits into files by hash(key) mod R
each “hash bucket” contains multiple keys
The map workers all hash the same way

The shuffle
each Reduce task processes one hash bucket
MR fetches each Reduce tasks’ bucket from every Map worker
merge, sort by key, call Reduce() for each key
each Reduce task writes a separate output file on GFS

The “Coordinator” manages all the steps in a job
tracks state of each task
hands out tasks to worker machines

What will limit performance?
We care since that limit is the thing to optimize
CPU? memory? disk? network?

In 2004 authors were limited by network speed
What does MR send over the network?
Maps read input from GFS
Reduces fetch Map intermediate output
Often as large as input, e.g. for sorting
Reduces write output files to GFS

How fast was the paper’s network?
Section 5.1: 1800 machines, two-level switched network
[diagram: root switch, 2nd level of switches, machines]
each switch must have had ~42 ports (square root of 1800)
MR’s shuffle requires every worker to fetch data from every other
Only 1/42nd stays in local switch
So MR’s shuffle sends most data through root switch
Paper’s root switch: 100 to 200 gigabits/second, total
1800 machines, so ~55 megabits/second/machine
55 is small: less than disk or RAM speed

How does MR minimize network use?
Coordinator tries to run each Map task on GFS server that stores its input
All computers run both GFS and MR workers
So Map input is usually read from GFS data on local disk, not over network
Intermediate data goes over network just once
Map worker writes to local disk
Reduce workers read from Map worker disks over the network
(Storing it in GFS would require at least two trips over the network.)

How does MR get good load balance?
Why do we care about load balance?
If one server has more work than others, or is slower,
then other servers will lie idle (wasted) at the end, waiting
So ideally MR divides work so that all workers finish at same time
But tasks vary in size, and computers vary in speed
Solution: many more tasks than worker machines
Coordinator hands out new tasks to workers who finish previous tasks
So faster servers do more tasks than slower ones
And slow servers are given less work, reducing impact on total time

What about fault tolerance?
What if a worker computer crashes?
We want MR framework to hide failures
Does MR have to re-run the whole job from the beginning?
Why not?
Coordinator re-runs just the failed Map()s and Reduce()s

Suppose MR runs a Map task twice, one Reduce sees first run’s output, but another Reduce sees the second run’s output?
The two Map executions had better produce identical intermediate output!
Map and Reduce should be pure deterministic functions:
they are only allowed to look at their arguments/input
no state, no file I/O, no interaction, no external communication, no random numbers
Programmer is responsible for ensuring this determinism

Other failures/problems:
What if the coordinator gives two workers the same Map() task?
perhaps the coordinator incorrectly thinks one worker died
it will tell Reduce workers about only one of them
What if the coordinator gives two workers the same Reduce() task?
they will both try to write the same output file on GFS!
atomic GFS rename prevents mixing; one complete file will be visible
What if a single worker is very slow – a “straggler”?
perhaps due to flakey hardware
coordinator starts a second copy of last few tasks
What if a worker computes incorrect output, due to broken h/w or s/w?
too bad! MR assumes “fail-stop” CPUs and software
What if the coordinator crashes?

Performance?
Figure 2
X-Axis is time
Y-Axis is total rate at which a “grep”-style job reads its input
A terabyte (1000 GB) of input
1764 workers
30,000 MB/s (30 GB/s) is huge!
Why 30,000 MB/s?
17 MB/s per worker machine – 140 megabits/second
more than our guess (55 mbit/s) of net bandwidth
input probably read direct from two local GFS disks
so each disk probably could read at about 9 MB/second
Why is the main period of activity about 30 seconds?
Why does it take 50 seconds for throughput to reach maximum?

Current status?
Hugely influential (Hadoop, Spark, Lab 1, &c)
Probably no longer in use at Google
Replaced by Flume / FlumeJava (see paper by Chambers et al)
GFS replaced by Colossus (no good description), and BigTable

Lecture 2: Threads and RPC

Details are Go specific but the concepts are important and widely used

Go threads, and the web crawler

Why Go?
good support for threads
convenient RPC
type- and memory- safe
garbage-collected (no use after freeing problems)
threads + GC is particularly attractive!
not too complex
Go is often used in distributed systems

After the tutorial, use https://golang.org/doc/effective_go.html

Threads
a useful structuring tool, but can be tricky
Go calls them goroutines; everyone else calls them threads

Thread = “thread of execution”
threads allow one program to do many things at once
each thread executes serially, just like a non-threaded program
the threads share memory
each thread includes some per-thread state:
program counter, registers, stack

Why threads?
I/O concurrency
Client sends requests to many servers in parallel and waits for replies
Server processes many simultaneous client requests
Each request may block
While waiting for the disk to read data for client X, process a request from client Y
Multicore performance
Execute code in parallel on several cores
Convenience
In background, once per second, check whether each worker is still alive

Is there an alternative to threads?
Yes: write code that explicitly interleaves activities, in a single thread
Usually called “event-driven”
Keep a table of state about each activity, e.g. each client request
One “event” loop that:
checks for new input for each activity (e.g. arrival of reply from server), does the next step for each activity, updates state
Event-driven can get you I/O concurrency, and eliminates thread costs (which can be substantial), but doesn’t get multi-core speedup, and is painful to program

Threading challenges:
sharing data safely
what if two threads do n = n + 1 at the same time?
or one thread reads while another increments?
this is a “race”
= two threads use same memory at same time, one (or both) writes
often a bug
-> use locks (Go’s sync.Mutex)
-> or avoid sharing mutable data
coordination between threads
one thread is producing data, another thread is consuming it
how can the consumer wait (and release the CPU)?
how can the producer wake up the consumer?
-> use Go channels or sync.Cond or sync.WaitGroup
deadlock
a cycle of threads waiting for each other
via locks, or channels, or RPC

Let’s look at the tutorial’s web crawler as a threading example

What is a web crawler?
goal: fetch all web pages, e.g. to feed to an indexer
you give it a starting web page
it recursively follows all links
[diagram: pages, links, a DAG, a cycle]
but don’t fetch a given page more than once
and don’t get stuck in cycles

Crawler challenges
Exploit I/O concurrency
Network latency is more limiting than network capacity
internet latency: maybe 0.1 seconds, due to speed of light &c
internet throughput: maybe MB/sec or GB/sec
Fetch many pages in parallel
To increase URLs fetched per second
=> Use threads for concurrency
Fetch each URL only once
avoid wasting network bandwidth
avoid link cycles
be nice to remote servers
=> Need to remember which URLs visited
Know when finished

We’ll look at three solutions
Serial
Concurrent, coordination via shared data
Concurrent, coordination via channels

Serial crawler:
performs depth-first exploration via recursive Serial calls
the “fetched” map avoids repeats, breaks cycles
a single map, passed by reference, caller sees callee’s updates
finished when all [recursive] links are explored: easy
but: fetches only one page at a time – slow
can we just put a “go” in front of the Serial() call?
what will happen?
let’s try it… what happened?

ConcurrentMutex crawler:
Creates a thread for each page fetch
Many concurrent fetches, higher fetch rate
the “go func” creates a goroutine and starts it running
func… is an “anonymous function”
The threads share the fs.fetched map
So only one thread will fetch any given page
Why the Mutex (Lock() and Unlock()) in testAndSet()?
One reason:
Two threads make simultaneous calls to ConcurrentMutex() with same URL
Due to two different pages containing link to same URL
T1 reads fetched[url], T2 reads fetched[url]
Both see that url hasn’t been fetched (fetched[url] = false)
Both fetch, which is wrong
The mutex causes one to wait while the other does both check and set
So only one thread sees fetched[url]==false
We say “the lock protects fs.fetched[]”
But note Go does not enforce any relationship between locks and data!
The code between lock/unlock is often called a “critical section”
Another reason:
Internally, map is a complex data structure (tree? expandable hash?)
Concurrent update/update may wreck internal invariants
Concurrent update/read may crash the read
defer…
What if I comment out Lock() / Unlock()?
go run crawler.go
Does it always work? Always fail? Why?
go run -race crawler.go
Detects races even when output is correct!
What if I forget to Unlock()? deadlock
How does the ConcurrentMutex crawler decide it is done?
sync.WaitGroup – it’s basically a counter
Wait() waits for all Add()s to be balanced by Done()s
i.e. waits for all child threads to finish
[diagram: tree of goroutines, overlaid on cyclic URL graph]
there’s a WaitGroup per node in the tree
How many concurrent threads might there be?

ConcurrentChannel crawler
a Go channel:
a channel is an object
ch := make(chan int)
a channel lets one thread send an object to another thread
ch <- x
the sender waits until some goroutine receives
y := <- ch
a receiver waits until some goroutine sends
also: for y := range ch
channels both communicate and synchronize
several threads can send and receive on a channel
send+recv takes less than a microsecond – fairly cheap
remember: sender blocks until the receiver receives!
“synchronous”
watch out for deadlock
ConcurrentChannel coordinator()
coordinator() creates a worker goroutine to fetch each page
worker() sends slice of page’s URLs on a channel
multiple workers send on the single channel
coordinator() reads URL slices from the channel
At what line does the coordinator wait?
Does the coordinator use CPU time while it waits?
Note: there is no recursion here; coordinator() creates all workers
Note: no need to lock the fetched map, because it isn’t shared!
How does the coordinator know it is done?
Keeps count of workers in n
Each worker sends exactly one item on channel
The channel does two things:
1.communication of values
2.notification of events (e.g. thread termination)

Why is it safe for multiple threads use the same channel?

Is this a race:
Worker thread modifies (creates) url slice, coordinator uses it?
worker only writes slice before sending
coordinator only reads slice after receiving
So they can’t use the slice at the same time, so there’s no race

Why does ConcurrentChannel() create a goroutine just for “ch <- …”?
Let’s get rid of the goroutine…

When to use sharing and locks, versus channels?
Most (all?) problems can be solved in either style
What makes the most sense depends on how the programmer thinks
state – sharing and locks
communication – channels
For the 6.824 labs, I recommend sync.Mutex/sync.Cond for shared state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package main

import (
"fmt"
"sync"
)

//
// Several solutions to the crawler exercise from the Go tutorial
// https://tour.golang.org/concurrency/10
//

//
// Serial crawler
//

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
}

//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}

func (fs *fetchState) testAndSet(url string) bool {
fs.mu.Lock()
defer fs.mu.Unlock()
r := fs.fetched[url]
fs.fetched[url] = true
return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
if fs.testAndSet(url) {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
ConcurrentMutex(u, fetcher, fs)
done.Done()
}(u)
}
done.Wait()
}

func makeState() *fetchState {
return &fetchState{fetched: make(map[string]bool)}
}

//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}

//
// main
//

func main() {
fmt.Printf("=== Serial===\n")
Serial("http://golang.org/", fetcher, make(map[string]bool))

fmt.Printf("=== ConcurrentMutex ===\n")
ConcurrentMutex("http://golang.org/", fetcher, makeState())

fmt.Printf("=== ConcurrentChannel ===\n")
ConcurrentChannel("http://golang.org/", fetcher)
}

//
// Fetcher
//

type Fetcher interface {
// Fetch returns a slice of URLs found on the page.
Fetch(url string) (urls []string, err error)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
body string
urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
fmt.Printf("found: %s\n", url)
return res.urls, nil
}
fmt.Printf("missing: %s\n", url)
return nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}

Go RPC

Remote Procedure Call (RPC)
a key piece of distributed system machinery; all the labs use RPC
goal: easy-to-program client/server communication
hide details of network protocols
convert data (strings, arrays, maps, &c) to “wire format”
portability / interoperability

RPC message diagram:
Client Server
request—>
<—response

Software structure
client app handler fns
stub fns dispatcher
RPC lib RPC lib
net ———— net

Go example: kv.go on schedule page
A toy key/value storage server – Put(key,value), Get(key)->value
Uses Go’s RPC library
Common:
Declare Args and Reply struct for each server handler
Client:
connect()’s Dial() creates a TCP connection to the server
get() and put() are client “stubs”
Call() asks the RPC library to perform the call
you specify connection, function name, arguments, place to put reply
library marshalls args, sends request, waits, unmarshalls reply
return value from Call() indicates whether it got a reply
usually you’ll also have a reply.Err indicating service-level failure
Server:
Go requires server to declare an object with methods as RPC handlers
Server then registers that object with the RPC library
Server accepts TCP connections, gives them to RPC library
The RPC library
reads each request
creates a new goroutine for this request
unmarshalls request
looks up the named object (in table create by Register())
calls the object’s named method (dispatch)
marshalls reply
writes reply on TCP connection
The server’s Get() and Put() handlers
Must lock, since RPC library creates a new goroutine for each request
read args; modify reply

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main

import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
)

//
// Common RPC request/reply definitions
//

type PutArgs struct {
Key string
Value string
}

type PutReply struct {
}

type GetArgs struct {
Key string
}

type GetReply struct {
Value string
}

//
// Client
//

func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}

func get(key string) string {
client := connect()
args := GetArgs{ key }
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}

func put(key string, val string) {
client := connect()
args := PutArgs{ key, val }
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}

//
// Server
//

type KV struct {
mu sync.Mutex
data map[string]string
}

func server() {
kv := &KV{data: map[string]string{}}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

reply.Value = kv.data[args.Key]

return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

kv.data[args.Key] = args.Value

return nil
}

//
// main
//

func main() {
server()

put("subject", "6.5840")
fmt.Printf("Put(subject, 6.5840) done\n")
fmt.Printf("get(subject) -> %s\n", get("subject"))
}

A few details:
Binding: how does client know what server computer to talk to?
For Go’s RPC, server name/port is an argument to Dial
Big systems have some kind of name or configuration server
Marshalling: format data into packets
Go’s RPC library can pass strings, arrays, objects, maps, &c
Go passes pointers by copying the pointed-to data
Cannot pass channels or functions
Marshals only exported fields (i.e., fields w/ CAPITAL letter)

RPC problem: what to do about failures?
e.g. lost packet, broken network, slow server, crashed server

What does a failure look like to the client RPC library?
Client never sees a response from the server
Client does not know if the server saw the request!
[diagram of losses at various points]
Maybe server never saw the request
Maybe server executed, crashed just before sending reply
Maybe server executed, but network died just before delivering reply
Remote procedure call doesn’t behave the same as procedure call on a single machine!
A recurring challenge in implementing distributed systems

Simplest failure-handling scheme: “best-effort RPC”
Call() waits for response for a while
If none arrives, re-send the request
Do this a few times
Then give up and return an error

Q: is “best effort” easy for applications to cope with?
A particularly bad situation:
client executes
Put(“k”, 10);
Put(“k”, 20);
both succeed
what will Get(“k”) yield?
[diagram, timeout, re-send, original arrives late]

Q: is best effort ever OK?
read-only operations
operations that it’s harmless to repeat
e.g. DB checks if record has already been inserted
Other common semantics: at-most-once
For example, Go RPC is a simple form of “at-most-once”
open TCP connection
write request to TCP connection
Go RPC never re-sends a request
So server won’t see duplicate requests
Go RPC code returns an error if it doesn’t get a reply
perhaps after a timeout (from TCP)
perhaps server didn’t see request
perhaps server processed request but server/net failed before reply came back
Labs explore others way of implementing at-most-once
No retry is too restrictive for replicated servers
Like to retry at another replica if first replica fails

Go FAQ

Q: Why does 6.5840 use Go for the labs?
A: Until a few years ago 6.5840 used C++, which worked well. Go works a little better for 6.5840 labs for a couple of reasons. Go is garbage collected and type-safe, which eliminates some common classes of bugs. Go has good support for threads (goroutines), and a nice RPC package, which are directly useful in 6.5840. Threads and garbage collection work particularly well together, since garbage collection can eliminate programmer effort to decide when the last thread using an object has stopped using it. There are other languages with these features that would probably work fine for 6.5840 labs, such as Java

Q: are there any tips/tricks for building an intuition of how to build effective Go code?
A: Get experience by writing Go code and reading other’s people go code. This page has many useful tips: https://go.dev/doc/effective_go

Q: Do goroutines run in parallel? Can you use them to increase performance?
A: Go’s goroutines are the same as threads in other languages. The Go runtime executes goroutines on all available cores, in parallel. If
there are fewer cores than runnable goroutines, the runtime will pre-emptively time-share the cores among goroutines

Q: How do Go channels work? How does Go make sure they are synchronized between the many possible goroutines?
A: You can see the source at https://golang.org/src/runtime/chan.go, though it is not easy to follow
At a high level, a chan is a struct holding a buffer and a lock. Sending on a channel involves acquiring the lock, waiting (perhaps
releasing the CPU) until some thread is receiving, and handing off the message. Receiving involves acquiring the lock and waiting for a
sender. You could implement your own channels with Go sync.Mutex and sync.Cond

Q: I’m using a channel to wake up another goroutine, by sending a dummy bool on the channel. But if that other goroutine is already running (and thus not receiving on the channel), the sending goroutine blocks. What should I do?
A: Try condition variables (Go’s sync.Cond) rather than channels. Condition variables work well to alert goroutines that may (or may
not) be waiting for something. Channels, because they are synchronous, are awkward if you’re not sure if there will be a goroutine waiting at the other end of the channel

Q: How can I have a goroutine wait for input from any one of a number of different channels? Trying to receive on any one channel blocks if there’s nothing to read, preventing the goroutine from checking other channels
A: Try creating a separate goroutine for each channel, and have each goroutine block on its channel. That’s not always possible, but when it works it’s often the simplest approach
Otherwise try Go’s select

Q: When should we use sync.WaitGroup instead of channels? and vice versa?
A: WaitGroup is fairly special-purpose; it’s only useful when waiting for a bunch of activities to complete. Channels are more general-purpose; for example, you can communicate values over channels. You can wait for multiple goroutines using channels, though it takes a few more lines of code than with WaitGroup

Q: I need my code to perform a task once per second. What’s the easiest way to do that?
A: Create a goroutine dedicated to that periodic task. It should have a loop that uses time.Sleep() to pause for a second, and then do the task, and then loop around to the time.Sleep()

Q: How do we know when the overhead of spawning goroutines exceeds the concurrency we gain from them?
A: It depends! If your machine has 16 cores, and you are looking for CPU parallelism, you should have roughly 16 executable goroutines. If it takes 0.1 second of real time to fetch a web page, and your network is capable of transmitting 100 web pages per second, you probably need about 10 goroutines concurrently fetching in order to use all of the network capacity. Experimentally, as you increase the number of goroutines, for a while you’ll see increased throughput, and then you’ll stop getting more throughput; at that point you have enough goroutines from the point of view of performance

Q: How would one create a Go channel that connects over the Internet?How would one specify the protocol to use to send messages?
A: A Go channel only works within a single program; channels cannot be used to talk to other programs or other computers
Have a look at Go’s RPC package, which lets you talk to other Go programs over the Internet: https://golang.org/pkg/net/rpc/

Q: What are some important/useful Go-specific concurrency patterns to know?
A: Here’s a slide deck on this topic, from a Go expert: https://talks.golang.org/2012/concurrency.slide

Q: How are slices implemented?
A: A slice is an object that contains a pointer to an array and a start and end index into that array. This arrangement allows multiple slices to share an underlying array, with each slice perhaps exposing a different range of array elements
Here’s a more extended discussion: https://blog.golang.org/go-slices-usage-and-internals
I use slices often, and arrays never. A Go slice is more flexible than a Go array since an array’s size is part of its type, whereas a function that takes a slice as argument can take a slice of any length

Q: What are common debugging tools people use for Go?
A: fmt.Printf()
As far as I know there’s not a great debugger for Go, though gdb can be made to work: https://golang.org/doc/gdb
In any case, for most bugs I’ve found fmt.Printf() to be an extremely effective debugging tool

Q: When is it right to use a synchronous RPC call and when is it right to use an asynchronous RPC call?
A: Most code needs the RPC reply before it can proceed; in that case it makes sense to use synchronous RPC
But sometimes a client wants to launch many concurrent RPCs; in that case async may be better. Or the client wants to do other work while it waits for the RPC to complete, perhaps because the server is far away (so speed-of-light time is high) or because the server might not be reachable so that the RPC suffers a long timeout period
I have never used async RPC in Go. When I want to send an RPC but not have to wait for the result, I create a goroutine, and have the
goroutine make a synchronous Call()

Q: Is Go used in industry?
A: Yes. You can see an estimate of how much different programming languages are used here: https://www.tiobe.com/tiobe-index/

Q: What are common problems that developers face when starting with Go?
A: Here are a few:
Not protecting maps with locks when there is concurrent access. Use Go’s race detector!
Deadlocks with channels
Not capturing a variable when creating a goroutine
Leaking goroutines

Q: Does Go support inheritance? (In the Java/C++ kind of “extends” way?)
A: Go doesn’t support C++ style inheritance but has generics, interfaces, and embedded structs, which allow you to do many things
for which you would use inheritance in C++

Q: The thing I found most confusing about the Go tutorial was that goroutines don’t continue executing after the main thread has completed. I don’t think this was mentioned explicitly anywhere in the tutorial; I figured it out through debuging the crawler exercise
A: Yes, I don’t think it is in the tutorial, but the language spec is explicit about this: https://golang.org/ref/spec (see Program
execution)

Q: I’m still a little confused about when to choose value or pointer receivers. Can you provide any concrete/real-world examples of when we would choose one over the other?
A: When you want to modify the state of the receiver, you have to use pointer receivers. If the struct is very big, you probably want to
use a pointer receiver because value receivers operate on a copy. If neither applies, you can use a value receiver. However, be careful
with value receivers; e.g., if you have a mutex in a struct, you cannot make it a value receiver, because the mutex would be copied, defeating its purpose

vote examples

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// vote-count-1.go
package main

import "time"
import "math/rand"

func main() {
rand.Seed(time.Now().UnixNano())

count := 0
finished := 0

for i := 0; i < 10; i++ {
go func() {
vote := requestVote()
if vote {
count++
}
finished++
}()
}

for count < 5 && finished != 10 {
// wait
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
}

func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int() % 2 == 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// vote-count-2.go
package main

import "sync"
import "time"
import "math/rand"

func main() {
rand.Seed(time.Now().UnixNano())

count := 0
finished := 0
var mu sync.Mutex

for i := 0; i < 10; i++ {
go func() {
vote := requestVote()
mu.Lock()
defer mu.Unlock()
if vote {
count++
}
finished++
}()
}

for {
mu.Lock()

if count >= 5 || finished == 10 {
break
}
mu.Unlock()
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
mu.Unlock()
}

func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int()%2 == 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// vote-count-3.go
package main

import "sync"
import "time"
import "math/rand"

func main() {
rand.Seed(time.Now().UnixNano())

count := 0
finished := 0
var mu sync.Mutex

for i := 0; i < 10; i++ {
go func() {
vote := requestVote()
mu.Lock()
defer mu.Unlock()
if vote {
count++
}
finished++
}()
}

for {
mu.Lock()
if count >= 5 || finished == 10 {
break
}
mu.Unlock()
time.Sleep(50 * time.Millisecond)
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
mu.Unlock()
}

func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int() % 2 == 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// vote-count-4.go
package main

import "sync"
import "time"
import "math/rand"

func main() {
rand.Seed(time.Now().UnixNano())

count := 0
finished := 0
var mu sync.Mutex
cond := sync.NewCond(&mu)

for i := 0; i < 10; i++ {
go func() {
vote := requestVote()
mu.Lock()
defer mu.Unlock()
if vote {
count++
}
finished++
cond.Broadcast()
}()
}

mu.Lock()
for count < 5 && finished != 10 {
cond.Wait()
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
mu.Unlock()
}

func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int() % 2 == 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// vote-count-5.go
package main

import "time"
import "math/rand"

func main() {
rand.Seed(time.Now().UnixNano())

count := 0
finished := 0
ch := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
ch <- requestVote()
}()
}
for count < 5 && finished < 10 {
v := <-ch
if v {
count += 1
}
finished += 1
}
if count >= 5 {
println("received 5+ votes!")

} else {
println("lost")
}
}

func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int()%2 == 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// vote-count-6.go
package main

import "time"
import "math/rand"

func main() {
rand.Seed(time.Now().UnixNano())

count := 0
ch := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
ch <- requestVote()
}()
}
for i := 0; i < 10; i++ {
v := <-ch
if v {
count += 1
}
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
}

func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int()%2 == 0
}

Lecture 3: GFS

The Google File System
Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung
SOSP 2003

**Why are we reading this paper? **
GFS paper touches on many themes of 6.5840
parallel performance, fault tolerance, replication, consistency
good systems paper – details from apps all the way to network
successful real-world design

Important distributed system ideas
Build fault-tolerant applications using fault-tolerant storage
Sharding
Primary-backup replication protocol
Leases to ensure 1 primary
Checksums for detecting faulty hardware/software
Sacrifice consistency for performance/simplicity

GFS context
Many Google services needed a big fast unified storage system
Mapreduce, crawler, indexer, log storage/analysis
Shared among multiple applications e.g. crawl, index, analyze
Huge capacity
Huge performance
Fault tolerant
But:
just for internal Google use
aimed at batch big-data performance, not interactive

**GFS overview **
100s/1000s of clients (e.g. MapReduce worker machines)
100s of chunkservers, each with its own disk
one coordinator

Capacity story?
big files split into 64 MB chunks
each file’s chunks striped/sharded over chunkservers
so a file can be much larger than any one disk
each chunk in a Linux file

Throughput story?
clients talk directly to chunkservers to read/write data
if lots of clients access different chunks, huge parallel throughput
read or write

Fault tolerance story?
each 64 MB chunk stored (replicated) on three chunkservers
client writes are sent to all of a chunk’s copies
a read just needs to consult one copy

What are the steps when client C wants to read a file?
1.C sends filename and offset to coordinator (CO) (if not cached)
CO has a filename -> array-of-chunkhandle table
and a chunkhandle -> list-of-chunkservers table
2.CO finds chunk handle for that offset
3.CO replies with chunkhandle + list of chunkservers
4.C caches handle + chunkserver list
5.C sends request to nearest chunkserver
chunk handle, offset
6.chunk server reads from chunk file on disk, returns to client

Clients only ask coordinator where to find a file’s chunks
clients cache name -> chunkhandle info
coordinator does not handle data, so (hopefully) not heavily loaded

What about writes?
Client knows which chunkservers hold replicas that must be updated
How should we manage updating of replicas of a chunk?

A bad replication scheme
(This is not what GFS does)
[diagram: C, S1, S2, S3]
Client sends update to each replica chunkserver
Each chunkserver applies the update to its copy

What can go wrong?
*Two* clients write the same data at the same time
i.e. “concurrent writes”
Chunkservers may see the updates in different orders!
Again, the risk is that, later, two clients may read different content

Idea: primary/secondary replication
(or primary/backup)
For each chunk, designate one server as “primary”
Clients send write requests just to the primary
The primary alone manages interactions with secondary servers
(Some designs send reads just to primary, some also to secondaries)
The primary chooses the order for all client writes
Tells the secondaries – with sequence numbers – so all replicas
apply writes in the same order, even for concurrent client writes
There are still many details to fill in, and we’ll
see a number of variants in upcoming papers

What are the steps when C wants to write a file at some offset?
paper’s Figure 2
1.C asks CO about file’s chunk @ offset
2.CO tells C the primary and secondaries
3.C sends data to all (just temporary…), waits for all replies (?)
4.C asks P to write
5.P checks that lease hasn’t expired
6.P writes its own chunk file (a Linux file)
7.P tells each secondary to write (copy temporary into chunk file)
8.P waits for all secondaries to reply, or timeout
secondary can reply “error” e.g. out of disk space
9.P tells C “ok” or “error”
10.C retries from start if error

What data may a client read after a failed write?
Replicas may return different data
Primary P updated its own state
But secondary S1 did not update (failed? slow? network problem?)
Client C1 reads from P; Client C2 reads from S1
they will see different results!
Such a departure from ideal behavior is an “anomaly”
in non-replicated systems, this couldn’t happen!
data can be *inconsistent*
Note: a successful writes doesn’t lead to inconsistency

What is the result of concurrent writes?
Clients break writes spanning chunks into two writes
C1 writes chunks x and y at offset 0 and 1, respectively
C2 writes chunks a b at offset 0 and 1
A possible state of the replicas (without errors): [a y]
x was overwritten by a
b was overwritten with y
x and b are lost!

GFS solution: atomic append of records
Primary picks the offset and pads if record spans chunks
Result for the above case: some interleaving of [a, b, x, and y]
No data lost, but “empty” areas due to padding
Note: client doesn’t determine where the data is in the file
primary picks the offset
append returns offset where client’s record is written
Note: requires changing applications to use append instead of write

What data may a client read after a failed append?
Read from a backup that has the record
Read from a backup that didn’t apply the append: a hole
Read from a backup that recorded the initial and retried append
duplicate records

Consistency: guarantees offered by a storage system to applications
GFS consistency is complex! (Table 1)
if primary tells client that a write succeeded,
and no other client is writing the same part of the file,
all readers will see the write
“defined”
if successful concurrent writes to the same part of a file,
and they all succeed,
all readers will see the same content,
but maybe it will be a mix of the writes
“consistent”
E.g. C1 writes “ab”, C2 writes “xy”, everyone might see “xb”
if primary doesn’t tell the client that the write succeeded,
different readers may see different content, or none
“inconsistent”

Why are these anomalies OK?
They only intended to support a certain subset of their own applications
Written with knowledge of GFS’s behavior
Probably mostly single-writer and Record Append
Writers could include checksums and record IDs
Readers could use them to filter out junk and duplicates
Later commentary by Google engineers suggests that it might have been better to make GFS more consistent
http://queue.acm.org/detail.cfm?id=1594206

What might better consistency look like?
There are many possible answers
Trade-off between easy-to-use for client application programmers, and easy-to-implement for storage system designers
Maybe try to mimic local disk file behavior
Perhaps:
* atomic writes: either all replicas are updated, or none, even if failures
* read sees latest write
* all readers see the same content (assuming no writes)
We’ll see more precision later

Let’s think about how GFS handles crashes of various entities

A client crashes while writing?
Either it got as far as asking primary to write, or not

A secondary crashes just as the primary asks it to write?
1.Primary may retry a few times, if secondary revives quickly with disk intact, it may execute the primary’s request and all is well
2.Primary gives up, and returns an error to the client
Client can retry – but why would the write work the second time around?
3.Coordinator notices that a chunkserver is down
Periodically pings all chunk servers
Removes the failed chunkserver from all chunkhandle lists
Perhaps re-replicates, to maintain 3 replicas
Tells primary the new secondary list

Re-replication after a chunkserver failure may take a Long Time
Since a chunkserver failure requires re-replication of all its chunks
80 GB disk, 10 MB/s network -> an hour or two for full copy
So the primary probably re-tries for a while, and the coordinator lets the system operate with a missing chunk replica, before declaring the chunkserver permanently dead
How long to wait before re-replicating?
Too short: wasted copying work if chunkserver comes back to life
Too long: more failures might destroy all copies of data

What if a primary crashes?
Remove that chunkserver from all chunkhandle lists
For each chunk for which it was primary, wait for lease to expire, grant lease to another chunkserver holding that chunk

What is a lease?
Permission to act as primary for a given time (60 seconds)
Primary promises to stop acting as primary before lease expires
Coordinator promises not to change primaries until after expiration
Separate lease per actively written chunk

Why are leases helpful?
The coordinator must be able to designate a new primary if the present primary fails
But the coordinator cannot distinguish “primary has failed” from “primary is still alive but the network has a problem.”
What if the coordinator designates a new primary while old one is active?
two active primaries!
C1 writes to P1, C2 reads from P2, doesn’t seen C1’s write!
called “split brain” – a disaster
Leases help prevent split brain:
Coordinator won’t designate new primary until the current one is guaranteed to have stopped acting as primary

What if the coordinator crashes?
Two strategies
1.Coordinator writes critical state to its disk
If it crashes and reboots with disk intact, re-reads state, resumes operations
2.Coordinator sends each state update to a “backup coordinator”, which also records it to disk; backup coordinator can take over if main coordinator cannot be restarted

What information must the coordinator save to disk to recover from crashes?
Table mapping file name -> array of chunk handles
Table mapping chunk handle -> current version #
What about the list of chunkservers for each chunk?
A rebooted coordinator asks all the chunkservers what they store
A rebooted coordinator must also wait one lease time before designating any new primaries

*Who/what decides the coordinator is dead, and chooses a replacement?
Paper does not say
Could the coordinator replicas ping the coordinator, and automatically take over if no response?

*Suppose the coordinator reboots, and polls chunkservers
What if a chunkserver has a chunk, but it wasn’t a secondary?
I.e. the current primary wasn’t keeping it up to date?
Coordinator remembers version number per chunk, on disk
Increments each time it designates a new primary for the chunk
Chunkserver also remembers its version number per chunk
When chunkserver reports to coordinator, coordinator compares version number, only accepts if current version

*What if a client has cached a stale (wrong) primary for a chunk?

*What if the reading client has cached a stale server list for a chunk?

*What if the primary crashes before sending append to all secondaries?
Could a secondary that didn’t see the append be chosen as the new primary?
Is it a problem that the other secondary did see the append?

What would it take to have no anomalies – strict consistency?
I.e. all clients see the same file content
Too hard to give a real answer, but here are some issues
*All replicas should complete each write, or none – “atomic write”
Perhaps tentative writes until all promise to complete it?
Don’t expose writes until all have agreed to perform them!
*Primary should detect duplicate client write requests
If primary crashes, some replicas may be missing the last few ops
They must sync up
*Clients must be prevented from reading from stale ex-secondaries
You’ll see solutions in Labs 2, 3, and 4!

*Are there circumstances in which GFS will break its guarantees?
e.g. write succeeds, but subsequent readers don’t see the data
All coordinator replicas permanently lose state (permanent disk failure)
Read will fail
All chunkservers holding the chunk permanently lose disk content
Read will fail
CPU, RAM, network, or disk yields an incorrect value
checksum catches some cases, but not all
Read may say “success” but yield the wrong data!
Above errors were “fail-stop”, but this is a “byzantine” failure
Time is not properly synchronized, so leases don’t work out
So multiple primaries, maybe write goes to one, read to the other
Again, read may yield “success” but wrong data – byzantine failure

Performance (Figure 3)
large aggregate throughput for read
94 MB/sec total for 16 clients + 16 chunkservers
or 6 MB/second per client
is that good?
one disk sequential throughput was about 30 MB/s
one NIC was about 10 MB/s
Close to saturating inter-switch link’s 125 MB/sec (1 Gbit/sec)
So: multi-client scalability is good
Table 3 reports 500 MB/sec for cluster A, which was a lot
writes to different files lower than possible maximum
authors blame their network stack (but no detail)
concurrent appends to single file
limited by the server that stores last chunk
hard to interpret after 15 years, e.g. how fast were the disks?

Retrospective interview with GFS engineer:
http://queue.acm.org/detail.cfm?id=1594206
file count was the biggest problem
eventual numbers grew to 1000x those in Table 2 !
hard to fit in coordinator RAM
coordinator scanning of all files/chunks for GC is slow
1000s of clients -> too much CPU load on coordinator
coordinator fail-over initially manual, 10s of minutes, too long
applications had to be designed to cope with GFS semantics and limitations
more painful than expected
BigTable is one answer to many-small-files problem and Colossus apparently shards coordinator data over many coordinators

Summary
case study of performance, fault-tolerance, consistency
specialized for MapReduce applications
good ideas:
global cluster file system as universal infrastructure
separation of naming (coordinator) from storage (chunkserver)
sharding for parallel throughput
huge files/chunks to reduce overheads
primary to choose order for concurrent writes
leases to prevent split-brain
not so great:
single coordinator performance
ran out of RAM and CPU
chunkservers not very efficient for small files
lack of automatic fail-over to coordinator replica
maybe consistency was too relaxed

http://queue.acm.org/detail.cfm?id=1594206
https://cloud.google.com/blog/products/storage-data-transfer/a-peek-behind-colossus-googles-file-system

Paper Questions

The GFS paper is a “classic” paper that describes one of the first distributed file systems for data-center applications such as large MapReduce jobs. It touches on many themes of 6.5840: parallel performance, fault tolerance, replication, and consistency. It is good systems paper with details from apps all the way to network successful

GFS has been replaced by something called Colossus, with the same overall goals, but improvements in master performance and fault-tolerance. In addition, many applications within Google have switched to more database-like storage systems such as BigTable and Spanner. However, much of the GFS design lives on in HDFS, the storage system for the Hadoop open-source MapReduce

Question: Describe a sequence of events that would result in a client reading stale data from the Google File System

Answer: 基于“元数据缓存”与“版本号延迟检测”的序列
导致过期数据(Stale Data)的核心原因是:客户端依赖了过期的元数据缓存,而此时 Master 尚未剔除或检测到已经落后的副本
事件序列描述:
初始状态:
某个数据块(Chunk)在 Chunkserver A、B、C 上有三个副本
Master 记录的该 Chunk 当前版本号(Version Number)为 V1
客户端缓存位置:
客户端 C1 询问 Master 该 Chunk 的位置
Master 返回副本列表 [A, B, C]
关键点:客户端 C1 将该列表及其版本号 V1 缓存在本地内存中(通常缓存有效期为数分钟)
副本故障与数据更新:
副本 C 发生故障(如网络断开或暂时宕机)
另一个客户端 C2 此时申请写入该 Chunk
Master 发现副本 C 不可用,于是增加版本号到 V2,并只将 V2 发送给在线的副本 A 和 B
客户端 C2 成功将新数据写入副本 A 和 B,此时 A、B 的数据是 V2 版本
副本恢复:
副本 C 重新上线。由于它错过了刚才的写入,它持有的依然是 V1 版本 的旧数据
此时,Master 可能尚未通过心跳信号(Heartbeat)检测到 C 的版本号已经落后,或者检测到了但尚未通知到所有客户端
客户端读取过期数据:
客户端 C1 再次需要读取该 Chunk
关键点:C1 不询问 Master,而是直接从本地 缓存 中获取了副本列表 [A, B, C]
C1 随机选择或根据负载选择了副本 C 进行读取
副本 C 并不自知已过期,直接返回了 V1 版本 的旧数据
结果:客户端 C1 读取到了过期数据

Version Number(版本号):用于区分新旧数据,但 Master 增加版本号的操作可能无法立即同步给所有潜在读者
Metadata Caching(元数据缓存):客户端为了性能,在缓存有效期内不会联系 Master,这是过期读取的根本原因
Master-Chunkserver Heartbeats(心跳机制):Master 发现副本过期的过程是异步的(通过周期性心跳),存在时间差
Fail-stop model:GFS 假设副本是诚实的,如果副本没意识到自己过期,它会直接提供旧数据

GFS 如何缓解此问题?
大多数文件是 Append-only(只追加) 的,读取时通过文件长度或校验和很容易发现数据异常
元数据缓存有超时限制
客户端在下一次打开文件或缓存失效后,最终会从 Master 处获得正确的副本列表

GFS FAQ

Q: Did having a single master turn out to be a good idea?
A: That idea simplified initial deployment but was not so great in the long run. This article (GFS: Evolution on Fast Forward, https://queue.acm.org/detail.cfm?id=1594206) says that as the years went by and GFS use grew, a few things went wrong. The number of files grew enough that it wasn’t reasonable to store all files’ metadata in the RAM of a single master. The number of clients grew enough that a single master didn’t have enough CPU power to serve them. The fact that switching from a failed master to one of its secondaries required human intervention made recovery slow. Apparently Google’s replacement for GFS, Colossus, splits the master over multiple servers, and has more automated master failure recovery

Q: Why is atomic record append at-least-once, rather than exactly once?
A: Section 3.1, Step 7, says that if a write fails at one of the secondaries, the client re-tries the write. That will cause the data to be appended more than once at the non-failed replicas. A different design could detect duplicate client requests despite arbitrary failures (e.g. a primary failure between the original request and the client’s retry). You’ll implement such a design in the labs, at considerable expense in complexity and performance

Q: How does an application know what sections of a chunk consist of padding and duplicate records?
A: To detect padding, applications can put a predictable magic number at the start of a valid record, or include a checksum that will likely only be valid if the record is valid. The application can detect duplicates by including unique IDs in records. Then, if it reads a record that has the same ID as an earlier record, it knows that they are duplicates of each other. GFS provides a library for applications that handles these cases. This aspect of the GFS design effectively moves complexity from GFS to applications, which is perhaps not ideal

Q: How can clients find their data given that atomic record append writes it at an unpredictable offset in the file?
A: Append (and GFS in general) is mostly intended for applications that sequentially read entire files. Such applications will scan the file looking for valid records (see the previous question), so they don’t need to know the record locations in advance. For example, the file might contain URLs encountered by a set of concurrent web crawlers. The file offset of any given URL doesn’t matter much; readers just want to be able to read the entire set of URLs

Q: What’s a checksum?
A: A checksum algorithm takes a sequence of bytes as input and returns a single number that’s a function of that sequence. For example, a simple checksum might be the sum of all the bytes in the input. GFS stores the checksum of each 64 kilobyte “block” in each chunk. When a chunkserver writes a block of data to its disk, it first computes the checksum of the block, and saves the checksum on disk. When a chunkserver reads a block from its disk, it also reads the relevant previously-saved checksum, re-computes a checksum from the data read from disk, and checks that the two checksums match. If the data was corrupted by the disk, the checksums won’t match, and the chunkserver will know to return an error. Separately, some GFS applications store their own checksums, over application-defined records, inside GFS files, to distinguish between correct records and padding. CRC32 is an example of a checksum algorithm

Q: The paper mentions reference counts – what are they?
A: They are part of the implementation of copy-on-write for snapshots. When GFS creates a snapshot, it doesn’t copy the chunks, but instead increases the reference counter of each chunk. This makes creating a snapshot inexpensive. If a client writes a chunk and the master notices the reference count is greater than one, the master first makes a copy so that the client can update the copy (instead of the chunk that is part of the snapshot). You can view this as delaying the copy until it is absolutely necessary. The hope is that not all chunks will be modified and one can avoid making some copies

Q: If an application uses the standard POSIX file APIs, would it need to be modified in order to use GFS?
A: Yes, but GFS isn’t intended for existing applications. It is designed for newly-written applications, such as MapReduce programs

Q: How does GFS determine the location of the nearest replica?
A: The paper hints that GFS does this based on the IP addresses of the servers storing the available replicas. In 2003, Google must have assigned IP addresses in such a way that if two IP addresses are close to each other in IP address space, then they are also close to each other in machine-room network topology (perhaps plugged into the same Ethernet switch, or into Ethernet switches that are themselves directly connected)

Q: What’s a lease?
A: For GFS, a lease is a period of time for which the master grants a chunkserver the ability to act as the primary for a particular chunk. The master guarantees not to assign a different primary for the duration of the lease, and the primary agrees to stop acting as primary before the lease expires (unless the primary first asks the master to extend the lease). Leases are a way to avoid having the primary have to repeatedly ask the master if it is still primary – it knows it can act as primary for the next minute (or whatever the lease interval is) without talking to the master again

Q: Suppose S1 is the primary for a chunk, and the network between the master and S1 fails. The master will notice and designate some other server as primary, say S2. Since S1 didn’t actually fail, are there now two primaries for the same chunk?
A: That would be a disaster, since both primaries might apply different updates to the same chunk. Luckily GFS’s lease mechanism prevents this scenario. The master granted S1 a 60-second lease to be primary. S1 knows to stop being primary before its lease expires. The master won’t grant a lease to S2 until after the lease to S1 expires. So S2 won’t start acting as primary until after S1 stops

Q: 64 megabytes sounds awkwardly large for the chunk size!
A: The 64 MB chunk size is the unit of book-keeping in the master, and the granularity at which files are sharded over chunkservers. Clients can issue smaller reads and writes – they are not forced to deal in whole 64 MB chunks. The point of using such a big chunk size is to reduce the size of the meta-data tables in the master, and to avoid limiting clients that want to do huge transfers to reduce overhead. On the other hand, files less than 64 MB in size do not get much parallelism

Q: Does Google still use GFS?
A: GFS has been replaced by something called Colossus, with the same overall goals, but improvements in master performance and fault-tolerance. In addition, many applications within Google have switched to more database-like storage systems such as BigTable and Spanner. However, much of the GFS design lives on in HDFS, the storage system for the Hadoop open-source MapReduce
https://cloud.google.com/blog/products/storage-data-transfer/a-peek-behind-colossus-googles-file-system

Q: How acceptable is it that GFS trades correctness for performance and simplicity?
A: This a recurring theme in distributed systems. Strong consistency usually requires protocols that are complex and require communication and waiting for replies (as we will see in the next few lectures). By exploiting ways that specific application classes can tolerate relaxed consistency, one can design systems that have good performance and sufficient consistency. For example, GFS optimizes for MapReduce applications, which need high read performance for large files and are OK with having holes in files, records showing up several times, and inconsistent reads. On the other hand, GFS would not be good for storing account balances at a bank

Q: What if the master fails?
A: There are replica masters with a full copy of the master state; the paper’s design requires some outside entity (a human?) to decide to switch to one of the replicas after a master failure (Section 5.1.3). We will see later how to build replicated services that automatically
switch to a backup server if the main server fails, and you’ll build such a thing in Lab 2

Q: Why 3 replicas?
A: Perhaps this was the line of reasoning: two replicas are not enough because, after one fails, there may not be enough time to re-replicate before the remaining replica fails; three makes that scenario much less likely. With 1000s of disks, low-probabilty events like multiple replicas failing in short order occur uncomfortably often. Here is a study of disk reliability from that era: https://research.google.com/archive/disk_failures.pdf. You need to factor in the time it takes to make new copies of all the chunks that were stored on a failed disk; and perhaps also the frequency of power, server, network, and software failures. The cost of disks (and associated power, air conditioning, and rent), and the value of the data being protected, are also relevant

Q: What is internal fragmentation? Why does lazy allocation help?
A: Internal fragmentation is the space wasted when a system uses an allocation unit larger than needed for the requested allocation. If
GFS allocated disk space in 64MB units, then a one-byte file would waste almost 64MB of disk. GFS avoids this problem by allocating disk space lazily. Every chunk is a Linux file, and Linux file systems use block sizes of a few tens of kilobytes; so when an application creates a one-byte GFS file, the file’s chunk consumes only one Linux disk block, not 64 MB

Q: What benefit does GFS obtain from the weakness of its consistency?
A: It’s easier to think about the additional work GFS would have to do to achieve stronger consistency
The primary should not let secondaries apply a write unless all the secondaries will be able to do it. This likely requires two rounds of
communication – one to ask all secondaries if they are alive and are able to promise to do the write if asked, and (if all answer yes) a second round to tell the secondaries to commit the write
If the primary dies, some secondaries may have missed the last few update messages the primary sent. This will cause the remaining secondaries to have slightly differing copies of the data. Before resuming operation, a new primary should ensure that all the secondaries have identical copies
Since clients re-send requests if they suspect something has gone wrong, primaries would need to filter out operations that have already been executed
Clients cache chunk locations, and may send reads to a chunkserver that holds a stale version of a chunk. GFS would need a way to
guarantee that this cannot succeed

  • Titre: MIT 6.824(6.5840) Distributed Systems
  • Auteur: tiny_star
  • Créé à : 2026-03-10 11:45:32
  • Mis à jour à : 2026-03-25 19:19:17
  • Lien: https://tiny-star3.github.io/2026/03/10/Cpp/[Class]MIT 6.824(6.5840) Distributed Systems/
  • Licence: Cette œuvre est sous licence CC BY-NC-SA 4.0.
Commentaires