本篇文章主要介绍下FLink的内存模型,在介绍Flink内存模型之前,我们首先学习下JVM内存结构
Java7 升级为 Java8的时候,JVM内存结构发生了改变,咱们看下区别是什么。这部分内容原文 Java8 JVM内存结构
很多人愿意将方法区称作永久代。
本质上来讲两者并不等价,仅因为Hotspot将GC分代扩展至方法区,或者说使用永久代来实现方法区。在其他虚拟机上是没有永久代的概念的。也就是说方法区是规范,永久代是Hotspot针对该规范进行的实现。
元空间MetaSpace存在于本地内存,意味着只要本地内存足够,它不会出现像Java7永久代中“java.lang.OutOfMemoryError: PermGen space”这种错误。
那为什么用MetaSpace代替了方法区呢?是因为通常使用PermSize和MaxPermSize设置永久代的大小就决定了永久代的上限,但是不知道应该设置多大合适, 如果使用默认值很容易遇到OOM错误。
当使用元空间时,可以加载多少类的元数据就不再由MaxPermSize控制, 而由系统的实际可用空间来控制。
这部分内容原文 堆内堆外内存
堆内内存
堆外内存
Flink1.10 对Flink的内存模型进行了改造,咱们分开来介绍Flink1.10之前版本,以及Flink1.10之后版本
首先看下内存模型图
看下flink源码,来分析下内存各个分区大小是怎么设置的,入口 ContaineredTaskManagerParameters#create 方法
/**
* Computes the parameters to be used to start a TaskManager Java process.
*
* @param config The Flink configuration.
* @param containerMemoryMB The size of the complete container, in megabytes.
* @return The parameters to start the TaskManager processes with.
*/
public static ContaineredTaskManagerParameters create(
Configuration config,
long containerMemoryMB,
int numSlots) {
// (1) try to compute how much memory used by container
final long cutoffMB = calculateCutoffMB(config, containerMemoryMB);
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
// use the cut-off memory for off-heap (that was its intention)
final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
// (3) obtain the additional environment variables from the configuration
final HashMap envVars = new HashMap<>();
final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
for (String key : config.keySet()) {
if (key.startsWith(prefix) && key.length() > prefix.length()) {
// remove prefix
String envVarKey = key.substring(prefix.length());
envVars.put(envVarKey, config.getString(key, null));
}
}
// done
return new ContaineredTaskManagerParameters(
containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
}
}
首先看下内存模型图
看下flink源码,来分析下内存各个分区大小是怎么设置的,入口 AbstractContainerizedClusterClientFactory#getClusterSpecification 方法,这个方法是在集群提交作业的时候被调度。
@Override
public ClusterSpecification getClusterSpecification(Configuration configuration) {
checkNotNull(configuration);
// JM 内存模型
final int jobManagerMemoryMB =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
.getTotalProcessMemorySize()
.getMebiBytes();
// TM 内存模型
final int taskManagerMemoryMB =
TaskExecutorProcessUtils.processSpecFromConfig(
TaskExecutorProcessUtils
.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
configuration,
TaskManagerOptions.TOTAL_PROCESS_MEMORY))
.getTotalProcessMemorySize()
.getMebiBytes();
int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
.setTaskManagerMemoryMB(taskManagerMemoryMB)
.setSlotsPerTaskManager(slotsPerTaskManager)
.createClusterSpecification();
}
咱们主要是分析TM的内存模型,也就是直接看下TaskExecutorProcessUtils.processSpecFromConfig方法,该方法主要做了下面几件事情
对应TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS对象,根据configure文件读取,涉及到下面几个参数:
1.1 task 和 managed 内存相关
- taskmanager.memory.task.heap.size: task heap大小,没有默认值。
- taskmanager.memory.managed.size:flink框架manage 内存大小,没有默认值
1.2 Total Flink Memory 内存
- taskmanager.memory.flink.size: Total Flink Memory大小,没有默认值。
1.3 Total Process Memory 内存
- taskmanager.memory.process.size: Total Process Memory大小,没有默认值。
1.4 Jvm Metaspace和 Overhead 内存
- taskmanager.memory.jvm-metaspace.size: jvm-metaspace大小,主要存储类的元数据信息,默认值 256MB
- taskmanager.memory.jvm-overhead.min:jvm-overhead区域的最小值,默认值是 192 MB
- taskmanager.memory.jvm-overhead.max:jvm-overhead区域的最大值,默认值是 1GB
- taskmanager.memory.jvm-overhead.fraction:jvm-overhead占内存比例,默认值是0.1
static final ProcessMemoryOptions TM_PROCESS_MEMORY_OPTIONS =
new ProcessMemoryOptions(
Arrays.asList(
TaskManagerOptions.TASK_HEAP_MEMORY,
TaskManagerOptions.MANAGED_MEMORY_SIZE),
TaskManagerOptions.TOTAL_FLINK_MEMORY,
TaskManagerOptions.TOTAL_PROCESS_MEMORY,
new JvmMetaspaceAndOverheadOptions(
TaskManagerOptions.JVM_METASPACE,
TaskManagerOptions.JVM_OVERHEAD_MIN,
TaskManagerOptions.JVM_OVERHEAD_MAX,
TaskManagerOptions.JVM_OVERHEAD_FRACTION));
如果指定了Total Process Memory大小,可以接下来看 ProcessMemoryUtils#deriveProcessSpecWithTotalProcessMemory方法
根据设置的Total Process Memory大小,来计算其他区域内存大小
- 如果在conf文件中设置了taskmanager.memory.jvm-metaspace.size,就按照设置的来,否则走默认值 256MB
- 根据 taskmanager.memory.jvm-overhead.fraction比例来计算,OverheadMemorySize = totalProcessMemorySize * taskmanager.memory.jvm-overhead.fraction
如果,OverheadMemorySize正好介于taskmanager.memory.jvm-overhead.min(192MB)和 taskmanager.memory.jvm-overhead.max(1GB)之间,那取值就是OverheadMemorySize
否则,if OverheadMemorySize > taskmanager.memory.jvm-overhead.max ,那取值就是taskmanager.memory.jvm-overhead.max。
if OverheadMemorySize < taskmanager.memory.jvm-overhead.min ,那取值就是taskmanager.memory.jvm-overhead.min
- totalFlinkMemorySize = totalProcessMemorySize - jvmMetaspaceSize - jvmOverheadSize
Total Flink Memory中又涉及到好几块区域,分别来看下计算规则,都是基于totalFlinkMemorySize来计算的。对应TaskExecutorFlinkMemoryUtils#deriveFromTotalFlinkMemory方法,该方法主要做了下面几件事情:
接下来还有taskHeapMemory,networkMemory,managedMemory需要设置,这里会有if-else逻辑,第一种情况:如果明确指定了taskHeapMemory大小, else 是第二种情况,接下来咱们按照else这个分支分析下 taskHeapMemory,networkMemory,managedMemory这三块区域的内存大小。
至此,Flink内存模型已经介绍完成。
整个TM内存模型可以通过三种方式来指定
对应源码ProcessMemoryUtils#memoryProcessSpecFromConfig方法
public CommonProcessMemorySpec memoryProcessSpecFromConfig(Configuration config) {
if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
// all internal memory options are configured, use these to derive total Flink and
// process memory
return deriveProcessSpecWithExplicitInternalMemory(config);
} else if (config.contains(options.getTotalFlinkMemoryOption())) {
// internal memory options are not configured, total Flink memory is configured,
// derive from total flink memory
return deriveProcessSpecWithTotalFlinkMemory(config);
} else if (config.contains(options.getTotalProcessMemoryOption())) {
// total Flink memory is not configured, total process memory is configured,
// derive from total process memory
return deriveProcessSpecWithTotalProcessMemory(config);
}
return failBecauseRequiredOptionsNotConfigured();
}
本文介绍了Flink1.9 和 Flink1.12的内存模型以及各个区域的计算方法。
简单总结下Flink1.10之后的内存模型:
Non-Heap Max 是 JVM 自己决定的,所以通常会比 Flink 配置的 Metaspace + Overhead 要大。
可以这样理解,Flink 将整个 TM 的内存预算划分给了不同的用途,但是并不能严格保证各部分的内存都不超用,只能是 Best Effort。
其中,Managed、Network、Metaspace 是严格限制的,Off-Heap、Overhead 是不能完全严格限制的,Heap 整体是严格限制的但是 Task/Framework 之间是非严格的。
页面更新:2024-02-26
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号