Kafka中的producer拦截器与consumer拦截器详解

 更新时间:2023年12月11日 09:49:18   作者:warybee  
这篇文章主要介绍了Kafka中的producer拦截器与consumer拦截器详解,Producer 的Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等,需要的朋友可以参考下

1. producer 拦截器(interceptor)

1.1 介绍

Producer 的Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。Producer允许指定多个Interceptor按照指定顺序作用于一条消 息从而形成一个拦截链(interceptor chain)。

自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定义如下:

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable {
    
    /**
      该方法封装进KafkaProducer.send()方法中,方法会在消息发送之前被调用,用户可以在该方法中对消息做任
       何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
    */
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     该方法会在消息成功提交或发送失败之后被调用,
     KafkaProducer.send()异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。
    */
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    /**
     关闭Interceptor,主要用于执行一些资源清理工作。
    */
    void close();
}
  • onSend() : 方法封装进KafkaProducer.send()方法中,方法会在消息发送之前被调用,用户可以在该方法中对消息做任
  • 何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算;
  • onAcknowledgement(): 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法。KafkaProducer.send()异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。
  • close(): 关闭Interceptor,主要用于执行一些资源清理工作。

如果指定了多个Interceptor,则Producer将按照指定顺序调用它们,如果interceptor出现异常Producer仅仅是捕获每个 Interceptor抛出的异常记录到错误日志中而非在向上传递。

1.2 案例

实现两个拦截器(interceptor),组成拦截链。第一个拦截器在消息发送前,给消息添加header。第二个拦截器统计消息的发送成功数和失败数。

PS:其实这两个功能可以放在一个interceptor中,这里仅仅是为了演示多个interceptor。

定义拦截器

拦截器1:

public class MyInterceptor implements ProducerInterceptor<Integer,String> {
    /**
     * 消息发送之前调用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        //消息的主题
        String topic = record.topic();
        Integer partition = record.partition();
        Integer key = record.key();
        String value = record.value();
        Headers headers = record.headers();
        //给header添加时间戳
        String stamp = System.currentTimeMillis()+"";
        headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8));
        ProducerRecord<Integer,String> resultRecord=
                  new ProducerRecord<>(topic,partition,key,value,headers);
        return resultRecord;
    }
    /**
     * 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

拦截器2:

public class MyInterceptor02 implements ProducerInterceptor<Integer,String> {
    private int errorNum=0;
    private int successNum=0;
    /**
     * 消息发送之前调用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        return record;
    }
    /**
     * 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
         if (exception==null){
             successNum++;
         }else {
            errorNum++;
         }
    }
    @Override
    public void close() {
        System.out.println("消息发送成功数: " + successNum);
        System.out.println("消息发送失败数: " + errorNum);
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

生产者

configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"拦截器类全路径");,多个拦截器使用,分割

public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置value的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        //添加拦截器
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02");
        KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
        //发送100条消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
                    (       "test_topic_1",
                            0,
                            i,
                            "test topic msg "+i);
            //消息的异步确认
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception==null){
                        System.out.println("消息的主题:"+recordMetadata.topic());
                        System.out.println("消息的分区:"+recordMetadata.partition());
                        System.out.println("消息的偏移量:"+recordMetadata.offset());
                    }else {
                        System.out.println("发送消息异常");
                    }
                }
            });
        }
        // 关闭生产者
        kafkaProducer.close();
    }
}

2 Consumer拦截器(interceptor)

2.1.介绍

消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。

  • ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
  • 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
  • ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
  • 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
  • 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。

自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定义如下:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    /**
        该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
        该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。
     */
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    /**
      当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
      调用者将忽略此方法抛出的任何异常。
     */
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    /**
     *  关闭Interceptor之前调用
     */
    void close();
}

方法说明:

  • onConsume 该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
  • onCommit 当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。

2.2 案例

定义拦截器

public class MyConsumerInterceptor implements ConsumerInterceptor<Integer,String> {
    @Override
    public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) {
        //在这里可以对接收到的消息进行修改
        //如不做处理,直接返回即可
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp,offsetAndMetadata) -> {
            System.out.println(tp+" : "+offsetAndMetadata.offset());
        });
    }
    @Override
    public void close() {
    }
    /**
     * 用于获取消费者的设置参数
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {
        configs.forEach((k, v) -> {
            System.out.println(k + "\t" + v);
        });
    }
}

消费者

在消费者客户端配置中增加如下配置

如果有多个拦截器,用,分割即可

configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
        //创建消费者对象
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_topic_1");
        //消费者订阅主题
        consumer.subscribe(topics);
        while (true){
            //批量拉取主题消息,每3秒拉取一次
            ConsumerRecords<Integer, String> records = consumer.poll(3000);
            //变量消息
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value());
            }
        }
    }
}

到此这篇关于Kafka中的producer拦截器与consumer拦截器详解的文章就介绍到这了,更多相关producer拦截器与consumer拦截器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java OOM内存泄漏原因及解决方法

    java OOM内存泄漏原因及解决方法

    这篇文章主要介绍了java OOM内存泄漏原因及解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-08-08
  • Java在Excel中创建多级分组、折叠或展开分组的实现

    Java在Excel中创建多级分组、折叠或展开分组的实现

    这篇文章主要介绍了Java在Excel中创建多级分组、折叠或展开分组的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-05-05
  • Java基础之多线程的三种实现方式

    Java基础之多线程的三种实现方式

    这篇文章主要介绍了Java基础之多线程的三种实现方式,文中有非常详细的代码示例,对正在学习java基础的小伙伴们有非常好的帮助,需要的朋友可以参考下
    2021-04-04
  • Java中String类常用方法详细汇总

    Java中String类常用方法详细汇总

    Java中String类在所有项目开发里面一定会用到,因此String类提供了一系列的功能操作方法,下面这篇文章主要给大家介绍了关于Java中String类常用方法的相关资料,需要的朋友可以参考下
    2023-05-05
  • Java编写程序之输入一个数字实现该数字阶乘的计算

    Java编写程序之输入一个数字实现该数字阶乘的计算

    这篇文章主要介绍了Java编写程序之输入一个数字实现该数字阶乘的计算,本文通过实例代码给大家介绍的非常想详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • springboot bean扫描路径的实现

    springboot bean扫描路径的实现

    这篇文章主要介绍了springboot bean扫描路径的实现,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • Spring security自定义用户认证流程详解

    Spring security自定义用户认证流程详解

    这篇文章主要介绍了Spring security自定义用户认证流程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • 一文深入了解Java中的AtomicInteger类

    一文深入了解Java中的AtomicInteger类

    AtomicInteger是java并发包下面提供的原子类,主要操作的是int类型的整型,通过调用底层Unsafe的CAS等方法实现原子操作,这篇文章主要给大家介绍了关于如何通过一文深入了解Java中AtomicInteger类的相关资料,需要的朋友可以参考下
    2024-02-02
  • 使用代码生成器自定义Entity的部分注解

    使用代码生成器自定义Entity的部分注解

    这篇文章主要介绍了使用代码生成器自定义Entity的部分注解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-05-05
  • java微信支付接入流程详解

    java微信支付接入流程详解

    这篇文章主要为大家详细介绍了java微信支付接入流程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-05-05

最新评论