diff --git a/buffer_lock.c b/buffer_lock.c new file mode 100644 index 0000000..4f55494 --- /dev/null +++ b/buffer_lock.c @@ -0,0 +1,41 @@ +#include "buffer_lock.h" + +void buffer_lock_capture(buffer_lock_t *buf_lock, buffer_t *buf) +{ + pthread_mutex_lock(&buf_lock->lock); + buffer_consumed(buf_lock->buf); + buffer_use(buf); + buf_lock->buf = buf; + buf_lock->counter++; + E_LOG_DEBUG(buf_lock, "Captured HTTP snapshot: %d", buf_lock->counter); + pthread_cond_broadcast(&buf_lock->cond_wait); + pthread_mutex_unlock(&buf_lock->lock); +} + +buffer_t *buffer_lock_get(buffer_lock_t *buf_lock, int timeout_s, int *counter) +{ + buffer_t *buf = NULL; + struct timeval now; + struct timespec timeout; + gettimeofday(&now, NULL); + timeout.tv_nsec = now.tv_usec; + timeout.tv_sec = now.tv_sec + timeout_s; + + pthread_mutex_lock(&buf_lock->lock); + if (*counter == buf_lock->counter && timeout_s > 0) { + int ret = pthread_cond_timedwait(&buf_lock->cond_wait, &buf_lock->lock, &timeout); + if (ret == ETIMEDOUT) { + goto ret; + } else if (ret < 0) { + goto ret; + } + } + + buf = buf_lock->buf; + buffer_use(buf); + *counter = buf_lock->counter; + +ret: + pthread_mutex_unlock(&buf_lock->lock); + return buf; +} diff --git a/buffer_lock.h b/buffer_lock.h new file mode 100644 index 0000000..e95f65e --- /dev/null +++ b/buffer_lock.h @@ -0,0 +1,18 @@ +#pragma once + +#include "buffer.h" + +#include + +typedef struct buffer_lock_s { + const char *name; + pthread_mutex_t lock; + pthread_cond_t cond_wait; + buffer_t *buf; + int counter; +} buffer_lock_t; + +#define DEFINE_BUFFER_LOCK(name) static buffer_lock_t name = { #name, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, 0 }; + +void buffer_lock_capture(buffer_lock_t *buf_lock, buffer_t *buf); +buffer_t *buffer_lock_get(buffer_lock_t *buf_lock, int timeout_s, int *counter); diff --git a/buffer_queue.c b/buffer_queue.c index 23ef493..91dae52 100644 --- a/buffer_queue.c +++ b/buffer_queue.c @@ -2,18 +2,34 @@ #include "buffer_list.h" #include "device.h" +#include + +pthread_mutex_t buffer_lock = PTHREAD_MUTEX_INITIALIZER; + bool buffer_use(buffer_t *buf) { - if (buf->enqueued) { + if (!buf) { + return false; + } + + pthread_mutex_lock(&buffer_lock); + if (!buf->enqueued) { + pthread_mutex_unlock(&buffer_lock); return false; } buf->mmap_reflinks += 1; + pthread_mutex_unlock(&buffer_lock); return true; } bool buffer_consumed(buffer_t *buf) { + if (!buf) { + return false; + } + + pthread_mutex_lock(&buffer_lock); if (buf->mmap_reflinks > 0) { buf->mmap_reflinks--; } @@ -42,13 +58,22 @@ bool buffer_consumed(buffer_t *buf) buf->enqueued = true; } + pthread_mutex_unlock(&buffer_lock); return true; + error: - if (buf->mmap_source) { - buffer_consumed(buf->mmap_source); + { + buffer_t *mmap_source = buf->mmap_source; buf->mmap_source = NULL; + + pthread_mutex_unlock(&buffer_lock); + + if (mmap_source) { + buffer_consumed(mmap_source); + } } + return false; } diff --git a/http.c b/http.c index a9532b1..7bd1e2e 100644 --- a/http.c +++ b/http.c @@ -5,12 +5,14 @@ #include #include #include +#include +#include #include "http.h" #define BUFSIZE 256 -int http_listen(int listen_port, int maxcons) +static int http_listen(int listen_port, int maxcons) { struct sockaddr_in server = {0}; int listenfd = -1; @@ -22,14 +24,20 @@ int http_listen(int listen_port, int maxcons) listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { + perror("socket"); return -1; } + int optval = 1; + setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int)); + if (bind(listenfd, (struct sockaddr *)&server, sizeof(server)) < 0) { + perror("bind"); goto error; } if (listen(listenfd, maxcons) < 0) { + perror("listen"); goto error; } @@ -40,47 +48,99 @@ error: return -1; } -void http_process(int fd, struct sockaddr_in *addr) +static void http_process(http_worker_t *worker, FILE *stream) { - FILE *stream = fdopen(fd, "r+"); - if (!stream) { + // Read headers + if (!fgets(worker->client_method, sizeof(worker->client_method), stream)) { return; } - char line[BUFSIZE]; - fgets(line, BUFSIZE, stream); - - fprintf(stream, "HTTP/1.1 200 OK\n"); - fprintf(stream, "Content-Type: text/plain\n"); - fprintf(stream, "\r\n"); - fprintf(stream, "Text.\n"); - fflush(stream); - fclose(stream); -} - -int http_worker(int listenfd) -{ - while (1) { - struct sockaddr_in addr; - int addrlen = sizeof(addr); - int ret = accept(listenfd, (struct sockaddr *)&addr, &addrlen); - if (ret < 0) { - return -1; - } - - http_process(ret, &addr); - close(ret); + // Consume headers + for(int i = 0; i < 50; i++) { + char line[BUFSIZE]; + if (!fgets(line, BUFSIZE, stream)) + return; + if (line[0] == '\r' && line[1] == '\n') + break; } + for (int i = 0; worker->methods[i].name; i++) { + if (strstr(worker->client_method, worker->methods[i].name)) { + worker->methods[i].func(worker, stream); + return; + } + } + + http_404(worker, stream); +} + +static void http_client(http_worker_t *worker) +{ + worker->client_host = inet_ntoa(worker->client_addr.sin_addr); + printf("Client connected %s.\n", worker->client_host); + + struct timeval tv; + tv.tv_sec = 3; + tv.tv_usec = 0; + setsockopt(worker->client_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)); + setsockopt(worker->client_fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv)); + + int on = 1; + setsockopt(worker->client_fd, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on)); + setsockopt(worker->client_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); + + FILE *stream = fdopen(worker->client_fd, "r+"); + if (stream) { + http_process(worker, stream); + fclose(stream); + } + + close(worker->client_fd); + worker->client_fd = -1; + + printf("Client disconnected %s.\n", worker->client_host); + worker->client_host = NULL; +} + +static int http_worker(http_worker_t *worker) +{ + printf("http_worker=%d\n", worker->listen_fd); + + while (1) { + int addrlen = sizeof(worker->client_addr); + worker->client_fd = accept(worker->listen_fd, (struct sockaddr *)&worker->client_addr, &addrlen); + if (worker->client_fd < 0) { + goto error; + } + + http_client(worker); + } + +error: + free(worker->name); + free(worker); return -1; } -int http_worker_threads(int listenfd, int nthreads) +int http_server(int listen_port, int maxcons, http_method_t *methods) { - while (nthreads-- > 0) { - pthread_t thread; - pthread_create(&thread, NULL, (void *(*)(void*))http_worker, (void*)listenfd); + int listen_fd = http_listen(9092, maxcons); + if (listen_fd < 0) { + return -1; } - return 0; + while (maxcons-- > 0) { + char name[20]; + sprintf(name, "HTTP%d/%d", listen_port, maxcons); + + http_worker_t *worker = calloc(1, sizeof(http_worker_t)); + worker->name = strdup(name); + worker->listen_fd = listen_fd; + worker->methods = methods; + worker->client_fd = -1; + pthread_create(&worker->thread, NULL, (void *(*)(void*))http_worker, worker); + } + + return listen_fd; } + diff --git a/http.h b/http.h index de7500e..61ce861 100644 --- a/http.h +++ b/http.h @@ -1,7 +1,38 @@ #pragma once -#include "v4l2.h" +#include -int http_listen(int listen_port, int maxcons); -int http_worker(int listenfd); -int http_worker_threads(int listenfd, int nthreads); +#include "v4l2.h" +#include "buffer.h" + +struct http_worker_s; + +typedef void (*http_method_fn)(struct http_worker_s *worker, FILE *stream); + +typedef struct http_method_s { + const char *name; + http_method_fn func; +} http_method_t; + +typedef struct http_worker_s { + char *name; + int listen_fd; + http_method_t *methods; + pthread_t thread; + + int client_fd; + struct sockaddr_in client_addr; + char *client_host; + char client_method[256]; +} http_worker_t; + +int http_server(int listen_port, int maxcons, http_method_t *methods); + +void http_index(http_worker_t *worker, FILE *stream); +void http_404_header(http_worker_t *worker, FILE *stream); +void http_404(http_worker_t *worker, FILE *stream); + +// M-JPEG +void http_snapshot(http_worker_t *worker, FILE *stream); +void http_stream(http_worker_t *worker, FILE *stream); +void http_jpeg_capture(struct buffer_s *buf); diff --git a/http_jpeg.c b/http_jpeg.c new file mode 100644 index 0000000..c9a9c60 --- /dev/null +++ b/http_jpeg.c @@ -0,0 +1,76 @@ +#include +#include + +#include "http.h" +#include "buffer.h" +#include "buffer_lock.h" + +DEFINE_BUFFER_LOCK(http_jpeg); + +#define PART_BOUNDARY "123456789000000000000987654321" +#define CONTENT_TYPE "image/jpeg" +#define CONTENT_LENGTH "Content-Length" + +static const char *const STREAM_HEADER = "HTTP/1.0 200 OK\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Connection: close\r\n" + "Content-Type: multipart/x-mixed-replace;boundary=" PART_BOUNDARY "\r\n" + "\r\n" + "--" PART_BOUNDARY "\r\n"; +static const char *const STREAM_ERROR = "Content-Type: text/plain\r\n" + "\r\n" + "Error: %d (%s).\r\n" + "--" PART_BOUNDARY "\r\n"; +static const char *const STREAM_TIMEDOUT = "Content-Type: text/plain\r\n" + "\r\n" + "Timedout.\r\n" + "--" PART_BOUNDARY "\r\n"; +static const char *const STREAM_PART = "Content-Type: " CONTENT_TYPE "\r\n" CONTENT_LENGTH ": %u\r\n\r\n"; +static const char *const STREAM_BOUNDARY = "\r\n" + "--" PART_BOUNDARY "\r\n"; + + +void http_jpeg_capture(buffer_t *buf) +{ + buffer_lock_capture(&http_jpeg, buf); +} + +void http_snapshot(http_worker_t *worker, FILE *stream) +{ + int counter = 0; + buffer_t *buf = buffer_lock_get(&http_jpeg, 1, &counter); + + if (!buf) { + http_404_header(worker, stream); + fprintf(stream, "No snapshot captured yet.\r\n"); + return; + } + + fprintf(stream, "HTTP/1.1 200 OK\r\n"); + fprintf(stream, "Content-Type: image/jpeg\r\n"); + fprintf(stream, "Content-Length: %d\r\n", buf->used); + fprintf(stream, "\r\n"); + fwrite(buf->start, buf->used, 1, stream); + buffer_consumed(buf); +} + +void http_stream(http_worker_t *worker, FILE *stream) +{ + int counter = 0; + fprintf(stream, STREAM_HEADER); + + while (!feof(stream)) { + buffer_t *buf = buffer_lock_get(&http_jpeg, 3, &counter); + + if (!buf) { + fprintf(stream, STREAM_ERROR, -1, "No frames."); + return; + } + + fprintf(stream, STREAM_PART, buf->used); + fwrite(buf->start, buf->used, 1, stream); + fprintf(stream, STREAM_BOUNDARY); + + buffer_consumed(buf); + } +} diff --git a/http_methods.c b/http_methods.c new file mode 100644 index 0000000..f72f5c3 --- /dev/null +++ b/http_methods.c @@ -0,0 +1,28 @@ +#include +#include + +#include "http.h" +#include "buffer.h" +#include "buffer_lock.h" + +void http_index(http_worker_t *worker, FILE *stream) +{ + fprintf(stream, "HTTP/1.1 200 OK\r\n"); + fprintf(stream, "Content-Type: text/plain\r\n"); + fprintf(stream, "\r\n"); + fprintf(stream, "Text.\r\n"); + fflush(stream); +} + +void http_404_header(http_worker_t *worker, FILE *stream) +{ + fprintf(stream, "HTTP/1.1 404 Not Found\r\n"); + fprintf(stream, "Content-Type: text/plain\r\n"); + fprintf(stream, "\r\n"); +} + +void http_404(http_worker_t *worker, FILE *stream) +{ + http_404_header(worker, stream); + fprintf(stream, "Nothing here?\r\n"); +} diff --git a/main.c b/main.c index f4c5bd7..cd04e5f 100644 --- a/main.c +++ b/main.c @@ -49,34 +49,7 @@ int open_h264(buffer_list_t *src, const char *tmp) return 0; } -void write_jpeg(buffer_t *buf) -{ - FILE *fp = fopen("/tmp/capture.jpg.tmp", "wb"); - if (fp) { - fwrite(buf->start, 1, buf->used, fp); - fclose(fp); - E_LOG_DEBUG(buf, "Wrote JPEG: %d", buf->used); - } - rename("/tmp/capture.jpg.tmp", "/tmp/capture.jpg"); -} - -bool check_streaming() -{ - return true; -} - -void write_h264(buffer_t *buf) -{ - FILE *fp = fopen("/tmp/capture.h264.tmp", "wb"); - if (fp) { - fwrite(buf->start, 1, buf->used, fp); - fclose(fp); - E_LOG_DEBUG(buf, "Wrote H264: %d", buf->used); - } - rename("/tmp/capture.h264.tmp", "/tmp/capture.h264"); -} - -int main(int argc, char *argv[]) +int open_camera() { camera = device_open("CAMERA", "/dev/video0"); @@ -96,13 +69,31 @@ int main(int argc, char *argv[]) codec_h264->allow_dma = false; if (open_isp(camera->capture_list, "/dev/video13", "/dev/video14", "/dev/video15") < 0) { - goto error; + return -1; } if (open_jpeg(camera_use_low ? isp_yuuv_low->capture_list : isp_yuuv->capture_list, "/dev/video31") < 0) { - goto error; + return -1; } if (open_h264(camera_use_low ? isp_yuuv_low->capture_list : isp_yuuv->capture_list, "/dev/video11") < 0) { - goto error; + return -1; + } + + return 0; +} + +bool check_streaming() +{ + return true; +} + +void write_h264(buffer_t *buf) +{ +} + +int main(int argc, char *argv[]) +{ + if (open_camera() < 0) { + return -1; } link_t links[] = { @@ -130,28 +121,32 @@ int main(int argc, char *argv[]) { codec_jpeg, { }, - { write_jpeg, check_streaming } + { http_jpeg_capture, check_streaming } }, { codec_h264, { }, - { write_h264, check_streaming } + { NULL, check_streaming } }, { NULL } }; - // if (links_init(links) < 0) { - // return -1; - // } + http_method_t http_methods[] = { + { "GET / ", http_index }, + { "GET /snapshot ", http_snapshot }, + { "GET /stream ", http_stream }, + { "GET /?action=snapshot ", http_snapshot }, + { "GET /?action=stream ", http_stream }, + { NULL, NULL } + }; - int httpfd = http_listen(9090, 5); - if (httpfd >= 0) { - http_worker_threads(httpfd, 8); - } + int http_fd = http_server(9092, 5, http_methods); bool running = false; links_loop(links, &running); + close(http_fd); + error: device_close(isp_yuuv_low); device_close(isp_yuuv);