如何用Spark来实现已有的MapReduce程序

如题所述

  本文将简要展示怎样在Spark中重现以上过程,您将发现不需要逐字翻译Mapper和Reducer!

  作为元组的键值对
  假定我们需要计算大文本中每一行的长度,并且报告每个长度的行数。在HadoopMapReduce中,我们首先使用一个Mapper,生成为以行的长度作为key,1作为value的键值对。
  public class LineLengthMapper extends
  Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
  throws IOException, InterruptedException {
  context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
  }

  值得注意的是Mappers和Reducers只对键值对进行操作。所以由TextInputFormat提供输入给LineLengthMapper,实际上也是以文本中位置为key(很少这么用,但是总是需要有东西作为Key),文本行为值的键值对。

  与之对应的Spark实现:

  lines.map(line => (line.length, 1))

  Spark中,输入只是String构成的RDD,而不是key-value键值对。Spark中对key-value键值对的表示是一个Scala的元组,用(A,B)这样的语法来创建。上面的map操作的结果是(Int,Int)元组的RDD。当一个RDD包含很多元组,它获得了多个方法,如reduceByKey,这对再现MapReduce行为将是至关重要的。

  Reduce
  reduce()与reduceBykey()
  统计行的长度的键值对,需要在Reducer中对每种长度作为key,计算其行数的总和作为value。
  public class LineLengthReducer extends
  Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts,
  Context context) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable count : counts) {
  sum += count.get();
  }
  context.write(length, new IntWritable(sum));
  }
  }

  Spark中与上述Mapper,Reducer对应的实现只要一行代码:
  val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

  Spark的RDD API有个reduce方法,但是它会将所有key-value键值对reduce为单个value。这并不是Hadoop MapReduce的行为,Spark中与之对应的是ReduceByKey。

  另外,Reducer的Reduce方法接收多值流,并产生0,1或多个结果。而reduceByKey,它接受的是一个将两个值转化为一个值的函数,在这里,就是把两个数字映射到它们的和的简单加法函数。此关联函数可以被调用者用来reduce多个值到一个值。与Reducer方法相比,他是一个根据Key来Reduce Value的更简单而更精确的API。
  Mapper
  map() 与 flatMap()
  现在,考虑一个统计以大写字母开头的单词的个数的算法。对于每行输入文本,Mapper可能产生0个,1个或多个键值对。
  public class CountUppercaseMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
  throws IOException, InterruptedException {
  for (String word : line.toString().split(" ")) {
  if (Character.isUpperCase(word.charAt(0))) {
  context.write(new Text(word), new IntWritable(1));
  }
  }
  }
  }

  Spark对应的写法:
  lines.flatMap(
  _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))
  )
  简单的Spark map函数不适用于这种场景,因为map对于每个输入只能产生单个输出,但这个例子中一行需要产生多个输出。所以,和MapperAPI支持的相比,Spark的map函数语义更简单,应用范围更窄。

  Spark的解决方案是首先将每行映射为一组输出值,这组值可能为空值或多值。随后会通过flatMap函数被扁平化。数组中的词会被过滤并被转化为函数中的元组。这个例子中,真正模仿Mapper行为的是flatMap,而不是map。
  groupByKey()
  写一个统计次数的reducer是简单的,在Spark中,reduceByKey可以被用来统计每个单词的总数。比如出于某种原因要求输出文件中每个单词都要显示为大写字母和其数量,在MapReduce中,实现如下:
  public class CountUppercaseReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
  throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable count : counts) {
  sum += count.get();
  }
  context
  .write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
  }

  但是redeceByKey不能单独在Spark中工作,因为他保留了原来的key。为了在Spark中模拟,我们需要一些更像Reducer API的操作。我们知道Reducer的reduce方法接受一个key和一组值,然后完成一组转换。groupByKey和一个连续的map操作能够达到这样的目标:
  groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }
  groupByKey只是将某一个key的所有值收集在一起,并且不提供reduce功能。以此为基础,任何转换都可以作用在key和一系列值上。此处,将key转变为大写字母,将values直接求和。

  setup()和cleanup()

  在MapReduce中,Mapper和Reducer可以声明一个setup方法,在处理输入之前执行,来进行分配数据库连接等昂贵资源,同时可以用cleanup函数可以释放资源。
  public class SetupCleanupMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
  private Connection dbConnection;
  @Override
  protected void setup(Context context) {
  dbConnection = ...;
  }
  ...
  @Override
  protected void cleanup(Context context) {
  dbConnection.close();
  }
  }

  Spark中的map和flatMap方法每次只能在一个input上操作,而且没有提供在转换大批值前后执行代码的方法,看起来,似乎可以直接将setup和cleanup代码放在Sparkmap函数调用之前和之后:
  val dbConnection = ...
  lines.map(... dbConnection.createStatement(...) ...)
  dbConnection.close() // Wrong!

  然而这种方法却不可行,原因在于:
  · 它将对象dbConnection放在map函数的闭包中,这需要他是可序列化的(比如,通过java.io.Serializable实现)。而数据库连接这种对象一般不能被序列化。
  · map是一种转换,而不是操作,并且拖延执行。连接对象不能被及时关闭。
  · 即便如此,它也只能关闭driver上的连接,而不是释放被序列化拷贝版本分配的资源连接。

  事实上,map和flatMap都不是Spark中Mapper的最接近的对应函数,Spark中Mapper的最接近的对应函数是十分重要的mapPartitions()方法,这个方法能够不仅完成单值对单值的映射,也能完成一组值对另一组值的映射,很像一个批映射(bulkmap)方法。这意味着mapPartitions()方法能够在开始时从本地分配资源,并在批映射结束时释放资源。

  添加setup方法是简单的,添加cleanup会更困难,这是由于检测转换完成仍然是困难的。例如,这样是能工作的:

  lines.mapPartitions { valueIterator =>
  val dbConnection = ... // OK
  val transformedIterator = valueIterator.map(... dbConnection ...)
  dbConnection.close() // Still wrong! May not have evaluated iterator
  transformedIterator
  }
转载
温馨提示:答案为网友推荐,仅供参考
相似回答
大家正在搜