RocketMQ作为一款优秀的开源消息中间件,很多java开发者都在使用并研究里面的源码。目前网上有很多关于RocketMQ源代码的文章,但是很多文章只是从框架开发者的的角度分析源码,没有从技术实现本质进行剖析。因此很多源码学习者在读完后还是一知半解,当自己想动手写的时候不知要用到哪种技术,无从着手。笔者基于对RocketMQ的文件存储研究,结合开发者常见的技术,自己动手实现了一个简化版本的RocketMQ文件系统,希望能抽丝剥茧,帮助开发者从本质上理解RocketMQ文件存储的原理,起到抛砖引玉,举一反三的作用。
本文适合对RocketMQ的文件存储原理有一定的了解,熟悉java NIO,希望了解RocketMQ是如何通过java NIO实现的读者。以下代码部分:
1.手动生成10个消息,并创建commitLog文件,consumeQueue,indexFile文件
package org.apache.rocketmq.test.smoke;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.test.smoke.model.ConsumerQueueData;
import org.apache.rocketmq.test.smoke.model.IndexFileHeaderData;
import org.apache.rocketmq.test.smoke.model.IndexFileItemData;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
public class CommitLogWriteTest {
private static Long commitLogOffset = 0L;//8byte(commitlog offset)
private static Long lastTotalSize = 0L;
private static Long currentTotalSize = 0L;
private static List consumerQueueDatas = new ArrayList<>();
private static List indexFileItemDatas = new ArrayList<>();
private static int MESSAGE_COUNT = 10;
public static void main(String[] args) throws IOException {
createCommitLog();
createConsumerQueue();
createIndexFile();
}
private static void createCommitLog() throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
fileChannel.close();
Random random = new Random();
int count = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
long commitLogOffset = lastTotalSize;
String topic = "Topic-test";
String msgId = UUID.randomUUID().toString();
String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48));//
long queueOffset = i;//索引偏移量
String transactionId = UUID.randomUUID().toString();
/* 数据格式,位置固定
int totalSize;//消息长度
String msgId;
String topic;
long queueOffset;//索引偏移量
long bodySize;//消息长度
byte[] body;//消息内容
String transactionId;
long commitLogOffset;//从第一个文件开始算的偏移量
*/
int totalSize = 8 //totalSize长度
+ 64 //msgId长度
+ 64 //topic长度
+ 8 //索引偏移量长度
+ 8 //消息长度长度
+ msgBody.getBytes(StandardCharsets.UTF_8).length //消息内容长度
+ 64 //transactionId长度
+ 64 //commitLogOffset长度;
;
ByteBuffer b = ByteBuffer.allocate(totalSize);
// //如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300
mappedByteBuffer.position(Integer.valueOf(commitLogOffset + ""));
b.putLong(totalSize);//totalSize
b.put(getBytes(msgId, 64));//msgId
b.put(getBytes(topic, 64));//topic,定长64
b.putLong(queueOffset);//索引偏移量
b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
b.put(getBytes(transactionId, 64));
b.putLong(commitLogOffset);//bodySize
b.flip();
mappedByteBuffer.put(b);
lastTotalSize = totalSize + lastTotalSize;
System.out.println("写入消息,第:" + i + "次");
System.out.println("totalSize:" + totalSize);
System.out.println("msgId:" + msgId);
System.out.println("topic:" + topic);
System.out.println("msgBody:" + msgBody);
System.out.println("transactionId:" + transactionId);
System.out.println("commitLogOffset:" + commitLogOffset);
ConsumerQueueData consumerQueueData = new ConsumerQueueData();
consumerQueueData.setOffset(commitLogOffset);
consumerQueueData.setMsgLength(totalSize);
consumerQueueData.setTagCode(100L);
//准备生成consumeQueue文件
consumerQueueDatas.add(consumerQueueData);
IndexFileItemData indexFileItemData = new IndexFileItemData();
indexFileItemData.setKeyHash(msgId.hashCode());
indexFileItemData.setMessageId(msgId);
indexFileItemData.setPhyOffset(commitLogOffset);
//准备生成indexFile文件
indexFileItemDatas.add(indexFileItemData);
mappedByteBuffer.force();
count++;
}
System.out.println("commitLog数据保存完成,totalSize:" + count);
}
public static void createConsumerQueue() throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
fileChannel.close();
int count = 0;
for (int i = 0; i < consumerQueueDatas.size(); i++) {
ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);
//指定写入位置
mappedByteBuffer.position(i * 20);
mappedByteBuffer.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)
mappedByteBuffer.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)
mappedByteBuffer.putLong(consumerQueueData.getTagCode());//8byte (tagCode)
count++;
System.out.println("consumerQueue数据写入完成:" + JSON.toJSONString(consumerQueueData));
mappedByteBuffer.force();
}
System.out.println("ConsumerQueue数据保存完成count:" + count);
}
public static void createIndexFile() throws IOException {
//文件场创建时间,在写第一条消息的时候创建
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
ByteBuffer headerByteBuffer = mappedByteBuffer.slice();
long firstDataTime = System.currentTimeMillis();
fileChannel.close();
//开始写hash槽,从头部后写入
/* 已经填充有index的slot数量
(并不是每个slot槽下都挂载有index索引单元,这 里统计的是所有挂载了index索引单元的slot槽的数量,hash冲突)*/
int hashSlotCount = 0;
/* 已该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和),
如果没有hash冲突,hashSlotCount = indexCount*/
int indexCount = 0;
//假设建立100个槽位(总长度400)
int soltNum = 100;
for (int i = 0; i < MESSAGE_COUNT; i++) {
IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);
int keyHash = indexFileItemData.getKeyHash();
//取模,计算第几个槽位
int slotPos = keyHash % 100 > 0?keyHash % 100:-1*(keyHash % 100);
// slot存放第几条数据的的位置(字节数组位置)
int absSlotPos = 40 + slotPos * 4;
// 存储实际数据的位置(字节数组位置)
int absIndexPos =
40 + soltNum * 4
+ indexCount * 20;
//将hash槽的值设置为indexCount,建立索引,即第n条消息保存在XX位置
mappedByteBuffer.putInt(absSlotPos, indexCount);
//写入数据
mappedByteBuffer.putInt(absIndexPos,indexFileItemData.getKeyHash());//4byte msg hashcode
mappedByteBuffer.putLong(absIndexPos+4,indexFileItemData.getPhyOffset());//8byte pyhoffset
mappedByteBuffer.putInt(absIndexPos+4+8,Integer.valueOf((System.currentTimeMillis()- firstDataTime)+""));//4byte (timeDiff)
mappedByteBuffer.putInt(absIndexPos+4+8+4,0);//4byte (preIndex),暂置0,暂不考虑hash冲突的情况
//模拟最后一个文件,写入header
if (i == 0) {
//该indexFile中第一条消息的存储时间
headerByteBuffer.putLong(0, firstDataTime);
//该indexFile种第一条消息在commitlog种的偏移量commitlog offset
mappedByteBuffer.putLong(16, indexFileItemData.getPhyOffset());
}
//模拟最后一个文件,写入header
if (i == 99) {
//该indexFile种最后一条消息存储时间
headerByteBuffer.putLong(8, System.currentTimeMillis());
//该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
headerByteBuffer.putLong(24, indexFileItemData.getPhyOffset());
}
//已经填充有index的slot数量
headerByteBuffer.putInt(32, hashSlotCount+1);
//该indexFile中包含的索引单元个数
headerByteBuffer.putInt(36, indexCount+1);
mappedByteBuffer.force();
System.out.println("msgId:"+indexFileItemData.getMessageId()+",keyHash:"+keyHash+",保存槽位为"+slotPos+"的数据,absSlotPos="+absSlotPos+",值index="+indexCount+",绝对位置:"+absIndexPos+",commit-phyOffset:"+indexFileItemData.getPhyOffset());
indexCount ++;
hashSlotCount++;
}
}
//将变长字符串定长byte[],方便读取
private static byte[] getBytes(String s, int length) {
int fixLength = length - s.getBytes().length;
if (s.getBytes().length < length) {
byte[] S_bytes = new byte[length];
System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);
for (int x = length - fixLength; x < length; x++) {
S_bytes[x] = 0x00;
}
return S_bytes;
}
return s.getBytes(StandardCharsets.UTF_8);
}
}
运行结果:
写入消息,第:0次
totalSize:322
msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsg
transactionId:9453ba39-3982-40e9-926d-47b51d360590
commitLogOffset:0
写入消息,第:1次
totalSize:306
msgId:d0fbf80f-223b-4721-a43e-518b152decc2
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgms
transactionId:e2ef1652-58fa-4849-bf74-885c7e5db9e3
commitLogOffset:322
写入消息,第:2次
totalSize:307
msgId:199053e3-e616-4611-ab0d-e5c7af4549a9
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsg
transactionId:33d21abe-0d8e-4c0e-9c78-f415daefd767
commitLogOffset:628
写入消息,第:3次
totalSize:339
msgId:8e799d8e-3290-4f6b-ab5d-289153446994
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:98b46b96-cc88-4969-a56f-282d25799085
commitLogOffset:935
写入消息,第:4次
totalSize:320
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
commitLogOffset:1274
写入消息,第:5次
totalSize:312
msgId:b33c6f31-cc96-462b-b095-99410459082c
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgms
transactionId:57420047-2539-43fa-a3f2-b2f55c7b059c
commitLogOffset:1594
写入消息,第:6次
totalSize:324
msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:29601335-3fcd-4193-b14f-140bbaf409a4
commitLogOffset:1906
写入消息,第:7次
totalSize:293
msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a
topic:Topic-test
msgBody:消息内容m
transactionId:291e54de-2ebe-41b1-b974-e81a2e9f1370
commitLogOffset:2230
写入消息,第:8次
totalSize:323
msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:3005a39d-b8cb-4138-ae05-34b65fc135a2
commitLogOffset:2523
写入消息,第:9次
totalSize:296
msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929
topic:Topic-test
msgBody:消息内容msgm
transactionId:d42733b5-3911-4f0a-b1db-11eb45a30345
commitLogOffset:2846
commitLog数据保存完成,totalSize:10
创建consumerQueue文件开始
consumerQueue数据写入完成:{"msgLength":322,"offset":0,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":306,"offset":322,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":307,"offset":628,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":339,"offset":935,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":320,"offset":1274,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":312,"offset":1594,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":324,"offset":1906,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":293,"offset":2230,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":323,"offset":2523,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":296,"offset":2846,"tagCode":100}
ConsumerQueue数据保存完成count:10
创建索引文件开始
msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0,keyHash:249765627,保存槽位为27的数据,absSlotPos=148,值index=0,绝对位置:440,commit-phyOffset:0
msgId:d0fbf80f-223b-4721-a43e-518b152decc2,keyHash:1587335015,保存槽位为15的数据,absSlotPos=100,值index=1,绝对位置:460,commit-phyOffset:322
msgId:199053e3-e616-4611-ab0d-e5c7af4549a9,keyHash:791210473,保存槽位为73的数据,absSlotPos=332,值index=2,绝对位置:480,commit-phyOffset:628
msgId:8e799d8e-3290-4f6b-ab5d-289153446994,keyHash:1460275929,保存槽位为29的数据,absSlotPos=156,值index=3,绝对位置:500,commit-phyOffset:935
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b,keyHash:1174005465,保存槽位为65的数据,absSlotPos=300,值index=4,绝对位置:520,commit-phyOffset:1274
msgId:b33c6f31-cc96-462b-b095-99410459082c,keyHash:-1695757800,保存槽位为0的数据,absSlotPos=40,值index=5,绝对位置:540,commit-phyOffset:1594
msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0,keyHash:1334295408,保存槽位为8的数据,absSlotPos=72,值index=6,绝对位置:560,commit-phyOffset:1906
msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a,keyHash:1287318090,保存槽位为90的数据,absSlotPos=400,值index=7,绝对位置:580,commit-phyOffset:2230
msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b,keyHash:239865974,保存槽位为74的数据,absSlotPos=336,值index=8,绝对位置:600,commit-phyOffset:2523
msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929,keyHash:-1173357775,保存槽位为75的数据,absSlotPos=340,值index=9,绝对位置:620,commit-phyOffset:2846
2.读取consumeQueue文件,并根据offset从commitLog读取一条完整的消息
package org.apache.rocketmq.test.smoke;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
public class ConsumeQueueMessageReadTest {
public static MappedByteBuffer mappedByteBuffer = null;
private static int MESSAGE_COUNT = 10;
public static void main(String[] args) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
fileChannel.close();
mappedByteBuffer.position(0);
//根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中
int index = 0 ;
for(int i =index;i
运行结果:
=================commitlog读取偏移量为0的消息===================
totalSize:322
msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0
topic:Topic-test
queueOffset:0
bodySize:42
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsg
transactionId:9453ba39-3982-40e9-926d-47b51d360590
commitLogOffset:0
=================commitlog读取偏移量为322的消息===================
totalSize:306
msgId:d0fbf80f-223b-4721-a43e-518b152decc2
topic:Topic-test
queueOffset:1
bodySize:26
body:消息内容msgmsgmsgmsgms
transactionId:e2ef1652-58fa-4849-bf74-885c7e5db9e3
commitLogOffset:322
=================commitlog读取偏移量为628的消息===================
totalSize:307
msgId:199053e3-e616-4611-ab0d-e5c7af4549a9
topic:Topic-test
queueOffset:2
bodySize:27
body:消息内容msgmsgmsgmsgmsg
transactionId:33d21abe-0d8e-4c0e-9c78-f415daefd767
commitLogOffset:628
=================commitlog读取偏移量为935的消息===================
totalSize:339
msgId:8e799d8e-3290-4f6b-ab5d-289153446994
topic:Topic-test
queueOffset:3
bodySize:59
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:98b46b96-cc88-4969-a56f-282d25799085
commitLogOffset:935
=================commitlog读取偏移量为1274的消息===================
totalSize:320
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
topic:Topic-test
queueOffset:4
bodySize:40
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
commitLogOffset:1274
=================commitlog读取偏移量为1594的消息===================
totalSize:312
msgId:b33c6f31-cc96-462b-b095-99410459082c
topic:Topic-test
queueOffset:5
bodySize:32
body:消息内容msgmsgmsgmsgmsgmsgms
transactionId:57420047-2539-43fa-a3f2-b2f55c7b059c
commitLogOffset:1594
=================commitlog读取偏移量为1906的消息===================
totalSize:324
msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0
topic:Topic-test
queueOffset:6
bodySize:44
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:29601335-3fcd-4193-b14f-140bbaf409a4
commitLogOffset:1906
=================commitlog读取偏移量为2230的消息===================
totalSize:293
msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a
topic:Topic-test
queueOffset:7
bodySize:13
body:消息内容m
transactionId:291e54de-2ebe-41b1-b974-e81a2e9f1370
commitLogOffset:2230
=================commitlog读取偏移量为2523的消息===================
totalSize:323
msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b
topic:Topic-test
queueOffset:8
bodySize:43
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:3005a39d-b8cb-4138-ae05-34b65fc135a2
commitLogOffset:2523
=================commitlog读取偏移量为2846的消息===================
totalSize:296
msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929
topic:Topic-test
queueOffset:9
bodySize:16
body:消息内容msgm
transactionId:d42733b5-3911-4f0a-b1db-11eb45a30345
commitLogOffset:28
3.根据messageId读取indexFile,然后根据偏移量从CommitLog读取一条完整的消息
package org.apache.rocketmq.test.smoke;
import java.io.IOException;
import java.net.URI;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
public class IndexFileMessageReadTest {
public static MappedByteBuffer mappedByteBuffer = null;
public static void main(String[] args) throws IOException {
String msgId = "8b78474f-b28a-4442-99a0-6f7883f0302b";
readByMessageId(msgId);
}
private static void readByMessageId(String messageId) throws IOException {
FileChannel indexFileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer indexMappedByteBuffer = indexFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
indexFileChannel.close();
System.out.println("============get indexFile header===============");
System.out.println("beginTimestampIndex:"+indexMappedByteBuffer.getLong());
System.out.println("endTimestampIndex:"+indexMappedByteBuffer.getLong());
System.out.println("beginPhyoffsetIndex:"+indexMappedByteBuffer.getLong());
System.out.println("endPhyoffsetIndex:"+indexMappedByteBuffer.getLong());
System.out.println("hashSlotcountIndex:"+indexMappedByteBuffer.getInt());
System.out.println("indexCountIndex:"+indexMappedByteBuffer.getInt());
System.out.println("");
int keyHash = messageId.hashCode();
//取模,计算第几个槽位
int slotPos = keyHash % 100 > 0?keyHash % 100:-1*(keyHash % 100);
System.out.println("messageId:"+messageId+ ",取模为:"+slotPos);
// slot在文件中的字节数组位置
int absSlotPos = 40 + slotPos * 4;
System.out.println("哈希槽的字节数组位置:(40+"+slotPos+"*4)="+absSlotPos);
//获取hash槽上存取的件索引,第几个文件
int index =indexMappedByteBuffer.getInt(absSlotPos);
//计算数据需要存储的偏移量
int absIndexPos =
40 + 100 * 4
+ index * 20;
System.out.println("第几个文件index="+index+",实际存储数据的字节数组位置:(40 + 100 * 4+index *20)="+absIndexPos);
long keyHash1 = indexMappedByteBuffer.getInt(absIndexPos);
long pyhOffset = indexMappedByteBuffer.getLong(absIndexPos+4);
int timeDiff = indexMappedByteBuffer.getInt(absIndexPos+4+8);
int preIndexNo = indexMappedByteBuffer.getInt(absIndexPos+4+8+4);
System.out.println("从index获取到的commitLog偏移量为:"+pyhOffset);
System.out.println("");
readCommitLogByOffset((int)pyhOffset);
}
public static MappedByteBuffer initFileChannel() throws IOException {
if(mappedByteBuffer == null){
FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
commitLogfileChannel.close();
}
return mappedByteBuffer;
}
/*
*
* 根据偏移量读取CcommitLog
* */
public static void readCommitLogByOffset(int offset) throws IOException {
/* 存放顺序,读时候保持顺序一致
b.putLong(totalSize);//totalSize
b.put(getBytes(msgId, 64));//msgId
b.put(getBytes(topic, 64));//topic,定长64
b.putLong(queueOffset);//索引偏移量
b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
b.put(getBytes(transactionId, 64));
b.putLong(commitLogOffset);//commitLogOffset
*/
System.out.println("=================commitlog读取偏移量为"+offset+"的消息===================");
MappedByteBuffer mappedByteBuffer = initFileChannel();
mappedByteBuffer.position(offset);
long totalSize = mappedByteBuffer.getLong();//消息长度
byte[] msgIdByte = new byte[64];//uuid 固定是64
mappedByteBuffer.get(msgIdByte);
byte[] topicByte = new byte[64];// 固定是64
mappedByteBuffer.get(topicByte);
long queueOffset = mappedByteBuffer.getLong();
Long bodySize = mappedByteBuffer.getLong();
int bSize = Integer.valueOf(bodySize+"");
byte[] bodyByte = new byte[bSize];//bodySize 长度不固定
mappedByteBuffer.get(bodyByte);
byte[] transactionIdByte = new byte[64];//uuid 固定是64
mappedByteBuffer.get(transactionIdByte);
long commitLogOffset = mappedByteBuffer.getLong();//偏移量
System.out.println("totalSize:"+totalSize);
System.out.println("msgId:"+new String(msgIdByte));
System.out.println("topic:"+new String(topicByte));
System.out.println("queueOffset:"+queueOffset);
System.out.println("bodySize:"+bodySize);
System.out.println("body:"+new String(bodyByte));
System.out.println("transactionId:"+new String(transactionIdByte));
System.out.println("commitLogOffset:"+commitLogOffset);
}
public static byte[] toByteArray(long number) {
byte length = Long.BYTES;
byte[] bytes = new byte[length];
for (byte i = 0; i < length; i++) {
bytes[length - 1 - i] = (byte) number;
number >>= 8;
}
return bytes;
}
}
运行结果:
============get indexFile header===============
beginTimestampIndex:1669554286826
endTimestampIndex:1669552196010
beginPhyoffsetIndex:0
endPhyoffsetIndex:31259
hashSlotcountIndex:10
indexCountIndex:10
messageId:8b78474f-b28a-4442-99a0-6f7883f0302b,取模为:65
哈希槽的字节数组位置:(40+65*4)=300
第几个文件index=4,实际存储数据的字节数组位置:(40 + 100 * 4+index *20)=520
从index获取到的commitLog偏移量为:1274
=================commitlog读取偏移量为1274的消息===================
totalSize:320
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
topic:Topic-test
queueOffset:4
bodySize:40
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
commitLogOffset:1274
本文基于java NIO实现了RocketMQ的文件系统的最精简的实现,希望能帮助相关开发人员了解文件系统底层的实现原理。欢迎一起交流讨论,不足的地方欢迎指正。
更新时间:2024-08-27
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号