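MapReduce is a programming model that can process and generate large amounts of data by dividing work into independent tasks and executing those tasks in parallel across a cluster of machines. The MapReduce programming style was inspired by the functional programming constructs map and reduce, which are commonly used to process lists of data. At a high level, every MapReduce program transforms a list of input data elements into a list of output data elements twice: once in the map phase and once in the reduce phase.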
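To make the functional-programming analogy concrete, here is a small sketch using Python's built-in map() and functools.reduce() on an in-memory list. It only illustrates the two constructs the model is named after; it is not how Hadoop executes a job.

```python
from functools import reduce

words = ['jack', 'be', 'nimble']

# map: apply a function to every element, producing a new list
lengths = list(map(len, words))

# reduce: fold the list into a single value, starting from 0
total = reduce(lambda acc, n: acc + n, lengths, 0)

print(lengths, total)  # [4, 2, 6] 12
```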
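This chapter begins by introducing the MapReduce programming model and describing how data flows through the different phases of the model. It then shows how to write MapReduce jobs in Python.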
Data Flow
The MapReduce framework is composed of three major phases: map, shuffle and sort, and reduce.
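In the map phase, a function called the mapper processes a series of key-value pairs. The mapper processes each key-value pair sequentially, producing zero or more output key-value pairs.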
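For example, consider a mapper that converts sentences into words. The input is a string containing a sentence; the mapper splits the sentence into words and outputs each word.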
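A minimal sketch of such a mapper as a Python generator. The name sentence_mapper is hypothetical, the input key (for example a line offset) is assumed and ignored, and each word is emitted with a count of 1, anticipating the WordCount example later in this chapter.

```python
def sentence_mapper(key, sentence):
    """Hypothetical map function: emit one (word, 1) pair per word in the sentence.

    The input key (e.g. a line offset) is assumed and not used here.
    """
    for word in sentence.split():
        yield word, 1


pairs = list(sentence_mapper(0, 'jack be nimble jack be quick'))
print(pairs)
```

One input pair produces six output pairs here, which is the "zero or more output key-value pairs" behavior described above.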
The intermediate output of the map phase is moved to the reducers. This process of moving output from the mappers to the reducers is known as shuffling.
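Shuffling is handled by a partition function, known as the partitioner. The partitioner controls the flow of key-value pairs from the mappers to the reducers. It is given the mapper's output key and the number of reducers, and returns the index of the intended reducer. The partitioner ensures that all of the values for the same key are sent to the same reducer. The default partitioner is hash-based: it computes a hash of the mapper's output key and assigns a partition based on the result.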
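Hadoop's default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The sketch below mirrors that formula in Python, with one assumption: CRC-32 of the UTF-8 bytes stands in for Java's hashCode(), because Python's built-in hash() of a string is randomized per process and would not route a key consistently across separate mapper processes.

```python
import zlib


def partition(key, num_reducers):
    """Return the index of the reducer that should receive this key."""
    # Assumption: CRC-32 stands in for Java's String.hashCode()
    h = zlib.crc32(key.encode('utf-8'))
    # Clear the sign bit (like & Integer.MAX_VALUE), then take the modulus
    return (h & 0x7FFFFFFF) % num_reducers


pairs = [('jack', 1), ('be', 1), ('nimble', 1), ('jack', 1), ('be', 1), ('quick', 1)]
num_reducers = 3

# Route every mapper output pair to its partition
partitions = {i: [] for i in range(num_reducers)}
for key, value in pairs:
    partitions[partition(key, num_reducers)].append((key, value))

print(partitions)
```

Because the partition depends only on the key, both ('jack', 1) pairs always land in the same partition, whichever reducer that turns out to be.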
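The final stage before the reducers start processing data is sorting. The intermediate keys and values for each partition are sorted by the Hadoop framework before being presented to the reducer.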
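A small in-memory sketch of what sorting one partition achieves: once the pairs are ordered by key, equal keys sit next to each other, so the values for each key can be read as one group. Hadoop actually performs a distributed merge sort; Python's sorted() and itertools.groupby() are only stand-ins for illustration.

```python
from itertools import groupby
from operator import itemgetter

# Map output that landed in one partition, in arbitrary order
partition_pairs = [('jack', 1), ('be', 1), ('nimble', 1), ('jack', 1), ('be', 1), ('quick', 1)]

# Sort by key so that equal keys become adjacent (sorted() is stable)
sorted_pairs = sorted(partition_pairs, key=itemgetter(0))

# Group adjacent equal keys: each unique key now comes with its values
grouped = [(k, [v for _, v in group])
           for k, group in groupby(sorted_pairs, key=itemgetter(0))]

print(grouped)  # [('be', [1, 1]), ('jack', [1, 1]), ('nimble', [1]), ('quick', [1])]
```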
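In the reduce phase, an iterator of values is provided to a function known as the reducer. The iterator supplies the set of non-unique values that belong to one unique key. The reducer aggregates the values for each unique key and produces zero or more output key-value pairs.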
Figure: Data flow
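For example, consider a reducer that sums all of the values for a key. The input to this reducer is every value for a key, and the reducer adds them together. The reducer then outputs a key-value pair that contains the input key and the sum of its values.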
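A minimal sketch of that summing reducer as a Python generator. The name sum_reducer is hypothetical; it receives one key plus an iterator over that key's values, matching the description above, and emits a single (key, total) pair.

```python
def sum_reducer(key, values):
    """Hypothetical reduce function: emit (key, sum of all values for the key)."""
    yield key, sum(values)


# Each key arrives with an iterator over its (already grouped) values
grouped = [('be', iter([1, 1])), ('jack', iter([1, 1])),
           ('nimble', iter([1])), ('quick', iter([1]))]

results = [pair for key, values in grouped for pair in sum_reducer(key, values)]
print(results)  # [('be', 2), ('jack', 2), ('nimble', 1), ('quick', 1)]
```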
Hadoop Streaming
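Hadoop streaming is a utility packaged with the Hadoop distribution that allows MapReduce jobs to be created with any executable as the mapper and/or the reducer. As a result, the mapper, the reducer, or both can be written in Python, as shell scripts, or in any other language that can read stdin and write stdout.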
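The mapper and the reducer are both executables that read input, line by line, from standard input (stdin) and write output to standard output (stdout). The Hadoop streaming utility creates a MapReduce job, submits the job to the cluster, and monitors its progress until it is complete.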
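When the mapper is initialized, each map task launches the specified executable as a separate process. The mapper reads the input file and presents each line to the executable via stdin. After the executable processes each line of input, the mapper collects the output from stdout and converts each line to a key-value pair. The key consists of the part of the line before the first tab character, and the value consists of the part of the line after the first tab character. If a line contains no tab character, the entire line is considered the key and the value is null.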
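The line-splitting rule above can be sketched as a small Python function. The name parse_streaming_line is hypothetical, and None stands in for the null value.

```python
def parse_streaming_line(line):
    """Split one line of executable output into (key, value).

    Key: text before the first tab. Value: text after it.
    No tab: the whole line is the key and the value is None (standing in for null).
    """
    line = line.rstrip('\n')
    key, sep, value = line.partition('\t')
    return (key, value) if sep else (line, None)


print(parse_streaming_line('jack\t1\n'))      # ('jack', '1')
print(parse_streaming_line('a\tb\tc'))        # ('a', 'b\tc'): only the first tab splits
print(parse_streaming_line('no tab here\n'))  # ('no tab here', None)
```

This follows the default behavior described in the paragraph above; Hadoop streaming also lets the separator be changed through job configuration.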
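When the reducer is initialized, each reduce task launches the specified executable as a separate process. The reducer converts the input key-value pairs into lines that are presented to the executable via stdin.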
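The reducer collects the executable's results from stdout and converts each line into a key-value pair. As with the mapper, the executable specifies a key-value pair by separating the key and the value with a tab character.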
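The sketch below imitates one streaming task on a single machine: it turns key-value pairs into tab-separated lines, launches an executable as a separate process with subprocess.run(), feeds the lines via stdin, and converts each stdout line back into a pair. The "executable" here is a hypothetical one-off Python program that upper-cases keys; the function name run_streaming_task is also hypothetical, and real Hadoop tasks stream data incrementally rather than buffering it all.

```python
import subprocess
import sys

# Hypothetical stand-in executable: reads key<TAB>value lines from stdin,
# upper-cases each key, and writes key<TAB>value lines to stdout.
EXECUTABLE = [sys.executable, '-c', (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    k, _, v = line.rstrip('\\n').partition('\\t')\n"
    "    print(k.upper() + '\\t' + v)\n"
)]


def run_streaming_task(pairs, command):
    # Convert each input key-value pair into a tab-separated line
    stdin_text = ''.join('{0}\t{1}\n'.format(k, v) for k, v in pairs)
    # Launch the executable as a separate process and collect its stdout
    proc = subprocess.run(command, input=stdin_text,
                          capture_output=True, text=True, check=True)
    # Convert each output line back into a pair, splitting at the first tab
    result = []
    for line in proc.stdout.splitlines():
        key, sep, value = line.partition('\t')
        result.append((key, value) if sep else (line, None))
    return result


print(run_streaming_task([('be', 2), ('jack', 2)], EXECUTABLE))
```

Note that values come back as strings: everything crossing the stdin/stdout boundary is text, which is why reducer.py converts counts with int().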
The following WordCount example shows how to write a Hadoop streaming job in Python.
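mapper.py is the Python program that implements the logic in the map phase of WordCount. It reads data from stdin, splits the lines into words, and outputs each word with its intermediate count to stdout.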
#!/usr/bin/env python
import sys

# Read each line from STDIN
for line in sys.stdin:
    # Get the words in each line
    words = line.split()

    # Generate the count for each word
    for word in words:
        # Write the key-value pair to STDOUT to be processed by the reducer.
        # The key is anything before the first tab character and the value is
        # anything after the first tab character.
        print('{0}\t{1}'.format(word, 1))
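reducer.py is the Python program that implements the logic in the reduce phase of WordCount. It reads the results of mapper.py from stdin, sums the occurrences of each word, and writes the result to stdout.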
#!/usr/bin/env python
import sys

curr_word = None
curr_count = 0

# Process each key-value pair from the mapper
for line in sys.stdin:
    # Get the key and value from the current line (split on the first tab only)
    word, count = line.rstrip('\n').split('\t', 1)

    # Convert the count to an int
    count = int(count)

    # If the current word is the same as the previous word, increment its
    # count, otherwise print the previous word's count to STDOUT
    if word == curr_word:
        curr_count += count
    else:
        # Write word and its number of occurrences as a key-value pair to STDOUT
        if curr_word is not None:
            print('{0}\t{1}'.format(curr_word, curr_count))

        curr_word = word
        curr_count = count

# Output the count for the last word (the guard also handles empty input)
if curr_word is not None:
    print('{0}\t{1}'.format(curr_word, curr_count))
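Make both scripts executable (chmod +x mapper.py reducer.py), then test them locally with a Unix pipeline, using sort in place of Hadoop's shuffle and sort: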
$ echo 'jack be nimble jack be quick' | ./mapper.py | sort | ./reducer.py
be	2
jack	2
nimble	1
quick	1
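The same map, sort, reduce pipeline can also be simulated inside a single Python process, which is handy for unit-testing the logic without a shell or a cluster. This is a sketch: the functions reuse the logic of mapper.py and reducer.py, with sorted() standing in for shuffle and sort and itertools.groupby() replacing the manual "previous word" bookkeeping.

```python
from itertools import groupby
from operator import itemgetter


def mapper(lines):
    # Same logic as mapper.py: emit (word, 1) for every word
    for line in lines:
        for word in line.split():
            yield word, 1


def reducer(sorted_pairs):
    # Same result as reducer.py: sum the counts of adjacent equal keys
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


def run_local(lines):
    # map -> sort (stand-in for shuffle and sort) -> reduce
    return list(reducer(sorted(mapper(lines), key=itemgetter(0))))


for word, count in run_local(['jack be nimble jack be quick']):
    print('{0}\t{1}'.format(word, count))
```

Python's sorted() orders strings by code point, which can differ from the locale-dependent order of the sort command; either works for WordCount because all that matters is that equal keys end up adjacent.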
Now save 'jack be nimble jack be quick' as /home/hduser_/input2.txt, copy it to HDFS, and run the same job with Hadoop:
$ hdfs dfs -put /home/hduser_/input2.txt /user/hduser
$ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /user/hduser/input2.txt -output /user/hduser/output
19/01/22 10:44:38 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
19/01/22 10:44:38 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
19/01/22 10:44:38 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
19/01/22 10:44:38 ERROR streaming.StreamJob: Error Launching job : Output directory hdfs://localhost:54310/user/hduser/output already exists
Streaming Command Failed!

The first attempt fails because the output directory already exists: Hadoop will not overwrite an existing output directory, so either delete it first (hdfs dfs -rm -r /user/hduser/output) or choose a new path. The second run writes to /user/hduser/output2. Its log is abridged below; omitted INFO lines are marked with "...".

$ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /user/hduser/input2.txt -output /user/hduser/output2
...
19/01/22 10:44:46 INFO mapred.FileInputFormat: Total input files to process : 1
19/01/22 10:44:46 INFO mapreduce.JobSubmitter: number of splits:1
19/01/22 10:44:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local208759810_0001
...
19/01/22 10:44:46 INFO mapreduce.Job: Running job: job_local208759810_0001
...
19/01/22 10:44:46 INFO mapred.MapTask: Processing split: hdfs://localhost:54310/user/hduser/input2.txt:0+29
19/01/22 10:44:46 INFO mapred.MapTask: numReduceTasks: 1
...
19/01/22 10:44:46 INFO streaming.PipeMapRed: PipeMapRed exec [/home/andrew/code/HadoopWithPython/python/MapReduce/HadoopStreaming/./mapper.py]
...
19/01/22 10:44:46 INFO streaming.PipeMapRed: Records R/W=1/1
...
19/01/22 10:44:46 INFO mapred.MapTask: Finished spill 0
...
19/01/22 10:44:46 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local208759810_0001_m_000000_0 decomp: 55 len: 59 to MEMORY
...
19/01/22 10:44:46 INFO mapred.Merger: Merging 1 sorted segments
...
19/01/22 10:44:46 INFO streaming.PipeMapRed: PipeMapRed exec [/home/andrew/code/HadoopWithPython/python/MapReduce/HadoopStreaming/./reducer.py]
...
19/01/22 10:44:46 INFO streaming.PipeMapRed: Records R/W=6/1
...
19/01/22 10:44:46 INFO output.FileOutputCommitter: Saved output of task 'attempt_local208759810_0001_r_000000_0' to hdfs://localhost:54310/user/hduser/output2/_temporary/0/task_local208759810_0001_r_000000
...
19/01/22 10:44:47 INFO mapreduce.Job:  map 100% reduce 100%
19/01/22 10:44:47 INFO mapreduce.Job: Job job_local208759810_0001 completed successfully
19/01/22 10:44:47 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=273356
        FILE: Number of bytes written=1217709
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=58
        HDFS: Number of bytes written=29
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=1
        Map output records=6
        Map output bytes=41
        Map output materialized bytes=59
        Input split bytes=97
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=59
        Reduce input records=6
        Reduce output records=4
        Spilled Records=12
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=552599552
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=29
    File Output Format Counters
        Bytes Written=29
19/01/22 10:44:47 INFO streaming.StreamJob: Output directory: /user/hduser/output2
$ hdfs dfs -cat /user/hduser/output2/part-00000
be	2
jack	2
nimble	1
quick	1
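If you submit streaming jobs from Python, a small helper can build the same command line as a list of arguments. This is a sketch under assumptions: streaming_command and submit are hypothetical names, the jar path follows the layout used in this article (share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar), other Hadoop versions or distributions may place the jar elsewhere, and /opt/hadoop in the usage line is only an example path.

```python
import os
import subprocess


def streaming_command(mapper, reducer, input_path, output_path,
                      hadoop_home=None, jar_version='2.9.2'):
    """Build the hadoop-streaming argument list used in this article.

    Assumption: the jar lives at $HADOOP_HOME/share/hadoop/tools/lib/.
    """
    hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
    jar = os.path.join(hadoop_home, 'share', 'hadoop', 'tools', 'lib',
                       'hadoop-streaming-{0}.jar'.format(jar_version))
    return [
        os.path.join(hadoop_home, 'bin', 'hadoop'), 'jar', jar,
        '-files', '{0},{1}'.format(mapper, reducer),
        '-mapper', mapper,
        '-reducer', reducer,
        '-input', input_path,
        '-output', output_path,
    ]


def submit(cmd):
    """Run the command; raises CalledProcessError on a non-zero exit status."""
    subprocess.run(cmd, check=True)


# Example path /opt/hadoop is hypothetical; submit(cmd) needs a working Hadoop install
cmd = streaming_command('mapper.py', 'reducer.py', '/user/hduser/input2.txt',
                        '/user/hduser/output2', hadoop_home='/opt/hadoop')
print(' '.join(cmd))
```

Passing a list instead of a single shell string avoids quoting problems with paths that contain spaces.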
Table of contents for this tutorial series: https://www.jianshu.com/c/dde4ef0f60a0
Page updated: 2024-03-15