博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
064 SparkStream与kafka的集成,主要是编程
阅读量:6220 次
发布时间:2019-06-21

本文共 10441 字,大约阅读时间需要 34 分钟。

  这里面包含了如何在kafka+sparkStreaming集成后的开发,也包含了一部分的优化。

一:说明

1.官网

  指导网址:http://spark.apache.org/docs/1.6.1/streaming-kafka-integration.html

  

 

2.SparkStream+kafka

  Use Receiver

    内部使用kafka的high lenel consumer API

    consumer offset 只能保持到zk/kafka中,只能通过配置进行offset的相关操作

  Direct

    内部使用的是kafka的simple consumer api

    自定义对kafka的offset偏移量进行控制操作

    集成依赖pom配置:

      

 

二:单Receiver的程序

1.先启动服务

  在这里需要启动kafka的生产者

  

 

2.程序

1 package com.stream.it 2  3 import kafka.serializer.StringDecoder 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.dstream.ReceiverInputDStream 6 import org.apache.spark.streaming.kafka.KafkaUtils 7 import org.apache.spark.streaming.{Seconds, StreamingContext} 8 import org.apache.spark.{SparkConf, SparkContext} 9 10 object KafkaWordcount {11   def main(args: Array[String]): Unit = {12     val conf=new SparkConf()13         .setAppName("spark-streaming-wordcount")14           .setMaster("local[*]")15     val sc=SparkContext.getOrCreate(conf)16     val ssc=new StreamingContext(sc,Seconds(15))17 18     /*19     def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](20       ssc: StreamingContext,21       kafkaParams: Map[String, String],22       topics: Map[String, Int],23       storageLevel: StorageLevel24     ): ReceiverInputDStream[(K, V)]25     */26     val kafkaParams=Map("group.id"->"stream-sparking-0",27           "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",28           "auto.offset.reset"->"smallest"29     )30     val topics=Map("beifeng"->1)31     val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](32       ssc,             //给定sparkStreaming的上下文33       kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接34       topics,          //给定读取对应的topic的名称以及读取数据的线程数量35       StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别36     ).map(_._2)37 38 39     val resultWordcount=dStream40       .filter(line=>line.nonEmpty)41         .flatMap(line=>line.split(" ").map((_,1)))42         .reduceByKey(_+_)43     resultWordcount.foreachRDD(rdd=>{44       rdd.foreachPartition(iter=>iter.foreach(println))45     })46 47     //启动48     ssc.start()49     //等到50     ssc.awaitTermination()51   }52 }

 

3.效果

  在kafka producer输入内容,将会在控制台上进行展示

 

三:多Receiver

1.说明

  当当个reveiver接收的数据被限制的时候,可以使用多个receiver

 

2.程序

1 package com.stream.it 2  3 import kafka.serializer.StringDecoder 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.kafka.KafkaUtils 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 import org.apache.spark.{SparkConf, SparkContext} 8  9 object MulReceiverKafkaWordcount {10   def main(args: Array[String]): Unit = {11     val conf=new SparkConf()12         .setAppName("spark-streaming-wordcount2")13           .setMaster("local[*]")14     val sc=SparkContext.getOrCreate(conf)15     val ssc=new StreamingContext(sc,Seconds(15))16 17     /*18     def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](19       ssc: StreamingContext,20       kafkaParams: Map[String, String],21       topics: Map[String, Int],22       storageLevel: StorageLevel23     ): ReceiverInputDStream[(K, V)]24     */25     val kafkaParams=Map("group.id"->"stream-sparking-0",26           "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",27           "auto.offset.reset"->"smallest"28     )29     val topics=Map("beifeng"->4)30     val dStream1=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](31       ssc,             //给定sparkStreaming的上下文32       kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接33       topics,          //给定读取对应的topic的名称以及读取数据的线程数量34       StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别35     ).map(_._2)36 37     val dStream2=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](38       ssc,             //给定sparkStreaming的上下文39       kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接40       topics,          //给定读取对应的topic的名称以及读取数据的线程数量41       StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别42     ).map(_._2)43 44     //合并dstream45     val dStream=dStream1.union(dStream2)46 47 48     val resultWordcount=dStream49       .filter(line=>line.nonEmpty)50         .flatMap(line=>line.split(" ").map((_,1)))51         .reduceByKey(_+_)52     resultWordcount.foreachRDD(rdd=>{53       rdd.foreachPartition(iter=>iter.foreach(println))54     })55 56     //启动57     ssc.start()58     //等到59     ssc.awaitTermination()60   }61 }

 

3.效果 

  一条数据是一个event

  

   这里有两个receiver。

  

 

四:Direct

1.说明

  直接读取,不存在receiver

  不足,kafkaParams指定连接kafka的参数,内部使用的是kafka的SimpleConsumerAPI,所以,offset只能从头或者从尾开始读取,不能设置。

  topics:topic的名称

 

2.程序

1 package com.stream.it 2  3 import kafka.serializer.StringDecoder 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.kafka.KafkaUtils 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 import org.apache.spark.{SparkConf, SparkContext} 8  9 object DirectKafkaWordcount {10   def main(args: Array[String]): Unit = {11     val conf=new SparkConf()12         .setAppName("spark-streaming-wordcount")13           .setMaster("local[*]")14     val sc=SparkContext.getOrCreate(conf)15     val ssc=new StreamingContext(sc,Seconds(15))16 25     val kafkaParams=Map(26           "metadata.broker.list"->"linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093,linux-hadoop01.ibeifeng.com:9094",27           "auto.offset.reset"->"smallest"28     )29     val topics=Set("beifeng")30     val dStream=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](31       ssc,32       kafkaParams,33       topics).map(_._2)34     35     val resultWordcount=dStream36       .filter(line=>line.nonEmpty)37         .flatMap(line=>line.split(" ").map((_,1)))38         .reduceByKey(_+_)39     resultWordcount.foreachRDD(rdd=>{40       rdd.foreachPartition(iter=>iter.foreach(println))41     })42 43     //启动44     ssc.start()45     //等到46     ssc.awaitTermination()47   }48 }

 

3.效果

  没有receiver。

  

       

 

五:Direct实现是累加器管理offset偏移量

1.程序

  kafkaParams 中只有这个参数下才能生效。

  数据先进行保存或者打印,然后更新accumulable中的offset,然后下一批的dstream进行更新offset。

  累加器需要在外面进行定义。

 

1 package com.stream.it  2   3 import scala.collection.mutable  4 import kafka.common.TopicAndPartition  5 import kafka.message.MessageAndMetadata  6 import kafka.serializer.StringDecoder  7 import org.apache.spark.storage.StorageLevel  8 import org.apache.spark.streaming.kafka.KafkaUtils  9 import org.apache.spark.streaming.{Seconds, StreamingContext} 10 import org.apache.spark.{Accumulable, AccumulableParam, SparkConf, SparkContext} 11  12 object AccumubaleKafkaWordcount { 13   def main(args: Array[String]): Unit = { 14     val conf=new SparkConf() 15         .setAppName("spark-streaming-wordcount") 16           .setMaster("local[*]") 17     val sc=SparkContext.getOrCreate(conf) 18     val ssc=new StreamingContext(sc,Seconds(15)) 19     val accumu = DroppedAccumulable.getInstance(sc) 20  21     val kafkaParams = Map( 22       "metadata.broker.list" -> "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093,linux-hadoop01.ibeifeng.com:9094,linux-hadoop01.ibeifeng.com:9095" 23     ) 24  25     // TODO: 从某一个存储offset的地方读取offset偏移量数据, redis\hbase\其他地方..... 26     val fromOffsets = Map( 27       TopicAndPartition("beifeng", 0) -> -1L, // 如果这里给定的偏移量是异常的,会直接从kafka中读取偏移量数据(largest) 28       TopicAndPartition("beifeng", 1) -> 0L, 29       TopicAndPartition("beifeng", 2) -> 0L, 30       TopicAndPartition("beifeng", 3) -> 0L 31     ) 32  33  34     val dstream = KafkaUtils.createDirectStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder, String]( 35       ssc, // 上下文 36       kafkaParams, // kafka连接 37       fromOffsets, 38       (message: MessageAndMetadata[String, String]) => { 39         // 这一块在Executor上被执行 40         // 更新偏移量offset 41         val topic = message.topic 42         val paritionID = message.partition 43         val offset = message.offset 44         accumu += (topic, paritionID) -> offset 45         // 返回value的数据 46         message.message() 47       } 48     ) 49  50     val resultWordCount = dstream 51       .filter(line => line.nonEmpty) 52       .flatMap(line => line.split(" ").map((_, 1))) 53       .reduceByKey(_ + _) 54  55  56     resultWordCount.foreachRDD(rdd => { 57       // 在driver上执行 58       try { 59         rdd.foreachPartition(iter => { 60           // 代码在executor上执行 61           // TODO: 这里进行具体的数据保存操作 62           iter.foreach(println) 63         }) 64  65         // TODO: 在这里更新offset, 将数据写入到redis\hbase\其他地方..... 66         accumu.value.foreach(println) 67       } catch { 68         case e: Exception => // nothings 69       } 70     }) 71  72  73  74     //启动 75     ssc.start() 76     //等到 77     ssc.awaitTermination() 78   } 79 } 80 object DroppedAccumulable { 81   private var instance: Accumulable[mutable.Map[(String, Int), Long], ((String, Int), Long)] = null 82  83   def getInstance(sc: SparkContext): Accumulable[mutable.Map[(String, Int), Long], ((String, Int), Long)] = { 84     if (instance == null) { 85       synchronized { 86         if (instance == null) instance = sc.accumulable(mutable.Map[(String, Int), Long]())(param = new AccumulableParam[mutable.Map[(String, Int), Long], ((String, Int), Long)]() { 87           /** 88             * 将t添加到r中 89             * 90             * @param r 91             * @param t 92             * @return 93             */ 94           override def addAccumulator(r: mutable.Map[(String, Int), Long], t: ((String, Int), Long)): mutable.Map[(String, Int), Long] = { 95             val oldOffset = r.getOrElse(t._1, t._2) 96             if (t._2 >= oldOffset) r += t 97             else r 98           } 99 100           override def addInPlace(r1: mutable.Map[(String, Int), Long], r2: mutable.Map[(String, Int), Long]): mutable.Map[(String, Int), Long] = {101             r2.foldLeft(r1)((r, t) => {102               val oldOffset = r.getOrElse(t._1, t._2)103               if (t._2 >= oldOffset) r += t104               else r105             })106           }107 108           override def zero(initialValue: mutable.Map[(String, Int), Long]): mutable.Map[(String, Int), Long] = mutable.Map.empty[(String, Int), Long]109         })110       }111     }112 113     // 返回结果114     instance115   }116 }

 

2.效果

  可以将以前的信息打出来。

  

  

 

转载地址:http://dvoja.baihongyu.com/

你可能感兴趣的文章
寻找正在连接中的网络连接
查看>>
svn client命令
查看>>
patch 用法
查看>>
Dos命令快速设置ip、网关、dns地址
查看>>
OpenCV中CvSVM部分函数解读
查看>>
SpringBoot项目创建
查看>>
Spring简化配置
查看>>
在 Asp.NET MVC 中使用 SignalR 实现推送功能
查看>>
图解排序算法(二)之希尔排序
查看>>
Android App中使用Gallery制作幻灯片播放效果
查看>>
第2阶段——编写uboot之编译测试以及改进(3)
查看>>
30多条mysql数据库优化方法,千万级数据库记录查询轻松解决
查看>>
Swift Modules for React Native
查看>>
牛客网Java刷题知识点之多线程同步的实现方法有哪些
查看>>
Fabric动态增加组织【资料】
查看>>
maven
查看>>
web服务器之mongoose:数据结构
查看>>
Ionic简介和环境安装
查看>>
进程控制
查看>>
背水一战 Windows 10 (58) - 控件(集合类): ListViewBase - ListView, GridView
查看>>