Move http/ and rtsp/ into output/

Kamil Trzcinski
2022-09-02 22:06:19 +02:00
parent 17ec167dd9
commit dd3aa47805
11 changed files with 36 additions and 24 deletions

output/http_ffmpeg.c Normal file

@@ -0,0 +1,165 @@
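// HTTP handlers that remux the raw H264 capture stream into MP4 or
// Matroska on the fly via the ffmpeg-based remuxer, so clients can
// play it without a separate packaging step.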
#include <stdio.h>
#include <stdlib.h>
#include "output.h"
#include "util/opts/log.h"
#include "util/http/http.h"
#include "device/buffer.h"
#include "device/buffer_lock.h"
#include "device/buffer_list.h"
#include "device/device.h"
#include "util/ffmpeg/remuxer.h"
buffer_lock_t *http_h264_buffer_for_res(http_worker_t *worker);
static const char *const VIDEO_HEADER =
"HTTP/1.0 200 OK\r\n"
"Access-Control-Allow-Origin: *\r\n"
"Connection: close\r\n"
"Content-Type: %s\r\n"
"\r\n";
typedef struct {
const char *name;
FILE *stream;
const char *content_type;
bool had_key_frame;
bool requested_key_frame;
bool wrote_header;
buffer_t *buf;
unsigned buf_offset;
unsigned stream_offset;
ffmpeg_remuxer_t *remuxer;
} http_ffmpeg_status_t;
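// AVIO read callback: feeds the remuxer from the currently pinned
// capture buffer, returning EOF once the buffer is fully consumed.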
static int http_ffmpeg_read_from_buf(void *opaque, uint8_t *buf, int buf_size)
{
http_ffmpeg_status_t *status = opaque;
if (!status->buf)
return FFMPEG_DATA_PACKET_EOF;
buf_size = MIN(buf_size, status->buf->used - status->buf_offset);
if (!buf_size)
return FFMPEG_DATA_PACKET_EOF;
LOG_DEBUG(status, "http_ffmpeg_read_from_buf: offset=%d, n=%d", status->buf_offset, buf_size);
memcpy(buf, (char*)status->buf->start + status->buf_offset, buf_size);
status->buf_offset += buf_size;
return buf_size;
}
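// AVIO write callback: emits the remuxed bytes to the HTTP client,
// writing the response header lazily before the first payload byte.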
static int http_ffmpeg_write_to_stream(void *opaque, uint8_t *buf, int buf_size)
{
http_ffmpeg_status_t *status = opaque;
if (!status->stream)
return FFMPEG_DATA_PACKET_EOF;
if (!status->wrote_header) {
fprintf(status->stream, VIDEO_HEADER, status->content_type);
status->wrote_header = true;
}
size_t n = fwrite(buf, 1, buf_size, status->stream);
fflush(status->stream);
LOG_DEBUG(status, "http_ffmpeg_write_to_stream: offset=%d, n=%zu, buf_size=%d, error=%d",
status->stream_offset, n, buf_size, ferror(status->stream));
status->stream_offset += n;
if (ferror(status->stream))
return FFMPEG_DATA_PACKET_EOF;
return n;
}
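// Per-frame callback driven by buffer_lock_write_loop(): drops frames
// until a key frame arrives (requesting one from the device once),
// then pushes each buffer through the remuxer.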
static int http_ffmpeg_video_buf_part(buffer_lock_t *buf_lock, buffer_t *buf, int frame, http_ffmpeg_status_t *status)
{
if (!status->had_key_frame) {
status->had_key_frame = h264_is_key_frame(buf);
}
if (!status->had_key_frame) {
if (!status->requested_key_frame) {
device_video_force_key(buf->buf_list->dev);
status->requested_key_frame = true;
}
return 0;
}
int ret = -1;
status->buf = buf;
status->buf_offset = 0;
if ((ret = ffmpeg_remuxer_open(status->remuxer)) < 0)
goto error;
if ((ret = ffmpeg_remuxer_feed(status->remuxer, 0)) < 0)
goto error;
ret = 1;
error:
status->buf = NULL;
return ret;
}
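// Common driver for the MP4/MKV endpoints: wires the status struct and
// remuxer together, tunes the muxer options for low-latency fragmented
// output, then streams frames until the client disconnects.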
static void http_ffmpeg_video(http_worker_t *worker, FILE *stream, const char *content_type, const char *video_format)
{
http_ffmpeg_status_t status = {
.name = worker->name,
.stream = stream,
.content_type = content_type,
};
ffmpeg_remuxer_t remuxer = {
.name = worker->name,
.input_format = "h264",
.video_format = video_format,
.opaque = &status,
.read_packet = http_ffmpeg_read_from_buf,
.write_packet = http_ffmpeg_write_to_stream,
};
status.remuxer = &remuxer;
#ifdef USE_FFMPEG
av_dict_set_int(&remuxer.output_opts, "direct", 1, 0);
//av_dict_set_int(&remuxer.output_opts, "frag_duration", 1, 0);
av_dict_set_int(&remuxer.output_opts, "frag_size", 4096, 0);
av_dict_set_int(&remuxer.output_opts, "low_delay", 1, 0);
av_dict_set_int(&remuxer.output_opts, "nobuffer", 1, 0);
av_dict_set_int(&remuxer.output_opts, "flush_packets", 1, 0);
#endif
int n = buffer_lock_write_loop(
http_h264_buffer_for_res(worker),
0,
(buffer_write_fn)http_ffmpeg_video_buf_part,
&status);
ffmpeg_remuxer_close(&remuxer);
if (status.wrote_header) {
return;
}
http_500(stream, NULL);
if (n == 0) {
fprintf(stream, "No frames.\n");
} else if (n < 0) {
fprintf(stream, "Interrupted. Received %d frames", -n);
}
}
void http_mkv_video(http_worker_t *worker, FILE *stream)
{
http_ffmpeg_video(worker, stream, "video/mp4", "matroska");
}
void http_mp4_video(http_worker_t *worker, FILE *stream)
{
http_ffmpeg_video(worker, stream, "video/mp4", "mp4");
}

output/http_h264.c Normal file

@@ -0,0 +1,109 @@
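// Raw H264-over-HTTP output, plus the shared key-frame detection and
// full/low-res buffer selection used by the other H264 consumers.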
#include <stdio.h>
#include <stdlib.h>
#include "output.h"
#include "util/opts/log.h"
#include "util/http/http.h"
#include "device/buffer.h"
#include "device/buffer_lock.h"
#include "device/buffer_list.h"
#include "device/device.h"
DEFINE_BUFFER_LOCK(http_h264, 0);
DEFINE_BUFFER_LOCK(http_h264_lowres, 0);
static const char *const VIDEO_HEADER =
"HTTP/1.0 200 OK\r\n"
"Access-Control-Allow-Origin: *\r\n"
"Connection: close\r\n"
"Content-Type: application/octet-stream\r\n"
"\r\n";
void http_h264_capture(buffer_t *buf)
{
buffer_lock_capture(&http_h264, buf);
}
void http_h264_lowres_capture(buffer_t *buf)
{
buffer_lock_capture(&http_h264_lowres, buf);
}
bool http_h264_needs_buffer()
{
return buffer_lock_needs_buffer(&http_h264) | buffer_lock_needs_buffer(&http_h264_lowres);
}
buffer_lock_t *http_h264_buffer_for_res(http_worker_t *worker)
{
if (strstr(worker->client_method, HTTP_LOW_RES_PARAM))
return &http_h264_lowres;
else
return &http_h264;
}
typedef struct {
FILE *stream;
bool wrote_header;
bool had_key_frame;
bool requested_key_frame;
} http_video_status_t;
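// A buffer counts as a key frame if V4L2 flagged it, or if the NAL unit
// type (low 5 bits of the byte after the 4-byte start code) is 7 (SPS),
// which typically precedes an IDR frame in this stream layout.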
bool h264_is_key_frame(buffer_t *buf)
{
unsigned char *data = buf->start;
if (buf->flags.is_keyframe) {
LOG_DEBUG(buf, "Got key frame (from V4L2)!");
return true;
} else if (buf->used >= 5 && (data[4] & 0x1F) == 0x07) {
LOG_DEBUG(buf, "Got key frame (from buffer)!");
return true;
}
return false;
}
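// Streams raw H264: waits for a key frame (forcing one if needed),
// writes the HTTP header once, then appends each buffer verbatim.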
int http_video_buf_part(buffer_lock_t *buf_lock, buffer_t *buf, int frame, http_video_status_t *status)
{
if (!status->had_key_frame) {
status->had_key_frame = h264_is_key_frame(buf);
}
if (!status->had_key_frame) {
if (!status->requested_key_frame) {
device_video_force_key(buf->buf_list->dev);
status->requested_key_frame = true;
}
return 0;
}
if (!status->wrote_header) {
fputs(VIDEO_HEADER, status->stream);
status->wrote_header = true;
}
if (!fwrite(buf->start, buf->used, 1, status->stream)) {
return -1;
}
fflush(status->stream);
return 1;
}
void http_h264_video(http_worker_t *worker, FILE *stream)
{
http_video_status_t status = { stream };
int n = buffer_lock_write_loop(http_h264_buffer_for_res(worker), 0, (buffer_write_fn)http_video_buf_part, &status);
if (status.wrote_header) {
return;
}
http_500(stream, NULL);
if (n == 0) {
fprintf(stream, "No frames.\n");
} else if (n < 0) {
fprintf(stream, "Interrupted. Received %d frames", -n);
}
}

output/http_jpeg.c Normal file

@@ -0,0 +1,97 @@
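// JPEG output: single-shot snapshot responses and continuous
// MJPEG streaming, in full or low resolution.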
#include <stdio.h>
#include <stdlib.h>
#include "output.h"
#include "util/http/http.h"
#include "device/buffer.h"
#include "device/buffer_lock.h"
DEFINE_BUFFER_LOCK(http_jpeg, 1000);
DEFINE_BUFFER_LOCK(http_jpeg_lowres, 1000);
#define PART_BOUNDARY "123456789000000000000987654321"
#define CONTENT_TYPE "image/jpeg"
#define CONTENT_LENGTH "Content-Length"
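// multipart/x-mixed-replace: every JPEG is sent as a new part after the
// boundary, so the browser replaces the previous image in place
// (classic MJPEG-over-HTTP).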
static const char *const STREAM_HEADER = "HTTP/1.0 200 OK\r\n"
"Access-Control-Allow-Origin: *\r\n"
"Connection: close\r\n"
"Content-Type: multipart/x-mixed-replace;boundary=" PART_BOUNDARY "\r\n"
"\r\n"
"--" PART_BOUNDARY "\r\n";
static const char *const STREAM_PART = "Content-Type: " CONTENT_TYPE "\r\n" CONTENT_LENGTH ": %zu\r\n\r\n";
static const char *const STREAM_BOUNDARY = "\r\n"
"--" PART_BOUNDARY "\r\n";
bool http_jpeg_needs_buffer()
{
return buffer_lock_needs_buffer(&http_jpeg) | buffer_lock_needs_buffer(&http_jpeg_lowres);
}
void http_jpeg_capture(buffer_t *buf)
{
buffer_lock_capture(&http_jpeg, buf);
}
void http_jpeg_lowres_capture(buffer_t *buf)
{
buffer_lock_capture(&http_jpeg_lowres, buf);
}
buffer_lock_t *http_jpeg_buffer_for_res(http_worker_t *worker)
{
if (strstr(worker->client_method, HTTP_LOW_RES_PARAM))
return &http_jpeg_lowres;
else
return &http_jpeg;
}
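// Sends a single JPEG as a complete HTTP response; the enclosing write
// loop is invoked with a one-frame limit.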
int http_snapshot_buf_part(buffer_lock_t *buf_lock, buffer_t *buf, int frame, FILE *stream)
{
fprintf(stream, "HTTP/1.1 200 OK\r\n");
fprintf(stream, "Content-Type: image/jpeg\r\n");
fprintf(stream, "Content-Length: %zu\r\n", buf->used);
fprintf(stream, "\r\n");
fwrite(buf->start, buf->used, 1, stream);
return 1;
}
void http_snapshot(http_worker_t *worker, FILE *stream)
{
int n = buffer_lock_write_loop(http_jpeg_buffer_for_res(worker), 1, (buffer_write_fn)http_snapshot_buf_part, stream);
if (n <= 0) {
http_500(stream, NULL);
fprintf(stream, "No snapshot captured yet.\r\n");
}
}
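// Per-frame MJPEG callback: stream header on the first frame, then a
// part header, the JPEG payload, and a boundary for every frame.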
int http_stream_buf_part(buffer_lock_t *buf_lock, buffer_t *buf, int frame, FILE *stream)
{
if (!frame && fputs(STREAM_HEADER, stream) < 0) {
return -1;
}
if (fprintf(stream, STREAM_PART, buf->used) < 0) {
return -1;
}
if (!fwrite(buf->start, buf->used, 1, stream)) {
return -1;
}
if (fputs(STREAM_BOUNDARY, stream) < 0) {
return -1;
}
return 1;
}
void http_stream(http_worker_t *worker, FILE *stream)
{
int n = buffer_lock_write_loop(http_jpeg_buffer_for_res(worker), 0, (buffer_write_fn)http_stream_buf_part, stream);
if (n == 0) {
http_500(stream, NULL);
fprintf(stream, "No frames.\n");
} else if (n < 0) {
fprintf(stream, "Interrupted. Received %d frames", -n);
}
}

output/output.h Normal file

@@ -0,0 +1,26 @@
#pragma once
#include <stdio.h>
#include <stdbool.h>
struct http_worker_s;
struct buffer_s;
// M-JPEG
void http_snapshot(struct http_worker_s *worker, FILE *stream);
void http_stream(struct http_worker_s *worker, FILE *stream);
void http_jpeg_capture(struct buffer_s *buf);
void http_jpeg_lowres_capture(struct buffer_s *buf);
bool http_jpeg_needs_buffer();
void http_option(struct http_worker_s *worker, FILE *stream);
// H264
bool http_h264_needs_buffer();
void http_h264_capture(struct buffer_s *buf);
void http_h264_lowres_capture(struct buffer_s *buf);
void http_h264_video(struct http_worker_s *worker, FILE *stream);
bool h264_is_key_frame(struct buffer_s *buf);
void http_mkv_video(struct http_worker_s *worker, FILE *stream);
void http_mp4_video(struct http_worker_s *worker, FILE *stream);
void http_mov_video(struct http_worker_s *worker, FILE *stream);
#define HTTP_LOW_RES_PARAM "res=low"

output/rtsp/rtsp.cc Normal file

@@ -0,0 +1,287 @@
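// RTSP output backed by LIVE555: exposes stream.h264 and
// stream_low_res.h264, fed directly from the H264 capture callbacks.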
extern "C" {
#include "util/http/http.h"
#include "device/buffer.h"
#include "device/buffer_list.h"
#include "device/device.h"
#include "util/opts/log.h"
#include "util/opts/fourcc.h"
#include "util/opts/control.h"
#include "output/output.h"
#include "rtsp.h"
};
#ifdef USE_RTSP
#include <BasicUsageEnvironment.hh>
#include <RTSPServerSupportingHTTPStreaming.hh>
#include <OnDemandServerMediaSubsession.hh>
#include <H264VideoStreamFramer.hh>
#include <H264VideoRTPSink.hh>
static pthread_t rtsp_thread;
static pthread_mutex_t rtsp_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static class DynamicH264Stream *rtsp_streams;
static const char *stream_name = "stream.h264";
static const char *stream_low_res_name = "stream_low_res.h264";
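// Active streams register themselves on a singly-linked list
// (rtsp_streams) guarded by rtsp_lock; the capture thread walks the
// list and hands each stream the newest buffer.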
class DynamicH264Stream : public FramedSource
{
public:
DynamicH264Stream(UsageEnvironment& env, Boolean lowResMode)
: FramedSource(env), fHaveStartedReading(False),
fHadKeyFrame(False), fRequestedKeyFrame(False), fLowResMode(lowResMode)
{
}
void doGetNextFrame()
{
pthread_mutex_lock(&rtsp_lock);
if (!fHaveStartedReading) {
pNextStream = rtsp_streams;
rtsp_streams = this;
fHaveStartedReading = True;
}
pthread_mutex_unlock(&rtsp_lock);
}
void doStopGettingFrames()
{
pthread_mutex_lock(&rtsp_lock);
if (fHaveStartedReading) {
DynamicH264Stream **streamp = &rtsp_streams;
while (*streamp) {
if (*streamp == this) {
*streamp = pNextStream;
pNextStream = NULL;
break;
}
streamp = &(*streamp)->pNextStream;
}
fHaveStartedReading = False;
}
pthread_mutex_unlock(&rtsp_lock);
}
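// Called from the capture thread with the latest encoded buffer: defers
// delivery until a key frame has been seen (requesting one once), then
// copies the frame into LIVE555's output buffer, truncating anything
// beyond fMaxSize.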
void receiveData(buffer_t *buf, bool lowResMode)
{
if (!isCurrentlyAwaitingData()) {
return; // we're not ready for the data yet
}
if (fLowResMode != lowResMode) {
return;
}
if (h264_is_key_frame(buf)) {
fHadKeyFrame = true;
}
if (!fRequestedKeyFrame) {
if (!fHadKeyFrame) {
printf("device_video_force_key: %p\n", this);
device_video_force_key(buf->buf_list->dev);
}
fRequestedKeyFrame = true;
}
if (!fHadKeyFrame) {
return;
}
if (buf->used > fMaxSize) {
fNumTruncatedBytes = buf->used - fMaxSize;
fFrameSize = fMaxSize;
} else {
fNumTruncatedBytes = 0;
fFrameSize = buf->used;
}
memcpy(fTo, buf->start, fFrameSize);
// Tell our client that we have new data:
afterGetting(this); // we're preceded by a net read; no infinite recursion
}
private:
Boolean fHaveStartedReading;
Boolean fHadKeyFrame;
Boolean fRequestedKeyFrame;
Boolean fLowResMode;
public:
DynamicH264Stream *pNextStream;
};
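// On-demand subsession that creates a fresh DynamicH264Stream per
// client and wraps it in LIVE555's H264 framer/RTP sink pair.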
class DynamicH264VideoFileServerMediaSubsession : public OnDemandServerMediaSubsession
{
public:
DynamicH264VideoFileServerMediaSubsession(UsageEnvironment& env, Boolean reuseFirstSource, Boolean lowResMode)
: OnDemandServerMediaSubsession(env, reuseFirstSource), fLowResMode(lowResMode)
{
}
virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
{
estBitrate = 500; // kbps, estimate
return H264VideoStreamFramer::createNew(envir(), new DynamicH264Stream(envir(), fLowResMode));
}
virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* /*inputSource*/)
{
return H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
}
private:
Boolean fLowResMode;
};
class DynamicRTSPServer: public RTSPServerSupportingHTTPStreaming
{
public:
static DynamicRTSPServer* createNew(UsageEnvironment& env, Port ourPort,
UserAuthenticationDatabase* authDatabase,
unsigned reclamationTestSeconds = 65)
{
int ourSocket = setUpOurSocket(env, ourPort);
if (ourSocket == -1) return NULL;
return new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
}
protected:
DynamicRTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds)
: RTSPServerSupportingHTTPStreaming(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds)
{
}
// called only by createNew();
virtual ~DynamicRTSPServer()
{
}
protected: // redefined virtual functions
virtual ServerMediaSession* lookupServerMediaSession(char const* streamName, Boolean isFirstLookupInSession)
{
bool lowResMode = false;
if (strcmp(streamName, stream_name) == 0) {
LOG_INFO(NULL, "Requesting %s stream...", streamName);
} else if (strcmp(streamName, stream_low_res_name) == 0) {
LOG_INFO(NULL, "Requesting %s stream (low resolution mode)...", streamName);
lowResMode = true;
} else {
LOG_INFO(NULL, "No stream available: '%s'", streamName);
return NULL;
}
auto sms = RTSPServer::lookupServerMediaSession(streamName);
if (sms && isFirstLookupInSession) {
// Remove the existing "ServerMediaSession" and create a new one, in case the underlying
// file has changed in some way:
removeServerMediaSession(sms);
sms = NULL;
}
sms = ServerMediaSession::createNew(envir(), streamName, streamName, "streamed by the LIVE555 Media Server");
OutPacketBuffer::maxSize = 2000000; // allow for some possibly large H.264 frames
auto subsession = new DynamicH264VideoFileServerMediaSubsession(envir(), false, lowResMode);
sms->addSubsession(subsession);
addServerMediaSession(sms);
return sms;
}
};
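// LIVE555's event loop blocks forever, so it runs on its own thread
// and the capture pipeline keeps running undisturbed.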
static void *rtsp_server_thread(void *opaque)
{
UsageEnvironment* env = (UsageEnvironment*)opaque;
env->taskScheduler().doEventLoop(); // does not return
return NULL;
}
extern "C" int rtsp_server()
{
// Begin by setting up our usage environment:
TaskScheduler* scheduler = BasicTaskScheduler::createNew();
UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);
UserAuthenticationDatabase* authDB = NULL;
#ifdef ACCESS_CONTROL
// To implement client access control to the RTSP server, do the following:
authDB = new UserAuthenticationDatabase;
authDB->addUserRecord("username1", "password1"); // replace these with real strings
// Repeat the above with each <username>, <password> that you wish to allow
// access to the server.
#endif
RTSPServer* rtspServer;
rtspServer = DynamicRTSPServer::createNew(*env, rtsp_options.port, authDB);
if (rtspServer == NULL) {
LOG_ERROR(NULL, "Failed to create RTSP server: %s", env->getResultMsg());
return -1;
}
LOG_INFO(NULL, "Running RTSP server on '%d'", rtsp_options.port);
// if (rtspServer->setUpTunnelingOverHTTP(80) || rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080)) {
// LOG_INFO(NULL, "Running RTSP-over-HTTP tunneling on '%d'", rtspServer->httpServerPortNum());
// *env << "(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling, or for HTTP live streaming (for indexed Transport Stream files only).)\n";
// } else {
// LOG_INFO(NULL, "The RTSP-over-HTTP is not available.");
// }
pthread_create(&rtsp_thread, NULL, rtsp_server_thread, env);
return 0;
}
extern "C" bool rtsp_h264_needs_buffer()
{
return rtsp_streams != NULL;
}
extern "C" void rtsp_h264_capture(buffer_t *buf)
{
pthread_mutex_lock(&rtsp_lock);
for (DynamicH264Stream *stream = rtsp_streams; stream; stream = stream->pNextStream) {
stream->receiveData(buf, false);
}
pthread_mutex_unlock(&rtsp_lock);
}
extern "C" void rtsp_h264_low_res_capture(struct buffer_s *buf)
{
pthread_mutex_lock(&rtsp_lock);
for (DynamicH264Stream *stream = rtsp_streams; stream; stream = stream->pNextStream) {
stream->receiveData(buf, true);
}
pthread_mutex_unlock(&rtsp_lock);
}
#else // USE_RTSP
extern "C" int rtsp_server()
{
return 0;
}
extern "C" bool rtsp_h264_needs_buffer()
{
return false;
}
extern "C" void rtsp_h264_capture(buffer_t *buf)
{
}
extern "C" void rtsp_h264_low_res_capture(struct buffer_s *buf)
{
}
#endif // USE_RTSP

output/rtsp/rtsp.h Normal file

@@ -0,0 +1,12 @@
#pragma once
#include <stdbool.h>
#include <sys/types.h> // for uint
struct buffer_s;
typedef struct rtsp_options_s {
uint port;
} rtsp_options_t;
extern rtsp_options_t rtsp_options;
int rtsp_server();
bool rtsp_h264_needs_buffer();
void rtsp_h264_capture(struct buffer_s *buf);
void rtsp_h264_low_res_capture(struct buffer_s *buf);