Python3实时操作处理日志文件的实现
最近,需要对日志文件进行实时数据处理。
一、简单的实时文件处理(单一文件)
假设我们要实时读取的日志的路径为: /data/mongodb/shard1/log/pg.csv
那么我们可以在python文件中使用shell脚本命令tail -F 进行实时读取并操作
代码如下:
import re import codecs import subprocess def pg_data_to_elk(): p = subprocess.Popen('tail -F /data/mongodb/shard1/log/pg.csv', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) #起一个进程,执行shell命令 while True: line = p.stdout.readline() #实时获取行 if line: #如果行存在的话 xxxxxxxxxxxx your operation
简单解释一下subprocess模块:
subprocess允许你生成新的进程,连接到它们的 input/output/error 管道,并获取它们的返回(状态)码。
subprocess.Popen介绍
该类用于在一个新的进程中执行一个子程序。
subprocess.Popen的构造函数
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())
参数说明:
- args: 要执行的shell命令,可以是字符串,也可以是命令各个参数组成的序列。当该参数的值是一个字符串时,该命令的解释过程是与平台相关的,因此通常建议将args参数作为一个序列传递。
- stdin, stdout, stderr: 分别表示程序标准输入、输出、错误句柄。
- shell: 该参数用于标识是否使用shell作为要执行的程序,如果shell值为True,则建议将args参数作为一个字符串传递而不要作为一个序列传递。
二、复杂的实时文件处理(不断产生新文件)
如果日志会在满足一定条件下产生新的日志文件,比如log1.csv已经到了20M,那么则会写入log2.csv,这样一天下来大概有1000多个文件,且不断产生新的,那么如何进行实时获取呢?
思路如下:
在实时监听(tail -F)中加入当前文件的大小判定,如果当前文件大小大于20M,那么跳出实时监听,获取新的日志文件。(如果有其他判定条件也是这个思路,只不过把当前文件大小的判定换成你所需要的判定)
代码如下:
import re import os import time import codecs import subprocess from datetime import datetime path = '/home/liao/python/csv' time_now_day = datetime.now.strftime('%Y-%m-%d') def get_file_size(new_file): fsize = os.path.getsize(new_file) fsize = fsize/float(1024*1024) return fsize def get_the_new_file(): files = os.listdir(path) files_list = list(filter(lambda x:x[-4:]=='.csv' and x[11:21]==time_now_day, files)) files_list.sort(key=lambda fn:os.path.getmtime(path + '/' + fn) if not os.path.isdir(path + '/' + fn) else 0) new_file = os.path.join(path, files_list[-1]) return new_file def pg_data_to_elk(): while True: new_file = get_the_new_file() p = subprocess.Popen('tail -F {0}'.format(new_file), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) #起一个进程,执行shell命令 while True: line = p.stdout.readline() #实时获取行 if line: #如果行存在的话 if get_file_size(new_file) > 20: #如果大于20M,则跳出循环 break xxxxxxxxxxxx your operation time.sleep(3)
到此这篇关于Python3实时操作处理日志文件的实现的文章就介绍到这了,更多相关Python3实时操作日志文件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
使用python3调用wxpy模块监控linux日志并定时发送消息给群组或好友
这篇文章主要介绍了使用python3调用wxpy模块,监控linux日志并定时发送消息给群组或好友,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下2019-06-06Python的Tornado框架实现异步非阻塞访问数据库的示例
Tornado框架的异步非阻塞特性是其最大的亮点,这里我们将立足于基础来介绍一种简单的Python的Tornado框架实现异步非阻塞访问数据库的示例:2016-06-06pandas 实现 in 和 not in 的用法及使用心得
pandas按条件筛选数据时,除了使用query()方法,还可以使用isin和对isin取反进行条件筛选,今天通过本文给大家介绍pandas 实现 in 和 not in 的用法及使用心得,感兴趣的朋友跟随小编一起看看吧2023-01-01
最新评论