2025年3月28日 星期五 甲辰(龙)年 月廿七 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Go语言

golang 实现单机版 MapReduce

时间:12-14来源:作者:点击数:8
CDSY,CDSY.XYZ

本篇文章主要描述了如何使用 golang 实现一个单机版的 MapReduce 程序,想法来自于 MIT-6.824 课程的一个 lab。本文分为以下几个模块:

  • MapReduce 基本原理
  • MapReduce 简单使用
  • MapReduce 单机版实现

MapReduce 基本原理

MapReduce 的计算以一组 Key/Value 对为输入,然后输出一组 Key/Value 对,用户通过编写 Map 和 Reduce 函数来控制处理逻辑。

Map 函数把输入转换成一组中间的 Key/Value 对,MapReduce library 会把所有 Key 的中间结果传递给 Reduce 函数处理。

Reduce 函数接收 Key 和其对应的一组 Value,它的作用就是聚合这些 Value,产生最终的结果。Reduce 的输入是以迭代器的方式输入,使得 MapReduce 可以处理数据量比内存大的情况。

一次 MapReduce 的处理过程如下图:

  1. MapReduce library 会把输入文件划分成多个 16 到 64MB 大小的分片(大小可以通过参数调节),然后在一组机器上启动程序。
  2. 其中比较特殊的程序是 master,剩下的由 master 分配任务的程序叫 worker。总共有 M 个 map 任务和 R 个 reduce 任务需要分配,master 会选取空闲的 worker,然后分配一个 map 任务或者 reduce 任务。
  3. 处理 map 任务的 worker 会从输入分片读入数据,解析出输入数据的 K/V 对,然后传递给 Map 函数,生成的 K/V 中间结果会缓存在内存中。
  4. map 任务的中间结果会被周期性地写入到磁盘中,以 partition 函数来分成 R 个部分。R 个部分的磁盘地址会推送到 master,然后由它转发给响应的 reduce worker。
  5. 当 reduce worker 接收到 master 发送的地址信息时,它会通过 RPC 来向 map worker 读取对应的数据。当 reduce worker 读取到了所有的数据,它先按照 key 来排序,方便聚合操作。
  6. reduce worker 遍历排序好的中间结果,对于相同的 key,把其所有数据传入到 Reduce 函数进行处理,生成最终的结果会被追加到结果文件中。
  7. 当所有的 map 和 reduce 任务都完成时,master 会唤醒用户程序,然后返回到用户程序空间执行用户代码。

成功执行后,输出结果在 R 个文件中,通常,用户不需要合并这 R 个文件,因为,可以把它们作为新的 MapReduce 处理逻辑的输入数据,或者其它分布式应用的输入数据。

MapReduce 核心组件

MapReduce 核心组件包括 Master 和 Worker,它们的职责分别如下。

Master

MapReduce 负责一次执行过程中 Map 和 Reduce 任务的调度,其需要维护的信息包括如下:

  • worker 的状态
  • 任务的状态
  • Map 生成的文件的位置
Worker

Worker 分为两种,分别是 Map 和 Reduce:

Map Worker 的职责:

  • 对分片数据调用用户指定的 Map 函数
  • 根据 Reduce 的个数,把数据分成 R 份

Reduce Worker 的职责:

  • 对收集到的数据进行排序
  • 对于相同的 Key 调用 Reduce 函数进行处理

MapReduce 简单使用

