epoll: Support very large CEQ sets

We now NR_FILE_DESC_MAX for the CEQ.  This allows the user to tap as many
(kernel) FDs as they want, without us worrying about the ceq size.  We'll
take the epoll size parameter as a hint for the ring size (which is the
expected max number of unique events in the CEQ).

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
diff --git a/user/iplib/epoll.c b/user/iplib/epoll.c
index d4f97a0..6d0e1f7 100644
--- a/user/iplib/epoll.c
+++ b/user/iplib/epoll.c
@@ -11,7 +11,6 @@
  * 	- there's no EPOLLONESHOT or level-triggered support.
  * 	- you can only tap one FD at a time, so you can't add the same FD to
  * 	multiple epoll sets.
- * 	- there is no support for growing the epoll set.
  * 	- closing the epoll is a little dangerous, if there are outstanding INDIR
  * 	events.  this will only pop up if you're yielding cores, maybe getting
  * 	preempted, and are unlucky.
@@ -51,7 +50,6 @@
 	struct event_queue			*ceq_evq;
 	struct event_queue			*alarm_evq;
 	struct ceq					*ceq;	/* convenience pointer */
-	unsigned int				size;
 	uth_mutex_t					mtx;
 	struct user_fd				ufd;
 };
@@ -111,6 +109,11 @@
 	return ep_ev;
 }
 
+static unsigned int ep_get_ceq_max_ever(struct epoll_ctlr *ep)
+{
+	return atomic_read(&ep->ceq_evq->ev_mbox->ceq.max_event_ever);
+}
+
 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
 {
 	if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
@@ -131,11 +134,11 @@
 }
 
 /* Event queue helpers: */
-static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
+static struct event_queue *ep_get_ceq_evq(unsigned int ceq_ring_sz)
 {
 	struct event_queue *ceq_evq = get_eventq_raw();
 	ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
-	ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
+	ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, NR_FILE_DESC_MAX, ceq_ring_sz);
 	ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
 	evq_attach_wakeup_ctlr(ceq_evq);
 	return ceq_evq;
@@ -179,11 +182,12 @@
 	struct ep_fd_data *ep_fd_i;
 	int nr_tap_req = 0;
 	int nr_done = 0;
+	unsigned int max_ceq_events = ep_get_ceq_max_ever(ep);
 
-	tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
-	memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
+	tap_reqs = malloc(sizeof(struct fd_tap_req) * max_ceq_events);
+	memset(tap_reqs, 0, sizeof(struct fd_tap_req) * max_ceq_events);
 	/* Slightly painful, O(n) with no escape hatch */
-	for (int i = 0; i < ep->size; i++) {
+	for (int i = 0; i < max_ceq_events; i++) {
 		ceq_ev_i = ep_get_ceq_ev(ep, i);
 		/* CEQ should have been big enough for our size */
 		assert(ceq_ev_i);
@@ -213,17 +217,14 @@
 
 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
 {
-	unsigned int ceq_size;
-
-	/* TODO: we don't grow yet.  Until then, we help out a little. */
 	if (size == 1)
 		size = 128;
-	ceq_size = ROUNDUPPWR2(size);
-	ep->size = ceq_size;
 	ep->mtx = uth_mutex_alloc();
 	ep->ufd.magic = EPOLL_UFD_MAGIC;
 	ep->ufd.close = epoll_close;
-	ep->ceq_evq = ep_get_ceq_evq(ceq_size);
+	/* Size is a hint for the CEQ concurrency.  We can actually handle as many
+	 * kernel FDs as is possible. */
+	ep->ceq_evq = ep_get_ceq_evq(ROUNDUPPWR2(size));
 	ep->alarm_evq = ep_get_alarm_evq();
 	return 0;
 }