kafka生产者发送消息流程深入分析讲解

 更新时间:2023年03月30日 09:17:06   作者:william_cr7  
本文将介绍kafka的一条消息的发送流程,从消息的发送到服务端的存储。上文说到kafak分为客户端与服务端,要发送消息就涉及到了网络通讯,kafka采用TCP协议进行客户端与服务端的通讯协议

消息发送过程

消息的发送可能会经过拦截器、序列化、分区器等过程。消息发送的主要涉及两个线程,分别为main线程和sender线程。

如图所示,主线程由 afkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator (也称为消息收集器)中。 Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka中。

拦截器

在消息序列化之前会经过消息拦截器,自定义拦截器需要实现ProducerInterceptor接口,接口主要有两个方案#onSend和#onAcknowledgement,在消息发送之前会调用前者方法,可以在发送之前假如处理逻辑,比如计费。在收到服务端ack响应后会触发后者方法。需要注意的是拦截器中不要加入过多的复杂业务逻辑,以免影响发送效率。

消息分区

消息ProducerRecord会将消息路由到那个分区中,分两种情况:

1.指定了partition字段

如果消息ProducerRecord中指定了 partition字段,那么就不需要走分区器,直接发往指定得partition分区中。

2.没有指定partition,但自定义了分区器

3.没指定parittion,也没有自定义分区器,但key不为空

4.没指定parittion,也没有自定义分区器,key也为空

看源码

// KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//指定分区partition则直接返回,否则走分区器
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(),                 serializedValue, cluster);
}
//DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

partition 方法中定义了分区分配逻辑 如果 ke 不为 null , 那 么默认的分区器会对 key 进行哈 希(采 MurmurHash2 算法 ,具备高运算性能及 低碰 撞率),最终根据得到 哈希值来 算分区号, 有相同 key 的消息会被写入同一个分区 如果 key null ,那么消息将会以轮询的方式发往主题内的各个可用分区。

消息累加器

分区确定好了之后,消息并不是直接发送给broker,因为一个个发送网络消耗太大,而是先缓存到消息累加器RecordAccumulator,RecordAccumulator主要用来缓存消息 Sender 线程可以批量发送,进 减少网络传输 的资源消耗以提升性能 RecordAccumulator 缓存的大 小可以通过生产者客户端参数 buffer memory 配置,默认值为 33554432B ,即 32MB如果生产者发送消息的速度超过发 送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer的send()方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max block ms 的配置,此参数的默认值为 60秒。

消息累加器本质上是个ConcurrentMap,

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

发送流程源码分析

//KafkaProducer
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
	// intercept the record, which can be potentially modified; this method does not throw exceptions
    //首先执行拦截器链
	ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
	return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
	try {
		throwIfProducerClosed();
		// first make sure the metadata for the topic is available
		long nowMs = time.milliseconds();
		ClusterAndWaitTime clusterAndWaitTime;
		try {
			clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
		} catch (KafkaException e) {
			if (metadata.isClosed())
				throw new KafkaException("Producer closed while send in progress", e);
			throw e;
		}
		nowMs += clusterAndWaitTime.waitedOnMetadataMs;
		long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
		Cluster cluster = clusterAndWaitTime.cluster;
		byte[] serializedKey;
		try {
			//key序列化
			serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
		} catch (ClassCastException cce) {
			throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
					" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
					" specified in key.serializer", cce);
		}
		byte[] serializedValue;
		try {
			//value序列化
			serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
		} catch (ClassCastException cce) {
			throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
					" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
					" specified in value.serializer", cce);
		}
		//获取分区partition
		int partition = partition(record, serializedKey, serializedValue, cluster);
		tp = new TopicPartition(record.topic(), partition);
		setReadOnly(record.headers());
		Header[] headers = record.headers().toArray();
		//消息压缩
		int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
				compressionType, serializedKey, serializedValue, headers);
		//判断消息是否超过最大允许大小,消息缓存空间是否已满
		ensureValidRecordSize(serializedSize);
		long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
		if (log.isTraceEnabled()) {
			log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
		}
		// producer callback will make sure to call both 'callback' and interceptor callback
		Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
		if (transactionManager != null && transactionManager.isTransactional()) {
			transactionManager.failIfNotReadyForSend();
		}
		//将消息缓存在消息累加器RecordAccumulator中
		RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
				serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        //开辟新的ProducerBatch
		if (result.abortForNewBatch) {
			int prevPartition = partition;
			partitioner.onNewBatch(record.topic(), cluster, prevPartition);
			partition = partition(record, serializedKey, serializedValue, cluster);
			tp = new TopicPartition(record.topic(), partition);
			if (log.isTraceEnabled()) {
				log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
			}
			// producer callback will make sure to call both 'callback' and interceptor callback
			interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
			result = accumulator.append(tp, timestamp, serializedKey,
				serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
		}
		if (transactionManager != null && transactionManager.isTransactional())
			transactionManager.maybeAddPartitionToTransaction(tp);
		//判断消息是否已满,唤醒sender线程进行发送消息
		if (result.batchIsFull || result.newBatchCreated) {
			log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
			this.sender.wakeup();
		}
		return result.future;
		// handling exceptions and record the errors;
		// for API exceptions return them in the future,
		// for other exceptions throw directly
	} catch (Exception e) {
		// we notify interceptor about all exceptions, since onSend is called before anything else in this method
		this.interceptors.onSendError(record, tp, e);
		throw e;
	}
}

