168{
169
170 m_pWebSocket->open(szSignallingServerURL);
171
173
175
176
177 m_pWebSocket->onOpen(
178 [this]()
179 {
180
181 LOG_INFO(logging::g_qSharedLogger, "Connected to the signalling server via {}. Checking if stream {} exists...", m_szSignallingServerURL, m_szStreamerID);
182
183
184 nlohmann::json jsnStreamList;
185 jsnStreamList["type"] = "listStreamers";
186 m_pWebSocket->send(jsnStreamList.dump());
187 });
188
189
190 m_pWebSocket->onClosed(
191 [this]()
192 {
193
194 LOG_INFO(logging::g_qSharedLogger, "Closed {} stream and disconnected from the signalling server.", m_szStreamerID);
195 });
196
197
198 m_pWebSocket->onMessage(
199 [this](std::variant<rtc::binary, rtc::string> rtcMessage)
200 {
201 try
202 {
203
204 nlohmann::json jsnMessage;
205
206
207 if (std::holds_alternative<rtc::string>(rtcMessage))
208 {
209
210 std::string szMessage = std::get<rtc::string>(rtcMessage);
211
212
213 jsnMessage = nlohmann::json::parse(szMessage);
214 LOG_DEBUG(logging::g_qSharedLogger, "Received message from signalling server: {}", szMessage);
215 }
216 else if (std::holds_alternative<rtc::binary>(rtcMessage))
217 {
218
219 rtc::binary rtcBinaryData = std::get<rtc::binary>(rtcMessage);
220
221 LOG_DEBUG(logging::g_qSharedLogger, "Received binary data of length: {}", rtcBinaryData.size());
222
223
224 std::string szBinaryDataStr(reinterpret_cast<const char*>(rtcBinaryData.data()), rtcBinaryData.size());
225
226 LOG_DEBUG(logging::g_qSharedLogger, "Received binary data: {}", szBinaryDataStr);
227
228 jsnMessage = nlohmann::json::parse(szBinaryDataStr);
229 }
230 else
231 {
232 LOG_ERROR(logging::g_qSharedLogger, "Received unknown message type from signalling server");
233 }
234
235
236 if (jsnMessage.contains("type"))
237 {
238 std::string szType = jsnMessage["type"];
239
240 if (szType == "config")
241 {
242
243 LOG_DEBUG(logging::g_qSharedLogger, "Received config message from signalling server: {}", jsnMessage.dump());
244 }
245
246 else if (szType == "offer")
247 {
248
249 std::string sdp = jsnMessage["sdp"];
250 m_pPeerConnection->setRemoteDescription(rtc::Description(sdp, "offer"));
251 LOG_DEBUG(logging::g_qSharedLogger, "Processing SDP offer from signalling server: {}", sdp);
252 }
253
254 else if (szType == "answer")
255 {
256
257 std::string sdp = jsnMessage["sdp"];
258 m_pPeerConnection->setRemoteDescription(rtc::Description(sdp, "answer"));
259 LOG_DEBUG(logging::g_qSharedLogger, "Processing SDP answer from signalling server: {}", sdp);
260 }
261
262 else if (szType == "iceCandidate")
263 {
264
265 nlohmann::json jsnCandidate = jsnMessage["candidate"];
266 std::string szCandidateStr = jsnCandidate["candidate"];
267
268 rtc::Candidate rtcCandidate = rtc::Candidate(szCandidateStr);
269 m_pPeerConnection->addRemoteCandidate(rtcCandidate);
270 LOG_DEBUG(logging::g_qSharedLogger, "Added ICE candidate to peer connection: {}", szCandidateStr);
271 }
272 else if (szType == "streamerList")
273 {
274
275 LOG_DEBUG(logging::g_qSharedLogger, "Streamer List: {}", jsnMessage.dump());
276
277
278 if (jsnMessage.contains("ids"))
279 {
280 std::vector<std::string> streamerList = jsnMessage["ids"].get<std::vector<std::string>>();
281 if (std::find(streamerList.begin(), streamerList.end(), m_szStreamerID) != streamerList.end())
282 {
283
284 nlohmann::json jsnStream;
285 jsnStream["type"] = "subscribe";
286 jsnStream["streamerId"] = m_szStreamerID;
287 m_pWebSocket->send(jsnStream.dump());
288
289 LOG_DEBUG(logging::g_qSharedLogger, "Streamer ID {} found in the streamer list. Subscribing to stream...", m_szStreamerID);
290 }
291 else
292 {
293 LOG_ERROR(logging::g_qSharedLogger, "Streamer ID {} not found in the streamer list!", m_szStreamerID);
294 }
295 }
296 else
297 {
298 LOG_ERROR(logging::g_qSharedLogger, "Streamer list does not contain 'ids' field!");
299 }
300 }
301 else
302 {
303 LOG_ERROR(logging::g_qSharedLogger, "Unknown message type received from signalling server: {}", szType);
304 }
305 }
306 }
307 catch (const std::exception& e)
308 {
309
310 LOG_ERROR(logging::g_qSharedLogger, "Error occurred while negotiating with the Signalling Server: {}", e.what());
311 }
312 });
313
314 m_pWebSocket->onError(
315 [this](const std::string& szError)
316 {
317
318 LOG_ERROR(logging::g_qSharedLogger, "Error occurred on WebSocket: {}", szError);
319 });
320
322
324
325 m_pPeerConnection->onLocalDescription(
326 [this](rtc::Description rtcDescription)
327 {
328
329 if (rtcDescription.typeString() == "offer")
330 {
331 return;
332 }
333
334
335 nlohmann::json jsnConfigMessage;
336 jsnConfigMessage["type"] = "layerPreference";
337 jsnConfigMessage["spatialLayer"] = 0;
338 jsnConfigMessage["temporalLayer"] = 0;
339 jsnConfigMessage["playerId"] = "";
340 m_pWebSocket->send(jsnConfigMessage.dump());
341
342
343 nlohmann::json jsnMessage;
344 jsnMessage["type"] = rtcDescription.typeString();
345
346 jsnMessage["minBitrateBps"] = 0;
347 jsnMessage["maxBitrateBps"] = 0;
348
349 std::string szSDP = rtcDescription.generateSdp();
350
351 std::string szMungedSDP =
352 std::regex_replace(szSDP, std::regex("(a=fmtp:\\d+ level-asymmetry-allowed=.*)\r\n"), "$1;x-google-start-bitrate=10000;x-google-max-bitrate=100000\r\n");
353 jsnMessage["sdp"] = szMungedSDP;
354
355 m_pWebSocket->send(jsnMessage.dump());
356
357
358 LOG_DEBUG(logging::g_qSharedLogger, "Sending local description to signalling server: {}", jsnMessage.dump());
359 });
360
361 m_pPeerConnection->onTrack(
362 [this](std::shared_ptr<rtc::Track> rtcTrack)
363 {
364
365 rtc::Description::Media rtcMediaDescription = rtcTrack->description();
366
367 std::string szMediaType = rtcMediaDescription.type();
368
369
370 if (szMediaType != "video")
371 {
372 return;
373 }
374
375
376 m_pVideoTrack1 = rtcTrack;
377
378
379 m_pTrack1H264DepacketizationHandler = std::make_shared<rtc::H264RtpDepacketizer>(rtc::NalUnit::Separator::LongStartSequence);
380 m_pTrack1RtcpReceivingSession = std::make_shared<rtc::RtcpReceivingSession>();
381 m_pTrack1H264DepacketizationHandler->addToChain(m_pTrack1RtcpReceivingSession);
382 m_pVideoTrack1->setMediaHandler(m_pTrack1H264DepacketizationHandler);
383
384
385 m_pVideoTrack1->onFrame(
386 [this](rtc::binary rtcBinaryMessage, rtc::FrameInfo rtcFrameInfo)
387 {
388
389 if (rtcFrameInfo.payloadType == 96)
390 {
391
392 std::vector<uint8_t> vH264EncodedBytes;
393 vH264EncodedBytes.reserve(rtcBinaryMessage.size());
394 for (std::byte stdByte : rtcBinaryMessage)
395 {
396 vH264EncodedBytes.push_back(static_cast<uint8_t>(stdByte));
397 }
398
399
400 std::unique_lock<std::shared_mutex> lkDecoderLock(m_muDecoderMutex);
401
403
404
405 if (m_fnOnFrameReceivedCallback)
406 {
407
408 m_fnOnFrameReceivedCallback(m_cvFrame);
409 }
410
411 lkDecoderLock.unlock();
412 }
413 });
414 });
415
416 m_pPeerConnection->onGatheringStateChange(
417 [this](rtc::PeerConnection::GatheringState eGatheringState)
418 {
419
420 switch (eGatheringState)
421 {
422 case rtc::PeerConnection::GatheringState::Complete: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE gathering state changed to: Complete"); break;
423
424 case rtc::PeerConnection::GatheringState::InProgress:
425 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE gathering state changed to: InProgress");
426 break;
427 case rtc::PeerConnection::GatheringState::New: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE gathering state changed to: New"); break;
428 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection ICE gathering state changed to: Unknown"); break;
429 }
430 });
431
432 m_pPeerConnection->onIceStateChange(
433 [this](rtc::PeerConnection::IceState eIceState)
434 {
435
436 switch (eIceState)
437 {
438 case rtc::PeerConnection::IceState::Checking: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Checking"); break;
439 case rtc::PeerConnection::IceState::Closed: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Closed"); break;
440 case rtc::PeerConnection::IceState::Completed: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Completed"); break;
441 case rtc::PeerConnection::IceState::Connected: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Connected"); break;
442 case rtc::PeerConnection::IceState::Disconnected: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Disconnected"); break;
443 case rtc::PeerConnection::IceState::Failed: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Failed"); break;
444 case rtc::PeerConnection::IceState::New: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: New"); break;
445 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection ICE state changed to: Unknown"); break;
446 }
447 });
448 m_pPeerConnection->onSignalingStateChange(
449 [this](rtc::PeerConnection::SignalingState eSignalingState)
450 {
451
452 switch (eSignalingState)
453 {
454 case rtc::PeerConnection::SignalingState::HaveLocalOffer:
455 {
456 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveLocalOffer");
457 break;
458 }
459 case rtc::PeerConnection::SignalingState::HaveLocalPranswer:
460 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveLocalPranswer");
461 break;
462 case rtc::PeerConnection::SignalingState::HaveRemoteOffer:
463 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveRemoteOffer");
464 break;
465 case rtc::PeerConnection::SignalingState::HaveRemotePranswer:
466 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveRemotePrAnswer");
467 break;
468 case rtc::PeerConnection::SignalingState::Stable: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: Stable"); break;
469 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection signaling state changed to: Unknown"); break;
470 }
471 });
472
473 m_pPeerConnection->onStateChange(
474 [this](rtc::PeerConnection::State eState)
475 {
476
477 switch (eState)
478 {
479 case rtc::PeerConnection::State::Closed: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Closed"); break;
480 case rtc::PeerConnection::State::Connected: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Connected"); break;
481 case rtc::PeerConnection::State::Connecting: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Connecting"); break;
482 case rtc::PeerConnection::State::Disconnected: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Disconnected"); break;
483 case rtc::PeerConnection::State::Failed: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Failed"); break;
484 case rtc::PeerConnection::State::New: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: New"); break;
485 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Unknown"); break;
486 }
487 });
488
490
492
493 m_pDataChannel->onOpen(
494 [this]()
495 {
496
497 LOG_INFO(logging::g_qSharedLogger, "Data channel opened.");
498
499
500 m_pDataChannel->send(std::string(1, static_cast<char>(1)));
501 });
502
503 m_pDataChannel->onMessage(
504 [this](std::variant<rtc::binary, rtc::string> rtcMessage)
505 {
506 try
507 {
508
509 nlohmann::json jsnMessage;
510
511
512 if (std::holds_alternative<rtc::string>(rtcMessage))
513 {
514
515 std::string szMessage = std::get<rtc::string>(rtcMessage);
516
517
518 jsnMessage = nlohmann::json::parse(szMessage);
519 LOG_NOTICE(logging::g_qSharedLogger, "DATA_CHANNEL Received message from peer: {}", szMessage);
520 }
521 else if (std::holds_alternative<rtc::binary>(rtcMessage))
522 {
523
524 rtc::binary rtcBinaryData = std::get<rtc::binary>(rtcMessage);
525
526
527 std::string szBinaryDataStr;
528 for (auto byte : rtcBinaryData)
529 {
530 if (std::isprint(static_cast<unsigned char>(byte)))
531 {
532 szBinaryDataStr += static_cast<char>(byte);
533 }
534 }
535
536
537 LOG_DEBUG(logging::g_qSharedLogger, "DATA_CHANNEL Received binary data ({} bytes): {}", rtcBinaryData.size(), szBinaryDataStr);
538 }
539 else
540 {
541 LOG_ERROR(logging::g_qSharedLogger, "Received unknown message type from peer");
542 }
543 }
544 catch (const std::exception& e)
545 {
546
547 LOG_ERROR(logging::g_qSharedLogger, "Error occurred while negotiating with the datachannel: {}", e.what());
548 }
549 });
550
551 return true;
552}
bool DecodeH264BytesToCVMat(const std::vector< uint8_t > &vH264EncodedBytes, cv::Mat &cvDecodedFrame, const AVPixelFormat eOutputPixelFormat)
Decodes H.264-encoded bytes into a cv::Mat using FFmpeg.
Defined at WebRTC.cpp:625.