线程池调用kafka发送消息产生的内存泄漏问题排查解决
布控预警的需求
在布控预警的需求实现里,我需要把持久化在数据库中的布控对象(身份证、姓名、手机号、imsi、faceId等等都可分别作为布控对象)一直往kafka里发送,然后由flink进行消费,把当前的布控对象和存储的用户轨迹记录(有旅客、航班、车辆卡口、人脸等等数据)同时包含身份证、姓名等等信息进行比对,如果比对成功则触发布控预警
最开始我是通过单线程发送,发现发送速度有点慢,这里面我需要把布控对象做一些处理(比如split,flat,fitler)后才发送到kafka,大概30w+的布控对象需要十多分钟那边才能预警到,于是开始进行性能优化,使用并行流进行处理,改变之后同样30w的数据这次只需要8秒左右就能发送完成,然而跑了一阵之后程序开始出现报错
Received invalid metadata error in produce request on partition xxxTopic due to org.apache.kafka.common.errors.
NetworkException: The server disconnected before a response was received.. Going to request metadata update now
报错分析
第一眼看到这个错,我以为是kafka或者网络出问题了,也去看了broker的日志,发现是有一些关于当前topic被删除的错误,但是日志级别只是一个info,我以为是topic出现错误,又尝试了使用kakfa的消费命令,发现是能够正常消费的,说明这个topic和metadata应该是没有问题的。
我也在网上搜索着,发现很少有这个问题的说明,但是我发现了一些特征,经过我重启后程序又是可以继续发送的,而且速度还是很快,但是跑了一会儿,开始又有这个报错了,开始是一个报错,其他的都成功,后来是慢慢的,报错越来越多,成功的越来越少。接着过了很久竟然还发生了nacos的心跳超时导致服务不可用的情况。
我这时想到了之前看过的一些文章,说频繁fullgc可能会导致心跳请求失败的问题,最开始的时候看了cpu使用率,发现非常高,600%左右,我以为是因为我的程序里面因为是有定时任务去循环发送消息所有有点占用是正常的,没有当回事,后来我用起了arthas和gc命令,先使用dashboard,看了下cpu线程,发现gc线程占用以及那个并行流forkjoinpool的线程占用非常大的cpu利用率,然后full gc次数非常多,又使用
jstat -gcutil $pid 1000
命令进行确认,开始我就发现了old区的占用比非常高,且fullgc的频率非常高,几乎是几秒钟就有一次,最重要的是,最开始是能回收一些内存,随着时间的推移,old区占用比基线一直在增长,最后到了100,在这个过程中我也发现了,出现报错的频率和old区占用比之间是存在关系的,报错越来越多的时候,old区被占用的越多,fullgc越频繁。
内存泄漏
这里我才意识到这是出现内存泄漏的问题了吧,可能是我写的代码有点问题,于是我继续开始排查,通过
jmap dump:format=b,file=xx.hprof $pid
然后导出文件到本地,用java visualvm打开(也可以用mat或者idea的profiler进行解析,我这里用visualvm解析类实例的还报了堆内存不足,要改一下/lib/visualvm/etc/visualvm.conf,把-xms改大点)
查看了一下占用最大的几个对象ObjectiveVO(这个是我推送到kafka的封装对象)、LogUtil.$$(这个是我发送消息的工具类)、java.util.concurrent.LinkedBlockingDeque$NODE,还有char[]、String这几个占用是比较大的,看到这里我逐渐清晰了,在logUtil里面的代码我是从其他需要发送日志的地方拷过来的,在这里我加了一个异步executor的操作,主要就是为了隔离业务和日志的发送或者发生异常的情况,然而,我把这个executor的线程池队列设置为了new LinkedBlockingDeque<>(),这个的队列的默认长度值为Integer.MAX,因为我们的系统属于to G系统,所以操作日志是比较少的,一直没有发现这个问题,而在这个布控发送Kafka的场景中,数据量非常大,8秒钟左右差不多就是30w的数据,为了快速实现布控我甚至在外层又加了一个并行流,然后这里logUtil里面又有一个异步把ObjectiveVO加入到队列,所以才导致的内存溢出了....
后面我把这个异步executor去掉了,就没有发生内存溢出的问题,也就没有发生上面报的NetworkException,或者如果要使用线程池异步的话,一定要设计一下队列的长度,以及拒绝策略!!!
以上就是线程池调用kafka发送消息产生的内存泄漏问题的详细内容,更多关于kafka发送消息内存泄漏的资料请关注脚本之家其它相关文章!
相关文章
Java多线程 Callable、Future 和FutureTask
这篇文章主要介绍Java多线程中的 Callable、Future 以及FutureTask,下面文章围绕Java多线程的相关资料展开全文详细内容,需要的朋友可以参考一下2021-10-10Springboot使用redis实现接口Api限流的示例代码
本文主要介绍了Springboot使用redis实现接口Api限流的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2022-07-07Java List的sort()方法改写compare()实现升序,降序,倒序的案例
这篇文章主要介绍了Java List的sort()方法改写compare()实现升序,降序,倒序的案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2021-03-03
最新评论