Golang流媒体实战之六:lal拉流服务源码阅读

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

《Golang流媒体实战》系列的链接

  1. 体验开源项目lal
  2. 回源
  3. 转推和录制
  4. lalserver的启动源码阅读
  5. Golang流媒体实战之五:lal推流服务源码阅读
  6. Golang流媒体实战之六:lal拉流服务源码阅读

本篇概览

直接跳过一部分源码

开始阅读

// RunLoop drives the session: it performs the handshake and, if that
// succeeds, runs the read loop. Whichever step finishes last, the session is
// disposed with the resulting error (dispose's own result is deliberately
// ignored) and that error is returned to the caller.
func (s *ServerSession) RunLoop() (err error) {
	err = s.handshake()
	if err == nil {
		// The read loop is only entered after a successful handshake.
		err = s.runReadLoop()
	}

	_ = s.dispose(err)
	return err
}
// doMsg dispatches one assembled RTMP message to the handler matching its
// message type id.
//
// It first calls writeAcknowledgementIfNeeded, then switches on
// stream.header.MsgTypeId. Audio and video messages are only accepted when the
// session's base type is base.SessionBaseTypePubStr; otherwise a wrapped
// base.ErrRtmpUnexpectedMsg is returned. Unknown message types are logged and
// ignored.
//
// Returns the error of whichever handler ran, or nil when the message needs no
// handling or was handled without error.
func (s *ServerSession) doMsg(stream *Stream) error {
	if err := s.writeAcknowledgementIfNeeded(stream); err != nil {
		return err
	}

	//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)
	switch stream.header.MsgTypeId {
	case base.RtmpTypeIdWinAckSize:
		return s.doWinAckSize(stream)
	case base.RtmpTypeIdSetChunkSize:
		// noop
		// The underlying chunk composer has already handled this message, so
		// there is nothing left to do here.
	case base.RtmpTypeIdCommandMessageAmf0:
		return s.doCommandMessage(stream)
	case base.RtmpTypeIdCommandMessageAmf3:
		return s.doCommandAmf3Message(stream)
	case base.RtmpTypeIdMetadata:
		return s.doDataMessageAmf0(stream)
	case base.RtmpTypeIdAck:
		return s.doAck(stream)
	case base.RtmpTypeIdUserControl:
		// Propagate the handler's error like every other case instead of
		// silently dropping it.
		return s.doUserControl(stream)
	case base.RtmpTypeIdAudio:
		fallthrough
	case base.RtmpTypeIdVideo:
		if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {
			return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
		}
		s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())
	default:
		Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())
	}
	return nil
}




INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:3 MsgLen:196 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [connect] - server_session.go:345
INFO [RTMPPUBSUB4] < R connect('live'). tcUrl=rtmp://127.0.0.1:1935/live - server_session.go:413
INFO [RTMPPUBSUB4] > W Window Acknowledgement Size 5000000. - server_session.go:417
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:2 MsgLen:4 MsgTypeId:5 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO [RTMPPUBSUB4] < R Window Acknowledgement Size: 5000000 - server_session.go:257
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:3 MsgLen:25 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [createStream] - server_session.go:345
INFO [RTMPPUBSUB4] < R createStream(). - server_session.go:444
INFO [RTMPPUBSUB4] > W _result(). - server_session.go:445
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:8 MsgLen:38 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [getStreamLength] - server_session.go:345
2023/04/08 10:09:06.774588 DEBUG [RTMPPUBSUB4] read command message, ignore it. cmd=getStreamLength, header={Csid:8 MsgLen:38 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=4096, rpos=27, wpos=38, hex=00000000  05 02 00 07 74 65 73 74  31 31 31                 |....test111|
 - server_session.go:366
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:8 MsgLen:36 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [play] - server_session.go:345
INFO [RTMPPUBSUB4] < R play('test111'). - server_session.go:507
INFO [RTMPPUBSUB4] > W onStatus('NetStream.Play.Start'). - server_session.go:517
2023/04/08 10:09:06.774929 DEBUG [GROUP2] [RTMPPUBSUB4] add SubSession into group. - group__out_sub.go:20
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:2 MsgLen:10 MsgTypeId:4 MsgStreamId:0 TimestampAbs:1},  - server_session.go:216
connect -> server bandwidth -> createStream -> getStreamLength -> play -> control message



会话类型是何时确定的










server bandwidth(5)

