OWT-client-ios信令交互源码分析

OWT

OWT 信令交互过程:

A POST /tokens/
A SocketIO connect
A SocketIO login
A SocketIO publish
A SocketIO soac offer
A SocketIO soac candidate
Portal SocketIO soac answer

B POST /tokens/
B SocketIO connect
B SocketIO login
B SocketIO subscribe
B SocketIO soac offer
B SocketIO soac candidate
Portal SocketIO soac answer

SocketIO logout

token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-(void)getTokenFromBasicSample:(NSString *)basicServer roomId:(NSString *)roomId onSuccess:(void (^)(NSString *))onSuccess onFailure:(void (^)())onFailure{
AFHTTPRequestOperationManager *manager = [AFHTTPRequestOperationManager manager];
manager.requestSerializer = [AFJSONRequestSerializer serializer];
[manager.requestSerializer setValue:@"*/*" forHTTPHeaderField:@"Accept"];
[manager.requestSerializer setValue:@"application/json" forHTTPHeaderField:@"Content-Type"];
manager.responseSerializer = [AFHTTPResponseSerializer serializer];
manager.securityPolicy.allowInvalidCertificates=YES;
manager.securityPolicy.validatesDomainName=NO;
NSDictionary *params = [[NSDictionary alloc]initWithObjectsAndKeys:roomId, @"room", @"user", @"username", @"presenter", @"role", nil];
[manager POST:[basicServer stringByAppendingString:@"createToken/"] parameters:params success:^(AFHTTPRequestOperation *operation, id responseObject) {
NSData* data=[[NSData alloc]initWithData:responseObject];
onSuccess([[NSString alloc]initWithData:data encoding:NSUTF8StringEncoding]);
} failure:^(AFHTTPRequestOperation *operation, NSError *error) {
NSLog(@"Error: %@", error);
}];
}

data base64 decode之后

1
2
3
4
5
6
{
"tokenId":"63ec8bb0e20782d930e787bb",
"host":"172.19.35.107:8080",
"secure":true,
"signature":"NWE0YjQzM2M1Zjk3NDMwMTRkOGM3Nzg5Zjk3MGQ1YWZkM2I2YmI1MWNmZDk2NzM5NGJiYjBhNWRkMDA2NGE2OQ=="
}

connect

上一步 token 的 response, host 就是 socket链接的 url 地址。

connect就是与server建议长连接(socketIO),OWT.framework对外暴露的OWTConferenceClient中并没有提供connnect的方法,connect的过程其实是在join方法中

join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
../src/talk/owt/sdk/conference/objc/OWTConferenceClient.mm

- (void)joinWithToken:(NSString*)token
onSuccess:(void (^)(OWTConferenceInfo*))onSuccess
onFailure:(void (^)(NSError*))onFailure {
if (token == nil) {
if (onFailure != nil) {
NSError* err = [[NSError alloc]
initWithDomain:OWTErrorDomain
code:OWTConferenceErrorUnknown
userInfo:[[NSDictionary alloc]
initWithObjectsAndKeys:@"Token cannot be nil.",
NSLocalizedDescriptionKey,
nil]];
onFailure(err);
}
return;
}
const std::string nativeToken = [token UTF8String];
__weak OWTConferenceClient *weakSelf = self;
_nativeConferenceClient->Join(
nativeToken,
[=](std::shared_ptr<owt::conference::ConferenceInfo> info) {
if (onSuccess != nil)
onSuccess([[OWTConferenceInfo alloc]
initWithNativeInfo:info]);
},
[=](std::unique_ptr<owt::base::Exception> e) {
[weakSelf triggerOnFailure:onFailure withException:(std::move(e))];
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
../src/talk/owt/sdk/conference/conferenceclient.cc

void ConferenceClient::Join(
const std::string& token,
std::function<void(std::shared_ptr<ConferenceInfo>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (signaling_channel_connected_) {
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown,
"Already connected to conference server."));
on_failure(std::move(e));
});
}
return;
}
std::string token_base64(token);
if (!StringUtils::IsBase64EncodedString(token)) {
RTC_LOG(LS_WARNING) << "Passing token with Base64 decoded is deprecated, "
"please pass it without modification.";
token_base64 = rtc::Base64::Encode(token);
}

signaling_channel_->Connect(
token_base64,
[=](sio::message::ptr info) {
signaling_channel_connected_ = true;
// Get current user's participantId, user ID and role and fill in the
// ConferenceInfo.
std::string participant_id, user_id, role;
if (info->get_map()["id"]->get_flag() != sio::message::flag_string ||
info->get_map()["user"]->get_flag() != sio::message::flag_string ||
info->get_map()["role"]->get_flag() != sio::message::flag_string) {
RTC_LOG(LS_ERROR)
<< "Room info doesn't contain participant's ID/uerID/role.";
if (on_failure) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown,
"Received invalid user info from MCU."));
on_failure(std::move(e));
});
}
return;
} else {
participant_id = info->get_map()["id"]->get_string();
user_id = info->get_map()["user"]->get_string();
role = info->get_map()["role"]->get_string();
const std::lock_guard<std::mutex> lock(conference_info_mutex_);
if (current_conference_info_.get()) {
current_conference_info_.reset();
}
current_conference_info_.reset(new ConferenceInfo);
current_conference_info_->self_.reset(
new Participant(participant_id, role, user_id));
}
auto room_info = info->get_map()["room"];
if (room_info == nullptr ||
room_info->get_flag() != sio::message::flag_object) {
RTC_DCHECK(false);
return;
}
if (room_info->get_map()["id"]->get_flag() !=
sio::message::flag_string) {
RTC_DCHECK(false);
return;
} else {
current_conference_info_->id_ =
room_info->get_map()["id"]->get_string();
}
// Trigger OnUserJoin for existed users, and also fill in the
// ConferenceInfo.
if (room_info->get_map()["participants"]->get_flag() !=
sio::message::flag_array) {
RTC_LOG(LS_WARNING) << "Room info doesn't contain valid users.";
} else {
auto users = room_info->get_map()["participants"]->get_vector();
// Make sure |on_success| is triggered before any other events because
// OnUserJoined and OnStreamAdded should be triggered after join a
// conference.
for (auto it = users.begin(); it != users.end(); ++it) {
TriggerOnUserJoined(*it, true);
}
}
// Trigger OnStreamAdded for existed remote streams, and also fill in
// the ConferenceInfo.
if (room_info->get_map()["streams"]->get_flag() !=
sio::message::flag_array) {
RTC_LOG(LS_WARNING) << "Room info doesn't contain valid streams.";
} else {
auto streams = room_info->get_map()["streams"]->get_vector();
for (auto it = streams.begin(); it != streams.end(); ++it) {
RTC_LOG(LS_INFO) << "Find streams in the conference.";
TriggerOnStreamAdded(*it, true);
}
}
#ifdef OWT_ENABLE_QUIC
auto webtransport_token = info->get_map()["webTransportToken"];
if (webtransport_token != nullptr &&
webtransport_token->get_flag() == sio::message::flag_string) {
// Base64 encoded webTransportToken with format:
// {tokenId, transportId, participantId, issueTime}. Parse the transportId
// and save it.
webtransport_token_ =
info->get_map()["webTransportToken"]->get_string();
bool transport_id_get = ParseWebTransportToken();
// If server provides WebTransport channel, prepare the QUIC client as
// well. No underlying webtransport connection setup at this phase.
if (transport_id_get)
InitializeQuicClientIfSupported(token_base64);
}
#endif
// Invoke the success callback before trigger any participant join or
// stream added message.
if (on_success) {
event_queue_->PostTask(
[on_success, this]() { on_success(current_conference_info_); });
}
},
on_failure);
}

可以看到 先进行了 token 的校验,然后 进行 signaling_channel_->Connect,本质上 join 其实就是 执行connect。

