Initial import
[ldd.git] / ldd / rec.py
CommitLineData
769e7ed9 1# ldd - DNS implementation in Python
2# Copyright (C) 2006 Fredrik Tolf <fredrik@dolda2000.com>
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 2 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17
18import socket
19import struct
20
21import proto
22import dn
23
24rtypes = []
25
26def addrtype(id, name, syntax):
27 rtypes.append((id, name, syntax))
28
29def rtypebyid(id):
30 for rtype in rtypes:
31 if rtype[0] == id:
32 return rtype
33 return None
34
35def rtypebyname(name):
36 for rtype in rtypes:
37 if rtype[1] == name.upper():
38 return rtype[0]
39 return None
40
41class error(Exception):
42 def __init__(self, text):
43 self.text = text
44
45 def __str__(self):
46 return self.text
47
48class malformedrr(Exception):
49 def __init__(self, text):
50 self.text = text
51
52 def __str__(self):
53 return self.text
54
55class rrhead:
56 def __init__(self, name = None, rtype = None, rclass = None):
57 if rclass is None: rclass = CLASSIN
58 if type(name) == str:
59 self.name = dn.fromstring(name)
60 else:
61 self.name = name
62 if type(rtype) == str:
63 self.rtype = rtypebyname(rtype)
64 if self.rtype is None:
65 raise error("no such rtype " + rtype)
66 else:
67 self.rtype = rtype
68 self.rclass = rclass
69
70 def encode(self, names, offset):
71 ret, names = proto.encodename(self.name, names, offset)
72 ret += struct.pack(">HH", self.rtype, self.rclass)
73 return ret, names
74
75 def __eq__(self, other):
76 return self.name == other.name and self.rtype == other.rtype
77
78 def __str__(self):
79 rtype = rtypebyid(self.rtype)
80 if rtype is None:
81 return "%02x RRhead %s" % (self.rtype, self.name)
82 else:
83 return "%s RRhead %s" % (rtype[1], self.name)
84
85 def istype(self, rtype):
86 if type(rtype) == str:
87 rtype = rtypebyname(rtype)
88 return self.rtype == rtype
89
90 def decode(self, packet, offset):
91 name, offset = proto.decodename(packet, offset)
92 rtype, rclass = struct.unpack(">HH", packet[offset:offset + struct.calcsize(">HH")])
93 offset += struct.calcsize(">HH")
94 ret = rrhead(name, rtype, rclass)
95 return ret, offset
96 decode = classmethod(decode)
97
98class rrdata:
99 def __init__(self, rtype, *args):
100 if type(rtype) == tuple and type(args[0]) == dict:
101 self.rtype = rtype
102 self.rdata = args[0]
103 return
104
105 if type(rtype) == str:
106 self.rtype = rtypebyname(rtype)
107 if self.rtype is None:
108 raise error("no such rtype " + rtype)
109 else:
110 self.rtype = rtype
111 rtid = self.rtype
112 self.rtype = rtypebyid(rtid)
113 if self.rtype is None:
114 raise error("no such rtype " + rtid)
115 self.rdata = {}
116 for i, e in enumerate(self.rtype[2]):
117 d = self.convdata(e[0], args[i])
118 self.rdata[e[1]] = d
119
120 def __eq__(self, other):
121 return(self.rdata == other.rdata)
122
123 def __str__(self):
124 ret = "{"
125 first = True
126 for e in self.rtype[2]:
127 if not first:
128 ret += ", "
129 first = False
130 ret += e[1] + ": "
131 d = self.rdata[e[1]]
132 if e[0] == "4":
133 ret += socket.inet_ntop(socket.AF_INET, d)
134 elif e[0] == "6":
135 ret += socket.inet_ntop(socket.AF_INET6, d)
136 elif e[0] == "s":
137 ret += '"' + d + '"'
138 else:
139 ret += str(d)
140 ret += "}"
141 return ret
142
143 def istype(self, rtype):
144 if type(rtype) == str:
145 rtype = rtypebyname(rtype)
146 return self.rtype[0] == rtype
147
148 def convdata(self, dtype, data):
149 if dtype == "4":
150 if type(data) != str:
151 raise error("IPv4 address must be a string")
152 if len(data) == 4:
153 d = data
154 else:
155 d = socket.inet_pton(socket.AF_INET, data)
156 if dtype == "6":
157 if type(data) != str:
158 raise error("IPv6 address must be a string")
159 if len(data) == 16 and data.find(":") == -1:
160 d = data
161 else:
162 d = socket.inet_pton(socket.AF_INET6, data)
163 if dtype == "d":
164 if type(data) == str:
165 d = dn.fromstring(data)
166 elif isinstance(data, dn.domainname):
167 d = data
168 else:
169 raise error("Domain name must be either proper or string")
170 if dtype == "s":
171 d = str(data)
172 if dtype == "i":
173 d = int(data)
174 return d
175
176 def __iter__(self):
177 return iter(self.rdata)
178
179 def __getitem__(self, i):
180 return self.rdata[i]
181
182 def __setitem__(self, i, v):
183 for e in self.rtype[2]:
184 if e[1] == i:
185 break
186 else:
187 raise error("No such data for " + self.rtype[1] + " record: " + str(i))
188 self.rdata[i] = self.convdata(e[0], v)
189
190 def encode(self, names, offset):
191 ret = ""
192 for e in self.rtype[2]:
193 d = self.rdata[e[1]]
194 if e[2] == "strc":
195 ret += d
196 offset += len(d)
197 if e[2] == "cmdn":
198 buf, names = proto.encodename(d, names, offset)
199 ret += buf
200 offset += len(buf)
201 if e[2] == "lstr":
202 ret += chr(len(d)) + d
203 offset += 1 + len(d)
204 if e[2] == "llstr":
205 ret += struct.pack(">H", len(d)) + d
206 offset += struct.calcsize(">H") + len(d)
207 if e[2] == "short":
208 ret += struct.pack(">H", d)
209 offset += struct.calcsize(">H")
210 if e[2] == "long":
211 ret += struct.pack(">L", d)
212 offset += struct.calcsize(">L")
213 if e[2] == "int6":
214 ret += struct.pack(">Q", d)[-6:]
215 offset += 6
216 return ret, names
217
218 def decode(self, rtid, packet, offset, dlen):
219 rtype = rtypebyid(rtid)
220 origoff = offset
221 rdata = {}
222 if rtype is None:
223 rtype = (rtid, "Unknown", [("s", "unknown", "strc", dlen)])
224 for e in rtype[2]:
225 if e[2] == "strc":
226 d = packet[offset:offset + e[3]]
227 offset += e[3]
228 if e[2] == "cmdn":
229 d, offset = proto.decodename(packet, offset)
230 if e[2] == "lstr":
231 dl = ord(packet[offset])
232 offset += 1
233 d = packet[offset:offset + dl]
234 offset += dl
235 if e[2] == "llstr":
236 (dl,) = struct.unpack(">H", packet[offset:offset + struct.calcsize(">H")])
237 offset += struct.calcsize(">H")
238 d = packet[offset:offset + dl]
239 offset += dl
240 if e[2] == "short":
241 (d,) = struct.unpack(">H", packet[offset:offset + struct.calcsize(">H")])
242 offset += struct.calcsize(">H")
243 if e[2] == "long":
244 (d,) = struct.unpack(">L", packet[offset:offset + struct.calcsize(">L")])
245 offset += struct.calcsize(">L")
246 if e[2] == "int6":
247 (d,) = struct.unpack(">Q", ("\0" * (struct.calcsize(">Q") - 6)) + packet[offset:offset + 6])
248 offset += 6
249 rdata[e[1]] = d
250 if origoff + dlen != offset:
251 raise malformedrr(rtype[1] + " RR data length mismatch")
252 return rrdata(rtype, rdata)
253 decode = classmethod(decode)
254
255class rr:
256 def __init__(self, head, ttl, data):
257 if type(head) == tuple:
258 self.head = rrhead(*head)
259 else:
260 self.head = head
261 self.ttl = ttl
262 self.data = data
263 self.flags = set()
264
265 def setflags(self, flags):
266 self.flags |= set(flags)
267
268 def clrflags(self, flags):
269 self.flags -= set(flags)
270
271 def encode(self, names, offset):
272 ret, names = self.head.encode(names, offset)
273 if self.data is None:
274 data = ""
275 else:
276 data, names = self.data.encode(names, offset + len(ret) + struct.calcsize(">LH"))
277 ret += struct.pack(">LH", self.ttl, len(data))
278 ret += data
279 return ret, names
280
281 def __eq__(self, other):
282 return self.head == other.head and self.ttl == other.ttl and self.data == other.data
283
284 def __str__(self):
285 rtype = rtypebyid(self.head.rtype)
286 if rtype is None:
287 ret = "%02x" % self.head.rtype
288 else:
289 ret = rtype[1]
290 ret += " RR %s, TTL=%i: %s" % (self.head.name, self.ttl, self.data)
291 if len(self.flags) > 0:
292 ret += " (Flags:"
293 for f in self.flags:
294 ret += " " + f
295 ret += ")"
296 return ret
297
298 def decode(self, packet, offset):
299 head, offset = rrhead.decode(packet, offset)
300 ttl, dlen = struct.unpack(">LH", packet[offset:offset + struct.calcsize(">LH")])
301 offset += struct.calcsize(">LH")
302 if dlen == 0:
303 data = None
304 else:
305 data = rrdata.decode(head.rtype, packet, offset, dlen)
306 offset += dlen
307 return rr(head, ttl, data), offset
308 decode = classmethod(decode)
309
310addrtype(0x01, "A", [("4", "address", "strc", 4)])
311addrtype(0x02, "NS", [("d", "nsname", "cmdn")])
312addrtype(0x05, "CNAME", [("d", "priname", "cmdn")])
313addrtype(0x06, "SOA", [("d", "priserv", "cmdn"),
314 ("d", "mailbox", "cmdn"),
315 ("i", "serial", "long"),
316 ("i", "refresh", "long"),
317 ("i", "retry", "long"),
318 ("i", "expire", "long"),
319 ("i", "minttl", "long")])
320addrtype(0x0c, "PTR", [("d", "target", "cmdn")])
321addrtype(0x0f, "MX", [("i", "prio", "short"),
322 ("d", "target", "cmdn")])
323addrtype(0x10, "TXT", [("s", "rrtext", "lstr")])
324addrtype(0x1c, "AAAA", [("6", "address", "strc", 16)])
325addrtype(0x21, "SRV", [("i", "prio", "short"),
326 ("i", "weight", "short"),
327 ("i", "port", "short"),
328 ("d", "target", "cmdn")])
329addrtype(0xfa, "TSIG", [("d", "algo", "cmdn"),
330 ("i", "stime", "int6"),
331 ("i", "fudge", "short"),
332 ("s", "mac", "llstr"),
333 ("i", "orgid", "short"),
334 ("i", "err", "short"),
335 ("s", "other", "llstr")])
336
337CLASSIN = 1
338CLASSCS = 2
339CLASSCH = 3
340CLASSHS = 4
341CLASSNONE = 254
342CLASSANY = 255