First potentially working version of the transsocket.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 8 Jun 2008 12:02:18 +0000 (14:02 +0200)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 8 Jun 2008 12:02:18 +0000 (14:02 +0200)
daemon/fnet-dc.c
daemon/net.c
daemon/net.h
daemon/transfer.c

index 36e4297..8f83123 100644 (file)
@@ -1623,14 +1623,14 @@ static void cmd_direction(struct socket *sk, struct dcpeer *peer, char *cmd, cha
                peer->close = 1;
                return;
            }
                peer->close = 1;
                return;
            }
-           transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer));
+           transfer = newupload(peer->fn, &dcnet, peer->wcsname, (peer->trpipe = mktrpipe(peer))->back);
        } else {
            if((transfer = finddownload(peer->wcsname)) == NULL)
            {
                peer->close = 1;
                return;
            }
        } else {
            if((transfer = finddownload(peer->wcsname)) == NULL)
            {
                peer->close = 1;
                return;
            }
-           transferattach(transfer, peer->trpipe = mktrpipe(peer));
+           transferattach(transfer, (peer->trpipe = mktrpipe(peer))->back);
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
@@ -1675,10 +1675,10 @@ static void cmd_peerlock(struct socket *sk, struct dcpeer *peer, char *cmd, char
                return;
            }
            peer->direction = TRNSD_UP;
                return;
            }
            peer->direction = TRNSD_UP;
-           transfer = newupload(peer->fn, &dcnet, peer->wcsname, peer->trpipe = mktrpipe(peer));
+           transfer = newupload(peer->fn, &dcnet, peer->wcsname, (peer->trpipe = mktrpipe(peer))->back);
        } else {
            peer->direction = TRNSD_DOWN;
        } else {
            peer->direction = TRNSD_DOWN;
-           transferattach(transfer, peer->trpipe = mktrpipe(peer));
+           transferattach(transfer, (peer->trpipe = mktrpipe(peer))->back);
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
            transfersetstate(transfer, TRNS_HS);
        }
        transfersetnick(transfer, peer->wcsname);
@@ -2675,22 +2675,6 @@ static struct command peercmds[] =
 };
 #undef cc
 
 };
 #undef cc
 