publish

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
-(void)doPublish{
if (_localStream == nil) {
#if TARGET_IPHONE_SIMULATOR
NSLog(@"Camera is not supported on simulator");
OWTStreamConstraints* constraints=[[OWTStreamConstraints alloc]init];
constraints.audio=YES;
constraints.video=nil;
#else
/* Create LocalStream with constraints */
OWTStreamConstraints* constraints=[[OWTStreamConstraints alloc] init];
constraints.audio=YES;
constraints.video=[[OWTVideoTrackConstraints alloc] init];
constraints.video.frameRate=24;
constraints.video.resolution=CGSizeMake(640,480);
constraints.video.devicePosition=AVCaptureDevicePositionFront;
#endif
RTCMediaStream *localRTCStream = [self createLocalSenderStream:constraints];
OWTStreamSourceInfo *sourceinfo = [[OWTStreamSourceInfo alloc] init];
sourceinfo.audio = OWTAudioSourceInfoMic;
sourceinfo.video = OWTVideoSourceInfoCamera;
_localStream=[[OWTLocalStream alloc] initWithMediaStream:localRTCStream source:sourceinfo];

#if TARGET_IPHONE_SIMULATOR
NSLog(@"Stream does not have video track.");
#else
dispatch_async(dispatch_get_main_queue(), ^{
[((SFUStreamView *)self.view).localVideoView setCaptureSession:[_capturer captureSession] ];
});
#endif
OWTPublishOptions* options=[[OWTPublishOptions alloc] init];
OWTAudioCodecParameters* opusParameters=[[OWTAudioCodecParameters alloc] init];
opusParameters.name=OWTAudioCodecOpus;
OWTAudioEncodingParameters *audioParameters=[[OWTAudioEncodingParameters alloc] init];
audioParameters.codec=opusParameters;
options.audio=[NSArray arrayWithObjects:audioParameters, nil];
OWTVideoCodecParameters *h264Parameters=[[OWTVideoCodecParameters alloc] init];
h264Parameters.name=OWTVideoCodecH264;
OWTVideoEncodingParameters *videoParameters=[[OWTVideoEncodingParameters alloc]init];
videoParameters.codec=h264Parameters;
options.video=[NSArray arrayWithObjects:videoParameters, nil];
[_conferenceClient publish:_localStream withOptions:options onSuccess:^(OWTConferencePublication* p) {
NSLog(@"[ZSPDEBUG Function:%s Line:%d] publish success! OWTConferencePublication:%@ id:%@", __FUNCTION__,__LINE__,p,p.publicationId);
_publication=p;
_publication.delegate=self;
[self mixToCommonView:p];

} onFailure:^(NSError* err) {
NSLog(@"publish failure!");
[self showMsg:[err localizedFailureReason]];
}];
_screenStream=appDelegate.screenStream;
_remoteStream=appDelegate.mixedStream;
[self subscribe];
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

../src/talk/owt/sdk/conference/objc/OWTConferenceClient.mm

- (void)publish:(OWTLocalStream*)stream
withOptions:(OWTPublishOptions*)options
onSuccess:(void (^)(OWTConferencePublication*))onSuccess
onFailure:(void (^)(NSError*))onFailure {
RTC_CHECK(stream);
auto nativeStreamRefPtr = [stream nativeStream];
std::shared_ptr<owt::base::LocalStream> nativeStream(
std::static_pointer_cast<owt::base::LocalStream>(nativeStreamRefPtr));
__weak OWTConferenceClient *weakSelf = self;
if (options == nil) {
_nativeConferenceClient->Publish(
nativeStream,
[=](std::shared_ptr<owt::conference::ConferencePublication>
publication) {
[_publishedStreams setObject:stream forKey:[stream streamId]];
if (onSuccess != nil)
onSuccess([[OWTConferencePublication alloc]
initWithNativePublication:publication]);
},
[=](std::unique_ptr<owt::base::Exception> e) {
[weakSelf triggerOnFailure:onFailure withException:(std::move(e))];
});
} else {
_nativeConferenceClient->Publish(
nativeStream, *[options nativePublishOptions].get(),
[=](std::shared_ptr<owt::conference::ConferencePublication>
publication) {
[_publishedStreams setObject:stream forKey:[stream streamId]];
if (onSuccess != nil)
onSuccess([[OWTConferencePublication alloc]
initWithNativePublication:publication]);
},
[=](std::unique_ptr<owt::base::Exception> e) {
[weakSelf triggerOnFailure:onFailure withException:(std::move(e))];
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
../src/talk/owt/sdk/conference/conferenceclient.cc

void ConferenceClient::Publish(
std::shared_ptr<LocalStream> stream,
const PublishOptions& options,
std::function<void(std::shared_ptr<ConferencePublication>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Local stream cannot be nullptr.";
return;
}
if (!CheckSignalingChannelOnline(on_failure)) {
RTC_LOG(LS_ERROR) << "Signaling channel disconnected.";
return;
}
#ifdef OWT_ENABLE_QUIC
if (stream->DataEnabled()) {
if (web_transport_channel_ && web_transport_channel_connected_) {
std::weak_ptr<ConferenceClient> weak_this = shared_from_this();
web_transport_channel_->Publish(
stream,
[stream, on_success, weak_this](std::string session_id, std::string transport_id) {
auto that = weak_this.lock();
if (!that)
return;
// map current pcc
if (on_success != nullptr) {
// For QUIC stream we use session_id as stream_id for publication.
RTC_LOG(LS_INFO)
<< "Publication succeed. Returning session id/transport id:"
<< session_id;
std::shared_ptr<ConferencePublication> cp(
new ConferencePublication(that, session_id, session_id));
{
std::lock_guard<std::mutex> lock(that->quic_publications_mutex_);
that->quic_publications_[session_id] = cp;
}
RTC_LOG(LS_INFO)
<< "Writting session id for stream auth:" << session_id;
// Convert to hex16 and write for stream auth.
uint8_t* stream_uuid = new uint8_t[16];
ConvertUUID(session_id.c_str(), stream_uuid);
stream->Stream()->Write(stream_uuid, 16);
delete []stream_uuid;
on_success(cp);
}
},
on_failure);
} else {
RTC_LOG(LS_ERROR)
<< "Cannot publish a quic stream without quic client connected.";
std::string failure_message(
"Publishing quic stream without quic client connected");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
}
return;
}
#endif
if (!CheckNullPointer((uintptr_t)(stream->MediaStream()), on_failure)) {
RTC_LOG(LS_ERROR) << "Cannot publish a local stream without media stream.";
return;
}
if (stream->MediaStream()->GetAudioTracks().size() == 0 &&
stream->MediaStream()->GetVideoTracks().size() == 0) {
RTC_LOG(LS_ERROR) << "Cannot publish a local stream without audio & video";
std::string failure_message(
"Publishing local stream with neither audio nor video.");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
// Reorder SDP according to perference list.
PeerConnectionChannelConfiguration config =
GetPeerConnectionChannelConfiguration();
for (auto codec : options.video) {
config.video.push_back(VideoEncodingParameters(codec));
}
for (auto codec : options.audio) {
config.audio.push_back(AudioEncodingParameters(codec));
}
std::shared_ptr<ConferencePeerConnectionChannel> pcc(
new ConferencePeerConnectionChannel(config, signaling_channel_,
event_queue_));
pcc->AddObserver(*this);
{
std::lock_guard<std::mutex> lock(publish_pcs_mutex_);
publish_pcs_.push_back(pcc);
}
std::weak_ptr<ConferenceClient> weak_this = shared_from_this();
std::string stream_id = stream->Id();
pcc->Publish(stream,
[on_success, weak_this, stream_id](std::string session_id) {
auto that = weak_this.lock();
if (!that)
return;
// map current pcc
if (on_success != nullptr) {
std::shared_ptr<ConferencePublication> cp(
new ConferencePublication(that, session_id, stream_id));
on_success(cp);
}
},
on_failure);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159

../src/talk/owt/sdk/conference/ConferencePeerConnectionChannel.cc

void ConferencePeerConnectionChannel::Publish(
std::shared_ptr<LocalStream> stream,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
RTC_LOG(LS_INFO) << "Publish a local stream.";
published_stream_ = stream;
if ((!CheckNullPointer((uintptr_t)stream.get(), on_failure)) ||
(!CheckNullPointer((uintptr_t)stream->MediaStream(), on_failure))) {
RTC_LOG(LS_INFO) << "Local stream cannot be nullptr.";
}
if (IsMediaStreamEnded(stream->MediaStream())) {
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, "Cannot publish ended stream."));
on_failure(std::move(e));
});
}
return;
}
int audio_track_count = 0, video_track_count = 0;
audio_track_count = stream->MediaStream()->GetAudioTracks().size();
video_track_count = stream->MediaStream()->GetVideoTracks().size();
if (audio_track_count == 0 && video_track_count == 0) {
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown,
"Cannot publish media stream without any tracks."));
on_failure(std::move(e));
});
}
return;
}
publish_success_callback_ = on_success;
failure_callback_ = on_failure;
audio_transceiver_direction_=webrtc::RtpTransceiverDirection::kSendOnly;
video_transceiver_direction_=webrtc::RtpTransceiverDirection::kSendOnly;
sio::message::ptr options = sio::object_message::create();
// attributes
sio::message::ptr attributes_ptr = sio::object_message::create();
for (auto const& attr : stream->Attributes()) {
attributes_ptr->get_map()[attr.first] =
sio::string_message::create(attr.second);
}
options->get_map()[kStreamOptionAttributesKey] = attributes_ptr;
// TODO(jianlin): Currently we fix mid to 0/1. Need
// to update the flow to set local desc for retrieving the mid.
// See https://github.com/open-webrtc-toolkit/owt-client-native/issues/459
// for more details.
sio::message::ptr media_ptr = sio::object_message::create();
sio::message::ptr tracks_ptr = sio::array_message::create();
if (audio_track_count != 0) {
RTC_LOG(LS_INFO) << "Adding audio tracks for publish.";
sio::message::ptr audio_options = sio::object_message::create();
audio_options->get_map()["type"] = sio::string_message::create("audio");
audio_options->get_map()["mid"] = sio::string_message::create("0");
if (stream->Source().audio == owt::base::AudioSourceInfo::kScreenCast) {
audio_options->get_map()["source"] =
sio::string_message::create("screen-cast");
} else {
audio_options->get_map()["source"] = sio::string_message::create("mic");
}
tracks_ptr->get_vector().push_back(audio_options);
}
if (video_track_count != 0) {
RTC_LOG(LS_INFO) << "Adding video tracks for publish.";
sio::message::ptr video_options = sio::object_message::create();
video_options->get_map()["type"] = sio::string_message::create("video");
if (audio_track_count == 0) {
video_options->get_map()["mid"] = sio::string_message::create("0");
} else {
video_options->get_map()["mid"] = sio::string_message::create("1");
}
if (stream->Source().video == owt::base::VideoSourceInfo::kScreenCast) {
video_options->get_map()["source"] =
sio::string_message::create("screen-cast");
} else {
video_options->get_map()["source"] =
sio::string_message::create("camera");
}
tracks_ptr->get_vector().push_back(video_options);
}
media_ptr->get_map()["tracks"] = tracks_ptr;
options->get_map()["media"] = media_ptr;
sio::message::ptr transport_ptr = sio::object_message::create();
transport_ptr->get_map()["type"] = sio::string_message::create("webrtc");
options->get_map()["transport"] = transport_ptr;
SendPublishMessage(options, stream, on_failure);
}

