-import json, http.cookiejar, binascii, time, pickle
+import json, http.cookiejar, binascii, time, datetime, pickle, urllib.error
from urllib import request, parse
from bs4 import BeautifulSoup as soup
-from . import currency
+from . import currency, auth, data
soupify = lambda cont: soup(cont, "html.parser")
apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
class fmterror(Exception):
pass
-class autherror(Exception):
+class autherror(auth.autherror):
pass
+class jsonerror(Exception):
+ def __init__(self, code, data, headers):
+ self.code = code
+ self.data = data
+ self.headers = headers
+
+ @classmethod
+ def fromerr(cls, err):
+ cs = err.headers.get_content_charset()
+ if cs is None:
+ cs = "utf-8"
+ data = json.loads(err.read().decode(cs))
+ return cls(err.code, data, err.headers)
+
def resolve(d, keys, default=fmterror):
- def err():
+ def err(key):
if default is fmterror:
- raise fmterror()
+ raise fmterror(key)
return default
def rec(d, keys):
if len(keys) == 0:
return d
if isinstance(d, dict):
if keys[0] not in d:
- return err()
+ return err(keys[0])
+ return rec(d[keys[0]], keys[1:])
+ elif isinstance(d, list):
+ if not 0 <= keys[0] < len(d):
+ return err(keys[0])
return rec(d[keys[0]], keys[1:])
else:
- return err()
+ return err(keys[0])
return rec(d, keys)
def linkurl(ln):
def base64(data):
return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
-class transaction(object):
+class transaction(data.transaction):
def __init__(self, account, data):
self.account = account
self._data = data
+ _datefmt = "%Y-%m-%d"
+
@property
def value(self): return currency.currency.get(resolve(self._data, ("currency",))).parse(resolve(self._data, ("amount",)))
@property
- def message(self): return resolve(self._data, ("details", "message"))
-
- def __repr__(self):
- return "#<fsb.transaction %s: %r>" % (self.value, self.message)
+ def message(self): return resolve(self._data, ("description",))
+ @property
+ def date(self):
+ p = time.strptime(resolve(self._data, ("accountingDate",)), self._datefmt)
+ return datetime.date(p.tm_year, p.tm_mon, p.tm_mday)
-class account(object):
+class txnaccount(data.txnaccount):
def __init__(self, sess, id, idata):
self.sess = sess
self.id = id
@property
def fullnumber(self): return resolve(self.data, ("fullyFormattedNumber",))
@property
+ def balance(self): return currency.currency.get(resolve(self.data, ("balance", "currencyCode"))).parse(resolve(self.data, ("balance", "amount")))
+ @property
def name(self): return resolve(self._idata, ("name",))
def transactions(self):
pagesz = 50
- data = self.sess._jreq("v5/engagement/transactions/" + self.id, transactionsPerPage=pagesz, page=1)
- for tx in resolve(data, ("transactions",)):
- yield transaction(self, tx)
+ page = 1
+ while True:
+ data = self.sess._jreq("v5/engagement/transactions/" + self.id, transactionsPerPage=pagesz, page=page)
+ txlist = resolve(data, ("transactions",))
+ if len(txlist) < 1:
+ break
+ for tx in txlist:
+ yield transaction(self, tx)
+ page += 1
- def __repr__(self):
- return "#<fsb.account %s: %r>" % (self.fullnumber, self.name)
+class cardtransaction(data.transaction):
+ def __init__(self, account, data):
+ self.account = account
+ self._data = data
+
+ _datefmt = "%Y-%m-%d"
+
+ @property
+ def value(self):
+ am = resolve(self._data, ("localAmount",))
+ return currency.currency.get(resolve(am, ("currencyCode",))).parse(resolve(am, ("amount",)))
+ @property
+ def message(self): return resolve(self._data, ("description",))
+ @property
+ def date(self):
+ p = time.strptime(resolve(self._data, ("date",)), self._datefmt)
+ return datetime.date(p.tm_year, p.tm_mon, p.tm_mday)
+
+class cardaccount(data.cardaccount):
+ def __init__(self, sess, id, idata):
+ self.sess = sess
+ self.id = id
+ self._data = None
+ self._idata = idata
+
+ @property
+ def data(self):
+ if self._data is None:
+ self._data = self.sess._jreq("v5/engagement/cardaccount/" + self.id)
+ return self._data
+
+ @property
+ def number(self): return resolve(self.data, ("cardAccount", "cardNumber"))
+ @property
+ def balance(self):
+ cc = resolve(self.data, ("transactions", 0, "localAmount", "currencyCode"))
+ return currency.currency.get(cc).parse(resolve(self.data, ("cardAccount", "currentBalance")))
+ @property
+ def name(self): return resolve(self._idata, ("name",))
+
+ def transactions(self):
+ pagesz = 50
+ page = 1
+ while True:
+ data = self.sess._jreq("v5/engagement/cardaccount/" + self.id, transactionsPerPage=pagesz, page=page)
+ txlist = resolve(data, ("transactions",))
+ if len(txlist) < 1:
+ break
+ for tx in txlist:
+ yield cardtransaction(self, tx)
+ page += 1
class session(object):
def __init__(self, dsid):
def _jreq(self, *args, **kwargs):
headers = kwargs.pop("headers", {})
headers["Accept"] = "application/json"
- ret = self._req(*args, headers=headers, **kwargs)
+ try:
+ ret = self._req(*args, headers=headers, **kwargs)
+ except urllib.error.HTTPError as e:
+ if e.headers.get_content_type() == "application/json":
+ raise jsonerror.fromerr(e)
return json.loads(ret.decode("utf-8"))
def _postlogin(self):
rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri")))
self._jreq(rolesw, method="POST")
- def auth_bankid(self, user):
- data = self._jreq("v5/identification/bankid/mobile", data = {
- "userId": user,
- "useEasyLogin": False,
- "generateEasyLoginId": False})
- if data.get("status") != "USER_SIGN":
- raise fmterror("unexpected bankid status: " + str(data.get("status")))
+ def auth_bankid(self, user, conv=None):
+ if conv is None:
+ conv = auth.default()
+ try:
+ data = self._jreq("v5/identification/bankid/mobile", data = {
+ "userId": user,
+ "useEasyLogin": False,
+ "generateEasyLoginId": False})
+ except jsonerror as e:
+ if e.code == 400:
+ flds = resolve(e.data, ("errorMessages", "fields"), False)
+ if isinstance(flds, list):
+ for fld in flds:
+ if resolve(fld, ("field",), None) == "userId":
+ raise autherror(fld["message"])
+ raise
+ st = data.get("status")
vfy = linkurl(resolve(data, ("links", "next", "uri")))
+ fst = None
while True:
- time.sleep(3)
- vdat = self._jreq(vfy)
- st = vdat.get("status")
- if st == "USER_SIGN":
- continue
- elif st == "CLIENT_NOT_STARTED":
- continue
+ if st in {"USER_SIGN", "CLIENT_NOT_STARTED"}:
+ if st != fst:
+ conv.message("Status: %s" % (st,), auth.conv.msg_info)
+ fst = st
elif st == "COMPLETE":
self._postlogin()
return
elif st == "CANCELLED":
raise autherror("authentication cancelled")
+ elif st == "OUTSTANDING_TRANSACTION":
+ raise autherror("another bankid transaction already in progress")
else:
raise fmterror("unexpected bankid status: " + str(st))
+ time.sleep(3)
+ vdat = self._jreq(vfy)
+ st = vdat.get("status")
def keepalive(self):
data = self._jreq("v5/framework/clientsession")
data = self._jreq("v5/engagement/overview")
accounts = []
for acct in resolve(data, ("transactionAccounts",)):
- accounts.append(account(self, resolve(acct, ("id",)), acct))
+ accounts.append(txnaccount(self, resolve(acct, ("id",)), acct))
+ for acct in resolve(data, ("cardAccounts",)):
+ accounts.append(cardaccount(self, resolve(acct, ("id",)), acct))
self._accounts = accounts
return self._accounts
self.logout()
self._req("v5/framework/clientsession", method="DELETE")
+ def __getstate__(self):
+ state = dict(self.__dict__)
+ state["jar"] = list(state["jar"].cookiejar)
+ return state
+
+ def __setstate__(self, state):
+ jar = request.HTTPCookieProcessor()
+ for cookie in state["jar"]:
+ jar.cookiejar.set_cookie(cookie)
+ state["jar"] = jar
+ self.__dict__.update(state)
+
def __enter__(self):
return self
@classmethod
def load(cls, filename):
with open(filename, "rb") as fp:
- return picke.load(fp)
+ return pickle.load(fp)