WebRTC candidate source code analysis

WebRTC signaling

From ChatGPT

WebRTC (Web Real-Time Communication) is a technology that enables real-time audio and video communication between browsers. Signaling is the part of WebRTC that manages session setup, maintenance, and teardown. Put simply, signaling is the process of exchanging metadata in order to establish a connection.

In WebRTC, establishing a connection requires several signaling exchanges, for example:

Exchanging SDP (Session Description Protocol): each endpoint sends its local SDP to the other side and parses the SDP it receives.
Exchanging ICE (Interactive Connectivity Establishment) candidates: WebRTC probes multiple paths between the two endpoints and tries to find the best one. An ICE candidate is a network address and port that can be used to establish a connection between the peers.
Exchanging network information: network type, bandwidth, network conditions, and so on, to help both sides choose the best connection.
These steps are carried out through a signaling server, which relays messages between the two sides and ensures the session can be set up.

WebRTC does not mandate a signaling protocol, so developers can pick whatever suits their application, such as SIP (Session Initiation Protocol), XMPP (Extensible Messaging and Presence Protocol), or plain WebSocket.

This post looks at how ICE candidates are gathered.

In WebRTC, the ICE (Interactive Connectivity Establishment) protocol probes multiple paths between two endpoints and tries to find the best one. ICE uses candidates to represent the address-and-port combinations over which a connection can be established.

Candidates are generated by the local ICE agent (the WebRTC endpoint itself), with the help of STUN (Session Traversal Utilities for NAT) and/or TURN (Traversal Using Relays around NAT) servers; a single server may offer both STUN and TURN.

When a WebRTC client starts establishing a connection, its ICE agent gathers the available candidates, querying the configured STUN/TURN servers as needed, and sends them to the peer. ICE candidates come in the following types:

  1. Host candidate: an address and port on the local machine.
  2. Server reflexive candidate (srflx): the public IP address and port learned via a STUN server.
  3. Peer reflexive candidate (prflx): an address and port learned from the peer's STUN checks during connectivity checking.
  4. Relay candidate: an address and port allocated on a TURN server for NAT traversal.

ICE candidates are generated dynamically based on network conditions; when the network environment changes, the WebRTC client may gather new candidates and send them to the peer. Correctly gathering and handling ICE candidates is therefore important for the stability and quality of a WebRTC connection.

SdpOfferAnswerHandler

After CreateOffer and SetLocalDescription, candidates need to be gathered. SetLocalDescription goes from PeerConnection into SdpOfferAnswerHandler.

webrtc/src/pc/sdp_offer_answer.cc

void SdpOfferAnswerHandler::DoSetLocalDescription(...) {

  ...

  // MaybeStartGathering needs to be called after informing the observer so that
  // we don't signal any candidates before signaling that SetLocalDescription
  // completed.
  transport_controller_s()->MaybeStartGathering();
}

As the comment explains, transport_controller_s()->MaybeStartGathering() is the entry point for candidate gathering.

MaybeStartGathering

webrtc/src/pc/peer_connection.cc

PeerConnection::Initialize -> PeerConnection::InitializeTransportController_n

JsepTransportController* PeerConnection::InitializeTransportController_n(
const RTCConfiguration& configuration,
const PeerConnectionDependencies& dependencies) {
JsepTransportController::Config config;
config.redetermine_role_on_ice_restart =
configuration.redetermine_role_on_ice_restart;
config.ssl_max_version = options_.ssl_max_version;
config.disable_encryption = options_.disable_encryption;
config.bundle_policy = configuration.bundle_policy;
config.rtcp_mux_policy = configuration.rtcp_mux_policy;
// TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove
// this stub.
config.crypto_options = configuration.crypto_options.has_value()
? *configuration.crypto_options
: options_.crypto_options;
config.transport_observer = this;
config.rtcp_handler = InitializeRtcpCallback();
config.event_log = event_log_ptr_;
#if defined(ENABLE_EXTERNAL_AUTH)
config.enable_external_auth = true;
#endif
config.active_reset_srtp_params = configuration.active_reset_srtp_params;

// DTLS has to be enabled to use SCTP.
if (dtls_enabled_) {
config.sctp_factory = context_->sctp_transport_factory();
}

config.ice_transport_factory = ice_transport_factory_.get();
config.on_dtls_handshake_error_ =
[weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) {
if (weak_ptr) {
weak_ptr->OnTransportControllerDtlsHandshakeError(s);
}
};

config.field_trials = trials_.get();

transport_controller_.reset(
new JsepTransportController(network_thread(), port_allocator_.get(),
async_dns_resolver_factory_.get(), config));

transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
if (s == cricket::kIceConnectionConnected) {
ReportTransportStats();
}
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
}));
});
transport_controller_->SubscribeConnectionState(
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
}));
});
transport_controller_->SubscribeStandardizedIceConnectionState(
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
}));
});
transport_controller_->SubscribeIceGatheringState(
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
}));
});
transport_controller_->SubscribeIceCandidateGathered(
[this](const std::string& transport,
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(),
[this, t = transport, c = candidates]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesGathered(t, c);
}));
});
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
}));
});
transport_controller_->SubscribeIceCandidatesRemoved(
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, c = c]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
}));
});
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
}));
});

transport_controller_->SetIceConfig(ParseIceConfig(configuration));
return transport_controller_.get();
}

This registers the callback for gathered candidates: transport_controller_->SubscribeIceCandidateGathered.

First, let's look at what happens after candidates are gathered.

void PeerConnection::OnTransportControllerCandidatesGathered(
    const std::string& transport_name,
    const cricket::Candidates& candidates) {
  // TODO(bugs.webrtc.org/12427): Expect this to come in on the network thread
  // (not signaling as it currently does), handle appropriately.
  int sdp_mline_index;
  if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) {
    RTC_LOG(LS_ERROR)
        << "OnTransportControllerCandidatesGathered: content name "
        << transport_name << " not found";
    return;
  }

  for (cricket::Candidates::const_iterator citer = candidates.begin();
       citer != candidates.end(); ++citer) {
    // Use transport_name as the candidate media id.
    std::unique_ptr<JsepIceCandidate> candidate(
        new JsepIceCandidate(transport_name, sdp_mline_index, *citer));
    sdp_handler_->AddLocalIceCandidate(candidate.get());
    OnIceCandidate(std::move(candidate));
  }
}

void PeerConnection::OnIceCandidate(
    std::unique_ptr<IceCandidateInterface> candidate) {
  if (IsClosed()) {
    return;
  }
  ReportIceCandidateCollected(candidate->candidate());
  ClearStatsCache();
  Observer()->OnIceCandidate(candidate.get());
}

First, sdp_handler_->AddLocalIceCandidate(candidate.get()) records the local ICE candidate.
Then the OnIceCandidate callback passes it up to the app, which sends it as a signaling message to the signaling server, and the server forwards it to the remote peer.

The app likewise receives the remote peer's ICE candidates forwarded by the signaling server:

void PeerConnection::AddIceCandidate(
    std::unique_ptr<IceCandidateInterface> candidate,
    std::function<void(RTCError)> callback) {
  RTC_DCHECK_RUN_ON(signaling_thread());
  sdp_handler_->AddIceCandidate(std::move(candidate),
                                [this, callback](webrtc::RTCError result) {
                                  ClearStatsCache();
                                  callback(result);
                                });
}

The remote peer's ICE candidate is applied via sdp_handler_->AddIceCandidate.

JsepTransportController

JSEP stands for JavaScript Session Establishment Protocol.

/webrtc/src/pc/jsep_transport_controller.cc

void JsepTransportController::MaybeStartGathering() {
  if (!network_thread_->IsCurrent()) {
    network_thread_->BlockingCall([&] { MaybeStartGathering(); });
    return;
  }

  for (auto& dtls : GetDtlsTransports()) {
    dtls->ice_transport()->MaybeStartGathering();
  }
}

First, let's look at how the DtlsTransports are created.

SdpOfferAnswerHandler::DoSetLocalDescription ->
SdpOfferAnswerHandler::ApplyLocalDescription ->
SdpOfferAnswerHandler::PushdownTransportDescription ->

JsepTransportController::SetLocalDescription ->

JsepTransportController::ApplyDescription_n ->

RTCError JsepTransportController::ApplyDescription_n(
bool local,
SdpType type,
const cricket::SessionDescription* description) {
TRACE_EVENT0("webrtc", "JsepTransportController::ApplyDescription_n");
RTC_DCHECK(description);

if (local) {
local_desc_ = description;
} else {
remote_desc_ = description;
}

RTCError error;
error = ValidateAndMaybeUpdateBundleGroups(local, type, description);
if (!error.ok()) {
return error;
}

std::map<const cricket::ContentGroup*, std::vector<int>>
merged_encrypted_extension_ids_by_bundle;
if (!bundles_.bundle_groups().empty()) {
merged_encrypted_extension_ids_by_bundle =
MergeEncryptedHeaderExtensionIdsForBundles(description);
}

for (const cricket::ContentInfo& content_info : description->contents()) {
// Don't create transports for rejected m-lines and bundled m-lines.
if (content_info.rejected ||
!bundles_.IsFirstMidInGroup(content_info.name)) {
continue;
}
error = MaybeCreateJsepTransport(local, content_info, *description);
if (!error.ok()) {
return error;
}
}

RTC_DCHECK(description->contents().size() ==
description->transport_infos().size());
for (size_t i = 0; i < description->contents().size(); ++i) {
const cricket::ContentInfo& content_info = description->contents()[i];
const cricket::TransportInfo& transport_info =
description->transport_infos()[i];

if (content_info.rejected) {
// This may cause groups to be removed from |bundles_.bundle_groups()|.
HandleRejectedContent(content_info);
continue;
}

const cricket::ContentGroup* established_bundle_group =
bundles_.LookupGroupByMid(content_info.name);

// For bundle members that are not BUNDLE-tagged (not first in the group),
// configure their transport to be the same as the BUNDLE-tagged transport.
if (established_bundle_group &&
content_info.name != *established_bundle_group->FirstContentName()) {
if (!HandleBundledContent(content_info, *established_bundle_group)) {
return RTCError(RTCErrorType::INVALID_PARAMETER,
"Failed to process the bundled m= section with "
"mid='" +
content_info.name + "'.");
}
continue;
}

error = ValidateContent(content_info);
if (!error.ok()) {
return error;
}

std::vector<int> extension_ids;
// Is BUNDLE-tagged (first in the group)?
if (established_bundle_group &&
content_info.name == *established_bundle_group->FirstContentName()) {
auto it = merged_encrypted_extension_ids_by_bundle.find(
established_bundle_group);
RTC_DCHECK(it != merged_encrypted_extension_ids_by_bundle.end());
extension_ids = it->second;
} else {
extension_ids = GetEncryptedHeaderExtensionIds(content_info);
}

int rtp_abs_sendtime_extn_id =
GetRtpAbsSendTimeHeaderExtensionId(content_info);

cricket::JsepTransport* transport =
GetJsepTransportForMid(content_info.name);
RTC_DCHECK(transport);

SetIceRole_n(DetermineIceRole(transport, transport_info, type, local));

cricket::JsepTransportDescription jsep_description =
CreateJsepTransportDescription(content_info, transport_info,
extension_ids, rtp_abs_sendtime_extn_id);
if (local) {
error =
transport->SetLocalJsepTransportDescription(jsep_description, type);
} else {
error =
transport->SetRemoteJsepTransportDescription(jsep_description, type);
}

if (!error.ok()) {
LOG_AND_RETURN_ERROR(
RTCErrorType::INVALID_PARAMETER,
"Failed to apply the description for m= section with mid='" +
content_info.name + "': " + error.message());
}
}
if (type == SdpType::kAnswer) {
transports_.CommitTransports();
bundles_.Commit();
}
return RTCError::OK();
}


RTCError JsepTransportController::MaybeCreateJsepTransport(
bool local,
const cricket::ContentInfo& content_info,
const cricket::SessionDescription& description) {
cricket::JsepTransport* transport = GetJsepTransportByName(content_info.name);
if (transport) {
return RTCError::OK();
}
const cricket::MediaContentDescription* content_desc =
content_info.media_description();
if (certificate_ && !content_desc->cryptos().empty()) {
return RTCError(RTCErrorType::INVALID_PARAMETER,
"SDES and DTLS-SRTP cannot be enabled at the same time.");
}

rtc::scoped_refptr<webrtc::IceTransportInterface> ice =
CreateIceTransport(content_info.name, /*rtcp=*/false);

std::unique_ptr<cricket::DtlsTransportInternal> rtp_dtls_transport =
CreateDtlsTransport(content_info, ice->internal());

std::unique_ptr<cricket::DtlsTransportInternal> rtcp_dtls_transport;
std::unique_ptr<RtpTransport> unencrypted_rtp_transport;
std::unique_ptr<SrtpTransport> sdes_transport;
std::unique_ptr<DtlsSrtpTransport> dtls_srtp_transport;

rtc::scoped_refptr<webrtc::IceTransportInterface> rtcp_ice;
if (config_.rtcp_mux_policy !=
PeerConnectionInterface::kRtcpMuxPolicyRequire &&
content_info.type == cricket::MediaProtocolType::kRtp) {
rtcp_ice = CreateIceTransport(content_info.name, /*rtcp=*/true);
rtcp_dtls_transport =
CreateDtlsTransport(content_info, rtcp_ice->internal());
}

if (config_.disable_encryption) {
RTC_LOG(LS_INFO)
<< "Creating UnencryptedRtpTransport, becayse encryption is disabled.";
unencrypted_rtp_transport = CreateUnencryptedRtpTransport(
content_info.name, rtp_dtls_transport.get(), rtcp_dtls_transport.get());
} else if (!content_desc->cryptos().empty()) {
sdes_transport = CreateSdesTransport(
content_info.name, rtp_dtls_transport.get(), rtcp_dtls_transport.get());
RTC_LOG(LS_INFO) << "Creating SdesTransport.";
} else {
RTC_LOG(LS_INFO) << "Creating DtlsSrtpTransport.";
dtls_srtp_transport = CreateDtlsSrtpTransport(
content_info.name, rtp_dtls_transport.get(), rtcp_dtls_transport.get());
}

std::unique_ptr<cricket::SctpTransportInternal> sctp_transport;
if (config_.sctp_factory) {
sctp_transport =
config_.sctp_factory->CreateSctpTransport(rtp_dtls_transport.get());
}

std::unique_ptr<cricket::JsepTransport> jsep_transport =
std::make_unique<cricket::JsepTransport>(
content_info.name, certificate_, std::move(ice), std::move(rtcp_ice),
std::move(unencrypted_rtp_transport), std::move(sdes_transport),
std::move(dtls_srtp_transport), std::move(rtp_dtls_transport),
std::move(rtcp_dtls_transport), std::move(sctp_transport), [&]() {
RTC_DCHECK_RUN_ON(network_thread_);
UpdateAggregateStates_n();
});

jsep_transport->rtp_transport()->SignalRtcpPacketReceived.connect(
this, &JsepTransportController::OnRtcpPacketReceived_n);

transports_.RegisterTransport(content_info.name, std::move(jsep_transport));
UpdateAggregateStates_n();
return RTCError::OK();
}


rtc::scoped_refptr<webrtc::IceTransportInterface>
JsepTransportController::CreateIceTransport(const std::string& transport_name,
bool rtcp) {
int component = rtcp ? cricket::ICE_CANDIDATE_COMPONENT_RTCP
: cricket::ICE_CANDIDATE_COMPONENT_RTP;

IceTransportInit init;
init.set_port_allocator(port_allocator_);
init.set_async_dns_resolver_factory(async_dns_resolver_factory_);
init.set_event_log(config_.event_log);
init.set_field_trials(config_.field_trials);
auto transport = config_.ice_transport_factory->CreateIceTransport(
transport_name, component, std::move(init));
RTC_DCHECK(transport);
transport->internal()->SetIceRole(ice_role_);
transport->internal()->SetIceTiebreaker(ice_tiebreaker_);
transport->internal()->SetIceConfig(ice_config_);
return transport;
}

std::unique_ptr<cricket::DtlsTransportInternal>
JsepTransportController::CreateDtlsTransport(
const cricket::ContentInfo& content_info,
cricket::IceTransportInternal* ice) {
RTC_DCHECK_RUN_ON(network_thread_);

std::unique_ptr<cricket::DtlsTransportInternal> dtls;

if (config_.dtls_transport_factory) {
dtls = config_.dtls_transport_factory->CreateDtlsTransport(
ice, config_.crypto_options, config_.ssl_max_version);
} else {
dtls = std::make_unique<cricket::DtlsTransport>(ice, config_.crypto_options,
config_.event_log,
config_.ssl_max_version);
}

RTC_DCHECK(dtls);
RTC_DCHECK_EQ(ice, dtls->ice_transport());

if (certificate_) {
bool set_cert_success = dtls->SetLocalCertificate(certificate_);
RTC_DCHECK(set_cert_success);
}

// Connect to signals offered by the DTLS and ICE transport.
dtls->SignalWritableState.connect(
this, &JsepTransportController::OnTransportWritableState_n);
dtls->SignalReceivingState.connect(
this, &JsepTransportController::OnTransportReceivingState_n);
dtls->ice_transport()->SignalGatheringState.connect(
this, &JsepTransportController::OnTransportGatheringState_n);
dtls->ice_transport()->SignalCandidateGathered.connect(
this, &JsepTransportController::OnTransportCandidateGathered_n);
dtls->ice_transport()->SignalCandidateError.connect(
this, &JsepTransportController::OnTransportCandidateError_n);
dtls->ice_transport()->SignalCandidatesRemoved.connect(
this, &JsepTransportController::OnTransportCandidatesRemoved_n);
dtls->ice_transport()->SignalRoleConflict.connect(
this, &JsepTransportController::OnTransportRoleConflict_n);
dtls->ice_transport()->SignalStateChanged.connect(
this, &JsepTransportController::OnTransportStateChanged_n);
dtls->ice_transport()->SignalIceTransportStateChanged.connect(
this, &JsepTransportController::OnTransportStateChanged_n);
dtls->ice_transport()->SignalCandidatePairChanged.connect(
this, &JsepTransportController::OnTransportCandidatePairChanged_n);

dtls->SubscribeDtlsHandshakeError(
[this](rtc::SSLHandshakeError error) { OnDtlsHandshakeError(error); });
return dtls;
}
.../webrtc/src/p2p/base/default_ice_transport_factory.cc

rtc::scoped_refptr<IceTransportInterface>
DefaultIceTransportFactory::CreateIceTransport(
    const std::string& transport_name,
    int component,
    IceTransportInit init) {
  BasicIceControllerFactory factory;
  init.set_ice_controller_factory(&factory);
  return rtc::make_ref_counted<DefaultIceTransport>(
      cricket::P2PTransportChannel::Create(transport_name, component,
                                           std::move(init)));
}

Back to dtls->ice_transport()->MaybeStartGathering(): dtls->ice_transport() is a P2PTransportChannel.

dtls->ice_transport()->SignalCandidateGathered.connect(
    this, &JsepTransportController::OnTransportCandidateGathered_n);


void JsepTransportController::OnTransportCandidateGathered_n(
    cricket::IceTransportInternal* transport,
    const cricket::Candidate& candidate) {
  // We should never signal peer-reflexive candidates.
  if (candidate.type() == cricket::PRFLX_PORT_TYPE) {
    RTC_DCHECK_NOTREACHED();
    return;
  }

  signal_ice_candidates_gathered_.Send(
      transport->transport_name(), std::vector<cricket::Candidate>{candidate});
}

void SubscribeIceCandidateGathered(F&& callback) {
  RTC_DCHECK_RUN_ON(network_thread_);
  signal_ice_candidates_gathered_.AddReceiver(std::forward<F>(callback));
}

This matches the SubscribeIceCandidateGathered callback registered earlier in PeerConnection.

P2PTransportChannel

.../webrtc/src/p2p/base/p2p_transport_channel.cc

void P2PTransportChannel::MaybeStartGathering() {
RTC_DCHECK_RUN_ON(network_thread_);
// TODO(bugs.webrtc.org/14605): ensure tie_breaker_ is set.
if (ice_parameters_.ufrag.empty() || ice_parameters_.pwd.empty()) {
RTC_LOG(LS_ERROR)
<< "Cannot gather candidates because ICE parameters are empty"
" ufrag: "
<< ice_parameters_.ufrag << " pwd: " << ice_parameters_.pwd;
return;
}
// Start gathering if we never started before, or if an ICE restart occurred.
if (allocator_sessions_.empty() ||
IceCredentialsChanged(allocator_sessions_.back()->ice_ufrag(),
allocator_sessions_.back()->ice_pwd(),
ice_parameters_.ufrag, ice_parameters_.pwd)) {
if (gathering_state_ != kIceGatheringGathering) {
gathering_state_ = kIceGatheringGathering;
SignalGatheringState(this);
}

if (!allocator_sessions_.empty()) {
IceRestartState state;
if (writable()) {
state = IceRestartState::CONNECTED;
} else if (IsGettingPorts()) {
state = IceRestartState::CONNECTING;
} else {
state = IceRestartState::DISCONNECTED;
}
RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IceRestartState",
static_cast<int>(state),
static_cast<int>(IceRestartState::MAX_VALUE));
}

for (const auto& session : allocator_sessions_) {
if (session->IsStopped()) {
continue;
}
session->StopGettingPorts();
}

// Time for a new allocator.
std::unique_ptr<PortAllocatorSession> pooled_session =
allocator_->TakePooledSession(transport_name(), component(),
ice_parameters_.ufrag,
ice_parameters_.pwd);
if (pooled_session) {
pooled_session->set_ice_tiebreaker(tiebreaker_);
AddAllocatorSession(std::move(pooled_session));
PortAllocatorSession* raw_pooled_session =
allocator_sessions_.back().get();
// Process the pooled session's existing candidates/ports, if they exist.
OnCandidatesReady(raw_pooled_session,
raw_pooled_session->ReadyCandidates());
for (PortInterface* port : allocator_sessions_.back()->ReadyPorts()) {
OnPortReady(raw_pooled_session, port);
}
if (allocator_sessions_.back()->CandidatesAllocationDone()) {
OnCandidatesAllocationDone(raw_pooled_session);
}
} else {
AddAllocatorSession(allocator_->CreateSession(
transport_name(), component(), ice_parameters_.ufrag,
ice_parameters_.pwd));
allocator_sessions_.back()->set_ice_tiebreaker(tiebreaker_);
allocator_sessions_.back()->StartGettingPorts();
}
}
}

void P2PTransportChannel::OnCandidatesReady(
PortAllocatorSession* session,
const std::vector<Candidate>& candidates) {
RTC_DCHECK_RUN_ON(network_thread_);
for (size_t i = 0; i < candidates.size(); ++i) {
SignalCandidateGathered(this, candidates[i]);
}
}
.../webrtc/src/p2p/client/basic_port_allocator.cc

// BasicPortAllocatorSession
BasicPortAllocatorSession::BasicPortAllocatorSession(
BasicPortAllocator* allocator,
absl::string_view content_name,
int component,
absl::string_view ice_ufrag,
absl::string_view ice_pwd)
: PortAllocatorSession(content_name,
component,
ice_ufrag,
ice_pwd,
allocator->flags()),
allocator_(allocator),
network_thread_(rtc::Thread::Current()),
socket_factory_(allocator->socket_factory()),
allocation_started_(false),
network_manager_started_(false),
allocation_sequences_created_(false),
turn_port_prune_policy_(allocator->turn_port_prune_policy()) {
TRACE_EVENT0("webrtc",
"BasicPortAllocatorSession::BasicPortAllocatorSession");
allocator_->network_manager()->SignalNetworksChanged.connect(
this, &BasicPortAllocatorSession::OnNetworksChanged);

/// Collect networks via BasicNetworkManager
allocator_->network_manager()->StartUpdating();
}

void BasicPortAllocatorSession::StartGettingPorts() {
RTC_DCHECK_RUN_ON(network_thread_);
state_ = SessionState::GATHERING;

network_thread_->PostTask(
SafeTask(network_safety_.flag(), [this] { GetPortConfigurations(); }));

RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
<< turn_port_prune_policy_;
}

void BasicPortAllocatorSession::GetPortConfigurations() {
RTC_DCHECK_RUN_ON(network_thread_);

auto config = std::make_unique<PortConfiguration>(
allocator_->stun_servers(), username(), password(),
allocator()->field_trials());

for (const RelayServerConfig& turn_server : allocator_->turn_servers()) {
config->AddRelay(turn_server);
}
ConfigReady(std::move(config));
}

void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
RTC_DCHECK_RUN_ON(network_thread_);
ConfigReady(absl::WrapUnique(config));
}

void BasicPortAllocatorSession::ConfigReady(
std::unique_ptr<PortConfiguration> config) {
RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->PostTask(SafeTask(
network_safety_.flag(), [this, config = std::move(config)]() mutable {
OnConfigReady(std::move(config));
}));
}

// Adds a configuration to the list.
void BasicPortAllocatorSession::OnConfigReady(
std::unique_ptr<PortConfiguration> config) {
RTC_DCHECK_RUN_ON(network_thread_);
if (config)
configs_.push_back(std::move(config));

AllocatePorts();
}

void BasicPortAllocatorSession::AllocatePorts() {
RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->PostTask(SafeTask(
network_safety_.flag(), [this, allocation_epoch = allocation_epoch_] {
OnAllocate(allocation_epoch);
}));
}

void BasicPortAllocatorSession::OnAllocate(int allocation_epoch) {
RTC_DCHECK_RUN_ON(network_thread_);
if (allocation_epoch != allocation_epoch_)
return;

if (network_manager_started_ && !IsStopped()) {
bool disable_equivalent_phases = true;
DoAllocate(disable_equivalent_phases);
}

allocation_started_ = true;
}

void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
RTC_DCHECK_RUN_ON(network_thread_);
bool done_signal_needed = false;
std::vector<const rtc::Network*> networks = GetNetworks();
if (networks.empty()) {
RTC_LOG(LS_WARNING)
<< "Machine has no networks; no ports will be allocated";
done_signal_needed = true;
} else {
RTC_LOG(LS_INFO) << "Allocate ports on " << NetworksToString(networks);
PortConfiguration* config =
configs_.empty() ? nullptr : configs_.back().get();
for (uint32_t i = 0; i < networks.size(); ++i) {
uint32_t sequence_flags = flags();
if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
// If all the ports are disabled we should just fire the allocation
// done event and return.
done_signal_needed = true;
break;
}

if (!config || config->relays.empty()) {
// No relay ports specified in this config.
sequence_flags |= PORTALLOCATOR_DISABLE_RELAY;
}

if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6) &&
networks[i]->GetBestIP().family() == AF_INET6) {
// Skip IPv6 networks unless the flag's been set.
continue;
}

if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6_ON_WIFI) &&
networks[i]->GetBestIP().family() == AF_INET6 &&
networks[i]->type() == rtc::ADAPTER_TYPE_WIFI) {
// Skip IPv6 Wi-Fi networks unless the flag's been set.
continue;
}

if (disable_equivalent) {
// Disable phases that would only create ports equivalent to
// ones that we have already made.
DisableEquivalentPhases(networks[i], config, &sequence_flags);

if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
// New AllocationSequence would have nothing to do, so don't make it.
continue;
}
}

AllocationSequence* sequence =
new AllocationSequence(this, networks[i], config, sequence_flags,
[this, safety_flag = network_safety_.flag()] {
if (safety_flag->alive())
OnPortAllocationComplete();
});
sequence->Init();
sequence->Start();
sequences_.push_back(sequence);
done_signal_needed = true;
}
}
if (done_signal_needed) {
network_thread_->PostTask(SafeTask(network_safety_.flag(), [this] {
OnAllocationSequenceObjectsCreated();
}));
}
}

std::vector<const rtc::Network*> BasicPortAllocatorSession::GetNetworks() {
RTC_DCHECK_RUN_ON(network_thread_);
std::vector<const rtc::Network*> networks;
rtc::NetworkManager* network_manager = allocator_->network_manager();
RTC_DCHECK(network_manager != nullptr);
// If the network permission state is BLOCKED, we just act as if the flag has
// been passed in.
if (network_manager->enumeration_permission() ==
rtc::NetworkManager::ENUMERATION_BLOCKED) {
set_flags(flags() | PORTALLOCATOR_DISABLE_ADAPTER_ENUMERATION);
}
// If the adapter enumeration is disabled, we'll just bind to any address
// instead of specific NIC. This is to ensure the same routing for http
// traffic by OS is also used here to avoid any local or public IP leakage
// during stun process.
if (flags() & PORTALLOCATOR_DISABLE_ADAPTER_ENUMERATION) {
networks = network_manager->GetAnyAddressNetworks();
} else {
networks = network_manager->GetNetworks();
// If network enumeration fails, use the ANY address as a fallback, so we
// can at least try gathering candidates using the default route chosen by
// the OS. Or, if the PORTALLOCATOR_ENABLE_ANY_ADDRESS_PORTS flag is
// set, we'll use ANY address candidates either way.
if (networks.empty() ||
(flags() & PORTALLOCATOR_ENABLE_ANY_ADDRESS_PORTS)) {
std::vector<const rtc::Network*> any_address_networks =
network_manager->GetAnyAddressNetworks();
networks.insert(networks.end(), any_address_networks.begin(),
any_address_networks.end());
}
}
// Filter out link-local networks if needed.
if (flags() & PORTALLOCATOR_DISABLE_LINK_LOCAL_NETWORKS) {
NetworkFilter link_local_filter(
[](const rtc::Network* network) {
return IPIsLinkLocal(network->prefix());
},
"link-local");
FilterNetworks(&networks, link_local_filter);
}
// Do some more filtering, depending on the network ignore mask and "disable
// costly networks" flag.
NetworkFilter ignored_filter(
[this](const rtc::Network* network) {
return allocator_->GetNetworkIgnoreMask() & network->type();
},
"ignored");
FilterNetworks(&networks, ignored_filter);
if (flags() & PORTALLOCATOR_DISABLE_COSTLY_NETWORKS) {
uint16_t lowest_cost = rtc::kNetworkCostMax;
for (const rtc::Network* network : networks) {
// Don't determine the lowest cost from a link-local network.
// On iOS, a device connected to the computer will get a link-local
// network for communicating with the computer, however this network can't
// be used to connect to a peer outside the network.
if (rtc::IPIsLinkLocal(network->GetBestIP())) {
continue;
}
lowest_cost = std::min<uint16_t>(
lowest_cost, network->GetCost(*allocator()->field_trials()));
}
NetworkFilter costly_filter(
[lowest_cost, this](const rtc::Network* network) {
return network->GetCost(*allocator()->field_trials()) >
lowest_cost + rtc::kNetworkCostLow;
},
"costly");
FilterNetworks(&networks, costly_filter);
}

// Lastly, if we have a limit for the number of IPv6 network interfaces (by
// default, it's 5), remove networks to ensure that limit is satisfied.
//
// TODO(deadbeef): Instead of just taking the first N arbitrary IPv6
// networks, we could try to choose a set that's "most likely to work". It's
// hard to define what that means though; it's not just "lowest cost".
// Alternatively, we could just focus on making our ICE pinging logic smarter
// such that this filtering isn't necessary in the first place.
const webrtc::FieldTrialsView* field_trials = allocator_->field_trials();
if (IsDiversifyIpv6InterfacesEnabled(field_trials)) {
std::vector<const rtc::Network*> ipv6_networks;
for (auto it = networks.begin(); it != networks.end();) {
if ((*it)->prefix().family() == AF_INET6) {
ipv6_networks.push_back(*it);
it = networks.erase(it);
continue;
}
++it;
}
ipv6_networks =
SelectIPv6Networks(ipv6_networks, allocator_->max_ipv6_networks());
networks.insert(networks.end(), ipv6_networks.begin(), ipv6_networks.end());
} else {
int ipv6_networks = 0;
for (auto it = networks.begin(); it != networks.end();) {
if ((*it)->prefix().family() == AF_INET6) {
if (ipv6_networks >= allocator_->max_ipv6_networks()) {
it = networks.erase(it);
continue;
} else {
++ipv6_networks;
}
}
++it;
}
}
return networks;
}

void AllocationSequence::Start() {
state_ = kRunning;

session_->network_thread()->PostTask(
SafeTask(safety_.flag(), [this, epoch = epoch_] { Process(epoch); }));
// Take a snapshot of the best IP, so that when DisableEquivalentPhases is
// called next time, we enable all phases if the best IP has since changed.
previous_best_ip_ = network_->GetBestIP();
}

void AllocationSequence::Process(int epoch) {
RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};

if (epoch != epoch_)
return;

// Perform all of the phases in the current step.
RTC_LOG(LS_INFO) << network_->ToString()
<< ": Allocation Phase=" << PHASE_NAMES[phase_];

switch (phase_) {
case PHASE_UDP:
CreateUDPPorts();
CreateStunPorts();
break;

case PHASE_RELAY:
CreateRelayPorts();
break;

case PHASE_TCP:
CreateTCPPorts();
state_ = kCompleted;
break;

default:
RTC_DCHECK_NOTREACHED();
}

if (state() == kRunning) {
++phase_;
session_->network_thread()->PostDelayedTask(
SafeTask(safety_.flag(), [this, epoch = epoch_] { Process(epoch); }),
TimeDelta::Millis(session_->allocator()->step_delay()));
} else {
// No allocation steps needed further if all phases in AllocationSequence
// are completed. Cause further Process calls in the previous epoch to be
// ignored.
++epoch_;
port_allocation_complete_callback_();
}
}

void AllocationSequence::CreateUDPPorts() {
if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) {
RTC_LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping.";
return;
}

// TODO(mallinath) - Remove UDPPort creating socket after shared socket
// is enabled completely.
std::unique_ptr<UDPPort> port;
bool emit_local_candidate_for_anyaddress =
!IsFlagSet(PORTALLOCATOR_DISABLE_DEFAULT_LOCAL_CANDIDATE);
if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) {
port = UDPPort::Create(
session_->network_thread(), session_->socket_factory(), network_,
udp_socket_.get(), session_->username(), session_->password(),
emit_local_candidate_for_anyaddress,
session_->allocator()->stun_candidate_keepalive_interval(),
session_->allocator()->field_trials());
} else {
port = UDPPort::Create(
session_->network_thread(), session_->socket_factory(), network_,
session_->allocator()->min_port(), session_->allocator()->max_port(),
session_->username(), session_->password(),
emit_local_candidate_for_anyaddress,
session_->allocator()->stun_candidate_keepalive_interval(),
session_->allocator()->field_trials());
}

if (port) {
port->SetIceTiebreaker(session_->ice_tiebreaker());
// If shared socket is enabled, STUN candidate will be allocated by the
// UDPPort.
if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
udp_port_ = port.get();
port->SubscribePortDestroyed(
[this](PortInterface* port) { OnPortDestroyed(port); });

// If STUN is not disabled, setting stun server address to port.
if (!IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
if (config_ && !config_->StunServers().empty()) {
RTC_LOG(LS_INFO)
<< "AllocationSequence: UDPPort will be handling the "
"STUN candidate generation.";
port->set_server_addresses(config_->StunServers());
}
}
}

session_->AddAllocatedPort(port.release(), this);
}
}

void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
AllocationSequence* seq) {
RTC_DCHECK_RUN_ON(network_thread_);
if (!port)
return;

RTC_LOG(LS_INFO) << "Adding allocated port for " << content_name();
port->set_content_name(content_name());
port->set_component(component());
port->set_generation(generation());
if (allocator_->proxy().type != rtc::PROXY_NONE)
port->set_proxy(allocator_->user_agent(), allocator_->proxy());
port->set_send_retransmit_count_attribute(
(flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);

PortData data(port, seq);
ports_.push_back(data);

port->SignalCandidateReady.connect(
this, &BasicPortAllocatorSession::OnCandidateReady);
port->SignalCandidateError.connect(
this, &BasicPortAllocatorSession::OnCandidateError);
port->SignalPortComplete.connect(this,
&BasicPortAllocatorSession::OnPortComplete);
port->SubscribePortDestroyed(
[this](PortInterface* port) { OnPortDestroyed(port); });

port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError);
RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator";

port->PrepareAddress();
}

void BasicPortAllocatorSession::OnCandidateReady(Port* port,
const Candidate& c) {
RTC_DCHECK_RUN_ON(network_thread_);
PortData* data = FindPort(port);
RTC_DCHECK(data != NULL);
RTC_LOG(LS_INFO) << port->ToString()
<< ": Gathered candidate: " << c.ToSensitiveString();
// Discarding any candidate signal if port allocation status is
// already done with gathering.
if (!data->inprogress()) {
RTC_LOG(LS_WARNING)
<< "Discarding candidate because port is already done gathering.";
return;
}

// Mark that the port has a pairable candidate, either because we have a
// usable candidate from the port, or simply because the port is bound to the
// any address and therefore has no host candidate. This will trigger the port
// to start creating candidate pairs (connections) and issue connectivity
// checks. If port has already been marked as having a pairable candidate,
// do nothing here.
// Note: We should check whether any candidates may become ready after this
// because there we will check whether the candidate is generated by the ready
// ports, which may include this port.
bool pruned = false;
if (CandidatePairable(c, port) && !data->has_pairable_candidate()) {
data->set_has_pairable_candidate(true);

if (port->Type() == RELAY_PORT_TYPE) {
if (turn_port_prune_policy_ == webrtc::KEEP_FIRST_READY) {
pruned = PruneNewlyPairableTurnPort(data);
} else if (turn_port_prune_policy_ == webrtc::PRUNE_BASED_ON_PRIORITY) {
pruned = PruneTurnPorts(port);
}
}

// If the current port is not pruned yet, SignalPortReady.
if (!data->pruned()) {
RTC_LOG(LS_INFO) << port->ToString() << ": Port ready.";
SignalPortReady(this, port);
port->KeepAliveUntilPruned();
}
}

if (data->ready() && CheckCandidateFilter(c)) {
std::vector<Candidate> candidates;
candidates.push_back(allocator_->SanitizeCandidate(c));
SignalCandidatesReady(this, candidates);
} else {
RTC_LOG(LS_INFO) << "Discarding candidate because it doesn't match filter.";
}

// If we have pruned any port, maybe need to signal port allocation done.
if (pruned) {
MaybeSignalCandidatesAllocationDone();
}
}

void BasicPortAllocatorSession::OnPortComplete(Port* port) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_LOG(LS_INFO) << port->ToString()
<< ": Port completed gathering candidates.";
PortData* data = FindPort(port);
RTC_DCHECK(data != NULL);

// Ignore any late signals.
if (!data->inprogress()) {
return;
}

// Moving to COMPLETE state.
data->set_state(PortData::STATE_COMPLETE);
// Send candidate allocation complete signal if this was the last port.
MaybeSignalCandidatesAllocationDone();
}

A UDP port is created to reach the ICE server, and AddAllocatedPort wires up the OnCandidateReady callback here, which lines up with the OnCandidateReady in p2p_transport_channel covered earlier.
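The wiring above can be reduced to a plain observer pattern: the session subscribes to the port's candidate-ready signal, and every candidate the port gathers flows back up through that subscription. A minimal sketch with hypothetical names (`SketchPort`, `SketchSession`), standing in for the real sigslot-based `SignalCandidateReady` / `OnCandidateReady` pair:

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for cricket::Port: exposes a candidate-ready signal.
class SketchPort {
 public:
  std::function<void(const std::string& candidate)> on_candidate_ready;

  // In the real code this corresponds to Port::AddAddress firing
  // SignalCandidateReady after a candidate is constructed.
  void GatherHostCandidate(const std::string& addr) {
    if (on_candidate_ready) on_candidate_ready(addr);
  }
};

// Hypothetical stand-in for BasicPortAllocatorSession.
class SketchSession {
 public:
  // Mirrors AddAllocatedPort: subscribe to the port's signal.
  void AddAllocatedPort(SketchPort& port) {
    port.on_candidate_ready = [this](const std::string& c) {
      candidates_.push_back(c);  // mirrors OnCandidateReady
    };
  }
  const std::vector<std::string>& candidates() const { return candidates_; }

 private:
  std::vector<std::string> candidates_;
};
```

Once subscribed, any candidate the port produces lands in the session without the port knowing who is listening, which is exactly why the signal connections in AddAllocatedPort are made before PrepareAddress() is called.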

port

.../webrtc/src/p2p/base/stun_port.cc

UDPPort::UDPPort(rtc::Thread* thread,
rtc::PacketSocketFactory* factory,
const rtc::Network* network,
rtc::AsyncPacketSocket* socket,
absl::string_view username,
absl::string_view password,
bool emit_local_for_anyaddress,
const webrtc::FieldTrialsView* field_trials)
: Port(thread,
LOCAL_PORT_TYPE,
factory,
network,
username,
password,
field_trials),
request_manager_(
thread,
[this](const void* data, size_t size, StunRequest* request) {
OnSendPacket(data, size, request);
}),
socket_(socket),
error_(0),
ready_(false),
stun_keepalive_delay_(STUN_KEEPALIVE_INTERVAL),
dscp_(rtc::DSCP_NO_CHANGE),
emit_local_for_anyaddress_(emit_local_for_anyaddress) {}

UDPPort::UDPPort(rtc::Thread* thread,
rtc::PacketSocketFactory* factory,
const rtc::Network* network,
uint16_t min_port,
uint16_t max_port,
absl::string_view username,
absl::string_view password,
bool emit_local_for_anyaddress,
const webrtc::FieldTrialsView* field_trials)
: Port(thread,
LOCAL_PORT_TYPE,
factory,
network,
min_port,
max_port,
username,
password,
field_trials),
request_manager_(
thread,
[this](const void* data, size_t size, StunRequest* request) {
OnSendPacket(data, size, request);
}),
socket_(nullptr),
error_(0),
ready_(false),
stun_keepalive_delay_(STUN_KEEPALIVE_INTERVAL),
dscp_(rtc::DSCP_NO_CHANGE),
emit_local_for_anyaddress_(emit_local_for_anyaddress) {}

bool UDPPort::Init() {
stun_keepalive_lifetime_ = GetStunKeepaliveLifetime();
if (!SharedSocket()) {
RTC_DCHECK(socket_ == nullptr);
socket_ = socket_factory()->CreateUdpSocket(
rtc::SocketAddress(Network()->GetBestIP(), 0), min_port(), max_port());
if (!socket_) {
RTC_LOG(LS_WARNING) << ToString() << ": UDP socket creation failed";
return false;
}
socket_->SignalReadPacket.connect(this, &UDPPort::OnReadPacket);
}
socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket);
socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend);
socket_->SignalAddressReady.connect(this, &UDPPort::OnLocalAddressReady);
return true;
}

void UDPPort::PrepareAddress() {
RTC_DCHECK(request_manager_.empty());
if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) {
OnLocalAddressReady(socket_, socket_->GetLocalAddress());
}
}

void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket,
const rtc::SocketAddress& address) {
// When adapter enumeration is disabled and binding to the any address, the
// default local address will be issued as a candidate instead if
// `emit_local_for_anyaddress` is true. This is to allow connectivity for
// applications which absolutely requires a HOST candidate.
rtc::SocketAddress addr = address;

// If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at
// least the port is listening.
MaybeSetDefaultLocalAddress(&addr);

AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "",
LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false);
MaybePrepareStunCandidate();
}

void UDPPort::MaybePrepareStunCandidate() {
// Sending binding request to the STUN server if address is available to
// prepare STUN candidate.
if (!server_addresses_.empty()) {
SendStunBindingRequests();
} else {
// Port is done allocating candidates.
MaybeSetPortCompleteOrError();
}
}

void UDPPort::SendStunBindingRequests() {
// We will keep pinging the stun server to make sure our NAT pin-hole stays
// open until the deadline (specified in SendStunBindingRequest).
RTC_DCHECK(request_manager_.empty());

for (ServerAddresses::const_iterator it = server_addresses_.begin();
it != server_addresses_.end();) {
// sending a STUN binding request may cause the current SocketAddress to be
// erased from the set, invalidating the loop iterator before it is
// incremented (even if the SocketAddress itself still exists). So make a
// copy of the loop iterator, which may be safely invalidated.
ServerAddresses::const_iterator addr = it++;
SendStunBindingRequest(*addr);
}
}

void UDPPort::SendStunBindingRequest(const rtc::SocketAddress& stun_addr) {
if (stun_addr.IsUnresolvedIP()) {
ResolveStunAddress(stun_addr);
} else if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) {
// Check if `server_addr_` is compatible with the port's ip.
if (IsCompatibleAddress(stun_addr)) {
request_manager_.Send(
new StunBindingRequest(this, stun_addr, rtc::TimeMillis()));
} else {
// Since we can't send stun messages to the server, we should mark this
// port ready.
const char* reason = "STUN server address is incompatible.";
RTC_LOG(LS_WARNING) << reason;
OnStunBindingOrResolveRequestFailed(stun_addr, SERVER_NOT_REACHABLE_ERROR,
reason);
}
}
}

StunBindingRequest(UDPPort* port,
const rtc::SocketAddress& addr,
int64_t start_time)
: StunRequest(port->request_manager(),
std::make_unique<StunMessage>(STUN_BINDING_REQUEST)),
port_(port),
server_addr_(addr),
start_time_(start_time) {}

const rtc::SocketAddress& server_addr() const { return server_addr_; }

void OnResponse(StunMessage* response) override {
const StunAddressAttribute* addr_attr =
response->GetAddress(STUN_ATTR_MAPPED_ADDRESS);
if (!addr_attr) {
RTC_LOG(LS_ERROR) << "Binding response missing mapped address.";
} else if (addr_attr->family() != STUN_ADDRESS_IPV4 &&
addr_attr->family() != STUN_ADDRESS_IPV6) {
RTC_LOG(LS_ERROR) << "Binding address has bad family";
} else {
rtc::SocketAddress addr(addr_attr->ipaddr(), addr_attr->port());
port_->OnStunBindingRequestSucceeded(this->Elapsed(), server_addr_, addr);
}

// The keep-alive requests will be stopped after its lifetime has passed.
if (WithinLifetime(rtc::TimeMillis())) {
port_->request_manager_.SendDelayed(
new StunBindingRequest(port_, server_addr_, start_time_),
port_->stun_keepalive_delay());
}
}

void UDPPort::OnStunBindingRequestSucceeded(
int rtt_ms,
const rtc::SocketAddress& stun_server_addr,
const rtc::SocketAddress& stun_reflected_addr) {
RTC_DCHECK(stats_.stun_binding_responses_received <
stats_.stun_binding_requests_sent);
stats_.stun_binding_responses_received++;
stats_.stun_binding_rtt_ms_total += rtt_ms;
stats_.stun_binding_rtt_ms_squared_total += rtt_ms * rtt_ms;
if (bind_request_succeeded_servers_.find(stun_server_addr) !=
bind_request_succeeded_servers_.end()) {
return;
}
bind_request_succeeded_servers_.insert(stun_server_addr);
// If socket is shared and `stun_reflected_addr` is equal to local socket
// address and mDNS obfuscation is not enabled, or if the same address has
// been added by another STUN server, then discarding the stun address.
// For STUN, related address is the local socket address.
if ((!SharedSocket() || stun_reflected_addr != socket_->GetLocalAddress() ||
Network()->GetMdnsResponder() != nullptr) &&
!HasStunCandidateWithAddress(stun_reflected_addr)) {
rtc::SocketAddress related_address = socket_->GetLocalAddress();
// If we can't stamp the related address correctly, empty it to avoid leak.
if (!MaybeSetDefaultLocalAddress(&related_address)) {
related_address =
rtc::EmptySocketAddressWithFamily(related_address.family());
}

rtc::StringBuilder url;
url << "stun:" << stun_server_addr.hostname() << ":"
<< stun_server_addr.port();
AddAddress(stun_reflected_addr, socket_->GetLocalAddress(), related_address,
UDP_PROTOCOL_NAME, "", "", STUN_PORT_TYPE,
ICE_TYPE_PREFERENCE_SRFLX, 0, url.str(), false);
}
MaybeSetPortCompleteOrError();
}

The UDP port connects its socket to the STUN/TURN server and sends a binding request via SendStunBindingRequest; on success, OnStunBindingRequestSucceeded adds a server-reflexive candidate from the mapped address in the response, with the local socket address recorded as the related address.
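What StunBindingRequest actually puts on the wire is defined by RFC 5389: every STUN message begins with a fixed 20-byte header (message type, attribute length, the magic cookie 0x2112A442, and a 96-bit transaction ID). A minimal sketch building that header by hand (not WebRTC's StunMessage serializer):

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Build the 20-byte STUN header for a Binding Request (RFC 5389):
//   2 bytes message type   (Binding Request = 0x0001)
//   2 bytes message length (0 here: no attributes, header excluded)
//   4 bytes magic cookie   (0x2112A442, network byte order)
//  12 bytes transaction ID (chosen by the client, echoed in the response)
std::array<uint8_t, 20> BuildStunBindingRequestHeader(
    const std::array<uint8_t, 12>& transaction_id) {
  std::array<uint8_t, 20> h{};
  h[0] = 0x00; h[1] = 0x01;                            // type
  h[2] = 0x00; h[3] = 0x00;                            // length
  h[4] = 0x21; h[5] = 0x12; h[6] = 0xA4; h[7] = 0x42;  // magic cookie
  for (size_t i = 0; i < 12; ++i) h[8 + i] = transaction_id[i];
  return h;
}
```

The server copies the transaction ID into its response, which is how the request manager matches OnResponse back to the pending StunBindingRequest, and the XOR-MAPPED-ADDRESS / MAPPED-ADDRESS attribute in that response is the server-reflexive address used above.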

.../webrtc/src/p2p/base/port.cc

void Port::AddAddress(const rtc::SocketAddress& address,
const rtc::SocketAddress& base_address,
const rtc::SocketAddress& related_address,
absl::string_view protocol,
absl::string_view relay_protocol,
absl::string_view tcptype,
absl::string_view type,
uint32_t type_preference,
uint32_t relay_preference,
absl::string_view url,
bool is_final) {
RTC_DCHECK_RUN_ON(thread_);
if (protocol == TCP_PROTOCOL_NAME && type == LOCAL_PORT_TYPE) {
RTC_DCHECK(!tcptype.empty());
}

std::string foundation =
ComputeFoundation(type, protocol, relay_protocol, base_address);
Candidate c(component_, protocol, address, 0U, username_fragment(), password_,
type, generation_, foundation, network_->id(), network_cost_);
c.set_priority(
c.GetPriority(type_preference, network_->preference(), relay_preference));
c.set_relay_protocol(relay_protocol);
c.set_tcptype(tcptype);
c.set_network_name(network_->name());
c.set_network_type(network_->type());
c.set_underlying_type_for_vpn(network_->underlying_type_for_vpn());
c.set_url(url);
c.set_related_address(related_address);

bool pending = MaybeObfuscateAddress(&c, type, is_final);

if (!pending) {
FinishAddingAddress(c, is_final);
}
}

void Port::FinishAddingAddress(const Candidate& c, bool is_final) {
candidates_.push_back(c);
SignalCandidateReady(this, c);

PostAddAddress(is_final);
}

void Port::PostAddAddress(bool is_final) {
if (is_final) {
SignalPortComplete(this);
}
}

AddAddress constructs the Candidate and fires SignalCandidateReady, which bubbles back up through the layers of callbacks described above.
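The `c.set_priority(c.GetPriority(type_preference, ...))` call in AddAddress follows the standard ICE priority formula from RFC 8445 §5.1.2. A self-contained sketch of that formula (the recommended type preferences are host=126, prflx=110, srflx=100, relay=0, matching the ICE_TYPE_PREFERENCE_HOST / ICE_TYPE_PREFERENCE_SRFLX constants passed in the calls above):

```cpp
#include <cstdint>

// RFC 8445 §5.1.2.1:
//   priority = (2^24) * type_pref + (2^8) * local_pref + (256 - component_id)
// type_pref:    0..126, higher for more direct candidate types
// local_pref:   0..65535, ranks candidates of the same type across interfaces
// component_id: 1 for RTP, 2 for RTCP
uint32_t IcePriority(uint32_t type_pref, uint32_t local_pref,
                     uint32_t component_id) {
  return (type_pref << 24) | (local_pref << 8) | (256 - component_id);
}
```

Because the type preference occupies the top byte, any host candidate always outranks any server-reflexive candidate, which in turn outranks any relay candidate, regardless of interface preference.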

conclusion

That wraps up the whole candidate-gathering flow ~ OVER ~

In WebRTC, the ICE protocol describes each endpoint's network addresses with ICE candidates. An ICE candidate carries the following information:

media type (audio or video)
transport protocol (UDP or TCP)
IP address
port number
address family (IPv4 or IPv6)
priority
candidate type (host, server reflexive, peer reflexive, or relay)

Based on the candidate type, ICE candidates fall into four categories:

Host candidate (host): the device's local interface IP address and port, read directly from the local network stack (no STUN involved). Host candidates allow direct communication and carry the highest priority.
Server reflexive candidate (srflx): the public IP address and port learned through a STUN binding request (the mapped address in the binding response). Used to traverse NAT; priority is below host candidates.
Relay candidate (relay): the IP address and port allocated on a TURN server. Usable even when direct NAT traversal fails, but traffic takes an extra relay hop, so it has the lowest priority.
Peer reflexive candidate (prflx): an address discovered during connectivity checks, i.e. the source address observed in a connectivity-check request arriving from the peer.
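These candidate types ultimately surface in the `typ` field of the SDP `a=candidate` attribute that the signaling layer exchanges. A minimal sketch of parsing that attribute per the RFC 5245 grammar (this is an illustrative parser, not WebRTC's own):

```cpp
#include <sstream>
#include <string>

// Parse an SDP candidate attribute of the form (RFC 5245):
//   candidate:<foundation> <component> <transport> <priority> <ip> <port> typ <type> ...
// Returns the candidate type ("host", "srflx", "prflx", "relay"),
// or "" if the line does not match the expected shape.
std::string ParseCandidateType(const std::string& line) {
  std::istringstream ss(line);
  std::string foundation, transport, ip, typ, type;
  unsigned component = 0, port = 0;
  unsigned long priority = 0;
  if (!(ss >> foundation >> component >> transport >> priority >> ip >> port
           >> typ >> type) ||
      typ != "typ") {
    return "";
  }
  return type;
}
```

For example, a host candidate line looks like `candidate:1 1 udp 2122260223 192.168.1.2 56789 typ host generation 0`, while an srflx line additionally carries `raddr`/`rport` fields naming the related (local) address.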

reference

JSEP