void ConferencePeerConnectionChannel::SendPublishMessage(
sio::message::ptr options,
std::shared_ptr<LocalStream> stream,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
signaling_channel_->SendInitializationMessage(
options, stream->MediaStream()->id(), "",
[stream, this](std::string session_id, std::string transport_id) {
SetSessionId(session_id);
for (const auto& track : stream->MediaStream()->GetAudioTracks()) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.stream_ids.push_back(stream->MediaStream()->id());
transceiver_init.direction = webrtc::RtpTransceiverDirection::kSendOnly;
AddTransceiver(track, transceiver_init);
}
for (const auto& track : stream->MediaStream()->GetVideoTracks()) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.stream_ids.push_back(stream->MediaStream()->id());
transceiver_init.direction =
webrtc::RtpTransceiverDirection::kSendOnly;
if (configuration_.video.size() > 0 &&
configuration_.video[0].rtp_encoding_parameters.size() != 0) {
for (auto encoding :
configuration_.video[0].rtp_encoding_parameters) {
webrtc::RtpEncodingParameters param;
if (encoding.rid != "")
param.rid = encoding.rid;
if (encoding.max_bitrate_bps != 0)
param.max_bitrate_bps = encoding.max_bitrate_bps;
if (encoding.max_framerate != 0)
param.max_framerate = encoding.max_framerate;
if (encoding.scale_resolution_down_by > 0)
param.scale_resolution_down_by =
encoding.scale_resolution_down_by;
if (encoding.num_temporal_layers > 0 &&
encoding.num_temporal_layers <= 4) {
param.num_temporal_layers = encoding.num_temporal_layers;
}
if (encoding.priority != owt::base::NetworkPriority::kDefault) {
switch (encoding.priority) {
case owt::base::NetworkPriority::kVeryLow:
param.network_priority = webrtc::Priority::kVeryLow;
break;
case owt::base::NetworkPriority::kLow:
param.network_priority = webrtc::Priority::kLow;
break;
case owt::base::NetworkPriority::kMedium:
param.network_priority = webrtc::Priority::kMedium;
break;
case owt::base::NetworkPriority::kHigh:
param.network_priority = webrtc::Priority::kHigh;
break;
default:
break;
}
}
param.active = encoding.active;
transceiver_init.send_encodings.push_back(param);
}
}
AddTransceiver(track, transceiver_init);
}
CreateOffer();
},
on_failure);
}

const std::string kEventNamePublish = “publish”;

publish_stream_label 不为空,发送publish message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

void ConferenceSocketSignalingChannel::SendInitializationMessage(
sio::message::ptr options,
std::string publish_stream_label,
std::string subscribe_stream_label,
std::function<void(std::string, std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
sio::message::list message_list;
message_list.push(options);
std::string event_name;
if (publish_stream_label != "")
event_name = kEventNamePublish;
else if (subscribe_stream_label != "")
event_name = kEventNameSubscribe;
Emit(event_name, message_list,
[=](sio::message::list const& msg) {
RTC_LOG(LS_INFO) << "Received ack from server.";
if (on_success == nullptr) {
RTC_LOG(LS_WARNING) << "Does not implement success callback. Make sure "
"it is what you want.";
return;
}
sio::message::ptr message = msg.at(0);
if (message->get_flag() != sio::message::flag_string) {
RTC_LOG(LS_WARNING)
<< "The first element of publish ack is not a string.";
if (on_failure) {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceInvalidParam,
"Received unkown message from server."));
on_failure(std::move(e));
}
return;
}
if (message->get_string() == "ok") {
if (msg.at(1)->get_flag() != sio::message::flag_object) {
RTC_DCHECK(false);
return;
}
// TODO: Spec returns {transportId, publication/subscriptionId} while server impl
// is currently returning id and transportId.
RTC_LOG(LS_ERROR) << "Fetching transport ID:";
std::string session_id = msg.at(1)->get_map()["id"]->get_string();
std::string transport_id("");
auto transport_id_obj = msg.at(1)->get_map()["transportId"];
if (transport_id_obj != nullptr &&
transport_id_obj->get_flag() == sio::message::flag_string) {
transport_id = transport_id_obj->get_string();
}
RTC_LOG(LS_ERROR) << "Session ID:" << session_id
<< ", TransportID:" << transport_id;
if (event_name == kEventNamePublish || event_name == kEventNameSubscribe) {
on_success(session_id, transport_id);
return;
}
return;
} else if (message->get_string() == "error" && msg.at(1) != nullptr &&
msg.at(1)->get_flag() == sio::message::flag_string) {
if (on_failure) {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceNotSupported, msg.at(1)->get_string()));
on_failure(std::move(e));
}
} else {
if (on_failure) {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceInvalidParam,
"Ack for initializing message is not expected."));
on_failure(std::move(e));
}
return;
}
},
on_failure);
}
1
2
3
4
5
6
7
8

../src/talk/owt/sdk/base/PeerConnectionChannel.cc

