diff --git a/device/links.c b/device/links.c index 66fd0e1..d82ff80 100644 --- a/device/links.c +++ b/device/links.c @@ -10,7 +10,32 @@ #define QUEUE_ON_CAPTURE // seems to provide better latency // #define LIMIT_CAPTURE_BUFFERS -int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_list_t **buf_lists, int max_n, int *max_timeout_ms) +typedef struct link_pool_s +{ + struct pollfd fds[N_FDS]; + link_t *links[N_FDS]; + buffer_list_t *buf_lists[N_FDS]; + int max_timeout_ms; +} link_pool_t; + +static bool link_needs_buffer_by_callbacks(link_t *link) +{ + bool needs = false; + + for (int j = 0; j < link->n_callbacks; j++) { + if (link->callbacks[j].check_streaming && link->callbacks[j].check_streaming()) { + needs = true; + } + + if (link->callbacks[j].buf_lock && buffer_lock_needs_buffer(link->callbacks[j].buf_lock)) { + needs = true; + } + } + + return needs; +} + +static int links_build_fds(link_t *all_links, link_pool_t *link_pool) { int n = 0, nlinks = 0; @@ -25,7 +50,7 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis link_t *link = &all_links[i]; buffer_list_t *capture_list = link->capture_list; - if (n >= max_n) { + if (n >= N_FDS) { return -EINVAL; } if (!capture_list->streaming) { @@ -34,20 +59,14 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis bool paused = true; - for (int j = 0; j < link->n_callbacks; j++) { - if (link->callbacks[j].check_streaming && link->callbacks[j].check_streaming()) { - paused = false; - } - - if (link->callbacks[j].buf_lock && buffer_lock_needs_buffer(link->callbacks[j].buf_lock)) { - paused = false; - } + if (link_needs_buffer_by_callbacks(link)) { + paused = false; } for (int j = 0; link->output_lists[j]; j++) { buffer_list_t *sink = link->output_lists[j]; - if (n >= max_n) { + if (n >= N_FDS) { return -EINVAL; } if (!sink->streaming) { @@ -55,9 +74,9 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis } // Can something be dequeued? - if (buffer_list_pollfd(sink, &fds[n], true) == 0) { - buf_lists[n] = sink; - links[n] = NULL; + if (buffer_list_pollfd(sink, &link_pool->fds[n], true) == 0) { + link_pool->buf_lists[n] = sink; + link_pool->links[n] = NULL; n++; } @@ -78,13 +97,13 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis #ifndef QUEUE_ON_CAPTURE if (now_us - capture_list->last_dequeued_us < capture_list->fmt.interval_us) { can_dequeue = false; - *max_timeout_ms = MIN(*max_timeout_ms, (capture_list->last_dequeued_us + capture_list->fmt.interval_us - now_us) / 1000); + link_pool->max_timeout_ms = MIN(link_pool->max_timeout_ms, (capture_list->last_dequeued_us + capture_list->fmt.interval_us - now_us) / 1000); } #endif - if (buffer_list_pollfd(capture_list, &fds[n], can_dequeue) == 0) { - buf_lists[n] = capture_list; - links[n] = link; + if (buffer_list_pollfd(capture_list, &link_pool->fds[n], can_dequeue) == 0) { + link_pool->buf_lists[n] = capture_list; + link_pool->links[n] = link; n++; } } @@ -188,14 +207,15 @@ static void print_pollfds(struct pollfd *fds, int n) static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_ms) { - struct pollfd fds[N_FDS] = {0}; - link_t *links[N_FDS]; - buffer_list_t *buf_lists[N_FDS]; + link_pool_t pool = { + .fds = {{0}}, + .max_timeout_ms = timeout_now_ms + }; - int n = _build_fds(all_links, fds, links, buf_lists, N_FDS, &timeout_now_ms); - print_pollfds(fds, n); - int ret = poll(fds, n, timeout_now_ms); - print_pollfds(fds, n); + int n = links_build_fds(all_links, &pool); + print_pollfds(pool.fds, n); + int ret = poll(pool.fds, n, timeout_now_ms); + print_pollfds(pool.fds, n); uint64_t now_us __attribute__((unused)) = get_monotonic_time_us(NULL, NULL); @@ -204,40 +224,40 @@ static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_m } for (int i = 0; i < n; i++) { - buffer_list_t *buf_list = buf_lists[i]; - link_t *link = links[i]; + buffer_list_t *buf_list = pool.buf_lists[i]; + link_t *link = pool.links[i]; LOG_DEBUG(buf_list, "pool event=%s%s%s%s%s%08x streaming=%d enqueued=%d/%d paused=%d", - !fds[i].revents ? "NONE/" : "", - fds[i].revents & POLLIN ? "IN/" : "", - fds[i].revents & POLLOUT ? "OUT/" : "", - fds[i].revents & POLLHUP ? "HUP/" : "", - fds[i].revents & POLLERR ? "ERR/" : "", - fds[i].revents, + !pool.fds[i].revents ? "NONE/" : "", + pool.fds[i].revents & POLLIN ? "IN/" : "", + pool.fds[i].revents & POLLOUT ? "OUT/" : "", + pool.fds[i].revents & POLLHUP ? "HUP/" : "", + pool.fds[i].revents & POLLERR ? "ERR/" : "", + pool.fds[i].revents, buf_list->streaming, buffer_list_count_enqueued(buf_list), buf_list->nbufs, buf_list->dev->paused); - if (fds[i].revents & POLLIN) { + if (pool.fds[i].revents & POLLIN) { if (links_enqueue_from_capture_list(buf_list, link) < 0) { return -1; } } // Dequeue buffers that were processed - if (fds[i].revents & POLLOUT) { + if (pool.fds[i].revents & POLLOUT) { if (links_dequeue_from_sink(buf_list) < 0) { return -1; } } - if (fds[i].revents & POLLHUP) { + if (pool.fds[i].revents & POLLHUP) { LOG_INFO(buf_list, "Device disconnected."); return -1; } - if (fds[i].revents & POLLERR) { + if (pool.fds[i].revents & POLLERR) { LOG_INFO(buf_list, "Got an error"); return -1; }