pion ICE流程分析
func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, error) {// https://w3c.github.io/webrtc-pc/#constructor (Step #2)// Some variables defined explicitly despite their implicit zero values to// allow better readability to understand what is happening.pc := &PeerConnection{statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()),configuration: Configuration{ICEServers: []ICEServer{},ICETransportPolicy: ICETransportPolicyAll,BundlePolicy: BundlePolicyBalanced,RTCPMuxPolicy: RTCPMuxPolicyRequire,Certificates: []Certificate{},ICECandidatePoolSize: 0,},ops: newOperations(),isClosed: &atomicBool{},isNegotiationNeeded: &atomicBool{},negotiationNeededState: negotiationNeededStateEmpty,lastOffer:"",lastAnswer: "",greaterMid: -1,signalingState: SignalingStateStable,api: api,log: api.settingEngine.LoggerFactory.NewLogger("pc"),}pc.iceConnectionState.Store(ICEConnectionStateNew)pc.connectionState.Store(PeerConnectionStateNew)i, err := api.interceptorRegistry.Build("")if err != nil {return nil, err}pc.api = &API{settingEngine: api.settingEngine,interceptor: i,}if api.settingEngine.disableMediaEngineCopy {pc.api.mediaEngine = api.mediaEngine} else {pc.api.mediaEngine = api.mediaEngine.copy()}if err = pc.initConfiguration(configuration); err != nil {return nil, err}pc.iceGatherer, err = pc.createICEGatherer()if err != nil {return nil, err}// Create the ice transporticeTransport := pc.createICETransport()pc.iceTransport = iceTransport// Create the DTLS transportdtlsTransport, err := pc.api.NewDTLSTransport(pc.iceTransport, pc.configuration.Certificates)if err != nil {return nil, err}pc.dtlsTransport = dtlsTransport// Create the SCTP transportpc.sctpTransport = pc.api.NewSCTPTransport(pc.dtlsTransport)// Wire up the on datachannel handlerpc.sctpTransport.OnDataChannel(func(d *DataChannel) {pc.mu.RLock()handler := pc.onDataChannelHandlerpc.mu.RUnlock()if handler != nil {handler(d)}})pc.interceptorRTCPWriter = 
pc.api.interceptor.BindRTCPWriter(interceptor.RTCPWriterFunc(pc.writeRTCP))return pc, nil}
// NewAgent creates a new Agentfunc NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognitvar err errorif config.PortMax < config.PortMin {return nil, ErrPort}mDNSName := config.MulticastDNSHostNameif mDNSName == "" {if mDNSName, err = generateMulticastDNSName(); err != nil {return nil, err}}if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {return nil, ErrInvalidMulticastDNSHostName}mDNSMode := config.MulticastDNSModeif mDNSMode == 0 {mDNSMode = MulticastDNSModeQueryOnly}loggerFactory := config.LoggerFactoryif loggerFactory == nil {loggerFactory = logging.NewDefaultLoggerFactory()}log := loggerFactory.NewLogger("ice")var mDNSConn *mdns.ConnmDNSConn, mDNSMode, err = createMulticastDNS(mDNSMode, mDNSName, log)// Opportunistic mDNS: If we can't open the connection, that's ok: we// can continue without it.if err != nil {log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)}closeMDNSConn := func() {if mDNSConn != nil {if mdnsCloseErr := mDNSConn.Close(); mdnsCloseErr != nil {log.Warnf("Failed to close mDNS: %v", mdnsCloseErr)}}}startedCtx, startedFn := context.WithCancel(context.Background())a := &Agent{chanTask: make(chan task),chanState: make(chan ConnectionState),chanCandidate: make(chan Candidate),chanCandidatePair: make(chan *CandidatePair),tieBreaker: globalMathRandomGenerator.Uint64(),lite:config.Lite,gatheringState: GatheringStateNew,connectionState: ConnectionStateNew,localCandidates: make(map[NetworkType][]Candidate),remoteCandidates: make(map[NetworkType][]Candidate),urls:config.Urls,networkTypes: config.NetworkTypes,onConnected:make(chan struct{}),buffer: packetio.NewBuffer(),done:make(chan struct{}),taskLoopDone: make(chan struct{}),startedCh: startedCtx.Done(),startedFn: startedFn,portmin: config.PortMin,portmax: config.PortMax,loggerFactory: loggerFactory,log: log,net: config.Net,proxyDialer:config.ProxyDialer,mDNSMode: mDNSMode,mDNSName: mDNSName,mDNSConn: mDNSConn,gatherCandidateCancel: func() 
{},forceCandidateContact: make(chan bool, 1),interfaceFilter: config.InterfaceFilter,insecureSkipVerify: config.InsecureSkipVerify,}a.tcpMux = config.TCPMuxif a.tcpMux == nil {a.tcpMux = newInvalidTCPMux()}a.udpMux = config.UDPMuxa.udpMuxSrflx = config.UDPMuxSrflxif a.net == nil {a.net = vnet.NewNet(nil)} else if a.net.IsVirtual() {a.log.Warn("vnet is enabled")if a.mDNSMode != MulticastDNSModeDisabled {a.log.Warn("vnet does not support mDNS yet")}}config.initWithDefaults(a)// Make sure the buffer doesn't grow indefinitely.// NOTE: We actually won't get anywhere close to this limit.// SRTP will constantly read from the endpoint and drop packets if it's full.a.buffer.SetLimitSize(maxBufferSize)if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {closeMDNSConn()return nil, ErrLiteUsingNonHostCandidates}if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {closeMDNSConn()return nil, ErrUselessUrlsProvided}if err = config.initExtIPMapping(a); err != nil {closeMDNSConn()return nil, err}go a.taskLoop()a.startOnConnectionStateChangeRoutine()// Restart is also used to initialize the agent for the first timeif err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {closeMDNSConn()_ = a.Close()return nil, err}return a, nil}
推荐一个零声学院的免费公开课程，个人觉得老师讲得不错，分享给大家：内容涵盖 Linux、Nginx、ZeroMQ、MySQL、Redis、fastdfs、MongoDB、ZK、流媒体、CDN、P2P、K8S、Docker、TCP/IP、协程、DPDK 等技术，可立即学习。