netbank: Changed login syntax. master
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 6 Jun 2021 00:10:15 +0000 (02:10 +0200)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 6 Jun 2021 00:10:15 +0000 (02:10 +0200)
fulbank/auth.py
fulbank/currency.py
fulbank/data.py
fulbank/fsb.py
fulbank/util.py [new file with mode: 0644]
netbank

index 013b873..deb065c 100644 (file)
@@ -1,4 +1,4 @@
-import sys, os, io, termios
+import sys, os, io, termios, tempfile, subprocess
 
 class autherror(Exception):
     pass
@@ -14,6 +14,8 @@ class conv(object):
         pass
     def prompt(self, prompt, echo, default=None):
         return default
+    def image(self, image):
+        pass
 
 class termconv(conv):
     def __init__(self, ifp, ofp):
@@ -49,6 +51,14 @@ class termconv(conv):
                 return ret[:-1]
             finally:
                 termios.tcsetattr(self.ifp.fileno(), termios.TCSANOW, bka)
+    def image(self, image):
+        fd, fn = tempfile.mkstemp()
+        try:
+            with os.fdopen(fd, "wb") as fp:
+                image.save(fp, "PNG")
+            subprocess.call(["sxiv", fn])
+        finally:
+            os.unlink(fn)
 
 class ctermconv(termconv):
     def __init__(self, fp):
index 547acb9..0c5660c 100644 (file)
@@ -143,3 +143,6 @@ class value(object):
         if self.currency != other.currency:
             raise ValueError("cannot compare %s with %s" % (self.currency.symbol, other.currency.symbol))
         return self.amount >= other.amount
+
+    def __hash__(self):
+        return hash(self.amount) + hash(self.currency)
index 36efe20..ea73b8d 100644 (file)
@@ -1,4 +1,4 @@
-import hashlib
+import os, pwd, hashlib, pickle
 
 def _localname(type):
     mod = type.__module__
@@ -46,3 +46,63 @@ class transaction(object):
 
     def __repr__(self):
         return "#<%s %s: %r>" % (_localname(type(self)), self.value, self.message)
+
+class session(object):
+    def save(self, filename):
+        with open(filename, "wb") as fp:
+            pickle.dump(self, fp)
+
+    @staticmethod
+    def load(filename):
+        with open(filename, "rb") as fp:
+            return pickle.load(fp)
+
+def getsessnam(name):
+    if name == "fsb":
+        from . import fsb
+        return fsb.session
+    raise ValueError("no such known session type: " + name)
+
+def _sesspath(name):
+    return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, ".cache/fulbank", name)
+
+def defaultsess():
+    ret = os.getenv("NETBANKSESS")
+    if ret:
+        return ret
+    return "master"
+
+def loadsess(name=None, default=FileNotFoundError):
+    if name is None: name = defaultsess()
+    path = _sesspath(name)
+    if not os.path.exists(path):
+        if default is FileNotFoundError:
+            raise FileNotFoundError(name)
+        return default
+    return session.load(path)
+
+def savesess(sess, name=None):
+    if name is None: name = defaultsess()
+    path = _sesspath(name)
+    if sess is not None:
+        sessdir = os.path.dirname(path)
+        if not os.path.isdir(sessdir):
+            os.makedirs(sessdir)
+        return sess.save(_sesspath(name))
+    else:
+        if os.path.exists(path):
+            os.unlink(path)
+
+class savedsess(object):
+    def __init__(self, name=None):
+        if name is None: name = defaultsess()
+        self.name = name
+        self.sess = None
+
+    def __enter__(self):
+        self.sess = loadsess(self.name)
+        return self.sess
+
+    def __exit__(self):
+        savesess(self.sess, name)
+        self.sess = None
index 145de7b..7a4a4a8 100644 (file)
@@ -1,4 +1,5 @@
-import json, http.cookiejar, binascii, time, datetime, pickle, urllib.error
+import json, http.cookiejar, binascii, time, datetime, pickle, urllib.error, io
+from PIL import Image
 from urllib import request, parse
 from bs4 import BeautifulSoup as soup
 from . import currency, auth, data
@@ -170,7 +171,7 @@ class cardaccount(data.cardaccount):
                 yield cardtransaction(self, tx)
             page += 1
 
-class session(object):
+class session(data.session):
     def __init__(self, dsid):
         self.dsid = dsid
         self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
@@ -227,6 +228,46 @@ class session(object):
         rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri")))
         self._jreq(rolesw, method="POST")
 
