Flask SocketIO实现动态绘图的示例详解
Flask-SocketIO 是基于 Flask 的一个扩展,用于简化在 Flask 应用中集成 WebSocket 功能。WebSocket 是一种在客户端和服务器之间实现实时双向通信的协议,常用于实现实时性要求较高的应用,如聊天应用、实时通知等,使得开发者可以更轻松地构建实时性要求较高的应用。通过定义事件处理函数,可以实现双向实时通信,为应用提供更加丰富和实时的用户体验。
前端参数拼接
Flask 提供了针对WebSocket的支持插件flask_socketio
直接通过pip命令安装即可导入使用,同时前端也需要引入SocketIO.js
库文件。
如下代码通过ECharts图表库和WebSocket技术实现了一个实时监控主机CPU负载的动态折线图。通过WebSocket连接到Flask应用中的Socket.IO命名空间,前端通过实时接收后端传来的CPU负载数据,动态更新折线图,展示1分钟、5分钟和15分钟的CPU负载趋势。同时,通过控制台打印实时数据,实现了方便的调试和监控功能。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <script type="text/javascript" src="https://www.lyshark.com/javascript/jquery/3.5.1/jquery.min.js"></script> <script type="text/javascript" src="https://www.lyshark.com/javascript/socket.io/socket.io.min.js"></script> <script type="text/javascript" src="https://www.lyshark.com/javascript/echarts/5.3.0/echarts.min.js"></script> </head> <body> <div id="Linechart" style="height:500px;width:1200px;border:1px solid #673ab7;padding:10px;"></div> <!-- 执行绘图函数--> <script type="text/javascript" charset="UTF-8"> var display = function(time,x,y,z) { var echo = echarts.init(document.getElementById("Linechart")); var option = { title: { left: 'left', text: 'CPU 利用表动态监控', }, // 调节大小 grid: { left: '3%', right: '4%', bottom: '3%', containLabel: true }, // tooltip 鼠标放上去之后会自动出现坐标 tooltip: { trigger: 'axis', axisPointer: { type: 'cross', label: { backgroundColor: '#6a7985' } } }, legend: { data: ['1分钟负载', '5分钟负载', '15分钟负载'] }, xAxis: { type: 'category', // data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] data: time }, yAxis: { type: 'value' }, series: [ { name: "1分钟负载", stack: "总量", //data: [10, 25, 99, 87, 54, 66, 2], data: x, type: 'line', areaStyle: {} }, { name: "5分钟负载", stack: "总量", //data: [89, 57, 85, 44, 25, 4, 54], data: y, type: 'line', areaStyle: {} }, { name: "15分钟负载", stack: "总量", //data: [1, 43, 2, 12, 5, 4, 7], data: z, type: 'line', areaStyle: {} } ] }; echo.setOption(option,true); } </script> <!-- 负责对参数的解析,填充数据 --> <script type="text/javascript" charset="UTF-8"> var time =["","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","",""]; var cpu_load1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; var cpu_load5 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; var cpu_load15 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; var update_function = function(recv) { time.push(recv.datetime); cpu_load1.push(parseFloat(recv.load1)); cpu_load5.push(parseFloat(recv.load5)); cpu_load15.push(parseFloat(recv.load15)); if(time.length >=10) { time.shift(); cpu_load1.shift(); cpu_load5.shift(); cpu_load15.shift(); console.log("时间数组: " + time); console.log("1分钟: " + cpu_load1); console.log("5分钟: " + cpu_load5); console.log("15分钟: " + cpu_load15); // 调用绘图函数 display(time,cpu_load1,cpu_load5,cpu_load15); } }; </script> <!-- 负责接收目标主机的CPU负载情况 --> <script type="text/javascript" charset="UTF-8"> $(document).ready(function() { namespace = '/Socket'; var socket = io.connect("http://" + document.domain + ":" + location.port + namespace); socket.emit("message",{"data":"hello lyshark"}); // 初始化完成后,发送一条消息. socket.on('response', function(recv) { console.log("时间: " + recv.datetime); console.log("1分钟: " + recv.load1); console.log("5分钟: " + recv.load5); console.log("15分钟: " + recv.load15); // 调用函数完成数据填充 update_function(recv); }); }); </script> </body> </html>
后台代码使用Flask和Flask-SocketIO搭建了一个实时监控主机CPU负载的WebSocket应用,并将数据通过socketio.emit
函数将数据推送给前端展示。
关键点概括如下:
Flask和SocketIO集成:
使用Flask框架创建了一个Web应用,并通过Flask-SocketIO集成了WebSocket功能,实现了实时双向通信。
消息接收与实时推送:
定义了socket
事件处理函数,用于接收前端通过WebSocket发送的消息。在无限循环中,通过socketio.sleep
方法设置每2秒推送一次实时的CPU负载数据给前端。
前端连接和断开事件:
定义了connect
和disconnect
事件处理函数,分别在WebSocket连接建立和断开时触发。在控制台打印相应信息,用于监控连接状态。
实时数据推送:
使用socketio.emit
方法实时将CPU负载数据推送给前端,以更新折线图。推送的数据包括当前时间、1分钟负载、5分钟负载和15分钟负载。
前端页面渲染:
通过Flask的render_template
方法渲染了一个HTML页面,用于展示实时更新的CPU负载折线图。
调试信息输出:
在每个事件处理函数中使用print
语句输出调试信息,方便监测WebSocket连接和消息的传递过程。
总体来说,该应用实现了一个简单而实用的实时监控系统,通过WebSocket技术实时推送主机CPU负载数据至前端,为用户提供了实时可视化的监控体验。
from flask import Flask,render_template,request from flask_socketio import SocketIO import time,psutil async_mode = None app = Flask(__name__) app.config['SECRET_KEY'] = "lyshark" socketio = SocketIO(app) @app.route("/") def index(): return render_template("index.html") # 出现消息后,率先执行此处 @socketio.on("message",namespace="/Socket") def socket(message): print("接收到消息:",message['data']) while True: socketio.sleep(2) data = time.strftime("%M:%S",time.localtime()) cpu = psutil.cpu_percent(interval=None,percpu=True) socketio.emit("response", {"datetime": data, "load1": cpu[0], "load5": cpu[1], "load15": cpu[2]}, namespace="/Socket") # 当websocket连接成功时,自动触发connect默认方法 @socketio.on("connect",namespace="/Socket") def connect(): print("链接建立成功..") # 当websocket连接失败时,自动触发disconnect默认方法 @socketio.on("disconnect",namespace="/Socket") def disconnect(): print("链接建立失败..") if __name__ == '__main__': socketio.run(app,debug=True)
运行后,即可输出当前系统下CPU的负载情况,如下图所示;
后端参数拼接
如上所示的代码是在前端进行的数据拼接,如果我们想要在后端进行数据的拼接,则需要对代码进行一定的改进。
前端编写以下代码,通过WebSocket建立通信隧道,而后台则每隔2秒向前台推送传递字典数据。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <script type="text/javascript" src="https://www.lyshark.com/javascript/jquery/3.5.1/jquery.min.js"></script> <script type="text/javascript" src="https://www.lyshark.com/javascript/socket.io/socket.io.min.js"></script> <script type="text/javascript" src="https://www.lyshark.com/javascript/echarts/5.3.0/echarts.min.js"></script> </head> <body> <div id="Linechart" style="height:500px;width:1200px;border:1px solid #673ab7;padding:10px;"></div> <!-- 执行绘图函数--> <script type="text/javascript" charset="UTF-8"> var display = function(time,x,y,z) { var echo = echarts.init(document.getElementById("Linechart")); var option = { title: { left: 'left', text: 'CPU 利用表动态监控', }, // 调节大小 grid: { left: '3%', right: '4%', bottom: '3%', containLabel: true }, // tooltip 鼠标放上去之后会自动出现坐标 tooltip: { trigger: 'axis', axisPointer: { type: 'cross', label: { backgroundColor: '#6a7985' } } }, legend: { data: ['1分钟负载', '5分钟负载', '15分钟负载'] }, xAxis: { type: 'category', // data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] data: time }, yAxis: { type: 'value' }, series: [ { name: "1分钟负载", stack: "总量", //data: [10, 25, 99, 87, 54, 66, 2], data: x, type: 'line', areaStyle: {} }, { name: "5分钟负载", stack: "总量", //data: [89, 57, 85, 44, 25, 4, 54], data: y, type: 'line', areaStyle: {} }, { name: "15分钟负载", stack: "总量", //data: [1, 43, 2, 12, 5, 4, 7], data: z, type: 'line', areaStyle: {} } ] }; echo.setOption(option,true); } </script> <!-- 负责接收目标主机的CPU负载情况 --> <script type="text/javascript" charset="UTF-8"> $(document).ready(function() { namespace = '/Socket'; var socket = io.connect("http://" + document.domain + ":" + location.port + namespace); socket.emit("message",{"data":"hello lyshark"}); // 初始化完成后,发送一条消息. socket.on('response', function(recv) { console.log("时间: " + recv.datetime); console.log("1分钟: " + recv.load1); console.log("5分钟: " + recv.load5); console.log("15分钟: " + recv.load15); // 调用绘图函数 display(recv.datetime,recv.load1,recv.load5,recv.load15); }); }); </script> </body> </html>
后台代码则是收集数据,并将数据通过socketio.emit
函数,推送给前端。
from flask import Flask,render_template,request from flask_socketio import SocketIO import time,psutil async_mode = None app = Flask(__name__) app.config['SECRET_KEY'] = "lyshark" socketio = SocketIO(app) # 填充数据表 local_time = ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""] cpu_load1 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] cpu_load5 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] cpu_load15 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] # 左移填充 def shift(Array, Size, Push): if len(Array) <= Size and len(Array) >= 0: Array.pop(0) Array.append(Push) return True return False # 右移填充 def unshift(Array, Size, Push): if len(Array) <= Size and len(Array) >= 0: Array.pop(Size-1) Array.insert(0,Push) @app.route("/") def index(): return render_template("index.html") # 出现消息后,率先执行此处 @socketio.on("message",namespace="/Socket") def socket(message): print("接收到消息:",message['data']) while True: socketio.sleep(1) data = time.strftime("%M:%S",time.localtime()) cpu = psutil.cpu_percent(interval=None,percpu=True) # 实现数组最大35,每次左移覆盖第一个 shift(local_time,35,data) shift(cpu_load1,35,cpu[0]) shift(cpu_load5, 35, cpu[1]) shift(cpu_load15, 35, cpu[2]) socketio.emit("response", {"datetime": local_time, "load1": cpu_load1, "load5": cpu_load5, "load15": cpu_load15}, namespace="/Socket") # 当websocket连接成功时,自动触发connect默认方法 @socketio.on("connect",namespace="/Socket") def connect(): print("链接建立成功..") # 当websocket连接失败时,自动触发disconnect默认方法 @socketio.on("disconnect",namespace="/Socket") def disconnect(): print("链接建立失败..") if __name__ == '__main__': socketio.run(app,debug=True)
运行动态图如下所示;
以上就是Flask SocketIO实现动态绘图的示例详解的详细内容,更多关于Flask SocketIO动态绘图的资料请关注脚本之家其它相关文章!
相关文章
Appium+python自动化之连接模拟器并启动淘宝APP(超详解)
这篇文章主要介绍了Appium+python自动化之 连接模拟器并启动淘宝APP(超详解)本文以淘宝app为例,通过实例代码给大家介绍的非常详细,需要的朋友可以参考下2019-06-06
最新评论