void PeerConnectionChannel::AddTransceiver(
cricket::MediaType media_type,
const webrtc::RtpTransceiverInit& init) {
peer_connection_->AddTransceiver(media_type, init);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
.../src/third_party/webrtc/pc/peer_connection.cc

RTCErrorOr<rtc::scoped_refptr<RtpTransceiverInterface>>
PeerConnection::AddTransceiver(
cricket::MediaType media_type,
rtc::scoped_refptr<MediaStreamTrackInterface> track,
const RtpTransceiverInit& init,
bool update_negotiation_needed) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (!ConfiguredForMedia()) {
LOG_AND_RETURN_ERROR(RTCErrorType::UNSUPPORTED_OPERATION,
"Not configured for media");
}
RTC_DCHECK((media_type == cricket::MEDIA_TYPE_AUDIO ||
media_type == cricket::MEDIA_TYPE_VIDEO));
if (track) {
RTC_DCHECK_EQ(media_type,
(track->kind() == MediaStreamTrackInterface::kAudioKind
? cricket::MEDIA_TYPE_AUDIO
: cricket::MEDIA_TYPE_VIDEO));
}

RTC_HISTOGRAM_COUNTS_LINEAR(kSimulcastNumberOfEncodings,
init.send_encodings.size(), 0, 7, 8);

size_t num_rids = absl::c_count_if(init.send_encodings,
[](const RtpEncodingParameters& encoding) {
return !encoding.rid.empty();
});
if (num_rids > 0 && num_rids != init.send_encodings.size()) {
LOG_AND_RETURN_ERROR(
RTCErrorType::INVALID_PARAMETER,
"RIDs must be provided for either all or none of the send encodings.");
}

if (num_rids > 0 && absl::c_any_of(init.send_encodings,
[](const RtpEncodingParameters& encoding) {
return !IsLegalRsidName(encoding.rid);
})) {
LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER,
"Invalid RID value provided.");
}

if (absl::c_any_of(init.send_encodings,
[](const RtpEncodingParameters& encoding) {
return encoding.ssrc.has_value();
})) {
LOG_AND_RETURN_ERROR(
RTCErrorType::UNSUPPORTED_PARAMETER,
"Attempted to set an unimplemented parameter of RtpParameters.");
}

RtpParameters parameters;
parameters.encodings = init.send_encodings;

// Encodings are dropped from the tail if too many are provided.
size_t max_simulcast_streams =
media_type == cricket::MEDIA_TYPE_VIDEO ? kMaxSimulcastStreams : 1u;
if (parameters.encodings.size() > max_simulcast_streams) {
parameters.encodings.erase(
parameters.encodings.begin() + max_simulcast_streams,
parameters.encodings.end());
}

// Single RID should be removed.
if (parameters.encodings.size() == 1 &&
!parameters.encodings[0].rid.empty()) {
RTC_LOG(LS_INFO) << "Removing RID: " << parameters.encodings[0].rid << ".";
parameters.encodings[0].rid.clear();
}

// If RIDs were not provided, they are generated for simulcast scenario.
if (parameters.encodings.size() > 1 && num_rids == 0) {
rtc::UniqueStringGenerator rid_generator;
for (RtpEncodingParameters& encoding : parameters.encodings) {
encoding.rid = rid_generator();
}
}

if (UnimplementedRtpParameterHasValue(parameters)) {
LOG_AND_RETURN_ERROR(
RTCErrorType::UNSUPPORTED_PARAMETER,
"Attempted to set an unimplemented parameter of RtpParameters.");
}

auto result = cricket::CheckRtpParametersValues(parameters);
if (!result.ok()) {
LOG_AND_RETURN_ERROR(result.type(), result.message());
}

RTC_LOG(LS_INFO) << "Adding " << cricket::MediaTypeToString(media_type)
<< " transceiver in response to a call to AddTransceiver.";
// Set the sender ID equal to the track ID if the track is specified unless
// that sender ID is already in use.
std::string sender_id = (track && !rtp_manager()->FindSenderById(track->id())
? track->id()
: rtc::CreateRandomUuid());
auto sender = rtp_manager()->CreateSender(
media_type, sender_id, track, init.stream_ids, parameters.encodings);
auto receiver =
rtp_manager()->CreateReceiver(media_type, rtc::CreateRandomUuid());
auto transceiver = rtp_manager()->CreateAndAddTransceiver(sender, receiver);
transceiver->internal()->set_direction(init.direction);

if (update_negotiation_needed) {
sdp_handler_->UpdateNegotiationNeeded();
}

return rtc::scoped_refptr<RtpTransceiverInterface>(transceiver);
}

从 mediastream 获取 tracks 的信息,audio & video 构造publish message, 通过 signaling_channel_ 发送到 OWT server。

publish的过程最终还是进到了 webrtc里面 peer_connection的 AddTransceiver方法。
AddTransceiver 创建 transceiver ,Transceiver表示的是收发相同mid的receiver和sender的一个组合体 ,负责收发媒体数据,以Track为载体。

offer

publish 消息发送成功之后,就构造offer message , SetLocalDescription 后通过 signaling_channel_ 发送到 OWT server。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
../src/talk/owt/sdk/conference/ConferencePeerConnectionChannel.cc

void ConferencePeerConnectionChannel::CreateOffer() {
RTC_LOG(LS_INFO) << "Create offer.";
scoped_refptr<FunctionalCreateSessionDescriptionObserver> observer =
FunctionalCreateSessionDescriptionObserver::Create(
std::bind(&ConferencePeerConnectionChannel::
OnCreateSessionDescriptionSuccess,
this, std::placeholders::_1),
std::bind(&ConferencePeerConnectionChannel::
OnCreateSessionDescriptionFailure,
this, std::placeholders::_1));
bool rtp_no_mux = webrtc::field_trial::IsEnabled("OWT-IceUnbundle");
auto offer_answer_options =
webrtc::PeerConnectionInterface::RTCOfferAnswerOptions();
offer_answer_options.use_rtp_mux = !rtp_no_mux;
peer_connection_->CreateOffer(observer.get(), offer_answer_options);
}


void ConferencePeerConnectionChannel::OnCreateSessionDescriptionSuccess(
webrtc::SessionDescriptionInterface* desc) {
RTC_LOG(LS_INFO) << "Create sdp success.";
scoped_refptr<FunctionalSetSessionDescriptionObserver> observer =
FunctionalSetSessionDescriptionObserver::Create(
std::bind(&ConferencePeerConnectionChannel::
OnSetLocalSessionDescriptionSuccess,
this),
std::bind(&ConferencePeerConnectionChannel::
OnSetLocalSessionDescriptionFailure,
this, std::placeholders::_1));
std::string sdp_string;
if (!desc->ToString(&sdp_string)) {
RTC_LOG(LS_ERROR) << "Error parsing local description.";
RTC_DCHECK(false);
}
std::vector<AudioCodec> audio_codecs;
for (auto& audio_enc_param : configuration_.audio) {
audio_codecs.push_back(audio_enc_param.codec.name);
}
sdp_string = SdpUtils::SetPreferAudioCodecs(sdp_string, audio_codecs);
std::vector<VideoCodec> video_codecs;
for (auto& video_enc_param : configuration_.video) {
video_codecs.push_back(video_enc_param.codec.name);
}
bool is_screen = published_stream_.get() ? (published_stream_->Source().video ==
owt::base::VideoSourceInfo::kScreenCast)
: (subscribed_stream_.get()
? (subscribed_stream_->Source().video ==
owt::base::VideoSourceInfo::kScreenCast)
: false);
sdp_string = SdpUtils::SetPreferVideoCodecs(sdp_string, video_codecs, is_screen);
webrtc::SessionDescriptionInterface* new_desc(
webrtc::CreateSessionDescription(desc->type(), sdp_string, nullptr));
peer_connection_->SetLocalDescription(observer.get(), new_desc);
}

