#头条创作挑战赛#
前文 RocketMQ源码分析之核心磁盘数据结构CommitLog 让我们知道这个CommitLog是干什么用的,就是broker会将消息写入本地磁盘的CommitLog文件中。
但是CommitLog采用的Master/Slave 部署模式,提供了一定的高可用性。但这样的部署模式,有一定缺陷。比如故障转移方面,如果主节点挂了,还需要人为手动进行重启或者切换,无法自动将一个从节点转换为主节点。
所以RocketMQ通过DLedgerCommitLog来实现基于 raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。
DefaultMessageStore构造时进行根据是否开启高可用来初始化DLedgerCommitLog还是commitLog;
上面这行代码也就是说如果开启了高可用的话默认初始化一个DLedgerCommitLog否则就初始化原始的commitLog,我们到这里就可以想到了,这个DLedgerCommitLog和原始的CommitLog相比肯定是多了往子节点同步的部分。
/**
* Store all metadata downtime for recovery, data protection reliability
* 他是commitlog的子类,他可以去继承我么的commitlog把数据写入到本地磁盘文件里去,以及flush这样的功能
* 对于我们的数据恢复、以及数据保护可以去做一个多副本策略,高可用架构
*/
public class DLedgerCommitLog extends CommitLog {
// 开源dledger框架的高可用同步服务器组件
private final DLedgerServer dLedgerServer;
// 开源dledger框架的配置组件
private final DLedgerConfig dLedgerConfig;
// 开源dledger框架的mmap内存映射文件存储组件
private final DLedgerMmapFileStore dLedgerFileStore;
// 开源dledger框架的mmap内存映射文件list
private final MmapFileList dLedgerFileList;
//The id identifies the broker role, 0 means master, others means slave
private final int id;
// 消息序列器
private final MessageSerializer messageSerializer;
// 用于记录消息追加的时耗(日志追加所持有锁时间)
private volatile long beginTimeInDledgerLock = 0;
//This offset separate the old commitlog from dledger commitlog
// 记录的旧Commitlog文件中的最大偏移量,如果访问的偏移量大于它,则访问Dledger 管理的文件
private long pidedCommitlogOffset = -1;
// 是否正在恢复旧的Commitlog文件
private boolean isInrecoveringOldCommitlog = false;
private final StringBuilder msgIdBuilder = new StringBuilder();
}
我们可以看到DLedgerCommitLog实际上是继承了CommitLog的,那么DLedgerCommitLog的存储结构又是怎么样的呢,如何兼容CommitLog呢,其实我们根据上面的知识可以想到其实我们的主从高可用只是比普通模式的Log需要多记录一些term,channel等这些元数据信息:
看到这里我们能想到,我们只要把commitLog的原本信息放到body里不就可以兼容commitLog了,而且改动也不大,对于历史数据也能很好的兼容,rocketmq确实是这么做的。
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
// 调用父类的构造函数 也就是说开启了主从架构也会兼容历史的消息
super(defaultMessageStore);
dLedgerConfig = new DLedgerConfig();
// 是否强制删除文件,取自Broker配置属性cleanFileForciblyEnable,默认为true
dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
// DLedger存储类型,固定为基于文件的存储模式
dLedgerConfig.setStoreType(DLedgerConfig.FILE);
// Leader节点的id 名称,示例配置:n0,其配置要求第二个字符后必须是数字。
dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
// DLeger group 的名称,建议与broker 配置属性brokerName 保持一致
dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
// DLeger Group 中所有的节点信息,其配置示例n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多个节点使用分号隔开。
dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
// 设置DLedger 的日志文件的根目录,取自borker 配件文件中的storePathRootDir ,即RocketMQ 的数据存储根路径。
dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
// 设置DLedger 的单个日志文件的大小,取自Broker 配置文件中的mapedFileSizeCommitLog,即与Commitlog 文件的单个文件大小一致
dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
// DLedger 日志文件的删除时间,取自Broker 配置文件中的deleteWhen,默认为凌晨4 点
dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
// DLedger 日志文件保留时长,取自Broker 配置文件中的fileReservedHours,默认为72h
dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());
id = Integer.parseInt(dLedgerConfig.getSelfId().substring(1)) + 1;
// 初始化DledgerServer 主要是进行主从复制以及选举使用
dLedgerServer = new DLedgerServer(dLedgerConfig);
dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
// 在dledger框架的存储层里加一个append钩子,追加数据之前需要什么定位到这条数据的一个位置
// 然后加入进去这条数据在commitlog里面的全局物理offset
DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
// 我们上面说过其实当我们开启了主从同步之后我们追加消息的时候
// 其实只有body是存储的原始的commitLog结构其他对于客户端都是无用的信息
// 所以这里设置的追加消息的钩子函数就是为了返回body的offset
assert bodyOffset == DLedgerEntry.BODY_OFFSET;
buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
buffer.putLong(entry.getPos() + bodyOffset);
};
dLedgerFileStore.addAppendHook(appendHook);
dLedgerFileList = dLedgerFileStore.getDataFileList();
this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
主要流程节点:
这里其实就是去加载commitLog中的信息为了进行历史消息的兼容
public boolean load() {
return super.load();
}
最后还是调用到了父类CommitLog中的load方法,其中mappedFileQueue的load方法,前文 RocketMQ源码分析之映射文件队列MappedFileQueue 有进行讲解;
// CommitLog里面数据都是在多个磁盘文件里的,每个磁盘文件都是一个MappedFile
// 他应该是属于把所有的磁盘文件mappedfile的数据,从磁盘里load加载到映射内存区域里来
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
private void recover(long maxPhyOffsetOfConsumeQueue) {
// 主要是加载commitLog以及index文件的wrotePosition,flushedPosition,committedPosition重要的指针
dLedgerFileStore.load();
if (dLedgerFileList.getMappedFiles().size() > 0) {
// 如果存在dLedgerFile 只需要恢复dLedgerFile即可
// 存在dLedgerFile 恢复dLedgerFile
dLedgerFileStore.recover();
// 设置pidedCommitlogOffset为dLedger文件的最小offset
// 作为和老的commitLog的分割,小于这个offset需要访问老的commitLog
pidedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile != null) {
// 如果存在旧的commitLog则禁止删除Dledger防止出现日志断层影响查询
disableDeleteDledger();
}
// 最大物理offset
long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {
log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
}
return;
}
//Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
isInrecoveringOldCommitlog = true;
//No need the abnormal recover
// 调用commitLog的recoverNormall() 进行commitLog文件的恢复
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;
// 如果不存在旧的commitLog直接结束文件日志的恢复流程
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 不存在旧的commitLog直接返回
if (mappedFile == null) {
return;
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true;
// 1 TOTAL SIZE
byteBuffer.getInt(); //size
int magicCode = byteBuffer.getInt();
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
needWriteMagicCode = false;
} else {
log.info("Recover old commitlog found a illegal magic code={}", magicCode);
}
dLedgerConfig.setEnableDiskForceClean(false);
pidedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} pidedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), pidedCommitlogOffset);
if (needWriteMagicCode) {
byteBuffer.position(mappedFile.getWrotePosition());
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
byteBuffer.putInt(BLANK_MAGIC_CODE);
mappedFile.flush(0);
}
// 设置最后一个文件的WrotePosition,CommittedPosition,FlushedPosition 表示文件已经被写满
mappedFile.setWrotePosition(mappedFile.getFileSize());
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
dLedgerFileList.getLastMappedFile(pidedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", pidedCommitlogOffset);
}
public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) {
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
setMessageInfo(msg, tranType);
final String finalTopic = msg.getTopic();
// Back to Results
AppendMessageResult appendResult;
AppendFuture dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false);
// 追加消息的时候不再写入之前的commitLog
// 而是调用dlegerserver的handleAppend进行日志的写入&子节点日志的复制(后面会详细讲解)
// 只有超过半数以上的节点复制成功才会返回成功
// 如果追加成功则会返回追加成功的起始偏移量即pos属性类似于commitLog中的物理偏移量
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
// 根据dledger的起始偏移量计算真正的消息的存储offset。
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break;
default:
break;
}
} catch (Exception e) {
log.error("Put message error", e);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
}
return dledgerFuture.thenApply(appendEntryResponse -> {
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case LEADER_NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).add(appendResult.getWroteBytes());
}
return putMessageResult;
});
}
消息的查找起始和原来还是没有什么区别的,还是使用二分查找法通过offset获取mappedFile文件,只是多了一个pidedCommitlogOffset的判断是否是老数据,如果是老数据直接走commitLog,新数据就走Dledger维护的文件列表;
public SelectMappedBufferResult getMessage(final long offset, final int size) {
// 如果是小于pidedCommitlogOffset 证明是旧数据 -> 从commitLog获取
if (offset < pidedCommitlogOffset) {
return super.getMessage(offset, size);
}
// 从 dledger获取
int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
// 获取文件并转换为 DLedgerSelectMappedBufferResult 类型
return convertSbr(mappedFile.selectMappedBuffer(pos, size));
}
return null;
}
页面更新:2024-05-20
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号