通过源码分析Producer性能瓶颈 | 专程
來源: 专程
0 2019/07/11

本文,kafka源码是以0.8.2.2,虽然版本相对比较老,但是阅读还是很有必要的。主要是java的kafka生产者源码,Broker接收到producer请求之后处理的相关源码。估计源码内容是比较多的,只给出大致逻辑,主类和函数名称。本文的目的是让大家,彻底了解发送消息到kafka的过程及如何对producer进行调优。没耐心的小伙伴底部总结可以直接阅读。

一. kafka的producer基本介绍及主要类

1. 基本介绍

Kafka的Producer,主要负责将消息发送给kafka集群。主要核心特性有两点:

1),异步 or 同步。

可以通过配置producer.type(1或async和2或sync)

设置为异步的话,其实就是启动了一个后台线程,负责从队列里面取数据发送给kafka,我们的主程序负责往队列了写数据。支持批发送。

2),消息的key决定分区。

可以采用分区器,来决定我们的消息发往哪个partition。实现Partitioner可以实现自定义分区器。默认的分区策略是对key取hash值,然后对总的分区数取余,得到的余数作为我们发送消息到哪个分区的依据。

2,主要类

2.1 Producer

kafka.producer.Producer该类异常重要,负责对DefaultEventHandler进行初始化并且在此过程也初始化真正的发送者池ProducerPool。

异步发送消息的策略情况下会对初始化我们后台发送线程。

2.2 DefaultEventHandler

该类主要是为消息发送做准备,比如更新broker信息,找到分区的leader等,最终通过SyncProducer将消息发送给Broker。

2.3 ProducerSendThread

这个是异步发送情况下才会有的,一个后台发送线程。这种模式下,生产端实际是形成了一个生产消费模型,用户调用Producer.send实际是将消息添加到了一个消息队列里面LinkedBlockingQueue,然后由ProducerSendThread的processEvents,批量处理后交给了DefaultEventHandler。

2.4 ProducerPool

维护了Broker数目个SyncProducer,会在每次发送消息前更新。

2.5 SyncProducer

每个SyncProducer对象,包含了一个到一个broker链接,我们通过获取该对象来发消息到特定的broker。

2.6 BrokerPartitionInfo

该类主要是在获取topic元数据信息的时候会用到。在更新元数据的时候会同时更新我们ProducerPool,避免Borker挂掉之后SyncProducer对象不可用。

2.7 Partitioner

主要是分区器。对我们的消息按照key进行分区。

2.8 KafkaApis

Kafka的各种请求的逻辑处理类。

2.9 Processor

负责应答请求。

二,源码讲解

producer与Broker通信骨干

640?wx_fmt=png

由上图可以看出,生产者于消费者通讯的骨干分两类:

1.TopicMetadataRequest

RequestId:RequestKeys.MetadataKey。主要是负责请求topic的元数据,更新Producer里面syncProducers链接信息,也获取了topic的具体信息。

在DefaultEventHandler的Handle方法里,开启了元数据的更新

brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)def updateInfo(topics: Set[String], correlationId: Int) {var topicsMetadata: Seq[TopicMetadata] = Nilval topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)

在ClientUtils.fetchTopicMetadata方法里,会根据我们给定的broker列表,随机选出一个创建一个SyncProducer,去获取元数据。获取到就会推出(一台Broker会包含所有的元数据)。

  1. val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
  2. var topicMetadataResponse: TopicMetadataResponse = null
  3. var t: Throwable = null
  4. // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
  5. // same broker
  6. val shuffledBrokers = Random.shuffle(brokers)
  7. while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
  8. val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
  9. info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
  10. try {
  11. topicMetadataResponse = producer.send(topicMetadataRequest)
  12. fetchMetaDataSucceeded = true
  13. }
  14. catch {
  15. case e: Throwable =>
  16. warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
  17. .format(correlationId, topics, shuffledBrokers(i).toString), e)
  18. t = e
  19. } finally {
  20. i = i + 1
  21. producer.close()
  22. }
  23. }
  24. ducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))

TopicMetadataRequest被KafkaApis识别为获取topic元数据的请求,最终调用的处理函数是handleTopicMetadataRequest(request)。

2,ProducerRequest

RequestId:RequestKeys.ProduceKey。被KafkaApis识别为消息发送请求。会调用的处理函数handleProducerOrOffsetCommitRequest(request)。

具体过程如下:

640?wx_fmt=png

根据上图可知,异步通讯这种方式其实涵盖了,同步通讯。所以,这块源码呢,我们主要关注异步通讯。

1),初始化

