Initial commit.
[fulbank.git] / fulbank / fsb.py
1 import json, http.cookiejar, binascii, time
2 from urllib import request, parse
3 from bs4 import BeautifulSoup as soup
4 soupify = lambda cont: soup(cont, "html.parser")
5
6 apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
7 loginurl = "https://online.swedbank.se/app/privat/login"
8 serviceid = "B7dZHQcY78VRVz9l"
9
10 class fmterror(Exception):
11     pass
12
13 class autherror(Exception):
14     pass
15
16 def resolve(d, keys, default=fmterror):
17     def err():
18         if default is fmterror:
19             raise fmterror()
20         return default
21     def rec(d, keys):
22         if len(keys) == 0:
23             return d
24         if isinstance(d, dict):
25             if keys[0] not in d:
26                 return err()
27             return rec(d[keys[0]], keys[1:])
28         else:
29             return err()
30     return rec(d, keys)
31
32 def linkurl(ln):
33     if ln[0] != '/':
34         raise fmterror("unexpected link url: " + ln)
35     return parse.urljoin(apibase, ln[1:])
36
37 def getdsid():
38     with request.urlopen(loginurl) as resp:
39         if resp.code != 200:
40             raise fmterror("Unexpected HTTP status code: " + str(resp.code))
41         doc = soupify(resp.read())
42     dsel = doc.find("div", id="cust-sess-id")
43     if not dsel or not dsel.has_attr("value"):
44         raise fmterror("DSID DIV not on login page")
45     return dsel["value"]
46
47 def base64(data):
48     return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
49
50 class session(object):
51     def __init__(self, dsid):
52         self.dsid = dsid
53         self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
54         self.jar = request.HTTPCookieProcessor()
55         self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
56             version=0, name="dsid", value=dsid, path="/", path_specified=True,
57             domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
58             port=None, port_specified=False, secure=False, expires=None,
59             discard=True, comment=None, comment_url=None,
60             rest={}, rfc2109=False))
61         self.userid = None
62
63     def _req(self, url, data=None, ctype=None, headers={}, method=None, **kws):
64         if "dsid" not in kws:
65             kws["dsid"] = self.dsid
66         kws = {k: v for (k, v) in kws.items() if v is not None}
67         url = parse.urljoin(apibase, url + "?" + parse.urlencode(kws))
68         if isinstance(data, dict):
69             data = json.dumps(data).encode("utf-8")
70             ctype = "application/json;charset=UTF-8"
71         req = request.Request(url, data=data, method=method)
72         for hnam, hval in headers.items():
73             req.add_header(hnam, hval)
74         if ctype is not None:
75             req.add_header("Content-Type", ctype)
76         req.add_header("Authorization", self.auth)
77         self.jar.https_request(req)
78         with request.urlopen(req) as resp:
79             if resp.code != 200:
80                 raise fmterror("Unexpected HTTP status code: " + str(resp.code))
81             self.jar.https_response(req, resp)
82             return resp.read()
83
84     def _jreq(self, *args, **kwargs):
85         headers = kwargs.pop("headers", {})
86         headers["Accept"] = "application/json"
87         ret = self._req(*args, headers=headers, **kwargs)
88         return json.loads(ret.decode("utf-8"))
89
90     def auth_bankid(self, user):
91         data = self._jreq("v5/identification/bankid/mobile", data = {
92             "userId": user,
93             "useEasyLogin": False,
94             "generateEasyLoginId": False})
95         if data.get("status") != "USER_SIGN":
96             raise fmterror("unexpected bankid status: " + str(data.get("status")))
97         vfy = linkurl(resolve(data, ("links", "next", "uri")))
98         while True:
99             time.sleep(3)
100             vdat = self._jreq(vfy)
101             st = vdat.get("status")
102             if st == "USER_SIGN":
103                 continue
104             elif st == "COMPLETE":
105                 auth = self._jreq("v5/user/authenticationinfo")
106                 uid = auth.get("identifiedUser", "")
107                 if uid == "":
108                     raise fmterror("no identified user even after successful authentication")
109                 self.userid = uid
110                 return
111             elif st == "CANCELLED":
112                 raise autherror("authentication cancelled")
113             elif st == "CLIENT_NOT_STARTED":
114                 raise autherror("authentication client not started")
115             else:
116                 raise fmterror("unexpected bankid status: " + str(st))
117
118     def logout(self):
119         if self.userid is not None:
120             self._jreq("v5/identification/logout", method="PUT")
121             self.userid = None
122
123     def close(self):
124         self.logout()
125         self._req("v5/framework/clientsession", method="DELETE")
126
127     def __enter__(self):
128         return self
129
130     def __exit__(self, *excinfo):
131         self.close()
132         return False
133
134     @classmethod
135     def create(cls):
136         return cls(getdsid())