This article briefly shows how to reproduce the process above in Spark. You will find that there is no need to translate Mappers and Reducers literally.
Key-Value Pairs as Tuples
Suppose we need to compute the length of every line in a large text and report how many lines have each length. In Hadoop MapReduce, we start with a Mapper that emits key-value pairs with the line length as the key and 1 as the value.
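    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LineLengthMapper extends
        Mapper<LongWritable, Text, IntWritable, IntWritable> {
      @Override
      protected void map(LongWritable lineNumber, Text line, Context context)
          throws IOException, InterruptedException {
        // Emit (line length, 1); Text.getLength() counts UTF-8 bytes
        context.write(new IntWritable(line.getLength()), new IntWritable(1));
      }
    }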
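Note that Mappers and Reducers operate only on key-value pairs. The input that TextInputFormat supplies to LineLengthMapper is therefore also a set of key-value pairs, with the position in the text as the key (rarely used, but something always has to serve as the key) and the line of text as the value.
The corresponding Spark implementation: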
    lines.map(line => (line.length, 1))
In Spark, the input is simply an RDD of Strings, not key-value pairs. Spark represents a key-value pair as a Scala tuple, created with the (A, B) syntax. The result of the map operation above is an RDD of (Int, Int) tuples. When an RDD contains tuples, it gains additional methods such as reduceByKey, which are essential for reproducing MapReduce behavior.
Reduce
reduce() and reduceByKey()
To count the line-length key-value pairs, the Reducer takes each length as the key and computes the total number of lines of that length as the value.
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class LineLengthReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
      @Override
      protected void reduce(IntWritable length, Iterable<IntWritable> counts,
          Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the Mapper for this line length
        int sum = 0;
        for (IntWritable count : counts) {
          sum += count.get();
        }
        context.write(length, new IntWritable(sum));
      }
    }
In Spark, the equivalent of the Mapper and Reducer above takes a single line of code:
    val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)
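Spark's RDD API has a reduce method, but it reduces the entire set of key-value pairs to a single value. That is not how Hadoop MapReduce behaves; the Spark counterpart is reduceByKey.
In addition, a Reducer's reduce method receives a stream of values and produces zero, one, or many results. reduceByKey instead accepts a function that turns two values into one; here it is a simple addition function that maps two numbers to their sum. Because this function is associative, the caller can use it to reduce many values to a single value. Compared with the Reducer method, it is a simpler and more precise API for reducing values by key.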
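The difference between reducing by key and reducing everything to one value can be sketched without a cluster. The block below is a plain-Java analogy, not Spark code: the class and method names are hypothetical, and an in-memory map stands in for the RDD.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LineLengthCounts {
    // Analogue of lines.map(line => (line.length, 1)).reduceByKey(_ + _):
    // every line contributes 1 to the bucket for its length, and values in
    // the same bucket are combined with the two-argument function Integer::sum.
    static Map<Integer, Integer> countByLength(List<String> lines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            counts.merge(line.length(), 1, Integer::sum);
        }
        return counts;
    }

    // Analogue of a plain reduce: all values collapse into ONE result with
    // no grouping by key (here, the total number of lines).
    static int countAll(List<String> lines) {
        return lines.stream().map(line -> 1).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> lines = List.of("ab", "cd", "xyz", "");
        System.out.println(countByLength(lines)); // {0=1, 2=2, 3=1}
        System.out.println(countAll(lines));      // 4
    }
}
```

In both methods the only logic supplied is a function from two values to one, which is the shape reduceByKey expects.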
Mapper
map() and flatMap()
Now consider an algorithm that counts the words beginning with an uppercase letter. For each line of input text, the Mapper may emit zero, one, or many key-value pairs.
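    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CountUppercaseMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
      @Override
      protected void map(LongWritable lineNumber, Text line, Context context)
          throws IOException, InterruptedException {
        for (String word : line.toString().split(" ")) {
          // Skip empty tokens (e.g. from consecutive spaces) before charAt(0)
          if (!word.isEmpty() && Character.isUpperCase(word.charAt(0))) {
            context.write(new Text(word), new IntWritable(1));
          }
        }
      }
    }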
The Spark equivalent:
    lines.flatMap(
      _.split(" ").filter(word => word.nonEmpty && Character.isUpperCase(word(0))).map(word => (word, 1))
    )
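Spark's plain map function does not fit this scenario, because map produces exactly one output per input, whereas in this example a single line may need to produce several outputs. Compared with what the Mapper API supports, Spark's map function has simpler semantics and a narrower range of application.
Spark's solution is to first map each line to a collection of output values, which may be empty or contain many values, and then flatten those collections with flatMap. Inside the function, the words in the array are filtered and converted into tuples. In this example, it is flatMap, not map, that truly mimics the Mapper's behavior.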
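The same one-to-many distinction exists in Java streams, which makes it easy to try locally. This is a plain-JDK sketch rather than Spark code; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class UppercaseWords {
    // flatMap analogue: each line expands to zero, one, or many (word, 1)
    // pairs, and the per-line results are flattened into a single list.
    static List<Map.Entry<String, Integer>> pairs(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word -> !word.isEmpty() && Character.isUpperCase(word.charAt(0)))
                .map(word -> Map.entry(word, 1))
                .collect(Collectors.toList());
    }

    // map analogue: exactly one output per input line, so the result stays
    // nested (one list of words per line) instead of a flat list of pairs.
    static List<List<String>> wordsPerLine(List<String> lines) {
        return lines.stream()
                .map(line -> Arrays.asList(line.split(" ")))
                .collect(Collectors.toList());
    }
}
```

With map alone, a line containing no capitalized word would still have to yield an output element; flatMap lets it contribute nothing.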
groupByKey()
Writing a Reducer that totals counts is simple, and in Spark reduceByKey can be used to total the count for each word. But suppose that, for some reason, the output file must show each word in uppercase along with its count. In MapReduce, the implementation is:
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CountUppercaseReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
          sum += count.get();
        }
        // The output key differs from the input key: it is upper-cased
        context
            .write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
      }
    }
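However, reduceByKey on its own does not work here in Spark, because it preserves the original key. To emulate this in Spark, we need something closer to the Reducer API. Recall that a Reducer's reduce method takes a key and a set of values and then performs some transformation on them. groupByKey followed by a map operation achieves this:
    groupByKey().map { case (word, ones) => (word.toUpperCase, ones.sum) }
groupByKey merely collects all the values for a given key together; it does not apply any reduce function. From there, any transformation can be applied to the key and its sequence of values. Here, the key is converted to uppercase and the values are summed directly.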
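The two steps, grouping first and transforming afterwards, can be mimicked with ordinary Java collections. The sketch below is an analogy under that assumption; `GroupThenTransform` and its methods are hypothetical names, not part of Spark or Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class GroupThenTransform {
    // Step 1, like groupByKey: gather every value seen for a key; nothing is reduced yet.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Step 2, like the map that follows: free to rewrite the key (upper-case it)
    // and to collapse the grouped values (sum them) in one function.
    static List<Map.Entry<String, Integer>> upperCaseTotals(Map<String, List<Integer>> grouped) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        grouped.forEach((word, ones) -> {
            int sum = ones.stream().mapToInt(Integer::intValue).sum();
            out.add(Map.entry(word.toUpperCase(Locale.ROOT), sum));
        });
        return out;
    }
}
```

Because the second step sees the key together with all of its values, it has the same freedom as a Reducer's reduce method, at the cost of materializing every value for a key first.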
setup() and cleanup()
In MapReduce, a Mapper or Reducer can declare a setup method, which runs before any input is processed, to allocate expensive resources such as database connections, and a cleanup method to release those resources.
    public class SetupCleanupMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
      private Connection dbConnection;
      @Override
      protected void setup(Context context) {
        dbConnection = ...;
      }
      ...
      @Override
      protected void cleanup(Context context) {
        dbConnection.close();
      }
    }
Spark's map and flatMap methods operate on only one input at a time, and they offer no way to run code before or after transforming a large batch of values. It might seem possible to simply place the setup and cleanup code before and after the call to Spark's map function:
    val dbConnection = ...
    lines.map(... dbConnection.createStatement(...) ...)
    dbConnection.close() // Wrong!
However, this approach does not work, for the following reasons:
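· It puts the dbConnection object into the closure of the map function, which requires the object to be serializable (for example, by implementing java.io.Serializable). Objects such as database connections generally cannot be serialized.
· map is a transformation, not an action, and it is evaluated lazily. The connection therefore cannot be closed right after the call.
· Even so, it would close only the connection on the driver, and would not release the resources allocated by the serialized copies.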
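The second point, lazy evaluation, can be reproduced in a single JVM with Java streams, whose map is also lazy. This sketch is only an analogy: `FakeConnection` is a hypothetical stand-in for a real connection, and the serialization and driver/executor issues from the other two points are not modeled.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyMapDemo {
    // Hypothetical stand-in for a connection: usable only while open.
    static final class FakeConnection {
        private boolean open = true;

        String query(String input) {
            if (!open) throw new IllegalStateException("connection already closed");
            return input.toUpperCase(Locale.ROOT);
        }

        void close() { open = false; }
    }

    // Mirrors the broken pattern: declare the lazy map, close, then evaluate.
    static String closeBeforeEvaluate(List<String> lines) {
        FakeConnection conn = new FakeConnection();
        Stream<String> mapped = lines.stream().map(conn::query); // nothing runs yet
        conn.close();                                            // runs immediately
        try {
            return String.join(",", mapped.collect(Collectors.toList()));
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    // Correct ordering for contrast: force evaluation first, then close.
    static String evaluateBeforeClose(List<String> lines) {
        FakeConnection conn = new FakeConnection();
        List<String> result = lines.stream().map(conn::query).collect(Collectors.toList());
        conn.close();
        return String.join(",", result);
    }
}
```

For a non-empty input, the first method fails because the mapping function only runs when the stream is collected, by which time the connection is closed.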
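In fact, neither map nor flatMap is the closest Spark counterpart to a Mapper. The closest counterpart is the very important mapPartitions() method. It can map not only a single value to a single value, but also a whole set of values to another set of values, much like a bulk map method. This means mapPartitions() can allocate resources locally at the start and release them when the bulk mapping is finished.
Adding setup code is simple; adding cleanup code is harder, because it remains difficult to detect when the transformation has finished. For example, the following still does not work: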
    lines.mapPartitions { valueIterator =>
      val dbConnection = ... // OK
      val transformedIterator = valueIterator.map(... dbConnection ...)
      dbConnection.close() // Still wrong! May not have evaluated iterator
      transformedIterator
    }
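One possible way around this, which the text above does not spell out, is to return an iterator that runs the cleanup itself once the underlying input is exhausted. The following is a plain-Java sketch of that idea under stated assumptions: `CleanupOnExhaustion` and `mapThenCleanup` are hypothetical names, and this is not a Spark API.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

final class CleanupOnExhaustion {
    // Wraps a lazily transformed iterator and runs cleanup exactly once,
    // the first time hasNext() finds the underlying iterator exhausted.
    static <A, B> Iterator<B> mapThenCleanup(
            Iterator<A> input, Function<A, B> fn, Runnable cleanup) {
        return new Iterator<B>() {
            private boolean cleaned = false;

            @Override
            public boolean hasNext() {
                boolean more = input.hasNext();
                if (!more && !cleaned) {
                    cleaned = true;
                    cleanup.run(); // e.g. close the connection here
                }
                return more;
            }

            @Override
            public B next() {
                if (!hasNext()) throw new NoSuchElementException();
                return fn.apply(input.next()); // transformation stays lazy
            }
        };
    }
}
```

A limitation of this design is that cleanup never runs if the consumer stops before exhausting the iterator, so it suits only cases where the whole partition is consumed.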