void ConferencePeerConnectionChannel::OnSetLocalSessionDescriptionSuccess() {
RTC_LOG(LS_INFO) << "Set local sdp success.";
// For conference, it's now OK to set bandwidth
ApplyBitrateSettings();
auto desc = LocalDescription();
string sdp;
desc->ToString(&sdp);
sio::message::ptr message = sio::object_message::create();
message->get_map()["id"] = sio::string_message::create(session_id_);
sio::message::ptr sdp_message = sio::object_message::create();
sdp_message->get_map()["type"] = sio::string_message::create(desc->type());
sdp_message->get_map()["sdp"] = sio::string_message::create(sdp);
message->get_map()["signaling"] = sdp_message;
signaling_channel_->SendSdp(message, nullptr, nullptr);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

../src/talk/owt/sdk/conference/ConferenceSocketSignalingChannel.cc

void ConferenceSocketSignalingChannel::SendSdp(
sio::message::ptr message,
std::function<void()> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
std::weak_ptr<ConferenceSocketSignalingChannel> weak_this =
shared_from_this();
sio::message::list message_list(message);
// Add a null message for |to_to_deprecated|. Don't know its meaning.
message_list.push(sio::null_message::create());
Emit(kEventNameSignalingMessage, message_list,
[weak_this, on_success, on_failure](sio::message::list const& msg) {
if (auto that = weak_this.lock()) {
that->OnEmitAck(msg, on_success, on_failure);
}
},
on_failure);
}

answer

先看下 ConferenceSocketSignalingChannel channel 方法中 这段逻辑

1
2
3
4
5
6
7
8
9
10
11
12
../src/talk/owt/sdk/conference/ConferenceSocketSignalingChannel.cc

for (const std::string& notification_name :
{kEventNameStreamMessage, kEventNameTextMessage,
kEventNameOnUserPresence, kEventNameOnSignalingMessage,
kEventNameOnDrop}) {
socket_client_->socket()->on(
notification_name,
sio::socket::event_listener_aux(std::bind(
&ConferenceSocketSignalingChannel::OnNotificationFromServer, this,
std::placeholders::_1, std::placeholders::_2)));
}

const std::string kEventNameSignalingMessage = “soac”; //only for soac message
const std::string kEventNameOnSignalingMessage = “progress”;

发送offer 是 kEventNameSignalingMessage soac 事件, OWT server 收到 offer 之后,会通过 progress事件 返回 answer, 看下 OnNotificationFromServer 逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

../src/talk/owt/sdk/conference/ConferenceSocketSignalingChannel.cc

void ConferenceSocketSignalingChannel::OnNotificationFromServer(
const std::string& name,
sio::message::ptr const& data) {
if (name == kEventNameStreamMessage) {
RTC_LOG(LS_VERBOSE) << "Received stream event.";
if (data->get_map()["status"] != nullptr &&
data->get_map()["status"]->get_flag() == sio::message::flag_string &&
data->get_map()["id"] != nullptr &&
data->get_map()["id"]->get_flag() == sio::message::flag_string) {
std::string stream_status = data->get_map()["status"]->get_string();
std::string stream_id = data->get_map()["id"]->get_string();
if (stream_status == "add") {
auto stream_info = data->get_map()["data"];
if (stream_info != nullptr &&
stream_info->get_flag() == sio::message::flag_object) {
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnStreamAdded(stream_info);
}
}
} else if (stream_status == "update") {
sio::message::ptr update_message = sio::object_message::create();
update_message->get_map()["id"] =
sio::string_message::create(stream_id);
auto stream_update = data->get_map()["data"];
if (stream_update != nullptr &&
stream_update->get_flag() == sio::message::flag_object) {
update_message->get_map()["event"] = stream_update;
}
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnStreamUpdated(update_message);
}
} else if (stream_status == "remove") {
sio::message::ptr remove_message = sio::object_message::create();
remove_message->get_map()["id"] =
sio::string_message::create(stream_id);
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnStreamRemoved(remove_message);
}
}
}
} else if (name == kEventNameTextMessage) {
RTC_LOG(LS_VERBOSE) << "Received custom message.";
std::string from = data->get_map()["from"]->get_string();
std::string message = data->get_map()["message"]->get_string();
std::string to = "me";
auto target = data->get_map()["to"];
if (target != nullptr && target->get_flag() == sio::message::flag_string) {
to = target->get_string();
}
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnCustomMessage(from, message, to);
}
} else if (name == kEventNameOnUserPresence) {
RTC_LOG(LS_VERBOSE) << "Received user join/leave message.";
if (data == nullptr || data->get_flag() != sio::message::flag_object ||
data->get_map()["action"] == nullptr ||
data->get_map()["action"]->get_flag() != sio::message::flag_string) {
RTC_DCHECK(false);
return;
}
auto participant_action = data->get_map()["action"]->get_string();
if (participant_action == "join") {
// Get the pariticipant ID from data;
auto participant_info = data->get_map()["data"];
if (participant_info != nullptr &&
participant_info->get_flag() == sio::message::flag_object &&
participant_info->get_map()["id"] != nullptr &&
participant_info->get_map()["id"]->get_flag() ==
sio::message::flag_string) {
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnUserJoined(participant_info);
}
}
} else if (participant_action == "leave") {
auto participant_info = data->get_map()["data"];
if (participant_info != nullptr &&
participant_info->get_flag() == sio::message::flag_string) {
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnUserLeft(participant_info);
}
}
} else {
RTC_DCHECK_NOTREACHED();
}
} else if (name == kEventNameOnSignalingMessage) {
RTC_LOG(LS_VERBOSE) << "Received signaling message from server.";
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnSignalingMessage(data);
}
} else if (name == kEventNameOnDrop) {
RTC_LOG(LS_INFO) << "Received drop message.";
socket_client_->set_reconnect_attempts(0);
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnServerDisconnected();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

.../src/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc

void ConferencePeerConnectionChannel::OnSignalingMessage(
sio::message::ptr message) {
if (message == nullptr) {
RTC_LOG(LS_INFO) << "Ignore empty signaling message";
return;
}
if (message->get_flag() == sio::message::flag_string) {
if (message->get_string() == "success") {
std::weak_ptr<ConferencePeerConnectionChannel> weak_this =
shared_from_this();
if (publish_success_callback_) {
event_queue_->PostTask([weak_this] {
auto that = weak_this.lock();
std::lock_guard<std::mutex> lock(that->callback_mutex_);
if (!that || !that->publish_success_callback_)
return;
that->publish_success_callback_(that->GetSessionId());
that->ResetCallbacks();
});
} else if (subscribe_success_callback_) {
bool stream_added = false;
{
std::lock_guard<std::mutex> lock(sub_stream_added_mutex_);
stream_added = sub_stream_added_;
sub_server_ready_ = true;
if (stream_added) {
event_queue_->PostTask([weak_this] {
auto that = weak_this.lock();
std::lock_guard<std::mutex> lock(that->callback_mutex_);
if (!that || !that->subscribe_success_callback_)
return;
that->subscribe_success_callback_(that->GetSessionId());
that->ResetCallbacks();
});
sub_server_ready_ = false;
sub_stream_added_ = false;
}
}
}
return;
} else if (message->get_string() == "failure") {
if (!connected_ && failure_callback_) {
std::weak_ptr<ConferencePeerConnectionChannel> weak_this =
shared_from_this();
event_queue_->PostTask([weak_this] {
auto that = weak_this.lock();
std::lock_guard<std::mutex> lock(that->callback_mutex_);
if (!that || !that->failure_callback_)
return;
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown,
"Server internal error during connection establishment."));
that->failure_callback_(std::move(e));
that->ResetCallbacks();
});
}
}
return;
} else if (message->get_flag() != sio::message::flag_object) {
RTC_LOG(LS_WARNING) << "Ignore invalid signaling message from server.";
return;
}
// Since trickle ICE from server is not supported, we parse the message as
// SOAC message, not Canddiate message.
if (message->get_map().find("type") == message->get_map().end()) {
RTC_LOG(LS_INFO) << "Ignore message without type from server.";
return;
}
if (message->get_map()["type"]->get_flag() != sio::message::flag_string ||
message->get_map()["sdp"] == nullptr ||
message->get_map()["sdp"]->get_flag() != sio::message::flag_string) {
RTC_LOG(LS_ERROR) << "Invalid signaling message";
return;
}
const std::string type = message->get_map()["type"]->get_string();
RTC_LOG(LS_INFO) << "On signaling message: " << type;
if (type == "answer") {
const std::string sdp = message->get_map()["sdp"]->get_string();
SetRemoteDescription(type, sdp);
} else {
RTC_LOG(LS_ERROR)
<< "Ignoring signaling message from server other than answer.";
}
}

