-import sys, os, io, termios
+import sys, os, io, termios, tempfile, subprocess
class autherror(Exception):
pass
pass
def prompt(self, prompt, echo, default=None):
return default
+ def image(self, image):
+ pass
class termconv(conv):
def __init__(self, ifp, ofp):
return ret[:-1]
finally:
termios.tcsetattr(self.ifp.fileno(), termios.TCSANOW, bka)
+ def image(self, image):
+ fd, fn = tempfile.mkstemp()
+ try:
+ with os.fdopen(fd, "wb") as fp:
+ image.save(fp, "PNG")
+ subprocess.call(["sxiv", fn])
+ finally:
+ os.unlink(fn)
class ctermconv(termconv):
def __init__(self, fp):
if self.currency != other.currency:
raise ValueError("cannot compare %s with %s" % (self.currency.symbol, other.currency.symbol))
return self.amount >= other.amount
+
+ def __hash__(self):
+ return hash(self.amount) + hash(self.currency)
-import hashlib
+import os, pwd, hashlib, pickle
def _localname(type):
mod = type.__module__
def __repr__(self):
return "#<%s %s: %r>" % (_localname(type(self)), self.value, self.message)
+
+class session(object):
+ def save(self, filename):
+ with open(filename, "wb") as fp:
+ pickle.dump(self, fp)
+
+ @staticmethod
+ def load(filename):
+ with open(filename, "rb") as fp:
+ return pickle.load(fp)
+
+def getsessnam(name):
+ if name == "fsb":
+ from . import fsb
+ return fsb.session
+ raise ValueError("no such known session type: " + name)
+
+def _sesspath(name):
+ return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, ".cache/fulbank", name)
+
+def defaultsess():
+ ret = os.getenv("NETBANKSESS")
+ if ret:
+ return ret
+ return "master"
+
+def loadsess(name=None, default=FileNotFoundError):
+ if name is None: name = defaultsess()
+ path = _sesspath(name)
+ if not os.path.exists(path):
+ if default is FileNotFoundError:
+ raise FileNotFoundError(name)
+ return default
+ return session.load(path)
+
+def savesess(sess, name=None):
+ if name is None: name = defaultsess()
+ path = _sesspath(name)
+ if sess is not None:
+ sessdir = os.path.dirname(path)
+ if not os.path.isdir(sessdir):
+ os.makedirs(sessdir)
+ return sess.save(_sesspath(name))
+ else:
+ if os.path.exists(path):
+ os.unlink(path)
+
+class savedsess(object):
+ def __init__(self, name=None):
+ if name is None: name = defaultsess()
+ self.name = name
+ self.sess = None
+
+ def __enter__(self):
+ self.sess = loadsess(self.name)
+ return self.sess
+
+ def __exit__(self):
+ savesess(self.sess, name)
+ self.sess = None
-import json, http.cookiejar, binascii, time, datetime, pickle, urllib.error
+import json, http.cookiejar, binascii, time, datetime, pickle, urllib.error, io
+from PIL import Image
from urllib import request, parse
from bs4 import BeautifulSoup as soup
from . import currency, auth, data
yield cardtransaction(self, tx)
page += 1
-class session(object):
+class session(data.session):
def __init__(self, dsid):
self.dsid = dsid
self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri")))
self._jreq(rolesw, method="POST")
+ def auth_token(self, user, conv=None):
+ if conv is None:
+ conv = auth.default()
+ try:
+ data = self._jreq("v5/identification/securitytoken/challenge", data = {
+ "userId": user,
+ "useEasyLogin": "false",
+ "generateEasyLoginId": "false"})
+ except jsonerror as e:
+ if e.code == 400:
+ flds = resolve(e.data, ("errorMessages", "fields"), False)
+ if isinstance(flds, list):
+ for fld in flds:
+ if resolve(fld, ("field",), None) == "userId":
+ raise autherror(fld["message"])
+ raise
+ if data.get("useOneTimePassword"):
+ raise fmterror("unexpectedly found useOneTimePassword")
+ if data.get("challenge") != "":
+ raise fmterror("unexpected challenge: " + str(data.get("challenge")))
+ if not isinstance(data.get("imageChallenge"), dict) or resolve(data, ("imageChallenge", "method")) != "GET":
+ raise fmterror("invalid image challenge: " + str(data.get("imageChallenge")))
+ iurl = linkurl(resolve(data, ("imageChallenge", "uri")))
+ vfy = linkurl(resolve(data, ("links", "next", "uri")))
+ img = Image.open(io.BytesIO(self._req(iurl)))
+ conv.image(img)
+ response = conv.prompt("Token response: ", True)
+ try:
+ data = self._jreq(vfy, data={"response": response})
+ except jsonerror as e:
+ msgs = resolve(e.data, ("errorMessages", "general"), False)
+ if isinstance(msgs, list):
+ for msg in msgs:
+ if msg.get("message"):
+ raise autherror(msg.get("message"))
+ raise
+ if not data.get("authenticationRole", ""):
+ raise fmterror("authentication appears to have succeded, but there is no authenticationRole: " + str(data))
+ self._postlogin()
+
def auth_bankid(self, user, conv=None):
if conv is None:
conv = auth.default()
@property
def accounts(self):
if self._accounts is None:
- data = self._jreq("v5/engagement/overview")
+ txndata = self._jreq("v5/engagement/overview")
+ crddata = self._jreq("v5/card/creditcard")
accounts = []
- for acct in resolve(data, ("transactionAccounts",)):
+ for acct in resolve(txndata, ("transactionAccounts",)):
accounts.append(txnaccount(self, resolve(acct, ("id",)), acct))
- for acct in resolve(data, ("cardAccounts",)):
+ for acct in resolve(crddata, ("cardAccounts",)):
accounts.append(cardaccount(self, resolve(acct, ("id",)), acct))
self._accounts = accounts
return self._accounts
self.logout()
self._req("v5/framework/clientsession", method="DELETE")
+ def __getstate__(self):
+ state = dict(self.__dict__)
+ state["jar"] = list(state["jar"].cookiejar)
+ return state
+
+ def __setstate__(self, state):
+ jar = request.HTTPCookieProcessor()
+ for cookie in state["jar"]:
+ jar.cookiejar.set_cookie(cookie)
+ state["jar"] = jar
+ self.__dict__.update(state)
+
def __enter__(self):
return self
@classmethod
def create(cls):
return cls(getdsid())
-
- def save(self, filename):
- with open(filename, "wb") as fp:
- pickle.dump(self, fp)
-
- @classmethod
- def load(cls, filename):
- with open(filename, "rb") as fp:
- return pickle.load(fp)
--- /dev/null
+import operator
+
+def pfxmatch(pfx, item):
+ return str(item)[:len(pfx)] == pfx
+
+def ipfxmatch(pfx, item):
+ return str(item).upper()[:len(pfx)] == pfx.upper()
+
+class ambiguous(LookupError):
+ def __init__(self, a, b):
+ super().__init__("ambigous match: %s and %s" % (a, b))
+ self.a = a
+ self.b = b
+
+def find(seq, *, item=None, test=None, match=None, key=None, default=LookupError):
+ if key is None:
+ key = lambda o: o
+ if match is None and item is not None:
+ match = lambda o: test(item, o)
+ if test is None:
+ test = operator.eq
+ found = None
+ for thing in seq:
+ if match(key(thing)):
+ if found is None:
+ found = thing
+ else:
+ if default is LookupError:
+ raise ambiguous(key(found), key(thing))
+ else:
+ return default
+ if found is not None:
+ return found
+ if default is LookupError:
+ raise LookupError()
+ else:
+ return default
#!/usr/bin/python3
-import sys, os, getopt, pwd, operator
-from fulbank import auth
+import sys, os, getopt, pwd
+from fulbank import auth, data, util
-sesstype = None
+sessname = data.defaultsess()
sess = None
-def pfxmatch(pfx, item):
- return str(item)[:len(pfx)] == pfx
-
-class ambiguous(LookupError):
- def __init__(self, a, b):
- super().__init__("ambigous match: %s and %s" % (a, b))
- self.a = a
- self.b = b
-
-def find(seq, *, item=None, test=None, match=None, key=None, default=LookupError):
- if key is None:
- key = lambda o: o
- if match is None and item is not None:
- match = lambda o: test(item, o)
- if test is None:
- test = operator.eq
- found = None
- for thing in seq:
- if match(key(thing)):
- if found is None:
- found = thing
- else:
- if default is LookupError:
- raise ambiguous(key(found), key(thing))
- else:
- return default
- if found is not None:
- return found
- if default is LookupError:
- raise LookupError()
- else:
- return default
-
def usage(out):
- out.write("usage: netbank [-h] BANK-ID COMMAND [ARGS...]\n")
+ out.write("usage: netbank [-h] [-s SESSION-ID] COMMAND [ARGS...]\n")
def requiresess(fn):
def wrap(cmd, args):
def cmd_login(cmd, args):
global sess
- if len(args) < 1:
- sys.stderr.write("usage: login TYPE\n")
+ if len(args) < 2:
+ sys.stderr.write("usage: login BANK-ID TYPE [ARGS...]\n")
sys.exit(1)
- sess = sesstype.create()
- if args[0] == "bankid":
- if len(args) < 2:
- sys.stderr.write("usage: login bankid USER-ID\n")
- sys.exit(1)
- with auth.ttyconv() as conv:
- try:
- sess.auth_bankid(args[1], conv)
- except auth.autherror as err:
- sys.stderr.write("netbank: authentication failed: %s\n" % err)
- sys.exit(1)
+ sess = data.getsessnam(args[0]).create()
+ if args[1] == "bankid":
+ authfun = sess.auth_bankid
+ elif args[1] == "token":
+ authfun = sess.auth_token
else:
- sys.stderr.write("netbank: %s: unknown authentication type\n" % (args[0]))
+ sys.stderr.write("netbank: %s: unknown authentication type\n" % (args[1]))
sys.exit(1)
+ if len(args) < 3:
+ sys.stderr.write("usage: login bankid USER-ID\n")
+ sys.exit(1)
+ with auth.ttyconv() as conv:
+ try:
+ authfun(args[2], conv)
+ except auth.autherror as err:
+ sys.stderr.write("netbank: authentication failed: %s\n" % err)
+ sys.exit(1)
commands["login"] = cmd_login
@requiresess
sys.stderr.write("usage: lstxn [-n NUM] ACCOUNT\n")
sys.exit(1)
try:
- acct = find(sess.accounts, item=args[0], key=lambda acct: acct.number, test=pfxmatch)
- except ambiguous as exc:
+ acct = util.find(sess.accounts, item=args[0], key=lambda acct: acct.number, test=util.pfxmatch)
+ except util.ambiguous as exc:
sys.stderr.write("netbank: %s: ambiguous match between %s and %s\n" % (args[0], exc.a, exc.b))
sys.exit(1)
except LookupError:
commands["lstxn"] = cmd_lstxn
def main():
- global sess, sesstype
+ global sess, sessname
- opts, args = getopt.getopt(sys.argv[1:], "h")
+ opts, args = getopt.getopt(sys.argv[1:], "hs:")
for o, a in opts:
if o == "-h":
usage(sys.stdout)
sys.exit(0)
- if len(args) < 2:
+ if o == "-s":
+ sessname = a
+ if len(args) < 1:
usage(sys.stderr)
sys.exit(1)
- if args[0] == "fsb":
- import fulbank.fsb
- sesstype = fulbank.fsb.session
- else:
- sys.stderr.write("netbank: %s: unknown bank id\n" % (args[0]))
- sys.exit(1)
- sesspath = os.path.join(pwd.getpwuid(os.getuid()).pw_dir, ".cache/fulbank", args[0])
- cmd = args[1]
- args = args[2:]
+ cmd = args[0]
+ args = args[1:]
- if os.path.exists(sesspath):
- sess = sesstype.load(sesspath)
- else:
- sess = None
+ sess = data.loadsess(sessname, None)
if cmd in commands:
commands[cmd](cmd, args)
else:
sys.stderr.write("netbank: %s: unknown command\n" % (cmd))
sys.exit(1)
- if sess is not None:
- sessdir = os.path.dirname(sesspath)
- if not os.path.isdir(sessdir):
- os.makedirs(sessdir)
- sess.save(sesspath)
- else:
- if os.path.exists(sesspath):
- os.unlink(sesspath)
+ data.savesess(sess, sessname)
try:
if __name__ == "__main__":