This article describes how to implement a single-machine MapReduce program in Go; the idea comes from a lab in the MIT 6.824 course. It first reviews the MapReduce programming model, then demonstrates its usage with a word-count example, and finally walks through the Sequential and Distributed implementations.
A MapReduce computation takes a set of key/value pairs as input and produces a set of key/value pairs as output; the user controls the processing logic by writing a Map function and a Reduce function.

The Map function transforms the input into a set of intermediate key/value pairs, and the MapReduce library passes all intermediate values for each key to the Reduce function.

The Reduce function receives a key together with its set of values, and its job is to aggregate those values into the final result. The values are supplied to Reduce through an iterator, which lets MapReduce handle value lists too large to fit in memory.
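For word count, for instance, Map("doc1", "a b a") would emit the intermediate pairs ("a", 1), ("b", 1), ("a", 1); the library then groups them by key, so Reduce("a", [1, 1]) is called once for key "a" and returns 2.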
A MapReduce job executes as illustrated in the figure below:
After a successful run, the output is spread across R files. Users usually do not need to merge these R files: they can serve directly as input to another MapReduce job, or as input to other distributed applications.
The core components of MapReduce are the Master and the Workers; their responsibilities are as follows.
The Master is responsible for scheduling the Map and Reduce tasks of a job run; the state it maintains includes:

- the job name, the list of input files, and the number of reduce tasks (nReduce);
- the set of registered Workers;
- the scheduling status of each Map and Reduce task.
Workers come in two kinds, Map and Reduce:
Responsibilities of a Map Worker:

- read the assigned input file and call the user-defined Map function on its contents;
- partition the emitted key/value pairs by key hash into nReduce intermediate files.
Responsibilities of a Reduce Worker:

- read the intermediate files produced for it by every Map task and group the values by key;
- call the user-defined Reduce function for each key and write the results to its output file.
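Both kinds of Worker exchange data as key/value pairs. The KeyValue type used throughout the lab framework is simply:

```go
// KeyValue is the unit of data flowing between Map and Reduce.
type KeyValue struct {
	Key   string
	Value string
}
```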
With the basic principles covered, let's look at how MapReduce is used through a simple word-count example:
```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"unicode"

	"mapreduce"
)

// The mapping function is called once for each piece of the input.
// In this framework, the key is the name of the file that is being processed,
// and the value is the file's contents. The return value should be a slice of
// key/value pairs, each represented by a mapreduce.KeyValue.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Split the contents into words: any non-letter rune is a separator.
	f := func(r rune) bool {
		return !unicode.IsLetter(r)
	}
	words := strings.FieldsFunc(value, f)
	for _, word := range words {
		res = append(res, mapreduce.KeyValue{Key: word, Value: " "})
	}
	return
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string values (merged across all inputs). The return
// value should be a single output value for that key.
func reduceF(key string, values []string) string {
	// The word count is simply the number of values emitted for the key.
	return strconv.Itoa(len(values))
}

// Can be run in 3 ways:
// 1) Sequential (e.g., go run wc.go master sequential x1.txt .. xN.txt)
// 2) Master (e.g., go run wc.go master localhost:7777 x1.txt .. xN.txt)
// 3) Worker (e.g., go run wc.go worker localhost:7777 localhost:7778 &)
func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else if os.Args[1] == "master" {
		var mr *mapreduce.Master
		if os.Args[2] == "sequential" {
			mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
		} else {
			mr = mapreduce.Distributed("wcseq", os.Args[3:], 3, os.Args[2])
		}
		mr.Wait()
	} else {
		mapreduce.RunWorker(os.Args[2], os.Args[3], mapF, reduceF, 100)
	}
}
```
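To try it out, a debugging session would typically start with the sequential mode, e.g. `go run wc.go master sequential input.txt` (the input file name here is just a placeholder); the master then merges the per-reduce-task outputs into a single result file.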
A MapReduce program consists of three parts:

- the Map function, whose main job is to emit K/V pairs;
- the Reduce function, which operates on all values sharing the same key, typically doing aggregation such as counting, depending on the application's needs;
- the driver, which runs in either Sequential or Distributed mode: Sequential executes the Map and Reduce tasks serially and is mainly for debugging user programs, while Distributed is used for real program runs.
The single-machine MapReduce implemented in this section differs from the MapReduce in Google's paper mainly in that everything runs on one machine: Workers are local processes that the Master reaches over local RPC, and intermediate and final results live in the local file system rather than in GFS.
This section discusses the implementation in two parts: the Sequential mode and the Distributed mode.
The scheduling routine for the Sequential mode is implemented as follows:
```go
// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before scheduling the next.
func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF)
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, len(mr.files), reduceF)
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}
```
Its logic is straightforward: the Map tasks are processed one by one in order, and once they are all finished, the Reduce tasks are processed one by one.
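The run method itself is not listed in this article; in the 6.824 skeleton it essentially records the job parameters, executes the two phases through the supplied schedule function, and then merges the reduce outputs. Roughly (merge and doneChannel come from the skeleton, not from the code shown here):

```go
// run executes a MapReduce job: schedule the map phase, then the reduce
// phase, let the caller collect stats via finish, and merge the outputs.
func (mr *Master) run(jobName string, files []string, nreduce int,
	schedule func(phase jobPhase), finish func()) {
	mr.jobName = jobName
	mr.files = files
	mr.nReduce = nreduce

	schedule(mapPhase)
	schedule(reducePhase)
	finish()
	mr.merge() // combine the nReduce output files into one

	mr.doneChannel <- true // unblocks Master.Wait()
}
```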
Next, let's look at how doMap and doReduce are implemented.

doMap is implemented as follows:
```go
// doMap does the job of a map worker: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
//
// The intermediate file for reduce task r is named with
// reduceName(jobName, mapTaskNumber, r), and the ihash function decides
// which file a given key belongs to. The key/value pairs are serialized as
// JSON, since keys and values may contain newlines, quotes, and other
// characters that a naive line-based format would not survive; the matching
// decoder lives in doReduce.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	// Read the whole input file into memory.
	file, err := os.Open(inFile)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)
	contents := ""
	for scanner.Scan() {
		contents += scanner.Text() + "\n"
	}

	// Call the user-defined map function on the file contents.
	kvs := mapF(inFile, contents)

	// Create one intermediate file (and JSON encoder) per reduce task.
	reduceFileMap := make(map[string]*os.File)
	jsonFileMap := make(map[string]*json.Encoder)
	for i := 0; i < nReduce; i++ {
		reduceFileName := reduceName(jobName, mapTaskNumber, i)
		file, err := os.Create(reduceFileName)
		if err != nil {
			log.Fatal(err)
		}
		reduceFileMap[reduceFileName] = file
		jsonFileMap[reduceFileName] = json.NewEncoder(file)
		defer reduceFileMap[reduceFileName].Close()
	}

	// Partition the key/value pairs by key hash and encode them.
	for _, kv := range kvs {
		fileNum := int(ihash(kv.Key)) % nReduce
		reduceFileName := reduceName(jobName, mapTaskNumber, fileNum)
		enc, ok := jsonFileMap[reduceFileName]
		if !ok {
			log.Fatalf("no encoder for intermediate file %s", reduceFileName)
		}
		if err := enc.Encode(&kv); err != nil {
			log.Fatal(err)
		}
	}
}
```
The processing steps are (the helper functions involved are sketched right after this list):

1. read the contents of the input file;
2. call the user-defined mapF on the contents to obtain the key/value pairs;
3. create nReduce intermediate files, one per reduce task, each with a JSON encoder;
4. hash each key with ihash to pick the intermediate file, and JSON-encode the pair into it.
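The helpers reduceName, mergeName, and ihash are used above but not listed in this article; in the 6.824 skeleton they look roughly like this (a sketch, not necessarily the exact lab code):

```go
import (
	"hash/fnv"
	"strconv"
)

// reduceName constructs the name of the intermediate file that map task
// mapTask produces for reduce task reduceTask.
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName constructs the name of the output file of reduce task reduceTask.
func mergeName(jobName string, reduceTask int) string {
	return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}

// ihash hashes a key to decide which intermediate file it belongs to.
func ihash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}
```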
doReduce is implemented as follows:
```go
// doReduce does the job of a reduce worker: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
// (reduceF) for each key, and writes the output to disk.
//
// The intermediate file from map task m is found with
// reduceName(jobName, m, reduceTaskNumber); the output goes to
// mergeName(jobName, reduceTaskNumber) as JSON-encoded KeyValue objects,
// which is the format the final merger expects.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Decode every intermediate file produced for this reduce task and
	// group the values by key.
	kvs := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		reduceFileName := reduceName(jobName, i, reduceTaskNumber)
		file, err := os.Open(reduceFileName)
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err == io.EOF {
				break
			} else if err != nil {
				log.Fatal(err)
			}
			kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
		}
	}

	// Sort the keys so the output is in key order.
	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Apply reduceF to each key's values and write the JSON-encoded results.
	mergeFileName := mergeName(jobName, reduceTaskNumber)
	mergeFile, err := os.Create(mergeFileName)
	if err != nil {
		log.Fatal(err)
	}
	defer mergeFile.Close()
	enc := json.NewEncoder(mergeFile)
	for _, key := range keys {
		enc.Encode(KeyValue{Key: key, Value: reduceF(key, kvs[key])})
	}
}
```
The processing logic of a Reduce task is:

1. open the intermediate file produced by each of the nMap map tasks, decode the key/value pairs, and group the values by key;
2. sort the keys so the output is in key order;
3. call the user-defined reduceF on each key's values and JSON-encode the results into the merge file for this task.
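For example, with two input files (nMap = 2) and three reduce tasks, reduce task 1 reads mrtmp.&lt;jobName&gt;-0-1 and mrtmp.&lt;jobName&gt;-1-1 and writes its results to mrtmp.&lt;jobName&gt;-res-1, assuming the naming scheme sketched earlier.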
The main difference between Distributed and Sequential lies in the implementation of the scheduling function, shown below:
```go
// schedule starts and waits for all tasks in the given phase (Map or Reduce).
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Remember that workers may fail, and that any given worker may finish
	// multiple tasks.
	switch phase {
	case mapPhase:
		mr.scheduleMap()
	case reducePhase:
		mr.scheduleReduce()
	}
	fmt.Printf("Schedule: %v phase done\n", phase)
}
```
Scheduling is done in two phases, Map and Reduce; let's look at the scheduleMap part first:
```go
type taskStat struct {
	taskNumber int
	ok         bool
}

// checkWorkerExist reports whether worker w is still registered.
func (mr *Master) checkWorkerExist(w string) bool {
	mr.Lock()
	defer mr.Unlock()
	for _, v := range mr.workers {
		if w == v {
			return true
		}
	}
	return false
}

// chooseTask returns the next task to run: a previously failed task if any
// exist, otherwise the next unscheduled task (nTaskIndex).
func (mr *Master) chooseTask(failedTasks []int, nTaskIndex int) ([]int, int) {
	if len(failedTasks) == 0 {
		return failedTasks, nTaskIndex
	}
	fmt.Println("choose failed task")
	task := failedTasks[0]
	failedTasks = failedTasks[1:]
	fmt.Println("failedTasks", failedTasks)
	return failedTasks, task
}

// runMapTask sends map task nTaskNumber to worker w over RPC and reports the
// result on doneTask.
func (mr *Master) runMapTask(nTaskNumber int, w string, doneTask chan taskStat) {
	if nTaskNumber >= len(mr.files) {
		fmt.Println("all map tasks sent out")
		return
	}
	var args DoTaskArgs
	args.JobName = mr.jobName
	args.File = mr.files[nTaskNumber]
	args.Phase = mapPhase
	args.TaskNumber = nTaskNumber
	args.NumOtherPhase = mr.nReduce
	go func() {
		ok := call(w, "Worker.DoTask", args, new(struct{}))
		if ok {
			fmt.Printf("done task %d\n", args.TaskNumber)
		} else {
			fmt.Printf("task %d failed\n", args.TaskNumber)
		}
		doneTask <- taskStat{taskNumber: args.TaskNumber, ok: ok}
	}()
}

func (mr *Master) scheduleMap() {
	fmt.Println("begin scheduling map tasks")
	taskWorkerMap := make(map[int]string) // in-flight task -> worker
	doneTask := make(chan taskStat, 1)
	var nTaskIndex = 0
	var failedTasks []int

	// Dispatch one initial task to every already-registered worker,
	// recording the assignment so a completed task can be matched back
	// to its worker.
	mr.Lock()
	var nInitTask = min(len(mr.files), len(mr.workers))
	mr.Unlock()
	for ; nTaskIndex < nInitTask; nTaskIndex++ {
		mr.Lock()
		w := mr.workers[nTaskIndex]
		mr.Unlock()
		taskWorkerMap[nTaskIndex] = w
		mr.runMapTask(nTaskIndex, w, doneTask)
	}

	for {
		select {
		case newWorker := <-mr.registerChannel:
			// A new worker registered: give it the next task.
			fmt.Printf("new worker registered: %s\n", newWorker)
			var nextTask int
			failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
			if nextTask < len(mr.files) {
				fmt.Printf("nextTask %d, total %d\n", nextTask, len(mr.files))
				mr.runMapTask(nextTask, newWorker, doneTask)
				taskWorkerMap[nextTask] = newWorker
				if nTaskIndex == nextTask {
					nTaskIndex++
				}
			}
		case taskStat := <-doneTask:
			// A task finished: hand the next task to the same worker.
			// A failed task goes to the failed queue instead.
			var w string
			taskNumber, ok := taskStat.taskNumber, taskStat.ok
			if !ok {
				failedTasks = append(failedTasks, taskNumber)
			} else {
				w = taskWorkerMap[taskNumber]
				delete(taskWorkerMap, taskNumber)
			}
			if mr.checkWorkerExist(w) {
				var nextTask int
				failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
				if nextTask < len(mr.files) {
					fmt.Printf("nextTask %d, total %d\n", nextTask, len(mr.files))
					mr.runMapTask(nextTask, w, doneTask)
					taskWorkerMap[nextTask] = w
					if nTaskIndex == nextTask {
						nTaskIndex++
					}
				}
			}
			fmt.Printf("task index %d, task number %d, in-flight %d\n",
				nTaskIndex, taskNumber, len(taskWorkerMap))
			for k, v := range taskWorkerMap {
				fmt.Printf("%d: %v\n", k, v)
			}
			if nTaskIndex == len(mr.files) && len(taskWorkerMap) == 0 {
				fmt.Println("all tasks in mapPhase are done")
				return
			}
		}
	}
}
```
Without worker fault tolerance, the event loop handles two events:

- when a new Worker registers, the next pending task is chosen and sent to the newly registered Worker;
- when a task completes, the next pending task is chosen and dispatched to the Worker that just finished. Once all Map tasks have completed, the Map phase is done and the scheduler returns.

With worker fault tolerance, one rule is added: when a task execution fails, the task is appended to a failed-task queue. On either of the two events above, the next task is taken from the failed-task queue first; fresh tasks are only scheduled once the failed queue is empty.
scheduleReduce is implemented as follows:
```go
func (mr *Master) scheduleReduce() {
	fmt.Println("begin scheduling reduce tasks")
	taskWorkerMap := make(map[int]string) // in-flight task -> worker
	doneTask := make(chan taskStat, 1)
	var failedTasks []int
	var nTaskIndex = 0

	// Dispatch one initial task to every already-registered worker.
	mr.Lock()
	var nInitTask = min(mr.nReduce, len(mr.workers))
	mr.Unlock()
	for ; nTaskIndex < nInitTask; nTaskIndex++ {
		mr.Lock()
		w := mr.workers[nTaskIndex]
		mr.Unlock()
		taskWorkerMap[nTaskIndex] = w
		mr.runReduceTask(nTaskIndex, w, doneTask)
	}

	for {
		select {
		case newWorker := <-mr.registerChannel:
			// A new worker registered: give it the next task.
			fmt.Printf("new worker registered: %s\n", newWorker)
			var nextTask int
			failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
			if nextTask < mr.nReduce {
				fmt.Printf("nextTask %d, total %d\n", nextTask, mr.nReduce)
				mr.runReduceTask(nextTask, newWorker, doneTask)
				taskWorkerMap[nextTask] = newWorker
				if nTaskIndex == nextTask {
					nTaskIndex++
				}
			}
		case taskStat := <-doneTask:
			// A task finished: hand the next task to the same worker.
			// A failed task goes to the failed queue instead.
			var w string
			taskNumber, ok := taskStat.taskNumber, taskStat.ok
			if !ok {
				failedTasks = append(failedTasks, taskNumber)
			} else {
				w = taskWorkerMap[taskNumber]
				delete(taskWorkerMap, taskNumber)
			}
			if mr.checkWorkerExist(w) {
				var nextTask int
				failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
				if nextTask < mr.nReduce {
					fmt.Printf("nextTask %d, total %d\n", nextTask, mr.nReduce)
					mr.runReduceTask(nextTask, w, doneTask)
					taskWorkerMap[nextTask] = w
					if nTaskIndex == nextTask {
						nTaskIndex++
					}
				}
			}
			fmt.Printf("task index %d, task number %d, in-flight %d\n",
				nTaskIndex, taskNumber, len(taskWorkerMap))
			for k, v := range taskWorkerMap {
				fmt.Printf("%d: %v\n", k, v)
			}
			if nTaskIndex == mr.nReduce && len(taskWorkerMap) == 0 {
				fmt.Println("all tasks in reducePhase are done")
				return
			}
		}
	}
}
```
The overall flow mirrors scheduleMap, with reduce tasks dispatched instead of map tasks; the fault-tolerance handling is also analogous, so it is not repeated here.
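One piece referenced above but not listed is runReduceTask; a minimal sketch mirroring runMapTask (under the same DoTaskArgs assumptions) could look like:

```go
// runReduceTask sends reduce task nTaskNumber to worker w over RPC and
// reports the result on doneTask, mirroring runMapTask.
func (mr *Master) runReduceTask(nTaskNumber int, w string, doneTask chan taskStat) {
	if nTaskNumber >= mr.nReduce {
		fmt.Println("all reduce tasks sent out")
		return
	}
	var args DoTaskArgs
	args.JobName = mr.jobName
	args.Phase = reducePhase
	args.TaskNumber = nTaskNumber
	args.NumOtherPhase = len(mr.files) // "M": the number of map tasks
	go func() {
		ok := call(w, "Worker.DoTask", args, new(struct{}))
		doneTask <- taskStat{taskNumber: args.TaskNumber, ok: ok}
	}()
}
```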