+    def auth_token(self, user, conv=None):
+        if conv is None:
+            conv = auth.default()
+        try:
+            data = self._jreq("v5/identification/securitytoken/challenge", data = {
+                "userId": user,
+                "useEasyLogin": "false",
+                "generateEasyLoginId": "false"})
+        except jsonerror as e:
+            if e.code == 400:
+                flds = resolve(e.data, ("errorMessages", "fields"), False)
+                if isinstance(flds, list):
+                    for fld in flds:
+                        if resolve(fld, ("field",), None) == "userId":
+                            raise autherror(fld["message"])
+            raise
+        if data.get("useOneTimePassword"):
+            raise fmterror("unexpectedly found useOneTimePassword")
+        if data.get("challenge") != "":
+            raise fmterror("unexpected challenge: " + str(data.get("challenge")))
+        if not isinstance(data.get("imageChallenge"), dict) or resolve(data, ("imageChallenge", "method")) != "GET":
+            raise fmterror("invalid image challenge: " + str(data.get("imageChallenge")))
+        iurl = linkurl(resolve(data, ("imageChallenge", "uri")))
+        vfy = linkurl(resolve(data, ("links", "next", "uri")))
+        img = Image.open(io.BytesIO(self._req(iurl)))
+        conv.image(img)
+        response = conv.prompt("Token response: ", True)
+        try:
+            data = self._jreq(vfy, data={"response": response})
+        except jsonerror as e:
+            msgs = resolve(e.data, ("errorMessages", "general"), False)
+            if isinstance(msgs, list):
+                for msg in msgs:
+                    if msg.get("message"):
+                        raise autherror(msg.get("message"))
+            raise
+        if not data.get("authenticationRole", ""):
+            raise fmterror("authentication appears to have succeded, but there is no authenticationRole: " + str(data))
+        self._postlogin()
+
     def auth_bankid(self, user, conv=None):
         if conv is None:
             conv = auth.default()
@@ -271,11 +312,12 @@ class session(object):
     @property
     def accounts(self):
         if self._accounts is None:
-            data = self._jreq("v5/engagement/overview")
+            txndata = self._jreq("v5/engagement/overview")
+            crddata = self._jreq("v5/card/creditcard")
             accounts = []
-            for acct in resolve(data, ("transactionAccounts",)):
+            for acct in resolve(txndata, ("transactionAccounts",)):
                 accounts.append(txnaccount(self, resolve(acct, ("id",)), acct))
-            for acct in resolve(data, ("cardAccounts",)):
+            for acct in resolve(crddata, ("cardAccounts",)):
                 accounts.append(cardaccount(self, resolve(acct, ("id",)), acct))
             self._accounts = accounts
         return self._accounts
@@ -289,6 +331,18 @@ class session(object):
         self.logout()
         self._req("v5/framework/clientsession", method="DELETE")
 
+    def __getstate__(self):
+        state = dict(self.__dict__)
+        state["jar"] = list(state["jar"].cookiejar)
+        return state
+
+    def __setstate__(self, state):
+        jar = request.HTTPCookieProcessor()
+        for cookie in state["jar"]:
+            jar.cookiejar.set_cookie(cookie)
+        state["jar"] = jar
+        self.__dict__.update(state)
+
     def __enter__(self):
         return self
 
@@ -304,12 +358,3 @@ class session(object):
     @classmethod
     def create(cls):
         return cls(getdsid())
-
-    def save(self, filename):
-        with open(filename, "wb") as fp:
-            pickle.dump(self, fp)
-
-    @classmethod
-    def load(cls, filename):
-        with open(filename, "rb") as fp:
-            return pickle.load(fp)
diff --git a/fulbank/util.py b/fulbank/util.py
new file mode 100644 (file)
index 0000000..eaf1249
--- /dev/null
@@ -0,0 +1,37 @@
+import operator
+
+def pfxmatch(pfx, item):
+    return str(item)[:len(pfx)] == pfx
+
+def ipfxmatch(pfx, item):
+    return str(item).upper()[:len(pfx)] == pfx.upper()
+
+class ambiguous(LookupError):
+    def __init__(self, a, b):
+        super().__init__("ambigous match: %s and %s" % (a, b))
+        self.a = a
+        self.b = b
+
+def find(seq, *, item=None, test=None, match=None, key=None, default=LookupError):
+    if key is None:
+        key = lambda o: o
+    if match is None and item is not None:
+        match = lambda o: test(item, o)
+    if test is None:
+        test = operator.eq
+    found = None
+    for thing in seq:
+        if match(key(thing)):
+            if found is None:
+                found = thing
+            else:
+                if default is LookupError:
+                    raise ambiguous(key(found), key(thing))
+                else:
+                    return default
+    if found is not None:
+        return found
+    if default is LookupError:
+        raise LookupError()
+    else:
+        return default
diff --git a/netbank b/netbank
index a1d3566..7f52628 100755 (executable)
--- a/netbank
+++ b/netbank
@@ -1,46 +1,13 @@
 #!/usr/bin/python3
 
-import sys, os, getopt, pwd, operator
-from fulbank import auth
+import sys, os, getopt, pwd
+from fulbank import auth, data, util
 
-sesstype = None
+sessname = data.defaultsess()
 sess = None
 
