blob: c670474261df941f57b68acb91b1678384ac2de5 [file] [log] [blame]
//#define DEBUG
#include <vfs.h>
#include <kfs.h>
#include <slab.h>
#include <kmalloc.h>
#include <kref.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <error.h>
#include <cpio.h>
#include <pmap.h>
#include <smp.h>
struct pipe {
qlock_t qlock;
struct pipe *next;
struct kref ref;
uint32_t path;
struct queue *q[2];
int qref[2];
};
struct {
spinlock_t lock;
uint32_t path;
} pipealloc;
enum {
Qdir,
Qdata0,
Qdata1,
};
struct dirtab pipedir[] = {
{".", {Qdir, 0, QTDIR}, 0, DMDIR | 0500},
{"data", {Qdata0}, 0, 0600},
{"data1", {Qdata1}, 0, 0600},
};
#define NPIPEDIR 3
#define PIPETYPE(x) (((unsigned)x)&0x1f)
#define PIPEID(x) ((((unsigned)x))>>5)
#define PIPEQID(i, t) ((((unsigned)i)<<5)|(t))
enum {
/* Plan 9 default for nmach > 1 */
Pipeqsize = 256 * 1024
};
static void pipe_release(struct kref *kref)
{
printd("pipe release\n");
}
static void pipeinit(void)
{
printd("%s\n", __func__);
}
/*
* create a pipe, no streams are created until an open
*/
static struct chan *pipeattach(char *spec)
{
struct pipe *p;
struct chan *c;
printd("%s\n", __func__);
c = devattach('P', spec);
p = kzmalloc(sizeof(struct pipe), 0);
if (p == 0)
panic("memory");
kref_init(&p->ref, pipe_release, 1);
qlock_init(&p->qlock);
p->q[0] = qopen(Pipeqsize, 0, 0, 0);
if (p->q[0] == 0) {
kfree(p);
panic("memory");
}
p->q[1] = qopen(Pipeqsize, 0, 0, 0);
if (p->q[1] == 0) {
kfree(p->q[0]);
kfree(p);
panic("memory");
}
spin_lock(&pipealloc.lock);
p->path = ++pipealloc.path;
spin_unlock(&pipealloc.lock);
mkqid(&c->qid, PIPEQID(2 * p->path, Qdir), 0, QTDIR);
c->aux = p;
c->devno = 0;
return c;
}
static int
pipegen(struct chan *c, char *unused, struct dirtab *tab, int ntab, int i,
struct dir *dp)
{
struct qid q;
int len;
struct pipe *p;
printd("%s\n", __func__);
if (i == DEVDOTDOT) {
devdir(c, c->qid, "#P", 0, eve, DMDIR | 0555, dp);
return 1;
}
i++; /* skip . */
if (tab == 0 || i >= ntab)
return -1;
tab += i;
p = c->aux;
switch ((uint32_t) tab->qid.path) {
case Qdata0:
len = qlen(p->q[0]);
break;
case Qdata1:
len = qlen(p->q[1]);
break;
default:
len = tab->length;
break;
}
mkqid(&q, PIPEQID(PIPEID(c->qid.path), tab->qid.path), 0, QTFILE);
devdir(c, q, tab->name, len, eve, tab->perm, dp);
return 1;
}
static struct walkqid *pipewalk(struct chan *c, struct chan *nc, char **name,
int nname)
{
struct walkqid *wq;
struct pipe *p;
printd("%s\n", __func__);
wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
if (wq != NULL && wq->clone != NULL && wq->clone != c) {
p = c->aux;
qlock(&p->qlock);
kref_get(&p->ref, 1);
if (c->flag & COPEN) {
printd("channel open in pipewalk\n");
switch (PIPETYPE(c->qid.path)) {
case Qdata0:
p->qref[0]++;
break;
case Qdata1:
p->qref[1]++;
break;
}
}
qunlock(&p->qlock);
}
return wq;
}
static long
pipestat(struct chan *c, uint8_t * db, long n)
{
struct pipe *p;
struct dir dir;
printd("%s\n", __func__);
p = c->aux;
switch (PIPETYPE(c->qid.path)) {
case Qdir:
devdir(c, c->qid, ".", 0, eve, DMDIR | 0555, &dir);
break;
case Qdata0:
devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
break;
case Qdata1:
devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
break;
default:
panic("pipestat");
}
n = convD2M(&dir, db, n);
if (n < sizeof(uint16_t))
error(Eshortstat);
return n;
}
/*
* if the stream doesn't exist, create it
*/
static struct chan *pipeopen(struct chan *c, int omode)
{
struct pipe *p;
printd("%s\n", __func__);
if (c->qid.type & QTDIR) {
if (omode != OREAD)
error(Ebadarg);
c->mode = omode;
c->flag |= COPEN;
c->offset = 0;
return c;
}
p = c->aux;
qlock(&p->qlock);
switch (PIPETYPE(c->qid.path)) {
case Qdata0:
p->qref[0]++;
break;
case Qdata1:
p->qref[1]++;
break;
}
qunlock(&p->qlock);
c->mode = openmode(omode);
c->flag |= COPEN;
c->offset = 0;
c->iounit = qiomaxatomic;
return c;
}
static void pipeclose(struct chan *c)
{
struct pipe *p;
printd("%s\n", __func__);
p = c->aux;
qlock(&p->qlock);
if (c->flag & COPEN) {
/*
* closing either side hangs up the stream
*/
switch (PIPETYPE(c->qid.path)) {
case Qdata0:
p->qref[0]--;
if (p->qref[0] == 0) {
qhangup(p->q[1], 0);
qclose(p->q[0]);
}
break;
case Qdata1:
p->qref[1]--;
if (p->qref[1] == 0) {
qhangup(p->q[0], 0);
qclose(p->q[1]);
}
break;
}
}
/*
* if both sides are closed, they are reusable
*/
if (p->qref[0] == 0 && p->qref[1] == 0) {
qreopen(p->q[0]);
qreopen(p->q[1]);
}
/*
* free the structure on last close
*/
kref_put(&p->ref);
if (kref_refcnt(&p->ref) == 0) {
qunlock(&p->qlock);
kfree(p->q[0]);
kfree(p->q[1]);
kfree(p);
} else
qunlock(&p->qlock);
}
static long
piperead(struct chan *c, void *va, long n, int64_t unused)
{
struct pipe *p;
printd("%s\n", __func__);
p = c->aux;
switch (PIPETYPE(c->qid.path)) {
case Qdir:
return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
case Qdata0:
return qread(p->q[0], va, n);
case Qdata1:
return qread(p->q[1], va, n);
default:
panic("piperead");
}
return -1; /* not reached */
}
static struct block *pipebread(struct chan *c, long n, int64_t offset)
{
struct pipe *p;
printd("%s\n", __func__);
p = c->aux;
switch (PIPETYPE(c->qid.path)) {
case Qdata0:
return qbread(p->q[0], n);
case Qdata1:
return qbread(p->q[1], n);
}
return devbread(c, n, offset);
}
/*
* a write to a closed pipe causes a note to be sent to
* the process.
*/
static long
pipewrite(struct chan *c, void *va, long n, int64_t unused)
{
ERRSTACK(2);
struct pipe *p;
printd("%s\n", __func__);
if (waserror()) {
/* how do we deliver this one
if((c->flag & CMSG) == 0)
postnote(up, 1, "sys: write on closed pipe", NUser);
*/
nexterror();
}
p = c->aux;
switch (PIPETYPE(c->qid.path)) {
case Qdata0:
n = qwrite(p->q[1], va, n);
break;
case Qdata1:
n = qwrite(p->q[0], va, n);
break;
default:
panic("pipewrite");
}
poperror();
return n;
}
static long
pipebwrite(struct chan *c, struct block *bp, int64_t unused)
{
ERRSTACK(2);
long n;
struct pipe *p;
printd("%s\n", __func__);
if (waserror()) {
/* avoid notes when pipe is a mounted queue
how do we do this
if((c->flag & CMSG) == 0)
postnote(up, 1, "sys: write on closed pipe", NUser);
*/
nexterror();
}
p = c->aux;
switch (PIPETYPE(c->qid.path)) {
case Qdata0:
n = qbwrite(p->q[1], bp);
break;
case Qdata1:
n = qbwrite(p->q[0], bp);
break;
default:
n = 0;
panic("pipebwrite");
}
poperror();
return n;
}
struct dev pipedevtab = {
'P',
"pipe",
devreset,
pipeinit,
devshutdown,
pipeattach,
pipewalk,
pipestat,
pipeopen,
devcreate,
pipeclose,
piperead,
pipebread,
pipewrite,
pipebwrite,
devremove,
devwstat,
devpower,
devconfig,
devchaninfo,
};