// doWinAckSize handles a Window Acknowledgement Size message sent by the peer.
//
// The leading four bytes of the message payload are decoded with
// bele.BeUint32 and stored in s.peerWinAckSize. When the payload is shorter
// than four bytes, the error built by base.NewErrRtmpShortBuffer is returned
// and the stored value is left untouched.
func (s *ServerSession) doWinAckSize(stream *Stream) error {
	const winAckSizeLen = 4

	if msgLen := stream.msg.Len(); msgLen < winAckSizeLen {
		return base.NewErrRtmpShortBuffer(winAckSizeLen, int(msgLen), "ServerSession::doWinAckSize")
	}

	payload := stream.msg.buff.Bytes()
	s.peerWinAckSize = int(bele.BeUint32(payload))

	Log.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey(), s.peerWinAckSize)
	return nil
}

createStream

// doCreateStream answers a createStream command: it logs the received command
// and the outgoing _result, then writes the createStream result for
// transaction id tid to the connection. The stream argument is not used.
//
// Returns the error from writing the result, if any.
func (s *ServerSession) doCreateStream(tid int, stream *Stream) error {
	Log.Infof("[%s] < R createStream().", s.UniqueKey())

	// The reply is logged before it is written.
	Log.Infof("[%s] > W _result().", s.UniqueKey())
	return s.packer.writeCreateStreamResult(s.conn, tid)
}

getStreamLength



play

// doPlay handles the play command of a pulling (subscribing) client.
//
// It reads a null value and then the stream name (optionally followed by
// "?rawQuery") from the command message, records the stream name, raw query
// and full url on the session, and replies with StreamIsRecorded, StreamBegin
// and onStatus('NetStream.Play.Start') on Msid1. After the replies are written
// it adjusts the connection properties, marks the session as
// base.SessionBaseTypeSubStr and notifies the observer through
// OnNewRtmpSubSession.
//
// The tid argument is not used. Any read or write error is returned as is. If
// the observer rejects the session, DisposeByObserverFlag is set and the
// observer's error is returned.
func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
	if err = stream.msg.readNull(); err != nil {
		return err
	}
	s.streamNameWithRawQuery, err = stream.msg.readStringWithType()
	if err != nil {
		return err
	}

	// Split only at the first '?': everything after it is the raw query, even
	// when the query itself contains further '?' characters. A plain Split
	// would yield more than two parts in that case and the query would be
	// silently dropped.
	ss := strings.SplitN(s.streamNameWithRawQuery, "?", 2)
	s.streamName = ss[0]
	if len(ss) == 2 {
		s.rawQuery = ss[1]
	}

	s.url = fmt.Sprintf("%s/%s", s.tcUrl, s.streamNameWithRawQuery)

	Log.Infof("[%s] < R play('%s').", s.UniqueKey(), s.streamNameWithRawQuery)
	// TODO chef: start duration reset

	if err = s.packer.writeStreamIsRecorded(s.conn, Msid1); err != nil {
		return err
	}
	if err = s.packer.writeStreamBegin(s.conn, Msid1); err != nil {
		return err
	}

	Log.Infof("[%s] > W onStatus('NetStream.Play.Start').", s.UniqueKey())
	if err = s.packer.writeOnStatusPlay(s.conn, Msid1); err != nil {
		return err
	}

	// Modify the connection's properties only after the signaling replies
	// have been written.
	s.modConnProps()

	s.sessionStat.SetBaseType(base.SessionBaseTypeSubStr)
	err = s.observer.OnNewRtmpSubSession(s)
	if err != nil {
		s.DisposeByObserverFlag = true
	}
	return err
}



// OnNewRtmpSubSession registers a new RTMP sub (pull) session with the
// manager while holding sm.mutex.
//
// The session is first passed to the Authentication hook; if that returns an
// error, the error is returned and the session is not added to any group.
// Otherwise the session is added to the group for its app name and stream
// name (obtained via getOrCreateGroup), and the NotifyHandler is told about
// the sub start together with the group's HasInSession / HasOutSession state.
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	startInfo := base.Session2SubStartInfo(session)

	// Authentication runs before the session is attached to a group.
	err := sm.option.Authentication.OnSubStart(startInfo)
	if err != nil {
		return err
	}

	g := sm.getOrCreateGroup(session.AppName(), session.StreamName())
	g.AddRtmpSubSession(session)

	// The group state is read after the session has been added.
	startInfo.HasInSession = g.HasInSession()
	startInfo.HasOutSession = g.HasOutSession()
	sm.option.NotifyHandler.OnSubStart(startInfo)

	return nil
}

Control Message

