RocketMQ源码分析之RemotingCommand网络通信协议源码分析

一、前言

在分析NameServer的请求和响应流程之前我们需要先看一下他的序列化协议是怎样的,RocketMQ支持的序列化协议有以下2种:

  1. JSON;
  2. RocketMQ自定义的协议;

json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间,一般成熟的中间件项目一般都会采用自定义的方式进行序列化和反序列化;

二、RemotingCommand源码分析

RemotingCommand为RocketMQ中自定义协议组件,其中包含了序列化和反序列化代码逻辑;

但是不向服务直接提供调用,而是通过前文讲解的NettyRemotingServer类中的NettyEncoder(编码器)和NettyDecoder(解码器)进行具体的调用;

序列化:就是将一段字节数组以固定的顺序的形式存放数据,第一个字节存放什么,后面4个字节存放什么,再后面几个字节存放什么;

反序列化:就是以固定的顺序取数据,你第一个字节存放的是消息的标志位,那你取出来就是消息的标志位,再后面4个为消息体的长度,那取出来就是消息体的长度,你再可以根据消息体的长度去获取对应长度字节的数据;

1、数据模型

public class RemotingCommand {

    public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
    public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
    public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
    private static final int RPC_ONEWAY = 1; // 0, RPC
    private static final Map, Field[]> CLASS_HASH_MAP =
        new HashMap, Field[]>();
    private static final Map CANONICAL_NAME_CACHE = new HashMap();
    // 1, Oneway
    // 1, RESPONSE_COMMAND
    private static final Map NULLABLE_FIELD_CACHE = new HashMap();
    private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
    private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
    private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
    private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
    private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
    private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
    private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
    private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
    private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
    private static volatile int configVersion = -1;
    private static AtomicInteger requestId = new AtomicInteger(0);

    private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;

    static {
        final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
        if (!isBlank(protocol)) {
            try {
                serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
            } catch (IllegalArgumentException e) {
                throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
            }
        }
    }

    // code编号,请求编号
    private int code;
    private LanguageCode language = LanguageCode.JAVA; // 编程语言,java
    private int version = 0; // 版本号
    private int opaque = requestId.getAndIncrement(); // 请求id
    private int flag = 0; // 标识
    private String remark; // 备注
    private HashMap extFields; // 扩展字段
    private transient CommandCustomHeader customHeader; // 自定义header头
    // 这一次rpc调用的序列化类型,默认就是json格式
    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
    // 消息体,会把真正的消息体序列化成字节数组
    private transient byte[] body;
}

2、序列化

org.apache.rocketmq.remoting.netty.NettyEncoder#encode

public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
    throws Exception {
    try {
        ByteBuffer header = remotingCommand.encodeHeader();
        out.writeBytes(header);
        byte[] body = remotingCommand.getBody();
        if (body != null) {
            out.writeBytes(body);
        }
    } catch (Exception e) {
        log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        if (remotingCommand != null) {
            log.error(remotingCommand.toString());
        }
        RemotingUtil.closeChannel(ctx.channel());
    }
}
public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
}

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData;
    headerData = this.headerEncode();

    length += headerData.length;

    // 3> body data length
    length += bodyLength;

    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    result.flip();

    return result;
}

这里会去判断序列化协议的类型,json类型其实没什么好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就没了,我们主要是看RocketMQ的自定义协议;

private byte[] headerEncode() {
    // 把自定义headers放到一个ext fields map里去
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}
public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        // 通过反射获取到自定义header类里面的fields
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap();
        }

        // 对自定义header类的fields进行遍历
        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    // 自定义header这些fields都是放到ext fields里面去
                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}
private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
    // 如果说你要是自定义了一套header以后,你搞一个类,实现接口
    // 然后在这个自定义头的类里,可以定义一堆的field,这些field就是你的自定义的头
    Field[] field = CLASS_HASH_MAP.get(classHeader);

    if (field == null) {
        // 通过反射直接获取到你自定义类里的头fields拿出来
        field = classHeader.getDeclaredFields();
        synchronized (CLASS_HASH_MAP) {
            CLASS_HASH_MAP.put(classHeader, field);
        }
    }
    return field;
}
public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode(); // header length里一共是4个字节,第一个字节是序列化类型code
    result[1] = (byte) ((source >> 16) & 0xFF); // 第二个字节开始到第四个字节,一共是3个字节都是跟header length是有关系的
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

