Added some account inspection.
[fulbank.git] / fulbank / fsb.py
1 import json, http.cookiejar, binascii, time, pickle
2 from urllib import request, parse
3 from bs4 import BeautifulSoup as soup
def soupify(cont):
    """Parse *cont* as HTML using BeautifulSoup's "html.parser" backend."""
    return soup(cont, "html.parser")

# Base URL that API paths are joined against when requests are built.
apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
# Login page; it is fetched to read the "cust-sess-id" value from its markup.
loginurl = "https://online.swedbank.se/app/privat/login"
# Identifier that is combined with a millisecond timestamp to form the
# Authorization header value sent with every API request.
serviceid = "B7dZHQcY78VRVz9l"
9
class fmterror(Exception):
    """Raised when the remote service responds in a form this module does not expect."""
12
class autherror(Exception):
    """Raised when authentication does not succeed, e.g. when it is cancelled."""
15
def resolve(d, keys, default=fmterror):
    """Follow *keys* through nested dictionaries and return the value found.

    Args:
        d: The outermost object, normally a dict decoded from JSON.
        keys: Sequence of keys to follow, outermost first.  An empty
            sequence returns *d* itself.
        default: Value returned when the path cannot be followed.  The
            fmterror class itself is used as the "no default" sentinel.

    Returns:
        The value at the end of the key path, or *default* if one was given
        and the path does not exist.

    Raises:
        fmterror: if the path cannot be followed and no default was given.
    """
    cur = d
    for key in keys:
        # A step fails both when the key is absent and when the current
        # value is not a dictionary at all.
        if not isinstance(cur, dict) or key not in cur:
            if default is fmterror:
                raise fmterror("could not resolve %r (missing %r)" % (list(keys), key))
            return default
        cur = cur[key]
    return cur
31
def linkurl(ln):
    """Turn a link path starting with "/" into a URL below apibase.

    Args:
        ln: Link path as found in an API response; must begin with "/".

    Returns:
        The link joined onto apibase.

    Raises:
        fmterror: if *ln* is not a string beginning with "/" (this includes
            the empty string).
    """
    # startswith() also covers the empty string, which indexing ln[0] would
    # have turned into an IndexError instead of a format error.
    if not isinstance(ln, str) or not ln.startswith("/"):
        raise fmterror("unexpected link url: " + str(ln))
    # Drop the leading slash so the path is resolved relative to apibase
    # rather than replacing its path component.
    return parse.urljoin(apibase, ln[1:])
36
def getdsid():
    """Fetch the login page and return the session id embedded in it.

    The id is read from the "value" attribute of the <div> element whose
    id is "cust-sess-id".

    Raises:
        fmterror: if the HTTP status is not 200, or if the element or its
            "value" attribute is missing.
    """
    with request.urlopen(loginurl) as resp:
        status = resp.code
        if status != 200:
            raise fmterror("Unexpected HTTP status code: " + str(status))
        page = soupify(resp.read())
    holder = page.find("div", id="cust-sess-id")
    if holder and holder.has_attr("value"):
        return holder["value"]
    raise fmterror("DSID DIV not on login page")
46
def base64(data):
    """Return *data* (bytes) base64-encoded as text, without trailing "=" padding."""
    encoded = binascii.b2a_base64(data)
    text = encoded.decode("ascii")
    # b2a_base64 appends a newline; remove it before dropping the padding.
    return text.strip().rstrip("=")
49
class transaction(object):
    """One transaction entry of an account.

    Wraps the raw transaction dictionary and exposes selected fields as
    read-only properties.
    """

    def __init__(self, account, data):
        self._data = data
        self.account = account

    @property
    def amount(self):
        """The "amount" field converted to float."""
        raw = resolve(self._data, ("amount",))
        return float(raw)

    @property
    def message(self):
        """The "message" entry nested under "details"."""
        path = ("details", "message")
        return resolve(self._data, path)

    def __repr__(self):
        fields = (self.amount, self.message)
        return "#<fsb.transaction %.2f: %r>" % fields
