The Flink version analyzed here is 1.10.0.
When jobmanager.sh and taskmanager.sh are used directly to start Flink in standalone mode, both scripts actually delegate to flink-daemon.sh:
Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
flink-daemon.sh selects the entry class to launch, so it is the starting point for tracing Flink's standalone startup logic:
case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;
    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;
    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;
    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;
    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;
esac
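From this we can read off the entry classes:
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint: the JobManager of a standalone session cluster (daemon type standalonesession); this is the class jobmanager.sh launches
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint: the JobManager of a standalone per-job cluster (daemon type standalonejob, started via standalone-job.sh)
org.apache.flink.runtime.taskexecutor.TaskManagerRunner: the TaskManager (daemon type taskexecutor), launched by taskmanager.sh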
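The dispatch in flink-daemon.sh is essentially a lookup from daemon type to entry class. The Java sketch below models that lookup. The class name DaemonClassResolver is hypothetical, and rejecting unknown names with an exception is this sketch's own choice, not a quote of the script.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring the case statement in flink-daemon.sh:
// it maps a daemon type to the fully qualified entry class that would be launched.
class DaemonClassResolver {

    private static final Map<String, String> CLASSES = new LinkedHashMap<>();

    static {
        CLASSES.put("taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner");
        CLASSES.put("zookeeper", "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer");
        CLASSES.put("historyserver", "org.apache.flink.runtime.webmonitor.history.HistoryServer");
        CLASSES.put("standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint");
        CLASSES.put("standalonejob", "org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint");
    }

    // Returns the entry class for a daemon type; unknown types are rejected (sketch's assumption).
    static String classToRun(String daemon) {
        String cls = CLASSES.get(daemon);
        if (cls == null) {
            throw new IllegalArgumentException("Unknown daemon '" + daemon + "'");
        }
        return cls;
    }

    public static void main(String[] args) {
        String daemon = args.length > 0 ? args[0] : "standalonesession";
        System.out.println(daemon + " -> " + classToRun(daemon));
    }
}
```

Running it without arguments prints the entry class for standalonesession, the daemon type that jobmanager.sh passes to flink-daemon.sh.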
Other scripts follow the same path. For example, start-cluster.sh invokes jobmanager.sh in both HA and non-HA mode, so it also ends up in flink-daemon.sh.
With that, we can move on to the code.
Startup code of StandaloneSessionClusterEntrypoint, which extends SessionClusterEntrypoint. Its main method does the following:
// Log environment information: Flink version, Scala version, git commit ID, JVM version, Hadoop version, JAVA_HOME, etc.
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
// Register handlers that log received OS signals: TERM and INT (keyboard interrupt) on Windows;
// TERM, HUP (terminal hangup or controlling process terminated) and INT on other systems
SignalHandler.register(LOG);
// Install a JVM shutdown hook that forcibly halts the process if shutdown has not finished within 5 seconds
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// Parse the command-line arguments using org.apache.commons.cli
// All startup options are defined in org.apache.flink.runtime.entrypoint.parser.CommandLineOptions,
// including:
// c configDir: Directory which contains the configuration file flink-conf.yml.
// r webui-port: Port for the rest endpoint and the web UI.
// D property=value: use value for given property
// h host: Hostname for the RPC service.
// x executionMode: Deprecated option
EntrypointClusterConfiguration entrypointClusterConfiguration = null;
final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
try {
    entrypointClusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
    LOG.error("Could not parse command line arguments {}.", args, e);
    commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
    System.exit(1);
}
// Load the configuration from the config directory, combined with the parsed command-line options
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
// Run it through the static runClusterEntrypoint method of the parent class ClusterEntrypoint
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
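To make the parsing and loading steps concrete, here is a simplified, hand-rolled sketch that does not use commons-cli. EntrypointArgs and its methods are hypothetical. The assumption that -D properties override values from flink-conf.yaml, and that -r and -h set rest.port and jobmanager.rpc.address, is our reading of loadConfiguration rather than its exact code; the file contents are passed in as a Map instead of being read from disk.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Flink's commons-cli based parsing
// (CommandLineParser + EntrypointClusterConfigurationParserFactory).
class EntrypointArgs {

    String configDir;      // -c / --configDir (required)
    int restPort = -1;     // -r / --webui-port, -1 means "not set"
    String hostname;       // -h / --host, null means "not set"
    final Map<String, String> dynamicProperties = new HashMap<>(); // -Dkey=value

    private static String valueAt(String[] args, int i) {
        if (i >= args.length) {
            throw new IllegalArgumentException("Missing value for " + args[i - 1]);
        }
        return args[i];
    }

    static EntrypointArgs parse(String[] args) {
        EntrypointArgs result = new EntrypointArgs();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("-c") || arg.equals("--configDir")) {
                result.configDir = valueAt(args, ++i);
            } else if (arg.equals("-r") || arg.equals("--webui-port")) {
                result.restPort = Integer.parseInt(valueAt(args, ++i));
            } else if (arg.equals("-h") || arg.equals("--host")) {
                result.hostname = valueAt(args, ++i);
            } else if (arg.equals("-x") || arg.equals("--executionMode")) {
                valueAt(args, ++i); // deprecated option: value is consumed and ignored
            } else if (arg.startsWith("-D")) {
                String kv = arg.substring(2);
                int eq = kv.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Expected -Dkey=value but got " + arg);
                }
                result.dynamicProperties.put(kv.substring(0, eq), kv.substring(eq + 1));
            } else {
                throw new IllegalArgumentException("Unknown argument: " + arg);
            }
        }
        if (result.configDir == null) {
            throw new IllegalArgumentException("Missing required option -c/--configDir");
        }
        return result;
    }

    // Merges the file configuration with the command-line overrides (assumed precedence).
    static Map<String, String> loadConfiguration(Map<String, String> fileConfig, EntrypointArgs args) {
        Map<String, String> conf = new HashMap<>(fileConfig);
        conf.putAll(args.dynamicProperties);
        if (args.restPort >= 0) {
            conf.put("rest.port", Integer.toString(args.restPort));
        }
        if (args.hostname != null) {
            conf.put("jobmanager.rpc.address", args.hostname);
        }
        return conf;
    }
}
```

With `-c /opt/flink/conf -r 8082 -Dtaskmanager.numberOfTaskSlots=4`, the merged map would report rest.port as 8082 and four task slots even if the file said otherwise.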
SessionClusterEntrypoint in turn extends ClusterEntrypoint, which defines runClusterEntrypoint:
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        // Start the cluster
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }
    // Exit the process once the cluster terminates, using a code derived from the final application status
    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
        final int returnCode;
        if (throwable != null) {
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        } else {
            returnCode = applicationStatus.processExitCode();
        }
        LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
        System.exit(returnCode);
    });
}
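The exit-code logic of runClusterEntrypoint can be modelled with a CompletableFuture. In this sketch EntrypointRunner and its Entrypoint interface are hypothetical, the termination future carries an int exit code directly instead of an ApplicationStatus, and the values 1 and 2 for the two failure codes are assumptions. System.exit is replaced by an injected IntConsumer so the logic can be exercised without terminating the JVM.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntConsumer;

// Hypothetical sketch of the exit-code handling in runClusterEntrypoint.
class EntrypointRunner {

    // Assumed values for this sketch; check ClusterEntrypoint for the real constants.
    static final int STARTUP_FAILURE_RETURN_CODE = 1;
    static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    interface Entrypoint {
        void startCluster() throws Exception;

        // Completes with the exit code the cluster wants to report (simplified from ApplicationStatus).
        CompletableFuture<Integer> getTerminationFuture();
    }

    // 'exit' stands in for System::exit so the behavior can be tested.
    static void run(Entrypoint entrypoint, IntConsumer exit) {
        try {
            entrypoint.startCluster();
        } catch (Exception e) {
            exit.accept(STARTUP_FAILURE_RETURN_CODE);
            return; // System.exit never returns; the injected consumer does, so stop here
        }
        // Registered once; fires when the cluster terminates, normally or exceptionally.
        entrypoint.getTerminationFuture().whenComplete((code, throwable) -> {
            int returnCode = throwable != null ? RUNTIME_FAILURE_RETURN_CODE : code;
            exit.accept(returnCode);
        });
    }
}
```

In this sketch, run returns as soon as startup succeeds; the exit callback fires only later, when the termination future completes.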
Next, ClusterEntrypoint's startCluster method:
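public void startCluster() throws ClusterEntrypointException {
    try {
        replaceGracefulExitWithHaltIfConfigured(configuration);
        // Initialize the shared file system settings; file systems are then resolved by URI scheme
        configureFileSystems(configuration);
        // Install the security context: applies process-wide security configuration
        // using the available security modules (e.g. Hadoop, JAAS)
        SecurityContext securityContext = installSecurityContext(configuration);
        securityContext.runSecured((Callable<Void>) () -> {
            runCluster(configuration);
            return null;
        });
    } catch (Throwable t) {
        // (cleanup of partially started services omitted in this excerpt)
        throw new ClusterEntrypointException(
            String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()), t);
    }
}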
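The structure of startCluster (run the real work inside a security context and turn any failure into a single checked exception) can be sketched as follows. SecurityContext here is a minimal stand-in modelled on the runSecured(Callable) call above; SecuredStartup, ClusterStartupException, ClusterRunner and the cleanup callback are all hypothetical names for this illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the startCluster structure.
class SecuredStartup {

    // Minimal stand-in for a security context that runs an action with the installed credentials.
    interface SecurityContext {
        <T> T runSecured(Callable<T> action) throws Exception;
    }

    // Used when no security module is configured: simply call the action.
    // (An anonymous class is needed because lambdas cannot implement generic methods.)
    static final SecurityContext NO_OP = new SecurityContext() {
        @Override
        public <T> T runSecured(Callable<T> action) throws Exception {
            return action.call();
        }
    };

    static class ClusterStartupException extends Exception {
        ClusterStartupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    interface ClusterRunner {
        void runCluster() throws Exception;
    }

    static void startCluster(SecurityContext ctx, ClusterRunner runner, Runnable cleanup)
            throws ClusterStartupException {
        try {
            ctx.runSecured((Callable<Void>) () -> {
                runner.runCluster();
                return null;
            });
        } catch (Throwable t) {
            cleanup.run(); // release whatever was started before the failure
            throw new ClusterStartupException("Failed to initialize the cluster entrypoint.", t);
        }
    }
}
```

Wrapping every Throwable into one checked exception is what lets a caller like runClusterEntrypoint handle all startup failures in a single catch block.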
Then the runCluster method:
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        // Initialize the cluster services: RPC service, HA services, BlobServer, heartbeat services, metric registry, etc.
        initializeServices(configuration);
        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
            configuration,
            ioExecutor,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
            this);
        // ... (remainder omitted: a callback on the component's shutdown future shuts the entrypoint down)
    }
}
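From these services the factory creates a DispatcherResourceManagerComponent, which bundles the resource management (ResourceManager), job dispatching (Dispatcher) and monitoring (REST/web endpoint) parts of the JobManager, and the entrypoint arranges to shut down together with the component. This step also creates the gateway retrievers and the metric query service retriever.

So far we have seen how Flink parses its startup arguments, loads the local configuration, and installs a JVM shutdown hook, along with a few other methods we rarely touch in everyday development. This is only the first step into Flink's startup process.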
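The shutdown coupling between the entrypoint and its component can be modelled with two futures: the component exposes a shutdown future, and the entrypoint completes its own termination future when that one completes. This is a hypothetical, heavily simplified sketch; in Flink the component also holds the Dispatcher, ResourceManager and REST endpoint, and shutting down involves stopping the services created in initializeServices.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model: only the shutdown wiring between component and entrypoint is kept.
class ComponentShutdownSketch {

    enum Status { SUCCEEDED, FAILED }

    // Stand-in for DispatcherResourceManagerComponent; the dispatcher,
    // resource manager and REST endpoint it would hold are omitted.
    static class Component {
        // Completed when the component decides the cluster should stop.
        final CompletableFuture<Status> shutDownFuture = new CompletableFuture<>();
    }

    static class Entrypoint {
        final CompletableFuture<Status> terminationFuture = new CompletableFuture<>();
        Component component;

        void runCluster() {
            component = new Component();
            // Shut the entrypoint down whenever the component's shutdown future completes;
            // an exceptional completion is reported as FAILED.
            component.shutDownFuture.whenComplete((status, throwable) ->
                    shutDown(throwable != null ? Status.FAILED : status));
        }

        void shutDown(Status status) {
            // A real entrypoint would stop its services here before reporting termination.
            terminationFuture.complete(status);
        }
    }
}
```

The termination future is what an outer runner (such as runClusterEntrypoint) waits on to pick the process exit code.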
Next, we will look at how Flink implements these services and manages tasks to keep the cluster stable.
Page updated: 2024-03-31