前面已经说过了在构建kafka.producer.Producer对象的时候会初始化ProducerPool和DefaultEventHandler。

  1. //构建DefaultEventHandler的时候会通过反射构建分区器,和key-value的序列化方式
  2. def this(config: ProducerConfig) =
  3. this(config,
  4. new DefaultEventHandler[K,V](config,
  5. Utils.createObject[Partitioner](config.partitionerClass, config.props),
  6. Utils.createObject[Encoder[V]](config.serializerClass, config.props),
  7.        Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
  8.        new ProducerPool(config)))

假如为异步消息发送也会构建一个后台发送线程

config.producerType match {  case "sync" =>  case "async" =>    sync = false    producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,                 queue,                 eventHandler,                 config.queueBufferingMaxMs,                 config.batchNumMessages,                 config.clientId)    producerSendThread.start()}

2),具体数据发送过程

A),消息生产到消息队列

producer.send(new KeyedMessage<IntegerString>(topic, messageStr));

调用kafka.producer.Producer对象asyncSend将消息存储到消息队列,自己的。

  1. asyncSend(messages: Seq[KeyedMessage[K,V]])
  2. LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

B),后台线程从消息队列取出消息

此过程,牵涉到发送调优的一个策略:

(1)满足消息发送批大小发送

(2)消息等待超时也会将消息发送

首先是构建清空队列的流

  1. Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
  2. .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach

然后里面会进行判断

  1. // check if the batch size is reached
  2. full = events.size >= batchSize
  3. if(full || expired) {
  4. if(expired)
  5. debug(elapsed + " ms elapsed. Queue time reached. Sending..")
  6. if(full)
  7. debug("Batch full. Sending..")
  8. // if either queue time has reached or batch size has reached, dispatch to event handler
  9. tryToHandle(events)
  10. lastSend = SystemTime.milliseconds
  11. events = new ArrayBuffer[KeyedMessage[K,V]]
  12. }

C),DefaultEventHandler消息发送前的准备

在handle方法里,首先是会判断是否满足topic元数据更新的时间间隔,间隔设置方式:

topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)

满足则进行topic元数据更新

然后呢?获取leader,按照key进行分区,构建发送的消息块,发送消息。具体由DefaultEventHandler的dispatchSerializedData方法完成

首先,用DefaultEventHandler的partitionAndCollate方法获取

Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]

其中,其中map的key是分区leader的brokerId

Map的Value,又是一个map,key是TopicAndPartition,value是该分区的具体消息。

接着,正式进入消息封装与发送

  1. val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
  2. val failedTopicPartitions = send(brokerid, messageSetPerBroker)

在send方法中构建了

  1. val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
  2. config.requestTimeoutMs, messagesPerTopic)

从Producer池中获取一个SyncProducer,发送消息

val response = syncProducer.send(producerRequest)

Broker接受到消息根据requestId,进行逻辑处理,最终是交给了KafkaApis的方法

case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)

三,总结

由于kafka在我们整个生产环境系统中的重要性,主要体现在,kafka集群一旦垮了,会导致真个业务系统断了,系统瘫痪。所以,保证kafka集群的存活,高效运转,是我们大数据工作者一个重要责任。这次主要是从生产者的角度强调我们的优化过程。后面会陆续出文章讲多种消费者角色的源码和优化策略和kafka本身的优化策略。

1,采用异步发送策略

异步策略,增加了一个后台发送线程,增加并发度。

异步策略支持批量发送和超时发送,提升了性能。

2,设置合理的批大小和超时时间(异步处理情况)

配置

默认

作用

queue.buffering.max.ms

5000

异步发送消息超时发送时间

batch.num.messages

200

异步消息批量发送的阈值


3,设置合适的kafka分区数

分区数一方面决定了我们写消息的并发度,由此也影响着吞吐量。也决定后端处理线程的并发度。

分区并发度决定因素:数据量,后端业务处理复杂度,磁盘数量。

并不是分区数越多就越好,磁盘竞争也很影响性能的。

4,尽量使数据均匀分布

重要等级高,可以使我们后端处理线程负载均匀。

1),key随机或者轮训分区进行发送

2),自定义分区策略

5,如何保证消息顺序性

将需要保证顺序的消息,采用同步的方式发送发送到同一个分区里。

6,高级优化策略

自己使用kafka client的api实现自己的生产者,减少中间环节,尤其针对生产者跟kafka集群在同一台主机的时候,我们可以只发送数据到当前的主机的分区,减少了流量跨主机传输,节省带宽。

下一篇: Spark Adaptive Execution调研 | 专程
上一篇: 如何写出无法维护的代码 | 专程