void ConferencePeerConnectionChannel::SetRemoteDescription(
const std::string& type,
const std::string& sdp) {
std::unique_ptr<webrtc::SessionDescriptionInterface> desc(
webrtc::CreateSessionDescription(
"answer", sdp,
nullptr)); // TODO(jianjun): change answer to type.toLowerCase.
if (!desc) {
RTC_LOG(LS_ERROR) << "Failed to create session description.";
return;
}
scoped_refptr<FunctionalSetRemoteDescriptionObserver> observer =
FunctionalSetRemoteDescriptionObserver::Create(std::bind(
&ConferencePeerConnectionChannel::OnSetRemoteDescriptionComplete,
this, std::placeholders::_1));
peer_connection_->SetRemoteDescription(std::move(desc), observer);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
.../src/talk/owt/sdk/base/peerconnectionchannel.cc

void PeerConnectionChannel::OnSetRemoteDescriptionComplete(
webrtc::RTCError error) {
if (error.ok()) {
OnSetRemoteSessionDescriptionSuccess();
} else {
OnSetRemoteSessionDescriptionFailure(error.message());
}
}

void PeerConnectionChannel::OnSetRemoteSessionDescriptionSuccess() {
RTC_LOG(LS_INFO) << "Set remote sdp success.";
if (peer_connection_->remote_description() &&
peer_connection_->remote_description()->type() == "offer") {
CreateAnswer();
}
}

如果收到对方发的offer,才需要创建answer,如果是SFU模式,client 都是直接跟 owt server 交互,是不需要创建answer,如果是 MCU模式,就需要 CreateAnswer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

.../src/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc

void ConferencePeerConnectionChannel::CreateAnswer() {
RTC_LOG(LS_INFO) << "Create answer.";
scoped_refptr<FunctionalCreateSessionDescriptionObserver> observer =
FunctionalCreateSessionDescriptionObserver::Create(
std::bind(&ConferencePeerConnectionChannel::
OnCreateSessionDescriptionSuccess,
this, std::placeholders::_1),
std::bind(&ConferencePeerConnectionChannel::
OnCreateSessionDescriptionFailure,
this, std::placeholders::_1));
bool rtp_no_mux = webrtc::field_trial::IsEnabled("OWT-IceUnbundle");
auto offer_answer_options =
webrtc::PeerConnectionInterface::RTCOfferAnswerOptions();
offer_answer_options.use_rtp_mux = !rtp_no_mux;
peer_connection_->CreateAnswer(observer.get(), offer_answer_options);
}

CreateAnswer 回调 跟 createoffer 的 回调逻辑处理是一样的 OnCreateSessionDescriptionSuccess,不多赘述

最后也会通过signaling_channel_发给owt server

subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    OWTConferenceSubscribeOptions* subOption =
[[OWTConferenceSubscribeOptions alloc] init];
subOption.video=[[OWTConferenceVideoSubscriptionConstraints alloc]init];
OWTVideoCodecParameters* h264Codec = [[OWTVideoCodecParameters alloc] init];
h264Codec.name = OWTVideoCodecH264;
h264Codec.profile = @"M";
subOption.video.codecs = [NSArray arrayWithObjects:h264Codec, nil];
subOption.audio = [[OWTConferenceAudioSubscriptionConstraints alloc]init];
// OWTAudioCodecParameters* pcmCodec = [[OWTAudioCodecParameters alloc] init];
// pcmCodec.name = OWTAudioCodecPcma;
// subOption.audio.codecs = [NSArray arrayWithObjects:pcmCodec, nil];
// subOption.video.bitrateMultiplier = 2.0f;
int width = INT_MAX;
int height = INT_MAX;
for (NSValue* value in appDelegate.mixedStream.capabilities.video.resolutions) {
CGSize resolution=[value CGSizeValue];
if (resolution.width == 640 && resolution.height == 480) {
width = resolution.width;
height = resolution.height;
break;
}
if (resolution.width < width && resolution.height != 0) {
width = resolution.width;
height = resolution.height;
}
}
[[AVAudioSession sharedInstance]
overrideOutputAudioPort:AVAudioSessionPortOverrideSpeaker
error:nil];
[_conferenceClient subscribe:appDelegate.mixedStream
withOptions:subOption
onSuccess:^(OWTConferenceSubscription* subscription) {
_subscription=subscription;
_subscription.delegate=self;
_getStatsTimer = [NSTimer timerWithTimeInterval:1.0
target:self
selector:@selector(printStats)
userInfo:nil
repeats:YES];
[[NSRunLoop mainRunLoop] addTimer:_getStatsTimer
forMode:NSDefaultRunLoopMode];
dispatch_async(dispatch_get_main_queue(), ^{
_remoteStream = appDelegate.mixedStream;
NSLog(@"Subscribe stream success.");
//[_remoteStream attach:((SFUStreamView*)self.view).remoteVideoView];

UIView<RTCVideoRenderer> *videoView = [_streamView addRemoteRenderer:_remoteStream];
[_remoteStream attach:videoView];

[_streamView.act stopAnimating];
_subscribedMix = YES;
});
}
onFailure:^(NSError* err) {
NSLog(@"Subscribe stream failed. %@", [err localizedDescription]);
}];
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
.../src/talk/owt/sdk/conference/objc/OWTConferenceClient.mm

- (void)subscribe:(OWTRemoteStream*)stream
withOptions:(OWTConferenceSubscribeOptions*)options
onSuccess:(void (^)(OWTConferenceSubscription*))onSuccess
onFailure:(void (^)(NSError*))onFailure {
RTC_CHECK(stream);
auto nativeStreamRefPtr = [stream nativeStream];
std::shared_ptr<owt::base::RemoteStream> nativeStream(
std::static_pointer_cast<owt::base::RemoteStream>(nativeStreamRefPtr));
__weak OWTConferenceClient *weakSelf = self;
if (options == nil) {
_nativeConferenceClient->Subscribe(
nativeStream,
[=](std::shared_ptr<owt::conference::ConferenceSubscription>
subscription) {
OWTConferenceSubscription* sub = [[OWTConferenceSubscription alloc]
initWithNativeSubscription:subscription];
[stream setNativeStream:nativeStream];
if (onSuccess != nil) {
onSuccess(sub);
}
},
[=](std::unique_ptr<owt::base::Exception> e) {
[weakSelf triggerOnFailure:onFailure withException:(std::move(e))];
});
} else {
_nativeConferenceClient->Subscribe(
nativeStream, *[options nativeSubscribeOptions].get(),
[=](std::shared_ptr<owt::conference::ConferenceSubscription>
subscription) {
OWTConferenceSubscription* sub = [[OWTConferenceSubscription alloc]
initWithNativeSubscription:subscription];
[stream setNativeStream:nativeStream];
if (onSuccess != nil) {
onSuccess(sub);
}
},
[=](std::unique_ptr<owt::base::Exception> e) {
[weakSelf triggerOnFailure:onFailure withException:(std::move(e))];
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

.../src/talk/owt/sdk/conference/conferenceclient.cc

void ConferenceClient::Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (!CheckSignalingChannelOnline(on_failure)) {
return;
}
#ifdef OWT_ENABLE_QUIC
if (stream->DataEnabled()) {
RTC_LOG(LS_INFO) << "Requesting subscibe of quic stream.";
if (web_transport_channel_ && web_transport_channel_connected_) {
std::weak_ptr<ConferenceClient> weak_this = shared_from_this();
web_transport_channel_->Subscribe(
stream,
[on_success, weak_this](std::string session_id) {
auto that = weak_this.lock();
if (!that)
return;
if (on_success != nullptr) {
// For QUIC stream we use session_id as stream_id for publication.
std::shared_ptr<ConferenceSubscription> cp(
new ConferenceSubscription(that, session_id, session_id));
{
std::lock_guard<std::mutex> lock(
that->quic_subscriptions_mutex_);
that->quic_subscriptions_[session_id] = cp;
}
// Check if any pending stream for this to be attached.
{
std::lock_guard<std::mutex> stream_lock(
that->pending_quic_streams_mutex_);
if (that->pending_incoming_streams_.find(session_id) !=
that->pending_incoming_streams_.end()) {
that->TriggerOnIncomingStream(
session_id, that->pending_incoming_streams_[session_id]);
that->pending_incoming_streams_.erase(session_id);
}
}
on_success(cp);
}
},
on_failure);
} else {
RTC_LOG(LS_ERROR)
<< "Cannot subscribe a quic stream without quic client connected.";
std::string failure_message(
"Subscribing quic stream without quic client connected");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
}
return;
}
#endif

if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr.";
return;
}
if (added_stream_type_.find(stream->Id()) == added_stream_type_.end()) {
std::string failure_message(
"Subscribing an invalid stream. Please check whether this stream is "
"removed.");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
if (options.video.disabled && options.audio.disabled) {
std::string failure_message(
"Subscribing with both audio and video disabled is not allowed.");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
// Avoid subscribing the same stream twice.
{
std::lock_guard<std::mutex> lock(subscribe_pcs_mutex_);
// Search subscirbe pcs
auto it = std::find_if(
subscribe_pcs_.begin(), subscribe_pcs_.end(),
[&](std::shared_ptr<ConferencePeerConnectionChannel> o) -> bool {
return o->GetSubStreamId() == stream->Id();
});
if (it != subscribe_pcs_.end()) {
std::string failure_message(
"The same remote stream has already been subscribed. Subcribe after "
"it is unsubscribed");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
}
// Reorder SDP according to perference list.
PeerConnectionChannelConfiguration config =
GetPeerConnectionChannelConfiguration();
for (auto codec : options.video.codecs) {
config.video.push_back(VideoEncodingParameters(codec, 0, false));
}
for (auto codec : options.audio.codecs) {
config.audio.push_back(AudioEncodingParameters(codec, 0));
}
std::shared_ptr<ConferencePeerConnectionChannel> pcc(
new ConferencePeerConnectionChannel(config, signaling_channel_,
event_queue_));
pcc->AddObserver(*this);
{
std::lock_guard<std::mutex> lock(subscribe_pcs_mutex_);
subscribe_pcs_.push_back(pcc);
}
std::weak_ptr<ConferenceClient> weak_this = shared_from_this();
std::string stream_id = stream->Id();
pcc->Subscribe(
stream, options,
[on_success, weak_this, stream_id](std::string session_id) {
auto that = weak_this.lock();
if (!that)
return;
// map current pcc
if (on_success != nullptr) {
std::shared_ptr<ConferenceSubscription> cp(
new ConferenceSubscription(that, session_id, stream_id));
on_success(cp);
}
},
on_failure);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139

.../src/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc

void ConferencePeerConnectionChannel::Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions& subscribe_options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
RTC_LOG(LS_INFO) << "Subscribe a remote stream. It has audio? "
<< stream->has_audio_ << ", has video? "
<< stream->has_video_;
if (!SubOptionAllowed(subscribe_options, stream->Settings(), stream->Capabilities())) {
RTC_LOG(LS_ERROR)
<< "Subscribe option mismatch with stream subcription capabilities.";
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown,
"Unsupported subscribe option."));
on_failure(std::move(e));
});
}
return;
}
if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr.";
return;
}
if (subscribe_success_callback_) {
if (on_failure) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, "Subscribing this stream."));
on_failure(std::move(e));
});
}
}
subscribe_success_callback_ = on_success;
failure_callback_ = on_failure;
int audio_track_count = 0, video_track_count = 0;
if (stream->has_audio_ && !subscribe_options.audio.disabled) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly;
AddTransceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, transceiver_init);
audio_track_count = 1;
}
if (stream->has_video_ && !subscribe_options.video.disabled) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly;
AddTransceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, transceiver_init);
video_track_count = 1;
}
sio::message::ptr sio_options = sio::object_message::create();
sio::message::ptr media_options = sio::object_message::create();
sio::message::ptr tracks_options = sio::array_message::create();
if (audio_track_count > 0) {
sio::message::ptr audio_options = sio::object_message::create();
audio_options->get_map()["type"] = sio::string_message::create("audio");
audio_options->get_map()["mid"] = sio::string_message::create("0");
audio_options->get_map()["from"] =
sio::string_message::create(stream->Id());
tracks_options->get_vector().push_back(audio_options);
}
if (video_track_count > 0) {
sio::message::ptr video_options = sio::object_message::create();
video_options->get_map()["type"] = sio::string_message::create("video");
if (audio_track_count == 0) {
video_options->get_map()["mid"] = sio::string_message::create("0");
} else {
video_options->get_map()["mid"] = sio::string_message::create("1");
}
auto publication_settings = stream->Settings();
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid) {
std::string track_id = video_setting.track_id;
video_options->get_map()["from"] =
sio::string_message::create(track_id);
break;
}
}
} else {
video_options->get_map()["from"] =
sio::string_message::create(stream->Id());
}
sio::message::ptr video_spec = sio::object_message::create();
sio::message::ptr resolution_options = sio::object_message::create();
if (subscribe_options.video.resolution.width != 0 &&
subscribe_options.video.resolution.height != 0) {
resolution_options->get_map()["width"] =
sio::int_message::create(subscribe_options.video.resolution.width);
resolution_options->get_map()["height"] =
sio::int_message::create(subscribe_options.video.resolution.height);
video_spec->get_map()["resolution"] = resolution_options;
}
// If bitrateMultiplier is not specified, do not include it in video spec.
std::string quality_level("x1.0");
if (subscribe_options.video.bitrateMultiplier != 0) {
quality_level =
"x" + std::to_string(subscribe_options.video.bitrateMultiplier)
.substr(0, 3);
}
if (quality_level != "x1.0") {
sio::message::ptr quality_options =
sio::string_message::create(quality_level);
video_spec->get_map()["bitrate"] = quality_options;
}
if (subscribe_options.video.keyFrameInterval != 0) {
video_spec->get_map()["keyFrameInterval"] =
sio::int_message::create(subscribe_options.video.keyFrameInterval);
}
if (subscribe_options.video.frameRate != 0) {
video_spec->get_map()["framerate"] =
sio::int_message::create(subscribe_options.video.frameRate);
}
video_options->get_map()["parameters"] = video_spec;
if (subscribe_options.video.rid != "") {
video_options->get_map()["simulcastRid"] =
sio::string_message::create(subscribe_options.video.rid);
}
tracks_options->get_vector().push_back(video_options);
}

