25 changed files with 1451 additions and 18 deletions
@ -0,0 +1,39 @@ |
|||
package com.kms.yxgh.common.config; |
|||
|
|||
|
|||
import com.shuili.common.utils.StringUtils; |
|||
import org.redisson.Redisson; |
|||
import org.redisson.api.RedissonClient; |
|||
import org.redisson.config.Config; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
@Configuration |
|||
public class RedissonConfig { |
|||
|
|||
|
|||
@Value("${spring.redis.host}") |
|||
private String redisHost; |
|||
|
|||
@Value("${spring.redis.port}") |
|||
private int redisPort; |
|||
|
|||
@Value("${spring.redis.password}") |
|||
private String redisPassword; |
|||
|
|||
@Value("${spring.redis.database:0}") |
|||
private int redisDatabase; |
|||
|
|||
@Bean |
|||
public RedissonClient redissonClient() { |
|||
Config config = new Config(); |
|||
config.useSingleServer() |
|||
.setAddress("redis://" + redisHost + ":" + redisPort) |
|||
.setDatabase(redisDatabase); |
|||
if (StringUtils.isNotBlank(redisPassword)) { |
|||
config.useSingleServer().setPassword(redisPassword); |
|||
} |
|||
return Redisson.create(config); |
|||
} |
|||
} |
@ -0,0 +1,59 @@ |
|||
package com.kms.yxgh.common.domain; |
|||
|
|||
import com.baomidou.mybatisplus.annotation.TableField; |
|||
import com.baomidou.mybatisplus.annotation.TableName; |
|||
import com.fasterxml.jackson.annotation.JsonFormat; |
|||
import com.kms.yxgh.base.SyBaseEntity; |
|||
import io.swagger.annotations.ApiModel; |
|||
import io.swagger.annotations.ApiModelProperty; |
|||
import lombok.Data; |
|||
import lombok.ToString; |
|||
|
|||
import java.util.Date; |
|||
|
|||
/** |
|||
* 定时任务实体类对应 bs_sgc_job_task 表 |
|||
*/ |
|||
@TableName("bs_sgc_job_task") |
|||
@Data |
|||
@ApiModel("定时任务") |
|||
@ToString(callSuper = true) |
|||
public class JobTask extends SyBaseEntity { |
|||
|
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
@ApiModelProperty("任务名称") |
|||
private String name; |
|||
|
|||
@ApiModelProperty("业务类型") |
|||
private String businessType; |
|||
|
|||
@ApiModelProperty("业务id") |
|||
private String businessId; |
|||
|
|||
@ApiModelProperty("任务状态") |
|||
private String status; |
|||
|
|||
@ApiModelProperty("任务参数") |
|||
@TableField(value = "params") |
|||
private String jobParams; |
|||
|
|||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
|||
@ApiModelProperty("上次开始时间") |
|||
private Date lastStartTime; |
|||
|
|||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
|||
@ApiModelProperty("上次结束时间") |
|||
private Date lastEndTime; |
|||
|
|||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
|||
@ApiModelProperty("下次开始时间") |
|||
private Date nextStartTime; |
|||
|
|||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
|||
@ApiModelProperty("任务结束时间") |
|||
private Date taskEndTime; |
|||
|
|||
@TableField(exist = false) |
|||
private String remark; |
|||
} |
@ -0,0 +1,78 @@ |
|||
package com.kms.yxgh.common.job; |
|||
|
|||
import com.baomidou.mybatisplus.core.conditions.Wrapper; |
|||
import com.baomidou.mybatisplus.core.metadata.IPage; |
|||
import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
|||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.event.StartJobEvent; |
|||
import com.kms.yxgh.common.service.JobTaskService; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.ApplicationContext; |
|||
import org.springframework.scheduling.annotation.EnableAsync; |
|||
import org.springframework.scheduling.annotation.EnableScheduling; |
|||
import org.springframework.scheduling.annotation.Scheduled; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Calendar; |
|||
import java.util.Date; |
|||
|
|||
@EnableAsync |
|||
@EnableScheduling |
|||
@Slf4j |
|||
@Component |
|||
@AllArgsConstructor |
|||
public class JobScheduled { |
|||
|
|||
public final JobTaskService jobTaskService; |
|||
private final ApplicationContext applicationContext; |
|||
|
|||
@Scheduled(fixedRate = 1000 * 60) |
|||
public void run() { |
|||
log.info("start to run job"); |
|||
|
|||
Date currentTime = new Date(); |
|||
int page = 1; |
|||
IPage<JobTask> jobTaskList = getJobTaskList(page, currentTime); |
|||
while (!jobTaskList.getRecords().isEmpty()) { |
|||
jobTaskList.getRecords().forEach(this::addTask); |
|||
jobTaskList = getJobTaskList(page + 1, currentTime); |
|||
} |
|||
log.info("finish run job"); |
|||
} |
|||
|
|||
private void addTask(JobTask jobTask) { |
|||
applicationContext.publishEvent(new StartJobEvent(this, jobTask.getId())); |
|||
} |
|||
|
|||
|
|||
private IPage<JobTask> getJobTaskList(int page, Date currentTime) { |
|||
IPage<JobTask> pageQuery = new Page<>(page, 100); |
|||
Wrapper<JobTask> wrapper = Wrappers.<JobTask>lambdaQuery() |
|||
.eq(JobTask::getStatus, JobStatus.RUNNING.getCode()) |
|||
.le(JobTask::getNextStartTime, currentTime) |
|||
.gt(JobTask::getNextStartTime, getCurrentMidnight()) |
|||
.select(JobTask::getId) |
|||
.orderByAsc(JobTask::getNextStartTime); |
|||
return jobTaskService.page(pageQuery, wrapper); |
|||
} |
|||
|
|||
private Date getCurrentMidnight() { |
|||
Calendar calendar = Calendar.getInstance(); |
|||
calendar.set(Calendar.HOUR_OF_DAY, 0); |
|||
calendar.set(Calendar.MINUTE, 0); |
|||
calendar.set(Calendar.SECOND, 0); |
|||
calendar.set(Calendar.MILLISECOND, 0); |
|||
return calendar.getTime(); |
|||
} |
|||
|
|||
//每天凌晨1点执行,清理过期任务
|
|||
@Scheduled(cron = "0 0 1 * * ?") |
|||
public void clearExpireJob() { |
|||
log.info("start to clear expire job"); |
|||
jobTaskService.clearExpireJob(); |
|||
log.info("finish clear expire job"); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,33 @@ |
|||
package com.kms.yxgh.common.job; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
@Getter |
|||
public enum JobStatus { |
|||
/** |
|||
* 任务状态 |
|||
*/ |
|||
INIT("0", "初始化"), |
|||
RUNNING("1", "运行中"), |
|||
STOP("2", "停止"), |
|||
PAUSE("3", "暂停"), |
|||
FINISH("4", "完成"), |
|||
ERROR("5", "异常"); |
|||
|
|||
private final String code; |
|||
private final String desc; |
|||
|
|||
JobStatus(String code, String desc) { |
|||
this.code = code; |
|||
this.desc = desc; |
|||
} |
|||
|
|||
public static JobStatus getByCode(String code) { |
|||
for (JobStatus value : values()) { |
|||
if (value.code.equals(code)) { |
|||
return value; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
} |
@ -0,0 +1,18 @@ |
|||
package com.kms.yxgh.common.job.event; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.ToString; |
|||
import org.springframework.context.ApplicationEvent; |
|||
|
|||
@Getter |
|||
@ToString |
|||
public class StartJobEvent extends ApplicationEvent { |
|||
|
|||
private final String jobId; |
|||
|
|||
public StartJobEvent(Object source, String jobId) { |
|||
super(source); |
|||
this.jobId = jobId; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,26 @@ |
|||
package com.kms.yxgh.common.job.event; |
|||
|
|||
import com.kms.yxgh.common.service.JobTaskService; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.ApplicationListener; |
|||
import org.springframework.scheduling.annotation.Async; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@AllArgsConstructor |
|||
public class StartJobEventListener implements ApplicationListener<StartJobEvent> { |
|||
|
|||
public final JobTaskService jobTaskService; |
|||
|
|||
@Async |
|||
@SneakyThrows |
|||
@Override |
|||
public void onApplicationEvent(StartJobEvent event) { |
|||
log.info("start to run job, job id: {}", event.getJobId()); |
|||
jobTaskService.runJob(event.getJobId()); |
|||
log.info("finish run job, job id: {}", event.getJobId()); |
|||
} |
|||
} |
@ -0,0 +1,159 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import cn.hutool.core.util.IdUtil; |
|||
import cn.hutool.http.HttpRequest; |
|||
import cn.hutool.http.HttpResponse; |
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.kms.system.service.SysUserService; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.shuili.common.core.domain.entity.SysUser; |
|||
import com.shuili.common.exception.CustomException; |
|||
import com.shuili.common.utils.StringUtils; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.codec.digest.DigestUtils; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
|
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
public abstract class AbstractSendMessageHandler implements JobHandler { |
|||
|
|||
//网关
|
|||
@Value("${water.gateway.paasToken:}") |
|||
private String gatewayPaasToken; |
|||
@Value("${water.gateway.paasId:}") |
|||
private String gatewayPaasId; |
|||
|
|||
//应用中台
|
|||
@Value("${water.center.paasToken:}") |
|||
private String centerPaasToken; |
|||
@Value("${water.center.paasId:}") |
|||
private String centerPaasId; |
|||
|
|||
@Value("${water.message.url:}") |
|||
private String messageUrl; |
|||
|
|||
@Autowired |
|||
private SysUserService sysUserService; |
|||
|
|||
public abstract List<MsgVo> getMessagesAndUpdateJobTask(JobTask jobTask); |
|||
|
|||
@Override |
|||
public JobTask runJob(JobTask jobTask) { |
|||
List<MsgVo> messages = getMessagesAndUpdateJobTask(jobTask); |
|||
sendMessage(messages); |
|||
return jobTask; |
|||
} |
|||
|
|||
|
|||
@Data |
|||
public static class MsgVo { |
|||
private String bizType;// 业务类型(由调用方选填,可填任意字符串,仅作为查询条件使用)
|
|||
private String content; // 消息内容
|
|||
private String contentType; // 内容类型(由调用方选填,可填任意字符串,仅作为查询条件使用)
|
|||
// description 粤政易消息类型为textcard时的描述,不超过512个字节,超过会自动截断
|
|||
//msgType 粤政易消息类型:text、文本,textcard、文本卡片
|
|||
//params 参数
|
|||
private String receiveNo; // 接收号码:类型为短信则为手机号,粤政易则为统一身份用户id,邮箱则为邮箱号码,站内信为用户主键id
|
|||
private String receiver; // 接收用户姓名
|
|||
// recordId 消息记录id
|
|||
//sendNo 发送号码:类型为短信则为手机号,粤政易则为统一身份用户id,邮箱则为邮箱号码,站内信为用户主键id
|
|||
private String sender; // 发送用户姓名
|
|||
//templateId 模板id//;
|
|||
// title 粤政易消息类型为textcard时或者消息类型为email的标题,不超过128个字节,超过会自动截断
|
|||
private String type = "inMail";// 消息类型:sms-短信,zwwx-旧粤政易,yzy-新粤政易,email-邮箱,inMail-站内信
|
|||
//url 粤政易消息类型为textcard时点击后跳转的链接
|
|||
//pcUrl PC端跳转URL
|
|||
//mobileUrl 移动端跳转URL
|
|||
// sendType 消息发送类型(关联订阅,数据字典:msg_send_type)
|
|||
} |
|||
|
|||
protected void setReceiver(MsgVo msgVo, String userId) { |
|||
if (StringUtils.isNotBlank(userId)) { |
|||
SysUser user = sysUserService.getById(userId); |
|||
if (user == null) { |
|||
log.error("用户不存在[{}]", userId); |
|||
return; |
|||
} |
|||
msgVo.setReceiveNo(user.getSingleUserId()); |
|||
msgVo.setReceiver(user.getNickName()); |
|||
} |
|||
} |
|||
|
|||
|
|||
private void sendMessage(List<MsgVo> messages) { |
|||
if (StringUtils.isNotBlank(messageUrl)) { |
|||
log.warn("消息发送地址为空"); |
|||
return; |
|||
} |
|||
if (CollectionUtil.isEmpty(messages)) { |
|||
log.warn("消息为空"); |
|||
return; |
|||
} |
|||
HttpRequest httpRequest = HttpRequest.post(messageUrl).body(JSON.toJSONString(messages)); |
|||
setHead(httpRequest); |
|||
try (HttpResponse response = httpRequest.execute()) { |
|||
if (!response.isOk()) { |
|||
String body = response.body(); |
|||
WaterResult waterResult = JSONObject.parseObject(body, WaterResult.class); |
|||
isSuccess(waterResult); |
|||
} else { |
|||
log.error("消息发送失败"); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("消息发送失败", e); |
|||
} |
|||
} |
|||
|
|||
private void isSuccess(WaterResult waterResult) { |
|||
if (waterResult == null) { |
|||
throw new CustomException("请求异常"); |
|||
} |
|||
if (waterResult.getCode() != 200 || !waterResult.getSuccess()) { |
|||
throw new CustomException("请求失败,原因:" + waterResult.getMessage()); |
|||
} |
|||
} |
|||
|
|||
private void setHead(HttpRequest httpRequest) { |
|||
String timestamp = String.valueOf(new Date().getTime()); |
|||
String nonce = IdUtil.fastSimpleUUID(); |
|||
String gatewaySignature = timestamp + gatewayPaasToken + nonce + timestamp; |
|||
String centerSignature = timestamp + centerPaasToken + nonce + timestamp; |
|||
try { |
|||
gatewaySignature = DigestUtils.sha256Hex(gatewaySignature).toUpperCase(); |
|||
centerSignature = DigestUtils.sha256Hex(centerSignature).toUpperCase(); |
|||
} catch (Exception e) { |
|||
log.error("签名失败", e); |
|||
} |
|||
httpRequest |
|||
.header("x-tif-paasid", gatewayPaasId) |
|||
.header("x-tif-signature", gatewaySignature) |
|||
.header("x-tif-timestamp", timestamp) |
|||
.header("x-tif-nonce", nonce) |
|||
.header("x-tsp-paasid", centerPaasId) |
|||
.header("x-tsp-signature", centerSignature) |
|||
.header("x-tsp-timestamp", timestamp) |
|||
.header("x-tsp-nonce", nonce); |
|||
|
|||
} |
|||
|
|||
@Data |
|||
@AllArgsConstructor |
|||
@NoArgsConstructor |
|||
public static class WaterResult { |
|||
|
|||
private int code; |
|||
private String message; |
|||
private Boolean success; |
|||
private String data; |
|||
private String timestamp; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,27 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
@Getter |
|||
public enum CycleType { |
|||
DAY("0", "每天"), |
|||
WEEK("1", "每周"), |
|||
MONTH("2", "每月"); |
|||
|
|||
private final String code; |
|||
private final String desc; |
|||
|
|||
CycleType(String code, String desc) { |
|||
this.code = code; |
|||
this.desc = desc; |
|||
} |
|||
|
|||
public static CycleType getByCode(String code) { |
|||
for (CycleType value : values()) { |
|||
if (value.code.equals(code)) { |
|||
return value; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
} |
@ -0,0 +1,89 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import cn.hutool.core.date.DateUtil; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.JobStatus; |
|||
import com.kms.yxgh.df.dto.DfAnimalPlanDto; |
|||
import com.kms.yxgh.df.service.DfAnimalPlanService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class DfAnimalPlanSendMessageHandler extends AbstractSendMessageHandler { |
|||
|
|||
@Autowired() |
|||
@Lazy |
|||
private DfAnimalPlanService dfAnimalPlanService; |
|||
|
|||
@Override |
|||
public List<MsgVo> getMessagesAndUpdateJobTask(JobTask jobTask) { |
|||
DfAnimalPlanDto dto = dfAnimalPlanService.getDetailById(jobTask.getBusinessId()); |
|||
if (dto == null || dto.getIsReminder() != 1 || CollectionUtil.isEmpty(dto.getOperators())) { |
|||
jobTask.setTaskEndTime(new Date()); |
|||
jobTask.setStatus(JobStatus.ERROR.getCode()); |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
if (isExpired(dto, new Date())) { |
|||
jobTask.setTaskEndTime(new Date()); |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
jobTask.setNextStartTime(getNextTime(dto, jobTask.getNextStartTime())); |
|||
boolean isFinish = isExpired(dto, jobTask.getNextStartTime()); |
|||
if (isFinish) { |
|||
jobTask.setTaskEndTime(new Date()); |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
} |
|||
|
|||
return dto.getOperators().stream().map(operator -> { |
|||
MsgVo msgVo = new MsgVo(); |
|||
msgVo.setBizType(jobTask.getBusinessType()); |
|||
msgVo.setContent("害堤动物防治计划<<" + dto.getName() + ">>即将开始,请及时执行!"); |
|||
msgVo.setContentType(jobTask.getBusinessId()); |
|||
setReceiver(msgVo, operator.getUid()); |
|||
return msgVo; |
|||
}).collect(Collectors.toList()); |
|||
} |
|||
|
|||
//计算下次执行时间
|
|||
private Date getNextTime(DfAnimalPlanDto dto, Date targetTime) { |
|||
CycleType cycleType = CycleType.getByCode(dto.getCycleType()); |
|||
if (cycleType == null) { |
|||
return targetTime; |
|||
} |
|||
switch (cycleType) { |
|||
case DAY: |
|||
return DateUtil.offsetDay(targetTime, 1); |
|||
case WEEK: |
|||
return DateUtil.offsetWeek(targetTime, 1); |
|||
case MONTH: |
|||
return DateUtil.offsetMonth(targetTime, 1); |
|||
default: |
|||
return targetTime; |
|||
} |
|||
|
|||
} |
|||
|
|||
//计划是否过期
|
|||
private boolean isExpired(DfAnimalPlanDto dto, Date targetTime) { |
|||
String currentYearMonth = DateUtil.format(targetTime, "yyyy-MM"); |
|||
String planYearMonth = DateUtil.format(dto.getPlanTime(), "yyyy-MM"); |
|||
return !currentYearMonth.equals(planYearMonth); |
|||
} |
|||
|
|||
@Override |
|||
public String type() { |
|||
return SendMessageType.DF_ANIMAL_PLAN.name(); |
|||
} |
|||
} |
@ -0,0 +1,133 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import cn.hutool.core.date.DateUtil; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.JobStatus; |
|||
import com.kms.yxgh.df.dto.DfCheckingPlanContentDto; |
|||
import com.kms.yxgh.df.dto.DfPlanDetailDto; |
|||
import com.kms.yxgh.df.service.DfPlanService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class DfCheckingPlanSendMessageHandler extends AbstractSendMessageHandler { |
|||
|
|||
@Autowired() |
|||
@Lazy |
|||
private DfPlanService dfPlanService; |
|||
|
|||
@Override |
|||
public List<MsgVo> getMessagesAndUpdateJobTask(JobTask jobTask) { |
|||
DfPlanDetailDto dto = dfPlanService.getDetailById(jobTask.getBusinessId()); |
|||
if (dto == null || CollectionUtil.isEmpty(dto.getContents())) { |
|||
log.error("DfCheckingPlanSendMessageHandler getMessagesAndUpdateJobTask error, dto is null, jobTask:{}", jobTask); |
|||
jobTask.setStatus(JobStatus.ERROR.getCode()); |
|||
return Collections.emptyList(); |
|||
} |
|||
Date currentTime = new Date(); |
|||
Optional<DfCheckingPlanContentDto> optional = dto.getContents().stream().filter(content -> content.getStartDate().before(currentTime) && content.getEndDate().after(currentTime)) |
|||
.findFirst(); |
|||
if (optional.isPresent()) { |
|||
DfCheckingPlanContentDto content = optional.get(); |
|||
if (CollectionUtil.isEmpty(content.getOperator())) { |
|||
return Collections.emptyList(); |
|||
} |
|||
Date nextTime = getNextTime(dto, jobTask.getNextStartTime()); |
|||
if (dto.getContents().stream().anyMatch(c -> c.getEndDate().after(nextTime))) { |
|||
jobTask.setNextStartTime(nextTime); |
|||
} else { |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
} |
|||
return content.getOperator().stream() |
|||
.map(operator -> { |
|||
MsgVo msgVo = new MsgVo(); |
|||
msgVo.setBizType(jobTask.getBusinessType()); |
|||
msgVo.setContent("堤防巡视检查计划<<" + content.getName() + ">>即将开始,请及时执行!\n 计划开始时间:" + content.getStartDate() + " \n计划结束时间:" + content.getEndDate()); |
|||
msgVo.setContentType(jobTask.getBusinessId()); |
|||
setReceiver(msgVo, operator.getUid()); |
|||
return msgVo; |
|||
}).collect(Collectors.toList()); |
|||
} else { |
|||
if (dto.getContents().stream().anyMatch(content -> content.getEndDate().after(currentTime))) { |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
} |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
} |
|||
|
|||
private Date getNextTime(DfPlanDetailDto dto, Date nextStartTime) { |
|||
CycleType cycleType = CycleType.getByCode(dto.getCycleType()); |
|||
if (cycleType == null) { |
|||
return nextStartTime; |
|||
} |
|||
Date currentTime = new Date(); |
|||
switch (cycleType) { |
|||
case DAY: |
|||
return DateUtil.offsetDay(nextStartTime, 1); |
|||
case WEEK: |
|||
//获取现在是周几
|
|||
int currentWeek = DateUtil.dayOfWeek(currentTime); |
|||
if (dto.getOtherConfig() == null) { |
|||
return nextStartTime; |
|||
} |
|||
List<Integer> weeks = dto.getOtherConfig().getReminderWeeks(); |
|||
if (weeks == null) { |
|||
return nextStartTime; |
|||
} |
|||
weeks = weeks.stream().sorted().collect(Collectors.toList()); |
|||
int netWeek = weeks.stream().filter(w -> w > currentWeek).findFirst().orElse(weeks.get(0)); |
|||
if (netWeek > currentWeek) { |
|||
return DateUtil.offsetDay(nextStartTime, netWeek - currentWeek); |
|||
} else { |
|||
return DateUtil.offsetDay(nextStartTime, 7 - currentWeek + netWeek); |
|||
} |
|||
|
|||
case MONTH: |
|||
Date currentDate = new Date(); |
|||
int currentMonthDays = getMonthDays(currentDate); |
|||
int currentDay = DateUtil.dayOfMonth(currentDate); |
|||
if (dto.getOtherConfig() == null) { |
|||
return nextStartTime; |
|||
} |
|||
List<Integer> days = dto.getOtherConfig().getReminderDays(); |
|||
if (days == null) { |
|||
return nextStartTime; |
|||
} |
|||
days = days.stream().sorted().collect(Collectors.toList()); |
|||
int nextDay = days.stream().filter(d -> d > currentDay && d <= currentMonthDays).findFirst().orElse(days.get(0)); |
|||
if (nextDay > currentDay) { |
|||
return DateUtil.offsetDay(nextStartTime, nextDay - currentDay); |
|||
} else { |
|||
return DateUtil.offsetDay(nextStartTime, currentMonthDays - currentDay + nextDay); |
|||
} |
|||
default: |
|||
return nextStartTime; |
|||
} |
|||
} |
|||
|
|||
//获取指定时间月份的天数
|
|||
private int getMonthDays(Date date) { |
|||
//当前是否闰年
|
|||
boolean isLeapYear = DateUtil.isLeapYear(DateUtil.year(date)); |
|||
//获取当前月份
|
|||
int currentMonth = DateUtil.month(date) + 1; |
|||
//获取当前月的天数
|
|||
return DateUtil.lengthOfMonth(currentMonth, isLeapYear); |
|||
} |
|||
|
|||
@Override |
|||
public String type() { |
|||
return SendMessageType.DF_CHECKING_PLAN.name(); |
|||
} |
|||
} |
@ -0,0 +1,54 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import com.kms.yxgh.common.ApprovalStatusEnum; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.JobStatus; |
|||
import com.kms.yxgh.df.dto.DfYhPlanDetailDto; |
|||
import com.kms.yxgh.df.service.DfYhPlanService; |
|||
import com.shuili.common.utils.StringUtils; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class DfYhPlanSendMessageHandler extends AbstractSendMessageHandler { |
|||
|
|||
@Autowired() |
|||
@Lazy |
|||
private DfYhPlanService dfYhPlanService; |
|||
|
|||
@Override |
|||
public List<MsgVo> getMessagesAndUpdateJobTask(JobTask jobTask) { |
|||
DfYhPlanDetailDto dto = dfYhPlanService.getDetailById(jobTask.getBusinessId()); |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
jobTask.setTaskEndTime(new Date()); |
|||
if (dto == null || dto.getResponsiblePerson() == null || StringUtils.isBlank(dto.getResponsiblePerson().getUid())) { |
|||
log.error("DfYhPlanSendMessageHandler getMessagesAndUpdateJobTask error, dto or responsiblePerson is null, jobTask:{}", jobTask); |
|||
jobTask.setStatus(JobStatus.ERROR.getCode()); |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
long currentTime = System.currentTimeMillis(); |
|||
if (dto.getStartDate().getTime() > currentTime || dto.getEndDate().getTime() < currentTime || !dto.getStatus().equals(ApprovalStatusEnum.PASS.getValue())) { |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
MsgVo msgVo = new MsgVo(); |
|||
msgVo.setBizType(jobTask.getBusinessType()); |
|||
msgVo.setContent("堤防维修养护计划<<" + dto.getName() + ">>即将开始,请及时执行!\n 计划开始时间:" + dto.getStartDate() + " \n计划结束时间:" + dto.getEndDate()); |
|||
msgVo.setContentType(jobTask.getBusinessId()); |
|||
setReceiver(msgVo, dto.getResponsiblePerson().getUid()); |
|||
return Collections.singletonList(msgVo); |
|||
} |
|||
|
|||
@Override |
|||
public String type() { |
|||
return SendMessageType.DF_YH_PLAN.name(); |
|||
} |
|||
} |
@ -0,0 +1,10 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
|
|||
public interface JobHandler { |
|||
|
|||
String type(); |
|||
|
|||
JobTask runJob(JobTask jobTask); |
|||
} |
@ -0,0 +1,12 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
@Getter |
|||
public enum SendMessageType { |
|||
DF_YH_PLAN, |
|||
SZ_YH_PLAN, |
|||
DF_ANIMAL_PLAN, |
|||
DF_CHECKING_PLAN, |
|||
SZ_CHECKING_PLAN, |
|||
} |
@ -0,0 +1,133 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import cn.hutool.core.date.DateUtil; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.JobStatus; |
|||
import com.kms.yxgh.sz.dto.SzCheckingPlanContentDto; |
|||
import com.kms.yxgh.sz.dto.SzPlanDetailDto; |
|||
import com.kms.yxgh.sz.service.SzPlanService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class SzCheckingPlanSendMessageHandler extends AbstractSendMessageHandler { |
|||
|
|||
@Autowired() |
|||
@Lazy |
|||
private SzPlanService dfPlanService; |
|||
|
|||
@Override |
|||
public List<MsgVo> getMessagesAndUpdateJobTask(JobTask jobTask) { |
|||
SzPlanDetailDto dto = dfPlanService.getDetailById(jobTask.getBusinessId()); |
|||
if (dto == null || CollectionUtil.isEmpty(dto.getContents())) { |
|||
log.error("SzCheckingPlanSendMessageHandler getMessagesAndUpdateJobTask error, dto is null, jobTask:{}", jobTask); |
|||
jobTask.setStatus(JobStatus.ERROR.getCode()); |
|||
return Collections.emptyList(); |
|||
} |
|||
Date currentTime = new Date(); |
|||
Optional<SzCheckingPlanContentDto> optional = dto.getContents().stream().filter(content -> content.getStartDate().before(currentTime) && content.getEndDate().after(currentTime)) |
|||
.findFirst(); |
|||
if (optional.isPresent()) { |
|||
SzCheckingPlanContentDto content = optional.get(); |
|||
if (CollectionUtil.isEmpty(content.getOperator())) { |
|||
return Collections.emptyList(); |
|||
} |
|||
Date nextTime = getNextTime(dto, jobTask.getNextStartTime()); |
|||
if (dto.getContents().stream().anyMatch(c -> c.getEndDate().after(nextTime))) { |
|||
jobTask.setNextStartTime(nextTime); |
|||
} else { |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
} |
|||
return content.getOperator().stream() |
|||
.map(operator -> { |
|||
MsgVo msgVo = new MsgVo(); |
|||
msgVo.setBizType(jobTask.getBusinessType()); |
|||
msgVo.setContent("水闸巡视检查计划<<" + content.getName() + ">>即将开始,请及时执行!\n 计划开始时间:" + content.getStartDate() + " \n计划结束时间:" + content.getEndDate()); |
|||
msgVo.setContentType(jobTask.getBusinessId()); |
|||
setReceiver(msgVo, operator.getUid()); |
|||
return msgVo; |
|||
}).collect(Collectors.toList()); |
|||
} else { |
|||
if (dto.getContents().stream().anyMatch(content -> content.getEndDate().after(currentTime))) { |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
} |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
} |
|||
|
|||
private Date getNextTime(SzPlanDetailDto dto, Date nextStartTime) { |
|||
CycleType cycleType = CycleType.getByCode(dto.getCycleType()); |
|||
if (cycleType == null) { |
|||
return nextStartTime; |
|||
} |
|||
Date currentTime = new Date(); |
|||
switch (cycleType) { |
|||
case DAY: |
|||
return DateUtil.offsetDay(nextStartTime, 1); |
|||
case WEEK: |
|||
//获取现在是周几
|
|||
int currentWeek = DateUtil.dayOfWeek(currentTime); |
|||
if (dto.getOtherConfig() == null) { |
|||
return nextStartTime; |
|||
} |
|||
List<Integer> weeks = dto.getOtherConfig().getReminderWeeks(); |
|||
if (weeks == null) { |
|||
return nextStartTime; |
|||
} |
|||
weeks = weeks.stream().sorted().collect(Collectors.toList()); |
|||
int netWeek = weeks.stream().filter(w -> w > currentWeek).findFirst().orElse(weeks.get(0)); |
|||
if (netWeek > currentWeek) { |
|||
return DateUtil.offsetDay(nextStartTime, netWeek - currentWeek); |
|||
} else { |
|||
return DateUtil.offsetDay(nextStartTime, 7 - currentWeek + netWeek); |
|||
} |
|||
|
|||
case MONTH: |
|||
Date currentDate = new Date(); |
|||
int currentMonthDays = getMonthDays(currentDate); |
|||
int currentDay = DateUtil.dayOfMonth(currentDate); |
|||
if (dto.getOtherConfig() == null) { |
|||
return nextStartTime; |
|||
} |
|||
List<Integer> days = dto.getOtherConfig().getReminderDays(); |
|||
if (days == null) { |
|||
return nextStartTime; |
|||
} |
|||
days = days.stream().sorted().collect(Collectors.toList()); |
|||
int nextDay = days.stream().filter(d -> d > currentDay && d <= currentMonthDays).findFirst().orElse(days.get(0)); |
|||
if (nextDay > currentDay) { |
|||
return DateUtil.offsetDay(nextStartTime, nextDay - currentDay); |
|||
} else { |
|||
return DateUtil.offsetDay(nextStartTime, currentMonthDays - currentDay + nextDay); |
|||
} |
|||
default: |
|||
return nextStartTime; |
|||
} |
|||
} |
|||
|
|||
//获取指定时间月份的天数
|
|||
private int getMonthDays(Date date) { |
|||
//当前是否闰年
|
|||
boolean isLeapYear = DateUtil.isLeapYear(DateUtil.year(date)); |
|||
//获取当前月份
|
|||
int currentMonth = DateUtil.month(date) + 1; |
|||
//获取当前月的天数
|
|||
return DateUtil.lengthOfMonth(currentMonth, isLeapYear); |
|||
} |
|||
|
|||
@Override |
|||
public String type() { |
|||
return SendMessageType.SZ_CHECKING_PLAN.name(); |
|||
} |
|||
} |
@ -0,0 +1,54 @@ |
|||
package com.kms.yxgh.common.job.handler; |
|||
|
|||
import com.kms.yxgh.common.ApprovalStatusEnum; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.JobStatus; |
|||
import com.kms.yxgh.sz.dto.SzYhPlanDetailDto; |
|||
import com.kms.yxgh.sz.service.SzYhPlanService; |
|||
import com.shuili.common.utils.StringUtils; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class SzYhPlanSendMessageHandler extends AbstractSendMessageHandler { |
|||
|
|||
@Autowired() |
|||
@Lazy |
|||
private SzYhPlanService szYhPlanService; |
|||
|
|||
@Override |
|||
public List<MsgVo> getMessagesAndUpdateJobTask(JobTask jobTask) { |
|||
SzYhPlanDetailDto dto = szYhPlanService.getDetailById(jobTask.getBusinessId()); |
|||
jobTask.setStatus(JobStatus.FINISH.getCode()); |
|||
jobTask.setTaskEndTime(new Date()); |
|||
if (dto == null || dto.getResponsiblePerson() == null || StringUtils.isBlank(dto.getResponsiblePerson().getUid())) { |
|||
log.error("SzYhPlanSendMessageHandler getMessagesAndUpdateJobTask error, dto or responsiblePerson is null, jobTask:{}", jobTask); |
|||
jobTask.setStatus(JobStatus.ERROR.getCode()); |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
long currentTime = System.currentTimeMillis(); |
|||
if (dto.getStartDate().getTime() > currentTime || dto.getEndDate().getTime() < currentTime || !dto.getStatus().equals(ApprovalStatusEnum.PASS.getValue())) { |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
MsgVo msgVo = new MsgVo(); |
|||
msgVo.setBizType(jobTask.getBusinessType()); |
|||
msgVo.setContent("水闸维修养护计划<<" + dto.getName() + ">>即将开始,请及时执行!\n 计划开始时间:" + dto.getStartDate() + " \n计划结束时间:" + dto.getEndDate()); |
|||
msgVo.setContentType(jobTask.getBusinessId()); |
|||
setReceiver(msgVo, dto.getResponsiblePerson().getUid()); |
|||
return Collections.singletonList(msgVo); |
|||
} |
|||
|
|||
@Override |
|||
public String type() { |
|||
return SendMessageType.SZ_YH_PLAN.name(); |
|||
} |
|||
} |
@ -0,0 +1,11 @@ |
|||
package com.kms.yxgh.common.mapper; |
|||
|
|||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import org.springframework.stereotype.Repository; |
|||
|
|||
|
|||
@Repository |
|||
public interface JobTaskMapper extends BaseMapper<JobTask> { |
|||
|
|||
} |
@ -0,0 +1,109 @@ |
|||
package com.kms.yxgh.common.service; |
|||
|
|||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
|||
import com.kms.yxgh.common.domain.JobTask; |
|||
import com.kms.yxgh.common.job.JobStatus; |
|||
import com.kms.yxgh.common.job.handler.JobHandler; |
|||
import com.kms.yxgh.common.mapper.JobTaskMapper; |
|||
import com.shuili.common.core.service.BaseService; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.redisson.api.RLock; |
|||
import org.redisson.api.RedissonClient; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
|
|||
import java.util.Calendar; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
import java.util.Objects; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@RequiredArgsConstructor |
|||
public class JobTaskService extends BaseService<JobTaskMapper, JobTask> { |
|||
private final RedissonClient redissonClient; |
|||
private final List<JobHandler> handlers; |
|||
|
|||
public void runJob(String jobId) { |
|||
JobTask jobTask = getById(jobId); |
|||
if (jobTask == null) { |
|||
return; |
|||
} |
|||
if (Objects.equals(jobTask.getStatus(), JobStatus.RUNNING.getCode())) { |
|||
log.info("job [{}] is in running status [{}]", jobId, JobStatus.getByCode(jobTask.getStatus())); |
|||
// Use Redisson to lock the job
|
|||
RLock lock = redissonClient.getLock(jobId); |
|||
try { |
|||
// Try to acquire the lock with a wait time of 10 seconds and lease time of 60 seconds
|
|||
if (lock.tryLock(10, 60, TimeUnit.SECONDS)) { |
|||
try { |
|||
jobTask.setLastStartTime(new Date()); |
|||
JobHandler jobHandler = getHandler(jobTask.getBusinessType()); |
|||
if (jobHandler != null) { |
|||
jobHandler.runJob(jobTask); |
|||
} else { |
|||
jobTask.setStatus(JobStatus.ERROR.getCode()); |
|||
log.error("No job handler found for job [{},{}]", jobTask.getBusinessType(), jobId); |
|||
} |
|||
jobTask.setLastEndTime(new Date()); |
|||
this.baseMapper.updateById(jobTask); |
|||
} finally { |
|||
lock.unlock(); |
|||
log.info("Released lock for job [{}]", jobId); |
|||
} |
|||
} else { |
|||
log.info("Could not acquire lock for job [{}]", jobId); |
|||
} |
|||
} catch (InterruptedException e) { |
|||
log.error("Error while trying to acquire lock for job [{}]", jobId, e); |
|||
Thread.currentThread().interrupt(); |
|||
} |
|||
|
|||
} else { |
|||
log.info("job [{}] is not in running status [{}]", jobId, JobStatus.getByCode(jobTask.getStatus())); |
|||
} |
|||
|
|||
} |
|||
|
|||
@Transactional(rollbackFor = Exception.class) |
|||
public void updateOrInsertJobTask(JobTask jobTask, boolean isCancel) { |
|||
if (jobTask.getNextStartTime() == null) { |
|||
log.error("jobTask nextStartTime is null, jobTask:{}", jobTask); |
|||
} |
|||
JobTask old = this.baseMapper.selectOne(new QueryWrapper<JobTask>() |
|||
.eq("business_id", jobTask.getBusinessId()) |
|||
.eq("business_type", jobTask.getBusinessType())); |
|||
if (old == null) { |
|||
if (!isCancel) { |
|||
jobTask.setStatus(JobStatus.RUNNING.getCode()); |
|||
this.baseMapper.insert(jobTask); |
|||
} |
|||
} else { |
|||
if (isCancel) { |
|||
old.setStatus(JobStatus.STOP.getCode()); |
|||
this.baseMapper.updateById(old); |
|||
} else { |
|||
old.setNextStartTime(jobTask.getNextStartTime()); |
|||
old.setStatus(JobStatus.RUNNING.getCode()); |
|||
this.baseMapper.updateById(old); |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
private JobHandler getHandler(String handlerType) { |
|||
return handlers.stream().filter(handler -> handler.type().equals(handlerType)).findFirst().orElse(null); |
|||
} |
|||
|
|||
|
|||
public void clearExpireJob() { |
|||
//删除lastEndTime是3个月前的任务
|
|||
Calendar calendar = Calendar.getInstance(); |
|||
calendar.add(Calendar.MONTH, -3); |
|||
Date expireTime = calendar.getTime(); |
|||
this.baseMapper.delete(new QueryWrapper<JobTask>().lt("last_end_time", expireTime)); |
|||
|
|||
} |
|||
} |
Loading…
Reference in new issue