Incremental work on excorcising the transfer iface.
authorFredrik Tolf <fredrik@dolda2000.com>
Fri, 11 Apr 2008 04:26:59 +0000 (06:26 +0200)
committerFredrik Tolf <fredrik@dolda2000.com>
Fri, 11 Apr 2008 04:26:59 +0000 (06:26 +0200)
daemon/fnet-dc.c
daemon/net.c
daemon/net.h
daemon/transfer.c

index 708215a..d2343da 100644 (file)
@@ -122,6 +122,7 @@ struct dcpeer
     struct timer *timeout;
     struct qcmdqueue queue;
     struct transfer *transfer;
+    struct socket *trpipe;
     int state;
     int ptclose;      /* Close after transfer is complete */
     int accepted;     /* If false, we connected, otherwise, we accepted */
@@ -139,7 +140,6 @@ struct dcpeer
 };
 
 static struct fnet dcnet;
-static struct transferiface dctransfer;
 static struct socket *udpsock = NULL;
 static struct lport *tcpsock = NULL;
 static struct dcpeer *peers = NULL;
@@ -150,6 +150,7 @@ static char *xmllistname = NULL;
 static char *xmlbz2listname = NULL;
 static struct timer *listwritetimer = NULL;
 
+static struct socket *mktrpipe(struct dcpeer *peer);
 static void peerconnect(struct socket *sk, int err, struct fnetnode *fn);
 static void freedcpeer(struct dcpeer *peer);
 static void transread(struct socket *sk, struct dcpeer *peer);
@@ -1622,14 +1623,14 @@ static void cmd_direction(struct socket *sk, struct dcpeer *peer, char *cmd, cha
                peer->close = 1;
                return;
            }
-           transfer = newupload(peer->fn, &dcnet, peer->wcsname, &dctransfer, peer);
+           transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer));
        } else {
            if((transfer = finddownload(peer->wcsname)) == NULL)
            {
                peer->close = 1;
                return;
            }
-           transferattach(transfer, &dctransfer, peer);
+           transferattach(transfer, peer->trpipe = mktrpipe(peer));
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
@@ -1674,10 +1675,10 @@ static void cmd_peerlock(struct socket *sk, struct dcpeer *peer, char *cmd, char
                return;
            }
            peer->direction = TRNSD_UP;
-           transfer = newupload(peer->fn, &dcnet, peer->wcsname, &dctransfer, peer);
+           transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer));
        } else {
            peer->direction = TRNSD_DOWN;
-           transferattach(transfer, &dctransfer, peer);
+           transferattach(transfer, peer->trpipe = mktrpipe(peer));
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
@@ -2674,6 +2675,15 @@ static struct command peercmds[] =
 };
 #undef cc
 
