6.ProducerConfig详解(上)

ParkJun 1年前 ⋅ 428 阅读

本文章中针对的是kafka-clients 1.1.0版本。 ProducerConfig 类在包org.apache.kafka.clients.producer中。

ProducerConfig各配置项

bootstrap.servers

重要性:高

类型:List

默认值:Collections.emptyList()

引导producer查找Kafka集群所有broker的引导服务地址列表。

顾名思义,该配置项是引导服务列表,即用于查找Kafka集群中所有broker的host:port列表,producer通过这些host:port与kafka集群建立连接。producer用该列表中的地址只是用于发现kafka集群中所有的服务broker,而在kafka集群中,broker可能是动态改变的。另外,Kafka机制中,可以通过某一个broker而查询到所有其他broker,所以在bootstrap.servers中,并不需要配置所有broker的host:port,理想情况下,只需要配置其中的某一个就可以了。但为了提升可用性,避免因该broker挂掉而导致无法查找,那么可以选择配置多个。

配置格式为:

host1:port1,host2:port2,...

metadata.max.age.ms

重要性:高

类型:Long

默认值:300000毫秒,即5分钟

元数据最大生存时间,每隔metadata.max.age.ms时间,producer客户端会强制刷新一遍元数据metadata,即使没有任何partition leadership主动发现新的broker或者新的partition。

元数据

元数据类org.apache.kafka.clients#Metadata中,除了记录一些和自身更新策略有关的信息(metadata的更新策略值得另开一篇文章分析)。还保存了kafka集群的一些信息,参见org.apache.kafka.common#Cluster类:

集群中所有结点broker node列表,Node结点中记录了结点的ip、port以及机架信息(rack)。

机架信息(rack):broker的机架信息,类似于Hadoop那样,可以更好地利用局部性原 

理减少集群中网络开销。如果指定了机架信息(brooker.rack), Kafka在为分区做副 

本分配时就会考虑这部分信息,尽可能地为副本挑选不同机架的broker。

集群中每一个TopicPartition,对应的分区信息PartitionInfo。org.apache.kafka.common#PartitionInfo中主要记录了如下信息:

分区所属的topic。

分区partition编号。

分区的leader所在结点。

分区副本结点列表。

分区副本同步结点队列(ISR)。

离线副本结点队列。

集群中的控制结点信息。

控制结点broker:负责管理整个集群中分区和副本的状态,比如partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工作 集群中每个topic对应的所有分区列表,相当于以topic作为索引。 集群中每个topic对应的可用分区列表。 集群中每个结点broker node对应的所有分区列表,相当于以broker.id作为索引。 集群中每个结点ID(broker.id)对应的结点信息。

batch.size

重要性:高

类型:Long

默认值:16384字节,即16K

消息记录batch(批)大小控制。kafka producer在将消息记录record发送到集群时,会尝试将一批要发送到相同partition的消息记录压缩在一起,称之为batch(批)。每次request,其实不是发送一个record,而是发送若干个batch,而每个batch里面可能包含多个record。这样成批成批的发送,减少了网络请求,有助于提升producer客户端和kafka集群服务的性能。

batch.size就是用来设置一个batch的最大字节数byte。当设置为0时,表示完全禁用batch的功能。如果batch.size设置过大,那么可能造成内存浪费,因为每个发送到不同partition的record都需要预先分配一块batch.size大小的内存。

acks

重要性:高

类型:String

默认值:"1"

应答数设置。producer只有接收到来自server的acks指定数量的应答,才会认为发送给server的消息记录已送达。该配置项用于控制已发送消息记录的持久性,有以下几种设置值:

acks = 0:表示producer无需等待server端的应答消息,producer将record扔到发送缓冲区,就认为该record已经发送,然后转身走人。这种情况无法保证server端真的成功接收到该消息记录,且此时即使retries配置项也无法生效,因为producer无法知道是否失败。另外,每个record返回的offset都被设为-1。

acks = 1:表示接收该消息记录的分区leader将消息记录写入到本地log文件,就返回Acknowledgement,告知producer本次发送已完成,而无需等待其他follower分区的确认。这种情况下,可能出现消息记录没有备份的情况(follower宕机等)。

acks = all:表示消息记录只有得到分区leader以及其他分区副本同步结点队列(ISR)中的分区follower的确认之后,才能回复acknowlegement,告知producer本次发送已完成。这种情况下,只要分区副本同步结点队列(ISR)中某一个follower存活,那么消息记录就不会被丢失。这种方式最安全,但效率也最低。

