| /* Copyright (c) 2013 The Regents of the University of California |
| * Barret Rhoden <brho@cs.berkeley.edu> |
| * See LICENSE for details. |
| * |
| * Atomic pipes. Multi-reader, multi-writer pipes, similar to sys_pipe except |
| * that they operate on fixed sized chunks of data. |
| * |
| * A note on broadcast wakeups. We broadcast in a few places. If we don't, |
| * then all paths (like error paths) will have to signal. Not a big deal |
| * either way, but just need to catch all the cases. Other non-obvious |
| * cases are that read and write methods need to wake other readers and |
| * writers (in the absence of a broadcast wakeup) */ |
| |
| #include <apipe.h> |
| #include <ros/ring_buffer.h> |
| #include <string.h> |
| #include <stdio.h> |
| |
| void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz, |
| size_t elem_sz) |
| { |
| ap->ap_buf = buf; |
| /* power of two number of elements in the ring. */ |
| ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz); |
| ap->ap_elem_sz = elem_sz; |
| ap->ap_rd_off = 0; |
| ap->ap_wr_off = 0; |
| ap->ap_nr_readers = 1; |
| ap->ap_nr_writers = 1; |
| /* Three CVs, all using the same lock. */ |
| spinlock_init(&ap->ap_lock); |
| cv_init_with_lock(&ap->ap_priority_reader, &ap->ap_lock); |
| cv_init_with_lock(&ap->ap_general_readers, &ap->ap_lock); |
| cv_init_with_lock(&ap->ap_writers, &ap->ap_lock); |
| ap->ap_has_priority_reader = FALSE; |
| } |
| |
| void apipe_open_reader(struct atomic_pipe *ap) |
| { |
| spin_lock(&ap->ap_lock); |
| ap->ap_nr_readers++; |
| spin_unlock(&ap->ap_lock); |
| } |
| |
| void apipe_open_writer(struct atomic_pipe *ap) |
| { |
| spin_lock(&ap->ap_lock); |
| ap->ap_nr_writers++; |
| spin_unlock(&ap->ap_lock); |
| } |
| |
| /* Helper: Wake the appropriate readers. When there's a priority reader, only |
| * that one wakes up. It's up to the priority reader to wake the other readers, |
| * by clearing has_prior and calling this again. */ |
| static void __apipe_wake_readers(struct atomic_pipe *ap) |
| { |
| if (ap->ap_has_priority_reader) |
| __cv_signal(&ap->ap_priority_reader); |
| else |
| __cv_broadcast(&ap->ap_general_readers); |
| } |
| |
| /* When closing, there might be others blocked waiting for us. For example, |
| * a writer could have blocked on a full pipe, waiting for us to read. Instead |
| * of reading, the last reader closes. The writer needs to be woken up so it |
| * can return 0. */ |
| void apipe_close_reader(struct atomic_pipe *ap) |
| { |
| spin_lock(&ap->ap_lock); |
| ap->ap_nr_readers--; |
| __cv_broadcast(&ap->ap_writers); |
| spin_unlock(&ap->ap_lock); |
| } |
| |
| void apipe_close_writer(struct atomic_pipe *ap) |
| { |
| spin_lock(&ap->ap_lock); |
| ap->ap_nr_writers--; |
| __apipe_wake_readers(ap); |
| spin_unlock(&ap->ap_lock); |
| } |
| |
| /* read a pipe that is already locked. */ |
| int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem) |
| { |
| size_t rd_idx; |
| int nr_copied = 0; |
| |
| for (int i = 0; i < nr_elem; i++) { |
| /* readers that call read_locked directly might have failed to |
| * check for emptiness, so we'll double check early. */ |
| if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) |
| break; |
| /* power of 2 elements in the ring buffer, index is the lower n |
| * bits */ |
| rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1); |
| memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz, |
| ap->ap_elem_sz); |
| ap->ap_rd_off++; |
| buf += ap->ap_elem_sz; |
| nr_copied++; |
| } |
| /* We could have multiple writers blocked. Just broadcast for them all. |
| * Alternatively, we could signal one, and then it's on the writers to |
| * signal further writers (see the note at the top of this file). */ |
| __cv_broadcast(&ap->ap_writers); |
| return nr_copied; |
| } |
| |
| |
| int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem) |
| { |
| size_t rd_idx; |
| int nr_copied = 0; |
| |
| spin_lock(&ap->ap_lock); |
| /* Need to wait til the priority reader is gone, and the ring isn't |
| * empty. If we do this as two steps, (either of priority check or |
| * empty check first), there's a chance the second one will fail, and |
| * when we sleep and wake up, the first condition could have changed. |
| * (An alternative would be to block priority readers too, by promoting |
| * ourselves to a priority reader). */ |
| while (ap->ap_has_priority_reader || |
| __ring_empty(ap->ap_wr_off, ap->ap_rd_off)) { |
| if (!ap->ap_nr_writers) { |
| spin_unlock(&ap->ap_lock); |
| return 0; |
| } |
| cv_wait(&ap->ap_general_readers); |
| cpu_relax(); |
| } |
| /* This read call wakes up writers */ |
| nr_copied = apipe_read_locked(ap, buf, nr_elem); |
| /* If the writer didn't broadcast, we'd need to wake other readers |
| * (imagine a long queue of blocked readers, and a queue filled by one |
| * massive write). (same with the error case). */ |
| spin_unlock(&ap->ap_lock); |
| return nr_copied; |
| } |
| |
| int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem) |
| { |
| size_t wr_idx; |
| int nr_copied = 0; |
| |
| spin_lock(&ap->ap_lock); |
| /* not sure if we want to check for readers first or not */ |
| while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) { |
| if (!ap->ap_nr_readers) { |
| spin_unlock(&ap->ap_lock); |
| return 0; |
| } |
| cv_wait(&ap->ap_writers); |
| cpu_relax(); |
| } |
| for (int i = 0; i < nr_elem; i++) { |
| /* power of 2 elements in the ring buffer, index is the lower n |
| * bits */ |
| wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1); |
| memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf, |
| ap->ap_elem_sz); |
| ap->ap_wr_off++; |
| buf += ap->ap_elem_sz; |
| nr_copied++; |
| if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) |
| break; |
| } |
| /* We only need to wake readers, since the reader that woke us used a |
| * broadcast. o/w, we'd need to wake the next writer. (same goes for |
| * the error case). */ |
| __apipe_wake_readers(ap); |
| spin_unlock(&ap->ap_lock); |
| return nr_copied; |
| } |
| |
| void *apipe_head(struct atomic_pipe *ap) |
| { |
| if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) |
| return 0; |
| return ap->ap_buf + |
| (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz; |
| } |
| |
| /* |
| * Read data from the pipe until a condition is satisfied. |
| * f is the function that determines the condition. f saves its |
| * state in arg. When f returns non-zero, this function exits, |
| * and returns the value to its caller. Note that f can return -1 |
| * to indicate an error. But returning zero will keep you trapped in |
| * this function. The intent here is to ensure one-reader-at-a-time |
| * operation. |
| */ |
| int apipe_read_cond(struct atomic_pipe *ap, |
| int(*f)(struct atomic_pipe *pipe, void *arg), void *arg) |
| { |
| size_t rd_idx; |
| int ret; |
| |
| spin_lock(&ap->ap_lock); |
| /* Can only have one priority reader at a time. Wait our turn. */ |
| while (ap->ap_has_priority_reader) { |
| cv_wait(&ap->ap_general_readers); |
| cpu_relax(); |
| } |
| ap->ap_has_priority_reader = TRUE; |
| while (1) { |
| /* Each time there is a need to check the pipe, call |
| * f. f will maintain its state in arg. It is expected that f |
| * will dequeue elements from the pipe as they are available. |
| * N.B. this is being done for protocols like IPV4 that can |
| * fragment an RPC request. For IPV6, it is likely that this |
| * will end up looking like a blocking read. Thus was it ever |
| * with legacy code. F is supposed to call apipe_read_locked(). |
| */ |
| ret = f(ap, arg); |
| if (ret) |
| break; |
| /* if nr_writers goes to zero, that's bad. return -1 because |
| * they're going to have to clean up. We should have been able |
| * to call f once though, to pull out any remaining elements. |
| * The main concern here is sleeping on the cv when no one (no |
| * writers) will wake us. */ |
| if (!ap->ap_nr_writers) { |
| ret = -1; |
| goto out; |
| } |
| cv_wait(&ap->ap_priority_reader); |
| cpu_relax(); |
| } |
| out: |
| /* All out paths need to wake other readers. When we were woken up, |
| * there was no broadcast sent to the other readers. Plus, there may be |
| * other potential priority readers. */ |
| ap->ap_has_priority_reader = FALSE; |
| __apipe_wake_readers(ap); |
| /* FYI, writers were woken up after an actual read. If we had an error |
| * (ret == -1), there should be no writers. */ |
| spin_unlock(&ap->ap_lock); |
| return ret; |
| } |