springboot集成PowerJob-openAPI和回调完整流程

原创
2020/10/10 15:30
阅读数 4.5K

参考官网:https://www.yuque.com/powerjob/guidence/olgyf0

https://github.com/KFCFans/PowerJob

推荐使用3.3.0版本   

导入需要的jar包,servie也需要使用3.3.0版本

<dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-client</artifactId>
            <version>3.3.0</version>
        </dependency>

        <dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-worker-spring-boot-starter</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>

<dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.14.9</version>
        </dependency>

一.  使用openAPI 开发一次性的任务,保证任务只调度一次就好

  1.  编写yml配置文件

后台访问地址:http://192.168.2.11:7700/

powerjob:
  worker:
    akka-port: 27777   # akka 工作端口,可选,默认 27777
    app-name: sass-openapi     # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
    server-address: 192.168.2.11:7700  # 调度服务器地址,IP:Port 或 域名,多值逗号分隔
    password: shiye9527  # 密码
    store-strategy: disk    # 持久化方式,可选,默认 disk

2. 编写config配置文件 PowerJobConfig 

package com.un.common.utils.job;

import com.github.kfcfans.powerjob.client.OhMyClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author shiye
 * @create 2020-10-09 16:33
 */
@Configuration
public class PowerJobConfig {

    /**
     * # akka 工作端口,可选,默认 27777
     */
    @Value("${powerjob.worker.akka-port}")
    private Integer akkaPort;

    /**
     * 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
     */
    @Value("${powerjob.worker.app-name}")
    private String appName;

    /**
     * 调度服务器地址,IP:Port 或 域名,多值逗号分隔
     */
    @Value("${powerjob.worker.server-address}")
    private String serverAddress;

    @Value("${powerjob.worker.password}")
    private String password;

    /**
     * 持久化方式,可选,默认 disk
     */
    @Value("${powerjob.worker.store-strategy}")
    private String storeStrategy;


    @Bean
    public OhMyClient getOhMyClient() {
        return new OhMyClient(serverAddress, appName, password);
    }

}

3. 编写util工具类 PowerJobUtil 

package com.un.common.utils.job;

import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.un.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author shiye
 * @create 2020-10-09 16:55
 */
@Configuration
public class PowerJobUtil {

    protected final Logger logger = LoggerFactory.getLogger(PowerJobUtil.class);

    @Autowired
    private OhMyClient ohMyClient;

    @Bean
    public PowerJobUtil getPowerJobUtil() {
        return new PowerJobUtil();
    }

    /**
     * 创建一个单核一年只执行一次得任务
     *
     * @param StartTime     任务开始时间
     * @param params        任务参数
     * @param processorInfo 回调得类全类名如:com.un.framework.task.SysNoticeScheduProcessor
     * @param jobId         任务id:如果为空就是创建任务
     * @return 返回结果
     * @throws Exception
     */
    public ResultDTO<Long> saveJob(Date StartTime, String params, String processorInfo, String jobId, String jobName, String jobDescription) throws Exception {
        logger.info("saveJob .......{},{},{},{}", StartTime, params, processorInfo, jobId);

        SaveJobInfoRequest request = new SaveJobInfoRequest();
        if (StringUtils.isNotEmpty(jobId)) {
            request.setId(Long.valueOf(jobId));
        }
        //任务名称
        request.setJobName(jobName);
        //任务描述
        request.setJobDescription(jobDescription);
        //任务参数,Processor#process方法入参TaskContext对象的jobParams字段
        request.setJobParams(params);
        //时间表达式类型,枚举值
        request.setTimeExpressionType(TimeExpressionType.CRON);
        //时间表达式,填写类型由timeExpressionType决定,比如CRON需要填写CRON表达式
        request.setTimeExpression(getCron(StartTime));
        //执行类型,枚举值
        request.setExecuteType(ExecuteType.STANDALONE);
        //处理器类型,枚举值
        request.setProcessorType(ProcessorType.EMBEDDED_JAVA);
        //处理器参数,填写类型由processorType决定,如Java处理器需要填写全限定类名,如:com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo
        request.setProcessorInfo(processorInfo);
        //最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例)
        request.setMaxInstanceNum(1);
        //单机线程并发数,表示该实例执行过程中每个Worker使用的线程数量
        request.setConcurrency(1);
        //任务实例运行时间限制,0代表无任何限制,超时会被打断并判定为执行失败
        request.setInstanceTimeLimit(0l);
        //任务实例重试次数,整个任务失败时重试,代价大,不推荐使用
        request.setMaxInstanceNum(0);
        //Task重试次数,每个子Task失败后单独重试,代价小,推荐使用
        request.setTaskRetryNum(2);
        //最小可用CPU核心数,CPU可用核心数小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinCpuCores(0);
        //最小内存大小(GB),可用内存小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinMemorySpace(0);
        //最小磁盘大小(GB),可用磁盘空间小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinDiskSpace(0);
        //指定机器执行,设置该参数后只有列表中的机器允许执行该任务,空代表不指定机器
        request.setDesignatedWorkers(null);
        //最大执行机器数量,限定调动执行的机器数量,0代表无限制
        request.setMaxWorkerCount(1);
        //是否启用该任务,未启用的任务不会被调度
        request.setEnable(true);

        ResultDTO<Long> resultDTO = ohMyClient.saveJob(request);
        return resultDTO;
    }