了解 MapReduce 基本原理后,再来通过一个简单的 word count 例子,来描述 MapReduce 的使用方法,代码如下:

  • // The mapping function is called once for each piece of the input.
  • // In this framework, the key is the name of the file that is being processed,
  • // and the value is the file's contents. The return value should be a slice of
  • // key/value pairs, each represented by a mapreduce.KeyValue.
  • func mapF(document string, value string) (res []mapreduce.KeyValue) {
  • // TODO: you have to write this function
  • f := func(r rune) bool {
  • return !unicode.IsLetter(r)
  • }
  • words := strings.FieldsFunc(value, f)
  • for _, word := range words {
  • kv := mapreduce.KeyValue{word, " "}
  • res = append(res, kv)
  • }
  • return
  • }
  • // The reduce function is called once for each key generated by Map, with a
  • // list of that key's string value (merged across all inputs). The return value
  • // should be a single output value for that key.
  • func reduceF(key string, values []string) string {
  • // TODO: you also have to write this function
  • s := strconv.Itoa(len(values))
  • return s
  • }
  • // Can be run in 3 ways:
  • // 1) Sequential (e.g., go run wc.go master sequential x1.txt .. xN.txt)
  • // 2) Master (e.g., go run wc.go master localhost:7777 x1.txt .. xN.txt)
  • // 3) Worker (e.g., go run wc.go worker localhost:7777 localhost:7778 &)
  • func main() {
  • if len(os.Args) < 4 {
  • fmt.Printf("%s: see usage comments in file\n", os.Args[0])
  • } else if os.Args[1] == "master" {
  • var mr *mapreduce.Master
  • if os.Args[2] == "sequential" {
  • mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
  • } else {
  • mr = mapreduce.Distributed("wcseq", os.Args[3:], 3, os.Args[2])
  • }
  • mr.Wait()
  • } else {
  • mapreduce.RunWorker(os.Args[2], os.Args[3], mapF, reduceF, 100)
  • }
  • }

一个 MapReduce 程序由三个部分组成:

  • Map 函数
  • Reduce 函数
  • 调用 MapReduce 执行的函数

Map 函数

Map 函数主要的功能为吐出 K/V 对

Reduce 函数

Reduce 函数则是对相同的 Key 做操作,一般是统计之类的功能,具体地看应用的需求。

调用 MapReduce 库函数

分为 Sequential 和 Distributed,其中 Sequential 为串行地执行 Map 和 Reduce 任务,主要用于用户程序调试的场景,Distributed 则用于真正的用户程序执行的场景。

MapReduce 单机版实现

本节实现的 MapReduce 单机版与 Google 论文中的 MapReduce 主要的不同如下:

  • 输入和输出数据都采用本机的文件系统,没有使用到类似于 GFS 的分布式文件存储
  • Google 的 MapReduce 通过 GFS 的文件名字的原子操作来保证 Reduce Worker 宕机时,最终只会生成一份结果文件;在单机文件系统中,如果 Worker 和 Master 之间网络通信断掉,但是 Worker 本身可能还在工 作,这时候如果重新启动另一个 Worker 可能会造成两个 Worker 写入同一份文件,这种场景,在单机版 MapReduce 的 Worker 容灾中不考 虑。

本节分为两个部分来讨论:

  • MapReduce 的 Sequential 实现
  • MapReduce 的 Distributed 实现(带 Worker 容灾)

MapReduce 的 Sequential 实现

Sequential 部分的调度程序实现如下:

  • // Sequential runs map and reduce tasks sequentially, waiting for each task to // complete before scheduling the next.
  • func Sequential(jobName string, files []string, nreduce int,
  • mapF func(string, string) []KeyValue,
  • reduceF func(string, []string) string,
  • ) (mr *Master) {
  • mr = newMaster("master")
  • go mr.run(jobName, files, nreduce, func(phase jobPhase) {
  • switch phase {
  • case mapPhase:
  • for i, f := range mr.files {
  • doMap(mr.jobName, i, f, mr.nReduce, mapF)
  • }
  • case reducePhase:
  • for i := 0; i < mr.nReduce; i++ {
  • doReduce(mr.jobName, i, len(mr.files), reduceF)
  • }
  • }
  • }, func() {
  • mr.stats = []int{len(files) + nreduce}
  • })
  • return
  • }

其逻辑非常简单,就是按照顺序先一个个的处理 Map 任务,处理完成之后,再一个个的处理 Reduce 任务。

接下来,看 doMap 和 doReduce 是如何实现的。

