X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=wrw%2Fproto.py;h=70f8f77b8a68bfe20b471cbec687ac6ee1d9ddb0;hb=HEAD;hp=80e24197843c4b71f2ea04c7de826998a169eab2;hpb=51a13716668cc48bf87e0d0296f8b9900fafe83b;p=wrw.git diff --git a/wrw/proto.py b/wrw/proto.py index 80e2419..2438c7c 100644 --- a/wrw/proto.py +++ b/wrw/proto.py @@ -1,4 +1,4 @@ -import time, calendar, collections, binascii, base64 +import time, calendar, collections.abc, binascii, base64 statusinfo = { 400: ("Bad Request", "Invalid HTTP request."), @@ -6,6 +6,7 @@ statusinfo = { 403: ("Forbidden", "You are not authorized to request the requested resource."), 404: ("Not Found", "The requested resource was not found."), 405: ("Method Not Allowed", "The request method is not recognized or permitted by the requested resource."), + 429: ("Too Many Requests", "Your client is sending more frequent requests than are accepted."), 500: ("Server Error", "An internal error occurred."), 501: ("Not Implemented", "The requested functionality has not been implemented."), 503: ("Service Unavailable", "Service is being denied at this time."), @@ -106,7 +107,7 @@ def urlq(url): if isinstance(url, str): url = url.encode("utf-8") ret = "" - invalid = b"%;&=#?/\"'" + invalid = b"%;&=+#?/\"'" for c in url: if c in invalid or (c <= 32) or (c >= 128): ret += "%%%02X" % c @@ -187,11 +188,15 @@ def parstring(pars={}, **augment): del augment[key] else: val = pars[key] + if val is None: + continue if buf != "": buf += "&" buf += urlq(key) + "=" + urlq(str(val)) - for key in augment: + for key, val in augment.items(): + if val is None: + continue if buf != "": buf += "&" - buf += urlq(key) + "=" + urlq(str(augment[key])) + buf += urlq(key) + "=" + urlq(str(val)) return buf def parurl(url, pars={}, **augment): @@ -205,7 +210,7 @@ def parurl(url, pars={}, **augment): def enhex(bs): return base64.b16encode(bs).decode("us-ascii") def unhex(es): - if not isinstance(es, collections.ByteString): + if not isinstance(es, collections.abc.ByteString): try: es = es.encode("us-ascii") except UnicodeError: @@ -214,7 +219,7 @@ def unhex(es): def enb32(bs): return base64.b32encode(bs).decode("us-ascii") def unb32(es): - if not isinstance(es, collections.ByteString): + if not isinstance(es, collections.abc.ByteString): try: es = es.encode("us-ascii") except UnicodeError: @@ -226,7 +231,7 @@ def unb32(es): def enb64(bs): return base64.b64encode(bs).decode("us-ascii") def unb64(es): - if not isinstance(es, collections.ByteString): + if not isinstance(es, collections.abc.ByteString): try: es = es.encode("us-ascii") except UnicodeError: @@ -234,3 +239,84 @@ def unb64(es): if (len(es) % 4) != 0: es += b"=" * (4 - (len(es) % 4)) return base64.b64decode(es) + +def _quoprisafe(): + ret = [False] * 256 + for c in "-!*+/": + ret[ord(c)] = True + for c in range(ord('0'), ord('9') + 1): + ret[c] = True + for c in range(ord('A'), ord('Z') + 1): + ret[c] = True + for c in range(ord('a'), ord('z') + 1): + ret[c] = True + return ret +_quoprisafe = _quoprisafe() +def quopri(s, charset="utf-8"): + bv = s.encode(charset) + qn = sum(not _quoprisafe[b] for b in bv) + if qn == 0: + return s + if qn > len(bv) / 2: + return "=?%s?B?%s?=" % (charset, enb64(bv)) + else: + return "=?%s?Q?%s?=" % (charset, "".join(chr(b) if _quoprisafe[b] else "=%02X" % b for b in bv)) + +class mimeparam(object): + def __init__(self, name, val, fallback=None, charset="utf-8", lang=""): + self.name = name + self.val = val + self.fallback = fallback + self.charset = charset + self.lang = lang + + def __str__(self): + self.name.encode("ascii") + try: + self.val.encode("ascii") + except UnicodeError: + pass + else: + return "%s=%s" % (self.name, self.val) + val = self.val.encode(self.charset) + self.charset.encode("ascii") + self.lang.encode("ascii") + ret = "" + if self.fallback is not None: + self.fallback.encode("ascii") + ret += "%s=%s; " % (self.name, self.fallback) + ret += "%s*=%s'%s'%s" % (self.name, self.charset, self.lang, urlq(val)) + return ret + +class mimeheader(object): + def __init__(self, name, val, *, mime_charset="utf-8", mime_lang="", **params): + self.name = name + self.val = val + self.params = {} + self.charset = mime_charset + self.lang = mime_lang + for k, v in params.items(): + self[k] = v + + def __getitem__(self, nm): + return self.params[nm.lower()] + + def __setitem__(self, nm, val): + if not isinstance(val, mimeparam): + val = mimeparam(nm, val, charset=self.charset, lang=self.lang) + self.params[nm.lower()] = val + + def __delitem__(self, nm): + del self.params[nm.lower()] + + def value(self): + parts = [] + if self.val != None: + parts.append(quopri(self.val)) + parts.extend(str(x) for x in self.params.values()) + return("; ".join(parts)) + + def __str__(self): + if self.name is None: + return self.value() + return "%s: %s" % (self.name, self.value())