62
class account(object):
    """One account reachable through a session.

    Holds the overview entry the account was created from (``_idata``) and
    fetches the detailed account record on demand.
    """

    def __init__(self, sess, id, idata):
        self.sess, self.id = sess, id
        self._idata = idata
        # Detailed record; stays None until the `data` property is first read.
        self._data = None

    @property
    def data(self):
        """Detailed account record, requested on first access and then cached."""
        cached = self._data
        if cached is None:
            cached = self._data = self.sess._jreq("v5/engagement/account/" + self.id)
        return cached

    @property
    def number(self):
        """The "accountNumber" field of the detailed record."""
        return resolve(self.data, ("accountNumber",))

    @property
    def clearing(self):
        """The "clearingNumber" field of the detailed record."""
        return resolve(self.data, ("clearingNumber",))

    @property
    def fullnumber(self):
        """The "fullyFormattedNumber" field of the detailed record."""
        return resolve(self.data, ("fullyFormattedNumber",))

    @property
    def name(self):
        """The "name" field of the overview entry."""
        return resolve(self._idata, ("name",))

    def transactions(self):
        """Yield a transaction object for each entry on the first result page.

        Only page 1 is requested, with a page size of 50.
        """
        per_page = 50
        endpoint = "v5/engagement/transactions/" + self.id
        reply = self.sess._jreq(endpoint, transactionsPerPage=per_page, page=1)
        for entry in resolve(reply, ("transactions",)):
            yield transaction(self, entry)

    def __repr__(self):
        fields = (self.fullnumber, self.name)
        return "#<fsb.account %s: %r>" % fields
