/* Copyright (c) 2011 The Regents of the University of California
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Unbounded concurrent queues, user side. Check k/i/r/ucq.h or the
 * Documentation for more info. */

#include <ros/arch/membar.h>
#include <parlib/arch/atomic.h>
#include <parlib/arch/arch.h>
#include <parlib/ucq.h>
#include <parlib/spinlock.h>
#include <sys/mman.h>
#include <parlib/assert.h>
#include <parlib/stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>		/* for getpid() */
#include <parlib/vcore.h>
#include <parlib/ros_debug.h>	/* for printd() */
/* Initializes a ucq. You pass in addresses of mmaped pages for the main page
 * (prod_idx) and the spare page. I recommend mmaping a big chunk and breaking
 * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
{
	printd("[user] initializing ucq %08p for proc %d\n", ucq, getpid());
	assert(!PGOFF(pg1));
	assert(!PGOFF(pg2));
	/* Prod and cons both start on the first page, slot 0. When they are
	 * equal, the ucq is empty. */
	atomic_set(&ucq->prod_idx, pg1);
	atomic_set(&ucq->cons_idx, pg1);
	ucq->prod_overflow = FALSE;
	atomic_set(&ucq->nr_extra_pgs, 0);
	atomic_set(&ucq->spare_pg, pg2);
	parlib_static_assert(sizeof(struct spin_pdr_lock) <=
			     sizeof(ucq->u_lock));
	spin_pdr_init((struct spin_pdr_lock*)(&ucq->u_lock));
	ucq->ucq_ready = TRUE;
}
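
/* A sketch of the "bulk" approach recommended above: mmap one region and
 * carve it into page pairs, one pair per ucq. This example function is
 * editorial, not part of parlib; the name, the caller, and the error
 * handling are hypothetical. */
static void example_bulk_ucq_init(struct ucq *ucqs, size_t nr_ucqs)
{
	uintptr_t pgs = (uintptr_t)mmap(0, PGSIZE * 2 * nr_ucqs,
					PROT_WRITE | PROT_READ,
					MAP_POPULATE | MAP_ANONYMOUS |
					MAP_PRIVATE, -1, 0);

	assert(pgs != (uintptr_t)MAP_FAILED);
	/* Each ucq gets a main page and a spare page from the region */
	for (size_t i = 0; i < nr_ucqs; i++)
		ucq_init_raw(&ucqs[i], pgs + (2 * i) * PGSIZE,
			     pgs + (2 * i + 1) * PGSIZE);
}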

/* Inits a ucq, where you don't have to bother with the memory allocation. This
 * would be appropriate for one or two UCQs, though if you're allocating in
 * bulk, use the raw version. */
void ucq_init(struct ucq *ucq)
{
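	/* Editorial aside, inferred from how ucqs are used in this file:
	 * MAP_POPULATE faults the pages in up front, so neither the kernel
	 * (the producer) nor the consumer page-faults on first use. */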
	uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2,
					      PROT_WRITE | PROT_READ,
					      MAP_POPULATE | MAP_ANONYMOUS |
					      MAP_PRIVATE, -1, 0);

	/* mmap() reports failure with MAP_FAILED, not 0 */
	assert(two_pages != (uintptr_t)MAP_FAILED);
	ucq_init_raw(ucq, two_pages, two_pages + PGSIZE);
}

/* Only call this on ucq's made with the simple ucq_init(). And be sure the ucq
 * is no longer in use. */
void ucq_free_pgs(struct ucq *ucq)
{
	/* prod_idx may have slot bits set in its page offset; mask down to the
	 * page base, since munmap() requires a page-aligned address. */
	uintptr_t pg1 = PTE_ADDR(atomic_read(&ucq->prod_idx));
	uintptr_t pg2 = atomic_read(&ucq->spare_pg);

	assert(pg1 && pg2);
	munmap((void*)pg1, PGSIZE);
	munmap((void*)pg2, PGSIZE);
}
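
/* Editorial aside, inferred from get_ucq_msg() below: freeing just these two
 * pages suffices because consumers that finish a page either park it as the
 * spare or munmap it (decrementing nr_extra_pgs), so a fully drained ucq is
 * back down to the current page plus the spare. */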

/* Consumer side, returns TRUE on success and fills *msg with the ev_msg. If
 * the ucq appears empty, it will return FALSE. Messages that arrive after we
 * start checking may not be seen on this call. */
bool get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
{
	uintptr_t my_idx;
	struct ucq_page *old_page, *other_page;
	struct msg_container *my_msg;
	struct spin_pdr_lock *ucq_lock = (struct spin_pdr_lock*)(&ucq->u_lock);

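	/* Editorial note on the index encoding, inferred from the PTE_ADDR()
	 * and PGOFF() usage in this file: an idx packs a page base address
	 * with a slot counter in the page-offset bits. slot_is_good() checks
	 * whether that counter still names a message slot on the page; once
	 * the counter runs past the last slot, the slot is "bad" and we must
	 * advance to the next page. */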
	do {
loop_top:
		cmb();
		my_idx = atomic_read(&ucq->cons_idx);
		/* The ucq is empty if the consumer and producer are on the
		 * same 'next' slot. */
		if (my_idx == atomic_read(&ucq->prod_idx))
			return FALSE;
		/* Is the slot we want good? If not, we're going to need to try
		 * and move on to the next page. If it is, we bypass all of
		 * this and try to CAS on us getting my_idx. */
		if (slot_is_good(my_idx))
			goto claim_slot;
		/* Slot is bad, let's try and fix it */
		spin_pdr_lock(ucq_lock);
		/* Reread the idx, in case someone else fixed things up while
		 * we were waiting/fighting for the lock */
		my_idx = atomic_read(&ucq->cons_idx);
		if (slot_is_good(my_idx)) {
			/* Someone else fixed it already, let's just try to get
			 * out */
			spin_pdr_unlock(ucq_lock);
			/* Make sure this new slot has a producer (ucq isn't
			 * empty) */
			if (my_idx == atomic_read(&ucq->prod_idx))
				return FALSE;
			goto claim_slot;
		}
		/* At this point, the slot is bad, and all other possible
		 * consumers are spinning on the lock. Time to fix things up:
		 * Set the counter to the next page, and free the old one. */
		/* First, we need to wait and make sure the kernel has posted
		 * the next page. Worst case, we know that the kernel is
		 * working on it, since prod_idx != cons_idx */
		old_page = (struct ucq_page*)PTE_ADDR(my_idx);
		while (!old_page->header.cons_next_pg)
			cpu_relax();
		/* Now set the counter to the next page */
		assert(!PGOFF(old_page->header.cons_next_pg));
		atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
		/* Side note: at this point, any *new* consumers coming in will
		 * grab slots based off the new counter index (cons_idx) */
		/* Now free up the old page. Need to make sure all other
		 * consumers are done. We spin til enough are done, like an
		 * inverted refcnt. */
		while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
		{
			/* spinning on userspace here, specifically, another
			 * vcore and we don't know who it is. This will spin a
			 * bit, then make sure they aren't preempted */
			cpu_relax_any();
		}
		/* Now the page is done. 0 its metadata and give it up. */
		old_page->header.cons_next_pg = 0;
		atomic_set(&old_page->header.nr_cons, 0);
		/* We want to "free" the page. We'll try and set it as the
		 * spare. If there is already a spare, we'll free that one. */
		other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
							   (long)old_page);
		assert(!PGOFF(other_page));
		if (other_page) {
			munmap(other_page, PGSIZE);
			atomic_dec(&ucq->nr_extra_pgs);
		}
		/* All fixed up, unlock. Other consumers may lock and check to
		 * make sure things are done. */
		spin_pdr_unlock(ucq_lock);
		/* Now that everything is fixed, try again from the top */
		goto loop_top;
claim_slot:
		cmb();	/* so we can goto claim_slot */
		/* If we're still here, my_idx is good, and we'll try to claim
		 * it. If we fail, we need to repeat the whole process. */
	} while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
	assert(slot_is_good(my_idx));
	/* Now we have a good slot that we can consume */
	my_msg = slot2msg(my_idx);
	/* Linux would put an rmb_depends() here */
	/* Wait til the msg is ready (kernel sets this flag) */
	while (!my_msg->ready)
		cpu_relax();
	rmb();	/* order the ready read before the contents */
	/* Copy out */
	*msg = my_msg->ev_msg;
	/* Unset this for the next usage of the container */
	my_msg->ready = FALSE;
	wmb();	/* post the ready write before incrementing */
	/* Increment nr_cons, showing we're done */
	atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
	return TRUE;
}
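
/* A minimal consumer sketch, for illustration only (the callback-style drain
 * is hypothetical, not a parlib API): pull every message that is currently
 * visible and hand each one to the caller's handler. */
static void example_drain_ucq(struct ucq *ucq,
			      void (*handle)(struct event_msg *msg))
{
	struct event_msg local_msg;

	/* get_ucq_msg() returns FALSE once the ucq appears empty */
	while (get_ucq_msg(ucq, &local_msg))
		handle(&local_msg);
}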

bool ucq_is_empty(struct ucq *ucq)
{
	/* The ucq is empty if the consumer and producer are on the same 'next'
	 * slot. */
	return (atomic_read(&ucq->cons_idx) == atomic_read(&ucq->prod_idx));
}
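
/* Editorial note: ucq_is_empty() is inherently racy. A producer can post a
 * message immediately after the two reads, so treat the result as a hint
 * rather than a guarantee. */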