-static struct socket *mktrpipe(struct dcpeer *peer)
-{
-    struct socket *sk;
-    
-    sk = netsockpipe();
-    sk->data = peer;
-    return(sk);
-}
-
-static void dctransdetach(struct transfer *transfer, struct dcpeer *peer)
-{
-    CBUNREG(transfer, trans_filterout, peer);
-    peer->transfer = NULL;
-    peer->close = 1;
-}
-
 static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
 {
     int ret;
 static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
 {
     int ret;
@@ -2703,26 +2687,30 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
     {
        if(sockqueueleft(peer->sk) > 0)
        {
     {
        if(sockqueueleft(peer->sk) > 0)
        {
-           if((buf = transfergetdata(transfer, &bufsize)) != NULL)
+           if((buf = sockgetinbuf(peer->trpipe, &bufsize)) != NULL)
            {
            {
-               if(peer->compress == CPRS_NONE)
-               {
-                   sockqueue(peer->sk, buf, bufsize);
-               } else if(peer->compress == CPRS_ZLIB) {
-                   cstr = peer->cprsdata;
-                   cstr->next_in = buf;
-                   cstr->avail_in = bufsize;
-                   while(cstr->avail_in > 0)
+               if((transfer->endpos >= 0) && (transfer->curpos + bufsize >= transfer->endpos))
+                   bufsize = transfer->endpos - transfer->curpos;
+               if(bufsize > 0) {
+                   if(peer->compress == CPRS_NONE)
                    {
                    {
-                       cstr->next_out = outbuf;
-                       cstr->avail_out = sizeof(outbuf);
-                       if((ret = deflate(cstr, 0)) != Z_OK)
+                       sockqueue(peer->sk, buf, bufsize);
+                   } else if(peer->compress == CPRS_ZLIB) {
+                       cstr = peer->cprsdata;
+                       cstr->next_in = buf;
+                       cstr->avail_in = bufsize;
+                       while(cstr->avail_in > 0)
                        {
                        {
-                           flog(LOG_WARNING, "bug? deflate() did not return Z_OK (but rather %i)", ret);
-                           freedcpeer(peer);
-                           return;
+                           cstr->next_out = outbuf;
+                           cstr->avail_out = sizeof(outbuf);
+                           if((ret = deflate(cstr, 0)) != Z_OK)
+                           {
+                               flog(LOG_WARNING, "bug? deflate() did not return Z_OK (but rather %i)", ret);
+                               freedcpeer(peer);
+                               return;
+                           }
+                           sockqueue(peer->sk, outbuf, sizeof(outbuf) - cstr->avail_out);
                        }
                        }
-                       sockqueue(peer->sk, outbuf, sizeof(outbuf) - cstr->avail_out);
                    }
                }
                free(buf);
                    }
                }
                free(buf);
@@ -2766,19 +2754,40 @@ static void dctransgotdata(struct transfer *transfer, struct dcpeer *peer)
     }
 }
 
     }
 }
 
-static void dctransendofdata(struct transfer *transfer, struct dcpeer *peer)
+void trpiperead(struct socket *sk, struct dcpeer *peer)
+{
+    dctransgotdata(peer->transfer, peer);
+}
+
+void trpipewrite(struct socket *sk, struct dcpeer *peer)
+{
+}
+
+void trpipeerr(struct socket *sk, int errno, struct dcpeer *peer)
 {
     peer->state = PEER_SYNC;
 {
     peer->state = PEER_SYNC;
-    dctransgotdata(transfer, peer);
+    dctransgotdata(peer->transfer, peer);
+    CBUNREG(peer->transfer, trans_filterout, peer);
+}
+
+static struct socket *mktrpipe(struct dcpeer *peer)
+{
+    struct socket *sk;
+    
+    sk = netsockpipe();
+    sk->data = peer;
+    sk->readcb = (void (*)(struct socket *, void *))trpiperead;
+    sk->writecb = (void (*)(struct socket *, void *))trpipewrite;
+    sk->errcb = (void (*)(struct socket *, int, void *))trpipeerr;
+    return(sk);
 }
 
 static void transread(struct socket *sk, struct dcpeer *peer)
 {
     void *buf;
     size_t bufsize;
 }
 
 static void transread(struct socket *sk, struct dcpeer *peer)
 {
     void *buf;
     size_t bufsize;
-    struct transfer *transfer;
     
     
-    if(transferdatasize(peer->transfer) < 0)
+    if(sockqueueleft(peer->trpipe) < 0)
        return;
     if((buf = sockgetinbuf(sk, &bufsize)) == NULL)
        return;
        return;
     if((buf = sockgetinbuf(sk, &bufsize)) == NULL)
        return;
@@ -2788,23 +2797,17 @@ static void transread(struct socket *sk, struct dcpeer *peer)
        freedcpeer(peer);
        return;
     }
        freedcpeer(peer);
        return;
     }
-    transferputdata(peer->transfer, buf, bufsize);
+    sockqueue(peer->trpipe, buf, bufsize);
     free(buf);
     if(peer->transfer->curpos >= peer->transfer->size)
     {
     free(buf);
     if(peer->transfer->curpos >= peer->transfer->size)
     {
-       transfer = peer->transfer;
-       transferdetach(transfer);
-       transferendofdata(transfer);
+       closesock(peer->trpipe);
+       quitsock(peer->trpipe);
+       peer->close = 1;
        return;
     }
 }
 
        return;
     }
 }
 
-static void dcwantdata(struct transfer *transfer, struct dcpeer *peer)
-{
-    if(transferdatasize(transfer) > 0)
-       transread(peer->sk, peer);
-}
-
 static void transerr(struct socket *sk, int err, struct dcpeer *peer)
 {
     struct transfer *transfer;
 static void transerr(struct socket *sk, int err, struct dcpeer *peer)
 {
     struct transfer *transfer;
@@ -2814,8 +2817,9 @@ static void transerr(struct socket *sk, int err, struct dcpeer *peer)
        freedcpeer(peer);
        return;
     }
        freedcpeer(peer);
        return;
     }
-    transferdetach(transfer);
-    transferendofdata(transfer);
+    closesock(peer->trpipe);
+    quitsock(peer->trpipe);
+    peer->close = 1;
 }
 
 static void transwrite(struct socket *sk, struct dcpeer *peer)
 }
 
 static void transwrite(struct socket *sk, struct dcpeer *peer)
