links: introduce link_pool_t
This commit is contained in:
parent
7c375f70a6
commit
8b3ed0b599
@ -10,7 +10,32 @@
|
||||
#define QUEUE_ON_CAPTURE // seems to provide better latency
|
||||
// #define LIMIT_CAPTURE_BUFFERS
|
||||
|
||||
int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_list_t **buf_lists, int max_n, int *max_timeout_ms)
|
||||
typedef struct link_pool_s
|
||||
{
|
||||
struct pollfd fds[N_FDS];
|
||||
link_t *links[N_FDS];
|
||||
buffer_list_t *buf_lists[N_FDS];
|
||||
int max_timeout_ms;
|
||||
} link_pool_t;
|
||||
|
||||
static bool link_needs_buffer_by_callbacks(link_t *link)
|
||||
{
|
||||
bool needs = false;
|
||||
|
||||
for (int j = 0; j < link->n_callbacks; j++) {
|
||||
if (link->callbacks[j].check_streaming && link->callbacks[j].check_streaming()) {
|
||||
needs = true;
|
||||
}
|
||||
|
||||
if (link->callbacks[j].buf_lock && buffer_lock_needs_buffer(link->callbacks[j].buf_lock)) {
|
||||
needs = true;
|
||||
}
|
||||
}
|
||||
|
||||
return needs;
|
||||
}
|
||||
|
||||
static int links_build_fds(link_t *all_links, link_pool_t *link_pool)
|
||||
{
|
||||
int n = 0, nlinks = 0;
|
||||
|
||||
@ -25,7 +50,7 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis
|
||||
link_t *link = &all_links[i];
|
||||
buffer_list_t *capture_list = link->capture_list;
|
||||
|
||||
if (n >= max_n) {
|
||||
if (n >= N_FDS) {
|
||||
return -EINVAL;
|
||||
}
|
||||
if (!capture_list->streaming) {
|
||||
@ -34,20 +59,14 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis
|
||||
|
||||
bool paused = true;
|
||||
|
||||
for (int j = 0; j < link->n_callbacks; j++) {
|
||||
if (link->callbacks[j].check_streaming && link->callbacks[j].check_streaming()) {
|
||||
paused = false;
|
||||
}
|
||||
|
||||
if (link->callbacks[j].buf_lock && buffer_lock_needs_buffer(link->callbacks[j].buf_lock)) {
|
||||
paused = false;
|
||||
}
|
||||
if (link_needs_buffer_by_callbacks(link)) {
|
||||
paused = false;
|
||||
}
|
||||
|
||||
for (int j = 0; link->output_lists[j]; j++) {
|
||||
buffer_list_t *sink = link->output_lists[j];
|
||||
|
||||
if (n >= max_n) {
|
||||
if (n >= N_FDS) {
|
||||
return -EINVAL;
|
||||
}
|
||||
if (!sink->streaming) {
|
||||
@ -55,9 +74,9 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis
|
||||
}
|
||||
|
||||
// Can something be dequeued?
|
||||
if (buffer_list_pollfd(sink, &fds[n], true) == 0) {
|
||||
buf_lists[n] = sink;
|
||||
links[n] = NULL;
|
||||
if (buffer_list_pollfd(sink, &link_pool->fds[n], true) == 0) {
|
||||
link_pool->buf_lists[n] = sink;
|
||||
link_pool->links[n] = NULL;
|
||||
n++;
|
||||
}
|
||||
|
||||
@ -78,13 +97,13 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis
|
||||
#ifndef QUEUE_ON_CAPTURE
|
||||
if (now_us - capture_list->last_dequeued_us < capture_list->fmt.interval_us) {
|
||||
can_dequeue = false;
|
||||
*max_timeout_ms = MIN(*max_timeout_ms, (capture_list->last_dequeued_us + capture_list->fmt.interval_us - now_us) / 1000);
|
||||
link_pool->max_timeout_ms = MIN(link_pool->max_timeout_ms, (capture_list->last_dequeued_us + capture_list->fmt.interval_us - now_us) / 1000);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (buffer_list_pollfd(capture_list, &fds[n], can_dequeue) == 0) {
|
||||
buf_lists[n] = capture_list;
|
||||
links[n] = link;
|
||||
if (buffer_list_pollfd(capture_list, &link_pool->fds[n], can_dequeue) == 0) {
|
||||
link_pool->buf_lists[n] = capture_list;
|
||||
link_pool->links[n] = link;
|
||||
n++;
|
||||
}
|
||||
}
|
||||
@ -188,14 +207,15 @@ static void print_pollfds(struct pollfd *fds, int n)
|
||||
|
||||
static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_ms)
|
||||
{
|
||||
struct pollfd fds[N_FDS] = {0};
|
||||
link_t *links[N_FDS];
|
||||
buffer_list_t *buf_lists[N_FDS];
|
||||
link_pool_t pool = {
|
||||
.fds = {{0}},
|
||||
.max_timeout_ms = timeout_now_ms
|
||||
};
|
||||
|
||||
int n = _build_fds(all_links, fds, links, buf_lists, N_FDS, &timeout_now_ms);
|
||||
print_pollfds(fds, n);
|
||||
int ret = poll(fds, n, timeout_now_ms);
|
||||
print_pollfds(fds, n);
|
||||
int n = links_build_fds(all_links, &pool);
|
||||
print_pollfds(pool.fds, n);
|
||||
int ret = poll(pool.fds, n, timeout_now_ms);
|
||||
print_pollfds(pool.fds, n);
|
||||
|
||||
uint64_t now_us __attribute__((unused)) = get_monotonic_time_us(NULL, NULL);
|
||||
|
||||
@ -204,40 +224,40 @@ static int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_m
|
||||
}
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
buffer_list_t *buf_list = buf_lists[i];
|
||||
link_t *link = links[i];
|
||||
buffer_list_t *buf_list = pool.buf_lists[i];
|
||||
link_t *link = pool.links[i];
|
||||
|
||||
LOG_DEBUG(buf_list, "pool event=%s%s%s%s%s%08x streaming=%d enqueued=%d/%d paused=%d",
|
||||
!fds[i].revents ? "NONE/" : "",
|
||||
fds[i].revents & POLLIN ? "IN/" : "",
|
||||
fds[i].revents & POLLOUT ? "OUT/" : "",
|
||||
fds[i].revents & POLLHUP ? "HUP/" : "",
|
||||
fds[i].revents & POLLERR ? "ERR/" : "",
|
||||
fds[i].revents,
|
||||
!pool.fds[i].revents ? "NONE/" : "",
|
||||
pool.fds[i].revents & POLLIN ? "IN/" : "",
|
||||
pool.fds[i].revents & POLLOUT ? "OUT/" : "",
|
||||
pool.fds[i].revents & POLLHUP ? "HUP/" : "",
|
||||
pool.fds[i].revents & POLLERR ? "ERR/" : "",
|
||||
pool.fds[i].revents,
|
||||
buf_list->streaming,
|
||||
buffer_list_count_enqueued(buf_list),
|
||||
buf_list->nbufs,
|
||||
buf_list->dev->paused);
|
||||
|
||||
if (fds[i].revents & POLLIN) {
|
||||
if (pool.fds[i].revents & POLLIN) {
|
||||
if (links_enqueue_from_capture_list(buf_list, link) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// Dequeue buffers that were processed
|
||||
if (fds[i].revents & POLLOUT) {
|
||||
if (pool.fds[i].revents & POLLOUT) {
|
||||
if (links_dequeue_from_sink(buf_list) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (fds[i].revents & POLLHUP) {
|
||||
if (pool.fds[i].revents & POLLHUP) {
|
||||
LOG_INFO(buf_list, "Device disconnected.");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (fds[i].revents & POLLERR) {
|
||||
if (pool.fds[i].revents & POLLERR) {
|
||||
LOG_INFO(buf_list, "Got an error");
|
||||
return -1;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user