kafka 0.10 版本探索 与spark streaming 2.x 整合
kafka 0.10 版本探索
首先,博主之前用的spark-streaming-kafka 1.6 scala 2.10的包,当时的kafka两种连接方式 高层封装1.基于Receiver的方式
简单介绍一下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。 俗称WAL ,但是个人认为这是多余且不必要的操作 kafka以及做了很多的高可用,数据备份机制。那消费数据为何还需要开启WAL?需要注意的要点
1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
4、receiver是额外启动 相对消耗资源,总的来说 优点不多。
2.基于Direct的方式
用底层API直接连kafka spark partition与kafka 一一对应 增加并行度。1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制: 基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
kafka 数据高可用 数据高可靠性 无丢失配置 ack=-1 消费采取direct 直连 消费数据做成事物 spark 自己维护偏移量信息
之前博主采用的直连方式是写一个org.apache.spark.streaming.kafka 包下工具类 实例化KafkaCluster 然后自己封装增强 包装设计模式,后来kafka升级到0.10
kafka新版生产者api以及消费者API
个人觉得主要比较好的是消费者的修改,在之前版本kafka将kafka偏移量保存到ZK 这时候如果消费者如果进行消费的时候,消费者与kafka可以是同步的,但是
kafka还要与zk进行交互 ,这样就导致了可以数据消费的不一致,现在kafka新版消费者,将偏移量自己维护这样增强了数据消费的一致性,并且开放了kafkacluster的权限,
推荐了direct 直连底层api 的方式,保证了消费者消费数据的可靠性。这里生产者api Demo JAVA版
生产者Demo
package com.wxstc.dl;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Utils;
import java.util.Properties;
import java.util.concurrent.Future;
public class ProducerDemo implements Runnable{
private final KafkaProducer<String, String> producer;
private final String topic;
public ProducerDemo(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "AnserInsight01:9092,AnserInsight02:9092,AnserInsight03:9092");//连接kafka集群 broker 列表
// props.put("bootstrap.servers", "192.168.0.16:9092");
/**
* 此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。 这个参数是为了保证发送请求的可靠性。以下配置方式是允许的:
acks=0 如果设置为0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
acks=1 如果设置为1,leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
acks=all 如果设置为all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1与acks=all是等效的。
all 与 -1 等效的
*/
props.put("acks", "all");
/**
* 若设置大于0的值,则客户端会将发送失败的记录重新发送,尽管这些记录有可能是暂时性的错误。请注意,这种 retry 与客户端收到错误信息之后重新发送记录并无区别。允许 retries 并且没有设置max.in.flight.requests.per.connection 为1时,记录的顺序可能会被改变。比如:当两个批次都被发送到同一个 partition ,第一个批次发生错误并发生 retries 而第二个批次已经成功,则第二个批次的记录就会先于第一个批次出现。
*/
props.put("retries", 0);
/**
* 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。
当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。
发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据
小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。
*/
props.put("batch.size", 16384);
/**
* producer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。通常只有当记录到达的速度超过了发送的速度时才会出现这种情况。然而,在某些场景下,即使处于可接受的负载下,客户端也希望能减少请求的数量。这个设置是通过添加少量的人为延迟来实现的&mdash;即,与其立即发送记录, producer 将等待给定的延迟时间,以便将在等待过程中到达的其他记录能合并到本批次的处理中。这可以认为是与 TCP 中的 Nagle 算法类似。这个设置为批处理的延迟提供了上限:一旦我们接受到记录超过了分区的 batch.size ,Producer 会忽略这个参数,立刻发送数据。但是如果累积的字节数少于 batch.size ,那么我们将在指定的时间内“逗留”(linger),以等待更多的记录出现。这个设置默认为0(即没有延迟)。例如:如果设置linger.ms=5 ,则发送的请求会减少并降低部分负载,但同时会增加5毫秒的延迟。
* 增加请求延迟 降低负载
*/
props.put("linger.ms", 1);
/**
* Producer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快, Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。
这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。
*/
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
}
public void run() {
int messageNo = 1;
try {
for(;;) {
String messageStr="你好,这是第"+messageNo+"条数据";
System.out.println("成功发送了"+messageNo+"条");
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)).get();
// 程序阻塞,直到该条消息发送成功返回元数据信息或者报错
StringBuilder sb = new StringBuilder();
sb.append("record [").append(metadata.serializedKeySize()+":"+metadata.serializedValueSize()).append("] has been sent successfully!").append("\n")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(sb.toString());
//生产了10条就打印
// if(messageNo%100==0){
// //System.out.println("发送的信息:" + messageStr);
// }
//生产100条就退出
if(messageNo%10==0){
System.out.println("成功发送了"+messageNo+"条");
break;
}
messageNo++;
Utils.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String[] args) {
ProducerDemo pd = new ProducerDemo("tstmessage");
Thread t1 = new Thread(pd);
t1.start();
}
}
spark升级到2.10后 发现之前博主的工具包报错了, 竟然连KafkaCluster KafkaManager 全部都没了,讲道理 kafka升级真的是有点小变态啊。。。。 看来 kafka也是发现了Receiver的弊端,新的包内 直接封装直连方式 自己维护偏移量 并且spark2.0 全面加入了 sql操作也就是 Structured Streaming 与Structured Data 个人认为Structured Data 目前已经比较完善了。并且我们公司大部分业务都是给予Structured Data ,但是Structured Streaming 据我初步的使用情况,好像很多的东西并不完善, 就连 kafka的消费group id都无法手动配置,相关配置非常局限,看以后的更新迭代 发展,毕竟sql是最大生产力 大家懂的~~ 下面贴上spark streaming 与Structured Streaming案例
spark streaming 连接kafka 消费流程
package com.wxstc.dl.stream
//import com.wxstc.util.LoggerLevels
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object StreamingWordCount {
val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
}
//业务处理
def myDealService(rdd: RDD[ConsumerRecord[String, String]], offsetRanges: Array[OffsetRange]) = {
rdd.foreachPartition(iter => {
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"OffsetRange :${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
})
val rdd1 = rdd.map(record => {
(record.key(), record.value())
})
}
val myUpstate = (it: Iterator[(String, Seq[String], Option[String])]) =>{
it.map(x=>{
(x._1,x._3.getOrElse(""))
})
}
def main(args: Array[String]): Unit = {
// LoggerLevels.setStreamingLogLevels(Level.WARN)
val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("D:\\_vm\\ck")
if (args.length < 3) {
System.err.println(
s"""
|Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
| <groupid> is a consume group
|
""".stripMargin)
System.exit(1)
}
Logger.getLogger("org").setLevel(Level.WARN)
val Array(brokers, topics, groupId, zkQuorum) = args
val topicsSet = topics.split(",")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers, //"115.159.93.95:9092,115.159.78.168:9092,115.159.222.161:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId, //"groupA",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//直连方式接入kafka数据流
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
)
//拿到数据流内需要的数据 key value
val ds2 = messages.map(rec=>{
(rec.key(),rec.value())
})
val ds3 = ds2.updateStateByKey(myUpstate,new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
/**
* 拿到数据流后对每个 微批次 也就是rdd进行业务流程操作 这里需要注意的偏移量的管理问题
* 1.通过kafka自动管理偏移量 (不推荐 不高可用,容易导致数据重复消费)
* 2.自己手动管理偏移量 (推荐,但是需要自己维护偏移量 可以保存zk,hdfs 多种途径)
* 3.自己手动提交偏移量 (推荐,可以保证数据消费可靠性,以及开发的简易型 )
* 这里我采用的是第三种 这里再次提醒一下消费者偏移量建议用kafka的方式 不要用老版zk的方式
*/
messages.foreachRDD(rdd => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
myDealService(rdd,offsetRanges)
println("commit")
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
val ds1 = messages.map(record => (record.key, record.value))
ds1.foreachRDD(rdd => {
println(rdd.count())
})
// ds1.print(5)
ds1.print(5)
ssc.start()
ssc.awaitTermination()
}
}
Structured Streaming 连接kafka 消费流程
这里配置比较局限并且博主第一次接触 不是很了解就不加相关注释了
package com.wxstc.dl.stream
import org.apache.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.ProcessingTime
object SparkStructuredStreaming {
def main(args: Array[String]): Unit = {
val Array(brokers, topics, groupId, zkQuorum) = args
val spark = SparkSession.builder().config(new SparkConf().setMaster("local[*]"))//.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("group.id",groupId)
.option("startingoffset", "earliest")//"smallest")
.option("subscribe", topics)
.load()
// df.createOrReplaceTempView("kafkaTable")
// val kafkaQuery = spark.sql(
// """
// |select * from kafkaTable
// """.stripMargin)
// .writeStream
// .format("console")
// .trigger(ProcessingTime(2000L))
// .start()
// kafkaQuery.awaitTermination()
val kafka = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
.writeStream
.format("console")
.trigger(ProcessingTime(2000L))
.start()
kafka.awaitTermination()
}
}
在调试集群的时候发现了一个问题目前还没找到问题的根本原因,首先博主将kafka绑定的是内网的端口,外网竟然可以访问~~ 好就算是我绑定错了 那是不是外网应该可以正常提供服务呢? 其实不然 根据调试 我用生产者从外网端口send message 的时候自动创建了topic好的 到这里都没问题 发送message的时候 竟然显示的是发送成功 没有抱任何的错误,kafka 集群也没有接收到消息, 这个问题就非常奇怪了,通过调试将jar提交到集群内跑是可以成功发送message的。