Java Flink与kafka实现实时告警功能过程

 更新时间:2023年01月18日 09:45:29   作者:钦拆大仁  
这篇文章主要介绍了Java Flink与kafka实现实时告警功能,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

引出问题

项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据。这种方式存在告警的延时实在是太高了。数据从产生到保存,从保存到判断都会存在时间差,按照保存数据定时5分钟一次,定时任务5分钟一次。最高会产生10分钟的误差,这种告警就没什么意义了。

demo设计

为了简单的还原业务场景,做了简单的demo假设

实现一个对于学生成绩评价的实时处理程序

数学成绩,基准范围是90-140,超出告警

物理成绩,基准范围是60-95,超出告警

环境搭建

使用windows环境演示

准备工作

1、安装jdk

2、安装zookeeper

解压压缩包

zoo_sample.cfg将它重命名为zoo.cfg

修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置环境变量

3、安装kafka

解压压缩包

修改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代码

pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

主程序

public class StreamAlertDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);
		DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);
		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}
}
主程序,配置告警规则后期可以使用推送或者拉去方式获取数据
public class RuleMap {
	private RuleMap(){}
	public final static Map<String,List<AlertRule>> initialRuleMap;
	private static List<AlertRule> ruleList = new ArrayList<>();
	private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));
	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}
}

AlertFlatMapper,处理告警逻辑

public class AlertFlatMapper implements FlatMapFunction<String, String> {
	@Override
	public void flatMap(String inVal, Collector<String> out) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
		List<AlertInfo> resList = new ArrayList<>();
		List<AlertRule> mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}
	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal < targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal > targetVal;
			default:
				return false;
		}
	}
}

三个实体类

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private Integer mathVal;
    private Integer physicsVal;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private String descInfo;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
	private static final long serialVersionUID = -1L;
	private String target;
	//0小于 1等于 2大于
	private Integer type;
	private Integer criticalVal;
	private String descInfo;
}

项目演示

创建kafka生产者 test

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

创建kafka消费者 demo

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

启动flink应用

给topic test发送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

消费topic demo

告警系统架构

到此这篇关于Java Flink与kafka实现实时告警功能过程的文章就介绍到这了,更多相关Java Flink与kafka实时告警内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring中的循环依赖问题

    Spring中的循环依赖问题

    在Spring框架中,循环依赖是指两个或多个Bean相互依赖,这导致在Bean的创建过程中出现依赖死锁,为了解决这一问题,Spring引入了三级缓存机制,包括singletonObjects、earlySingletonObjects和singletonFactories
    2024-09-09
  • Spring Boot教程之利用ActiveMQ实现延迟消息

    Spring Boot教程之利用ActiveMQ实现延迟消息

    这篇文章主要给大家介绍了关于Spring Boot教程之利用ActiveMQ实现延迟消息的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-11-11
  • SpringBoot整个启动过程的分析

    SpringBoot整个启动过程的分析

    今天小编就为大家分享一篇关于SpringBoot整个启动过程的分析,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • Java数据结构之LinkedList的用法详解

    Java数据结构之LinkedList的用法详解

    链表(Linked list)是一种常见的基础数据结构,是一种线性表。Java的LinkedList(链表) 类似于 ArrayList,是一种常用的数据容器,本文就来简单讲讲它的使用吧
    2023-05-05
  • Springboot HTTP如何调用其他服务

    Springboot HTTP如何调用其他服务

    这篇文章主要介绍了Springboot HTTP如何调用其他服务,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • 使用Java的Lucene搜索工具对检索结果进行分组和分页

    使用Java的Lucene搜索工具对检索结果进行分组和分页

    这篇文章主要介绍了使用Java的搜索工具Lucene对检索结果进行分组和分页的方法,Luence是Java环境中的一个全文检索引擎工具包,需要的朋友可以参考下
    2016-03-03
  • Spring自定义注解配置简单日志示例

    Spring自定义注解配置简单日志示例

    这篇文章主要介绍了Spring自定义注解配置简单日志示例,注解可以增强我们的java代码,同时利用反射技术可以扩充实现很多功能,它们被广泛应用于三大框架底层,需要的朋友可以参考下
    2023-05-05
  • SpringBoot定制JSON响应数据返回的示例代码

    SpringBoot定制JSON响应数据返回的示例代码

    @JsonView 是 Jackson 库中的一个注解,它允许你定义哪些属性应该被序列化到 JSON 中,基于不同的“视图”或“配置”,在本文中,通过了解@JsonView,你将能够更好地掌握如何在Spring Boot应用中定制JSON数据的输出,需要的朋友可以参考下
    2024-05-05
  • 解决Spring MVC中文乱码的编码配置

    解决Spring MVC中文乱码的编码配置

    这篇文章主要为大家介绍了解决SpringMVC中文乱码的编码配置示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • Java OOP三大特征之封装继承与多态详解

    Java OOP三大特征之封装继承与多态详解

    本文主要讲述的是面向对象的三大特性:封装,继承,多态,内容含括从封装到继承再到多态的所有重点内容以及使用细节和注意事项,内容有点长,请大家耐心看完
    2022-07-07

最新评论