178{
179
180 m_pWebSocket->open(szSignallingServerURL);
181
183
185
186
187 m_pWebSocket->onOpen(
188 [this]()
189 {
190
191 LOG_INFO(logging::g_qSharedLogger, "Connected to the signalling server via {}. Checking if stream {} exists...", m_szSignallingServerURL, m_szStreamerID);
192
193
194 nlohmann::json jsnStreamList;
195 jsnStreamList["type"] = "listStreamers";
196 m_pWebSocket->send(jsnStreamList.dump());
197 });
198
199
200 m_pWebSocket->onClosed(
201 [this]()
202 {
203
204 LOG_INFO(logging::g_qSharedLogger, "Closed {} stream and disconnected from the signalling server.", m_szStreamerID);
205 });
206
207
208 m_pWebSocket->onMessage(
209 [this](std::variant<rtc::binary, rtc::string> rtcMessage)
210 {
211 try
212 {
213
214 nlohmann::json jsnMessage;
215
216
217 if (std::holds_alternative<rtc::string>(rtcMessage))
218 {
219
220 std::string szMessage = std::get<rtc::string>(rtcMessage);
221
222
223 jsnMessage = nlohmann::json::parse(szMessage);
224 LOG_DEBUG(logging::g_qSharedLogger, "Received message from signalling server: {}", szMessage);
225 }
226 else if (std::holds_alternative<rtc::binary>(rtcMessage))
227 {
228
229 rtc::binary rtcBinaryData = std::get<rtc::binary>(rtcMessage);
230
231 LOG_DEBUG(logging::g_qSharedLogger, "Received binary data of length: {}", rtcBinaryData.size());
232
233
234 std::string szBinaryDataStr(reinterpret_cast<const char*>(rtcBinaryData.data()), rtcBinaryData.size());
235
236 LOG_DEBUG(logging::g_qSharedLogger, "Received binary data: {}", szBinaryDataStr);
237
238 jsnMessage = nlohmann::json::parse(szBinaryDataStr);
239 }
240 else
241 {
242 LOG_ERROR(logging::g_qSharedLogger, "Received unknown message type from signalling server");
243 }
244
245
246 if (jsnMessage.contains("type"))
247 {
248 std::string szType = jsnMessage["type"];
249
250 if (szType == "config")
251 {
252
253 LOG_DEBUG(logging::g_qSharedLogger, "Received config message from signalling server: {}", jsnMessage.dump());
254 }
255
256 else if (szType == "offer")
257 {
258
259 std::string sdp = jsnMessage["sdp"];
260 m_pPeerConnection->setRemoteDescription(rtc::Description(sdp, "offer"));
261 LOG_DEBUG(logging::g_qSharedLogger, "Processing SDP offer from signalling server: {}", sdp);
262 }
263
264 else if (szType == "answer")
265 {
266
267 std::string sdp = jsnMessage["sdp"];
268 m_pPeerConnection->setRemoteDescription(rtc::Description(sdp, "answer"));
269 LOG_DEBUG(logging::g_qSharedLogger, "Processing SDP answer from signalling server: {}", sdp);
270 }
271
272 else if (szType == "iceCandidate")
273 {
274
275 nlohmann::json jsnCandidate = jsnMessage["candidate"];
276 std::string szCandidateStr = jsnCandidate["candidate"];
277
278 rtc::Candidate rtcCandidate = rtc::Candidate(szCandidateStr);
279 m_pPeerConnection->addRemoteCandidate(rtcCandidate);
280 LOG_DEBUG(logging::g_qSharedLogger, "Added ICE candidate to peer connection: {}", szCandidateStr);
281 }
282 else if (szType == "streamerList")
283 {
284
285 LOG_DEBUG(logging::g_qSharedLogger, "Streamer List: {}", jsnMessage.dump());
286
287
288 if (jsnMessage.contains("ids"))
289 {
290 std::vector<std::string> streamerList = jsnMessage["ids"].get<std::vector<std::string>>();
291 if (std::find(streamerList.begin(), streamerList.end(), m_szStreamerID) != streamerList.end())
292 {
293
294 nlohmann::json jsnStream;
295 jsnStream["type"] = "subscribe";
296 jsnStream["streamerId"] = m_szStreamerID;
297 m_pWebSocket->send(jsnStream.dump());
298
299 LOG_DEBUG(logging::g_qSharedLogger, "Streamer ID {} found in the streamer list. Subscribing to stream...", m_szStreamerID);
300 }
301 else
302 {
303 LOG_ERROR(logging::g_qSharedLogger, "Streamer ID {} not found in the streamer list!", m_szStreamerID);
304 }
305 }
306 else
307 {
308 LOG_ERROR(logging::g_qSharedLogger, "Streamer list does not contain 'ids' field!");
309 }
310 }
311 else
312 {
313 LOG_ERROR(logging::g_qSharedLogger, "Unknown message type received from signalling server: {}", szType);
314 }
315 }
316 }
317 catch (const std::exception& e)
318 {
319
320 LOG_ERROR(logging::g_qSharedLogger, "Error occurred while negotiating with the Signalling Server: {}", e.what());
321 }
322 });
323
324 m_pWebSocket->onError(
325 [this](const std::string& szError)
326 {
327
328 LOG_ERROR(logging::g_qSharedLogger, "Error occurred on WebSocket: {}", szError);
329 });
330
332
334
335 m_pPeerConnection->onLocalDescription(
336 [this](rtc::Description rtcDescription)
337 {
338
339 if (rtcDescription.typeString() == "offer")
340 {
341 return;
342 }
343
344
345 nlohmann::json jsnConfigMessage;
346 jsnConfigMessage["type"] = "layerPreference";
347 jsnConfigMessage["spatialLayer"] = 0;
348 jsnConfigMessage["temporalLayer"] = 0;
349 jsnConfigMessage["playerId"] = "";
350 m_pWebSocket->send(jsnConfigMessage.dump());
351
352
353 nlohmann::json jsnMessage;
354 jsnMessage["type"] = rtcDescription.typeString();
355
356 jsnMessage["minBitrateBps"] = 0;
357 jsnMessage["maxBitrateBps"] = 0;
358
359 std::string szSDP = rtcDescription.generateSdp();
360
361 std::string szMungedSDP =
362 std::regex_replace(szSDP, std::regex("(a=fmtp:\\d+ level-asymmetry-allowed=.*)\r\n"), "$1;x-google-start-bitrate=10000;x-google-max-bitrate=100000\r\n");
363 jsnMessage["sdp"] = szMungedSDP;
364
365 m_pWebSocket->send(jsnMessage.dump());
366
367
368 LOG_DEBUG(logging::g_qSharedLogger, "Sending local description to signalling server: {}", jsnMessage.dump());
369 });
370
371 m_pPeerConnection->onTrack(
372 [this](std::shared_ptr<rtc::Track> rtcTrack)
373 {
374
375 rtc::Description::Media rtcMediaDescription = rtcTrack->description();
376
377 std::string szMediaType = rtcMediaDescription.type();
378
379
380 if (szMediaType != "video")
381 {
382 return;
383 }
384
385
386 m_pVideoTrack1 = rtcTrack;
387
388
389 m_pTrack1H264DepacketizationHandler = std::make_shared<rtc::H264RtpDepacketizer>(rtc::NalUnit::Separator::LongStartSequence);
390 m_pTrack1RtcpReceivingSession = std::make_shared<rtc::RtcpReceivingSession>();
391 m_pTrack1H264DepacketizationHandler->addToChain(m_pTrack1RtcpReceivingSession);
392 m_pVideoTrack1->setMediaHandler(m_pTrack1H264DepacketizationHandler);
393
394
395 m_pVideoTrack1->onFrame(
396 [this](rtc::binary rtcBinaryMessage, rtc::FrameInfo rtcFrameInfo)
397 {
398
399 if (rtcFrameInfo.payloadType == 96)
400 {
401
402 std::vector<uint8_t> vH264EncodedBytes;
403 vH264EncodedBytes.reserve(rtcBinaryMessage.size());
404 for (std::byte stdByte : rtcBinaryMessage)
405 {
406 vH264EncodedBytes.push_back(static_cast<uint8_t>(stdByte));
407 }
408
409
410 std::unique_lock<std::shared_mutex> lkDecoderLock(m_muDecoderMutex);
411
413
414
415 if (m_fnOnFrameReceivedCallback)
416 {
417
418 m_fnOnFrameReceivedCallback(m_cvFrame);
419 }
420
421 lkDecoderLock.unlock();
422 }
423 });
424 });
425
426 m_pPeerConnection->onGatheringStateChange(
427 [this](rtc::PeerConnection::GatheringState eGatheringState)
428 {
429
430 switch (eGatheringState)
431 {
432 case rtc::PeerConnection::GatheringState::Complete: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE gathering state changed to: Complete"); break;
433
434 case rtc::PeerConnection::GatheringState::InProgress:
435 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE gathering state changed to: InProgress");
436 break;
437 case rtc::PeerConnection::GatheringState::New: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE gathering state changed to: New"); break;
438 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection ICE gathering state changed to: Unknown"); break;
439 }
440 });
441
442 m_pPeerConnection->onIceStateChange(
443 [this](rtc::PeerConnection::IceState eIceState)
444 {
445
446 switch (eIceState)
447 {
448 case rtc::PeerConnection::IceState::Checking: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Checking"); break;
449 case rtc::PeerConnection::IceState::Closed: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Closed"); break;
450 case rtc::PeerConnection::IceState::Completed: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Completed"); break;
451 case rtc::PeerConnection::IceState::Connected: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Connected"); break;
452 case rtc::PeerConnection::IceState::Disconnected: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Disconnected"); break;
453 case rtc::PeerConnection::IceState::Failed: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: Failed"); break;
454 case rtc::PeerConnection::IceState::New: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection ICE state changed to: New"); break;
455 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection ICE state changed to: Unknown"); break;
456 }
457 });
458 m_pPeerConnection->onSignalingStateChange(
459 [this](rtc::PeerConnection::SignalingState eSignalingState)
460 {
461
462 switch (eSignalingState)
463 {
464 case rtc::PeerConnection::SignalingState::HaveLocalOffer:
465 {
466 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveLocalOffer");
467 break;
468 }
469 case rtc::PeerConnection::SignalingState::HaveLocalPranswer:
470 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveLocalPranswer");
471 break;
472 case rtc::PeerConnection::SignalingState::HaveRemoteOffer:
473 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveRemoteOffer");
474 break;
475 case rtc::PeerConnection::SignalingState::HaveRemotePranswer:
476 LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: HaveRemotePrAnswer");
477 break;
478 case rtc::PeerConnection::SignalingState::Stable: LOG_DEBUG(logging::g_qSharedLogger, "PeerConnection signaling state changed to: Stable"); break;
479 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection signaling state changed to: Unknown"); break;
480 }
481 });
482
483 m_pPeerConnection->onStateChange(
484 [this](rtc::PeerConnection::State eState)
485 {
486
487 switch (eState)
488 {
489 case rtc::PeerConnection::State::Closed: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Closed"); break;
490 case rtc::PeerConnection::State::Connected: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Connected"); break;
491 case rtc::PeerConnection::State::Connecting: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Connecting"); break;
492 case rtc::PeerConnection::State::Disconnected: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Disconnected"); break;
493 case rtc::PeerConnection::State::Failed: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Failed"); break;
494 case rtc::PeerConnection::State::New: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: New"); break;
495 default: LOG_DEBUG(logging::g_qSharedLogger, "Peer connection state changed to: Unknown"); break;
496 }
497 });
498
500
502
503 m_pDataChannel->onOpen(
504 [this]()
505 {
506
507 LOG_INFO(logging::g_qSharedLogger, "Data channel opened.");
508
509
510 m_pDataChannel->send(std::string(1, static_cast<char>(1)));
511 });
512
513 m_pDataChannel->onMessage(
514 [this](std::variant<rtc::binary, rtc::string> rtcMessage)
515 {
516 try
517 {
518
519 nlohmann::json jsnMessage;
520
521
522 if (std::holds_alternative<rtc::string>(rtcMessage))
523 {
524
525 std::string szMessage = std::get<rtc::string>(rtcMessage);
526
527
528 jsnMessage = nlohmann::json::parse(szMessage);
529 LOG_NOTICE(logging::g_qSharedLogger, "DATA_CHANNEL Received message from peer: {}", szMessage);
530 }
531 else if (std::holds_alternative<rtc::binary>(rtcMessage))
532 {
533
534 rtc::binary rtcBinaryData = std::get<rtc::binary>(rtcMessage);
535
536
537 std::string szBinaryDataStr;
538 for (auto byte : rtcBinaryData)
539 {
540 if (std::isprint(static_cast<unsigned char>(byte)))
541 {
542 szBinaryDataStr += static_cast<char>(byte);
543 }
544 }
545
546
547 LOG_DEBUG(logging::g_qSharedLogger, "DATA_CHANNEL Received binary data ({} bytes): {}", rtcBinaryData.size(), szBinaryDataStr);
548 }
549 else
550 {
551 LOG_ERROR(logging::g_qSharedLogger, "Received unknown message type from peer");
552 }
553 }
554 catch (const std::exception& e)
555 {
556
557 LOG_ERROR(logging::g_qSharedLogger, "Error occurred while negotiating with the datachannel: {}", e.what());
558 }
559 });
560
561 return true;
562}
bool DecodeH264BytesToCVMat(const std::vector< uint8_t > &vH264EncodedBytes, cv::Mat &cvDecodedFrame, const AVPixelFormat eOutputPixelFormat)
Decodes H264 encoded bytes to a cv::Mat using FFmpeg.
Definition WebRTC.cpp:635