doMap 的实现如下:

  • // doMap does the job of a map worker: it reads one of the input files
  • // (inFile), calls the user-defined map function (mapF) for that file's
  • // contents, and partitions the output into nReduce intermediate files.
  • func doMap(
  • jobName string, // the name of the MapReduce job
  • mapTaskNumber int, // which map task this is
  • inFile string,
  • nReduce int, // the number of reduce task that will be run ("R" in the paper)
  • mapF func(file string, contents string) []KeyValue,
  • ) {
  • // TODO:
  • // You will need to write this function.
  • // You can find the filename for this map task's input to reduce task number
  • // r using reduceName(jobName, mapTaskNumber, r). The ihash function (given
  • // below doMap) should be used to decide which file a given key belongs into.
  • //
  • // The intermediate output of a map task is stored in the file
  • // system as multiple files whose name indicates which map task produced
  • // them, as well as which reduce task they are for. Coming up with a
  • // scheme for how to store the key/value pairs on disk can be tricky,
  • // especially when taking into account that both keys and values could
  • // contain newlines, quotes, and any other character you can think of.
  • //
  • // One format often used for serializing data to a byte stream that the
  • // other end can correctly reconstruct is JSON. You are not required to
  • // use JSON, but as the output of the reduce tasks *must* be JSON,
  • // familiarizing yourself with it here may prove useful. You can write
  • // out a data structure as a JSON string to a file using the commented
  • // code below. The corresponding decoding functions can be found in
  • // common_reduce.go.
  • //
  • // enc := json.NewEncoder(file)
  • // for _, kv := ... {
  • // err := enc.Encode(&kv)
  • //
  • // Remember to close the file after you have written all the values!``
  • file, err := os.Open(inFile)
  • if err != nil {
  • log.Fatal(err)
  • }
  • defer file.Close()
  • scanner := bufio.NewScanner(file)
  • contents := ""
  • for scanner.Scan() {
  • s := scanner.Text()
  • s += "\n"
  • contents = contents + s
  • }
  • kvs := mapF(inFile, contents)
  • reduceFileMap := make(map[string]*os.File)
  • jsonFileMap := make(map[string]*json.Encoder)
  • for i := 0; i < nReduce; i++ {
  • reduceFileName := reduceName(jobName, mapTaskNumber, i)
  • file, err := os.Create(reduceFileName)
  • if err != nil {
  • log.Fatal(err)
  • }
  • enc := json.NewEncoder(file)
  • reduceFileMap[reduceFileName] = file
  • jsonFileMap[reduceFileName] = enc
  • defer reduceFileMap[reduceFileName].Close()
  • }
  • for _, kv := range kvs {
  • hashValue := int(ihash(kv.Key))
  • fileNum := hashValue % nReduce
  • reduceFileName := reduceName(jobName, mapTaskNumber, fileNum)
  • enc, ok := jsonFileMap[reduceFileName]
  • if !ok {
  • log.Fatal(err)
  • }
  • err := enc.Encode(&kv)
  • if err != nil {
  • log.Fatal(err)
  • }
  • }
  • }

处理过程如下:

  • 读入输出文件
  • 调用用户指定的 Map 函数,吐出所有的 K/V 对
  • 创建跟 Reduce Worker 相同数量的文件,然后,对每个 K/V 对,根据 Key 来做 hash,输出到对应的文件

