blob: 87106a53b1c6740fc2bea394ffbb9682b15efff9 [file] [log] [blame]
/* Copyright (c) 2013 The Regents of the University of California
* Barret Rhoden <brho@cs.berkeley.edu>
* See LICENSE for details.
*
* Atomic pipes. Multi-reader, multi-writer pipes, similar to sys_pipe except
* that they operate on fixed sized chunks of data.
*
* A note on broadcast wakeups: we broadcast in a few places. If we didn't,
* then every path (including error paths) would have to signal explicitly.
* Either approach works, but with plain signals we would have to be careful
* to catch every case. Another non-obvious consequence is that, in the
* absence of a broadcast wakeup, the read and write methods would themselves
* need to wake the next reader or writer in line. */
#include <apipe.h>
#include <ros/ring_buffer.h>
#include <string.h>
#include <stdio.h>
/* Initialize an atomic pipe over the caller-supplied buffer buf (buf_sz
 * bytes), storing fixed-size elements of elem_sz bytes each.
 *
 * The ring is sized to a power-of-two number of elements derived from
 * buf_sz / elem_sz, so part of buf may go unused. The pipe starts out empty
 * with one reader and one writer already counted; further ones are added
 * with apipe_open_reader() / apipe_open_writer(). All three condition
 * variables are bound to the single ap_lock.
 *
 * NOTE(review): elem_sz == 0 divides by zero, and buf_sz < elem_sz yields
 * ROUNDDOWNPWR2(0) as the ring size — confirm callers never pass these. */
void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
                size_t elem_sz)
{
	size_t max_elems = buf_sz / elem_sz;

	ap->ap_buf = buf;
	ap->ap_elem_sz = elem_sz;
	ap->ap_ring_sz = ROUNDDOWNPWR2(max_elems);

	/* Empty ring: both offsets start at the same position. */
	ap->ap_rd_off = 0;
	ap->ap_wr_off = 0;

	/* The creator counts as the first reader and the first writer. */
	ap->ap_nr_readers = 1;
	ap->ap_nr_writers = 1;

	/* The lock must exist before the CVs that share it are set up. */
	spinlock_init(&ap->ap_lock);
	cv_init_with_lock(&ap->ap_priority_reader, &ap->ap_lock);
	cv_init_with_lock(&ap->ap_general_readers, &ap->ap_lock);
	cv_init_with_lock(&ap->ap_writers, &ap->ap_lock);

	ap->ap_has_priority_reader = FALSE;
}
/* Register one more reader on ap. The count is dropped again by
 * apipe_close_reader(); writers give up on a full ring once it hits zero. */
void apipe_open_reader(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_readers += 1;
	spin_unlock(&ap->ap_lock);
}
/* Register one more writer on ap. The count is dropped again by
 * apipe_close_writer(); readers stop waiting for data once it hits zero. */
void apipe_open_writer(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_writers += 1;
	spin_unlock(&ap->ap_lock);
}
/* Helper: wake whichever readers should run next. Every caller in this file
 * holds ap->ap_lock when calling it.
 *
 * With no priority reader, all general readers are broadcast to. While a
 * priority reader exists, only it is signalled; general readers stay asleep
 * until the priority reader clears ap_has_priority_reader and calls this
 * helper again on its way out. */
static void __apipe_wake_readers(struct atomic_pipe *ap)
{
	if (!ap->ap_has_priority_reader) {
		__cv_broadcast(&ap->ap_general_readers);
		return;
	}
	__cv_signal(&ap->ap_priority_reader);
}
/* Drop one reader from ap.
 *
 * Writers may be asleep on a full ring, waiting for a read that will now
 * never come from us. Broadcasting lets each of them re-check: if this was
 * the last reader, apipe_write() sees ap_nr_readers == 0 and returns 0. */
