Incremental work on excorcising the transfer iface.
[doldaconnect.git] / daemon / fnet-dc.c
index 317a133..d2343da 100644 (file)
@@ -122,6 +122,7 @@ struct dcpeer
     struct timer *timeout;
     struct qcmdqueue queue;
     struct transfer *transfer;
+    struct socket *trpipe;
     int state;
     int ptclose;      /* Close after transfer is complete */
     int accepted;     /* If false, we connected, otherwise, we accepted */
@@ -139,7 +140,6 @@ struct dcpeer
 };
 
 static struct fnet dcnet;
-static struct transferiface dctransfer;
 static struct socket *udpsock = NULL;
 static struct lport *tcpsock = NULL;
 static struct dcpeer *peers = NULL;
@@ -150,6 +150,7 @@ static char *xmllistname = NULL;
 static char *xmlbz2listname = NULL;
 static struct timer *listwritetimer = NULL;
 
+static struct socket *mktrpipe(struct dcpeer *peer);
 static void peerconnect(struct socket *sk, int err, struct fnetnode *fn);
 static void freedcpeer(struct dcpeer *peer);
 static void transread(struct socket *sk, struct dcpeer *peer);
@@ -1622,14 +1623,14 @@ static void cmd_direction(struct socket *sk, struct dcpeer *peer, char *cmd, cha
                peer->close = 1;
                return;
            }
-           transfer = newupload(peer->fn, &dcnet, peer->wcsname, &dctransfer, peer);
+           transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer));
        } else {
            if((transfer = finddownload(peer->wcsname)) == NULL)
            {
                peer->close = 1;
                return;
            }
-           transferattach(transfer, &dctransfer, peer);
+           transferattach(transfer, peer->trpipe = mktrpipe(peer));
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
@@ -1674,10 +1675,10 @@ static void cmd_peerlock(struct socket *sk, struct dcpeer *peer, char *cmd, char
                return;
            }
            peer->direction = TRNSD_UP;
-           transfer = newupload(peer->fn, &dcnet, peer->wcsname, &dctransfer, peer);
+           transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer));
        } else {
            peer->direction = TRNSD_DOWN;
-           transferattach(transfer, &dctransfer, peer);
+           transferattach(transfer, peer->trpipe = mktrpipe(peer));
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
@@ -2674,6 +2675,15 @@ static struct command peercmds[] =
 };
 #undef cc
 
+static struct socket *mktrpipe(struct dcpeer *peer)
+{
+    struct socket *sk;
+    
+    sk = netsockpipe();
+    sk->data = peer;
+    return(sk);
+}
+
 static void dctransdetach(struct transfer *transfer, struct dcpeer *peer)
 {
     CBUNREG(transfer, trans_filterout, peer);
@@ -2691,7 +2701,7 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
     
     if((peer->state == PEER_TRNS) || (peer->state == PEER_SYNC))
     {
-       if(sockqueuesize(peer->sk) < 65536)
+       if(sockqueueleft(peer->sk) > 0)
        {
            if((buf = transfergetdata(transfer, &bufsize)) != NULL)
            {
@@ -2762,18 +2772,14 @@ static void dctransendofdata(struct transfer *transfer, struct dcpeer *peer)
     dctransgotdata(transfer, peer);
 }
 
-static void dcwantdata(struct transfer *transfer, struct dcpeer *peer)
-{
-    if(transferdatasize(transfer) < 65536)
-       sockblock(peer->sk, 0);
-}
-
 static void transread(struct socket *sk, struct dcpeer *peer)
 {
     void *buf;
     size_t bufsize;
     struct transfer *transfer;
     
+    if(transferdatasize(peer->transfer) < 0)
+       return;
     if((buf = sockgetinbuf(sk, &bufsize)) == NULL)
        return;
     if(peer->transfer == NULL)
@@ -2791,8 +2797,12 @@ static void transread(struct socket *sk, struct dcpeer *peer)
        transferendofdata(transfer);
        return;
     }
-    if(transferdatasize(peer->transfer) > 65535)
-       sockblock(sk, 1);
+}
+
+static void dcwantdata(struct transfer *transfer, struct dcpeer *peer)
+{
+    if(transferdatasize(transfer) > 0)
+       transread(peer->sk, peer);
 }
 
 static void transerr(struct socket *sk, int err, struct dcpeer *peer)
@@ -2993,6 +3003,8 @@ static void hubread(struct socket *sk, struct fnetnode *fn)
     char *p, *p2;
     
     hub = (struct dchub *)fn->data;
+    if(hub->queue.size > 1000)
+       return;
     if((newbuf = sockgetinbuf(sk, &datalen)) == NULL)
        return;
     if(hub->inbufdata > 500000) /* Discard possible malicious data */
@@ -3016,8 +3028,6 @@ static void hubread(struct socket *sk, struct fnetnode *fn)
        p = p2;
     }
     memmove(hub->inbuf, p, hub->inbufdata -= p - hub->inbuf);
-    if(hub->queue.size > 1000)
-       sockblock(sk, 1);
 }
 
 static void huberr(struct socket *sk, int err, struct fnetnode *fn)
@@ -3121,6 +3131,10 @@ static void freedcpeer(struct dcpeer *peer)
        peer->next->prev = peer->prev;
     if(peer->prev != NULL)
        peer->prev->next = peer->next;
+    if(peer->trpipe != NULL) {
+       closesock(peer->trpipe);
+       putsock(peer->trpipe);
+    }
     if(peer->transfer != NULL)
     {
        if(peer->transfer->dir == TRNSD_UP)
@@ -3208,14 +3222,6 @@ static void hubkill(struct fnetnode *fn)
     closesock(hub->sk);
 }
 
-static struct transferiface dctransfer =
-{
-    .detach = (void (*)(struct transfer *, void *))dctransdetach,
-    .gotdata = (void (*)(struct transfer *, void *))dctransgotdata,
-    .endofdata = (void (*)(struct transfer *, void *))dctransendofdata,
-    .wantdata = (void (*)(struct transfer *, void *))dcwantdata
-};
-
 static struct fnet dcnet =
 {
     .name = L"dc",
@@ -3234,6 +3240,10 @@ static void peerread(struct socket *sk, struct dcpeer *peer)
     size_t datalen, cnlen;
     struct command *cmd;
 
+    if(peer->state == PEER_CMD) {
+       if((peer->queue.size > 50) || (peer->inbufdata > 65536))
+           return;
+    }
     if((newbuf = sockgetinbuf(sk, &datalen)) == NULL)
        return;
     sizebuf2(peer->inbuf, peer->inbufdata + datalen, 1);
@@ -3259,16 +3269,11 @@ static void peerread(struct socket *sk, struct dcpeer *peer)
            {
                peer->state = PEER_STOP;
                break;
-           } else {
-               if(peer->queue.size > 50)
-                   sockblock(sk, 1);
            }
        }
     } else if(peer->state == PEER_TTHL) {
        handletthl(peer);
     }
-    if(peer->inbufdata > 500000)
-       sockblock(sk, 1);
 }
 
 static void peererror(struct socket *sk, int err, struct dcpeer *peer)
@@ -3772,7 +3777,7 @@ static int run(void)
            quota--;
        }
        if(hub->queue.size < 1000)
-           sockblock(hub->sk, 0);
+           hubread(hub->sk, fn);
        if(quota < 1)
            break;
     }
@@ -3791,7 +3796,7 @@ static int run(void)
            quota--;
        }
        if((peer->queue.size < 50) && (peer->inbufdata < 500000))
-           sockblock(peer->sk, 0);
+           peerread(peer->sk, peer);
        if(quota < 1)
            break;
     }