| 1 | import json, http.cookiejar, binascii, time |
| 2 | from urllib import request, parse |
| 3 | from bs4 import BeautifulSoup as soup |
def soupify(cont):
    """Parse *cont* as HTML using BeautifulSoup's ``html.parser`` backend."""
    # A real def (rather than a lambda bound to a name) gives the function
    # a proper __name__ in tracebacks and a place for this docstring.
    return soup(cont, "html.parser")
| 5 | |
# Root of the REST API; session._req and linkurl join request paths onto it.
apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
# Login page that getdsid() downloads to read the "cust-sess-id" value.
loginurl = "https://online.swedbank.se/app/privat/login"
# Combined with a millisecond timestamp and base64-encoded by
# session.__init__ to form the Authorization header value.
serviceid = "B7dZHQcY78VRVz9l"
| 9 | |
class fmterror(Exception):
    """Raised when a server response does not have the expected form.

    Used in this module for unexpected HTTP status codes, missing keys or
    page elements, malformed link URLs and unknown status values.
    """
| 12 | |
class autherror(Exception):
    """Raised when authentication does not complete.

    Used in this module when the reported status is ``CANCELLED`` or
    ``CLIENT_NOT_STARTED``.
    """
| 15 | |
def resolve(d, keys, default=fmterror):
    """Follow *keys* one by one through nested dicts starting at *d*.

    Returns the value reached after the last key (or *d* itself when *keys*
    is empty).  If a level is not a dict or lacks the next key, *default* is
    returned; when *default* is left as the ``fmterror`` class itself (used
    as a "no default" marker) an ``fmterror`` is raised instead.
    """
    node = d
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            # The path cannot be followed any further.
            if default is fmterror:
                raise fmterror()
            return default
        node = node[key]
    return node
| 31 | |
def linkurl(ln):
    """Turn a server-supplied link path into an absolute API URL.

    *ln* must be a string starting with ``/``; that leading slash is dropped
    so the remainder is resolved relative to ``apibase`` instead of the host
    root.

    Raises fmterror for anything else, including an empty string or a
    non-string value (which previously surfaced as IndexError / TypeError).
    """
    if not isinstance(ln, str) or not ln.startswith("/"):
        raise fmterror("unexpected link url: " + str(ln))
    return parse.urljoin(apibase, ln[1:])
| 36 | |
def getdsid():
    """Download the login page and return its ``cust-sess-id`` value.

    The value is read from the ``value`` attribute of the ``div`` whose id
    is ``cust-sess-id``.

    Raises fmterror if the response status is not 200, or if the div or its
    ``value`` attribute is missing.
    """
    with request.urlopen(loginurl) as resp:
        status = resp.code
        if status != 200:
            raise fmterror("Unexpected HTTP status code: " + str(status))
        page = resp.read()
    holder = soupify(page).find("div", id="cust-sess-id")
    if holder and holder.has_attr("value"):
        return holder["value"]
    raise fmterror("DSID DIV not on login page")
| 46 | |
def base64(data):
    """Return *data* (bytes) base64-encoded as text without ``=`` padding."""
    encoded = binascii.b2a_base64(data)
    # b2a_base64 appends a newline; strip it before removing the padding.
    text = encoded.decode("ascii").strip()
    return text.rstrip("=")