void apipe_close_reader(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_readers -= 1;
	__cv_broadcast(&ap->ap_writers);
	spin_unlock(&ap->ap_lock);
}
/* Drop one writer from ap.
 *
 * Readers may be asleep on an empty ring; wake them (via the usual
 * priority-aware wakeup) so that, if this was the last writer, they notice
 * ap_nr_writers == 0 instead of sleeping forever. */
void apipe_close_writer(struct atomic_pipe *ap)
{
	spin_lock(&ap->ap_lock);
	ap->ap_nr_writers -= 1;
	if (ap->ap_has_priority_reader)
		__cv_signal(&ap->ap_priority_reader);
	else
		__cv_broadcast(&ap->ap_general_readers);
	spin_unlock(&ap->ap_lock);
}
/* Copy up to nr_elem elements out of the ring into buf. The caller must
 * already hold ap->ap_lock.
 *
 * Elements are copied oldest-first and copying stops early once the ring is
 * empty, so callers (e.g. an apipe_read_cond() callback) need not check for
 * emptiness first. buf must have room for nr_elem * ap->ap_elem_sz bytes.
 * Returns the number of elements copied, possibly 0.
 *
 * Blocked writers are always broadcast to afterwards (even if nothing was
 * copied). Broadcasting means each writer re-checks for space on its own,
 * rather than relying on writers to signal one another. */
int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem)
{
	size_t n_done = 0;
	size_t slot;

	while (n_done < nr_elem &&
	       !__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
		/* The ring holds a power-of-two number of elements, so the
		 * slot is just the low bits of the free-running read offset. */
		slot = ap->ap_rd_off & (ap->ap_ring_sz - 1);
		memcpy(buf, ap->ap_buf + slot * ap->ap_elem_sz,
		       ap->ap_elem_sz);
		ap->ap_rd_off++;
		buf += ap->ap_elem_sz;
		n_done++;
	}
	__cv_broadcast(&ap->ap_writers);
	return (int)n_done;
}
/* Blocking read of up to nr_elem elements from ap into buf.
 *
 * Sleeps until the ring is non-empty and no priority reader owns the pipe,
 * then copies whatever is available (at most nr_elem elements). Returns the
 * number of elements copied. Returns 0 if it would have to sleep but no
 * writers remain. A request for zero elements also returns 0 right away,
 * instead of blocking for data it would not consume. */
int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
{
	int nr_copied;

	/* Nothing to consume: don't wait for data (or for a priority reader to
	 * finish) just to copy zero elements. */
	if (!nr_elem)
		return 0;
	spin_lock(&ap->ap_lock);
	/* Need to wait until the priority reader is gone and the ring isn't
	 * empty. If we did this as two separate waits (either check first),
	 * the second could fail, and by the time we sleep and wake up, the
	 * first condition could have changed again. (An alternative would be
	 * to block priority readers too, by promoting ourselves to a priority
	 * reader.) */
	while (ap->ap_has_priority_reader ||
	       __ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
		/* No writers: nobody will ever wake us. */
		if (!ap->ap_nr_writers) {
			spin_unlock(&ap->ap_lock);
			return 0;
		}
		cv_wait(&ap->ap_general_readers);
		cpu_relax();
	}
	/* apipe_read_locked() also wakes the writers. */
	nr_copied = apipe_read_locked(ap, buf, nr_elem);
	/* If writers didn't broadcast to readers, we'd need to wake other
	 * readers here (imagine a long queue of blocked readers, and a ring
	 * filled by one massive write). Same for the error case. */
	spin_unlock(&ap->ap_lock);
	return nr_copied;
}
/* Blocking write of up to nr_elem elements from buf into ap.
 *
 * Sleeps while the ring is full. Returns 0 without writing if the ring is
 * full and no readers remain. Otherwise copies elements until either
 * nr_elem have been written or the ring fills, and returns how many were
 * written, which may be fewer than nr_elem. Readers are not required for a
 * write into a ring that still has space. A request for zero elements
 * returns 0 right away, instead of blocking on a full ring. */
