这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
// RunLoop performs the handshake and, if it succeeds, runs the read loop.
//
// Whichever step ends the session, dispose is called exactly once with the
// resulting error (nil included), and that same error is returned. The value
// returned by dispose itself is intentionally discarded.
func (s *ServerSession) RunLoop() (err error) {
	err = s.handshake()
	if err == nil {
		// Only enter the read loop after a successful handshake.
		err = s.runReadLoop()
	}
	_ = s.dispose(err)
	return err
}
// doMsg dispatches one received RTMP message to the handler that matches its
// message type id.
//
// Before dispatching, writeAcknowledgementIfNeeded is invoked; its error, or
// the error of the selected handler, is returned to the caller. Message types
// without a handler are logged at warn level and otherwise ignored.
func (s *ServerSession) doMsg(stream *Stream) error {
	if err := s.writeAcknowledgementIfNeeded(stream); err != nil {
		return err
	}
	//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)

	switch stream.header.MsgTypeId {
	case base.RtmpTypeIdWinAckSize:
		return s.doWinAckSize(stream)
	case base.RtmpTypeIdSetChunkSize:
		// noop
		// The underlying chunk composer has already handled this message,
		// so there is nothing left to do here.
	case base.RtmpTypeIdCommandMessageAmf0:
		return s.doCommandMessage(stream)
	case base.RtmpTypeIdCommandMessageAmf3:
		return s.doCommandAmf3Message(stream)
	case base.RtmpTypeIdMetadata:
		return s.doDataMessageAmf0(stream)
	case base.RtmpTypeIdAck:
		return s.doAck(stream)
	case base.RtmpTypeIdUserControl:
		// Propagate the handler's error like every other case instead of
		// silently dropping it.
		return s.doUserControl(stream)
	case base.RtmpTypeIdAudio, base.RtmpTypeIdVideo:
		// Audio/video payloads are only accepted on a session whose base
		// type is SessionBaseTypePubStr.
		if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {
			return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
		}
		s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())
	default:
		Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())
	}
	return nil
}
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:3 MsgLen:196 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [connect] - server_session.go:345
INFO [RTMPPUBSUB4] < R connect('live'). tcUrl=rtmp://127.0.0.1:1935/live - server_session.go:413
INFO [RTMPPUBSUB4] > W Window Acknowledgement Size 5000000. - server_session.go:417
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:2 MsgLen:4 MsgTypeId:5 MsgStreamId:0 TimestampAbs:0}, - server_session.go:216
INFO [RTMPPUBSUB4] < R Window Acknowledgement Size: 5000000 - server_session.go:257
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:3 MsgLen:25 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [createStream] - server_session.go:345
INFO [RTMPPUBSUB4] < R createStream(). - server_session.go:444
INFO [RTMPPUBSUB4] > W _result(). - server_session.go:445
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:8 MsgLen:38 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [getStreamLength] - server_session.go:345
2023/04/08 10:09:06.774588 DEBUG [RTMPPUBSUB4] read command message, ignore it. cmd=getStreamLength, header={Csid:8 MsgLen:38 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=4096, rpos=27, wpos=38, hex=00000000 05 02 00 07 74 65 73 74 31 31 31 |....test111|
- server_session.go:366
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:8 MsgLen:36 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0}, - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [play] - server_session.go:345
INFO [RTMPPUBSUB4] < R play('test111'). - server_session.go:507
INFO [RTMPPUBSUB4] > W onStatus('NetStream.Play.Start'). - server_session.go:517
2023/04/08 10:09:06.774929 DEBUG [GROUP2] [RTMPPUBSUB4] add SubSession into group. - group__out_sub.go:20
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:2 MsgLen:10 MsgTypeId:4 MsgStreamId:0 TimestampAbs:1}, - server_session.go:216
connect
->
server bandwidth
->
createStream
->
getStreamLength
->
play
->
control message
// doWinAckSize handles a Window Acknowledgement Size message from the peer.
//
// The first 4 bytes of the message body are read as a big-endian unsigned
// integer and stored in s.peerWinAckSize. A short-buffer error is returned
// when the message is shorter than 4 bytes.
func (s *ServerSession) doWinAckSize(stream *Stream) error {
	const need = 4

	if got := stream.msg.Len(); got < need {
		return base.NewErrRtmpShortBuffer(need, int(got), "ServerSession::doWinAckSize")
	}

	body := stream.msg.buff.Bytes()
	s.peerWinAckSize = int(bele.BeUint32(body))
	Log.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey(), s.peerWinAckSize)
	return nil
}
// doCreateStream handles the createStream command: it logs the request and
// the reply, then writes the _result response for transaction id tid to the
// connection. The write error, if any, is returned unchanged.
//
// The stream argument is not used by this handler.
func (s *ServerSession) doCreateStream(tid int, stream *Stream) error {
	key := s.UniqueKey()
	Log.Infof("[%s] < R createStream().", key)

	Log.Infof("[%s] > W _result().", s.UniqueKey())
	return s.packer.writeCreateStreamResult(s.conn, tid)
}
// doPlay handles the play command of a subscribing client.
//
// It reads a null value followed by the stream name (optionally carrying a
// raw query after '?') from the command message, records the stream name, raw
// query and full url on the session, and replies with the StreamIsRecorded,
// StreamBegin and onStatus('NetStream.Play.Start') messages on Msid1.
// Afterwards it adjusts the connection properties, marks the session as a sub
// stream session and notifies the observer.
//
// Any read or write error is returned as-is. When the observer rejects the
// session, DisposeByObserverFlag is set and the observer's error is returned.
//
// The tid argument is not used by this handler.
func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
	if err = stream.msg.readNull(); err != nil {
		return err
	}
	s.streamNameWithRawQuery, err = stream.msg.readStringWithType()
	if err != nil {
		return err
	}

	// Split only at the first '?': everything after it is the raw query, even
	// when the query itself contains further '?' characters. A plain Split
	// combined with a len==2 check would silently drop the query in that case.
	ss := strings.SplitN(s.streamNameWithRawQuery, "?", 2)
	s.streamName = ss[0]
	if len(ss) == 2 {
		s.rawQuery = ss[1]
	}

	s.url = fmt.Sprintf("%s/%s", s.tcUrl, s.streamNameWithRawQuery)

	Log.Infof("[%s] < R play('%s').", s.UniqueKey(), s.streamNameWithRawQuery)
	// TODO chef: start duration reset

	if err = s.packer.writeStreamIsRecorded(s.conn, Msid1); err != nil {
		return err
	}
	if err = s.packer.writeStreamBegin(s.conn, Msid1); err != nil {
		return err
	}

	Log.Infof("[%s] > W onStatus('NetStream.Play.Start').", s.UniqueKey())
	if err = s.packer.writeOnStatusPlay(s.conn, Msid1); err != nil {
		return err
	}

	// Modify the connection properties only after the signaling replies have
	// been written.
	s.modConnProps()

	s.sessionStat.SetBaseType(base.SessionBaseTypeSubStr)

	err = s.observer.OnNewRtmpSubSession(s)
	if err != nil {
		s.DisposeByObserverFlag = true
	}
	return err
}
// OnNewRtmpSubSession registers a new RTMP sub session with the manager.
//
// While holding sm.mutex it builds the sub-start info for the session, passes
// it to the configured Authentication hook and returns that hook's error
// without touching any group if the hook fails. Otherwise the session is
// added to the group identified by its app name and stream name (created on
// demand), the info is completed with the group's in/out session flags, and
// the NotifyHandler is informed.
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	startInfo := base.Session2SubStartInfo(session)

	authErr := sm.option.Authentication.OnSubStart(startInfo)
	if authErr != nil {
		return authErr
	}

	g := sm.getOrCreateGroup(session.AppName(), session.StreamName())
	g.AddRtmpSubSession(session)

	// The flags are sampled after the session has been added to the group.
	startInfo.HasInSession = g.HasInSession()
	startInfo.HasOutSession = g.HasOutSession()

	sm.option.NotifyHandler.OnSubStart(startInfo)
	return nil
}
// doUserControl handles a User Control message.
//
// The first 2 bytes of the message body are read as the big-endian event
// type. Only the ping request event is acted upon: the following 4 bytes are
// read as a big-endian timestamp and echoed back in a ping response. All other
// event types are ignored and nil is returned.
//
// A short-buffer error is returned when the body is too small to hold the
// event type, or too small to hold the timestamp of a ping request, instead of
// reading past the end of the buffer.
func (s *ServerSession) doUserControl(stream *Stream) error {
	const (
		typeLen      = 2 // size of the event type field
		timestampLen = 4 // size of the ping request timestamp field
	)

	payload := stream.msg.buff.Bytes()
	if len(payload) < typeLen {
		return base.NewErrRtmpShortBuffer(typeLen, len(payload), "ServerSession::doUserControl")
	}

	userControlType := bele.BeUint16(payload)
	if userControlType != uint16(base.RtmpUserControlPingRequest) {
		return nil
	}

	if len(payload) < typeLen+timestampLen {
		return base.NewErrRtmpShortBuffer(typeLen+timestampLen, len(payload), "ServerSession::doUserControl")
	}

	// Consume the event type so the buffer is positioned at the timestamp.
	stream.msg.buff.Skip(typeLen)
	timestamp := bele.BeUint32(stream.msg.buff.Bytes())
	return s.packer.writePingResponse(s.conn, timestamp)
}
// Excerpt from a function whose header is not shown here; `msg` and
// `lazyRtmpChunkDivider` are defined by that enclosing function.
// First pass: bring fresh sub sessions up to date from the GOP cache and
// update the key-frame wait flag. Second part: write the current message to
// the sub sessions, directly or through the merge writer.
for session := range group.rtmpSubSessionSet {
if session.IsFresh {
// TODO chef: the header info and the full GOP could also be sent right when the SubSession joins
if group.rtmpGopCache.MetadataEnsureWithoutSetDataFrame != nil {
Log.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.MetadataEnsureWithoutSetDataFrame)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
Log.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
Log.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.AacSeqHeader)
}
gopCount := group.rtmpGopCache.GetGopCount()
if gopCount > 0 {
// The GOP cache is guaranteed to contain a key frame
session.ShouldWaitVideoKeyFrame = false
Log.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
}
for i := 0; i < gopCount; i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = session.Write(item)
}
}
// A new sub session has joined (the first newly joined one in this loop): broadcast all data buffered in the rtmp buf writer to the old sub sessions,
// which ensures that this dirty data is not sent to the newly joined sub session.
// Note: this may be called multiple times, but only the first call actually flushes the buffered data.
if group.rtmpMergeWriter != nil {
group.rtmpMergeWriter.Flush()
}
session.IsFresh = false
}
if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() {
// A sub session is waiting for a key frame and the current message is a key frame:
// broadcast all data buffered in the rtmp buf writer to the old sub sessions,
// then clear this sub session's flag
// so that the rtmp buf writer sends this key frame to it.
if group.rtmpMergeWriter != nil {
group.rtmpMergeWriter.Flush()
}
session.ShouldWaitVideoKeyFrame = false
}
}
// Only produce and write the chunked message when at least one sub session exists.
if len(group.rtmpSubSessionSet) > 0 {
if group.rtmpMergeWriter == nil {
group.write2RtmpSubSessions(lazyRtmpChunkDivider.GetEnsureWithoutSdf())
} else {
group.rtmpMergeWriter.Write(lazyRtmpChunkDivider.GetEnsureWithoutSdf())
}
}
// write2RtmpSubSessions writes b to every RTMP sub session of the group that
// is neither fresh nor still waiting for a video key frame. Write errors of
// individual sessions are discarded.
func (group *Group) write2RtmpSubSessions(b []byte) {
	for sub := range group.rtmpSubSessionSet {
		if ready := !sub.IsFresh && !sub.ShouldWaitVideoKeyFrame; ready {
			_ = sub.Write(b)
		}
	}
}
页面更新:2024-05-15
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号