| 1 | # ldd - DNS implementation in Python |
| 2 | # Copyright (C) 2006 Fredrik Tolf <fredrik@dolda2000.com> |
| 3 | # |
| 4 | # This program is free software; you can redistribute it and/or modify |
| 5 | # it under the terms of the GNU General Public License as published by |
| 6 | # the Free Software Foundation; either version 2 of the License, or |
| 7 | # (at your option) any later version. |
| 8 | # |
| 9 | # This program is distributed in the hope that it will be useful, |
| 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | # GNU General Public License for more details. |
| 13 | # |
| 14 | # You should have received a copy of the GNU General Public License |
| 15 | # along with this program; if not, write to the Free Software |
| 16 | # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| 17 | |
| 18 | import base64 |
| 19 | import time |
| 20 | import struct |
| 21 | from Crypto.Hash import HMAC, MD5 |
| 22 | |
| 23 | import proto, rec, dn |
| 24 | |
| 25 | class tsigkey: |
| 26 | def __init__(self, name, algo, secret): |
| 27 | if type(name) == str: |
| 28 | self.name = dn.fromstring(name) |
| 29 | else: |
| 30 | self.name = name |
| 31 | if type(algo) == str: |
| 32 | self.algo = algobyname[algo] |
| 33 | else: |
| 34 | self.algo = algo |
| 35 | self.secret = secret |
| 36 | |
| 37 | def sign(self, message): |
| 38 | return self.algo.sign(self.secret, message) |
| 39 | |
| 40 | class tsigalgo: |
| 41 | def __init__(self, name, cname, function): |
| 42 | self.name = name |
| 43 | if type(cname) == str: |
| 44 | self.cname = dn.fromstring(cname) |
| 45 | else: |
| 46 | self.cname = cname |
| 47 | self.function = function |
| 48 | |
| 49 | def sign(self, secret, message): |
| 50 | return self.function(secret, message) |
| 51 | |
| 52 | class tsigctx: |
| 53 | def __init__(self, key, pkt, sr): |
| 54 | self.key = key |
| 55 | self.prevmac = sr.data["mac"] |
| 56 | self.error = 0 |
| 57 | |
| 58 | def signpkt(self, pkt): |
| 59 | tsigsign(pkt, None, ctx = self, error = self.error) |
| 60 | |
| 61 | def tsigsign(pkt, key, stime = None, fudge = 300, error = 0, other = "", ctx = None): |
| 62 | if stime is None: stime = int(time.time()) |
| 63 | msg = "" |
| 64 | if ctx is not None: |
| 65 | if key is None: |
| 66 | key = ctx.key |
| 67 | msg += struct.pack(">H", len(ctx.prevmac)) + ctx.prevmac |
| 68 | msg += pkt.encode() |
| 69 | msg += key.name.canonwire() |
| 70 | msg += struct.pack(">HL", rec.CLASSANY, 0) |
| 71 | msg += key.algo.cname.canonwire() |
| 72 | msg += struct.pack(">Q", stime)[-6:] |
| 73 | msg += struct.pack(">3H", fudge, error, len(other)) |
| 74 | msg += other |
| 75 | digest = key.sign(msg) |
| 76 | pkt.addad(rec.rr((key.name, "TSIG", rec.CLASSANY), 0, rec.rrdata("TSIG", key.algo.cname, stime, fudge, digest, pkt.qid, error, other))) |
| 77 | pkt.signed = True |
| 78 | |
| 79 | def tsigverify(pkt, keys, vertime = None): |
| 80 | if vertime is None: vertime = int(time.time()) |
| 81 | if len(pkt.adlist) < 1: |
| 82 | return proto.FORMERR |
| 83 | sr = pkt.adlist[-1] |
| 84 | pkt.adlist = pkt.adlist[:-1] |
| 85 | if not sr.head.istype("TSIG") or sr.head.rclass != rec.CLASSANY: |
| 86 | return proto.FORMERR |
| 87 | for key in keys: |
| 88 | if key.name == sr.head.name: |
| 89 | break |
| 90 | else: |
| 91 | return proto.BADKEY |
| 92 | if key.algo.cname != sr.data["algo"]: |
| 93 | return proto.BADKEY |
| 94 | |
| 95 | pkt.tsigctx = ctx = tsigctx(key, pkt, sr) |
| 96 | |
| 97 | other = sr.data["other"] |
| 98 | msg = pkt.encode() |
| 99 | msg += key.name.canonwire() |
| 100 | msg += struct.pack(">HL", rec.CLASSANY, 0) |
| 101 | msg += key.algo.cname.canonwire() |
| 102 | msg += struct.pack(">Q", sr.data["stime"])[-6:] |
| 103 | msg += struct.pack(">3H", sr.data["fudge"], sr.data["err"], len(other)) |
| 104 | msg += other |
| 105 | digest = key.sign(msg) |
| 106 | if digest != sr.data["mac"]: |
| 107 | pkt.tsigctx = proto.BADSIG |
| 108 | return proto.BADSIG |
| 109 | if vertime != 0: |
| 110 | if abs(vertime - sr.data["stime"]) > sr.data["fudge"]: |
| 111 | pkt.tsigctx = proto.BADTIME |
| 112 | return proto.BADTIME |
| 113 | return key |
| 114 | |
| 115 | def signhmacmd5(secret, message): |
| 116 | s = HMAC.HMAC(secret, digestmod = MD5) |
| 117 | s.update(message) |
| 118 | return s.digest() |
| 119 | |
| 120 | def readkeys(keyfile): |
| 121 | close = False |
| 122 | if type(keyfile) == str: |
| 123 | keyfile = open(keyfile, "r") |
| 124 | close = True |
| 125 | try: |
| 126 | ret = [] |
| 127 | for line in keyfile: |
| 128 | words = line.split() |
| 129 | if len(words) < 3: |
| 130 | continue |
| 131 | ret += [tsigkey(dn.fromstring(words[0]), words[1], base64.b64decode(words[2]))] |
| 132 | return ret |
| 133 | finally: |
| 134 | if close: keyfile.close() |
| 135 | |
| 136 | algos = [tsigalgo("hmac-md5", "hmac-md5.sig-alg.reg.int.", signhmacmd5)] |
| 137 | |
| 138 | algobyname = dict([(a.name, a) for a in algos]) |