其实自定义序列化就是搞一个byte数组,采用固定的显示进行构建。

如:第一个字节放请求类型,后面四个字节放消息体总长度,在后面发具体的消息体。消息体前面几位为header长度,后面为header消息体等等,通过固定排列的顺序进行构建,这样解析的时候我们就可以根据字节顺序来读取消息了。

public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
    // 用json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间一些
    // 常规做法是自己对RemotingCommand协议数据对象进行序列化
    // 编码,对象 -> 字节数组

    // String remark
    byte[] remarkBytes = null;
    int remarkLen = 0;
    if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
        remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
        remarkLen = remarkBytes.length;
    }

    // HashMap extFields
    // ext fields,是我们可能的自定义headers就在这里,把扩展头序列化为字节数组
    byte[] extFieldsBytes = null;
    int extLen = 0;
    if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
        extFieldsBytes = mapSerialize(cmd.getExtFields());
        extLen = extFieldsBytes.length;
    }

    // 计算出来消息头总长度
    int totalLen = calTotalLen(remarkLen, extLen);

    ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
    // int code(~32767)
    headerBuffer.putShort((short) cmd.getCode());
    // LanguageCode language
    headerBuffer.put(cmd.getLanguage().getCode());
    // int version(~32767)
    headerBuffer.putShort((short) cmd.getVersion());
    // int opaque
    headerBuffer.putInt(cmd.getOpaque());
    // int flag
    headerBuffer.putInt(cmd.getFlag());
    // String remark
    if (remarkBytes != null) {
        headerBuffer.putInt(remarkBytes.length);
        headerBuffer.put(remarkBytes);
    } else {
        headerBuffer.putInt(0);
    }
    // HashMap extFields;
    if (extFieldsBytes != null) {
        headerBuffer.putInt(extFieldsBytes.length);
        headerBuffer.put(extFieldsBytes);
    } else {
        headerBuffer.putInt(0);
    }

    return headerBuffer.array();
}

3、反序列化

org.apache.rocketmq.remoting.netty.NettyDecoder#decode

public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    ByteBuf frame = null;
    try {
        frame = (ByteBuf) super.decode(ctx, in);
        if (null == frame) {
            return null;
        }

        ByteBuffer byteBuffer = frame.nioBuffer();

        return RemotingCommand.decode(byteBuffer);
    } catch (Exception e) {
        log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        RemotingUtil.closeChannel(ctx.channel());
    } finally {
        if (null != frame) {
            frame.release();
        }
    }

    return null;
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException {
    // 解码的过程就是编码过程的逆向过程
    int length = byteBuffer.limit(); // 总长度
    int oriHeaderLen = byteBuffer.getInt(); // 头长度
    int headerLength = getHeaderLength(oriHeaderLen);

    // 搞一个头长度的字节数组,一次性把headers都读出来放到字节数组里去
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    // 对header要做一个解码
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}

这里判断header是用什么协议进行序列化的,就会使用什么协议进行反序列化;

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException {
    switch (type) {
        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }

    return null;
}

我们之间看rocketMQ自定义的协议吧,其实就是一个逆向的过程,你之前放的什么,他就根据字节拿出来;

public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException {
    RemotingCommand cmd = new RemotingCommand();
    ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
    // int code(~32767)
    cmd.setCode(headerBuffer.getShort());
    // LanguageCode language
    cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
    // int version(~32767)
    cmd.setVersion(headerBuffer.getShort());
    // int opaque
    cmd.setOpaque(headerBuffer.getInt());
    // int flag
    cmd.setFlag(headerBuffer.getInt());
    // String remark
    int remarkLength = headerBuffer.getInt();
    if (remarkLength > 0) {
        if (remarkLength > headerArray.length) {
            throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length);
        }
        byte[] remarkContent = new byte[remarkLength];
        headerBuffer.get(remarkContent);
        cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
    }

    // HashMap extFields
    int extFieldsLength = headerBuffer.getInt();
    if (extFieldsLength > 0) {
        if (extFieldsLength > headerArray.length) {
            throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length);
        }
        byte[] extFieldsBytes = new byte[extFieldsLength];
        headerBuffer.get(extFieldsBytes);
        cmd.setExtFields(mapDeserialize(extFieldsBytes));
    }
    
    return cmd;
}

三、总结

展开阅读全文

页面更新:2024-03-31

标签:数组   字节   源码   顺序   长度   做法   协议   过程   消息   类型   通信协议   数据   网络

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top