media_options->get_map()["tracks"] = tracks_options;
sio_options->get_map()["media"] = media_options;
sio::message::ptr transport_ptr = sio::object_message::create();
transport_ptr->get_map()["type"] = sio::string_message::create("webrtc");
sio_options->get_map()["transport"] = transport_ptr;

signaling_channel_->SendInitializationMessage(
sio_options, "", stream->Id(),
[this](std::string session_id, std::string transport_id) {
// Pre-set the session's ID.
SetSessionId(session_id);
CreateOffer();
},
on_failure); // TODO: on_failure
subscribed_stream_ = stream;
}

subscribe 的过程 跟 publish有很多类似的地方,AddTransceiver 创建 sender & receiver 收发媒体流,构造subscribe message 通过 signaling_channel_ 发送 到 owt server
发送成功之后,创建offer、发送sdp、setlocalsdp 、接受owt server 的answer、setremotesdp 等等跟publish的过程是一样,不再赘述。

signaling channel

看下在 ConferenceSocketSignalingChannel::Connect 方法中的一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

const std::string kEventNameStreamMessage = "stream";
const std::string kEventNameTextMessage = "text";
const std::string kEventNameOnUserPresence = "participant";
const std::string kEventNameOnSignalingMessage = "progress";
const std::string kEventNameOnDrop = "drop";

for (const std::string& notification_name :
{kEventNameStreamMessage, kEventNameTextMessage,
kEventNameOnUserPresence, kEventNameOnSignalingMessage,
kEventNameOnDrop}) {
socket_client_->socket()->on(
notification_name,
sio::socket::event_listener_aux(std::bind(
&ConferenceSocketSignalingChannel::OnNotificationFromServer, this,
std::placeholders::_1, std::placeholders::_2)));
}

kEventNameOnSignalingMessage 这个之前介绍过了

kEventNameOnUserPresence : 有用户加入或者离开(action 区分),server 通过 participant 事件通知 client,
kEventNameStreamMessage : 用户发布流后,server 通过 stream 事件通知 client
kEventNameTextMessage : 自定义消息
kEventNameOnDrop : server 断开连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
.../src/talk/owt/sdk/conference/conferencesocketsignalingchannel.cc