// doUserControl handles a User Control message.
//
// The first two payload bytes are decoded as the event type. Only
// base.RtmpUserControlPingRequest is acted upon: the following four bytes are
// decoded as the timestamp and echoed back with writePingResponse. All other
// event types are ignored.
//
// Returns the error built by base.NewErrRtmpShortBuffer when the payload is
// too short for the fields that must be read (previously such a message was
// decoded without any length check), or the error from writing the ping
// response.
func (s *ServerSession) doUserControl(stream *Stream) error {
	const (
		eventTypeLen   = 2                // event type field
		pingRequestLen = eventTypeLen + 4 // event type + timestamp
	)

	// NOTE(review): relies on stream.msg.Len() describing the bytes available
	// from stream.msg.buff.Bytes(), the same way doWinAckSize uses it.
	if stream.msg.Len() < eventTypeLen {
		return base.NewErrRtmpShortBuffer(eventTypeLen, int(stream.msg.Len()), "ServerSession::doUserControl")
	}

	userControlType := bele.BeUint16(stream.msg.buff.Bytes())
	if userControlType != uint16(base.RtmpUserControlPingRequest) {
		return nil
	}

	// Validate the full ping request length before consuming anything.
	if stream.msg.Len() < pingRequestLen {
		return base.NewErrRtmpShortBuffer(pingRequestLen, int(stream.msg.Len()), "ServerSession::doUserControl")
	}

	stream.msg.buff.Skip(eventTypeLen)
	timestamp := bele.BeUint32(stream.msg.buff.Bytes())
	return s.packer.writePingResponse(s.conn, timestamp)
}

拉流动作

	// Walk every RTMP sub session of the group. A fresh session is first sent
	// the cached metadata, sequence headers and GOP data; a session that is
	// still waiting for a video key frame is released once the current
	// message is one.
	for session := range group.rtmpSubSessionSet {
		if session.IsFresh {
			// TODO chef: the header info and the full GOP could also be sent right when the SubSession joins
			if group.rtmpGopCache.MetadataEnsureWithoutSetDataFrame != nil {
				Log.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())
				_ = session.Write(group.rtmpGopCache.MetadataEnsureWithoutSetDataFrame)
			}
			if group.rtmpGopCache.VideoSeqHeader != nil {
				Log.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())
				_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
			}
			if group.rtmpGopCache.AacSeqHeader != nil {
				Log.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())
				_ = session.Write(group.rtmpGopCache.AacSeqHeader)
			}
			gopCount := group.rtmpGopCache.GetGopCount()
			if gopCount > 0 {
				// The GOP cache is certain to contain a key frame
				session.ShouldWaitVideoKeyFrame = false

				Log.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
			}
			// Send every cached GOP item to the fresh session, in cache order.
			for i := 0; i < gopCount; i++ {
				for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
					_ = session.Write(item)
				}
			}

			// A new sub session has joined (the first newly joined one in this loop): broadcast all data buffered in the rtmp buf writer to the old sub sessions,
			// so that this stale data is never sent to the newly joined sub session
			// Note: this may be called several times, but only the first call actually flushes buffered data
			if group.rtmpMergeWriter != nil {
				group.rtmpMergeWriter.Flush()
			}

			session.IsFresh = false
		}

		if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() {
			// A sub session is waiting for a key frame and the current message is a key frame
			// Broadcast all data buffered in the rtmp buf writer to the old sub sessions
			// and clear the flag of this sub session
			// so that the rtmp buf writer sends this key frame
			if group.rtmpMergeWriter != nil {
				group.rtmpMergeWriter.Flush()
			}
			session.ShouldWaitVideoKeyFrame = false
		}
	} 
	// Only produce and send the chunk data when at least one RTMP sub session
	// exists.
	if len(group.rtmpSubSessionSet) > 0 {
		if group.rtmpMergeWriter == nil {
			// No merge writer configured: write straight to the sub sessions.
			group.write2RtmpSubSessions(lazyRtmpChunkDivider.GetEnsureWithoutSdf())
		} else {
			// Otherwise hand the data to the merge writer.
			group.rtmpMergeWriter.Write(lazyRtmpChunkDivider.GetEnsureWithoutSdf())
		}
	}
// write2RtmpSubSessions writes b to every RTMP sub session of the group that
// is ready to receive live data, i.e. one that is neither fresh nor still
// waiting for a video key frame. Write errors are deliberately ignored.
func (group *Group) write2RtmpSubSessions(b []byte) {
	for session := range group.rtmpSubSessionSet {
		if ready := !session.IsFresh && !session.ShouldWaitVideoKeyFrame; ready {
			_ = session.Write(b)
		}
	}
}

欢迎关注头条号:程序员欣宸

展开阅读全文

页面更新:2024-05-15

标签:流媒体   实战   源码   命令   关键   消息   类型   代码   方法   日志

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top