PowerJob LockService方法工作流程源码解读
序
本文主要研究一下PowerJob的LockService
LockService
tech/powerjob/server/extension/LockService.java
public interface LockService { /** * 上锁(获取锁),立即返回,不会阻塞等待锁 * @param name 锁名称 * @param maxLockTime 最长持有锁的时间,单位毫秒(ms) * @return true -> 获取到锁,false -> 未获取到锁 */ boolean tryLock(String name, long maxLockTime); /** * 释放锁 * @param name 锁名称 */ void unlock(String name); }
LockService接口定义了tryLock、unlock方法
DatabaseLockService
tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java
@Slf4j @Service public class DatabaseLockService implements LockService { private final String ownerIp; private final OmsLockRepository omsLockRepository; @Autowired public DatabaseLockService(OmsLockRepository omsLockRepository) { this.ownerIp = NetUtils.getLocalHost(); this.omsLockRepository = omsLockRepository; Runtime.getRuntime().addShutdownHook(new Thread(() -> { int num = omsLockRepository.deleteByOwnerIP(ownerIp); log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num); })); } @Override public boolean tryLock(String name, long maxLockTime) { OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime); try { omsLockRepository.saveAndFlush(newLock); return true; } catch (DataIntegrityViolationException ignore) { } catch (Exception e) { log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e); } OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); // 锁超时,强制释放锁并重新尝试获取 if (lockedMillions > omsLockDO.getMaxLockTime()) { log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO); unlock(name); return tryLock(name, maxLockTime); } return false; } @Override public void unlock(String name) { try { CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name)); }catch (Exception e) { log.error("[DatabaseLockService] unlock {} failed.", name, e); } } }
DatabaseLockService基于数据库实现了LockService,其构造器依赖OmsLockRepository,同时注册了ShutdownHook,在关闭的时候执行omsLockRepository.deleteByOwnerIP(ownerIp);其tryLock方法创建OmsLockDO,然后执行omsLockRepository.saveAndFlush,若成功则返回,若有异常则通过omsLockRepository.findByLockName找到omsLockDO,计算加锁时间,若超过MaxLockTime则执行unlock再重新tryLock;其unlock执行omsLockRepository.deleteByLockName
NetUtils.getLocalHost
tech/powerjob/common/utils/NetUtils.java
public static String getLocalHost() { if (HOST_ADDRESS != null) { return HOST_ADDRESS; } String addressFromJVM = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS); if (StringUtils.isNotEmpty(addressFromJVM)) { log.info("[Net] use address from[{}]: {}", PowerJobDKey.BIND_LOCAL_ADDRESS, addressFromJVM); return HOST_ADDRESS = addressFromJVM; } InetAddress address = getLocalAddress(); if (address != null) { return HOST_ADDRESS = address.getHostAddress(); } return LOCALHOST_VALUE; } public static InetAddress getLocalAddress() { if (LOCAL_ADDRESS != null) { return LOCAL_ADDRESS; } InetAddress localAddress = getLocalAddress0(); LOCAL_ADDRESS = localAddress; return localAddress; } private static InetAddress getLocalAddress0() { // @since 2.7.6, choose the {@link NetworkInterface} first try { InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface()); if (addressOp != null) { return addressOp; } } catch (Throwable e) { log.warn("[Net] getLocalAddress0 failed.", e); } InetAddress localAddress = null; try { localAddress = InetAddress.getLocalHost(); Optional<InetAddress> addressOp = toValidAddress(localAddress); if (addressOp.isPresent()) { return addressOp.get(); } } catch (Throwable e) { log.warn("[Net] getLocalAddress0 failed.", e); } return localAddress; }
NetUtils的getLocalHost先判断HOST_ADDRESS是否有值,有则直接返回,否则先从系统属性读取powerjob.network.local.address,读取不到则取LOCAL_ADDRESS,若LOCAL_ADDRESS为null则通过getLocalAddress0获取
OmsLockDO
tech/powerjob/server/persistence/remote/model/OmsLockDO.java
@Data @Entity @NoArgsConstructor @Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_oms_lock", columnNames = {"lockName"})}) public class OmsLockDO { @Id @GeneratedValue(strategy = GenerationType.AUTO, generator = "native") @GenericGenerator(name = "native", strategy = "native") private Long id; private String lockName; private String ownerIP; /** * 最长持有锁的时间 */ private Long maxLockTime; private Date gmtCreate; private Date gmtModified; public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) { this.lockName = lockName; this.ownerIP = ownerIP; this.maxLockTime = maxLockTime; this.gmtCreate = new Date(); this.gmtModified = this.gmtCreate; } }
OmsLockDO定义lockName为唯一索引,它还定义了ownerIP、maxLockTime
OmsLockRepository
tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> { @Modifying @Transactional(rollbackOn = Exception.class) @Query(value = "delete from OmsLockDO where lockName = ?1") int deleteByLockName(String lockName); OmsLockDO findByLockName(String lockName); @Modifying @Transactional(rollbackOn = Exception.class) int deleteByOwnerIP(String ip); }
OmsLockRepository继承了JpaRepository,它定义了deleteByLockName、findByLockName、deleteByOwnerIP方法
小结
LockService接口定义了tryLock、unlock方法;DatabaseLockService基于数据库实现了LockService,其构造器依赖OmsLockRepository,同时注册了ShutdownHook,在关闭的时候执行omsLockRepository.deleteByOwnerIP(ownerIp);其tryLock方法创建OmsLockDO,然后执行omsLockRepository.saveAndFlush,若成功则返回,若有异常则通过omsLockRepository.findByLockName找到omsLockDO,计算加锁时间,若超过MaxLockTime则执行unlock再重新tryLock;其unlock执行omsLockRepository.deleteByLockName。
以上就是PowerJob LockService方法工作流程源码解读的详细内容,更多关于PowerJob LockService的资料请关注脚本之家其它相关文章!
相关文章
Spring Boot整合mybatis使用注解实现动态Sql、参数传递等常用操作(实现方法)
这篇文章主要介绍了Spring Boot整合mybatis使用注解实现动态Sql、参数传递等常用操作(实现方法),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-08-08详解使用Spring的BeanPostProcessor优雅的实现工厂模式
这篇文章主要介绍了详解使用Spring的BeanPostProcessor优雅的实现工厂模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-07-07关于idea引入spring boot <parent></parent>父依赖标红问题
这篇文章主要介绍了idea引入spring boot <parent></parent>父依赖标红问题,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-10-10
最新评论