PowerJob的GridFsManager工作流程源码解读
序
本文主要研究一下PowerJob的GridFsManager
GridFsManager
tech/powerjob/server/persistence/mongodb/GridFsManager.java
@Slf4j @Service public class GridFsManager implements InitializingBean { private final Environment environment; private final MongoDatabase db; private boolean available; private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap(); public static final String LOG_BUCKET = "log"; public static final String CONTAINER_BUCKET = "container"; public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) { this.environment = environment; if (mongoTemplate != null) { this.db = mongoTemplate.getDb(); } else { this.db = null; } } /** * 是否可用 * @return true:可用;false:不可用 */ public boolean available() { return available; } //...... private GridFSBucket getBucket(String bucketName) { return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName)); } @Override public void afterPropertiesSet() throws Exception { String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString()); available = Boolean.TRUE.toString().equals(enable) && db != null; log.info("[GridFsManager] available: {}, db: {}", available, db); } }
GridFsManager实现了InitializingBean接口,其afterPropertiesSet从environment读取oms.mongodb.enable配置,默认为false;其构造器注入mongoTemplate,若为null则available为false;其getBucket方法则根据bucketName缓存到bucketCache,若不存在则通过GridFSBuckets.create创建
store
/** * 存储文件到 GridFS * @param localFile 本地文件 * @param bucketName 桶名称 * @param fileName GirdFS中的文件名称 * @throws IOException 异常 */ public void store(File localFile, String bucketName, String fileName) throws IOException { if (available()) { GridFSBucket bucket = getBucket(bucketName); try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) { bucket.uploadFromStream(fileName, bis); } } }
store方法先获取bucket,再读取localFile,通过bucket.uploadFromStream上传
download
/** * 从 GridFS 下载文件 * @param targetFile 下载的目标文件(本地文件) * @param bucketName 桶名称 * @param fileName GirdFS中的文件名称 * @throws IOException 异常 */ public void download(File targetFile, String bucketName, String fileName) throws IOException { if (available()) { GridFSBucket bucket = getBucket(bucketName); try (GridFSDownloadStream gis = bucket.openDownloadStream(fileName); BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile)) ) { byte[] buffer = new byte[1024]; int bytes = 0; while ((bytes = gis.read(buffer)) != -1) { bos.write(buffer, 0, bytes); } bos.flush(); } } }
download方法则先获取bucket,再通过bucket.openDownloadStream获取GridFSDownloadStream,最后写到targetFile
deleteBefore
/** * 删除几天前的文件 * @param bucketName 桶名称 * @param day 日期偏移量,单位 天 */ public void deleteBefore(String bucketName, int day) { Stopwatch sw = Stopwatch.createStarted(); Date date = DateUtils.addDays(new Date(), -day); GridFSBucket bucket = getBucket(bucketName); Bson filter = Filters.lt("uploadDate", date); // 循环删除性能很差?我猜你肯定没看过官方实现[狗头]:org.springframework.data.mongodb.gridfs.GridFsTemplate.delete bucket.find(filter).forEach((Consumer<GridFSFile>) gridFSFile -> { ObjectId objectId = gridFSFile.getObjectId(); try { bucket.delete(objectId); log.info("[GridFsManager] deleted {}#{}", bucketName, objectId); }catch (Exception e) { log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e); } }); log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop()); }
deleteBefore主要通过bucket.find(Filters.lt("uploadDate", date))找到GridFSFile,再挨个执行delete
exists
public boolean exists(String bucketName, String fileName) { GridFSBucket bucket = getBucket(bucketName); GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName)); try { GridFSFile first = files.first(); return first != null; }catch (Exception ignore) { } return false; }
exists方法则通过bucket.find(Filters.eq("filename", fileName))来进行查找
sync
tech/powerjob/server/core/instance/InstanceLogService.java
@Async(PJThreadPool.BACKGROUND_POOL) public void sync(Long instanceId) { Stopwatch sw = Stopwatch.createStarted(); try { // 先持久化到本地文件 File stableLogFile = genStableLogFile(instanceId); // 将文件推送到 MongoDB if (gridFsManager.available()) { try { gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop()); }catch (Exception e) { log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e); } } }catch (Exception e) { log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e); } // 删除本地数据库数据 try { instanceId2LastReportTime.remove(instanceId); CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId); }catch (Exception e) { log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); } }
InstanceLogService的sync方法先持久化到本地文件,再将文件推送到 MongoDB
小结
GridFsManager实现了InitializingBean接口,其afterPropertiesSet从environment读取oms.mongodb.enable
配置,默认为false;其构造器注入mongoTemplate,若为null则available为false;其store和download方法都先判断是否available,为false则空操作。
以上就是PowerJob的GridFsManager工作流程源码解读的详细内容,更多关于PowerJob GridFsManager的资料请关注脚本之家其它相关文章!
相关文章
Springboot使用Maven占位符@替换不生效问题及解决
这篇文章主要介绍了Springboot使用Maven占位符@替换不生效问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2023-04-04Java通过在主循环中判断Boolean来停止线程的方法示例
这篇文章主要介绍了Java通过在主循环中判断Boolean来停止线程的方法,结合具体实例形式分析了java针对线程的判断与停止操作相关实现技巧,需要的朋友可以参考下2017-04-04Spring Cloud Ribbon 负载均衡使用策略示例详解
Spring Cloud Ribbon 是基于Netflix Ribbon 实现的一套客户端负载均衡工具,Ribbon客户端组件提供了一系列的完善的配置,如超时,重试等,这篇文章主要介绍了Spring Cloud Ribbon 负载均衡使用策略示例详解,需要的朋友可以参考下2023-03-03
最新评论