| 49 | |
class session(object):
    """Client session against the REST API rooted at ``apibase``.

    An instance carries the ``dsid`` identifier, the ``Authorization``
    header value and a cookie processor whose jar is seeded with a ``dsid``
    cookie.  ``userid`` stays ``None`` until ``auth_bankid`` succeeds.

    The object is a context manager; leaving the ``with`` block calls
    ``close``.  Use ``session.create()`` to build one from a freshly
    fetched dsid.
    """

    def __init__(self, dsid):
        """Prepare request state for the session identified by *dsid*."""
        self.dsid = dsid
        # Authorization value: unpadded base64 of "<serviceid>:<ms timestamp>".
        stamp = str(int(time.time() * 1000))
        self.auth = base64((serviceid + ":" + stamp).encode("ascii"))
        # Despite its name this is the HTTPCookieProcessor; the jar itself
        # is its ``cookiejar`` attribute.
        self.jar = request.HTTPCookieProcessor()
        self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name="dsid", value=dsid, path="/", path_specified=True,
            domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
            port=None, port_specified=False, secure=False, expires=None,
            discard=True, comment=None, comment_url=None,
            rest={}, rfc2109=False))
        self.userid = None

    def _req(self, url, data=None, ctype=None, headers=None, method=None, **kws):
        """Send one request and return the raw response body as bytes.

        url     -- path relative to ``apibase``, or an absolute URL.
        data    -- request body; a dict is serialised to JSON and forces a
                   JSON ``Content-Type``.
        ctype   -- ``Content-Type`` value for a non-dict body.
        headers -- optional mapping of extra request headers (not modified).
        method  -- explicit HTTP method, passed through to ``Request``.
        **kws   -- query-string parameters.  ``dsid`` defaults to the
                   session's own; parameters whose value is ``None`` are
                   left out (so ``dsid=None`` suppresses it).

        Raises fmterror if the response status is not 200.
        """
        kws.setdefault("dsid", self.dsid)
        params = {k: v for (k, v) in kws.items() if v is not None}
        # Server-supplied links may already carry a query string; extend it
        # rather than adding a second "?".
        sep = "&" if "?" in url else "?"
        url = parse.urljoin(apibase, url + sep + parse.urlencode(params))
        if isinstance(data, dict):
            data = json.dumps(data).encode("utf-8")
            ctype = "application/json;charset=UTF-8"
        req = request.Request(url, data=data, method=method)
        for hnam, hval in (headers or {}).items():
            req.add_header(hnam, hval)
        if ctype is not None:
            req.add_header("Content-Type", ctype)
        req.add_header("Authorization", self.auth)
        # The cookie processor is driven by hand: attach cookies before
        # sending, and collect any set by the response afterwards.
        self.jar.https_request(req)
        with request.urlopen(req) as resp:
            if resp.code != 200:
                raise fmterror("Unexpected HTTP status code: " + str(resp.code))
            self.jar.https_response(req, resp)
            return resp.read()

    def _jreq(self, *args, **kwargs):
        """Like ``_req`` but ask for JSON and return the decoded document.

        Accepts the same arguments as ``_req``.  An ``Accept:
        application/json`` header is added to a *copy* of any ``headers``
        mapping passed in, so the caller's dict is left untouched.
        """
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Accept"] = "application/json"
        body = self._req(*args, headers=headers, **kwargs)
        return json.loads(body.decode("utf-8"))

    def auth_bankid(self, user):
        """Authenticate *user* via mobile BankID and record the user id.

        Starts the identification, then polls the follow-up link every three
        seconds while the status remains ``USER_SIGN``.  On ``COMPLETE`` the
        identified user is fetched and stored in ``self.userid``.

        Raises autherror if the status becomes ``CANCELLED`` or
        ``CLIENT_NOT_STARTED``; raises fmterror on any other unexpected
        status, a missing follow-up link, or an empty identified user.
        Note that there is no upper bound on how long polling may continue.
        """
        data = self._jreq("v5/identification/bankid/mobile", data={
            "userId": user,
            "useEasyLogin": False,
            "generateEasyLoginId": False})
        if data.get("status") != "USER_SIGN":
            raise fmterror("unexpected bankid status: " + str(data.get("status")))
        vfy = linkurl(resolve(data, ("links", "next", "uri")))
        while True:
            time.sleep(3)
            vdat = self._jreq(vfy)
            st = vdat.get("status")
            if st == "USER_SIGN":
                # Still waiting for the user; poll again.
                continue
            elif st == "COMPLETE":
                auth = self._jreq("v5/user/authenticationinfo")
                uid = auth.get("identifiedUser", "")
                if uid == "":
                    raise fmterror("no identified user even after successful authentication")
                self.userid = uid
                return
            elif st == "CANCELLED":
                raise autherror("authentication cancelled")
            elif st == "CLIENT_NOT_STARTED":
                raise autherror("authentication client not started")
            else:
                raise fmterror("unexpected bankid status: " + str(st))

    def logout(self):
        """Log the identified user out; a no-op when nobody is logged in."""
        if self.userid is not None:
            self._jreq("v5/identification/logout", method="PUT")
            self.userid = None

    def close(self):
        """Log out (if needed) and delete the client session.

        The client-session DELETE is sent even when the logout request
        fails, so a failed logout does not skip the session teardown; the
        logout error still propagates afterwards.
        """
        try:
            self.logout()
        finally:
            self._req("v5/framework/clientsession", method="DELETE")

    def __enter__(self):
        """Return the session itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *excinfo):
        """Close the session; never suppresses an in-flight exception."""
        self.close()
        return False

    @classmethod
    def create(cls):
        """Build a session from a dsid freshly obtained via ``getdsid()``."""
        return cls(getdsid())