diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c21d2d..895be54 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,3 +94,4 @@ add_subdirectory(simple_joystick_receiver) add_subdirectory(ping_pong_ping) add_subdirectory(ping_pong_pong) add_subdirectory(user_timestamped_video) +add_subdirectory(encoded-video-ingest) diff --git a/encoded-video-ingest/CMakeLists.txt b/encoded-video-ingest/CMakeLists.txt new file mode 100644 index 0000000..79ae191 --- /dev/null +++ b/encoded-video-ingest/CMakeLists.txt @@ -0,0 +1,40 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(EncodedVideoIngestProducer + producer.cpp +) + +target_include_directories(EncodedVideoIngestProducer PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(EncodedVideoIngestProducer PRIVATE ${LIVEKIT_CORE_TARGET}) + +if(WIN32) + target_link_libraries(EncodedVideoIngestProducer PRIVATE ws2_32) +endif() + +livekit_copy_windows_runtime_dlls(EncodedVideoIngestProducer) + +add_executable(EncodedVideoIngestConsumer + consumer.cpp + ../simple_room/sdl_video_renderer.cpp + ../simple_room/sdl_video_renderer.h +) + +target_include_directories(EncodedVideoIngestConsumer PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/../simple_room +) +target_link_libraries(EncodedVideoIngestConsumer PRIVATE ${LIVEKIT_CORE_TARGET} SDL3::SDL3) + +livekit_copy_windows_runtime_dlls(EncodedVideoIngestConsumer) diff --git a/encoded-video-ingest/README.md b/encoded-video-ingest/README.md new file mode 100644 index 0000000..09aac26 --- /dev/null +++ b/encoded-video-ingest/README.md @@ -0,0 +1,100 @@ +# encoded-video-ingest + +End-to-end encoded video ingest demo for the LiveKit C++ SDK. GStreamer serves +an encoded bytestream over TCP; the producer reads that stream, splits complete +frames, pushes them into an encoded `VideoSource`, and publishes a normal local +video track. The consumer subscribes to the decoded track and renders it with +SDL. + +This example intentionally does not use the removed `EncodedTcpIngest` helper. +TCP reconnect and bytestream framing are application code; the SDK surface is +`VideoSource(width, height, EncodedVideoSourceOptions{codec})` plus +`captureEncodedFrame`. + +## Prerequisites + +- A LiveKit server, such as `livekit-server --dev`. +- GStreamer 1.22+ with `good`, `bad`, `ugly`, and `libav` plugins. +- Producer and consumer tokens for the same room. + +```bash +export LIVEKIT_URL=ws://localhost:7880 +export PRODUCER_TOKEN="$(lk token create -r encoded-video-demo -i encoded-sender --join --publish)" +export CONSUMER_TOKEN="$(lk token create -r encoded-video-demo -i encoded-receiver --join --subscribe)" +``` + +## H.264 + +Start a camera pipeline that serves AUD-delimited Annex-B H.264 on TCP port +5005: + +```bash +gst-launch-1.0 -v \ + avfvideosrc device-index=0 ! \ + video/x-raw,width=640,height=480,format=NV12,framerate=30/1 ! \ + videoconvert ! \ + x264enc tune=zerolatency speed-preset=ultrafast bitrate=1000 key-int-max=60 aud=true ! \ + h264parse config-interval=1 ! \ + video/x-h264,stream-format=byte-stream,alignment=au ! \ + tcpserversink host=0.0.0.0 port=5005 sync=false async=false +``` + +Linux: replace `avfvideosrc device-index=0` with +`v4l2src device=/dev/video0`. Windows: use `mfvideosrc device-index=0`. + +Run the producer: + +```bash +LIVEKIT_URL=ws://localhost:7880 LIVEKIT_TOKEN="$PRODUCER_TOKEN" \ + ./build-release/cpp-example-collection/encoded-video-ingest/EncodedVideoIngestProducer \ + --tcp-host 127.0.0.1 --tcp-port 5005 \ + --width 640 --height 480 \ + --codec h264 +``` + +Run the consumer: + +```bash +LIVEKIT_URL=ws://localhost:7880 LIVEKIT_TOKEN="$CONSUMER_TOKEN" \ + ./build-release/cpp-example-collection/encoded-video-ingest/EncodedVideoIngestConsumer \ + --from encoded-sender \ + --track-name encoded-h264 +``` + +## Other Codecs + +H.265 uses the same Annex-B framing path: + +```bash +gst-launch-1.0 -v \ + avfvideosrc device-index=0 ! \ + video/x-raw,width=640,height=480,format=NV12,framerate=30/1 ! \ + videoconvert ! \ + x265enc tune=zerolatency speed-preset=ultrafast bitrate=1000 key-int-max=60 \ + option-string="aud=1:repeat-headers=1" ! \ + h265parse config-interval=1 ! \ + video/x-h265,stream-format=byte-stream,alignment=au ! \ + tcpserversink host=0.0.0.0 port=5005 sync=false async=false +``` + +Pass `--codec h265` and use `--track-name encoded-h265` on the consumer. + +VP8 and AV1 use IVF framing: + +```bash +gst-launch-1.0 -v \ + avfvideosrc device-index=0 ! \ + video/x-raw,width=640,height=480,format=NV12,framerate=30/1 ! \ + videoconvert ! \ + vp8enc deadline=1 target-bitrate=1000000 keyframe-max-dist=60 ! \ + ivfmux ! \ + tcpserversink host=0.0.0.0 port=5005 sync=false async=false +``` + +Pass `--codec vp8` and use `--track-name encoded-vp8`. For AV1, replace the +encoder with `av1enc`, `svtav1enc`, or `rav1enc`, pass `--codec av1`, and use +`--track-name encoded-av1`. + +VP9 is intentionally omitted from this example because the current passthrough +path does not yet provide the VP9 RTP descriptor plumbing needed for reliable +ingest. diff --git a/encoded-video-ingest/consumer.cpp b/encoded-video-ingest/consumer.cpp new file mode 100644 index 0000000..6ec34bf --- /dev/null +++ b/encoded-video-ingest/consumer.cpp @@ -0,0 +1,261 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "livekit/livekit.h" +#include "sdl_video_renderer.h" + +using namespace livekit; + +namespace { + +constexpr const char *kDefaultTrackName = "encoded-h264"; + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +std::string getenvOrEmpty(const char *name) { + const char *value = std::getenv(name); + return value ? std::string(value) : std::string{}; +} + +class MainThreadDispatcher { +public: + static void dispatch(std::function fn) { + std::lock_guard lock(mutex_); + queue_.push(std::move(fn)); + } + + static void update() { + std::queue> local; + { + std::lock_guard lock(mutex_); + std::swap(local, queue_); + } + + while (!local.empty()) { + local.front()(); + local.pop(); + } + } + +private: + static inline std::mutex mutex_; + static inline std::queue> queue_; +}; + +struct Args { + std::string url; + std::string token; + std::string from_identity = "encoded-sender"; + std::string track_name = kDefaultTrackName; + int window_width = 640; + int window_height = 480; +}; + +void printUsage(const char *program) { + std::cerr << "Usage:\n" + << " " << program << " [flags]\n" + << "or:\n" + << " LIVEKIT_URL=... LIVEKIT_TOKEN=... " << program + << " [flags]\n\n" + << "Flags:\n" + << " --from default: encoded-sender\n" + << " --track-name default: " << kDefaultTrackName + << "\n" + << " --width initial window width, default: 640\n" + << " --height initial window height, default: 480\n"; +} + +std::string takeValue(int &index, int argc, char *argv[]) { + if (index + 1 >= argc) { + throw std::invalid_argument(std::string("missing value for ") + + argv[index]); + } + ++index; + return argv[index]; +} + +bool parseArgs(int argc, char *argv[], Args &args) { + args.url = getenvOrEmpty("LIVEKIT_URL"); + args.token = getenvOrEmpty("LIVEKIT_TOKEN"); + std::vector positional; + + for (int i = 1; i < argc; ++i) { + const std::string arg = argv[i]; + if (arg == "-h" || arg == "--help") { + return false; + } + if (arg == "--from") { + args.from_identity = takeValue(i, argc, argv); + } else if (arg == "--track-name") { + args.track_name = takeValue(i, argc, argv); + } else if (arg == "--width") { + args.window_width = std::stoi(takeValue(i, argc, argv)); + } else if (arg == "--height") { + args.window_height = std::stoi(takeValue(i, argc, argv)); + } else { + positional.push_back(arg); + } + } + + if (positional.size() >= 2) { + args.url = positional[0]; + args.token = positional[1]; + } + + return !(args.url.empty() || args.token.empty()); +} + +class ConsumerDelegate : public RoomDelegate { +public: + ConsumerDelegate(const Args &args, SDLVideoRenderer &renderer) + : args_(args), renderer_(renderer) {} + + void onTrackSubscribed(Room &, const TrackSubscribedEvent &event) override { + if (!event.track || !event.participant || !event.publication) { + return; + } + if (event.track->kind() != TrackKind::KIND_VIDEO) { + return; + } + if (event.participant->identity() != args_.from_identity || + event.publication->name() != args_.track_name) { + return; + } + + VideoStream::Options options; + options.capacity = 1; + options.format = VideoBufferType::I420; + auto stream = VideoStream::fromTrack(event.track, options); + if (!stream) { + std::cerr << "[consumer] failed to create video stream for " + << args_.from_identity << " track=\"" << args_.track_name + << "\"\n"; + return; + } + + MainThreadDispatcher::dispatch([this, stream] { + renderer_.setStream(stream); + std::cout << "[consumer] rendering " << args_.from_identity + << " track=\"" << args_.track_name << "\"\n"; + }); + } + + void onTrackUnsubscribed(Room &, const TrackUnsubscribedEvent &event) override { + if (!event.participant || !event.publication) { + return; + } + if (event.participant->identity() != args_.from_identity || + event.publication->name() != args_.track_name) { + return; + } + + MainThreadDispatcher::dispatch([this] { + renderer_.setStream(nullptr); + std::cout << "[consumer] stopped rendering " << args_.from_identity + << " track=\"" << args_.track_name << "\"\n"; + }); + } + +private: + const Args &args_; + SDLVideoRenderer &renderer_; +}; + +} // namespace + +int main(int argc, char *argv[]) { + Args args; + try { + if (!parseArgs(argc, argv, args)) { + printUsage(argv[0]); + return 1; + } + } catch (const std::exception &error) { + std::cerr << "[consumer] argument error: " << error.what() << "\n"; + printUsage(argv[0]); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + if (!SDL_Init(SDL_INIT_VIDEO)) { + std::cerr << "[consumer] SDL_Init(SDL_INIT_VIDEO) failed: " + << SDL_GetError() << "\n"; + return 1; + } + + SDLVideoRenderer renderer; + if (!renderer.init("LiveKit Encoded Ingest", args.window_width, + args.window_height)) { + SDL_Quit(); + return 1; + } + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + int exit_code = 0; + + { + Room room; + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + ConsumerDelegate delegate(args, renderer); + room.setDelegate(&delegate); + + std::cout << "[consumer] connecting to " << args.url << "\n"; + if (!room.Connect(args.url, args.token, options)) { + std::cerr << "[consumer] failed to connect\n"; + exit_code = 1; + } else { + std::cout << "[consumer] connected as " + << room.localParticipant()->identity() << " to room '" + << room.room_info().name << "'\n"; + + while (g_running.load(std::memory_order_relaxed)) { + MainThreadDispatcher::update(); + renderer.render(); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } + } + + renderer.shutdown(); + livekit::shutdown(); + SDL_Quit(); + return exit_code; +} diff --git a/encoded-video-ingest/producer.cpp b/encoded-video-ingest/producer.cpp new file mode 100644 index 0000000..877c3eb --- /dev/null +++ b/encoded-video-ingest/producer.cpp @@ -0,0 +1,675 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// EncodedVideoIngestProducer +/// +/// Reads encoded frames from a GStreamer tcpserversink and publishes them via +/// VideoSource(..., EncodedVideoSourceOptions). The TCP reader/demuxing lives +/// in the example; the SDK only owns the encoded video source and WebRTC path. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(_WIN32) +#define NOMINMAX +#include +#include +#else +#include +#include +#include +#endif + +#include "livekit/livekit.h" + +using namespace livekit; + +namespace { + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +std::string getenvOrEmpty(const char *name) { + const char *value = std::getenv(name); + return value ? std::string(value) : std::string{}; +} + +enum class CodecArg { + H264, + H265, + VP8, + AV1, +}; + +struct Args { + std::string url; + std::string token; + std::string tcp_host = "127.0.0.1"; + std::uint16_t tcp_port = 5005; + std::uint32_t width = 640; + std::uint32_t height = 480; + std::uint64_t max_bitrate_bps = 1'000'000; + double max_framerate = 30.0; + CodecArg codec = CodecArg::H264; + std::optional track_name; +}; + +struct Stats { + std::uint64_t accepted = 0; + std::uint64_t dropped = 0; + std::uint64_t keyframes = 0; + std::uint64_t bytes = 0; +}; + +VideoCodec toVideoCodec(CodecArg codec) { + switch (codec) { + case CodecArg::H264: + return VideoCodec::H264; + case CodecArg::H265: + return VideoCodec::H265; + case CodecArg::VP8: + return VideoCodec::VP8; + case CodecArg::AV1: + return VideoCodec::AV1; + } + return VideoCodec::H264; +} + +const char *codecName(CodecArg codec) { + switch (codec) { + case CodecArg::H264: + return "H.264"; + case CodecArg::H265: + return "H.265"; + case CodecArg::VP8: + return "VP8"; + case CodecArg::AV1: + return "AV1"; + } + return "unknown"; +} + +std::string defaultTrackName(CodecArg codec) { + switch (codec) { + case CodecArg::H264: + return "encoded-h264"; + case CodecArg::H265: + return "encoded-h265"; + case CodecArg::VP8: + return "encoded-vp8"; + case CodecArg::AV1: + return "encoded-av1"; + } + return "encoded-video"; +} + +void printUsage(const char *program) { + std::cerr << "Usage:\n" + << " " << program << " [flags]\n" + << "or:\n" + << " LIVEKIT_URL=... LIVEKIT_TOKEN=... " << program + << " [flags]\n\n" + << "Flags:\n" + << " --tcp-host default: 127.0.0.1\n" + << " --tcp-port default: 5005\n" + << " --width default: 640\n" + << " --height default: 480\n" + << " --codec default: h264\n" + << " --track-name default: encoded-\n" + << " --max-bitrate-kbps default: 1000\n" + << " --max-framerate default: 30\n"; +} + +std::string takeValue(int &index, int argc, char *argv[]) { + if (index + 1 >= argc) { + throw std::invalid_argument(std::string("missing value for ") + + argv[index]); + } + ++index; + return argv[index]; +} + +CodecArg parseCodec(const std::string &value) { + if (value == "h264") { + return CodecArg::H264; + } + if (value == "h265") { + return CodecArg::H265; + } + if (value == "vp8") { + return CodecArg::VP8; + } + if (value == "av1") { + return CodecArg::AV1; + } + throw std::invalid_argument("unsupported codec: " + value); +} + +bool parseArgs(int argc, char *argv[], Args &args) { + args.url = getenvOrEmpty("LIVEKIT_URL"); + args.token = getenvOrEmpty("LIVEKIT_TOKEN"); + std::vector positional; + + for (int i = 1; i < argc; ++i) { + const std::string arg = argv[i]; + if (arg == "-h" || arg == "--help") { + return false; + } + if (arg == "--tcp-host") { + args.tcp_host = takeValue(i, argc, argv); + } else if (arg == "--tcp-port") { + args.tcp_port = + static_cast(std::stoul(takeValue(i, argc, argv))); + } else if (arg == "--width") { + args.width = + static_cast(std::stoul(takeValue(i, argc, argv))); + } else if (arg == "--height") { + args.height = + static_cast(std::stoul(takeValue(i, argc, argv))); + } else if (arg == "--codec") { + args.codec = parseCodec(takeValue(i, argc, argv)); + } else if (arg == "--track-name") { + args.track_name = takeValue(i, argc, argv); + } else if (arg == "--max-bitrate-kbps") { + args.max_bitrate_bps = + static_cast(std::stoull(takeValue(i, argc, argv))) * + 1000; + } else if (arg == "--max-framerate") { + args.max_framerate = std::stod(takeValue(i, argc, argv)); + } else { + positional.push_back(arg); + } + } + + if (positional.size() >= 2) { + args.url = positional[0]; + args.token = positional[1]; + } + + return !(args.url.empty() || args.token.empty()); +} + +std::optional> +findStartCode(const std::vector &data, std::size_t offset) { + for (std::size_t i = offset; i + 3 <= data.size(); ++i) { + if (data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1) { + return std::make_pair(i, 3); + } + if (i + 4 <= data.size() && data[i] == 0 && data[i + 1] == 0 && + data[i + 2] == 0 && data[i + 3] == 1) { + return std::make_pair(i, 4); + } + } + return std::nullopt; +} + +std::uint8_t nalType(CodecArg codec, std::uint8_t first_byte) { + if (codec == CodecArg::H264) { + return first_byte & 0x1F; + } + if (codec == CodecArg::H265) { + return (first_byte >> 1) & 0x3F; + } + return 0; +} + +bool isAud(CodecArg codec, std::uint8_t nal_type) { + return (codec == CodecArg::H264 && nal_type == 9) || + (codec == CodecArg::H265 && nal_type == 35); +} + +bool isAnnexBKeyframe(CodecArg codec, const std::vector &data) { + std::size_t offset = 0; + while (auto start = findStartCode(data, offset)) { + const std::size_t nal_offset = start->first + start->second; + if (nal_offset >= data.size()) { + break; + } + const std::uint8_t type = nalType(codec, data[nal_offset]); + if (codec == CodecArg::H264 && type == 5) { + return true; + } + if (codec == CodecArg::H265 && type >= 16 && type <= 23) { + return true; + } + offset = nal_offset + 1; + } + return false; +} + +bool readLeb128(const std::vector &data, std::size_t &offset, + std::uint32_t &value) { + value = 0; + for (int i = 0; i < 8 && offset < data.size(); ++i) { + const std::uint8_t byte = data[offset++]; + value |= static_cast(byte & 0x7F) << (i * 7); + if ((byte & 0x80) == 0) { + return true; + } + } + return false; +} + +bool isAv1Keyframe(const std::vector &data) { + std::size_t offset = 0; + while (offset < data.size()) { + const std::uint8_t header = data[offset++]; + const std::uint8_t obu_type = (header >> 3) & 0x0F; + const bool has_extension = (header & 0x04) != 0; + const bool has_size = (header & 0x02) != 0; + if (has_extension) { + if (offset >= data.size()) { + return false; + } + ++offset; + } + std::uint32_t payload_size = 0; + if (has_size) { + if (!readLeb128(data, offset, payload_size)) { + return false; + } + } else { + payload_size = static_cast(data.size() - offset); + } + if (obu_type == 1) { + return true; + } + offset += std::min(payload_size, data.size() - offset); + } + return false; +} + +bool isKeyframe(CodecArg codec, const std::vector &data) { + switch (codec) { + case CodecArg::H264: + case CodecArg::H265: + return isAnnexBKeyframe(codec, data); + case CodecArg::VP8: + return !data.empty() && ((data[0] & 0x01) == 0); + case CodecArg::AV1: + return isAv1Keyframe(data); + } + return false; +} + +class Demuxer { +public: + explicit Demuxer(CodecArg codec) : codec_(codec) {} + + void feed(const std::uint8_t *data, std::size_t size, + std::vector> &frames) { + buffer_.insert(buffer_.end(), data, data + size); + if (codec_ == CodecArg::H264 || codec_ == CodecArg::H265) { + feedAnnexB(frames); + } else { + feedIvf(frames); + } + } + +private: + void feedAnnexB(std::vector> &frames) { + std::size_t search = 0; + while (auto start = findStartCode(buffer_, search)) { + const std::size_t nal_offset = start->first + start->second; + if (nal_offset >= buffer_.size()) { + break; + } + const bool aud = isAud(codec_, nalType(codec_, buffer_[nal_offset])); + if (aud) { + if (au_start_ && start->first > *au_start_) { + frames.emplace_back(buffer_.begin() + static_cast(*au_start_), + buffer_.begin() + static_cast(start->first)); + buffer_.erase(buffer_.begin(), + buffer_.begin() + static_cast(start->first)); + au_start_ = 0; + search = start->second + 1; + continue; + } + au_start_ = start->first; + } else if (!au_start_) { + au_start_ = start->first; + } + search = nal_offset + 1; + } + } + + static std::uint32_t readLe32(const std::vector &data) { + return static_cast(data[0]) | + (static_cast(data[1]) << 8) | + (static_cast(data[2]) << 16) | + (static_cast(data[3]) << 24); + } + + void feedIvf(std::vector> &frames) { + if (!ivf_header_checked_) { + if (buffer_.size() >= 32 && std::memcmp(buffer_.data(), "DKIF", 4) == 0) { + buffer_.erase(buffer_.begin(), buffer_.begin() + 32); + } + ivf_header_checked_ = true; + } + + while (buffer_.size() >= 12) { + const std::uint32_t frame_size = readLe32(buffer_); + if (frame_size == 0 || frame_size > 16 * 1024 * 1024) { + throw std::runtime_error("invalid IVF frame size"); + } + if (buffer_.size() < static_cast(12 + frame_size)) { + return; + } + frames.emplace_back(buffer_.begin() + 12, + buffer_.begin() + 12 + frame_size); + buffer_.erase(buffer_.begin(), buffer_.begin() + 12 + frame_size); + } + } + + CodecArg codec_; + std::vector buffer_; + std::optional au_start_; + bool ivf_header_checked_ = false; +}; + +#if defined(_WIN32) +using SocketHandle = SOCKET; +constexpr SocketHandle kInvalidSocket = INVALID_SOCKET; + +void closeSocket(SocketHandle socket) { closesocket(socket); } +#else +using SocketHandle = int; +constexpr SocketHandle kInvalidSocket = -1; + +void closeSocket(SocketHandle socket) { close(socket); } +#endif + +class NetworkRuntime { +public: + NetworkRuntime() { +#if defined(_WIN32) + WSADATA data{}; + if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { + throw std::runtime_error("WSAStartup failed"); + } +#endif + } + + ~NetworkRuntime() { +#if defined(_WIN32) + WSACleanup(); +#endif + } +}; + +class TcpSocket { +public: + TcpSocket() = default; + explicit TcpSocket(SocketHandle socket) : socket_(socket) {} + ~TcpSocket() { reset(); } + + TcpSocket(const TcpSocket &) = delete; + TcpSocket &operator=(const TcpSocket &) = delete; + + TcpSocket(TcpSocket &&other) noexcept : socket_(other.socket_) { + other.socket_ = kInvalidSocket; + } + + TcpSocket &operator=(TcpSocket &&other) noexcept { + if (this != &other) { + reset(); + socket_ = other.socket_; + other.socket_ = kInvalidSocket; + } + return *this; + } + + int read(std::uint8_t *buffer, int length) { +#if defined(_WIN32) + return recv(socket_, reinterpret_cast(buffer), length, 0); +#else + return static_cast(recv(socket_, buffer, static_cast(length), 0)); +#endif + } + + explicit operator bool() const noexcept { return socket_ != kInvalidSocket; } + +private: + void reset() { + if (socket_ != kInvalidSocket) { + closeSocket(socket_); + socket_ = kInvalidSocket; + } + } + + SocketHandle socket_ = kInvalidSocket; +}; + +TcpSocket connectTcp(const std::string &host, std::uint16_t port) { + addrinfo hints{}; + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = AF_UNSPEC; + + addrinfo *results = nullptr; + const std::string service = std::to_string(port); + const int rc = getaddrinfo(host.c_str(), service.c_str(), &hints, &results); + if (rc != 0) { + throw std::runtime_error("getaddrinfo failed for " + host); + } + + for (addrinfo *addr = results; addr != nullptr; addr = addr->ai_next) { + SocketHandle socket = + ::socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (socket == kInvalidSocket) { + continue; + } + if (::connect(socket, addr->ai_addr, + static_cast(addr->ai_addrlen)) == 0) { + freeaddrinfo(results); + return TcpSocket(socket); + } + closeSocket(socket); + } + + freeaddrinfo(results); + throw std::runtime_error("connect failed for " + host + ":" + service); +} + +class LoggingObserver : public EncodedVideoSourceObserver { +public: + explicit LoggingObserver(std::atomic &target_bitrate_bps) + : target_bitrate_bps_(target_bitrate_bps) {} + + void onKeyframeRequested() override { + std::cout << "[producer] keyframe requested by WebRTC\n"; + } + + void onTargetBitrate(std::uint32_t bitrate_bps, + double framerate_fps) override { + target_bitrate_bps_.store(bitrate_bps, std::memory_order_relaxed); + std::cout << "[producer] target bitrate " << (bitrate_bps / 1000) + << " kbps @ " << framerate_fps << " fps\n"; + } + +private: + std::atomic &target_bitrate_bps_; +}; + +} // namespace + +int main(int argc, char *argv[]) { + Args args; + try { + if (!parseArgs(argc, argv, args)) { + printUsage(argv[0]); + return 1; + } + } catch (const std::exception &error) { + std::cerr << "[producer] argument error: " << error.what() << "\n"; + printUsage(argv[0]); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + try { + NetworkRuntime network; + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + Room room; + RoomOptions room_options; + room_options.auto_subscribe = false; + room_options.dynacast = false; + + std::cout << "[producer] connecting to " << args.url << "\n"; + if (!room.Connect(args.url, args.token, room_options)) { + std::cerr << "[producer] failed to connect\n"; + livekit::shutdown(); + return 1; + } + + std::cout << "[producer] connected as " + << room.localParticipant()->identity() << " to room '" + << room.room_info().name << "'\n"; + + auto source = std::make_shared( + static_cast(args.width), static_cast(args.height), + EncodedVideoSourceOptions{toVideoCodec(args.codec)}); + std::atomic target_bitrate_bps{0}; + source->setEncodedObserver( + std::make_shared(target_bitrate_bps)); + + const std::string track_name = + args.track_name.value_or(defaultTrackName(args.codec)); + auto track = LocalVideoTrack::createLocalVideoTrack(track_name, source); + + TrackPublishOptions publish_options; + publish_options.source = TrackSource::SOURCE_CAMERA; + publish_options.simulcast = false; + publish_options.video_codec = toVideoCodec(args.codec); + publish_options.video_encoding = + VideoEncodingOptions{args.max_bitrate_bps, args.max_framerate}; + room.localParticipant()->publishTrack(track, publish_options); + + std::cout << "[producer] published " << codecName(args.codec) + << " track name=\"" << track_name << "\" at " << args.width + << "x" << args.height << "\n"; + + Stats total; + Stats last; + auto last_log = std::chrono::steady_clock::now(); + std::vector read_buffer(64 * 1024); + std::vector> frames; + + while (g_running.load(std::memory_order_relaxed)) { + const std::string endpoint = + args.tcp_host + ":" + std::to_string(args.tcp_port); + try { + std::cout << "[producer] connecting to " << endpoint << " for " + << codecName(args.codec) + << ((args.codec == CodecArg::H264 || + args.codec == CodecArg::H265) + ? " Annex-B" + : " IVF") + << " bytestream\n"; + TcpSocket socket = connectTcp(args.tcp_host, args.tcp_port); + std::cout << "[producer] connected to " << endpoint << "\n"; + + Demuxer demuxer(args.codec); + while (g_running.load(std::memory_order_relaxed)) { + const int n = socket.read(read_buffer.data(), + static_cast(read_buffer.size())); + if (n <= 0) { + std::cerr << "[producer] TCP stream closed\n"; + break; + } + + frames.clear(); + demuxer.feed(read_buffer.data(), static_cast(n), + frames); + for (const auto &frame : frames) { + const bool keyframe = isKeyframe(args.codec, frame); + EncodedVideoFrameInfo info; + info.is_keyframe = keyframe; + info.has_sps_pps = false; + info.width = args.width; + info.height = args.height; + info.capture_time_us = 0; + + total.bytes += frame.size(); + if (keyframe) { + ++total.keyframes; + } + if (source->captureEncodedFrame(frame, info)) { + ++total.accepted; + } else { + ++total.dropped; + } + } + + const auto now = std::chrono::steady_clock::now(); + const double elapsed = + std::chrono::duration(now - last_log).count(); + if (elapsed >= 2.0) { + const std::uint64_t accepted = total.accepted - last.accepted; + const std::uint64_t dropped = total.dropped - last.dropped; + const std::uint64_t keyframes = total.keyframes - last.keyframes; + const std::uint64_t bytes = total.bytes - last.bytes; + if (accepted + dropped > 0) { + const double kbps = + static_cast(bytes) * 8.0 / elapsed / 1000.0; + std::cout << "[producer] ingest: " + << (static_cast(accepted) / elapsed) + << " fps accepted, " + << (static_cast(dropped) / elapsed) + << " fps dropped, " << kbps + << " kbps encoded (target " + << target_bitrate_bps.load(std::memory_order_relaxed) / + 1000 + << " kbps), " << keyframes << " keyframes\n"; + } + last = total; + last_log = now; + } + } + } catch (const std::exception &error) { + std::cerr << "[producer] " << error.what() << "; retrying in 1s\n"; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + livekit::shutdown(); + return 0; + } catch (const std::exception &error) { + std::cerr << "[producer] fatal error: " << error.what() << "\n"; + livekit::shutdown(); + return 1; + } +} diff --git a/simple_room/sdl_video_renderer.cpp b/simple_room/sdl_video_renderer.cpp index 3554ccf..4c23b37 100644 --- a/simple_room/sdl_video_renderer.cpp +++ b/simple_room/sdl_video_renderer.cpp @@ -17,13 +17,20 @@ #include "sdl_video_renderer.h" #include "livekit/livekit.h" -#include #include +#include using namespace livekit; constexpr int kMaxFPS = 60; +SDL_PixelFormat textureFormatFor(livekit::VideoBufferType type) { + if (type == livekit::VideoBufferType::I420) { + return SDL_PIXELFORMAT_IYUV; + } + return SDL_PIXELFORMAT_RGBA32; +} + SDLVideoRenderer::SDLVideoRenderer() = default; SDLVideoRenderer::~SDLVideoRenderer() { shutdown(); } @@ -45,22 +52,24 @@ bool SDLVideoRenderer::init(const char *title, int width, int height) { return false; } - // Note, web will send out BGRA as default, and we can't use ARGB since ffi - // does not support converting from BGRA to ARGB. - texture_ = SDL_CreateTexture(renderer_, SDL_PIXELFORMAT_RGBA8888, + texture_ = SDL_CreateTexture(renderer_, SDL_PIXELFORMAT_RGBA32, SDL_TEXTUREACCESS_STREAMING, width_, height_); if (!texture_) { std::cerr << "[error] SDL_CreateTexture failed: " << SDL_GetError() << "\n"; return false; } + texture_format_ = SDL_PIXELFORMAT_RGBA32; return true; } void SDLVideoRenderer::shutdown() { + stopReader(); + if (texture_) { SDL_DestroyTexture(texture_); texture_ = nullptr; + texture_format_ = SDL_PIXELFORMAT_UNKNOWN; } if (renderer_) { SDL_DestroyRenderer(renderer_); @@ -71,11 +80,45 @@ void SDLVideoRenderer::shutdown() { window_ = nullptr; } - stream_.reset(); + { + std::lock_guard lock(latest_frame_lock_); + latest_frame_.reset(); + } } void SDLVideoRenderer::setStream(std::shared_ptr stream) { + stopReader(); stream_ = std::move(stream); + if (!stream_) { + return; + } + + reader_running_.store(true, std::memory_order_relaxed); + reader_thread_ = std::thread(&SDLVideoRenderer::readerLoop, this, stream_); +} + +void SDLVideoRenderer::stopReader() { + reader_running_.store(false, std::memory_order_relaxed); + if (stream_) { + stream_->close(); + } + if (reader_thread_.joinable()) { + reader_thread_.join(); + } + stream_.reset(); +} + +void SDLVideoRenderer::readerLoop( + std::shared_ptr stream) { + while (reader_running_.load(std::memory_order_relaxed)) { + auto event = std::make_unique(); + if (!stream->read(*event)) { + break; + } + + std::lock_guard lock(latest_frame_lock_); + latest_frame_ = std::move(event); + } } void SDLVideoRenderer::render() { @@ -92,11 +135,6 @@ void SDLVideoRenderer::render() { } } - // 2) If no stream, nothing to render - if (!stream_) { - return; - } - // Throttle rendering to kMaxFPS const auto now = std::chrono::steady_clock::now(); if (last_render_time_.time_since_epoch().count() != 0) { @@ -105,70 +143,115 @@ void SDLVideoRenderer::render() { return; } } - last_render_time_ = now; + std::unique_ptr latest_frame; + { + std::lock_guard lock(latest_frame_lock_); + latest_frame.swap(latest_frame_); + } - // 3) Read a frame from VideoStream (blocking until one is available) - livekit::VideoFrameEvent vfe; - bool gotFrame = stream_->read(vfe); - if (!gotFrame) { - // EOS / closed – nothing more to render + if (!latest_frame) { return; } + last_render_time_ = now; - livekit::VideoFrame &frame = vfe.frame; + livekit::VideoFrame &frame = latest_frame->frame; - // 4) Ensure the frame is RGBA. - // Ideally you requested RGBA from VideoStream::Options so this is a no-op. - if (frame.type() != livekit::VideoBufferType::RGBA) { + if (frame.type() != livekit::VideoBufferType::RGBA && + frame.type() != livekit::VideoBufferType::I420) { try { frame = frame.convert(livekit::VideoBufferType::RGBA, false); } catch (const std::exception &ex) { - std::cerr << "[error] SDLVideoRenderer: convert to RGBA failed: " << ex.what() << "\n"; + std::cerr << "[error] SDLVideoRenderer: convert to RGBA failed: " + << ex.what() << "\n"; return; } } - // Handle size change: recreate texture if needed - if (frame.width() != width_ || frame.height() != height_) { + const SDL_PixelFormat frame_texture_format = textureFormatFor(frame.type()); + if (frame.width() != width_ || frame.height() != height_ || + frame_texture_format != texture_format_) { width_ = frame.width(); height_ = frame.height(); + texture_format_ = frame_texture_format; if (texture_) { SDL_DestroyTexture(texture_); texture_ = nullptr; } - texture_ = SDL_CreateTexture( - renderer_, - SDL_PIXELFORMAT_RGBA32, // Note, SDL_PIXELFORMAT_RGBA8888 is not - // compatible with Livekit RGBA format. - SDL_TEXTUREACCESS_STREAMING, width_, height_); + texture_ = SDL_CreateTexture(renderer_, texture_format_, + SDL_TEXTUREACCESS_STREAMING, width_, height_); if (!texture_) { - std::cerr << "[error] SDLVideoRenderer: SDL_CreateTexture failed: " << SDL_GetError() << "\n"; + std::cerr << "[error] SDLVideoRenderer: SDL_CreateTexture failed: " + << SDL_GetError() << "\n"; return; } } - // 6) Upload RGBA data to SDL texture - void *pixels = nullptr; - int pitch = 0; - if (!SDL_LockTexture(texture_, nullptr, &pixels, &pitch)) { - std::cerr << "[error] SDLVideoRenderer: SDL_LockTexture failed: " << SDL_GetError() << "\n"; - return; - } + if (frame.type() == livekit::VideoBufferType::I420) { + const int chroma_width = (frame.width() + 1) / 2; + const int chroma_height = (frame.height() + 1) / 2; + const std::size_t y_size = + static_cast(frame.width()) * frame.height(); + const std::size_t chroma_size = + static_cast(chroma_width) * chroma_height; + if (frame.dataSize() < y_size + 2 * chroma_size) { + std::cerr << "[error] SDLVideoRenderer: I420 frame buffer is too small\n"; + return; + } + const std::uint8_t *y_plane = frame.data(); + const std::uint8_t *u_plane = y_plane + y_size; + const std::uint8_t *v_plane = u_plane + chroma_size; + if (!SDL_UpdateYUVTexture(texture_, nullptr, y_plane, frame.width(), + u_plane, chroma_width, v_plane, + chroma_width)) { + std::cerr << "[error] SDLVideoRenderer: SDL_UpdateYUVTexture failed: " + << SDL_GetError() << "\n"; + return; + } + } else { + void *pixels = nullptr; + int pitch = 0; + if (!SDL_LockTexture(texture_, nullptr, &pixels, &pitch)) { + std::cerr << "[error] SDLVideoRenderer: SDL_LockTexture failed: " + << SDL_GetError() << "\n"; + return; + } + + const std::uint8_t *src = frame.data(); + const int src_pitch = frame.width() * 4; - const std::uint8_t *src = frame.data(); - const int srcPitch = frame.width() * 4; // RGBA: 4 bytes per pixel + for (int y = 0; y < frame.height(); ++y) { + std::memcpy(static_cast(pixels) + y * pitch, + src + y * src_pitch, src_pitch); + } - for (int y = 0; y < frame.height(); ++y) { - std::memcpy(static_cast(pixels) + y * pitch, - src + y * srcPitch, srcPitch); + SDL_UnlockTexture(texture_); } - SDL_UnlockTexture(texture_); + int window_width = width_; + int window_height = height_; + SDL_GetWindowSize(window_, &window_width, &window_height); + + const float frame_aspect = + static_cast(frame.width()) / static_cast(frame.height()); + const float window_aspect = + static_cast(window_width) / static_cast(window_height); + SDL_FRect destination{}; + if (window_aspect > frame_aspect) { + destination.h = static_cast(window_height); + destination.w = destination.h * frame_aspect; + destination.x = (static_cast(window_width) - destination.w) / 2.0F; + destination.y = 0.0F; + } else { + destination.w = static_cast(window_width); + destination.h = destination.w / frame_aspect; + destination.x = 0.0F; + destination.y = + (static_cast(window_height) - destination.h) / 2.0F; + } - // 7) Present SDL_SetRenderDrawColor(renderer_, 0, 0, 0, 255); SDL_RenderClear(renderer_); - SDL_RenderTexture(renderer_, texture_, nullptr, nullptr); + SDL_RenderTexture(renderer_, texture_, nullptr, &destination); SDL_RenderPresent(renderer_); } diff --git a/simple_room/sdl_video_renderer.h b/simple_room/sdl_video_renderer.h index fb0d41e..6efa7f4 100644 --- a/simple_room/sdl_video_renderer.h +++ b/simple_room/sdl_video_renderer.h @@ -19,10 +19,12 @@ #include #include #include +#include #include namespace livekit { class VideoStream; +struct VideoFrameEvent; } class SDLVideoRenderer { @@ -45,9 +47,17 @@ class SDLVideoRenderer { SDL_Window *window_ = nullptr; SDL_Renderer *renderer_ = nullptr; SDL_Texture *texture_ = nullptr; + SDL_PixelFormat texture_format_ = SDL_PIXELFORMAT_UNKNOWN; std::shared_ptr stream_; + std::unique_ptr latest_frame_; + std::mutex latest_frame_lock_; + std::thread reader_thread_; + std::atomic reader_running_{false}; int width_ = 0; int height_ = 0; std::chrono::steady_clock::time_point last_render_time_{}; + + void stopReader(); + void readerLoop(std::shared_ptr stream); };