Move device/hw/links.h
to device/links.h
This commit is contained in:
@ -169,7 +169,7 @@ error:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int buffer_list_request(buffer_list_t *buf_list, int nbufs)
|
||||
int buffer_list_set_buffers(buffer_list_t *buf_list, int nbufs)
|
||||
{
|
||||
struct v4l2_requestbuffers v4l2_req = {0};
|
||||
v4l2_req.count = nbufs;
|
||||
@ -206,7 +206,7 @@ error:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int buffer_list_stream(buffer_list_t *buf_list, bool do_on)
|
||||
int buffer_list_set_stream(buffer_list_t *buf_list, bool do_on)
|
||||
{
|
||||
if (!buf_list) {
|
||||
return -1;
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
typedef struct buffer_s buffer_t;
|
||||
typedef struct device_s device_t;
|
||||
struct pollfd;
|
||||
|
||||
typedef struct buffer_list_s {
|
||||
char *name;
|
||||
@ -33,14 +34,13 @@ typedef struct buffer_list_s {
|
||||
buffer_list_t *buffer_list_open(const char *name, struct device_s *dev, unsigned type, bool do_mmap);
|
||||
void buffer_list_close(buffer_list_t *buf_list);
|
||||
|
||||
int buffer_list_set_stream(buffer_list_t *buf_list, bool do_on);
|
||||
int buffer_list_set_format(buffer_list_t *buffer_list, unsigned width, unsigned height, unsigned format, unsigned bytesperline);
|
||||
int buffer_list_request(buffer_list_t *buf_list, int nbufs);
|
||||
|
||||
int buffer_list_stream(buffer_list_t *buf_list, bool do_on);
|
||||
int buffer_list_set_buffers(buffer_list_t *buf_list, int nbufs);
|
||||
|
||||
int buffer_list_pollfd(buffer_list_t *buf_list, struct pollfd *pollfd, bool can_dequeue);
|
||||
buffer_t *buffer_list_find_slot(buffer_list_t *buf_list);
|
||||
buffer_t *buffer_list_dequeue(buffer_list_t *buf_list);
|
||||
int buffer_list_count_enqueued(buffer_list_t *buf_list);
|
||||
|
||||
int buffer_list_enqueue(buffer_list_t *buf_list, buffer_t *dma_buf);
|
||||
int buffer_list_refresh_states(buffer_list_t *buf_list);
|
||||
|
@ -210,3 +210,19 @@ buffer_t *buffer_list_dequeue(buffer_list_t *buf_list)
|
||||
error:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int buffer_list_pollfd(buffer_list_t *buf_list, struct pollfd *pollfd, bool can_dequeue)
|
||||
{
|
||||
int count_enqueued = buffer_list_count_enqueued(buf_list);
|
||||
|
||||
// Can something be dequeued?
|
||||
pollfd->fd = buf_list->device->fd;
|
||||
pollfd->events = POLLHUP;
|
||||
if (count_enqueued > 0 && can_dequeue)
|
||||
if (buf_list->do_capture)
|
||||
pollfd->events |= POLLIN;
|
||||
else
|
||||
pollfd->events |= POLLOUT;
|
||||
pollfd->revents = 0;
|
||||
return 0;
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ int device_open_buffer_list(device_t *dev, bool do_capture, unsigned width, unsi
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (buffer_list_request(*buf_list, nbufs) < 0) {
|
||||
if (buffer_list_set_buffers(*buf_list, nbufs) < 0) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
@ -160,13 +160,13 @@ int device_set_stream(device_t *dev, bool do_on)
|
||||
xioctl(dev_name(dev), dev->fd, do_on ? VIDIOC_SUBSCRIBE_EVENT : VIDIOC_UNSUBSCRIBE_EVENT, &sub);
|
||||
|
||||
if (dev->capture_list) {
|
||||
if (buffer_list_stream(dev->capture_list, do_on) < 0) {
|
||||
if (buffer_list_set_stream(dev->capture_list, do_on) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (dev->output_list) {
|
||||
if (buffer_list_stream(dev->output_list, do_on) < 0) {
|
||||
if (buffer_list_set_stream(dev->output_list, do_on) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1,286 +0,0 @@
|
||||
#include "device/hw/links.h"
|
||||
#include "device/hw/device.h"
|
||||
#include "device/hw/buffer.h"
|
||||
#include "device/hw/buffer_list.h"
|
||||
#include "device/hw/v4l2.h"
|
||||
|
||||
#define N_FDS 50
|
||||
#define QUEUE_ON_CAPTURE // seems to provide better latency
|
||||
|
||||
int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_list_t **buf_lists, int max_n, int *max_timeout_ms)
|
||||
{
|
||||
int n = 0, nlinks = 0;
|
||||
uint64_t now_us = get_monotonic_time_us(NULL, NULL);
|
||||
|
||||
for (nlinks = 0; all_links[nlinks].source; nlinks++);
|
||||
|
||||
// This traverses in reverse order as it requires to first fix outputs
|
||||
// and go back into captures
|
||||
|
||||
for (int i = nlinks; i-- > 0; ) {
|
||||
link_t *link = &all_links[i];
|
||||
buffer_list_t *source = link->source;
|
||||
|
||||
if (n >= max_n) {
|
||||
return -EINVAL;
|
||||
}
|
||||
if (!source->streaming) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bool paused = true;
|
||||
|
||||
if (link->callbacks.check_streaming && link->callbacks.check_streaming()) {
|
||||
paused = false;
|
||||
}
|
||||
|
||||
for (int j = 0; link->sinks[j]; j++) {
|
||||
buffer_list_t *sink = link->sinks[j];
|
||||
|
||||
if (n >= max_n) {
|
||||
return -EINVAL;
|
||||
}
|
||||
if (!sink->streaming) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int count_enqueued = buffer_list_count_enqueued(sink);
|
||||
|
||||
// Can something be dequeued?
|
||||
fds[n].fd = sink->device->fd;
|
||||
fds[n].events = POLLHUP;
|
||||
if (count_enqueued > 0)
|
||||
fds[n].events |= POLLOUT;
|
||||
fds[n].revents = 0;
|
||||
buf_lists[n] = sink;
|
||||
links[n] = NULL;
|
||||
n++;
|
||||
|
||||
// Can this chain pauses
|
||||
if (!sink->device->paused && count_enqueued < sink->nbufs) {
|
||||
paused = false;
|
||||
}
|
||||
}
|
||||
|
||||
source->device->paused = paused;
|
||||
|
||||
if (source->device->output_device) {
|
||||
source->device->output_device->paused = paused;
|
||||
}
|
||||
|
||||
int count_enqueued = buffer_list_count_enqueued(source);
|
||||
bool can_dequeue = count_enqueued > 0;
|
||||
|
||||
#ifndef QUEUE_ON_CAPTURE
|
||||
if (now_us - source->last_dequeued_us < source->fmt_interval_us) {
|
||||
can_dequeue = false;
|
||||
*max_timeout_ms = MIN(*max_timeout_ms, (source->last_dequeued_us + source->fmt_interval_us - now_us) / 1000);
|
||||
}
|
||||
#endif
|
||||
|
||||
fds[n].fd = source->device->fd;
|
||||
fds[n].events = POLLHUP;
|
||||
if (can_dequeue)
|
||||
fds[n].events |= POLLIN;
|
||||
fds[n].revents = 0;
|
||||
buf_lists[n] = source;
|
||||
links[n] = link;
|
||||
n++;
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int links_enqueue_from_source(buffer_list_t *buf_list, link_t *link)
|
||||
{
|
||||
if (!link) {
|
||||
E_LOG_ERROR(buf_list, "Missing link for source");
|
||||
}
|
||||
|
||||
buffer_t *buf = buffer_list_dequeue(buf_list);
|
||||
if (!buf) {
|
||||
E_LOG_ERROR(buf_list, "No buffer dequeued from source?");
|
||||
}
|
||||
|
||||
if (link->callbacks.validate_buffer && !link->callbacks.validate_buffer(link, buf)) {
|
||||
E_LOG_DEBUG(buf_list, "Buffer rejected by validation");
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (int j = 0; link->sinks[j]; j++) {
|
||||
if (link->sinks[j]->device->paused) {
|
||||
continue;
|
||||
}
|
||||
buffer_list_enqueue(link->sinks[j], buf);
|
||||
}
|
||||
|
||||
if (link->callbacks.on_buffer) {
|
||||
link->callbacks.on_buffer(buf);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
error:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int links_dequeue_from_sink(buffer_list_t *buf_list) {
|
||||
buffer_t *buf = buffer_list_dequeue(buf_list);
|
||||
if (!buf) {
|
||||
E_LOG_ERROR(buf, "No buffer dequeued from sink?");
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
error:
|
||||
return -1;
|
||||
}
|
||||
|
||||
void print_pollfds(struct pollfd *fds, int n)
|
||||
{
|
||||
if (!getenv("DEBUG_FDS")) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
printf("poll(i=%i, fd=%d, events=%08x, revents=%08x)\n", i, fds[i].fd, fds[i].events, fds[i].revents);
|
||||
}
|
||||
printf("pollfds = %d\n", n);
|
||||
}
|
||||
|
||||
int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_ms)
|
||||
{
|
||||
struct pollfd fds[N_FDS] = {0};
|
||||
link_t *links[N_FDS];
|
||||
buffer_list_t *buf_lists[N_FDS];
|
||||
buffer_t *buf;
|
||||
|
||||
int n = _build_fds(all_links, fds, links, buf_lists, N_FDS, &timeout_now_ms);
|
||||
print_pollfds(fds, n);
|
||||
int ret = poll(fds, n, timeout_now_ms);
|
||||
print_pollfds(fds, n);
|
||||
|
||||
uint64_t now_us = get_monotonic_time_us(NULL, NULL);
|
||||
|
||||
if (ret < 0 && errno != EINTR) {
|
||||
return errno;
|
||||
}
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
buffer_list_t *buf_list = buf_lists[i];
|
||||
link_t *link = links[i];
|
||||
|
||||
E_LOG_DEBUG(buf_list, "pool event=%s%s%s%s%s%08x streaming=%d enqueued=%d/%d paused=%d",
|
||||
!fds[i].revents ? "NONE/" : "",
|
||||
fds[i].revents & POLLIN ? "IN/" : "",
|
||||
fds[i].revents & POLLOUT ? "OUT/" : "",
|
||||
fds[i].revents & POLLHUP ? "HUP/" : "",
|
||||
fds[i].revents & POLLERR ? "ERR/" : "",
|
||||
fds[i].revents,
|
||||
buf_list->streaming,
|
||||
buffer_list_count_enqueued(buf_list),
|
||||
buf_list->nbufs,
|
||||
buf_list->device->paused);
|
||||
|
||||
if (fds[i].revents & POLLIN) {
|
||||
if (links_enqueue_from_source(buf_list, link) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// Dequeue buffers that were processed
|
||||
if (fds[i].revents & POLLOUT) {
|
||||
if (links_dequeue_from_sink(buf_list) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (fds[i].revents & POLLHUP) {
|
||||
E_LOG_INFO(buf_list, "Device disconnected.");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (fds[i].revents & POLLERR) {
|
||||
E_LOG_INFO(buf_list, "Got an error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!buf_list->device->paused && buf_list->do_capture && buf_list->do_mmap) {
|
||||
buffer_t *buf;
|
||||
|
||||
#ifdef QUEUE_ON_CAPTURE
|
||||
if (buf_list->fmt_interval_us > 0 && now_us - buf_list->last_enqueued_us < buf_list->fmt_interval_us) {
|
||||
*timeout_next_ms = MIN(*timeout_next_ms, (buf_list->last_enqueued_us + buf_list->fmt_interval_us - now_us) / 1000);
|
||||
|
||||
E_LOG_DEBUG(buf_list, "skipping dequeue: %.1f / %.1f. enqueued=%d",
|
||||
(now_us - buf_list->last_enqueued_us) / 1000.0f,
|
||||
buf_list->fmt_interval_us / 1000.0f,
|
||||
buffer_list_count_enqueued(buf_list));
|
||||
continue;
|
||||
} else if (buf_list->fmt_interval_us > 0) {
|
||||
E_LOG_DEBUG(buf_list, "since last: %.1f / %.1f. enqueued=%d",
|
||||
(now_us - buf_list->last_enqueued_us) / 1000.0f,
|
||||
buf_list->fmt_interval_us / 1000.0f,
|
||||
buffer_list_count_enqueued(buf_list));
|
||||
}
|
||||
#else
|
||||
// feed capture queue (two buffers)
|
||||
int count_enqueued = buffer_list_count_enqueued(buf_list);
|
||||
if (count_enqueued > 1)
|
||||
continue;
|
||||
#endif
|
||||
|
||||
if (buf = buffer_list_find_slot(buf_list)) {
|
||||
buffer_consumed(buf, "enqueued");
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int links_stream(link_t *all_links, bool do_stream)
|
||||
{
|
||||
for (int i = 0; all_links[i].source; i++) {
|
||||
bool streaming = true;
|
||||
link_t *link = &all_links[i];
|
||||
|
||||
if (buffer_list_stream(link->source, streaming) < 0) {
|
||||
E_LOG_ERROR(link->source, "Failed to start streaming");
|
||||
}
|
||||
|
||||
for (int j = 0; link->sinks[j]; j++) {
|
||||
if (buffer_list_stream(link->sinks[j], streaming) < 0) {
|
||||
E_LOG_ERROR(link->sinks[j], "Failed to start streaming");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
error:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int links_loop(link_t *all_links, bool *running)
|
||||
{
|
||||
*running = true;
|
||||
|
||||
if (links_stream(all_links, true) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int timeout_ms = LINKS_LOOP_INTERVAL;
|
||||
|
||||
while(*running) {
|
||||
int timeout_now_ms = timeout_ms;
|
||||
timeout_ms = LINKS_LOOP_INTERVAL;
|
||||
|
||||
if (links_step(all_links, timeout_now_ms, &timeout_ms) < 0) {
|
||||
links_stream(all_links, false);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
links_stream(all_links, false);
|
||||
return 0;
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
#define LINKS_LOOP_INTERVAL 100
|
||||
|
||||
typedef struct buffer_s buffer_t;
|
||||
typedef struct buffer_list_s buffer_list_t;
|
||||
typedef struct link_s link_t;
|
||||
|
||||
typedef void (*link_on_buffer)(buffer_t *buf);
|
||||
typedef bool (*link_check_streaming)();
|
||||
typedef bool (*link_validate_buffer)(struct link_s *link, buffer_t *buf);
|
||||
|
||||
typedef struct link_s {
|
||||
struct buffer_list_s *source; // capture_list
|
||||
struct buffer_list_s *sinks[10];
|
||||
struct {
|
||||
link_on_buffer on_buffer;
|
||||
link_check_streaming check_streaming;
|
||||
link_validate_buffer validate_buffer;
|
||||
} callbacks;
|
||||
} link_t;
|
||||
|
||||
int links_init(link_t *all_links);
|
||||
int links_step(link_t *all_links, int timeout_now_ms, int *timeout_next_ms);
|
||||
int links_loop(link_t *all_links, bool *running);
|
Reference in New Issue
Block a user