专业微信网站建设价格,玉田网站建设,中小企业网站推广,装修公司加盟十大品牌排行榜分布式定时任务1.为什么需要定时任务#xff1f;2.数据库实现分布式定时任务3.基于redis实现1.为什么需要定时任务#xff1f;
因为有时候我们需要定时的执行一些操作#xff0c;比如业务中产生的一些临时文件#xff0c;临时文件不能立即删除#xff0c;因为不清楚用户是…
分布式定时任务1.为什么需要定时任务2.数据库实现分布式定时任务3.基于redis实现1.为什么需要定时任务
因为有时候我们需要定时的执行一些操作比如业务中产生的一些临时文件临时文件不能立即删除因为不清楚用户是否操作完毕不能立即删除需要隔一段时间然后定时清楚还有像是一些电商项目每月进行数据清算。比如某些业务的排行榜实时性不是高的也可以使用定时任务去统计然后在做更新。但是我们现在大多数的应用都是分布式的相当于你写的一个定时任务会在多个子系统中运行而且是同时的我们只需要其中一个任务运行就可以了如果多次运行不仅会无故消耗系统资源还会导致任务执行出现意外那么怎么保证这个任务只执行一次呢其实解决方案有很多。
分布式任务执行出现的问题如下图所示 使用数据库唯一约束加锁使用redis的setNX命令使用分布式框架QuartzTBScheduleelastic-jobSaturnxxl-job等
当然技术是为业务服务的我们怎么选择合适的技术还得是看业务场景比如一些任务的执行频率不高也不是特别要求效率也不复杂我们完全用不上为了一个定时任务去引入一些第三方的框架作为定时任务实现我们来介绍两种方式来实现分布式定时任务。
2.数据库实现分布式定时任务
数据库实现定时任务的核心思路
需要两张表一张定时任务配置表还有一张定时任务运行记录表任务配置表有一个唯一约束字段运行记录表由运行日期任务名称作为唯一约束这是实现的核心思路使用注解aop对定时任务进行代理统一进行加锁操作避免多次运行这种适合任务不频繁一天在某个时间点执行对性能要求不高的业务场景实现起来也比较简单
表SQL语句
-- 任务运行记录表
CREATE TABLE task_record (ID varchar(20) NOT NULL COMMENT ID,start_time datetime DEFAULT NULL COMMENT 定时任务开始时间,ent_time datetime DEFAULT NULL COMMENT 定时任务结束时间,is_success varchar(1) DEFAULT NULL COMMENT 是否执行成功,error_cause longtext COMMENT 失败原因,task_name varchar(100) NOT NULL COMMENT 任务名称,run_date varchar(6) DEFAULT NULL COMMENT 运行日期,PRIMARY KEY (ID),UNIQUE KEY run_date_task_name_idx (run_date,task_name) USING BTREE COMMENT 运行日期任务名称作为唯一约束
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT定时任务运行记录表;-- 任务配置表
CREATE TABLE task_config (id varchar(32) NOT NULL COMMENT 序号,task_describe varchar(225) DEFAULT NULL COMMENT 任务描述,task_name varchar(100) DEFAULT NULL COMMENT 任务名称,task_valid varchar(1) DEFAULT NULL COMMENT 任务有效标志,create_time datetime DEFAULT NULL COMMENT 创建时间,PRIMARY KEY (id),UNIQUE KEY task_index (task_name) COMMENT 唯一性约束
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT定时任务配置表;1.定时任务标识注解
/*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* author compass* date 2023-03-09* since 1.0**/
Target(ElementType.METHOD)
Retention(RetentionPolicy.RUNTIME)
Documented
public interface DatabaseLock {//定时任务使用的键千万不要重复String lockName() default ;//定时任务描述String lockDesc() default ;
}2.使用aop代理定时任务方法进行拦截
/*** 代理具体的定时任务仅有一个任务会被成功执行** author compass* date 2023-03-09* since 1.0**/
Aspect
Slf4j
Component
public class DatabaseLockAspect {Resourceprivate TaskService taskService;private static final String TASK_IS_VALID 1;Around(annotation( com.springboot.example.annotation.DatabaseLock))public Object cacheLockPoint(ProceedingJoinPoint pjp) {Date startTime new Date();TaskRecord taskRecord new TaskRecord();String isRunSuccess 1;String taskConfigId;String errorCause ;Boolean currentDayRunRecord ;Method cacheMethod null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null ! method.getAnnotation(DatabaseLock.class)) {cacheMethod method;break;}}if (cacheMethod ! null) {String lockName cacheMethod.getAnnotation(DatabaseLock.class).lockName();String lockDesc cacheMethod.getAnnotation(DatabaseLock.class).lockDesc();// 运行主键避免多次运行核心关键String runDate DateUtil.format(new Date(), yyyyMMdd);String taskRecordId IdUtil.getSnowflakeNextIdStr();taskRecord.setTaskName(lockName);taskRecord.setId(taskRecordId);taskRecord.setRunDate(runDate);if (StringUtils.isBlank(lockName)) {throw new RuntimeException(定时任务锁名称不能为空);}if (StringUtils.isBlank(lockDesc)) {throw new RuntimeException(定时任务锁描述不能为空);}TaskConfig taskConfig taskService.hasRun(lockName);// 还未运行过进行初始化处理if (taskConfig null) {TaskConfig config new TaskConfig();taskConfigId IdUtil.getSnowflakeNextIdStr();config.setId(taskConfigId);config.setTaskDescribe(lockDesc);config.setTaskName(lockName);config.setTaskValid(1);config.setCreateTime(new Date());try {// 添加时出现异常已经运行过该定时任务taskService.addTaskConfig(config);taskConfig config;} catch (Exception e) {e.printStackTrace();}// 有效标志位0表示无需执行} else if (!TASK_IS_VALID.equals(taskConfig.getTaskValid())) {String message 该定时任务已经禁用;log.warn(method:{}未获取锁:{}[运行失败原因:{}], cacheMethod, lockName, message);throw new RuntimeException(String.format(method:%s未获取锁:%s[运行失败原因:%s], cacheMethod, lockName, message));}// 添加运行记录以runKey为唯一标识插入异常说明执行过try {currentDayRunRecord taskService.addCurrentDayRunRecord(taskRecord);} catch (Exception e) {log.warn(method:{}未获取锁:{}[运行失败原因:已经有别的服务进行执行], cacheMethod, lockName);return null;}// 没有执行过开始执行if (currentDayRunRecord) {try {log.warn(method:{}获取锁:{},运行成功, cacheMethod, lockName);return pjp.proceed();} catch (Throwable e) {e.printStackTrace();isRunSuccess 0;errorCause ExceptionUtils.getExceptionDetail(e);} finally {Date endTime new Date();taskRecord.setStartTime(startTime);taskRecord.setId(IdUtil.getSnowflakeNextIdStr());taskRecord.setEntTime(endTime);taskRecord.setIsSuccess(isRunSuccess);taskRecord.setErrorCause(errorCause);// 修改运行记录taskService.updateTaskRunRecord(taskRecord);}}}return null;}
}3.TaskService实现操作数据库接口与实现
public interface TaskService {/*** 判断定时任务是否运行过** param taskName* return com.springboot.example.bean.task.TaskConfig* author compass* date 2023/3/10 21:22* since 1.0.0**/TaskConfig hasRun(String taskName);/*** 将首次运行的任务添加到任务配置表** param taskConfig* return java.lang.Boolean* author compass* date 2023/3/10 21:23* since 1.0.0**/Boolean addTaskConfig(TaskConfig taskConfig);/*** 更新定时任务运行记录** param taskRecord* return java.lang.Boolean* author compass* date 2023/3/10 21:23* since 1.0.0**/Boolean updateTaskRunRecord(TaskRecord taskRecord);/*** 新增一条运行记录只有新增成功的服务才可以得到运行劝* param taskRecord* return java.lang.Boolean* author compass* date 2023/3/10 21:23* since 1.0.0**/Boolean addCurrentDayRunRecord(TaskRecord taskRecord);
}Slf4j
Service
public class TaskServiceImpl implements TaskService {Resourceprivate TaskConfigMapper taskConfigMapper;Resourceprivate TaskRecordMapper taskRecordMapper;Overridepublic TaskConfig hasRun(String taskName) {QueryWrapperTaskConfig wrapper new QueryWrapper();wrapper.eq(task_name,taskName);return taskConfigMapper.selectOne(wrapper);}Overridepublic Boolean addTaskConfig(TaskConfig taskConfig) {return taskConfigMapper.insert(taskConfig)0;}Overridepublic Boolean updateTaskRunRecord(TaskRecord taskRecord ) {QueryWrapperTaskRecord wrapper new QueryWrapper();wrapper.eq(task_name,taskRecord.getTaskName());wrapper.eq(run_date,taskRecord.getRunDate());return taskRecordMapper.update(taskRecord,wrapper)0;}Overridepublic Boolean addCurrentDayRunRecord(TaskRecord taskRecord) {return taskRecordMapper.insert(taskRecord)0;}
}4.数据库对应的实体类
// 配置类
Data
TableName(task_config)
public class TaskConfig {/*** 序号*/TableId(value id, type IdType.ASSIGN_ID)private String id;/*** 任务描述*/private String taskDescribe;/*** 任务名称*/private String taskName;/*** 任务有效标志*/private String taskValid;/*** 创建时间*/private Date createTime;}
// 运行记录类
Data
TableName(task_record)
public class TaskRecord {/*** ID*/TableId(value id, type IdType.ASSIGN_ID)private String id;/*** 定时任务开始时间*/private Date startTime;/*** 定时任务结束时间*/private Date entTime;/*** 是否执行成功*/private String isSuccess;/*** 失败原因*/private String errorCause;/*** 运行日期[yyyyMMdd]*/private String runDate;/*** 任务名称(任务名称运行日期为唯一索引)*/private String taskName;}
3.基于redis实现
主要是基于setNX来实现的setNX表示这个key存在则设置value失败只有这个key不存在的时候才会set成功我们可以给这个key指定过期时间让他一定会释放锁不然容易出现死锁的情况
1.操作redis锁的工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** redis锁工具类如果redis是集群的话需要考虑数据延时性这里默认为redis单个节点** author compass* date 2023-03-10* since 1.0**/
SuppressWarnings(value {all})
Component
public class RedisLockUtils {Autowiredprivate RedisTemplateString, Object redisTemplate;/*** 加锁** param key* param value* param time* param timeUnit* return boolean* author compass* date 2023/3/10 22:13* since 1.0.0**/public boolean lock(String key, String value, long time, TimeUnit timeUnit) {return (Boolean)redisTemplate.execute((RedisCallback) connection - {Boolean setNX connection.setNX(key.getBytes(), value.getBytes());if (setNX){return connection.expire(key.getBytes(),time);}return false;});}/*** 立即释放锁如果任务执行的非常快可能会导致其他应用获得到锁二次执行** param key* return boolean* author compass* date 2023/3/10 22:13* since 1.0.0**/public boolean fastReleaseLock(String key) {return redisTemplate.delete(key);}/*** 缓慢释放锁(隔离小段时间再释放锁,可以完全避免掉别的应用获取到锁)** param key* param time* param timeUnit* return boolean* author compass* date 2023/3/10 22:13* since 1.0.0**/public boolean turtleReleaseLock(String key, long time, TimeUnit timeUnit) {return redisTemplate.expire(key, time, timeUnit);}}2.aop切入统一管理定时任务
import com.springboot.example.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;/*** 代理具体的定时任务仅有一个任务会被成功执行** author compass* date 2023-03-09* since 1.0**/
Aspect
Slf4j
Component
public class CacheLockAspect {Resourceprivate RedisLockUtils redisLockUtils;/*** 加锁值可以是任意值**/private static final String LOCK_VALUE 1;Around(annotation(com.springboot.example.annotation.CacheLock))public Object cacheLockPoint(ProceedingJoinPoint pjp) {Method cacheMethod null;for (Method method : pjp.getTarget().getClass().getMethods()) {if (null ! method.getAnnotation(CacheLock.class)) {cacheMethod method;break;}}if (cacheMethod!null){CacheLock cacheLock cacheMethod.getAnnotation(CacheLock.class);String lockName cacheLock.lockName();long time cacheLock.timeOut();boolean successLock redisLockUtils.lock(lockName,LOCK_VALUE, time, TimeUnit.SECONDS);if (successLock){log.info(method:{}获取锁成功:{}, cacheMethod, lockName);try {// 获得锁调用被代理的定时任务return pjp.proceed();} catch (Throwable throwable) {throwable.printStackTrace();}finally {// 延时5秒再去释放锁redisLockUtils.turtleReleaseLock(lockName,5,TimeUnit.SECONDS);}}else {log.warn(method:{}获取锁失败:{}, cacheMethod, lockName);}}return null;}
}3.自定义注解 /*** 标注在定时任务上,避免多个微服务的情况下定时任务执行重复* author compass* date 2023-03-09* since 1.0**/
Target(ElementType.METHOD)
Retention(RetentionPolicy.RUNTIME)
Documented
public interface CacheLock {//定时任务使用的键千万不要重复String lockName() ;// 占用锁的时间,单位是秒默认10分钟long timeOut() default 60*10;
}今天就先介绍这两种方式后续我再为大家续上使用别的框架进行实现不过在实现的过程中使用 redisTemplate.opsForValue().setIfAbsent() 出现了一点小肯他返回的是null值然后出现空指针然后我不得不采用execute的方式去执行。