【消息队列】聊一下生产者消息发送流程

消息发送流程


1.生产者main线程调用send发送消息,先走拦截器,然后会将消息进行序列化,然后选择对应的分区器,将消息发送到RecordAccumulator中,默认是32m
2.Sender线程会异步读取,要不数据达到batch的大小 进行数据拉取,要么数据达到linger的时间,读取数据之后通过网络进行将数据发送到Kafka集群。如果出现失败在进行重试,直到Broker返回ACK确认信息。
其中设计到的有main线程、Sender线程、拦截器、序列化器、分区器,以及一个双端内存队列。

生产者重要参数

  • bootstrap.servers 生产者连接集群所需的broker地址清单
  • key.serializer和value.serializer 指定发送消息的key和value的序列化类型。一定要写全类名。
  • buffer.memory RecordAccumulator缓冲区总大小,默认32m
  • batch.size 缓冲区一批数据最大值,默认16k
  • linger.ms 如果数据未达到batch.size ,sender等待linger.ms 之后发送数据,默认是0
  • acks 0 不需要数据落盘应答,1leader收到数据应答, -1(ALL)leader+isr队列所有阶段收到数据应答,默认值-1
  • max.in.flight.requests.per.connection 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
  • retries:消息发送出现错误的时候,重新发送,默认是int最大值,如果有重试,还想保证消息顺序性,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
  • retry.backoff.ms 两次重试之间的时间间隔,默认100ms
  • enable.idempotence 是否开启幂等,默认true,开启幂等
  • compression.type 压缩方式,默认值none,支持gzip,snappy等格式,主要通过压缩来提高生产者发送消息的性能,空间换时间。

本文链接:https://my.lmcjl.com/post/4426.html

展开阅读全文

4 评论

留下您的评论.