Fixed up the transfer system a great deal.
authorFredrik Tolf <fredrik@dolda2000.com>
Fri, 17 Oct 2008 22:46:58 +0000 (00:46 +0200)
committerFredrik Tolf <fredrik@dolda2000.com>
Fri, 17 Oct 2008 22:46:58 +0000 (00:46 +0200)
daemon/fnet-dc.c
daemon/transfer.c

index f48d63d..b34ed3a 100644 (file)
@@ -1714,6 +1714,7 @@ static void startul(struct dcpeer *peer)
     peer->state = PEER_TRNS;
     transferstartul(peer->transfer, peer->sk);
     peer->sk->writecb = (void (*)(struct socket *, void *))transwrite;
+    transwrite(peer->sk, peer);
 }
 
 static void cmd_filelength(struct socket *sk, struct dcpeer *peer, char *cmd, char *args)
@@ -2688,28 +2689,24 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
        {
            if((buf = sockgetinbuf(peer->trpipe, &bufsize)) != NULL)
            {
-               if((transfer->endpos >= 0) && (transfer->curpos + bufsize >= transfer->endpos))
-                   bufsize = transfer->endpos - transfer->curpos;
-               if(bufsize > 0) {
-                   if(peer->compress == CPRS_NONE)
+               if(peer->compress == CPRS_NONE)
+               {
+                   sockqueue(peer->sk, buf, bufsize);
+               } else if(peer->compress == CPRS_ZLIB) {
+                   cstr = peer->cprsdata;
+                   cstr->next_in = buf;
+                   cstr->avail_in = bufsize;
+                   while(cstr->avail_in > 0)
                    {
-                       sockqueue(peer->sk, buf, bufsize);
-                   } else if(peer->compress == CPRS_ZLIB) {
-                       cstr = peer->cprsdata;
-                       cstr->next_in = buf;
-                       cstr->avail_in = bufsize;
-                       while(cstr->avail_in > 0)
+                       cstr->next_out = outbuf;
+                       cstr->avail_out = sizeof(outbuf);
+                       if((ret = deflate(cstr, 0)) != Z_OK)
                        {
-                           cstr->next_out = outbuf;
-                           cstr->avail_out = sizeof(outbuf);
-                           if((ret = deflate(cstr, 0)) != Z_OK)
-                           {
-                               flog(LOG_WARNING, "bug? deflate() did not return Z_OK (but rather %i)", ret);
-                               freedcpeer(peer);
-                               return;
-                           }
-                           sockqueue(peer->sk, outbuf, sizeof(outbuf) - cstr->avail_out);
+                           flog(LOG_WARNING, "bug? deflate() did not return Z_OK (but rather %i)", ret);
+                           freedcpeer(peer);
+                           return;
                        }
+                       sockqueue(peer->sk, outbuf, sizeof(outbuf) - cstr->avail_out);
                    }
                }
                free(buf);
@@ -2753,34 +2750,6 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
     }
 }
 
-void trpiperead(struct socket *sk, struct dcpeer *peer)
-{
-    dctransgotdata(peer->transfer, peer);
-}
-
-void trpipewrite(struct socket *sk, struct dcpeer *peer)
-{
-}
-
-void trpipeerr(struct socket *sk, int errno, struct dcpeer *peer)
-{
-    peer->state = PEER_SYNC;
-    dctransgotdata(peer->transfer, peer);
-    CBUNREG(peer->transfer, trans_filterout, peer);
-}
-
-static struct socket *mktrpipe(struct dcpeer *peer)
-{
-    struct socket *sk;
-    
-    sk = netsockpipe();
-    sk->data = peer;
-    sk->readcb = (void (*)(struct socket *, void *))trpiperead;
-    sk->writecb = (void (*)(struct socket *, void *))trpipewrite;
-    sk->errcb = (void (*)(struct socket *, int, void *))trpipeerr;
-    return(sk);
-}
-
 static void transread(struct socket *sk, struct dcpeer *peer)
 {
     void *buf;
@@ -2788,20 +2757,23 @@ static void transread(struct socket *sk, struct dcpeer *peer)
     
     if(sockqueueleft(peer->trpipe) < 0)
        return;
-    if((buf = sockgetinbuf(sk, &bufsize)) == NULL)
-       return;
-    if(peer->transfer == NULL)
+    if((buf = sockgetinbuf(sk, &bufsize)) != NULL)
     {
+       if(peer->transfer == NULL)
+       {
+           free(buf);
+           freedcpeer(peer);
+           return;
+       }
+       sockqueue(peer->trpipe, buf, bufsize);
        free(buf);
-       freedcpeer(peer);
-       return;
     }
-    sockqueue(peer->trpipe, buf, bufsize);
-    free(buf);
     if(peer->transfer->curpos >= peer->transfer->size)
     {
        closesock(peer->trpipe);
        quitsock(peer->trpipe);
+       peer->trpipe = NULL;
+       peer->transfer = NULL;
        peer->close = 1;
        return;
     }
@@ -2818,6 +2790,8 @@ static void transerr(struct socket *sk, int err, struct dcpeer *peer)
     }
     closesock(peer->trpipe);
     quitsock(peer->trpipe);
+    peer->trpipe = NULL;
+    peer->transfer = NULL;
     peer->close = 1;
 }
 