doReduce 实现如下:

  • // doReduce does the job of a reduce worker: it reads the intermediate
  • // key/value pairs (produced by the map phase) for this task, sorts the
  • // intermediate key/value pairs by key, calls the user-defined reduce function
  • // (reduceF) for each key, and writes the output to disk.
  • func doReduce(
  • jobName string, // the name of the whole MapReduce job
  • reduceTaskNumber int, // which reduce task this is
  • nMap int, // the number of map tasks that were run ("M" in the paper)
  • reduceF func(key string, values []string) string,
  • ) {
  • // TODO:
  • // You will need to write this function.
  • // You can find the intermediate file for this reduce task from map task number
  • // m using reduceName(jobName, m, reduceTaskNumber).
  • // Remember that you've encoded the values in the intermediate files, so you
  • // will need to decode them. If you chose to use JSON, you can read out
  • // multiple decoded values by creating a decoder, and then repeatedly calling
  • // .Decode() on it until Decode() returns an error.
  • //
  • // You should write the reduced output in as JSON encoded KeyValue
  • // objects to a file named mergeName(jobName, reduceTaskNumber). We require
  • // you to use JSON here because that is what the merger than combines the
  • // output from all the reduce tasks expects. There is nothing "special" about
  • // JSON -- it is just the marshalling format we chose to use. It will look
  • // something like this:
  • //
  • // enc := json.NewEncoder(mergeFile)
  • // for key in ... {
  • // enc.Encode(KeyValue{key, reduceF(...)})
  • // }
  • // file.Close()
  • kvs := make(map[string][]string)
  • for i := 0; i < nMap; i++ {
  • reduceFileName := reduceName(jobName, i, reduceTaskNumber)
  • file, err := os.Open(reduceFileName)
  • if err != nil {
  • log.Fatal(err)
  • }
  • defer file.Close()
  • enc := json.NewDecoder(file)
  • for {
  • var kv KeyValue
  • if err := enc.Decode(&kv); err == io.EOF {
  • break
  • } else if err != nil {
  • log.Fatal(err)
  • } else {
  • log.Println(kv.Key + kv.Value)
  • kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
  • }
  • }
  • }
  • var keys []string
  • for k, _ := range kvs {
  • keys = append(keys, k)
  • }
  • sort.Sort(sort.StringSlice(keys))
  • mergeFileName := mergeName(jobName, reduceTaskNumber)
  • mergeFile, err := os.Create(mergeFileName)
  • if err != nil {
  • log.Fatal(err)
  • }
  • defer mergeFile.Close()
  • enc := json.NewEncoder(mergeFile)
  • for _, key := range keys {
  • enc.Encode(KeyValue{key, reduceF(key, kvs[key])})
  • }
  • }

Reduce 任务的处理逻辑如下:

  • 根据之前约定好的命名格式,找到该 Reduce Worker 需要处理的文件,然后,按照约定的方式进行解码
  • 得到所有的 K/V 对之后,根据 Key 对 K/V 对排序
  • 调用用户指定的 ReduceF 函数,对相同的 Key 的所有 Value 进行处理
  • 把处理后的结果以一定的编码方式写入文件

MapReduce 的 Distributed 实现(带 Worker 容灾)

Distributed 和 Sequencial 的主要区别在于调度函数的实现,如下

  • // schedule starts and waits for all tasks in the given phase (Map or Reduce).
  • func (mr *Master) schedule(phase jobPhase) {
  • var ntasks int
  • var nios int // number of inputs (for reduce) or outputs (for map)
  • switch phase {
  • case mapPhase:
  • ntasks = len(mr.files)
  • nios = mr.nReduce
  • case reducePhase:
  • ntasks = mr.nReduce
  • nios = len(mr.files)
  • }
  • fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)
  • // All ntasks tasks have to be scheduled on workers, and only once all of
  • // them have been completed successfully should the function return.
  • // Remember that workers may fail, and that any given worker may finish
  • // multiple tasks.
  • //
  • // TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
  • //
  • switch phase {
  • case mapPhase:
  • mr.scheduleMap()
  • case reducePhase:
  • mr.scheduleReduce()
  • }
  • fmt.Printf("Schedule: %v phase done\n", phase)
  • }

分为 Map 和 Reduce 两阶段的调度,先来看 ScheduleMap 部分:

ScheduleMap
  • type taskStat struct {
  • taskNumber int
  • ok bool
  • }
  • func (mr *Master) checkWorkerExist(w string) bool {
  • mr.Lock()
  • defer mr.Unlock()
  • for _, v := range mr.workers {
  • if w == v {
  • return true
  • }
  • }
  • return false
  • }
  • func (mr *Master) chooseTask(failedTasks []int, nTaskIndex int) ([]int, int) {
  • if 0 == len(failedTasks) {
  • return failedTasks, nTaskIndex
  • } else {
  • fmt.Println("choose failed tasks")
  • task := failedTasks[0]
  • failedTasks = failedTasks[1:len(failedTasks)]
  • fmt.Println("failedTasks", failedTasks)
  • return failedTasks, task
  • }
  • }
  • func (mr *Master) runMapTask(nTaskNumber int, w string, doneTask chan taskStat) {
  • if nTaskNumber < len(mr.files) {
  • var args DoTaskArgs
  • args.JobName = mr.jobName
  • args.File = mr.files[nTaskNumber]
  • args.Phase = mapPhase
  • args.TaskNumber = nTaskNumber
  • args.NumOtherPhase = mr.nReduce
  • go func() {
  • ok := call(w, "Worker.DoTask", args, new(struct{}))
  • var taskstat taskStat
  • taskstat.taskNumber = args.TaskNumber
  • taskstat.ok = ok
  • if ok {
  • doneTask <- taskstat
  • fmt.Println("done task %d", args.TaskNumber)
  • } else {
  • doneTask <- taskstat
  • fmt.Println("get failed task %d", args.TaskNumber)
  • }
  • }()
  • } else {
  • fmt.Printf("all tasks sent out")
  • }
  • }
  • func (mr *Master) scheduleMap() {
  • fmt.Println("begin scheduling map tasks")
  • taskWorkerMap := make(map[int]string)
  • doneTask := make(chan taskStat, 1)
  • var nTaskIndex = 0
  • var failedTasks []int
  • mr.Lock()
  • var nInitTask = min(len(mr.files), len(mr.workers))
  • mr.Unlock()
  • for ; nTaskIndex < nInitTask; nTaskIndex++ {
  • mr.Lock()
  • w := mr.workers[nTaskIndex]
  • mr.Unlock()
  • mr.runMapTask(nTaskIndex, w, doneTask)
  • }
  • for {
  • select {
  • case newWorker := <-mr.registerChannel:
  • fmt.Println("New Worker register %s", newWorker)
  • var nextTask int
  • failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
  • if nextTask < len(mr.files) {
  • fmt.Println("nextTask %d, total %d", nextTask, len(mr.files))
  • mr.runMapTask(nextTask, newWorker, doneTask)
  • taskWorkerMap[nextTask] = newWorker
  • if nTaskIndex == nextTask {
  • nTaskIndex++
  • }
  • }
  • case taskStat := <-doneTask:
  • var w string
  • taskNumber, ok := taskStat.taskNumber, taskStat.ok
  • if !ok {
  • failedTasks = append(failedTasks, taskNumber)
  • } else {
  • w = taskWorkerMap[taskNumber]
  • delete(taskWorkerMap, taskNumber)
  • }
  • if mr.checkWorkerExist(w) {
  • fmt.Println("failed task count, failed tasks", len(failedTasks), failedTasks)
  • var nextTask int
  • failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
  • if nextTask < len(mr.files) {
  • fmt.Println("failed task count, failed tasks", len(failedTasks), failedTasks)
  • fmt.Println("nextTask %d, total %d", nextTask, len(mr.files))
  • mr.runMapTask(nextTask, w, doneTask)
  • taskWorkerMap[nextTask] = w
  • if nTaskIndex == nextTask {
  • nTaskIndex++
  • }
  • }
  • }
  • fmt.Println("task index %d, task number %d, map count", nTaskIndex, taskNumber, len(taskWorkerMap))
  • for k, v := range taskWorkerMap {
  • fmt.Println("%s:%v", k, v)
  • }
  • if (nTaskIndex == len(mr.files)) && (0 == len(taskWorkerMap)) {
  • fmt.Println("all tasks in mapPhase is done")
  • return
  • }
  • }
  • }
  • }
  • 先根据已注册的 Worker 数量,生成相应数量的 Map 任务,然后发送给 Worker 执行
  • 接下来在,在 select 中处理两种事件:一是有新 Worker 注册的事件;二是之前调度的任务执行完成的事件

