diff --git a/hw/links.c b/hw/links.c index d0f6bc6..39dda73 100644 --- a/hw/links.c +++ b/hw/links.c @@ -47,7 +47,7 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis if (count_enqueued > 0) { fds[n] = (struct pollfd){ sink->device->fd, POLLOUT }; buf_lists[n] = sink; - links[n] = link; + links[n] = NULL; n++; } @@ -74,6 +74,47 @@ int _build_fds(link_t *all_links, struct pollfd *fds, link_t **links, buffer_lis return n; } +int links_enqueue_from_source(buffer_list_t *buf_list, link_t *link) +{ + if (!link) { + E_LOG_ERROR(buf_list, "Missing link for source"); + } + + buffer_t *buf = buffer_list_dequeue(buf_list); + if (!buf) { + E_LOG_ERROR(buf_list, "No buffer dequeued from source?"); + } + + for (int j = 0; link->sinks[j]; j++) { + if (link->sinks[j]->device->paused) { + continue; + } + buffer_list_enqueue(link->sinks[j], buf); + } + + if (link->callbacks.on_buffer) { + link->callbacks.on_buffer(buf); + } + + buffer_consumed(buf); + return 0; + +error: + return -1; +} + +int links_dequeue_from_sink(buffer_list_t *buf_list) { + buffer_t *buf = buffer_list_dequeue(buf_list); + if (!buf) { + E_LOG_ERROR(buf, "No buffer dequeued from sink?"); + } + + return 0; + +error: + return -1; +} + int links_step(link_t *all_links, int timeout) { struct pollfd fds[N_FDS] = {0}; @@ -92,31 +133,22 @@ int links_step(link_t *all_links, int timeout) buffer_list_t *buf_list = buf_lists[i]; link_t *link = links[i]; - E_LOG_DEBUG(buf_list, "pool i=%d revents=%08x streaming=%d enqueued=%d/%d paused=%d", i, fds[i].revents, buf_list->streaming, - buffer_list_count_enqueued(buf_list), buf_list->nbufs, link->source->device->paused); + E_LOG_DEBUG(buf_list, "pool i=%d revents=%08x streaming=%d enqueued=%d/%d", i, fds[i].revents, buf_list->streaming, + buffer_list_count_enqueued(buf_list), buf_list->nbufs); if (fds[i].revents & POLLIN) { E_LOG_DEBUG(buf_list, "POLLIN"); - if (buf = buffer_list_dequeue(buf_list)) { - for (int j = 0; link->sinks[j]; j++) { - if (link->sinks[j]->device->paused) { - continue; - } - buffer_list_enqueue(link->sinks[j], buf); - } - - if (link->callbacks.on_buffer) { - link->callbacks.on_buffer(buf); - } - - buffer_consumed(buf); + if (links_enqueue_from_source(buf_list, link) < 0) { + return -1; } } // Dequeue buffers that were processed if (fds[i].revents & POLLOUT) { E_LOG_DEBUG(buf_list, "POLLOUT"); - buffer_list_dequeue(buf_list); + if (links_dequeue_from_sink(buf_list) < 0) { + return -1; + } } if (fds[i].revents & POLLERR) { @@ -125,7 +157,7 @@ int links_step(link_t *all_links, int timeout) } if (fds[i].revents & ~(POLLIN|POLLOUT|POLLERR)) { - E_LOG_DEBUG(buf_list, "POLL%08x", fds[i].revents); + E_LOG_VERBOSE(buf_list, "POLL%08x", fds[i].revents); } } return 0;