93
class session(object):
    """An API session bound to one DSID value.

    The session keeps the Authorization value, a cookie processor holding
    the "dsid" cookie, the identified user id (once logged in) and a cache
    of the account list.  It can be used as a context manager, in which
    case close() is called on exit.
    """

    def __init__(self, dsid):
        """Create a session for *dsid* (see getdsid() / session.create())."""
        self.dsid = dsid
        # Authorization value: unpadded base64 of "<serviceid>:<ms timestamp>".
        self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
        self.jar = request.HTTPCookieProcessor()
        self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name="dsid", value=dsid, path="/", path_specified=True,
            domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
            port=None, port_specified=False, secure=False, expires=None,
            discard=True, comment=None, comment_url=None,
            rest={}, rfc2109=False))
        self.userid = None
        self._accounts = None

    def __getstate__(self):
        """Return picklable state.

        The cookie jar owns a lock object that cannot be pickled, so the
        processor is replaced by a plain list of its cookies.
        """
        state = dict(self.__dict__)
        state["jar"] = list(self.jar.cookiejar)
        return state

    def __setstate__(self, state):
        """Restore state produced by __getstate__, rebuilding the cookie processor."""
        state = dict(state)
        cookies = state.get("jar")
        if not isinstance(cookies, request.HTTPCookieProcessor):
            jar = request.HTTPCookieProcessor()
            for cookie in cookies or ():
                jar.cookiejar.set_cookie(cookie)
            state["jar"] = jar
        self.__dict__.update(state)

    def _req(self, url, data=None, ctype=None, headers=None, method=None, **kws):
        """Perform one HTTP request against the API and return the raw body.

        Args:
            url: Path relative to apibase, or an absolute URL.
            data: Request body.  A dict is JSON-encoded and forces a JSON
                content type; anything else is passed through unchanged.
            ctype: Content-Type header value, if any.
            headers: Optional mapping of extra request headers.
            method: HTTP method override passed to urllib's Request.
            **kws: Query parameters.  "dsid" is added automatically unless
                given; parameters whose value is None are dropped.

        Returns:
            The response body as bytes.

        Raises:
            fmterror: if the response status is neither 200 nor 201.
        """
        if "dsid" not in kws:
            kws["dsid"] = self.dsid
        kws = {k: v for (k, v) in kws.items() if v is not None}
        url = parse.urljoin(apibase, url + "?" + parse.urlencode(kws))
        if isinstance(data, dict):
            data = json.dumps(data).encode("utf-8")
            ctype = "application/json;charset=UTF-8"
        req = request.Request(url, data=data, method=method)
        for hnam, hval in (headers or {}).items():
            req.add_header(hnam, hval)
        if ctype is not None:
            req.add_header("Content-Type", ctype)
        req.add_header("Authorization", self.auth)
        # The cookie processor is driven by hand: attach stored cookies to
        # the request, then record any cookies set by the response.
        self.jar.https_request(req)
        with request.urlopen(req) as resp:
            if resp.code != 200 and resp.code != 201:
                raise fmterror("Unexpected HTTP status code: " + str(resp.code))
            self.jar.https_response(req, resp)
            return resp.read()

    def _jreq(self, *args, **kwargs):
        """Like _req(), but ask for JSON and return the decoded response.

        Accepts the same arguments as _req().  An "Accept: application/json"
        header is added to a copy of any headers given, so the caller's
        mapping is left untouched.
        """
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Accept"] = "application/json"
        ret = self._req(*args, headers=headers, **kwargs)
        return json.loads(ret.decode("utf-8"))

    def _postlogin(self):
        """Finish a successful authentication.

        Records the identified user id and POSTs to the "next" link of the
        private profile of the single bank listed in the profile.

        Raises:
            fmterror: if no user is identified, if the profile does not
                list exactly one bank, or if an expected key is missing.
        """
        auth = self._jreq("v5/user/authenticationinfo")
        uid = auth.get("identifiedUser", "")
        if uid == "":
            raise fmterror("no identified user even after successful authentication")
        self.userid = uid
        prof = self._jreq("v5/profile/")
        # resolve() reports a missing "banks" key as fmterror, like the
        # other response-format checks in this module.
        banks = resolve(prof, ("banks",))
        if len(banks) != 1:
            raise fmterror("do not know the meaning of multiple banks")
        rolesw = linkurl(resolve(banks[0], ("privateProfile", "links", "next", "uri")))
        self._jreq(rolesw, method="POST")

    def auth_bankid(self, user):
        """Authenticate *user* via mobile BankID, blocking until it is resolved.

        Starts the identification, then polls the verification link every
        three seconds until a final status is reported.

        Raises:
            autherror: if the status becomes "CANCELLED".
            fmterror: on any status this method does not know.
        """
        data = self._jreq("v5/identification/bankid/mobile", data = {
            "userId": user,
            "useEasyLogin": False,
            "generateEasyLoginId": False})
        if data.get("status") != "USER_SIGN":
            raise fmterror("unexpected bankid status: " + str(data.get("status")))
        vfy = linkurl(resolve(data, ("links", "next", "uri")))
        while True:
            time.sleep(3)
            vdat = self._jreq(vfy)
            st = vdat.get("status")
            if st == "USER_SIGN":
                continue
            elif st == "CLIENT_NOT_STARTED":
                continue
            elif st == "COMPLETE":
                self._postlogin()
                return
            elif st == "CANCELLED":
                raise autherror("authentication cancelled")
            else:
                raise fmterror("unexpected bankid status: " + str(st))

    def keepalive(self):
        """Query the client session and return its "timeoutInMillis" value in seconds.

        Raises:
            fmterror: if the response lacks "timeoutInMillis".
        """
        data = self._jreq("v5/framework/clientsession")
        return resolve(data, ("timeoutInMillis",)) / 1000

    @property
    def accounts(self):
        """List of account objects from the overview, fetched once and cached."""
        if self._accounts is None:
            data = self._jreq("v5/engagement/overview")
            self._accounts = [
                account(self, resolve(acct, ("id",)), acct)
                for acct in resolve(data, ("transactionAccounts",))
            ]
        return self._accounts

    def logout(self):
        """Log out the identified user, if any, and forget the user id."""
        if self.userid is not None:
            self._jreq("v5/identification/logout", method="PUT")
            self.userid = None

    def close(self):
        """Log out and then delete the client session on the server."""
        self.logout()
        self._req("v5/framework/clientsession", method="DELETE")

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False

    def __repr__(self):
        if self.userid is not None:
            return "#<fsb.session %s>" % self.userid
        return "#<fsb.session>"

    @classmethod
    def create(cls):
        """Create a session using a DSID freshly obtained via getdsid()."""
        return cls(getdsid())

    def save(self, filename):
        """Pickle this session to *filename*."""
        with open(filename, "wb") as fp:
            pickle.dump(self, fp)

    @classmethod
    def load(cls, filename):
        """Return the session previously written to *filename* by save().

        SECURITY: unpickling can execute arbitrary code; only load files
        that this program wrote itself and that nobody else could modify.
        """
        with open(filename, "rb") as fp:
            return pickle.load(fp)