Use currency values in transactions.
[fulbank.git] / fulbank / fsb.py
... / ...
CommitLineData
1import json, http.cookiejar, binascii, time, pickle
2from urllib import request, parse
3from bs4 import BeautifulSoup as soup
4from . import currency
def soupify(cont):
    """Parse the markup in *cont* with BeautifulSoup's "html.parser" backend."""
    return soup(cont, "html.parser")


# Base URL that relative API paths and server-supplied links are joined onto.
apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
# Login page that is fetched to obtain the initial session id ("dsid").
loginurl = "https://online.swedbank.se/app/privat/login"
# Identifier that is combined with a millisecond timestamp to build the
# value sent in the Authorization header.
serviceid = "B7dZHQcY78VRVz9l"
10
class fmterror(Exception):
    """Raised when a server response does not have the expected form."""
13
class autherror(Exception):
    """Raised when authentication does not succeed, e.g. it was cancelled."""
16
def resolve(d, keys, default=fmterror):
    """Follow *keys* one by one through nested dictionaries starting at *d*.

    Args:
        d: The outermost object; normally a dict decoded from JSON.
        keys: Sequence of keys to look up in order. An empty sequence
            returns *d* itself.
        default: Value to return when the path cannot be followed. When left
            at its sentinel value (the ``fmterror`` class itself), a failed
            lookup raises instead of returning.

    Returns:
        The value found at the end of the path, or *default*.

    Raises:
        fmterror: If a key is missing, or an intermediate value is not a
            dict, and no *default* was supplied.
    """
    node = d
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            if default is fmterror:
                # Name the offending key so a malformed response can be diagnosed.
                raise fmterror("missing key %r while resolving %r" % (key, keys))
            return default
        node = node[key]
    return node
32
def linkurl(ln):
    """Turn a server-supplied absolute-path link into a URL under ``apibase``.

    Args:
        ln: Link string that must start with "/".

    Returns:
        *ln* without its leading slash, joined onto ``apibase``.

    Raises:
        fmterror: If *ln* is not a string starting with "/". This includes
            the empty string, which previously escaped as an IndexError.
    """
    if not isinstance(ln, str) or not ln.startswith("/"):
        raise fmterror("unexpected link url: " + str(ln))
    return parse.urljoin(apibase, ln[1:])
37
def getdsid():
    """Fetch the login page and return the session id embedded in it.

    The id is read from the ``value`` attribute of the ``div`` element whose
    id is "cust-sess-id".

    Raises:
        fmterror: If the page is not answered with status 200, or the
            element or its ``value`` attribute is absent.
    """
    with request.urlopen(loginurl) as response:
        status = response.code
        if status != 200:
            raise fmterror("Unexpected HTTP status code: " + str(status))
        page = soupify(response.read())
    holder = page.find("div", id="cust-sess-id")
    if holder and holder.has_attr("value"):
        return holder["value"]
    raise fmterror("DSID DIV not on login page")
47
def base64(data):
    """Return *data* (bytes) base64-encoded as text without "=" padding."""
    encoded = binascii.b2a_base64(data)
    text = encoded.decode("ascii")
    # b2a_base64 appends a newline; drop it before removing the padding.
    unwrapped = text.strip()
    return unwrapped.rstrip("=")
50
class transaction(object):
    """One transaction entry of an account, backed by its raw data dict."""

    def __init__(self, account, data):
        self._data = data
        self.account = account

    @property
    def value(self):
        """The "amount" field parsed by the currency named in "currency"."""
        unit = currency.currency.get(resolve(self._data, ("currency",)))
        return unit.parse(resolve(self._data, ("amount",)))

    @property
    def message(self):
        """The text stored under "details" / "message"."""
        path = ("details", "message")
        return resolve(self._data, path)

    def __repr__(self):
        fields = (self.value, self.message)
        return "#<fsb.transaction %s: %r>" % fields
