X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=daemon%2Ftransfer.c;h=ab2579d64cdeed5bcb52e6212b246b3f5734faf1;hb=f5dbbe62975d51f5096e1b38c3d982e6af093d8b;hp=61b3483de526b6bbc77812421648329952df96c3;hpb=2c086721aeffb8fb66d5d2653f9db0934563b5c4;p=doldaconnect.git diff --git a/daemon/transfer.c b/daemon/transfer.c index 61b3483..ab2579d 100644 --- a/daemon/transfer.c +++ b/daemon/transfer.c @@ -39,15 +39,19 @@ #include "client.h" static void killfilter(struct transfer *transfer); +static int transferread(struct socket *sk, struct transfer *transfer); +static int transferwrite(struct socket *sk, struct transfer *transfer); +static int transfererr(struct socket *sk, int errno, struct transfer *transfer); +static int filterread(struct socket *sk, struct transfer *transfer); +unsigned long long bytesupload = 0; +unsigned long long bytesdownload = 0; struct transfer *transfers = NULL; int numtransfers = 0; GCBCHAIN(newtransfercb, struct transfer *); void freetransfer(struct transfer *transfer) { - struct transarg *ta; - if(transfer == transfers) transfers = transfer->next; if(transfer->next != NULL) @@ -60,13 +64,8 @@ void freetransfer(struct transfer *transfer) CBCHAINFREE(transfer, trans_p); CBCHAINFREE(transfer, trans_destroy); CBCHAINFREE(transfer, trans_filterout); - while((ta = transfer->args) != NULL) - { - transfer->args = ta->next; - free(ta->rec); - free(ta->val); - free(ta); - } + while(transfer->args != NULL) + freewcspair(transfer->args, &transfer->args); if(transfer->filter != -1) killfilter(transfer); if(transfer->etimer != NULL) @@ -85,18 +84,18 @@ void freetransfer(struct transfer *transfer) free(transfer->filterbuf); if(transfer->hash != NULL) freehash(transfer->hash); + if(transfer->exitstatus != NULL) + free(transfer->exitstatus); if(transfer->localend != NULL) { - transfer->localend->readcb = NULL; - transfer->localend->writecb = NULL; - transfer->localend->errcb = NULL; + CBUNREG(transfer->localend, socket_read, transferread, transfer); + CBUNREG(transfer->localend, socket_write, transferwrite, transfer); + CBUNREG(transfer->localend, socket_err, transfererr, transfer); putsock(transfer->localend); } if(transfer->filterout != NULL) { - transfer->filterout->readcb = NULL; - transfer->filterout->writecb = NULL; - transfer->filterout->errcb = NULL; + CBUNREG(transfer->filterout, socket_read, filterread, transfer); putsock(transfer->filterout); } if(transfer->fn != NULL) @@ -128,17 +127,6 @@ struct transfer *newtransfer(void) return(new); } -void transferaddarg(struct transfer *transfer, wchar_t *rec, wchar_t *val) -{ - struct transarg *ta; - - ta = smalloc(sizeof(*ta)); - ta->rec = swcsdup(rec); - ta->val = swcsdup(val); - ta->next = transfer->args; - transfer->args = ta; -} - void transferattach(struct transfer *transfer, struct transferiface *iface, void *data) { if(transfer->iface != NULL) @@ -169,6 +157,18 @@ struct transfer *finddownload(wchar_t *peerid) return(transfer); } +struct transfer *hasupload(struct fnet *fnet, wchar_t *peerid) +{ + struct transfer *transfer; + + for(transfer = transfers; transfer != NULL; transfer = transfer->next) + { + if((transfer->dir == TRNSD_UP) && (transfer->fnet == fnet) && !wcscmp(transfer->peerid, peerid)) + break; + } + return(transfer); +} + struct transfer *newupload(struct fnetnode *fn, struct fnet *fnet, wchar_t *nickid, struct transferiface *iface, void *data) { struct transfer *transfer; @@ -233,24 +233,27 @@ static void transexpire(int cancelled, struct transfer *transfer) transfer->timeout = 0; } -static void transferread(struct socket *sk, struct transfer *transfer) +static int transferread(struct socket *sk, struct transfer *transfer) { if(sockgetdatalen(sk) >= 65536) sk->ignread = 1; if((transfer->iface != NULL) && (transfer->iface->gotdata != NULL)) transfer->iface->gotdata(transfer, transfer->ifacedata); + return(0); } -static void transferwrite(struct socket *sk, struct transfer *transfer) +static int transferwrite(struct socket *sk, struct transfer *transfer) { if((transfer->iface != NULL) && (transfer->iface->wantdata != NULL)) transfer->iface->wantdata(transfer, transfer->ifacedata); + return(0); } -static void transfererr(struct socket *sk, int errno, struct transfer *transfer) +static int transfererr(struct socket *sk, int errno, struct transfer *transfer) { if((transfer->iface != NULL) && (transfer->iface->endofdata != NULL)) transfer->iface->endofdata(transfer, transfer->ifacedata); + return(0); } void transferputdata(struct transfer *transfer, void *buf, size_t size) @@ -258,6 +261,7 @@ void transferputdata(struct transfer *transfer, void *buf, size_t size) time(&transfer->activity); sockqueue(transfer->localend, buf, size); transfer->curpos += size; + bytesdownload += size; CBCHAINDOCB(transfer, trans_p, transfer); } @@ -266,9 +270,9 @@ void transferendofdata(struct transfer *transfer) if(transfer->curpos >= transfer->size) { transfersetstate(transfer, TRNS_DONE); - transfer->localend->readcb = NULL; - transfer->localend->writecb = NULL; - transfer->localend->errcb = NULL; + CBUNREG(transfer->localend, socket_read, transferread, transfer); + CBUNREG(transfer->localend, socket_write, transferwrite, transfer); + CBUNREG(transfer->localend, socket_err, transfererr, transfer); putsock(transfer->localend); transfer->localend = NULL; } else { @@ -297,6 +301,7 @@ void *transfergetdata(struct transfer *transfer, size_t *size) buf = srealloc(buf, *size); } transfer->curpos += *size; + bytesupload += *size; CBCHAINDOCB(transfer, trans_p, transfer); return(buf); } @@ -329,20 +334,64 @@ void transfersetlocalend(struct transfer *transfer, struct socket *sk) if(transfer->localend != NULL) putsock(transfer->localend); getsock(transfer->localend = sk); - sk->data = transfer; - sk->readcb = (void (*)(struct socket *, void *))transferread; - sk->writecb = (void (*)(struct socket *, void *))transferwrite; - sk->errcb = (void (*)(struct socket *, int, void *))transfererr; + CBREG(sk, socket_read, (int (*)(struct socket *, void *))transferread, NULL, transfer); + CBREG(sk, socket_write, (int (*)(struct socket *, void *))transferwrite, NULL, transfer); + CBREG(sk, socket_err, (int (*)(struct socket *, int, void *))transfererr, NULL, transfer); } -void bumptransfer(struct transfer *transfer) +static int tryreq(struct transfer *transfer) { struct fnetnode *fn; struct fnetpeer *peer; + + if((fn = transfer->fn) != NULL) + { + if(fn->state != FNN_EST) + { + transfer->close = 1; + return(1); + } + peer = fnetfindpeer(fn, transfer->peerid); + } else { + peer = NULL; + for(fn = fnetnodes; fn != NULL; fn = fn->next) + { + if((fn->state == FNN_EST) && (fn->fnet == transfer->fnet) && ((peer = fnetfindpeer(fn, transfer->peerid)) != NULL)) + break; + } + } + if(peer != NULL) + { + time(&transfer->lastreq); + return(fn->fnet->reqconn(peer)); + } + return(1); +} + +void trytransferbypeer(struct fnet *fnet, wchar_t *peerid) +{ + struct transfer *transfer; + + for(transfer = transfers; transfer != NULL; transfer = transfer->next) + { + if((transfer->dir == TRNSD_DOWN) && (transfer->state == TRNS_WAITING)) + { + if((transfer->fnet == fnet) && !wcscmp(transfer->peerid, peerid)) + { + if(!tryreq(transfer)) + return; + } + } + } +} + +void bumptransfer(struct transfer *transfer) +{ time_t now; if((now = time(NULL)) < transfer->timeout) { + if(transfer->etimer == NULL) transfer->etimer = timercallback(transfer->timeout, (void (*)(int, void *))transexpire, transfer); return; @@ -352,32 +401,9 @@ void bumptransfer(struct transfer *transfer) switch(transfer->state) { case TRNS_WAITING: - if(transfer->fn != NULL) - { - fn = transfer->fn; - if(fn->state != FNN_EST) - { - transfer->close = 1; - return; - } - peer = fnetfindpeer(fn, transfer->peerid); - } else { - peer = NULL; - for(fn = fnetnodes; fn != NULL; fn = fn->next) - { - if((fn->state == FNN_EST) && (fn->fnet == transfer->fnet) && ((peer = fnetfindpeer(fn, transfer->peerid)) != NULL)) - break; - } - } transfer->etimer = timercallback(transfer->timeout = (time(NULL) + 30), (void (*)(int, void *))transexpire, transfer); if(now - transfer->lastreq > 30) - { - if(peer != NULL) - { - fn->fnet->reqconn(peer); - time(&transfer->lastreq); - } - } + tryreq(transfer); break; case TRNS_HS: if(transfer->dir == TRNSD_UP) @@ -486,15 +512,15 @@ static void killfilter(struct transfer *transfer) } if(transfer->localend) { - transfer->localend->readcb = NULL; - transfer->localend->writecb = NULL; - transfer->localend->errcb = NULL; + CBUNREG(transfer->localend, socket_read, transferread, transfer); + CBUNREG(transfer->localend, socket_write, transferwrite, transfer); + CBUNREG(transfer->localend, socket_err, transfererr, transfer); putsock(transfer->localend); transfer->localend = NULL; } if(transfer->filterout) { - transfer->filterout->readcb = NULL; + CBUNREG(transfer->filterout, socket_read, filterread, transfer); putsock(transfer->filterout); transfer->filterout = NULL; } @@ -558,17 +584,28 @@ static char *findfilter(struct passwd *pwd) return(NULL); } -static void filterread(struct socket *sk, struct transfer *transfer) +static void handletranscmd(struct transfer *transfer, wchar_t *cmd, wchar_t *arg) +{ + if(!wcscmp(cmd, L"status")) { + if(arg == NULL) + arg = L""; + if(transfer->exitstatus != NULL) + free(transfer->exitstatus); + transfer->exitstatus = swcsdup(arg); + } +} + +static int filterread(struct socket *sk, struct transfer *transfer) { char *buf, *p, *p2; size_t bufsize; wchar_t *cmd, *arg; if((buf = sockgetinbuf(sk, &bufsize)) == NULL) - return; + return(0); bufcat(transfer->filterbuf, buf, bufsize); free(buf); - if((p = memchr(transfer->filterbuf, '\n', transfer->filterbufdata)) != NULL) + while((p = memchr(transfer->filterbuf, '\n', transfer->filterbufdata)) != NULL) { *(p++) = 0; if((p2 = strchr(transfer->filterbuf, ' ')) != NULL) @@ -579,8 +616,9 @@ static void filterread(struct socket *sk, struct transfer *transfer) if(p2 != NULL) { if((arg = icmbstowcs(p2, NULL)) == NULL) - flog(LOG_WARNING, "filter sent a string which could not be converted into the local charset: %s: %s", transfer->filterbuf, strerror(errno)); + flog(LOG_WARNING, "filter sent a string which could not be converted into the local charset: %s: %s", p2, strerror(errno)); } + handletranscmd(transfer, cmd, arg); CBCHAINDOCB(transfer, trans_filterout, transfer, cmd, arg); if(arg != NULL) free(arg); @@ -590,11 +628,14 @@ static void filterread(struct socket *sk, struct transfer *transfer) } memmove(transfer->filterbuf, p, transfer->filterbufdata -= (p - transfer->filterbuf)); } + return(0); } static void filterexit(pid_t pid, int status, void *data) { struct transfer *transfer; + struct fnet *fnet; + wchar_t *peerid; for(transfer = transfers; transfer != NULL; transfer = transfer->next) { @@ -602,12 +643,14 @@ static void filterexit(pid_t pid, int status, void *data) { transfer->filter = -1; killfilter(transfer); + fnet = transfer->fnet; + peerid = swcsdup(transfer->peerid); if(WEXITSTATUS(status)) - { resettransfer(transfer); - } else { + else freetransfer(transfer); - } + trytransferbypeer(fnet, peerid); + free(peerid); break; } } @@ -623,7 +666,7 @@ int forkfilter(struct transfer *transfer) char **argv; size_t argvsize, argvdata; struct socket *insock, *outsock; - struct transarg *ta; + struct wcspair *ta; char *rec, *val; wfilename = transfer->path; @@ -682,9 +725,22 @@ int forkfilter(struct transfer *transfer) addtobuf(argv, filename); addtobuf(argv, buf); addtobuf(argv, peerid); + if(transfer->hash) + { + if((buf = icwcstombs(unparsehash(transfer->hash), NULL)) != NULL) + { + /* XXX: I am very doubtful of this, but it can just as + * well be argued that all data should be presented as + * key-value pairs. */ + addtobuf(argv, "hash"); + addtobuf(argv, buf); + } else { + flog(LOG_WARNING, "could not convert hash to local charset"); + } + } for(ta = transfer->args; ta != NULL; ta = ta->next) { - if((rec = icwcstombs(ta->rec, NULL)) == NULL) + if((rec = icwcstombs(ta->key, NULL)) == NULL) continue; if((val = icwcstombs(ta->val, NULL)) == NULL) continue; @@ -709,8 +765,7 @@ int forkfilter(struct transfer *transfer) transfer->filter = pid; transfersetlocalend(transfer, insock); getsock(transfer->filterout = outsock); - outsock->data = transfer; - outsock->readcb = (void (*)(struct socket *, void *))filterread; + CBREG(outsock, socket_read, (int (*)(struct socket *, void *))filterread, NULL, transfer); putsock(insock); putsock(outsock); free(filtername); @@ -751,6 +806,7 @@ static struct configvar myvars[] = {CONF_VAR_INT, "ultos", {.num = SOCK_TOS_MAXTP}}, {CONF_VAR_INT, "dltos", {.num = SOCK_TOS_MAXTP}}, {CONF_VAR_STRING, "filter", {.str = L"dc-filter"}}, + {CONF_VAR_BOOL, "ulquota", {.num = 0}}, {CONF_VAR_END} };