@@ -3133,10 +3137,11 @@ static void freedcpeer(struct dcpeer *peer)
        peer->prev->next = peer->next;
     if(peer->trpipe != NULL) {
        closesock(peer->trpipe);
        peer->prev->next = peer->next;
     if(peer->trpipe != NULL) {
        closesock(peer->trpipe);
-       putsock(peer->trpipe);
+       quitsock(peer->trpipe);
     }
     if(peer->transfer != NULL)
     {
     }
     if(peer->transfer != NULL)
     {
+       CBUNREG(peer->transfer, trans_filterout, peer);
        if(peer->transfer->dir == TRNSD_UP)
            peer->transfer->close = 1;
        if(peer->transfer->dir == TRNSD_DOWN)
        if(peer->transfer->dir == TRNSD_UP)
            peer->transfer->close = 1;
        if(peer->transfer->dir == TRNSD_DOWN)
index 6f83266..ed7ad9b 100644 (file)
@@ -390,6 +390,14 @@ void putsock(struct socket *sk)
     }
 }
 
     }
 }
 
+void quitsock(struct socket *sk)
+{
+    sk->readcb = NULL;
+    sk->writecb = NULL;
+    sk->errcb = NULL;
+    putsock(sk);
+}
+
 static void linksock(struct scons **list, struct socket *sk)
 {
     struct scons *sc;
 static void linksock(struct scons **list, struct socket *sk)
 {
     struct scons *sc;
@@ -771,6 +779,18 @@ size_t sockgetdatalen(struct socket *sk)
 /*     return(sockgetdatalen(sk->back)); */
 /* } */
 
 /*     return(sockgetdatalen(sk->back)); */
 /* } */
 
+size_t socktqueuesize(struct socket *sk)
+{
+    size_t ret;
+    
+    ret = 0;
+    while(1) {
+       ret += sockgetdatalen(sk->back);
+       if((sk = sk->back->pnext) == NULL)
+           return(ret);
+    }
+}
+
 ssize_t sockqueueleft(struct socket *sk)
 {
     return(sk->back->maxbuf - sockgetdatalen(sk->back));
 ssize_t sockqueueleft(struct socket *sk)
 {
     return(sk->back->maxbuf - sockgetdatalen(sk->back));
index 9ae9650..b539860 100644 (file)
@@ -87,6 +87,7 @@ void freedgbuf(struct dgrambuf *dg);
 void sockqueue(struct socket *sk, void *data, size_t size);
 void sockerror(struct socket *sk, int en);
 /* size_t sockqueuesize(struct socket *sk); */
 void sockqueue(struct socket *sk, void *data, size_t size);
 void sockerror(struct socket *sk, int en);
 /* size_t sockqueuesize(struct socket *sk); */
+size_t socktqueuesize(struct socket *sk);
 ssize_t sockqueueleft(struct socket *sk);
 int netresolve(char *addr, void (*callback)(struct sockaddr *addr, int addrlen, void *data), void *data);
 struct socket *netcsdgram(struct sockaddr *name, socklen_t namelen);
 ssize_t sockqueueleft(struct socket *sk);
 int netresolve(char *addr, void (*callback)(struct sockaddr *addr, int addrlen, void *data), void *data);
 struct socket *netcsdgram(struct sockaddr *name, socklen_t namelen);
@@ -109,5 +110,6 @@ void sockpushdata(struct socket *sk, void *buf, size_t size);
 int sockpeeraddr(struct socket *sk, struct sockaddr **namebuf, socklen_t *lenbuf);
 int getucred(struct socket *sk, uid_t *uid, gid_t *gid);
 int sockfamily(struct socket *sk);
 int sockpeeraddr(struct socket *sk, struct sockaddr **namebuf, socklen_t *lenbuf);
 int getucred(struct socket *sk, uid_t *uid, gid_t *gid);
 int sockfamily(struct socket *sk);
+void quitsock(struct socket *sk);
 
 #endif
 
 #endif
index 9c50ed1..f23a3ed 100644 (file)
@@ -134,6 +134,10 @@ static void localread(struct socket *sk, struct transfer *transfer)
     if((transfer->datapipe != NULL) && (sockqueueleft(transfer->datapipe) > 0)) {
        buf = sockgetinbuf(sk, &blen);
        sockqueue(transfer->datapipe, buf, blen);
     if((transfer->datapipe != NULL) && (sockqueueleft(transfer->datapipe) > 0)) {
        buf = sockgetinbuf(sk, &blen);
        sockqueue(transfer->datapipe, buf, blen);
+       time(&transfer->activity);
+       transfer->curpos += blen;
+       bytesupload += blen;
+       CBCHAINDOCB(transfer, trans_p, transfer);
     }
 }
 
     }
 }
 
@@ -145,6 +149,9 @@ static void dataread(struct socket *sk, struct transfer *transfer)
     if((transfer->localend != NULL) && (sockqueueleft(transfer->localend) > 0)) {
        buf = sockgetinbuf(sk, &blen);
        sockqueue(transfer->localend, buf, blen);
     if((transfer->localend != NULL) && (sockqueueleft(transfer->localend) > 0)) {
        buf = sockgetinbuf(sk, &blen);
        sockqueue(transfer->localend, buf, blen);
+       transfer->curpos += blen;
+       bytesdownload += blen;
+       CBCHAINDOCB(transfer, trans_p, transfer);
     }
 }
 
     }
 }
 