63
class account(object):
    """An account reachable through a session.

    ``_idata`` is the entry the account was created from; the detailed
    record is fetched from the session on first use and then cached.
    """

    def __init__(self, sess, id, idata):
        self.sess, self.id = sess, id
        self._idata = idata
        self._data = None  # filled in lazily by the ``data`` property

    @property
    def data(self):
        """Detailed account record, requested once and cached afterwards."""
        cached = self._data
        if cached is None:
            cached = self._data = self.sess._jreq("v5/engagement/account/" + self.id)
        return cached

    @property
    def number(self):
        """The "accountNumber" field of the detailed record."""
        return resolve(self.data, ("accountNumber",))

    @property
    def clearing(self):
        """The "clearingNumber" field of the detailed record."""
        return resolve(self.data, ("clearingNumber",))

    @property
    def fullnumber(self):
        """The "fullyFormattedNumber" field of the detailed record."""
        return resolve(self.data, ("fullyFormattedNumber",))

    @property
    def name(self):
        """The "name" field of the entry the account was created from."""
        return resolve(self._idata, ("name",))

    def transactions(self):
        """Yield a transaction object for each entry on the first result page."""
        page_size = 50
        reply = self.sess._jreq(
            "v5/engagement/transactions/" + self.id,
            transactionsPerPage=page_size,
            page=1,
        )
        for entry in resolve(reply, ("transactions",)):
            yield transaction(self, entry)

    def __repr__(self):
        fields = (self.fullnumber, self.name)
        return "#<fsb.account %s: %r>" % fields
