redis实现延迟任务的项目实践
一、实现思路
问题思路
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据类型,list和zset?
效率问题,算法的时间复杂度
3.在添加zset数据的时候,为什么不需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
二、延迟任务服务实现
1、搭建heima-leadnews-schedule模块
leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务
①:导入资料文件夹下的heima-leadnews-schedule模块到heima-leadnews-service下。
如下图所示:
②:添加bootstrap.yml
server: port: 51701 spring: application: name: leadnews-schedule cloud: nacos: discovery: server-addr: 192.168.200.130:8848 config: server-addr: 192.168.200.130:8848 file-extension: yml
③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: root password: root # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 设置别名包扫描路径,通过该属性可以给包中的类注册别名 type-aliases-package: com.heima.model.schedule.pojos
2、数据库准备
导入资料中leadnews_schedule数据库
taskinfo 任务表
实体类
package com.heima.model.schedule.pojos; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import java.io.Serializable; import java.util.Date; /** * <p> * * </p> * * @author itheima */ @Data @TableName("taskinfo") public class Taskinfo implements Serializable { private static final long serialVersionUID = 1L; /** * 任务id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 执行时间 */ @TableField("execute_time") private Date executeTime; /** * 参数 */ @TableField("parameters") private byte[] parameters; /** * 优先级 */ @TableField("priority") private Integer priority; /** * 任务类型 */ @TableField("task_type") private Integer taskType; }
taskinfo_logs 任务日志表
实体类
package com.heima.model.schedule.pojos; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.io.Serializable; import java.util.Date; /** * <p> * * </p> * * @author itheima */ @Data @TableName("taskinfo_logs") public class TaskinfoLogs implements Serializable { private static final long serialVersionUID = 1L; /** * 任务id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 执行时间 */ @TableField("execute_time") private Date executeTime; /** * 参数 */ @TableField("parameters") private byte[] parameters; /** * 优先级 */ @TableField("priority") private Integer priority; /** * 任务类型 */ @TableField("task_type") private Integer taskType; /** * 版本号,用乐观锁 */ @Version private Integer version; /** * 状态 0=int 1=EXECUTED 2=CANCELLED */ @TableField("status") private Integer status; }
乐观锁支持:
/** * mybatis-plus乐观锁支持 * @return */ @Bean public MybatisPlusInterceptor optimisticLockerInterceptor(){ MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; }
3、安装redis
①拉取镜像
docker pull redis
② 创建容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
③链接测试
打开资料中的Redis Desktop Manager,输入host、port、password链接测试
能链接成功,即可
4、项目集成redis
①:在项目导入redis相关依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- redis依赖commons-pool 这个依赖一定要添加 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
②:在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis
spring: redis: host: 192.168.200.130 password: leadnews port: 6379
③:拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置
cacheService.java
package com.heima.common.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.*; import org.springframework.data.redis.core.*; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import org.springframework.data.redis.core.types.Expiration; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @Component public class CacheService extends CachingConfigurerSupport { @Autowired private StringRedisTemplate stringRedisTemplate; public StringRedisTemplate getstringRedisTemplate() { return this.stringRedisTemplate; } /** -------------------key相关操作--------------------- */ /** * 删除key * * @param key */ public void delete(String key) { stringRedisTemplate.delete(key); } /** * 批量删除key * * @param keys */ public void delete(Collection<String> keys) { stringRedisTemplate.delete(keys); } /** * 序列化key * * @param key * @return */ public byte[] dump(String key) { return stringRedisTemplate.dump(key); } /** * 是否存在key * * @param key * @return */ public Boolean exists(String key) { return stringRedisTemplate.hasKey(key); } /** * 设置过期时间 * * @param key * @param timeout * @param unit * @return */ public Boolean expire(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.expire(key, timeout, unit); } /** * 设置过期时间 * * @param key * @param date * @return */ public Boolean expireAt(String key, Date date) { return stringRedisTemplate.expireAt(key, date); } /** * 查找匹配的key * * @param pattern * @return */ public Set<String> keys(String pattern) { return stringRedisTemplate.keys(pattern); } /** * 将当前数据库的 key 移动到给定的数据库 db 当中 * * @param key * @param dbIndex * @return */ public Boolean move(String key, int dbIndex) { return stringRedisTemplate.move(key, dbIndex); } /** * 移除 key 的过期时间,key 将持久保持 * * @param key * @return */ public Boolean persist(String key) { return stringRedisTemplate.persist(key); } /** * 返回 key 的剩余的过期时间 * * @param key * @param unit * @return */ public Long getExpire(String key, TimeUnit unit) { return stringRedisTemplate.getExpire(key, unit); } /** * 返回 key 的剩余的过期时间 * * @param key * @return */ public Long getExpire(String key) { return stringRedisTemplate.getExpire(key); } /** * 从当前数据库中随机返回一个 key * * @return */ public String randomKey() { return stringRedisTemplate.randomKey(); } /** * 修改 key 的名称 * * @param oldKey * @param newKey */ public void rename(String oldKey, String newKey) { stringRedisTemplate.rename(oldKey, newKey); } /** * 仅当 newkey 不存在时,将 oldKey 改名为 newkey * * @param oldKey * @param newKey * @return */ public Boolean renameIfAbsent(String oldKey, String newKey) { return stringRedisTemplate.renameIfAbsent(oldKey, newKey); } /** * 返回 key 所储存的值的类型 * * @param key * @return */ public DataType type(String key) { return stringRedisTemplate.type(key); } /** -------------------string相关操作--------------------- */ /** * 设置指定 key 的值 * @param key * @param value */ public void set(String key, String value) { stringRedisTemplate.opsForValue().set(key, value); } /** * 获取指定 key 的值 * @param key * @return */ public String get(String key) { return stringRedisTemplate.opsForValue().get(key); } /** * 返回 key 中字符串值的子字符 * @param key * @param start * @param end * @return */ public String getRange(String key, long start, long end) { return stringRedisTemplate.opsForValue().get(key, start, end); } /** * 将给定 key 的值设为 value ,并返回 key 的旧值(old value) * * @param key * @param value * @return */ public String getAndSet(String key, String value) { return stringRedisTemplate.opsForValue().getAndSet(key, value); } /** * 对 key 所储存的字符串值,获取指定偏移量上的位(bit) * * @param key * @param offset * @return */ public Boolean getBit(String key, long offset) { return stringRedisTemplate.opsForValue().getBit(key, offset); } /** * 批量获取 * * @param keys * @return */ public List<String> multiGet(Collection<String> keys) { return stringRedisTemplate.opsForValue().multiGet(keys); } /** * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value * * @param key * @param * @param value * 值,true为1, false为0 * @return */ public boolean setBit(String key, long offset, boolean value) { return stringRedisTemplate.opsForValue().setBit(key, offset, value); } /** * 将值 value 关联到 key ,并将 key 的过期时间设为 timeout * * @param key * @param value * @param timeout * 过期时间 * @param unit * 时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES * 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS */ public void setEx(String key, String value, long timeout, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, value, timeout, unit); } /** * 只有在 key 不存在时设置 key 的值 * * @param key * @param value * @return 之前已经存在返回false,不存在返回true */ public boolean setIfAbsent(String key, String value) { return stringRedisTemplate.opsForValue().setIfAbsent(key, value); } /** * 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始 * * @param key * @param value * @param offset * 从指定位置开始覆写 */ public void setRange(String key, String value, long offset) { stringRedisTemplate.opsForValue().set(key, value, offset); } /** * 获取字符串的长度 * * @param key * @return */ public Long size(String key) { return stringRedisTemplate.opsForValue().size(key); } /** * 批量添加 * * @param maps */ public void multiSet(Map<String, String> maps) { stringRedisTemplate.opsForValue().multiSet(maps); } /** * 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在 * * @param maps * @return 之前已经存在返回false,不存在返回true */ public boolean multiSetIfAbsent(Map<String, String> maps) { return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps); } /** * 增加(自增长), 负数则为自减 * * @param key * @param * @return */ public Long incrBy(String key, long increment) { return stringRedisTemplate.opsForValue().increment(key, increment); } /** * * @param key * @param * @return */ public Double incrByFloat(String key, double increment) { return stringRedisTemplate.opsForValue().increment(key, increment); } /** * 追加到末尾 * * @param key * @param value * @return */ public Integer append(String key, String value) { return stringRedisTemplate.opsForValue().append(key, value); } /** -------------------hash相关操作------------------------- */ /** * 获取存储在哈希表中指定字段的值 * * @param key * @param field * @return */ public Object hGet(String key, String field) { return stringRedisTemplate.opsForHash().get(key, field); } /** * 获取所有给定字段的值 * * @param key * @return */ public Map<Object, Object> hGetAll(String key) { return stringRedisTemplate.opsForHash().entries(key); } /** * 获取所有给定字段的值 * * @param key * @param fields * @return */ public List<Object> hMultiGet(String key, Collection<Object> fields) { return stringRedisTemplate.opsForHash().multiGet(key, fields); } public void hPut(String key, String hashKey, String value) { stringRedisTemplate.opsForHash().put(key, hashKey, value); } public void hPutAll(String key, Map<String, String> maps) { stringRedisTemplate.opsForHash().putAll(key, maps); } /** * 仅当hashKey不存在时才设置 * * @param key * @param hashKey * @param value * @return */ public Boolean hPutIfAbsent(String key, String hashKey, String value) { return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value); } /** * 删除一个或多个哈希表字段 * * @param key * @param fields * @return */ public Long hDelete(String key, Object... fields) { return stringRedisTemplate.opsForHash().delete(key, fields); } /** * 查看哈希表 key 中,指定的字段是否存在 * * @param key * @param field * @return */ public boolean hExists(String key, String field) { return stringRedisTemplate.opsForHash().hasKey(key, field); } /** * 为哈希表 key 中的指定字段的整数值加上增量 increment * * @param key * @param field * @param increment * @return */ public Long hIncrBy(String key, Object field, long increment) { return stringRedisTemplate.opsForHash().increment(key, field, increment); } /** * 为哈希表 key 中的指定字段的整数值加上增量 increment * * @param key * @param field * @param delta * @return */ public Double hIncrByFloat(String key, Object field, double delta) { return stringRedisTemplate.opsForHash().increment(key, field, delta); } /** * 获取所有哈希表中的字段 * * @param key * @return */ public Set<Object> hKeys(String key) { return stringRedisTemplate.opsForHash().keys(key); } /** * 获取哈希表中字段的数量 * * @param key * @return */ public Long hSize(String key) { return stringRedisTemplate.opsForHash().size(key); } /** * 获取哈希表中所有值 * * @param key * @return */ public List<Object> hValues(String key) { return stringRedisTemplate.opsForHash().values(key); } /** * 迭代哈希表中的键值对 * * @param key * @param options * @return */ public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) { return stringRedisTemplate.opsForHash().scan(key, options); } /** ------------------------list相关操作---------------------------- */ /** * 通过索引获取列表中的元素 * * @param key * @param index * @return */ public String lIndex(String key, long index) { return stringRedisTemplate.opsForList().index(key, index); } /** * 获取列表指定范围内的元素 * * @param key * @param start * 开始位置, 0是开始位置 * @param end * 结束位置, -1返回所有 * @return */ public List<String> lRange(String key, long start, long end) { return stringRedisTemplate.opsForList().range(key, start, end); } /** * 存储在list头部 * * @param key * @param value * @return */ public Long lLeftPush(String key, String value) { return stringRedisTemplate.opsForList().leftPush(key, value); } /** * * @param key * @param value * @return */ public Long lLeftPushAll(String key, String... value) { return stringRedisTemplate.opsForList().leftPushAll(key, value); } /** * * @param key * @param value * @return */ public Long lLeftPushAll(String key, Collection<String> value) { return stringRedisTemplate.opsForList().leftPushAll(key, value); } /** * 当list存在的时候才加入 * * @param key * @param value * @return */ public Long lLeftPushIfPresent(String key, String value) { return stringRedisTemplate.opsForList().leftPushIfPresent(key, value); } /** * 如果pivot存在,再pivot前面添加 * * @param key * @param pivot * @param value * @return */ public Long lLeftPush(String key, String pivot, String value) { return stringRedisTemplate.opsForList().leftPush(key, pivot, value); } /** * * @param key * @param value * @return */ public Long lRightPush(String key, String value) { return stringRedisTemplate.opsForList().rightPush(key, value); } /** * * @param key * @param value * @return */ public Long lRightPushAll(String key, String... value) { return stringRedisTemplate.opsForList().rightPushAll(key, value); } /** * * @param key * @param value * @return */ public Long lRightPushAll(String key, Collection<String> value) { return stringRedisTemplate.opsForList().rightPushAll(key, value); } /** * 为已存在的列表添加值 * * @param key * @param value * @return */ public Long lRightPushIfPresent(String key, String value) { return stringRedisTemplate.opsForList().rightPushIfPresent(key, value); } /** * 在pivot元素的右边添加值 * * @param key * @param pivot * @param value * @return */ public Long lRightPush(String key, String pivot, String value) { return stringRedisTemplate.opsForList().rightPush(key, pivot, value); } /** * 通过索引设置列表元素的值 * * @param key * @param index * 位置 * @param value */ public void lSet(String key, long index, String value) { stringRedisTemplate.opsForList().set(key, index, value); } /** * 移出并获取列表的第一个元素 * * @param key * @return 删除的元素 */ public String lLeftPop(String key) { return stringRedisTemplate.opsForList().leftPop(key); } /** * 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 * * @param key * @param timeout * 等待时间 * @param unit * 时间单位 * @return */ public String lBLeftPop(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().leftPop(key, timeout, unit); } /** * 移除并获取列表最后一个元素 * * @param key * @return 删除的元素 */ public String lRightPop(String key) { return stringRedisTemplate.opsForList().rightPop(key); } /** * 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 * * @param key * @param timeout * 等待时间 * @param unit * 时间单位 * @return */ public String lBRightPop(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().rightPop(key, timeout, unit); } /** * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回 * * @param sourceKey * @param destinationKey * @return */ public String lRightPopAndLeftPush(String sourceKey, String destinationKey) { return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey); } /** * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 * * @param sourceKey * @param destinationKey * @param timeout * @param unit * @return */ public String lBRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit); } /** * 删除集合中值等于value得元素 * * @param key * @param index * index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素; * index<0, 从尾部开始删除第一个值等于value的元素; * @param value * @return */ public Long lRemove(String key, long index, String value) { return stringRedisTemplate.opsForList().remove(key, index, value); } /** * 裁剪list * * @param key * @param start * @param end */ public void lTrim(String key, long start, long end) { stringRedisTemplate.opsForList().trim(key, start, end); } /** * 获取列表长度 * * @param key * @return */ public Long lLen(String key) { return stringRedisTemplate.opsForList().size(key); } /** --------------------set相关操作-------------------------- */ /** * set添加元素 * * @param key * @param values * @return */ public Long sAdd(String key, String... values) { return stringRedisTemplate.opsForSet().add(key, values); } /** * set移除元素 * * @param key * @param values * @return */ public Long sRemove(String key, Object... values) { return stringRedisTemplate.opsForSet().remove(key, values); } /** * 移除并返回集合的一个随机元素 * * @param key * @return */ public String sPop(String key) { return stringRedisTemplate.opsForSet().pop(key); } /** * 将元素value从一个集合移到另一个集合 * * @param key * @param value * @param destKey * @return */ public Boolean sMove(String key, String value, String destKey) { return stringRedisTemplate.opsForSet().move(key, value, destKey); } /** * 获取集合的大小 * * @param key * @return */ public Long sSize(String key) { return stringRedisTemplate.opsForSet().size(key); } /** * 判断集合是否包含value * * @param key * @param value * @return */ public Boolean sIsMember(String key, Object value) { return stringRedisTemplate.opsForSet().isMember(key, value); } /** * 获取两个集合的交集 * * @param key * @param otherKey * @return */ public Set<String> sIntersect(String key, String otherKey) { return stringRedisTemplate.opsForSet().intersect(key, otherKey); } /** * 获取key集合与多个集合的交集 * * @param key * @param otherKeys * @return */ public Set<String> sIntersect(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().intersect(key, otherKeys); } /** * key集合与otherKey集合的交集存储到destKey集合中 * * @param key * @param otherKey * @param destKey * @return */ public Long sIntersectAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey, destKey); } /** * key集合与多个集合的交集存储到destKey集合中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sIntersectAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys, destKey); } /** * 获取两个集合的并集 * * @param key * @param otherKeys * @return */ public Set<String> sUnion(String key, String otherKeys) { return stringRedisTemplate.opsForSet().union(key, otherKeys); } /** * 获取key集合与多个集合的并集 * * @param key * @param otherKeys * @return */ public Set<String> sUnion(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().union(key, otherKeys); } /** * key集合与otherKey集合的并集存储到destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long sUnionAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey); } /** * key集合与多个集合的并集存储到destKey中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sUnionAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey); } /** * 获取两个集合的差集 * * @param key * @param otherKey * @return */ public Set<String> sDifference(String key, String otherKey) { return stringRedisTemplate.opsForSet().difference(key, otherKey); } /** * 获取key集合与多个集合的差集 * * @param key * @param otherKeys * @return */ public Set<String> sDifference(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().difference(key, otherKeys); } /** * key集合与otherKey集合的差集存储到destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long sDifference(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey, destKey); } /** * key集合与多个集合的差集存储到destKey中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sDifference(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys, destKey); } /** * 获取集合所有元素 * * @param key * @param * @param * @return */ public Set<String> setMembers(String key) { return stringRedisTemplate.opsForSet().members(key); } /** * 随机获取集合中的一个元素 * * @param key * @return */ public String sRandomMember(String key) { return stringRedisTemplate.opsForSet().randomMember(key); } /** * 随机获取集合中count个元素 * * @param key * @param count * @return */ public List<String> sRandomMembers(String key, long count) { return stringRedisTemplate.opsForSet().randomMembers(key, count); } /** * 随机获取集合中count个元素并且去除重复的 * * @param key * @param count * @return */ public Set<String> sDistinctRandomMembers(String key, long count) { return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count); } /** * * @param key * @param options * @return */ public Cursor<String> sScan(String key, ScanOptions options) { return stringRedisTemplate.opsForSet().scan(key, options); } /**------------------zSet相关操作--------------------------------*/ /** * 添加元素,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @param score * @return */ public Boolean zAdd(String key, String value, double score) { return stringRedisTemplate.opsForZSet().add(key, value, score); } /** * * @param key * @param values * @return */ public Long zAdd(String key, Set<TypedTuple<String>> values) { return stringRedisTemplate.opsForZSet().add(key, values); } /** * * @param key * @param values * @return */ public Long zRemove(String key, Object... values) { return stringRedisTemplate.opsForZSet().remove(key, values); } public Long zRemove(String key, Collection<String> values) { if(values!=null&&!values.isEmpty()){ Object[] objs = values.toArray(new Object[values.size()]); return stringRedisTemplate.opsForZSet().remove(key, objs); } return 0L; } /** * 增加元素的score值,并返回增加后的值 * * @param key * @param value * @param delta * @return */ public Double zIncrementScore(String key, String value, double delta) { return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta); } /** * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @return 0表示第一位 */ public Long zRank(String key, Object value) { return stringRedisTemplate.opsForZSet().rank(key, value); } /** * 返回元素在集合的排名,按元素的score值由大到小排列 * * @param key * @param value * @return */ public Long zReverseRank(String key, Object value) { return stringRedisTemplate.opsForZSet().reverseRank(key, value); } /** * 获取集合的元素, 从小到大排序 * * @param key * @param start * 开始位置 * @param end * 结束位置, -1查询所有 * @return */ public Set<String> zRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().range(key, start, end); } /** * 获取zset集合的所有元素, 从小到大排序 * */ public Set<String> zRangeAll(String key) { return zRange(key,0,-1); } /** * 获取集合元素, 并且把score值也获取 * * @param key * @param start * @param end * @return */ public Set<TypedTuple<String>> zRangeWithScores(String key, long start, long end) { return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end); } /** * 根据Score值查询集合元素 * * @param key * @param min * 最小值 * @param max * 最大值 * @return */ public Set<String> zRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max); } /** * 根据Score值查询集合元素, 从小到大排序 * * @param key * @param min * 最小值 * @param max * 最大值 * @return */ public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max) { return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max); } /** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max, long start, long end) { return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end); } /** * 获取集合的元素, 从大到小排序 * * @param key * @param start * @param end * @return */ public Set<String> zReverseRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRange(key, start, end); } public Set<String> zReverseRangeByScore(String key, long min, long max) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max); } /** * 获取集合的元素, 从大到小排序, 并返回score值 * * @param key * @param start * @param end * @return */ public Set<TypedTuple<String>> zReverseRangeWithScores(String key, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start, end); } /** * 根据Score值查询集合元素, 从大到小排序 * * @param key * @param min * @param max * @return */ public Set<String> zReverseRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max); } /** * 根据Score值查询集合元素, 从大到小排序 * * @param key * @param min * @param max * @return */ public Set<TypedTuple<String>> zReverseRangeByScoreWithScores( String key, double min, double max) { return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, min, max); } /** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set<String> zReverseRangeByScore(String key, double min, double max, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max, start, end); } /** * 根据score值获取集合元素数量 * * @param key * @param min * @param max * @return */ public Long zCount(String key, double min, double max) { return stringRedisTemplate.opsForZSet().count(key, min, max); } /** * 获取集合大小 * * @param key * @return */ public Long zSize(String key) { return stringRedisTemplate.opsForZSet().size(key); } /** * 获取集合大小 * * @param key * @return */ public Long zZCard(String key) { return stringRedisTemplate.opsForZSet().zCard(key); } /** * 获取集合中value元素的score值 * * @param key * @param value * @return */ public Double zScore(String key, Object value) { return stringRedisTemplate.opsForZSet().score(key, value); } /** * 移除指定索引位置的成员 * * @param key * @param start * @param end * @return */ public Long zRemoveRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().removeRange(key, start, end); } /** * 根据指定的score值的范围来移除成员 * * @param key * @param min * @param max * @return */ public Long zRemoveRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max); } /** * 获取key和otherKey的并集并存储在destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long zUnionAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey); } /** * * @param key * @param otherKeys * @param destKey * @return */ public Long zUnionAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForZSet() .unionAndStore(key, otherKeys, destKey); } /** * 交集 * * @param key * @param otherKey * @param destKey * @return */ public Long zIntersectAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey, destKey); } /** * 交集 * * @param key * @param otherKeys * @param destKey * @return */ public Long zIntersectAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys, destKey); } /** * * @param key * @param options * @return */ public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) { return stringRedisTemplate.opsForZSet().scan(key, options); } /** * 扫描主键,建议使用 * @param patten * @return */ public Set<String> scan(String patten){ Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> { Set<String> result = new HashSet<>(); try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder() .match(patten).count(10000).build())) { while (cursor.hasNext()) { result.add(new String(cursor.next())); } } catch (IOException e) { e.printStackTrace(); } return result; }); return keys; } /** * 管道技术,提高性能 * @param type * @param values * @return */ public List<Object> lRightPushPipeline(String type,Collection<String> values){ List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() { public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisConnection stringRedisConn = (StringRedisConnection)connection; //集合转换数组 String[] strings = values.toArray(new String[values.size()]); //直接批量发送 stringRedisConn.rPush(type, strings); return null; } }); return results; } public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){ List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection; String[] strings = values.toArray(new String[values.size()]); stringRedisConnection.rPush(topic_key,strings); stringRedisConnection.zRem(future_key,strings); return null; } }); return objects; } /** * 加锁 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; } }
④:测试
package com.heima.schedule.test; import com.heima.common.redis.CacheService; import com.heima.schedule.ScheduleApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Set; @SpringBootTest(classes = ScheduleApplication.class) @RunWith(SpringRunner.class) public class RedisTest { @Autowired private CacheService cacheService; @Test public void testList(){ //在list的左边添加元素 // cacheService.lLeftPush("list_001","hello,redis"); //在list的右边获取元素,并删除 String list_001 = cacheService.lRightPop("list_001"); System.out.println(list_001); } @Test public void testZset(){ //添加数据到zset中 分值 /*cacheService.zAdd("zset_key_001","hello zset 001",1000); cacheService.zAdd("zset_key_001","hello zset 002",8888); cacheService.zAdd("zset_key_001","hello zset 003",7777); cacheService.zAdd("zset_key_001","hello zset 004",999999);*/ //按照分值获取数据 Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888); System.out.println(zset_key_001); } }
5、添加任务
①:拷贝mybatis-plus生成的文件,mapper
②:创建task类,用于接收添加任务的参数
package com.heima.model.schedule.dtos; import lombok.Data; import java.io.Serializable; @Data public class Task implements Serializable { /** * 任务id */ private Long taskId; /** * 类型 */ private Integer taskType; /** * 优先级 */ private Integer priority; /** * 执行id */ private long executeTime; /** * task参数 */ private byte[] parameters; }
③:创建TaskService
package com.heima.schedule.service; import com.heima.model.schedule.dtos.Task; /** * 对外访问接口 */ public interface TaskService { /** * 添加任务 * @param task 任务对象 * @return 任务id */ public long addTask(Task task) ; }
实现:
package com.heima.schedule.service.impl; import com.alibaba.fastjson.JSON; import com.heima.common.constants.ScheduleConstants; import com.heima.common.redis.CacheService; import com.heima.model.schedule.dtos.Task; import com.heima.model.schedule.pojos.Taskinfo; import com.heima.model.schedule.pojos.TaskinfoLogs; import com.heima.schedule.mapper.TaskinfoLogsMapper; import com.heima.schedule.mapper.TaskinfoMapper; import com.heima.schedule.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Calendar; import java.util.Date; @Service @Transactional @Slf4j public class TaskServiceImpl implements TaskService { /** * 添加延迟任务 * * @param task * @return */ @Override public long addTask(Task task) { //1.添加任务到数据库中 boolean success = addTaskToDb(task); if (success) { //2.添加任务到redis addTaskToCache(task); } return task.getTaskId(); } @Autowired private CacheService cacheService; /** * 把任务添加到redis中 * * @param task */ private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); //获取5分钟之后的时间 毫秒值 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduleTime = calendar.getTimeInMillis(); //2.1 如果任务的执行时间小于等于当前时间,存入list if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if (task.getExecuteTime() <= nextScheduleTime) { //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中 cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } } @Autowired private TaskinfoMapper taskinfoMapper; @Autowired private TaskinfoLogsMapper taskinfoLogsMapper; /** * 添加任务到数据库中 * * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //保存任务表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); //设置taskID task.setTaskId(taskinfo.getTaskId()); //保存任务日志数据 TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } }
ScheduleConstants常量类
package com.heima.common.constants; public class ScheduleConstants { //task状态 public static final int SCHEDULED=0; //初始化状态 public static final int EXECUTED=1; //已执行状态 public static final int CANCELLED=2; //已取消状态 public static String FUTURE="future_"; //未来数据key前缀 public static String TOPIC="topic_"; //当前数据key前缀 }
④:测试
6、取消任务
在TaskService中添加方法
/** * 取消任务 * @param taskId 任务id * @return 取消结果 */ public boolean cancelTask(long taskId);
实现
/** * 取消任务 * @param taskId * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //删除任务,更新日志 Task task = updateDb(taskId,ScheduleConstants.EXECUTED); //删除redis的数据 if(task != null){ removeTaskFromCache(task); flag = true; } return false; } /** * 删除redis中的任务数据 * @param task */ private void removeTaskFromCache(Task task) { String key = task.getTaskType()+"_"+task.getPriority(); if(task.getExecuteTime()<=System.currentTimeMillis()){ cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); }else { cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task)); } } /** * 删除任务,更新任务日志状态 * @param taskId * @param status * @return */ private Task updateDb(long taskId, int status) { Task task = null; try { //删除任务 taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); }catch (Exception e){ log.error("task cancel exception taskid={}",taskId); } return task; }
测试
7、消费任务
在TaskService中添加方法
/** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ public Task poll(int type,int priority);
实现
/** * 按照类型和优先级拉取任务 * @return */ @Override public Task poll(int type,int priority) { Task task = null; try { String key = type+"_"+priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)){ task = JSON.parseObject(task_json, Task.class); //更新数据库信息 updateDb(task.getTaskId(),ScheduleConstants.EXECUTED); } }catch (Exception e){ e.printStackTrace(); log.error("poll task exception"); } return task; }
8、未来数据定时刷新
1.reids key值匹配
方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞。
方案2:scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
代码案例:
@Test public void testKeys(){ Set<String> keys = cacheService.keys("future_*"); System.out.println(keys); Set<String> scan = cacheService.scan("future_*"); System.out.println(scan); }
2.reids管道
普通redis客户端和服务器交互模式
Pipeline请求模型
官方测试结果数据对比
测试案例对比:
//耗时6151 @Test public void testPiple1(){ long start =System.currentTimeMillis(); for (int i = 0; i <10000 ; i++) { Task task = new Task(); task.setTaskType(1001); task.setPriority(1); task.setExecuteTime(new Date().getTime()); cacheService.lLeftPush("1001_1", JSON.toJSONString(task)); } System.out.println("耗时"+(System.currentTimeMillis()- start)); } @Test public void testPiple2(){ long start = System.currentTimeMillis(); //使用管道技术 List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { for (int i = 0; i <10000 ; i++) { Task task = new Task(); task.setTaskType(1001); task.setPriority(1); task.setExecuteTime(new Date().getTime()); redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes()); } return null; } }); System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒"); }
3.未来数据定时刷新-功能完成
在TaskService中添加方法
@Scheduled(cron = "0 */1 * * * ?") public void refresh() { System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务"); // 获取所有未来数据集合的key值 Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_* for (String futureKey : futureKeys) { // future_250_250 String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; //获取该组key下当前需要消费的任务数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { //将这些任务数据添加到消费者队列中 cacheService.refreshWithPipeline(futureKey, topicKey, tasks); System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下"); } } }
在引导类中添加开启任务调度注解:@EnableScheduling
9、分布式锁解决集群下的方法抢占执行
1.问题描述
启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
2.分布式锁
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
解决方案:
3.redis分布式锁
sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
- 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
- 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
- 客户端A执行代码完成,删除锁
- 客户端B在等待一段时间后再去请求设置key的值,设置成功
- 客户端B执行代码完成,删除锁
4.在工具类CacheService中添加方法
/** * 加锁 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; }
修改未来数据定时刷新的方法,如下:
/** * 未来数据定时刷新 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh(){ String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)){ log.info("未来数据定时刷新---定时任务"); //获取所有未来数据的集合key Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //获取当前数据的key topic String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查询符合条件的数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步数据 if(!tasks.isEmpty()){ cacheService.refreshWithPipeline(futureKey,topicKey,tasks); log.info("成功的将"+futureKey+"刷新到了"+topicKey); } } } }
10、数据库同步到redis
@Scheduled(cron = "0 */5 * * * ?") @PostConstruct public void reloadData() { clearCache(); log.info("数据库数据同步到缓存"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); //查看小于未来5分钟的所有任务 List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime())); if(allTasks != null && allTasks.size() > 0){ for (Taskinfo taskinfo : allTasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo,task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } } private void clearCache(){ // 删除缓存中未来数据集合和当前消费者队列的所有key Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_ Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_ cacheService.delete(futurekeys); cacheService.delete(topickeys); }
三、延迟队列解决精准时间发布文章
1、延迟队列服务提供对外接口
提供远程的feign接口,在heima-leadnews-feign-api编写类如下:
package com.heima.apis.schedule; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @FeignClient("leadnews-schedule") public interface IScheduleClient { /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskId); /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority); }
在heima-leadnews-schedule微服务下提供对应的实现
package com.heima.schedule.feign; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import com.heima.schedule.service.TaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController public class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") @Override public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); } }
2、发布文章集成添加延迟队列接口
在创建WmNewsTaskService
package com.heima.wemedia.service; import com.heima.model.wemedia.pojos.WmNews; public interface WmNewsTaskService { /** * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ public void addNewsToTask(Integer id, Date publishTime); }
实现:
package com.heima.wemedia.service.impl; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.enums.TaskTypeEnum; import com.heima.model.schedule.dtos.Task; import com.heima.model.wemedia.pojos.WmNews; import com.heima.utils.common.ProtostuffUtil; import com.heima.wemedia.service.WmNewsTaskService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @Slf4j public class WmNewsTaskServiceImpl implements WmNewsTaskService { @Autowired private IScheduleClient scheduleClient; /** * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ @Override @Async public void addNewsToTask(Integer id, Date publishTime) { log.info("添加任务到延迟服务中----begin"); Task task = new Task(); task.setExecuteTime(publishTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任务到延迟服务中----end"); } }
枚举类:
package com.heima.model.common.enums; import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定时审核"), REMOTEERROR(1002, 2,"第三方接口调用失败,重试"); private final int taskType; //对应具体业务 private final int priority; //业务不同级别 private final String desc; //描述信息 }
序列化工具对比
- JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
- Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类
拷贝资料中的两个类到heima-leadnews-utils下
JdkSerializeUtil.java
package com.heima.utils.common; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; /** * jdk序列化 */ public class JdkSerializeUtil { /** * 序列化 * @param obj * @param <T> * @return */ public static <T> byte[] serialize(T obj) { if (obj == null){ throw new NullPointerException(); } ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); return bos.toByteArray(); } catch (Exception ex) { ex.printStackTrace(); } return new byte[0]; } /** * 反序列化 * @param data * @param clazz * @param <T> * @return */ public static <T> T deserialize(byte[] data, Class<T> clazz) { ByteArrayInputStream bis = new ByteArrayInputStream(data); try { ObjectInputStream ois = new ObjectInputStream(bis); T obj = (T)ois.readObject(); return obj; } catch (Exception ex) { ex.printStackTrace(); } return null; } }
ProtostuffUtil.java
package com.heima.utils.common; import com.heima.model.wemedia.pojos.WmNews; import io.protostuff.LinkedBuffer; import io.protostuff.ProtostuffIOUtil; import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; public class ProtostuffUtil { /** * 序列化 * @param t * @param <T> * @return */ public static <T> byte[] serialize(T t){ Schema schema = RuntimeSchema.getSchema(t.getClass()); return ProtostuffIOUtil.toByteArray(t,schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)); } /** * 反序列化 * @param bytes * @param c * @param <T> * @return */ public static <T> T deserialize(byte []bytes,Class<T> c) { T t = null; try { t = c.newInstance(); Schema schema = RuntimeSchema.getSchema(t.getClass()); ProtostuffIOUtil.mergeFrom(bytes,t,schema); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return t; } /** * jdk序列化与protostuff序列化对比 * @param args */ public static void main(String[] args) { long start =System.currentTimeMillis(); for (int i = 0; i <1000000 ; i++) { WmNews wmNews =new WmNews(); JdkSerializeUtil.serialize(wmNews); } System.out.println(" jdk 花费 "+(System.currentTimeMillis()-start)); start =System.currentTimeMillis(); for (int i = 0; i <1000000 ; i++) { WmNews wmNews =new WmNews(); ProtostuffUtil.serialize(wmNews); } System.out.println(" protostuff 花费 "+(System.currentTimeMillis()-start)); } }
Protostuff需要引导依赖:
<dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.6.0</version> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.6.0</version> </dependency>
修改发布文章代码:
把之前的异步调用修改为调用延迟任务
@Autowired private WmNewsTaskService wmNewsTaskService; /** * 发布修改文章或保存为草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //0.条件判断 if(dto == null || dto.getContent() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //1.保存或修改文章 WmNews wmNews = new WmNews(); //属性拷贝 属性名词和类型相同才能拷贝 BeanUtils.copyProperties(dto,wmNews); //封面图片 list---> string if(dto.getImages() != null && dto.getImages().size() > 0){ //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg String imageStr = StringUtils.join(dto.getImages(), ","); wmNews.setImages(imageStr); } //如果当前封面类型为自动 -1 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){ wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //2.判断是否为草稿 如果为草稿结束当前方法 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){ return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //3.不是草稿,保存文章内容图片与素材的关系 //获取到文章内容中的图片信息 List<String> materials = ectractUrlInfo(dto.getContent()); saveRelativeInfoForContent(materials,wmNews.getId()); //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片 saveRelativeInfoForCover(dto,wmNews,materials); //审核文章 // wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
3、消费任务进行审核文章
WmNewsTaskService中添加方法
/** * 消费延迟队列数据 */ public void scanNewsByTask();
实现
@Autowired private WmNewsAutoScanServiceImpl wmNewsAutoScanService; /** * 消费延迟队列数据 */ @Scheduled(fixedRate = 1000) @Override @SneakyThrows public void scanNewsByTask() { log.info("文章审核---消费任务执行---begin---"); ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if(responseResult.getCode().equals(200) && responseResult.getData() != null){ String json_str = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(json_str, Task.class); byte[] parameters = task.getParameters(); WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class); System.out.println(wmNews.getId()+"-----------"); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); } log.info("文章审核---消费任务执行---end---"); }
到此这篇关于redis实现延迟任务的项目实践的文章就介绍到这了,更多相关redis 延迟任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Redis分布式锁python-redis-lock使用方法
这篇文章主要介绍了Redis分布式锁python-redis-lock使用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2020-11-11详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作
这篇文章主要介绍了详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作 ,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。2016-12-12
最新评论