acks = -1:等同于acks = all。

linger.ms

重要性:中

类型:Long

默认值:0毫秒,表示无延时,立即发送。

延迟发送消息记录的时间,上面及前面文章中也已经提到过,producer在发送消息记录record的时候,会将发送到同一个partition的records压缩在batch中。但通常这只发生在records到达速度快于records发送速度的情况下,很容易理解:如果发送速度大于record到达速度,则每来一个record都会被立即发送出去,根本不存在将多个records压缩为一个的可能。

但很多时候,即便是发送速度大于到达速度,我们也不希望每个record就发送一次,还是希望分批次发送,以减少发送次数,提升producer客户端和服务器端的性能。为此,我们需要人为地加一个发送延迟控制,即每次发送之间,存在一定的时间间隔linger.ms,在这段时间内,可能有多个records到达,此时就可以对他们分组压缩,成批次发送。这类似于TCP的拥塞控制方法。

注意:

linger.ms设置了发送延迟的最高时间上限,另一个配置项batch.size也同时控制着发送的时机。如果为某个partition压缩的batch字节数已经达到了batch.size设置的字节数,那么该batch将被立即发送到指定的partition,即使此时延迟时间还没达到linger.ms的设置。 同样的,如果延迟的时间已经达到了linger.ms的设置,那么即使压缩累积的batch没有达到batch.size设置的字节数,也会被发送到指定的partition。 linger.ms是针对每一个发送到partition的request。即不同partition的request并不是同时发送的。 延迟以为这性能降低,需要在延迟和性能之间进行平衡,找到一个合适的linger.ms值。

client.id

重要程度:中

类型:String

默认值:""

producer 客户端ID,在创建request时,会传送到kafka服务。其目的是为了跟踪记录请求的来源,虽然服务端可以通过ip/port来追踪请求的来源,但ip/port无法表达业务语义,所以,可以通过client.id来设置一个富有业务逻辑语义的名字(如PDK游戏),有助于后续的分析和记录。

send.buffer.bytes

重要程度:中

类型:int

默认值:131072字节,即128K。

TCP发送缓冲区(SO_SNDBUF)的大小,若send.buffer.bytes设为-1,则使用操作系统的默认值。

receive.buffer.bytes

重要程度:中

类型:int

默认值:32768字节,即32K。

TCP接收缓冲区(SO_RCVBUF)大小,当receive.buffer.bytes设置为-1,则使用操作系统默认的大小。

max.request.size

重要程度:中

类型:String

默认值:1048576字节,即1M。

一个请求request中最大字节数,用于控制producer发送的单个请求request中,record batches的最大数量,以避免单个请求数据过于巨大。

max.request.size & batch.size

一个请求request中,可能包含多个record batch。

max.request.size可能影响record batch的大小上限,即当batch.size 大于 max.request.size时,batch的上限就变成了 max.request.size设置的大小。

reconnect.backoff.ms

重要性:低 类型:Long 默认值:50毫秒。

重连间隔时间,避免producer客户端过于紧密循环地重连kafka服务broker。该值针对的是所有client到broker的连接。

reconnect.backoff.max.ms

重要性:低

类型:Long

默认值:1000毫秒

producer客户端连接一个kafka服务(broker)失败重连的总时间,每次连接失败,重连时间都会指数级增加,每次增加的时间会存在20%的随机抖动,以避免连接风暴。

连接风暴 应用启动的时候,经常可能发生各应用服务器的连接数异常飙升的情况。假设连接数的设置为:min值3,max值10,正常的业务使用连接数在5个左右,当重启应用时,各应用连接数可能会飙升到10个,瞬间甚至还有可能部分应用会报取不到连接。启动完成后接下来的时间内,连接开始慢慢返回到业务的正常值。这就是所谓的连接风暴。

max.block.ms

重要性:低 类型:Long 默认值:1000毫秒

该配置值控制着KafkaProducer.send()函数以及KafkaProducer.partitionsFor()函数将阻塞的最大时间。另外当发送缓冲区满或者metadata不可用时,这两个方法也会被阻塞。如果阻塞发生在用户提供的自定义序列化类serializers或者是自定义的分区类partitioner,那么这些阻塞的时间不会被计算在该配置值之类。

ProducerConfig下半部分

本文归作者所有,未经作者允许,不得转载