任务调度器和执行器使用http协议通信,各自有轮询线程处理不同业务。
public class EmbedServer {
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(2000));
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
}).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
// start registry
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
}
任务的下发与执行(服务端发送给客户端):
收到服务器不动执行进行任务分发:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
}
直接return success,用户服务器探活;
等待队列如果存在待执行任务时,返回false;
等待队列为空时:返回true;
将任务提交到执行队列中,并返回true;
队列满或handler不存在时返回false;
对执行任务的线程执行 JobThread.interrupt();
每个任务Id会有一个线程,Kill仅杀死执行该任务Id的线程,下次再下发任务发现线程已中断会重新创建线程。
返回客户端执行的本地log给服务端。
客户端注册和执行结果上报(客户端发送给服务端)
@Override
public ReturnT callback(List callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
@Override
public ReturnT registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
启动线程定时注册自己的服务到调度器;
创建线程,30s轮询一次,上报注册信息。
将自己从执行器列表移除;
程序退出时会调用一次,在Netty的finally代码块自动执行。
执行器异步回调给调度器执行任务结果;
每次任务完成时上报。
(1)客户端注册
http://127.0.0.1:8080/xxl-job-admin/api/registry
{
"registryGroup": "EXECUTOR"
"registryKey": "xxl-job-executor-sample"
"registryValue": "http://172.30.0.67:9999/"
}
Response:
{
"code": 200
"msg": null
"content": null
}
(2)客户端移除注册
http://127.0.0.1:8080/xxl-job-admin/api/registryRemove
{
"registryGroup": "EXECUTOR"
"registryKey": "xxl-job-executor-sample"
"registryValue": "http://xxljob-axzo.cn"
}
Response:
{
"code": 200
"msg": null
"content": null
}
(3)客户端执行任务结果上报
http://127.0.0.1:8080/xxl-job-admin/api/callback
{
"logId": 1238
"logDateTim": 1667197980007
"handleCode": 200
}
Response:
{
"code": 200
"msg": null
"content": null
}
(4)执行器下发任务:同步回调仅代表任务是否发送成功
http://172.30.0.67:9999/run
{
"jobId": 4
"executorHandler": "demoJobHandler"
"executorParams": ""
"executorBlockStrategy": "SERIAL_EXECUTION"
"executorTimeout": 0
"logId": 1238
"logDateTime": 1667197980007
"glueType": "BEAN"
"glueSource": ""
"glueUpdatetime": 1666683613000
"broadcastIndex": 0
"broadcastTotal": 1
}
Response:
{
"code": 200
"msg": null
"content": null
}
1.配置了token后,client发送的每隔http请求头会带上XXL-JOB-ACCESS-TOKEN :{xxl.job.accessToken} ;
2.该参数不会对请求参数加密;
3.如果配置不匹配,客户端请求报错:
{
"code": 500
"msg": "The access token is wrong."
"content": null
}
4.发送配置token的请求,Header中新增了Token参数
5.配置错token的返回
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
页面更新:2024-04-23
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号