@@ -2833,6 +2807,36 @@ static void transwrite(struct socket *sk, struct dcpeer *peer)
     dctransgotdata(peer->transfer, peer);
 }
 
+static void trpiperead(struct socket *sk, struct dcpeer *peer)
+{
+    dctransgotdata(peer->transfer, peer);
+}
+
+static void trpipewrite(struct socket *sk, struct dcpeer *peer)
+{
+    transread(peer->sk, peer);
+}
+
+static void trpipeerr(struct socket *sk, int errno, struct dcpeer *peer)
+{
+    peer->state = PEER_SYNC;
+    dctransgotdata(peer->transfer, peer);
+    CBUNREG(peer->transfer, trans_filterout, peer);
+}
+
+static struct socket *mktrpipe(struct dcpeer *peer)
+{
+    struct socket *sk;
+    
+    sk = netsockpipe();
+    socksetdebug(sk, 2, "trpipe");
+    sk->data = peer;
+    sk->readcb = (void (*)(struct socket *, void *))trpiperead;
+    sk->writecb = (void (*)(struct socket *, void *))trpipewrite;
+    sk->errcb = (void (*)(struct socket *, int, void *))trpipeerr;
+    return(sk);
+}
+
 static void udpread(struct socket *sk, void *data)
 {
     char *buf, *p, *p2, *hashbuf;
@@ -3110,6 +3114,7 @@ static struct dcpeer *newdcpeer(struct socket *sk)
     new = smalloc(sizeof(*new));
     memset(new, 0, sizeof(*new));
     new->transfer = NULL;
+    socksetdebug(sk, 2, "peersock");
     getsock(sk);
     new->sk = sk;
     if(confgetint("dc", "dcppemu"))
@@ -3247,6 +3252,9 @@ static void peerread(struct socket *sk, struct dcpeer *peer)
     if(peer->state == PEER_CMD) {
        if((peer->queue.size > 50) || (peer->inbufdata > 65536))
            return;
+    } else if(peer->state == PEER_TTHL) {
+    } else {
+       return;
     }
     if((newbuf = sockgetinbuf(sk, &datalen)) == NULL)
        return;
index ee5f673..b713f52 100644 (file)
@@ -133,6 +133,8 @@ static void localread(struct socket *sk, struct transfer *transfer)
     
     if((transfer->datapipe != NULL) && (sockqueueleft(transfer->datapipe) > 0)) {
        buf = sockgetinbuf(sk, &blen);
+       if((transfer->endpos >= 0) && (transfer->curpos + blen > transfer->endpos))
+           blen = transfer->endpos - transfer->curpos;
        sockqueue(transfer->datapipe, buf, blen);
        free(buf);
        time(&transfer->activity);
@@ -149,6 +151,8 @@ static void dataread(struct socket *sk, struct transfer *transfer)
     
     if((transfer->localend != NULL) && (sockqueueleft(transfer->localend) > 0)) {
        buf = sockgetinbuf(sk, &blen);
+       if((transfer->endpos >= 0) && (transfer->curpos + blen > transfer->endpos))
+           blen = transfer->endpos - transfer->curpos;
        sockqueue(transfer->localend, buf, blen);
        free(buf);
        transfer->curpos += blen;
@@ -179,9 +183,11 @@ static void dataerr(struct socket *sk, int errno, struct transfer *transfer)
 {
     if(transfer->curpos >= transfer->size) {
        transfersetstate(transfer, TRNS_DONE);
-       closesock(transfer->localend);
-       quitsock(transfer->localend);
-       transfer->localend = NULL;
+       if(transfer->localend != NULL) {
+           closesock(transfer->localend);
+           quitsock(transfer->localend);
+           transfer->localend = NULL;
+       }
     } else {
        resettransfer(transfer);
     }
@@ -319,6 +325,7 @@ void transfersetlocalend(struct transfer *transfer, struct socket *sk)
 {
     if(transfer->localend != NULL)
        putsock(transfer->localend);
+    socksetdebug(sk, 2, "localend");
     getsock(transfer->localend = sk);
     sk->data = transfer;
     sk->readcb = (void (*)(struct socket *, void *))localread;