int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
{
	size_t wr_idx;
	int nr_copied = 0;

	/* Nothing to write: don't sleep on a full ring for no reason. */
	if (!nr_elem)
		return 0;
	spin_lock(&ap->ap_lock);
	/* Not sure if we want to check for readers first or not. */
	while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
		/* No readers: nobody will ever drain the ring. */
		if (!ap->ap_nr_readers) {
			spin_unlock(&ap->ap_lock);
			return 0;
		}
		cv_wait(&ap->ap_writers);
		cpu_relax();
	}
	/* size_t index so the comparison against nr_elem is unsigned on both
	 * sides. */
	for (size_t i = 0; i < nr_elem; i++) {
		/* Power-of-two ring: the slot is the low bits of the offset. */
		wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
		memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf,
		       ap->ap_elem_sz);
		ap->ap_wr_off++;
		buf += ap->ap_elem_sz;
		nr_copied++;
		if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
			break;
	}
	/* We only need to wake readers, since the reader that woke us used a
	 * broadcast to writers. Otherwise we'd need to wake the next writer
	 * too (same for the error case). */
	__apipe_wake_readers(ap);
	spin_unlock(&ap->ap_lock);
	return nr_copied;
}
/* Peek at the oldest element in ap without consuming it.
 *
 * Returns a pointer into the ring buffer at the current read slot, or NULL
 * if the ring is empty. This does not take ap->ap_lock.
 * NOTE(review): presumably meant to be called with the lock already held,
 * e.g. from an apipe_read_cond() callback — confirm with callers. */
void *apipe_head(struct atomic_pipe *ap)
{
	size_t slot;

	if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
		return NULL;
	slot = ap->ap_rd_off & (ap->ap_ring_sz - 1);
	return ap->ap_buf + slot * ap->ap_elem_sz;
}
/*
 * Read data from the pipe until a condition is satisfied, as the sole
 * priority reader.
 *
 * f determines the condition and keeps its state in arg. f is called with
 * ap->ap_lock held and is expected to dequeue elements itself via
 * apipe_read_locked(). When f returns non-zero, this function exits and
 * returns that value to its caller; f may return -1 to indicate an error.
 * Returning zero puts us to sleep until a writer (or a closing writer)
 * wakes us, after which f is called again. The intent is to ensure
 * one-reader-at-a-time operation.
 *
 * Returns f's non-zero result, or -1 if f returned zero and no writers are
 * left to wake us.
 */
int apipe_read_cond(struct atomic_pipe *ap,
                    int (*f)(struct atomic_pipe *pipe, void *arg), void *arg)
{
	int ret;

	spin_lock(&ap->ap_lock);
	/* Can only have one priority reader at a time. Wait our turn. */
	while (ap->ap_has_priority_reader) {
		cv_wait(&ap->ap_general_readers);
		cpu_relax();
	}
	ap->ap_has_priority_reader = TRUE;
	while (1) {
		/* Each time the pipe needs checking, call f; f maintains its
		 * state in arg and dequeues elements as they are available.
		 * N.B. this is done for protocols like IPv4 that can fragment
		 * an RPC request. For IPv6, it is likely that this will end up
		 * looking like a blocking read.
		 */
		ret = f(ap, arg);
		if (ret)
			break;
		/* If nr_writers dropped to zero, return -1 so the caller can
		 * clean up. We have still called f once, so it could pull out
		 * any remaining elements. The main concern is sleeping on the
		 * CV when no writer will ever wake us. */
		if (!ap->ap_nr_writers) {
			ret = -1;
			break;
		}
		cv_wait(&ap->ap_priority_reader);
		cpu_relax();
	}
	/* Every exit must wake the other readers: when we were woken, no
	 * broadcast went to them, and other would-be priority readers may be
	 * waiting for us to finish. */
	ap->ap_has_priority_reader = FALSE;
	__apipe_wake_readers(ap);
	/* Writers need no wakeup here: apipe_read_locked() already wakes them
	 * after each actual read. Note that a -1 here may come either from f
	 * or from the no-writers check above. */
	spin_unlock(&ap->ap_lock);
	return ret;
}