@@ -168,8 +175,14 @@ static void localerr(struct socket *sk, int errno, struct transfer *transfer)
 
 static void dataerr(struct socket *sk, int errno, struct transfer *transfer)
 {
 
 static void dataerr(struct socket *sk, int errno, struct transfer *transfer)
 {
-    if(transfer->localend != NULL)
+    if(transfer->curpos >= transfer->size) {
+       transfersetstate(transfer, TRNS_DONE);
        closesock(transfer->localend);
        closesock(transfer->localend);
+       quitsock(transfer->localend);
+       transfer->localend = NULL;
+    } else {
+       resettransfer(transfer);
+    }
 }
 
 void transferattach(struct transfer *transfer, struct socket *dpipe)
 }
 
 void transferattach(struct transfer *transfer, struct socket *dpipe)
@@ -185,11 +198,8 @@ void transferattach(struct transfer *transfer, struct socket *dpipe)
 void transferdetach(struct transfer *transfer)
 {
     if(transfer->datapipe != NULL) {
 void transferdetach(struct transfer *transfer)
 {
     if(transfer->datapipe != NULL) {
-       transfer->datapipe->readcb = NULL;
-       transfer->datapipe->writecb = NULL;
-       transfer->datapipe->errcb = NULL;
        closesock(transfer->datapipe);
        closesock(transfer->datapipe);
-       putsock(transfer->datapipe);
+       quitsock(transfer->datapipe);
     }
     transfer->datapipe = NULL;
 }
     }
     transfer->datapipe = NULL;
 }
@@ -281,59 +291,6 @@ static void transexpire(int cancelled, struct transfer *transfer)
        transfer->timeout = 0;
 }
 
        transfer->timeout = 0;
 }
 
-static void transferputdata(struct transfer *transfer, void *buf, size_t size)
-{
-    time(&transfer->activity);
-    sockqueue(transfer->localend, buf, size);
-    transfer->curpos += size;
-    bytesdownload += size;
-    CBCHAINDOCB(transfer, trans_p, transfer);
-}
-
-static void transferendofdata(struct transfer *transfer)
-{
-    if(transfer->curpos >= transfer->size)
-    {
-       transfersetstate(transfer, TRNS_DONE);
-       transfer->localend->readcb = NULL;
-       transfer->localend->writecb = NULL;
-       transfer->localend->errcb = NULL;
-       putsock(transfer->localend);
-       transfer->localend = NULL;
-    } else {
-       resettransfer(transfer);
-    }
-}
-
-static ssize_t transferdatasize(struct transfer *transfer)
-{
-    return(sockqueueleft(transfer->localend));
-}
-
-static void *transfergetdata(struct transfer *transfer, size_t *size)
-{
-    void *buf;
-    
-    if(transfer->localend == NULL)
-       return(NULL);
-    time(&transfer->activity);
-    if((buf = sockgetinbuf(transfer->localend, size)) == NULL)
-       return(NULL);
-    if((transfer->endpos >= 0) && (transfer->curpos + *size >= transfer->endpos))
-    {
-       if((*size = transfer->endpos - transfer->curpos) == 0) {
-           free(buf);
-           buf = NULL;
-       } else {
-           buf = srealloc(buf, *size);
-       }
-    }
-    transfer->curpos += *size;
-    bytesupload += *size;
-    CBCHAINDOCB(transfer, trans_p, transfer);
-    return(buf);
-}
-
 void transferprepul(struct transfer *transfer, off_t size, off_t start, off_t end, struct socket *lesk)
 {
     transfersetsize(transfer, size);
 void transferprepul(struct transfer *transfer, off_t size, off_t start, off_t end, struct socket *lesk)
 {
     transfersetsize(transfer, size);