    /**
     * 禁用某个任务
     *
     * @param jobId
     * @return
     * @throws Exception
     */
    public ResultDTO<Void> disableJob(Long jobId) {
        logger.info("disableJob .......{}", jobId);
        try {
            TimeUnit.MINUTES.sleep(5);
            return ohMyClient.disableJob(jobId);
        } catch (Exception e) {
            logger.error("disableJob  error.......{},{}", e, jobId);
        }
        return null;
    }

    /**
     * 删除某个任务
     *
     * @param jobId
     * @return
     * @throws Exception
     */
    public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
        return ohMyClient.deleteJob(jobId);
    }

    /**
     * 通过输入指定日期时间生成cron表达式
     *
     * @param date
     * @return cron表达式
     */
    public String getCron(Date date) {
        logger.info("create cron 接收到得时间", date.toString());
        String dateFormat = "ss mm HH dd MM ? yyyy-yyyy";
        SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
        String formatTimeStr = null;
        if (date != null) {
            formatTimeStr = sdf.format(date);
        }
        System.out.println("当前时间得CRON:" + formatTimeStr);
        return formatTimeStr;
    }

}

4. 在业务代码中集成进去

Date startTimeCron = new Date(Long.parseLong(String.valueOf(startTime * 1000)));
ResultDTO<Long> resultDTO = powerJobUtil.saveJob(startTimeCron, pBasActivity.getId(), "com.un.framework.task.ActivityProcessorHandler", createActiveVo1.getStartJobId(),"活动定时任务","活动定时-开始任务");
if (resultDTO.isSuccess()) {
   pBasActivity.setStartJobId(resultDTO.getData() + "");
} else {
   return AjaxResult.error("创建任务失败,请联系管理员");
}

 

二.  消费端,负责处理任务具体的调用

package com.un.framework.task;

import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.un.common.utils.job.PowerJobUtil;
import com.un.framework.manager.AsyncManager;
import com.un.framework.manager.factory.AsyncFactory;
import com.un.project.system.domain.dto.TaskNoticeDto;
import com.un.project.system.domain.vo.UpdateNoticeVo;
import com.un.project.system.mapper.PBasNoticeMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * @author shiye
 * @create 2020-09-30 9:42
 */
@Component
public class NoticeProcessorHandler implements BasicProcessor {

    protected final Logger logger = LoggerFactory.getLogger(NoticeScheduTask.class);

    @Autowired
    private PowerJobUtil powerJobUtil;

    @Autowired
    private PBasNoticeMapper noticeMapper;

    public static Lock lock = new ReentrantLock();

    @Override
    public ProcessResult process(TaskContext context) throws Exception {

        ProcessResult result = new ProcessResult(true, "ok");
        try {
            lock.lock();
            String params = context.getJobParams();
            logger.info("Start a notice job task......" + params);


           //具体得物业处理


            logger.info("end a notice job task......{},{}", noticeIds1, noticeIds2);

        } catch (Exception e) {
            logger.error("error a notice job task......{},{}", e, context.toString());
        } finally {
            lock.unlock();
        }
        return result;
    }
}

 

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部