From 6d30270b161dea2cdc4e8343f2fd94b5e1561a62 Mon Sep 17 00:00:00 2001 From: Kamil Trzcinski Date: Wed, 22 Feb 2023 14:22:00 +0100 Subject: [PATCH] links: improve the implementation (simplify it) --- device/links.c | 233 +++++++++++++++++++++++++++---------------------- 1 file changed, 129 insertions(+), 104 deletions(-) diff --git a/device/links.c b/device/links.c index 4f44ba7..a4a071d 100644 --- a/device/links.c +++ b/device/links.c @@ -10,15 +10,13 @@ #define CAPTURE_TIMEOUT_US (1000*1000) #define N_FDS 50 -#define QUEUE_ON_CAPTURE // seems to provide better latency -// #define LIMIT_CAPTURE_BUFFERS typedef struct link_pool_s { struct pollfd fds[N_FDS]; link_t *links[N_FDS]; - buffer_list_t *buf_lists[N_FDS]; - int max_timeout_ms; + buffer_list_t *capture_lists[N_FDS]; + buffer_list_t *output_lists[N_FDS]; } link_pool_t; static bool link_needs_buffer_by_callbacks(link_t *link) @@ -38,77 +36,142 @@ static bool link_needs_buffer_by_callbacks(link_t *link) return needs; } -static int links_build_fds(link_t *all_links, link_pool_t *link_pool) +static bool link_needs_buffer_by_sinks(link_t *link) { - int n = 0, nlinks = 0; + bool needs = false; - uint64_t now_us __attribute__((unused)) = get_monotonic_time_us(NULL, NULL); + for (int j = 0; j < link->n_output_lists; j++) { + buffer_list_t *output_list = link->output_lists[j]; - for (nlinks = 0; all_links[nlinks].capture_list; nlinks++); + if (!output_list->dev->paused) { + needs = true; + } + } + return needs; +} + +static int links_count(link_t *all_links) +{ + int n = 0; + for (n = 0; all_links[n].capture_list; n++); + return n; +} + +static void links_process_paused(link_t *all_links) +{ // This traverses in reverse order as it requires to first fix outputs // and go back into captures - for (int i = nlinks; i-- > 0; ) { + for (int i = links_count(all_links); i-- > 0; ) { link_t *link = &all_links[i]; buffer_list_t *capture_list = link->capture_list; - if (n >= N_FDS) { - return -EINVAL; - } - if (!capture_list->streaming) { - continue; - } - bool paused = true; if (link_needs_buffer_by_callbacks(link)) { paused = false; } - for (int j = 0; j < link->n_output_lists; j++) { - buffer_list_t *sink = link->output_lists[j]; - - if (n >= N_FDS) { - return -EINVAL; - } - if (!sink->streaming) { - continue; - } - - // Can something be dequeued? - if (buffer_list_pollfd(sink, &link_pool->fds[n], true) == 0) { - link_pool->buf_lists[n] = sink; - link_pool->links[n] = NULL; - n++; - } - - // Can this chain pauses - int count_enqueued = buffer_list_count_enqueued(sink); - if (!sink->dev->paused && count_enqueued < sink->nbufs) { - paused = false; - } else if (count_enqueued > 0) { - paused = false; - } + if (link_needs_buffer_by_sinks(link)) { + paused = false; } capture_list->dev->paused = paused; + } +} + +static bool links_enqueue_capture_buffers(buffer_list_t *capture_list, int *timeout_next_ms) +{ + buffer_t *capture_buf = NULL; + + // skip if all enqueued + capture_buf = buffer_list_find_slot(capture_list); + if (capture_buf == NULL) + return false; + + uint64_t now_us = get_monotonic_time_us(NULL, NULL); + + // skip if trying to enqueue to fast + if (capture_list->fmt.interval_us > 0 && now_us - capture_list->last_enqueued_us < capture_list->fmt.interval_us) { + *timeout_next_ms = MIN(*timeout_next_ms, (capture_list->last_enqueued_us + capture_list->fmt.interval_us - now_us) / 1000); + + LOG_DEBUG(capture_list, "skipping dequeue: %.1f / %.1f. enqueued=%d", + (now_us - capture_list->last_enqueued_us) / 1000.0f, + capture_list->fmt.interval_us / 1000.0f, + buffer_list_count_enqueued(capture_list)); + return false; + } + + if (capture_list->fmt.interval_us > 0) { + LOG_DEBUG(capture_list, "since last: %.1f / %.1f. enqueued=%d", + (now_us - capture_list->last_enqueued_us) / 1000.0f, + capture_list->fmt.interval_us / 1000.0f, + buffer_list_count_enqueued(capture_list)); + } + + // enqueue new capture buffer + buffer_consumed(capture_buf, "enqueued"); + + if (capture_list->fmt.interval_us == 0) + return true; + + return false; +} + +static void links_process_capture_buffers(link_t *all_links, int *timeout_next_ms) +{ + for (int i = 0; all_links[i].capture_list; i++) { + link_t *link = &all_links[i]; + buffer_list_t *capture_list = link->capture_list; + + if (capture_list->dev->paused) + continue; + + while (links_enqueue_capture_buffers(capture_list, timeout_next_ms)) { + } + } +} + +static int links_build_fds(link_t *all_links, link_pool_t *link_pool) +{ + int n = 0; + + for (int i = 0; all_links[i].capture_list; i++) { + link_t *link = &all_links[i]; + buffer_list_t *capture_list = link->capture_list; + + if (n >= N_FDS) { + return -EINVAL; + } int count_enqueued = buffer_list_count_enqueued(capture_list); bool can_dequeue = count_enqueued > 0; -#ifndef QUEUE_ON_CAPTURE - if (now_us - capture_list->last_dequeued_us < capture_list->fmt.interval_us) { - can_dequeue = false; - link_pool->max_timeout_ms = MIN(link_pool->max_timeout_ms, (capture_list->last_dequeued_us + capture_list->fmt.interval_us - now_us) / 1000); - } -#endif - if (buffer_list_pollfd(capture_list, &link_pool->fds[n], can_dequeue) == 0) { - link_pool->buf_lists[n] = capture_list; + link_pool->capture_lists[n] = capture_list; link_pool->links[n] = link; n++; } + + for (int j = 0; j < link->n_output_lists; j++) { + buffer_list_t *output_list = link->output_lists[j]; + + if (buffer_list_count_enqueued(output_list) == 0) { + continue; + } + + if (n >= N_FDS) { + return -EINVAL; + } + + // Can something be dequeued? + if (buffer_list_pollfd(output_list, &link_pool->fds[n], true) == 0) { + link_pool->output_lists[n] = output_list; + link_pool->links[n] = NULL; + n++; + } + } } return n; @@ -116,24 +179,21 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool) static bool links_output_list_can_enqueue(buffer_list_t *output_list) { - int current = buffer_list_count_enqueued(output_list); + if (output_list->dev->paused) + return false; + + int output_count = buffer_list_count_enqueued(output_list); if (output_list->do_capture) { perror("should not happen"); } - int capture_max = 0; - - for (int i = 0; i < output_list->dev->n_capture_list; i++) { - int capture_count = buffer_list_count_enqueued(output_list->dev->capture_lists[i]); - if (capture_max < capture_count) - capture_max = capture_count; - } + int capture_count = device_capture_enqueued(output_list->dev, NULL); // only enqueue on output, if there are already captures (and there's more of them) - if (capture_max <= current) { + if (capture_count <= output_count) { LOG_DEBUG(output_list, "Skipping enqueue of output (output_enqueued=%d, capture_enqueued=%d)", - current, capture_max); + output_count, capture_count); return false; } @@ -168,9 +228,6 @@ static int links_enqueue_from_capture_list(buffer_list_t *capture_list, link_t * bool dropped = false; for (int j = 0; j < link->n_output_lists; j++) { - if (link->output_lists[j]->dev->paused) { - continue; - } if (links_output_list_can_enqueue(link->output_lists[j])) { buffer_list_enqueue(link->output_lists[j], buf); } else { @@ -227,22 +284,27 @@ static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_m { link_pool_t pool = { .fds = {{0}}, - .max_timeout_ms = timeout_now_ms + .links = {0}, + .capture_lists = {0}, + .output_lists = {0} }; + links_process_paused(all_links); + links_process_capture_buffers(all_links, timeout_next_ms); + int n = links_build_fds(all_links, &pool); print_pollfds(pool.fds, n); int ret = poll(pool.fds, n, timeout_now_ms); print_pollfds(pool.fds, n); - uint64_t now_us __attribute__((unused)) = get_monotonic_time_us(NULL, NULL); - if (ret < 0 && errno != EINTR) { return errno; } for (int i = 0; i < n; i++) { - buffer_list_t *buf_list = pool.buf_lists[i]; + buffer_list_t *capture_list = pool.capture_lists[i]; + buffer_list_t *output_list = pool.output_lists[i]; + buffer_list_t *buf_list = capture_list ? capture_list : output_list; link_t *link = pool.links[i]; LOG_DEBUG(buf_list, "pool event=%08x revent=%s%s%s%s%s%08x streaming=%d enqueued=%d/%d paused=%d", @@ -259,14 +321,14 @@ static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_m buf_list->dev->paused); if (pool.fds[i].revents & POLLIN) { - if (links_enqueue_from_capture_list(buf_list, link) < 0) { + if (links_enqueue_from_capture_list(capture_list, link) < 0) { return -1; } } // Dequeue buffers that were processed if (pool.fds[i].revents & POLLOUT) { - if (links_dequeue_from_output_list(buf_list) < 0) { + if (links_dequeue_from_output_list(output_list) < 0) { return -1; } } @@ -280,43 +342,6 @@ static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_m LOG_INFO(buf_list, "Got an error"); return -1; } - - if (!buf_list->dev->paused && buf_list->do_capture && buf_list->do_mmap) { - buffer_t *buf; - -#ifdef QUEUE_ON_CAPTURE - if (buf_list->fmt.interval_us > 0 && now_us - buf_list->last_enqueued_us < buf_list->fmt.interval_us) { - *timeout_next_ms = MIN(*timeout_next_ms, (buf_list->last_enqueued_us + buf_list->fmt.interval_us - now_us) / 1000); - - LOG_DEBUG(buf_list, "skipping dequeue: %.1f / %.1f. enqueued=%d", - (now_us - buf_list->last_enqueued_us) / 1000.0f, - buf_list->fmt.interval_us / 1000.0f, - buffer_list_count_enqueued(buf_list)); - continue; - } else if (buf_list->fmt.interval_us > 0) { - LOG_DEBUG(buf_list, "since last: %.1f / %.1f. enqueued=%d", - (now_us - buf_list->last_enqueued_us) / 1000.0f, - buf_list->fmt.interval_us / 1000.0f, - buffer_list_count_enqueued(buf_list)); - } -#else - // feed capture queue (two buffers) - int count_enqueued = buffer_list_count_enqueued(buf_list); - if (count_enqueued > 1) - continue; -#endif - -#ifdef LIMIT_CAPTURE_BUFFERS - // Do not enqueue more buffers than enqueued by output - if (buf_list->dev->output_list && buffer_list_count_enqueued(buf_list) >= buffer_list_count_enqueued(buf_list->dev->output_list)) { - continue; - } -#endif - - if ((buf = buffer_list_find_slot(buf_list)) != NULL) { - buffer_consumed(buf, "enqueued"); - } - } } return 0; }