xxl-job客户端架构流程

一、xxl-job的调度流程和配置

任务调度器和执行器使用http协议通信,各自有轮询线程处理不同业务。




二、XXL-JOB客户端启动流程

  1. 加载Bean:
  2. 从spring容器获取所有对象,并遍历查找方法上标记XxlJob注解的方法 将xxljob配置的jobname作为key,对象Handle作为value注册Map 中 ConcurrentMap jobHandlerRepository的Map中维护;
  3. 创建执行任务的线程池;
  4. 启动内嵌的Netty服务;
  5. 启动注册线程,每隔30s上报一次注册信息。


public class EmbedServer {

    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue(2000));

                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                             }
                        }).childOption(ChannelOption.SO_KEEPALIVE, true);

                ChannelFuture future = bootstrap.bind(port).sync();

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();
            }
        });
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }
}

三、任务的下发与执行

任务的下发与执行(服务端发送给客户端):


收到服务器不动执行进行任务分发:


private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

      switch (uri) {
          case "/beat":
              return executorBiz.beat();
          case "/idleBeat":
              IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
              return executorBiz.idleBeat(idleBeatParam);
          case "/run":
              TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
              return executorBiz.run(triggerParam);
          case "/kill":
              KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
              return executorBiz.kill(killParam);
          case "/log":
              LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
              return executorBiz.log(logParam);
          default:
              return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
}

1 /beat:心跳保活检测

直接return success,用户服务器探活;

2 /idleBeat:任务执行策略配置为忙碌转移时使用;

等待队列如果存在待执行任务时,返回false;


等待队列为空时:返回true;

3 /run:接收到执行任务指令

将任务提交到执行队列中,并返回true;


队列满或handler不存在时返回false;

4 /kill:中断任务

对执行任务的线程执行 JobThread.interrupt();


每个任务Id会有一个线程,Kill仅杀死执行该任务Id的线程,下次再下发任务发现线程已中断会重新创建线程。

5 /log:获取执行log

返回客户端执行的本地log给服务端。

四、客户端注册和执行结果上报

客户端注册和执行结果上报(客户端发送给服务端)


@Override
public ReturnT callback(List callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}

@Override
public ReturnT registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

@Override
public ReturnT registryRemove(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}

1 /registry:注册客户端信息

启动线程定时注册自己的服务到调度器;


创建线程,30s轮询一次,上报注册信息。

2 /registryRemove:移出执行器请求

将自己从执行器列表移除;


程序退出时会调用一次,在Netty的finally代码块自动执行。

3 /callback:异步回调结果

执行器异步回调给调度器执行任务结果;


每次任务完成时上报。

五、附录

1 网络通信格式:

(1)客户端注册


http://127.0.0.1:8080/xxl-job-admin/api/registry
{
    "registryGroup": "EXECUTOR"
    "registryKey": "xxl-job-executor-sample"
    "registryValue": "http://172.30.0.67:9999/"
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}


(2)客户端移除注册


http://127.0.0.1:8080/xxl-job-admin/api/registryRemove
{
    "registryGroup": "EXECUTOR"
    "registryKey": "xxl-job-executor-sample"
    "registryValue": "http://xxljob-axzo.cn"
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}


(3)客户端执行任务结果上报


http://127.0.0.1:8080/xxl-job-admin/api/callback
{
    "logId": 1238
    "logDateTim": 1667197980007
    "handleCode": 200
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}


(4)执行器下发任务:同步回调仅代表任务是否发送成功


http://172.30.0.67:9999/run
{
    "jobId": 4
    "executorHandler": "demoJobHandler"
    "executorParams": ""
    "executorBlockStrategy": "SERIAL_EXECUTION"
    "executorTimeout": 0
    "logId": 1238
    "logDateTime": 1667197980007
    "glueType": "BEAN"
    "glueSource": ""
    "glueUpdatetime": 1666683613000
    "broadcastIndex": 0
    "broadcastTotal": 1
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}

2 Token配置详解

1.配置了token后,client发送的每隔http请求头会带上XXL-JOB-ACCESS-TOKEN :{xxl.job.accessToken} ;


2.该参数不会对请求参数加密;


3.如果配置不匹配,客户端请求报错:


{
"code": 500
"msg": "The access token is wrong."
"content": null
}


4.发送配置token的请求,Header中新增了Token参数



5.配置错token的返回





程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。


作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

展开阅读全文

页面更新:2024-04-23

标签:架构   客户端   队列   线程   服务端   中间件   公众   流程   参数   技术   信息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top