消息发送流程
1.生产者main线程调用send发送消息,先走拦截器,然后会将消息进行序列化,然后选择对应的分区器,将消息发送到RecordAccumulator中,默认是32m
2.Sender线程会异步读取,要不数据达到batch的大小 进行数据拉取,要么数据达到linger的时间,读取数据之后通过网络进行将数据发送到Kafka集群。如果出现失败在进行重试,直到Broker返回ACK确认信息。
其中设计到的有main线程、Sender线程、拦截器、序列化器、分区器,以及一个双端内存队列。
生产者重要参数
- bootstrap.servers 生产者连接集群所需的broker地址清单
- key.serializer和value.serializer 指定发送消息的key和value的序列化类型。一定要写全类名。
- buffer.memory RecordAccumulator缓冲区总大小,默认32m
- batch.size 缓冲区一批数据最大值,默认16k
- linger.ms 如果数据未达到batch.size ,sender等待linger.ms 之后发送数据,默认是0
- acks 0 不需要数据落盘应答,1leader收到数据应答, -1(ALL)leader+isr队列所有阶段收到数据应答,默认值-1
- max.in.flight.requests.per.connection 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
- retries:消息发送出现错误的时候,重新发送,默认是int最大值,如果有重试,还想保证消息顺序性,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
- retry.backoff.ms 两次重试之间的时间间隔,默认100ms
- enable.idempotence 是否开启幂等,默认true,开启幂等
- compression.type 压缩方式,默认值none,支持gzip,snappy等格式,主要通过压缩来提高生产者发送消息的性能,空间换时间。
本文链接:https://my.lmcjl.com/post/4426.html
展开阅读全文
4 评论