From d57feb0359a6a2f6eba7c523cc2e1c7ed16ef9b3 Mon Sep 17 00:00:00 2001 From: Kamil Trzcinski Date: Fri, 24 Feb 2023 00:18:49 +0100 Subject: [PATCH] links: enqueue and dequeue buffers in sync (output and capture) asynchronously --- device/buffer_list.c | 8 +++++ device/buffer_list.h | 7 ++++ device/buffer_queue.c | 27 ++++++++++++++++ device/links.c | 74 ++++++++++++++++++++++++------------------- 4 files changed, 84 insertions(+), 32 deletions(-) diff --git a/device/buffer_list.c b/device/buffer_list.c index 5c2cf21..595334f 100644 --- a/device/buffer_list.c +++ b/device/buffer_list.c @@ -99,6 +99,14 @@ int buffer_list_set_stream(buffer_list_t *buf_list, bool do_on) } buf_list->streaming = do_on; + if (!do_on) { + ARRAY_FOREACH(buffer_t*, queued_buf, buf_list->queued_bufs, buf_list->n_queued_bufs) { + buffer_consumed(*queued_buf, "stream stop"); + *queued_buf = NULL; + } + buf_list->n_queued_bufs = 0; + } + int enqueued = buffer_list_count_enqueued(buf_list); LOG_INFO(buf_list, "Streaming %s... Was %d of %d enqueud", do_on ? "started" : "stopped", enqueued, buf_list->nbufs); return 0; diff --git a/device/buffer_list.h b/device/buffer_list.h index ea836b3..e6e37e3 100644 --- a/device/buffer_list.h +++ b/device/buffer_list.h @@ -26,6 +26,8 @@ typedef struct buffer_stats_s { int frames, dropped; } buffer_stats_t; +#define MAX_BUFFER_QUEUE 1 + typedef struct buffer_list_s { char *name; char *path; @@ -43,6 +45,9 @@ typedef struct buffer_list_s { struct buffer_list_libcamera_s *libcamera; }; + buffer_t *queued_bufs[MAX_BUFFER_QUEUE]; + int n_queued_bufs; + uint64_t last_enqueued_us, last_dequeued_us; int last_capture_time_us, last_in_queue_time_us; bool streaming; @@ -59,3 +64,5 @@ buffer_t *buffer_list_find_slot(buffer_list_t *buf_list); buffer_t *buffer_list_dequeue(buffer_list_t *buf_list); int buffer_list_count_enqueued(buffer_list_t *buf_list); int buffer_list_enqueue(buffer_list_t *buf_list, buffer_t *dma_buf); +bool buffer_list_push_to_queue(buffer_list_t *buf_list, buffer_t *dma_buf); +buffer_t *buffer_list_pop_from_queue(buffer_list_t *buf_list); diff --git a/device/buffer_queue.c b/device/buffer_queue.c index e767aaf..d1f92bf 100644 --- a/device/buffer_queue.c +++ b/device/buffer_queue.c @@ -210,3 +210,30 @@ int buffer_list_pollfd(buffer_list_t *buf_list, struct pollfd *pollfd, bool can_ { return buf_list->dev->hw->buffer_list_pollfd(buf_list, pollfd, can_dequeue); } + +bool buffer_list_push_to_queue(buffer_list_t *buf_list, buffer_t *dma_buf) +{ + if (buf_list->dev->paused) + return true; + if (buf_list->n_queued_bufs >= MAX_BUFFER_QUEUE) + return false; + + buffer_use(dma_buf); + buf_list->queued_bufs[buf_list->n_queued_bufs++] = dma_buf; + return true; +} + +buffer_t *buffer_list_pop_from_queue(buffer_list_t *buf_list) +{ + if (buf_list->n_queued_bufs <= 0) + return NULL; + + buffer_t *buf = buf_list->queued_bufs[0]; + buf_list->n_queued_bufs--; + + for (int i = 0; i < buf_list->n_queued_bufs; i++) { + buf_list->queued_bufs[i] = buf_list->queued_bufs[i+1]; + } + + return buf; +} diff --git a/device/links.c b/device/links.c index 31a784d..1162855 100644 --- a/device/links.c +++ b/device/links.c @@ -115,12 +115,40 @@ static bool links_enqueue_capture_buffers(buffer_list_t *capture_list, int *time } // enqueue new capture buffer - buffer_consumed(capture_buf, "enqueued"); + buffer_list_t *output_list = capture_list->dev->output_list; - if (capture_list->fmt.interval_us == 0) + // no output, just give back capture_buf + if (!output_list) { + buffer_consumed(capture_buf, "enqueued"); + if (capture_list->fmt.interval_us > 0) + return false; return true; + } - return false; + // try to find matching output slot, ignore if not present + if (!buffer_list_find_slot(output_list)) { + return false; + } + + bool can_enqueue = false; + + // try to look for output, if there's a matching capture to be consumed + buffer_t *queued_capture_for_output_buf = buffer_list_pop_from_queue(output_list); + if (queued_capture_for_output_buf) { + // then push a capture from source into output for this capture + if (buffer_list_enqueue(output_list, queued_capture_for_output_buf)) { + buffer_consumed(capture_buf, "enqueued"); + if (capture_list->fmt.interval_us <= 0) + can_enqueue = true; + } else { + queued_capture_for_output_buf->buf_list->stats.dropped++; + } + + // release this buffer + buffer_consumed(queued_capture_for_output_buf, "from-queue"); + } + + return can_enqueue; } static void links_process_capture_buffers(link_t *all_links, int *timeout_next_ms) @@ -161,7 +189,8 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool) for (int j = 0; j < link->n_output_lists; j++) { buffer_list_t *output_list = link->output_lists[j]; - if (buffer_list_count_enqueued(output_list) == 0) { + int count_output_enqueued = buffer_list_count_enqueued(output_list); + if (count_output_enqueued == 0) { continue; } @@ -169,8 +198,10 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool) return -EINVAL; } + int count_capture_enqueued = buffer_list_count_enqueued(output_list->dev->capture_lists[0]); + // Can something be dequeued? - if (buffer_list_pollfd(output_list, &link_pool->fds[n], true) == 0) { + if (buffer_list_pollfd(output_list, &link_pool->fds[n], count_output_enqueued > count_capture_enqueued) == 0) { link_pool->output_lists[n] = output_list; link_pool->links[n] = NULL; n++; @@ -181,29 +212,6 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool) return n; } -static bool links_output_list_can_enqueue(buffer_list_t *output_list) -{ - if (output_list->dev->paused) - return false; - - int output_count = buffer_list_count_enqueued(output_list); - - if (output_list->do_capture) { - perror("should not happen"); - } - - int capture_count = device_capture_enqueued(output_list->dev, NULL); - - // only enqueue on output, if there are already captures (and there's more of them) - if (capture_count <= output_count) { - LOG_DEBUG(output_list, "Skipping enqueue of output (output_enqueued=%d, capture_enqueued=%d)", - output_count, capture_count); - return false; - } - - return true; -} - static int links_enqueue_from_capture_list(buffer_list_t *capture_list, link_t *link) { if (!link) { @@ -232,9 +240,10 @@ static int links_enqueue_from_capture_list(buffer_list_t *capture_list, link_t * bool dropped = false; for (int j = 0; j < link->n_output_lists; j++) { - if (links_output_list_can_enqueue(link->output_lists[j])) { - buffer_list_enqueue(link->output_lists[j], buf); - } else { + if (link->output_lists[j]->dev->paused) { + continue; + } + if (!buffer_list_push_to_queue(link->output_lists[j], buf)) { dropped = true; } } @@ -390,13 +399,14 @@ static void links_refresh_stats(link_t *all_links, uint64_t *last_refresh_us) buffer_stats_t *now = &capture_list->stats; buffer_stats_t *prev = &capture_list->stats_last; - printf(" [%8s %2d FPS/%2d D/%3dms/%3dms/%c/O%d:C%d]", + printf(" [%8s %2d FPS/%2d D/%3dms/%3dms/%c/Q%d:O%d:C%d]", capture_list->dev->name, (now->frames - prev->frames) / log_options.stats, (now->dropped - prev->dropped) / log_options.stats, capture_list->last_capture_time_us > 0 ? capture_list->last_capture_time_us / 1000 : -1, capture_list->last_in_queue_time_us > 0 ? capture_list->last_in_queue_time_us / 1000 : -1, capture_list->streaming ? (capture_list->dev->paused ? 'P' : 'S') : 'X', + capture_list->dev->output_list ? capture_list->dev->output_list->n_queued_bufs : 0, capture_list->dev->output_list ? buffer_list_count_enqueued(capture_list->dev->output_list) : 0, buffer_list_count_enqueued(capture_list) );