生产消息的可靠性

消息发送到broker,什么情况下生产者才确定消息写入成功了呢?ack是生产者一个重要的参数,它有三个值,ack=1表示leader副本写入成功服务端即可返回给生产者,是吞吐量和消息可靠性的平衡方案;ack=0表示生产者发送消息之后不需要等服务端响应,这种消息丢失风险最大;ack=-1表示生产者需要等等ISR中所有副本写入成功后才能收到响应,这种消息可靠性最高但吞吐量也是最小的。

到此这篇关于kafka生产者发送消息流程深入分析讲解的文章就介绍到这了,更多相关kafka发送消息流程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java的接口解耦方式

    java的接口解耦方式

    这篇文章主要介绍了java的接口解耦方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 基于JWT实现SSO单点登录流程图解

    基于JWT实现SSO单点登录流程图解

    这篇文章主要介绍了基于JWT实现SSO单点登录流程图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Java字符串查找的三种方式

    Java字符串查找的三种方式

    本篇文章给大家整理了关于Java字符串查找的三种方式,并把其中需要留意的地方做了标注,一起参考学习下。
    2018-03-03
  • 深入了解java中的逃逸分析

    深入了解java中的逃逸分析

    这篇文章主要介绍了深入了解java中的逃逸分析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • Java高效映射工具MapStruct的使用示例

    Java高效映射工具MapStruct的使用示例

    MapStruct 是一个 Java 注解处理器,用于在不同 Java Beans 或数据传输对象(DTOs)之间自动生成类型安全的映射代码,这是一个编译时映射框架,意味着它利用注解在编译时生成代码,本文将给大家介绍一下Java注解处理器MapStruct的使用示例,需要的朋友可以参考下
    2023-12-12
  • 深度deepin安装以及jdk、tomcat、Nginx安装教程

    深度deepin安装以及jdk、tomcat、Nginx安装教程

    这篇文章主要给大家介绍了关于深度deepin安装以及jdk、tomcat、Nginx安装的相关资料,按照文中介绍的方法可以轻松的实现安装,对大家的工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2024-01-01
  • Java公平锁与非公平锁的核心原理讲解

    Java公平锁与非公平锁的核心原理讲解

    从公平的角度来说,Java 中的锁总共可分为两类:公平锁和非公平锁。但公平锁和非公平锁有哪些区别?核心原理是什么?本文就来和大家详细聊聊
    2022-11-11
  • Nacos简介最新收藏版

    Nacos简介最新收藏版

    Nacos 是阿里巴巴推出来的一个新开源项目,这是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台,Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台,对Nacos相关基本知识感兴趣的朋友一起看看吧
    2023-08-08
  • JMagick实现基本图像处理的类实例

    JMagick实现基本图像处理的类实例

    这篇文章主要介绍了JMagick实现基本图像处理的类,实例分析了java图像处理的相关技巧,需要的朋友可以参考下
    2015-06-06
  • Springboot为什么加载不上application.yml的配置文件

    Springboot为什么加载不上application.yml的配置文件

    这篇文章主要介绍了Springboot为什么加载不上application.yml的配置文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10

最新评论