不带 Worker 容灾的处理:

处理新 Worker 注册的事件的方式为选择下一个要执行的任务,发送给新注册的 Worker 去执行。

处理调度完成的事件的方式为选择下个需要执行的任务,调度给刚刚完成执行任务的 Worker 执行。当所有的 Map 任务都处理完成后,表示 Map 阶段完成,退出调度。

带 Worker 容灾的处理:

Worker 容灾的处理逻辑为,当任务执行失败时,加入到执行失败的任务队列中,当发生上述两种事件时,先从失败的任务队列中拿下一个任务执行,只有当失败的任务队列为空时,才调度新的任务执行。

ScheduleReduce

ScheduleReduce 的实现如下

  • func (mr *Master) scheduleReduce() {
  • fmt.Println("start scheduling reduce tasks")
  • taskWorkerMap := make(map[int]string)
  • doneTask := make(chan taskStat, 1)
  • var failedTasks []int
  • var nTaskIndex = 0
  • mr.Lock()
  • var nInitTask = min(mr.nReduce, len(mr.workers))
  • mr.Unlock()
  • for ; nTaskIndex < nInitTask; nTaskIndex++ {
  • mr.Lock()
  • w := mr.workers[nTaskIndex]
  • mr.Unlock()
  • taskWorkerMap[nTaskIndex] = w
  • mr.runReduceTask(nTaskIndex, w, doneTask)
  • }
  • for {
  • select {
  • case newWorker := <-mr.registerChannel:
  • fmt.Println("New Worker register %s", newWorker)
  • var nextTask int
  • failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
  • if nextTask < mr.nReduce {
  • fmt.Println("nextTask %d, total %d", nextTask, len(mr.files))
  • mr.runReduceTask(nextTask, newWorker, doneTask)
  • taskWorkerMap[nextTask] = newWorker
  • if nTaskIndex == nextTask {
  • nTaskIndex++
  • }
  • }
  • case taskStat := <-doneTask:
  • var w string
  • taskNumber, ok := taskStat.taskNumber, taskStat.ok
  • if !ok {
  • failedTasks = append(failedTasks, taskNumber)
  • } else {
  • w = taskWorkerMap[taskNumber]
  • delete(taskWorkerMap, taskNumber)
  • }
  • if mr.checkWorkerExist(w) {
  • var nextTask int
  • failedTasks, nextTask = mr.chooseTask(failedTasks, nTaskIndex)
  • if nextTask < mr.nReduce {
  • fmt.Println("nextTask %d, total %d", nextTask, len(mr.files))
  • mr.runReduceTask(nextTask, w, doneTask)
  • taskWorkerMap[nextTask] = w
  • if nTaskIndex == nextTask {
  • nTaskIndex++
  • }
  • }
  • }
  • fmt.Println("task index %d, task number %d, map count", nTaskIndex, taskNumber, len(taskWorkerMap))
  • for k, v := range taskWorkerMap {
  • fmt.Println("%s:%v", k, v)
  • }
  • if (nTaskIndex == mr.nReduce) && (0 == len(taskWorkerMap)) {
  • fmt.Println("all tasks in mapPhase is done")
  • return
  • }
  • }
  • }
  • }

整体的处理逻辑和 ScheduleMap 类似,如下

  • 先根据已注册的 Worker 数量,生成相应数量的 Reduce 任务,然后发送给 Worker 执行
  • 接下来在,在 select 中处理两种事件:一是有新 Worker 注册的事件;二是之前调度的任务执行完成的事件

容灾过程也是类似的,不再赘述。

CDSY,CDSY.XYZ
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
    无相关信息
栏目更新
栏目热门
本栏推荐