+static struct socket *mktrpipe(struct dcpeer *peer)
+{
+    struct socket *sk;
+    
+    sk = netsockpipe();
+    sk->data = peer;
+    return(sk);
+}
+
 static void dctransdetach(struct transfer *transfer, struct dcpeer *peer)
 {
     CBUNREG(transfer, trans_filterout, peer);
@@ -3121,6 +3131,10 @@ static void freedcpeer(struct dcpeer *peer)
        peer->next->prev = peer->prev;
     if(peer->prev != NULL)
        peer->prev->next = peer->next;
+    if(peer->trpipe != NULL) {
+       closesock(peer->trpipe);
+       putsock(peer->trpipe);
+    }
     if(peer->transfer != NULL)
     {
        if(peer->transfer->dir == TRNSD_UP)
@@ -3208,14 +3222,6 @@ static void hubkill(struct fnetnode *fn)
     closesock(hub->sk);
 }
 
-static struct transferiface dctransfer =
-{
-    .detach = (void (*)(struct transfer *, void *))dctransdetach,
-    .gotdata = (void (*)(struct transfer *, void *))dctransgotdata,
-    .endofdata = (void (*)(struct transfer *, void *))dctransendofdata,
-    .wantdata = (void (*)(struct transfer *, void *))dcwantdata
-};
-
 static struct fnet dcnet =
 {
     .name = L"dc",
index d64e8c8..6f83266 100644 (file)
@@ -234,6 +234,15 @@ static void sksetstate(struct socket *sk, int state)
     sk->back->state = state;
 }
 
+struct socket *netsockpipe(void)
+{
+    struct socket *sk;
+    
+    sk = sockpair(0);
+    sksetstate(sk, SOCK_EST);
+    return(sk);
+}
+
 static void closeufd(struct ufd *ufd)
 {
     if(ufd->fd != -1)
index 24e7ca5..9ae9650 100644 (file)
@@ -77,6 +77,7 @@ struct lport {
 
 void putsock(struct socket *sk);
 void getsock(struct socket *sk);
+struct socket *netsockpipe(void);
 struct lport *netcslisten(int type, struct sockaddr *name, socklen_t namelen, void (*func)(struct lport *, struct socket *, void *), void *data);
 struct lport *netcslistenlocal(int type, struct sockaddr *name, socklen_t namelen, void (*func)(struct lport *, struct socket *, void *), void *data);
 struct lport *netcstcplisten(int port, int local, void (*func)(struct lport *, struct socket *, void *), void *data);
index 9ad6efe..9c50ed1 100644 (file)
@@ -126,15 +126,68 @@ struct transfer *newtransfer(void)
     return(new);
 }
 
+static void localread(struct socket *sk, struct transfer *transfer)
+{
+    void *buf;
+    size_t blen;
+    
+    if((transfer->datapipe != NULL) && (sockqueueleft(transfer->datapipe) > 0)) {
+       buf = sockgetinbuf(sk, &blen);
+       sockqueue(transfer->datapipe, buf, blen);
+    }
+}
+
+static void dataread(struct socket *sk, struct transfer *transfer)
+{
+    void *buf;
+    size_t blen;
+    
+    if((transfer->localend != NULL) && (sockqueueleft(transfer->localend) > 0)) {
+       buf = sockgetinbuf(sk, &blen);
+       sockqueue(transfer->localend, buf, blen);
+    }
+}
+
+static void localwrite(struct socket *sk, struct transfer *transfer)
+{
+    if(transfer->datapipe != NULL)
+       dataread(transfer->datapipe, transfer);
+}
+
+static void datawrite(struct socket *sk, struct transfer *transfer)
+{
+    if(transfer->localend != NULL)
+       localread(transfer->localend, transfer);
+}
+
+static void localerr(struct socket *sk, int errno, struct transfer *transfer)
+{
+    if(transfer->datapipe != NULL)
+       closesock(transfer->datapipe);
+}
+
+static void dataerr(struct socket *sk, int errno, struct transfer *transfer)
+{
+    if(transfer->localend != NULL)
+       closesock(transfer->localend);
+}
+
 void transferattach(struct transfer *transfer, struct socket *dpipe)
 {
     transferdetach(transfer);
     getsock(transfer->datapipe = dpipe);
+    dpipe->readcb = (void (*)(struct socket *, void *))dataread;
+    dpipe->writecb = (void (*)(struct socket *, void *))datawrite;
+    dpipe->errcb = (void (*)(struct socket *, int, void *))dataerr;
+    dpipe->data = transfer;
 }
 
 void transferdetach(struct transfer *transfer)
 {
     if(transfer->datapipe != NULL) {
+       transfer->datapipe->readcb = NULL;
+       transfer->datapipe->writecb = NULL;
+       transfer->datapipe->errcb = NULL;
        closesock(transfer->datapipe);
        putsock(transfer->datapipe);
     }
@@ -228,39 +281,7 @@ static void transexpire(int cancelled, struct transfer *transfer)
        transfer->timeout = 0;
 }
 
-static void localread(struct socket *sk, struct transfer *transfer)
-{
-    void *buf;
-    size_t blen;
-    
-    if(transfer->datapipe != NULL) {
-       buf = sockgetinbuf(sk, &blen);
-       sockqueue(transfer->datapipe, buf, blen);
-       if(sockqueuesize(transfer->datapipe) >= 65536)
-           sockblock(sk, 1);
-       else
-           sockblock(sk, 0);
-    } else {
-       if(sockgetdatalen(sk) >= 65536)
-           sockblock(sk, 1);
-    }
-}
-
-static void localwrite(struct socket *sk, struct transfer *transfer)
-{
-    void *buf;
-    size_t blen;
-    
-    
-}
-
-static void localerr(struct socket *sk, int errno, struct transfer *transfer)
-{
-    if((transfer->iface != NULL) && (transfer->iface->endofdata != NULL))
-       transfer->iface->endofdata(transfer, transfer->ifacedata);
-}
-
-void transferputdata(struct transfer *transfer, void *buf, size_t size)
+static void transferputdata(struct transfer *transfer, void *buf, size_t size)
 {
     time(&transfer->activity);
     sockqueue(transfer->localend, buf, size);
@@ -269,7 +290,7 @@ void transferputdata(struct transfer *transfer, void *buf, size_t size)
     CBCHAINDOCB(transfer, trans_p, transfer);
 }
 
-void transferendofdata(struct transfer *transfer)
+static void transferendofdata(struct transfer *transfer)
 {
     if(transfer->curpos >= transfer->size)
     {
@@ -284,12 +305,12 @@ void transferendofdata(struct transfer *transfer)
     }
 }
 
-ssize_t transferdatasize(struct transfer *transfer)
+static ssize_t transferdatasize(struct transfer *transfer)
 {
     return(sockqueueleft(transfer->localend));
 }
 
-void *transfergetdata(struct transfer *transfer, size_t *size)
+static void *transfergetdata(struct transfer *transfer, size_t *size)
 {
     void *buf;
     
@@ -332,7 +353,7 @@ void transferstartul(struct transfer *transfer, struct socket *sk)
     transfersetstate(transfer, TRNS_MAIN);
     socksettos(sk, confgetint("transfer", "ultos"));
     if(transfer->localend != NULL)
-       transferread(transfer->localend, transfer);
+       localread(transfer->localend, transfer);
 }
 
 void transfersetlocalend(struct transfer *transfer, struct socket *sk)
@@ -740,6 +761,7 @@ static int run(void)
 {
     struct transfer *transfer, *next;
     
+    /*
     for(transfer = transfers; transfer != NULL; transfer = transfer->next)
     {
        if((transfer->endpos >= 0) && (transfer->state == TRNS_MAIN) && (transfer->localend != NULL) && (transfer->localend->state == SOCK_EST) && (transfer->curpos >= transfer->endpos))
@@ -749,6 +771,7 @@ static int run(void)
            closesock(transfer->localend);
        }
     }
+    */
     for(transfer = transfers; transfer != NULL; transfer = next)
     {
        next = transfer->next;