links: enqueue and dequeue buffers in sync (output and capture) asynchronously
This commit is contained in:
@ -115,12 +115,40 @@ static bool links_enqueue_capture_buffers(buffer_list_t *capture_list, int *time
|
||||
}
|
||||
|
||||
// enqueue new capture buffer
|
||||
buffer_consumed(capture_buf, "enqueued");
|
||||
buffer_list_t *output_list = capture_list->dev->output_list;
|
||||
|
||||
if (capture_list->fmt.interval_us == 0)
|
||||
// no output, just give back capture_buf
|
||||
if (!output_list) {
|
||||
buffer_consumed(capture_buf, "enqueued");
|
||||
if (capture_list->fmt.interval_us > 0)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
// try to find matching output slot, ignore if not present
|
||||
if (!buffer_list_find_slot(output_list)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool can_enqueue = false;
|
||||
|
||||
// try to look for output, if there's a matching capture to be consumed
|
||||
buffer_t *queued_capture_for_output_buf = buffer_list_pop_from_queue(output_list);
|
||||
if (queued_capture_for_output_buf) {
|
||||
// then push a capture from source into output for this capture
|
||||
if (buffer_list_enqueue(output_list, queued_capture_for_output_buf)) {
|
||||
buffer_consumed(capture_buf, "enqueued");
|
||||
if (capture_list->fmt.interval_us <= 0)
|
||||
can_enqueue = true;
|
||||
} else {
|
||||
queued_capture_for_output_buf->buf_list->stats.dropped++;
|
||||
}
|
||||
|
||||
// release this buffer
|
||||
buffer_consumed(queued_capture_for_output_buf, "from-queue");
|
||||
}
|
||||
|
||||
return can_enqueue;
|
||||
}
|
||||
|
||||
static void links_process_capture_buffers(link_t *all_links, int *timeout_next_ms)
|
||||
@ -161,7 +189,8 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool)
|
||||
for (int j = 0; j < link->n_output_lists; j++) {
|
||||
buffer_list_t *output_list = link->output_lists[j];
|
||||
|
||||
if (buffer_list_count_enqueued(output_list) == 0) {
|
||||
int count_output_enqueued = buffer_list_count_enqueued(output_list);
|
||||
if (count_output_enqueued == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -169,8 +198,10 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool)
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
int count_capture_enqueued = buffer_list_count_enqueued(output_list->dev->capture_lists[0]);
|
||||
|
||||
// Can something be dequeued?
|
||||
if (buffer_list_pollfd(output_list, &link_pool->fds[n], true) == 0) {
|
||||
if (buffer_list_pollfd(output_list, &link_pool->fds[n], count_output_enqueued > count_capture_enqueued) == 0) {
|
||||
link_pool->output_lists[n] = output_list;
|
||||
link_pool->links[n] = NULL;
|
||||
n++;
|
||||
@ -181,29 +212,6 @@ static int links_build_fds(link_t *all_links, link_pool_t *link_pool)
|
||||
return n;
|
||||
}
|
||||
|
||||
static bool links_output_list_can_enqueue(buffer_list_t *output_list)
|
||||
{
|
||||
if (output_list->dev->paused)
|
||||
return false;
|
||||
|
||||
int output_count = buffer_list_count_enqueued(output_list);
|
||||
|
||||
if (output_list->do_capture) {
|
||||
perror("should not happen");
|
||||
}
|
||||
|
||||
int capture_count = device_capture_enqueued(output_list->dev, NULL);
|
||||
|
||||
// only enqueue on output, if there are already captures (and there's more of them)
|
||||
if (capture_count <= output_count) {
|
||||
LOG_DEBUG(output_list, "Skipping enqueue of output (output_enqueued=%d, capture_enqueued=%d)",
|
||||
output_count, capture_count);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static int links_enqueue_from_capture_list(buffer_list_t *capture_list, link_t *link)
|
||||
{
|
||||
if (!link) {
|
||||
@ -232,9 +240,10 @@ static int links_enqueue_from_capture_list(buffer_list_t *capture_list, link_t *
|
||||
bool dropped = false;
|
||||
|
||||
for (int j = 0; j < link->n_output_lists; j++) {
|
||||
if (links_output_list_can_enqueue(link->output_lists[j])) {
|
||||
buffer_list_enqueue(link->output_lists[j], buf);
|
||||
} else {
|
||||
if (link->output_lists[j]->dev->paused) {
|
||||
continue;
|
||||
}
|
||||
if (!buffer_list_push_to_queue(link->output_lists[j], buf)) {
|
||||
dropped = true;
|
||||
}
|
||||
}
|
||||
@ -390,13 +399,14 @@ static void links_refresh_stats(link_t *all_links, uint64_t *last_refresh_us)
|
||||
buffer_stats_t *now = &capture_list->stats;
|
||||
buffer_stats_t *prev = &capture_list->stats_last;
|
||||
|
||||
printf(" [%8s %2d FPS/%2d D/%3dms/%3dms/%c/O%d:C%d]",
|
||||
printf(" [%8s %2d FPS/%2d D/%3dms/%3dms/%c/Q%d:O%d:C%d]",
|
||||
capture_list->dev->name,
|
||||
(now->frames - prev->frames) / log_options.stats,
|
||||
(now->dropped - prev->dropped) / log_options.stats,
|
||||
capture_list->last_capture_time_us > 0 ? capture_list->last_capture_time_us / 1000 : -1,
|
||||
capture_list->last_in_queue_time_us > 0 ? capture_list->last_in_queue_time_us / 1000 : -1,
|
||||
capture_list->streaming ? (capture_list->dev->paused ? 'P' : 'S') : 'X',
|
||||
capture_list->dev->output_list ? capture_list->dev->output_list->n_queued_bufs : 0,
|
||||
capture_list->dev->output_list ? buffer_list_count_enqueued(capture_list->dev->output_list) : 0,
|
||||
buffer_list_count_enqueued(capture_list)
|
||||
);
|
||||
|
Reference in New Issue
Block a user