94
class session(object):
    """A client session against the API rooted at ``apibase``.

    A session carries the "dsid" value, the Authorization header value and a
    cookie jar. It can be used as a context manager; leaving the ``with``
    block calls close().
    """

    def __init__(self, dsid):
        """Set up a session for the session id *dsid* (no network access)."""
        self.dsid = dsid
        # Authorization value: base64 of "<serviceid>:<current time in ms>".
        stamp = str(int(time.time() * 1000))
        self.auth = base64((serviceid + ":" + stamp).encode("ascii"))
        self.jar = request.HTTPCookieProcessor()
        self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name="dsid", value=dsid, path="/", path_specified=True,
            domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
            port=None, port_specified=False, secure=False, expires=None,
            discard=True, comment=None, comment_url=None,
            rest={}, rfc2109=False))
        self.userid = None
        self._accounts = None

    def __getstate__(self):
        """Return a picklable state: all attributes, the jar as a cookie list.

        The cookie jar holds a lock object and cannot be pickled directly,
        so only the cookies it contains are stored.
        """
        state = dict(self.__dict__)
        jar = state.pop("jar", None)
        state["_cookies"] = list(jar.cookiejar) if jar is not None else []
        return state

    def __setstate__(self, state):
        """Restore attributes and rebuild the cookie jar from stored cookies."""
        state = dict(state)
        cookies = state.pop("_cookies", [])
        self.__dict__.update(state)
        self.jar = request.HTTPCookieProcessor()
        for cookie in cookies:
            self.jar.cookiejar.set_cookie(cookie)

    def _req(self, url, data=None, ctype=None, headers=None, method=None, **kws):
        """Send one request and return the raw response body as bytes.

        Args:
            url: Path relative to ``apibase``, or an absolute URL.
            data: Request body. A dict is serialized as JSON, which also
                forces the content type to JSON.
            ctype: Content-Type header value, if any.
            headers: Optional mapping of extra request headers.
            method: HTTP method; None lets urllib choose.
            **kws: Query parameters. "dsid" defaults to the session's id;
                parameters whose value is None are left out.

        Raises:
            fmterror: If the response status is neither 200 nor 201.
        """
        kws.setdefault("dsid", self.dsid)
        query = {k: v for (k, v) in kws.items() if v is not None}
        url = parse.urljoin(apibase, url + "?" + parse.urlencode(query))
        if isinstance(data, dict):
            data = json.dumps(data).encode("utf-8")
            ctype = "application/json;charset=UTF-8"
        req = request.Request(url, data=data, method=method)
        for hnam, hval in (headers or {}).items():
            req.add_header(hnam, hval)
        if ctype is not None:
            req.add_header("Content-Type", ctype)
        req.add_header("Authorization", self.auth)
        # The cookie processor is not installed in an opener, so it is
        # applied by hand before the request and after the response.
        self.jar.https_request(req)
        with request.urlopen(req) as resp:
            if resp.code not in (200, 201):
                raise fmterror("Unexpected HTTP status code: " + str(resp.code))
            self.jar.https_response(req, resp)
            return resp.read()

    def _jreq(self, *args, **kwargs):
        """Like _req(), but ask for JSON and return the decoded response."""
        # Copy so that a headers dict passed in by the caller is not modified.
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Accept"] = "application/json"
        ret = self._req(*args, headers=headers, **kwargs)
        return json.loads(ret.decode("utf-8"))

    def _postlogin(self):
        """Record the identified user and follow the profile's "next" link.

        Raises:
            fmterror: If no user is identified, the profile does not list
                exactly one bank, or an expected field is missing.
        """
        auth = self._jreq("v5/user/authenticationinfo")
        uid = auth.get("identifiedUser", "")
        if uid == "":
            raise fmterror("no identified user even after successful authentication")
        self.userid = uid
        prof = self._jreq("v5/profile/")
        banks = resolve(prof, ("banks",))
        if len(banks) != 1:
            raise fmterror("do not know the meaning of multiple banks")
        rolesw = linkurl(resolve(banks[0], ("privateProfile", "links", "next", "uri")))
        self._jreq(rolesw, method="POST")

    def auth_bankid(self, user):
        """Start a mobile BankID identification for *user* and wait for it.

        The verification link is polled every three seconds until the
        reported status is final. There is no upper bound on the wait.

        Raises:
            autherror: If the status becomes "CANCELLED".
            fmterror: If an unexpected status is reported.
        """
        data = self._jreq("v5/identification/bankid/mobile", data={
            "userId": user,
            "useEasyLogin": False,
            "generateEasyLoginId": False})
        if data.get("status") != "USER_SIGN":
            raise fmterror("unexpected bankid status: " + str(data.get("status")))
        vfy = linkurl(resolve(data, ("links", "next", "uri")))
        while True:
            time.sleep(3)
            st = self._jreq(vfy).get("status")
            if st in ("USER_SIGN", "CLIENT_NOT_STARTED"):
                continue
            if st == "COMPLETE":
                self._postlogin()
                return
            if st == "CANCELLED":
                raise autherror("authentication cancelled")
            raise fmterror("unexpected bankid status: " + str(st))

    def keepalive(self):
        """Query the client session and return its reported timeout in seconds."""
        data = self._jreq("v5/framework/clientsession")
        return resolve(data, ("timeoutInMillis",)) / 1000

    @property
    def accounts(self):
        """List of account objects from the overview, fetched once and cached."""
        if self._accounts is None:
            data = self._jreq("v5/engagement/overview")
            self._accounts = [
                account(self, resolve(acct, ("id",)), acct)
                for acct in resolve(data, ("transactionAccounts",))
            ]
        return self._accounts

    def logout(self):
        """Send the logout request if a user is logged in, then forget the user."""
        if self.userid is not None:
            self._jreq("v5/identification/logout", method="PUT")
            self.userid = None

    def close(self):
        """Log out (if needed) and delete the client session on the server."""
        self.logout()
        self._req("v5/framework/clientsession", method="DELETE")

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        return False

    def __repr__(self):
        if self.userid is not None:
            return "#<fsb.session %s>" % self.userid
        return "#<fsb.session>"

    @classmethod
    def create(cls):
        """Create a session using a session id freshly fetched by getdsid()."""
        return cls(getdsid())

    def save(self, filename):
        """Pickle this session to the file *filename*."""
        with open(filename, "wb") as fp:
            pickle.dump(self, fp)

    @classmethod
    def load(cls, filename):
        """Return the session pickled in the file *filename*.

        NOTE: unpickling can execute arbitrary code; only load files that
        were written by save() and are trusted.
        """
        with open(filename, "rb") as fp:
            return pickle.load(fp)