Flink startup internals: entry point analysis

The Flink version analyzed here is 1.10.0.

When you start a standalone Flink service directly with jobmanager.sh and taskmanager.sh, the scripts actually delegate to flink-daemon.sh:

Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]

flink-daemon.sh names the entry class for each daemon type, which lets us trace the standalone startup logic:

case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;
esac
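To make the dispatch above concrete, here is a minimal Java model of the `case $DAEMON in` block. The class names are copied from the script. The `DaemonDispatch` class, its `resolve` method and the error thrown for an unknown daemon are hypothetical illustrations, not part of Flink.

```java
import java.util.Map;

// Hypothetical Java model of the `case $DAEMON in ... esac` dispatch in flink-daemon.sh.
// Only the class-name strings come from the script; the wrapper itself is illustrative.
public class DaemonDispatch {

    // Daemon type (second argument of flink-daemon.sh) -> fully qualified entry class
    static final Map<String, String> CLASS_TO_RUN = Map.of(
        "taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
        "zookeeper", "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer",
        "historyserver", "org.apache.flink.runtime.webmonitor.history.HistoryServer",
        "standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint",
        "standalonejob", "org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint");

    // Returns the class the script would launch; an unknown daemon type is rejected
    static String resolve(String daemon) {
        String cls = CLASS_TO_RUN.get(daemon);
        if (cls == null) {
            throw new IllegalArgumentException("Unknown daemon '" + daemon + "'");
        }
        return cls;
    }

    public static void main(String[] args) {
        // prints org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        System.out.println(resolve("standalonesession"));
    }
}
```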

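From this we get the entry classes. The standalone job cluster (the standalonejob daemon) starts from

org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint

the TaskManager (the taskexecutor daemon) starts from

org.apache.flink.runtime.taskexecutor.TaskManagerRunner

and the session cluster (the standalonesession daemon, which is what jobmanager.sh starts) starts from

org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint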

The other scripts reuse this logic as well. For example, start-cluster.sh runs jobmanager.sh whether or not HA mode is enabled, so every path ends up in flink-daemon.sh.


With the entry classes identified, we can move on to the code.

Below is the startup code of StandaloneSessionClusterEntrypoint (its main method). StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint.

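// Log startup environment information: Flink version, Scala version, git commit ID, JVM version, Hadoop version, JAVA_HOME, etc.
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
// Register handlers that log received OS signals: TERM and INT (keyboard interrupt) on Windows; TERM, HUP (terminal hangup or termination of the controlling process) and INT on other systems
SignalHandler.register(LOG);
// Install a JVM shutdown hook that forcibly halts the process if shutdown has not completed within 5 seconds
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// Parse the command-line arguments with org.apache.commons.cli
// All startup options are defined in org.apache.flink.runtime.entrypoint.parser.CommandLineOptions
// They include: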
// c configDir: Directory which contains the configuration file flink-conf.yml.
// r webui-port: Port for the rest endpoint and the web UI.
// D property=value: use value for given property
// h host: Hostname for the RPC service.
// x executionMode: Deprecated option
EntrypointClusterConfiguration entrypointClusterConfiguration = null;
final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

try {
   entrypointClusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
   LOG.error("Could not parse command line arguments {}.", args, e);
   commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
   System.exit(1);
}
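// Build the Configuration: flink-conf.yaml from the config directory, plus -D dynamic properties and the optional REST port / hostname overrides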
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
// Run it through the static runClusterEntrypoint method inherited from ClusterEntrypoint
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
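The 5-second safeguard installed by JvmShutdownSafeguard.installAsShutdownHook can be sketched as follows. The sketch assumes the mechanism is a shutdown hook that starts a daemon watchdog thread, which halts the JVM once the delay has passed. `ShutdownSafeguard` and its members are hypothetical names, not Flink's class. The halt action is injected so the sketch can be exercised without actually killing the JVM.

```java
// Hypothetical sketch of a JvmShutdownSafeguard-style guard; not Flink's actual implementation.
public class ShutdownSafeguard extends Thread {

    private final long delayMillis;
    private final Runnable haltAction; // injected so the logic can be tested without killing the JVM
    private volatile Thread terminator;

    public ShutdownSafeguard(long delayMillis, Runnable haltAction) {
        super("ShutdownSafeguard-sketch");
        this.delayMillis = delayMillis;
        this.haltAction = haltAction;
    }

    // Runs when the JVM executes its shutdown hooks: start the watchdog and return at once,
    // so this hook itself never delays shutdown.
    @Override
    public void run() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                return; // interrupted: give up without halting
            }
            haltAction.run(); // JVM still alive after the delay: force termination
        }, "DelayedTerminator-sketch");
        // Daemon thread: if all other hooks finish in time, the JVM exits and this thread dies with it
        t.setDaemon(true);
        terminator = t;
        t.start();
    }

    Thread terminator() {
        return terminator;
    }

    // Registers the guard as a real shutdown hook that halts with the given exit code after delayMillis
    public static void install(long delayMillis, int exitCode) {
        Runtime.getRuntime().addShutdownHook(
            new ShutdownSafeguard(delayMillis, () -> Runtime.getRuntime().halt(exitCode)));
    }
}
```

Because the watchdog is a daemon thread, a shutdown that finishes in time simply ends the JVM before the halt fires. Only a hanging shutdown hook lets the watchdog reach `Runtime.halt`, which terminates the JVM without waiting for the remaining hooks.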

SessionClusterEntrypoint in turn extends ClusterEntrypoint. Here is ClusterEntrypoint's static runClusterEntrypoint method:

	public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

		final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
		try {
			// Start the cluster
			clusterEntrypoint.startCluster();
		} catch (ClusterEntrypointException e) {
			LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
			System.exit(STARTUP_FAILURE_RETURN_CODE);
		}

		clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
			final int returnCode;

			if (throwable != null) {
				returnCode = RUNTIME_FAILURE_RETURN_CODE;
			} else {
				returnCode = applicationStatus.processExitCode();
			}

			LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
			System.exit(returnCode);
		});
	}
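The exit-code handling in runClusterEntrypoint boils down to one pattern: wait on a termination future and map its outcome to a process exit code. This is a sketch under assumptions. `EntrypointExit`, the `Status` enum and all numeric codes are hypothetical stand-ins for Flink's ApplicationStatus and return-code constants. `System.exit` is replaced by an injected `IntConsumer` so the logic can be tested.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntConsumer;

// Hypothetical sketch of the termination-future -> exit-code pattern; not Flink's code.
public class EntrypointExit {

    static final int RUNTIME_FAILURE_RETURN_CODE = 2; // hypothetical value

    enum Status {
        SUCCEEDED(0),
        FAILED(1); // hypothetical exit codes

        private final int exitCode;

        Status(int exitCode) {
            this.exitCode = exitCode;
        }

        int processExitCode() {
            return exitCode;
        }
    }

    // Once the future completes, compute the exit code (runtime failure if it completed
    // exceptionally, otherwise the status' own code) and pass it to `exit`.
    static void exitOnTermination(CompletableFuture<Status> terminationFuture, IntConsumer exit) {
        terminationFuture.whenComplete((status, throwable) -> {
            final int returnCode;
            if (throwable != null) {
                returnCode = RUNTIME_FAILURE_RETURN_CODE;
            } else {
                returnCode = status.processExitCode();
            }
            exit.accept(returnCode);
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Status> terminationFuture = new CompletableFuture<>();
        exitOnTermination(terminationFuture, code -> System.out.println("would call System.exit(" + code + ")"));
        terminationFuture.complete(Status.FAILED); // prints "would call System.exit(1)"
    }
}
```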

This leads into ClusterEntrypoint's startCluster method:


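	public void startCluster() throws ClusterEntrypointException {
		try {
			// If configured, replace the graceful exit with a hard halt
			replaceGracefulExitWithHaltIfConfigured(configuration);
			// Initialize the shared file system settings, so that paths can be mapped to file systems by their URI scheme
			configureFileSystems(configuration);
			// Install the process-wide security context, applying the configuration through the available security modules (e.g. Hadoop, JAAS)
			SecurityContext securityContext = installSecurityContext(configuration);
			securityContext.runSecured((Callable<Void>) () -> {
				runCluster(configuration);
				return null;
			});
		} catch (Throwable t) {
			// (Abridged) the full method also shuts down any partially started services before rethrowing
			throw new ClusterEntrypointException(
				String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()), t);
		}
	}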
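The comment on configureFileSystems is easier to read with scheme-based lookup in mind. The sketch below assumes a simplified registry that maps URI schemes to implementations and falls back to a default scheme for paths that have none. `SchemeRegistry` and the implementation names are hypothetical placeholders, not Flink's FileSystem API.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry illustrating "map a path to a file system by its URI scheme".
public class SchemeRegistry {

    private final Map<String, String> implementationsByScheme = new HashMap<>();
    private final String defaultScheme;

    public SchemeRegistry(String defaultScheme) {
        this.defaultScheme = defaultScheme;
    }

    public void register(String scheme, String implementation) {
        implementationsByScheme.put(scheme, implementation);
    }

    // Resolve a path to the implementation registered for its scheme
    public String resolve(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            scheme = defaultScheme; // e.g. "/tmp/flink" carries no scheme
        }
        String implementation = implementationsByScheme.get(scheme);
        if (implementation == null) {
            throw new IllegalArgumentException("No file system registered for scheme '" + scheme + "'");
        }
        return implementation;
    }

    public static void main(String[] args) {
        SchemeRegistry registry = new SchemeRegistry("file");
        registry.register("file", "local-fs");
        registry.register("hdfs", "hadoop-fs");
        System.out.println(registry.resolve("hdfs://namenode:8020/flink/checkpoints")); // hadoop-fs
        System.out.println(registry.resolve("/tmp/flink"));                           // local-fs
    }
}
```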

Next comes the runCluster method:

	private void runCluster(Configuration configuration) throws Exception {
		synchronized (lock) {
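			// Initialize the cluster services (RPC service, HA services, BlobServer, heartbeat services, metric registry, etc.)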
			initializeServices(configuration);

			// write host information into configuration
			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

			final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

			clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				ioExecutor,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this);

			// ... (omitted) register shutdown handling on clusterComponent.getShutDownFuture()
		}
	}

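From these services, runCluster creates the DispatcherResourceManagerComponent. This component bundles resource management (ResourceManager), job dispatching (Dispatcher) and monitoring (the REST/web monitor endpoint). runCluster then hooks up the component's shutdown handling. Along the way it also creates the gateway retrievers and the metric query service retriever. At this point we have a basic picture of how Flink reads its startup arguments, loads the local configuration, and installs mechanisms such as a JVM shutdown hook that rarely come up in everyday application development. This is only the first step into Flink's startup process.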
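The order of operations in runCluster can be summarized with a toy model. Everything here is a hypothetical stand-in: a map instead of Flink's Configuration, a bare CompletableFuture instead of the component's shutdown future, and a hard-coded address and port. The string keys are the option keys behind JobManagerOptions.ADDRESS and JobManagerOptions.PORT (jobmanager.rpc.address and jobmanager.rpc.port).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified model of the runCluster sequence; not Flink's API.
public class RunClusterSketch {

    // Stand-in for DispatcherResourceManagerComponent: only exposes a shutdown future
    static final class Component {
        final CompletableFuture<String> shutDownFuture = new CompletableFuture<>();
    }

    final Map<String, String> configuration = new HashMap<>();
    final List<String> events = new ArrayList<>();
    Component clusterComponent;
    private final Object lock = new Object();

    void runCluster() {
        synchronized (lock) {
            // 1. initialize the shared services (hypothetical RPC address and port)
            String rpcAddress = "localhost";
            int rpcPort = 6123;
            events.add("services initialized");

            // 2. write the RPC host information back into the configuration
            configuration.put("jobmanager.rpc.address", rpcAddress);
            configuration.put("jobmanager.rpc.port", Integer.toString(rpcPort));

            // 3. create the dispatcher / resource manager / REST component from those services
            clusterComponent = new Component();
            events.add("component created");

            // 4. when the component shuts down, record it (the real code triggers the entrypoint's shutdown)
            clusterComponent.shutDownFuture.whenComplete((status, throwable) ->
                events.add(throwable != null ? "shutdown: UNKNOWN" : "shutdown: " + status));
        }
    }

    public static void main(String[] args) {
        RunClusterSketch sketch = new RunClusterSketch();
        sketch.runCluster();
        sketch.clusterComponent.shutDownFuture.complete("SUCCEEDED");
        System.out.println(sketch.events); // [services initialized, component created, shutdown: SUCCEEDED]
    }
}
```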

Follow-up posts will look at how Flink implements these services, manages tasks, and keeps the service stable.

Page updated: 2024-03-31