-def pfxmatch(pfx, item):
-    return str(item)[:len(pfx)] == pfx
-
-class ambiguous(LookupError):
-    def __init__(self, a, b):
-        super().__init__("ambigous match: %s and %s" % (a, b))
-        self.a = a
-        self.b = b
-
-def find(seq, *, item=None, test=None, match=None, key=None, default=LookupError):
-    if key is None:
-        key = lambda o: o
-    if match is None and item is not None:
-        match = lambda o: test(item, o)
-    if test is None:
-        test = operator.eq
-    found = None
-    for thing in seq:
-        if match(key(thing)):
-            if found is None:
-                found = thing
-            else:
-                if default is LookupError:
-                    raise ambiguous(key(found), key(thing))
-                else:
-                    return default
-    if found is not None:
-        return found
-    if default is LookupError:
-        raise LookupError()
-    else:
-        return default
-
 def usage(out):
-    out.write("usage: netbank [-h] BANK-ID COMMAND [ARGS...]\n")
+    out.write("usage: netbank [-h] [-s SESSION-ID] COMMAND [ARGS...]\n")
 
 def requiresess(fn):
     def wrap(cmd, args):
@@ -54,23 +21,26 @@ commands = {}
 
 def cmd_login(cmd, args):
     global sess
-    if len(args) < 1:
-        sys.stderr.write("usage: login TYPE\n")
+    if len(args) < 2:
+        sys.stderr.write("usage: login BANK-ID TYPE [ARGS...]\n")
         sys.exit(1)
-    sess = sesstype.create()
-    if args[0] == "bankid":
-        if len(args) < 2:
-            sys.stderr.write("usage: login bankid USER-ID\n")
-            sys.exit(1)
-        with auth.ttyconv() as conv:
-            try:
-                sess.auth_bankid(args[1], conv)
-            except auth.autherror as err:
-                sys.stderr.write("netbank: authentication failed: %s\n" % err)
-                sys.exit(1)
+    sess = data.getsessnam(args[0]).create()
+    if args[1] == "bankid":
+        authfun = sess.auth_bankid
+    elif args[1] == "token":
+        authfun = sess.auth_token
     else:
-        sys.stderr.write("netbank: %s: unknown authentication type\n" % (args[0]))
+        sys.stderr.write("netbank: %s: unknown authentication type\n" % (args[1]))
         sys.exit(1)
+    if len(args) < 3:
+        sys.stderr.write("usage: login bankid USER-ID\n")
+        sys.exit(1)
+    with auth.ttyconv() as conv:
+        try:
+            authfun(args[2], conv)
+        except auth.autherror as err:
+            sys.stderr.write("netbank: authentication failed: %s\n" % err)
+            sys.exit(1)
 commands["login"] = cmd_login
 
 @requiresess
@@ -103,8 +73,8 @@ def cmd_lstxn(cmd, args):
         sys.stderr.write("usage: lstxn [-n NUM] ACCOUNT\n")
         sys.exit(1)
     try:
-        acct = find(sess.accounts, item=args[0], key=lambda acct: acct.number, test=pfxmatch)
-    except ambiguous as exc:
+        acct = util.find(sess.accounts, item=args[0], key=lambda acct: acct.number, test=util.pfxmatch)
+    except util.ambiguous as exc:
         sys.stderr.write("netbank: %s: ambiguous match between %s and %s\n" % (args[0], exc.a, exc.b))
         sys.exit(1)
     except LookupError:
@@ -115,44 +85,29 @@ def cmd_lstxn(cmd, args):
 commands["lstxn"] = cmd_lstxn
 
 def main():
-    global sess, sesstype
+    global sess, sessname
 
-    opts, args = getopt.getopt(sys.argv[1:], "h")
+    opts, args = getopt.getopt(sys.argv[1:], "hs:")
     for o, a in opts:
         if o == "-h":
             usage(sys.stdout)
             sys.exit(0)
-    if len(args) < 2:
+        if o == "-s":
+            sessname = a
+    if len(args) < 1:
         usage(sys.stderr)
         sys.exit(1)
 
-    if args[0] == "fsb":
-        import fulbank.fsb
-        sesstype = fulbank.fsb.session
-    else:
-        sys.stderr.write("netbank: %s: unknown bank id\n" % (args[0]))
-        sys.exit(1)
-    sesspath = os.path.join(pwd.getpwuid(os.getuid()).pw_dir, ".cache/fulbank", args[0])
-    cmd = args[1]
-    args = args[2:]
+    cmd = args[0]
+    args = args[1:]
 
-    if os.path.exists(sesspath):
-        sess = sesstype.load(sesspath)
-    else:
-        sess = None
+    sess = data.loadsess(sessname, None)
     if cmd in commands:
         commands[cmd](cmd, args)
     else:
         sys.stderr.write("netbank: %s: unknown command\n" % (cmd))
         sys.exit(1)
-    if sess is not None:
-        sessdir = os.path.dirname(sesspath)
-        if not os.path.isdir(sessdir):
-            os.makedirs(sessdir)
-        sess.save(sesspath)
-    else:
-        if os.path.exists(sesspath):
-            os.unlink(sesspath)
+    data.savesess(sess, sessname)
 
 try:
     if __name__ == "__main__":