void ConferenceSocketSignalingChannel::OnNotificationFromServer(
const std::string& name,
sio::message::ptr const& data) {
if (name == kEventNameStreamMessage) {
RTC_LOG(LS_VERBOSE) << "Received stream event.";
if (data->get_map()["status"] != nullptr &&
data->get_map()["status"]->get_flag() == sio::message::flag_string &&
data->get_map()["id"] != nullptr &&
data->get_map()["id"]->get_flag() == sio::message::flag_string) {
std::string stream_status = data->get_map()["status"]->get_string();
std::string stream_id = data->get_map()["id"]->get_string();
if (stream_status == "add") {
auto stream_info = data->get_map()["data"];
if (stream_info != nullptr &&
stream_info->get_flag() == sio::message::flag_object) {
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnStreamAdded(stream_info);
}
}
} else if (stream_status == "update") {
sio::message::ptr update_message = sio::object_message::create();
update_message->get_map()["id"] =
sio::string_message::create(stream_id);
auto stream_update = data->get_map()["data"];
if (stream_update != nullptr &&
stream_update->get_flag() == sio::message::flag_object) {
update_message->get_map()["event"] = stream_update;
}
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnStreamUpdated(update_message);
}
} else if (stream_status == "remove") {
sio::message::ptr remove_message = sio::object_message::create();
remove_message->get_map()["id"] =
sio::string_message::create(stream_id);
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnStreamRemoved(remove_message);
}
}
}
} else if (name == kEventNameTextMessage) {
RTC_LOG(LS_VERBOSE) << "Received custom message.";
std::string from = data->get_map()["from"]->get_string();
std::string message = data->get_map()["message"]->get_string();
std::string to = "me";
auto target = data->get_map()["to"];
if (target != nullptr && target->get_flag() == sio::message::flag_string) {
to = target->get_string();
}
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnCustomMessage(from, message, to);
}
} else if (name == kEventNameOnUserPresence) {
RTC_LOG(LS_VERBOSE) << "Received user join/leave message.";
if (data == nullptr || data->get_flag() != sio::message::flag_object ||
data->get_map()["action"] == nullptr ||
data->get_map()["action"]->get_flag() != sio::message::flag_string) {
RTC_DCHECK(false);
return;
}
auto participant_action = data->get_map()["action"]->get_string();
if (participant_action == "join") {
// Get the pariticipant ID from data;
auto participant_info = data->get_map()["data"];
if (participant_info != nullptr &&
participant_info->get_flag() == sio::message::flag_object &&
participant_info->get_map()["id"] != nullptr &&
participant_info->get_map()["id"]->get_flag() ==
sio::message::flag_string) {
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnUserJoined(participant_info);
}
}
} else if (participant_action == "leave") {
auto participant_info = data->get_map()["data"];
if (participant_info != nullptr &&
participant_info->get_flag() == sio::message::flag_string) {
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnUserLeft(participant_info);
}
}
} else {
RTC_DCHECK_NOTREACHED();
}
} else if (name == kEventNameOnSignalingMessage) {
RTC_LOG(LS_VERBOSE) << "Received signaling message from server.";
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnSignalingMessage(data);
}
} else if (name == kEventNameOnDrop) {
RTC_LOG(LS_INFO) << "Received drop message.";
socket_client_->set_reconnect_attempts(0);
std::lock_guard<std::mutex> lock(observer_mutex_);
for (auto it = observers_.begin(); it != observers_.end(); ++it) {
(*it)->OnServerDisconnected();
}
}
}

observer & delegate

各种事件的传递,通过 observer & delegate 回调的方式 从 c++ 到 OC

以conferenceClient举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

.../src/talk/owt/sdk/conference/objc/OWTConferenceClient.mm

- (void)setDelegate:(id<OWTConferenceClientDelegate>)delegate {
if (delegate != nil) {
__weak OWTConferenceClient *weakSelf = self;
_observer = std::unique_ptr<
owt::conference::ConferenceClientObserverObjcImpl,
std::function<void(owt::conference::ConferenceClientObserverObjcImpl*)>>(
new owt::conference::ConferenceClientObserverObjcImpl(weakSelf, delegate),
[=](owt::conference::ConferenceClientObserverObjcImpl* observer) {
__strong OWTConferenceClient *strongSelf = weakSelf;
if (strongSelf != nil) {
strongSelf->_nativeConferenceClient->RemoveObserver(*observer);
}
delete observer;
});
_nativeConferenceClient->AddObserver(*_observer.get());
} else {
_observer.reset();
}
_delegate = delegate;
}

通过 ConferenceClientObserverObjcImpl 这个类包装下,打通 oc 跟 c++ 链路。
类似的用法 还有 ConferencePublicationObserverObjcImpl 、ConferenceSubscriptionObserverObjcImpl、ParticipantObserverObjcImpl 不一一说了

再来看下 各种 observer 的定义 以及 AddObserver & RemoveObserver 的方法声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

.../src/talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h

class OWT_EXPORT ConferenceClientObserver {
public:
/**
@brief Triggers when a stream is added.
@param stream The stream which is added.
*/
virtual void OnStreamAdded(
std::shared_ptr<RemoteStream> stream){}
/**
@brief Triggers when a mixed stream is added.
@param stream The stream which is added.
*/
virtual void OnStreamAdded(
std::shared_ptr<RemoteMixedStream> stream){}
/**
@brief Triggers when a message is received.
@param message Message received.
@param sender_id Sender's ID.
@param to "all" if it is a broadcast message. "me"
if it is sent only to current conference client.
*/
virtual void OnMessageReceived(std::string& message,
std::string& sender_id,
std::string& to){}
/**
@brief Triggers when a participant joined conference.
@param user The user joined.
*/
virtual void OnParticipantJoined(std::shared_ptr<Participant>){}
/**
@brief Triggers when server is disconnected.
*/
virtual void OnServerDisconnected(){}
};

/// An asynchronous class for app to communicate with a conference in MCU.
class OWT_EXPORT ConferenceClient final
: ConferenceSocketSignalingChannelObserver,
ConferencePeerConnectionChannelObserver

...
/// Add an observer for conferenc client.
void AddObserver(ConferenceClientObserver& observer);
/// Remove an object from conference client.
void RemoveObserver(ConferenceClientObserver& observer);

...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
.../src/talk/owt/sdk/conference/conferencepeerconnectionchannel.h

class ConferencePeerConnectionChannel
: public PeerConnectionChannel,
public std::enable_shared_from_this<ConferencePeerConnectionChannel> {
public:

// Add a ConferencePeerConnectionChannel observer so it will be notified when
// this object have some events.
void AddObserver(ConferencePeerConnectionChannelObserver& observer);
// Remove a ConferencePeerConnectionChannel observer. If the observer doesn't
// exist, it will do nothing.
void RemoveObserver(ConferencePeerConnectionChannelObserver& observer);

...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
.../src/talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h

class OWT_EXPORT ConferenceSocketSignalingChannelObserver {
public:
virtual ~ConferenceSocketSignalingChannelObserver(){}
virtual void OnUserJoined(std::shared_ptr<sio::message> user) = 0;
virtual void OnUserLeft(std::shared_ptr<sio::message> user) = 0;
virtual void OnStreamAdded(std::shared_ptr<sio::message> stream) = 0;
virtual void OnStreamRemoved(std::shared_ptr<sio::message> stream) = 0;
virtual void OnStreamUpdated(std::shared_ptr<sio::message> stream) = 0;
virtual void OnServerDisconnected() = 0;
virtual void OnCustomMessage(std::string& from, std::string& message, std::string& to) = 0;
virtual void OnSignalingMessage(std::shared_ptr<sio::message> message) = 0;
virtual void OnStreamError(std::shared_ptr<sio::message> stream) = 0;
// Notify the ID for a published/subscribed stream.
virtual void OnStreamId(const std::string& id, const std::string& label) = 0;
virtual void OnSubscriptionId(const std::string& subscription_id,
const std::string& stream_id) = 0;
};

class OWT_EXPORT ConferencePeerConnectionChannelObserver {
public:
virtual ~ConferencePeerConnectionChannelObserver(){}
// Triggered when an unrecoverable error happened. Error may reported by MCU
// or detected by client. Currently, only errors from MCU are handled.
virtual void OnStreamError(
std::shared_ptr<Stream> stream,
std::shared_ptr<const Exception> exception) = 0;
};