769e7ed9 |
1 | # ldd - DNS implementation in Python |
2 | # Copyright (C) 2006 Fredrik Tolf <fredrik@dolda2000.com> |
3 | # |
4 | # This program is free software; you can redistribute it and/or modify |
5 | # it under the terms of the GNU General Public License as published by |
6 | # the Free Software Foundation; either version 2 of the License, or |
7 | # (at your option) any later version. |
8 | # |
9 | # This program is distributed in the hope that it will be useful, |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | # GNU General Public License for more details. |
13 | # |
14 | # You should have received a copy of the GNU General Public License |
15 | # along with this program; if not, write to the Free Software |
16 | # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
17 | |
18 | import base64 |
19 | import time |
20 | import struct |
21 | from Crypto.Hash import HMAC, MD5 |
22 | |
23 | import proto, rec, dn |
24 | |
25 | class tsigkey: |
26 | def __init__(self, name, algo, secret): |
27 | if type(name) == str: |
28 | self.name = dn.fromstring(name) |
29 | else: |
30 | self.name = name |
31 | if type(algo) == str: |
32 | self.algo = algobyname[algo] |
33 | else: |
34 | self.algo = algo |
35 | self.secret = secret |
36 | |
37 | def sign(self, message): |
38 | return self.algo.sign(self.secret, message) |
39 | |
40 | class tsigalgo: |
41 | def __init__(self, name, cname, function): |
42 | self.name = name |
43 | if type(cname) == str: |
44 | self.cname = dn.fromstring(cname) |
45 | else: |
46 | self.cname = cname |
47 | self.function = function |
48 | |
49 | def sign(self, secret, message): |
50 | return self.function(secret, message) |
51 | |
52 | class tsigctx: |
53 | def __init__(self, key, pkt, sr): |
54 | self.key = key |
55 | self.prevmac = sr.data["mac"] |
56 | self.error = 0 |
57 | |
58 | def signpkt(self, pkt): |
59 | tsigsign(pkt, None, ctx = self, error = self.error) |
60 | |
61 | def tsigsign(pkt, key, stime = None, fudge = 300, error = 0, other = "", ctx = None): |
62 | if stime is None: stime = int(time.time()) |
63 | msg = "" |
64 | if ctx is not None: |
65 | if key is None: |
66 | key = ctx.key |
67 | msg += struct.pack(">H", len(ctx.prevmac)) + ctx.prevmac |
68 | msg += pkt.encode() |
69 | msg += key.name.canonwire() |
70 | msg += struct.pack(">HL", rec.CLASSANY, 0) |
71 | msg += key.algo.cname.canonwire() |
72 | msg += struct.pack(">Q", stime)[-6:] |
73 | msg += struct.pack(">3H", fudge, error, len(other)) |
74 | msg += other |
75 | digest = key.sign(msg) |
76 | pkt.addad(rec.rr((key.name, "TSIG", rec.CLASSANY), 0, rec.rrdata("TSIG", key.algo.cname, stime, fudge, digest, pkt.qid, error, other))) |
77 | pkt.signed = True |
78 | |
79 | def tsigverify(pkt, keys, vertime = None): |
80 | if vertime is None: vertime = int(time.time()) |
81 | if len(pkt.adlist) < 1: |
82 | return proto.FORMERR |
83 | sr = pkt.adlist[-1] |
84 | pkt.adlist = pkt.adlist[:-1] |
85 | if not sr.head.istype("TSIG") or sr.head.rclass != rec.CLASSANY: |
86 | return proto.FORMERR |
87 | for key in keys: |
88 | if key.name == sr.head.name: |
89 | break |
90 | else: |
91 | return proto.BADKEY |
92 | if key.algo.cname != sr.data["algo"]: |
93 | return proto.BADKEY |
94 | |
95 | pkt.tsigctx = ctx = tsigctx(key, pkt, sr) |
96 | |
97 | other = sr.data["other"] |
98 | msg = pkt.encode() |
99 | msg += key.name.canonwire() |
100 | msg += struct.pack(">HL", rec.CLASSANY, 0) |
101 | msg += key.algo.cname.canonwire() |
102 | msg += struct.pack(">Q", sr.data["stime"])[-6:] |
103 | msg += struct.pack(">3H", sr.data["fudge"], sr.data["err"], len(other)) |
104 | msg += other |
105 | digest = key.sign(msg) |
106 | if digest != sr.data["mac"]: |
107 | pkt.tsigctx = proto.BADSIG |
108 | return proto.BADSIG |
109 | if vertime != 0: |
110 | if abs(vertime - sr.data["stime"]) > sr.data["fudge"]: |
111 | pkt.tsigctx = proto.BADTIME |
112 | return proto.BADTIME |
113 | return key |
114 | |
115 | def signhmacmd5(secret, message): |
116 | s = HMAC.HMAC(secret, digestmod = MD5) |
117 | s.update(message) |
118 | return s.digest() |
119 | |
120 | def readkeys(keyfile): |
121 | close = False |
122 | if type(keyfile) == str: |
123 | keyfile = open(keyfile, "r") |
124 | close = True |
125 | try: |
126 | ret = [] |
127 | for line in keyfile: |
128 | words = line.split() |
129 | if len(words) < 3: |
130 | continue |
131 | ret += [tsigkey(dn.fromstring(words[0]), words[1], base64.b64decode(words[2]))] |
132 | return ret |
133 | finally: |
134 | if close: keyfile.close() |
135 | |
136 | algos = [tsigalgo("hmac-md5", "hmac-md5.sig-alg.reg.int.", signhmacmd5)] |
137 | |
138 | algobyname = dict([(a.name, a) for a in algos]) |