diff --git a/cmd/camera-streamer/opts.c b/cmd/camera-streamer/opts.c index 584bbc9..b55f8f4 100644 --- a/cmd/camera-streamer/opts.c +++ b/cmd/camera-streamer/opts.c @@ -52,6 +52,7 @@ log_options_t log_options = { rtsp_options_t rtsp_options = { .port = 0, + .allow_truncated = false }; webrtc_options_t webrtc_options = { diff --git a/output/rtsp/rtsp.cc b/output/rtsp/rtsp.cc index 9fe2f59..1746a76 100644 --- a/output/rtsp/rtsp.cc +++ b/output/rtsp/rtsp.cc @@ -41,33 +41,47 @@ class DynamicH264Stream : public FramedSource { public: DynamicH264Stream(UsageEnvironment& env) - : FramedSource(env), fHaveStartedReading(False), fFrameWaiting(False) + : FramedSource(env), fHaveStartedReading(False), locked_buf(NULL), locked_buf_offset(0) { } void doGetNextFrame() { - std::unique_lock lk(rtsp_streams_lock); - if (!fHaveStartedReading) { - rtsp_streams.insert(this); - fHaveStartedReading = True; + { + std::unique_lock streams_lock(rtsp_streams_lock); + if (!fHaveStartedReading) { + rtsp_streams.insert(this); + fHaveStartedReading = True; + } } + + send_buffer(); } void doStopGettingFrames() { - std::unique_lock lk(rtsp_streams_lock); - rtsp_streams.erase(this); - fHaveStartedReading = false; + { + std::unique_lock streams_lock(rtsp_streams_lock); + rtsp_streams.erase(this); + fHaveStartedReading = false; + } + + set_buffer(NULL); } - void receiveData(buffer_t *buf) + void receive_buf(buffer_t *buf) { - if (!isCurrentlyAwaitingData() || fFrameWaiting) { + std::unique_lock lk(lock); + + if (!fHaveStartedReading) { + return; + } + + if (locked_buf || !isCurrentlyAwaitingData()) { if (rtsp_options) { rtsp_options->dropped++; } - return; // we're not ready for the data yet + return; } if (!fHadKeyFrame) { @@ -86,37 +100,62 @@ public: } rtsp_options->frames++; - - if (buf->used > fMaxSize) { - fNumTruncatedBytes = buf->used - fMaxSize; - fFrameSize = fMaxSize; - if (rtsp_options) { - rtsp_options->truncated++; - } - } else { - fNumTruncatedBytes = 0; - fFrameSize = buf->used; - } - - memcpy(fTo, buf->start, fFrameSize); - fFrameWaiting = true; + set_buffer(buf); } - void doFinishFrameGet() + void set_buffer(buffer_t *buf) { - if (fFrameWaiting) { - // Tell our client that we have new data - fFrameWaiting = false; - afterGetting(this); + std::unique_lock lk(lock); + + if (locked_buf) { + buffer_consumed(locked_buf, "rstp"); + locked_buf = NULL; } + + locked_buf = buf; + locked_buf_offset = 0; + buffer_use(buf); + } + + bool send_buffer() + { + std::unique_lock lk(lock); + + if (!locked_buf) + return false; + + if (!isCurrentlyAwaitingData()) + return false; + + if (locked_buf->used - locked_buf_offset > fMaxSize) { + fNumTruncatedBytes = locked_buf->used - fMaxSize - locked_buf_offset; + fFrameSize = fMaxSize; + } else { + fNumTruncatedBytes = 0; + fFrameSize = locked_buf->used - locked_buf_offset; + } + + memcpy(fTo, (char*)locked_buf->start + locked_buf_offset, fFrameSize); + locked_buf_offset += fFrameSize; + + if (locked_buf->used == locked_buf_offset) { + set_buffer(NULL); + } else if (rtsp_options->allow_truncated) { + rtsp_options->truncated++; + set_buffer(NULL); + } + + afterGetting(this); + return true; } Boolean fHaveStartedReading; Boolean fHadKeyFrame; Boolean fRequestedKeyFrame; - Boolean fFrameWaiting; - DynamicH264Stream *pNextStream; + std::recursive_mutex lock; + buffer_t *locked_buf; + unsigned locked_buf_offset; }; class DynamicH264VideoFileServerMediaSubsession : public OnDemandServerMediaSubsession @@ -197,7 +236,7 @@ static void rtsp_frame_finish() { std::unique_lock lk(rtsp_streams_lock); for (auto *stream : rtsp_streams) { - stream->doFinishFrameGet(); + stream->send_buffer(); } if (rtsp_options) { rtsp_options->clients = rtsp_streams.size(); @@ -209,7 +248,7 @@ static void *rtsp_server_thread(void *opaque) UsageEnvironment* env = (UsageEnvironment*)opaque; BasicTaskScheduler0* taskScheduler = (BasicTaskScheduler*)&env->taskScheduler(); - while(true) { + while (true) { rtsp_frame_finish(); taskScheduler->SingleStep(0); } @@ -226,7 +265,7 @@ static void rtsp_h264_capture(buffer_lock_t *buf_lock, buffer_t *buf) { std::unique_lock lk(rtsp_streams_lock); for (auto *stream : rtsp_streams) { - stream->receiveData(buf); + stream->receive_buf(buf); } } diff --git a/output/rtsp/rtsp.h b/output/rtsp/rtsp.h index 88b1d7a..912409c 100644 --- a/output/rtsp/rtsp.h +++ b/output/rtsp/rtsp.h @@ -2,6 +2,7 @@ typedef struct rtsp_options_s { bool running; + bool allow_truncated; uint port; int clients; int frames;