commit 7fd17d98bfc061200cb45f71e1b7b12457eb18fc Author: octospacc Date: Sun Dec 8 20:26:16 2024 +0100 Init da c5d4fbba93a80f84eeb735f05284796d39c0c354b9a82dcd557bb657c0ee2eb0 pyjod-main.tar.gz diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68bc17f --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..fa47f22 --- /dev/null +++ b/README.md @@ -0,0 +1,77 @@ + + + +# pyjod + +Open-source implementation of the Strong Customer Authentication client agent used by the italian provider Poste Italiane S.p.A. + +### DISCLAIMER: +* The main purpose of this project is research and understanding of the techniques and protocols used. +* This project is under development and unstable, using it could cause revocation or ban of your user account. +* Using this software would very likely compromise most of the security of having a 2-factor authentication scheme. Also, this software is not security-audited at all and is storing all your private keys and secret UNENCRYPTED on your computer! + +In conclusion: USE IT ONLY AT YOUR RISKS, UNDER YOUR RESPONSABILITY, AND IF YOU KNOW VERY WELL WHAT YOU ARE DOING! + + +## Installation + +All the following instruction assumes that you are using a BASH shell under Linux, +you will have to adapt them if you use a different Operating System or a different shell. + +1. Ensure you have a proper and recent Python (> 3.9) installed. +2. (Strongly suggested) create a dedicated Python virtual environment + ```sh + mkdir -p ~/.venvs + python3 -m venv ~/.venvs/pyjod + ``` +3. Activate the virtual environment + ```sh + source ~/.venvs/pyjod/bin/activate + ``` +4. Install `pyjod` and the optional dependencies you want + ```sh + # For a basic installation + pip install "git+https://projects.lilik.it/zolfa/pyjod" + # To include QR generator for OTP Authenticator apps + pip install "git+https://projects.lilik.it/zolfa/pyjod#egg=pyjod[qr]" + # To include also the QR screen scanner for instant authentication + pip install "git+https://projects.lilik.it/zolfa/pyjod#egg=pyjod[qr,scanqr]" + ``` +5. Create a symbolic link from a directory available in your `$PATH` + ```sh + ln -s ~/.venvs/pyjod/bin/posteid ~/.local/bin/posteid + ``` + +## Basic usage + +1. Execute `posteid` in a terminal, you will be guided into logging in and + configuring the application and enroling the device to your account. + ```sh + posteid + ``` +2. After the first execution, you can call `posteid` again to check the status of + your device-account enrolment. +2. Generate an OTP-code to authenticate using the "Generate a single-use code" option. + ```sh + posteid otp + ``` +3. List all the pending authentication request, and authorize one of them. + You will only see the requests generated using the "Send a notification to my device" + option. + ```sh + posteid authorize + ``` +4. Export the QR generation seed to a compatible app using a standard provisioning QR-code + _You will need the extra requirements [qr]_ + ```sh + posteid qr + ``` +5. Detect a rapid-authentication qr in your screen and authorize the access using it. + _You will need the extra requirements [scanqr]_ + ```sh + posteid scanqr + ``` +6. Revoke your device enrolment, disabling private keys and OTP generator. + ```sh + posteid revoke + ``` diff --git a/pyjod/__init__.py b/pyjod/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyjod/appdata.py b/pyjod/appdata.py new file mode 100644 index 0000000..2d50c31 --- /dev/null +++ b/pyjod/appdata.py @@ -0,0 +1,128 @@ +import json +from base64 import b64encode, b64decode +from pathlib import Path + +import xdgappdirs +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization +from jwcrypto import jwk +from pathvalidate import sanitize_filename + +data_dir = xdgappdirs.user_data_dir('pyjod', as_path=True) + + +class RSAPrivateKey: + def __init__(self, key): + self.key = key + + @classmethod + def generate(cls): + key = rsa.generate_private_key(65537, 2048) + return cls(key) + + @classmethod + def from_pem(cls, data): + key = serialization.load_pem_private_key(data, None) + return cls(key) + + @property + def jwk(self): + return jwk.JWK.from_pyca(self.key) + + @property + def pubkey_b64(self): + pubkey = self.key.public_key() + pubkey_bytes = pubkey.public_bytes( + serialization.Encoding.DER, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + return b64encode(pubkey_bytes).decode('utf-8') + + def to_pem(self): + return self.key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption() + ) + + +class RSAPublicKey: + def __init__(self, key): + self.key = key + + @classmethod + def from_b64(cls, data): + key_bytes = b64decode(data) + key = serialization.load_der_public_key(key_bytes) + return cls(key) + + @classmethod + def from_pem(cls, data): + key = serialization.load_pem_public_key(data) + return cls(key) + + @property + def jwk(self): + return jwk.JWK.from_pyca(self.key) + + def to_pem(self): + return self.key.public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + +class AppData: + def __init__(self, profile_name): + self.profile_dir = data_dir / sanitize_filename(profile_name) + self.profile_dir.mkdir(exist_ok=True, parents=True) + + self.values_file = self.profile_dir / "values.json" + + @property + def app_privkey(self): + key_file = self.profile_dir / 'app_privkey.pem' + if key_file.is_file(): + with key_file.open('rb') as f: + key_bytes = f.read() + return RSAPrivateKey.from_pem(key_bytes) + else: + key = RSAPrivateKey.generate() + with key_file.open('wb') as f: + f.write(key.to_pem()) + return key + + @property + def serv_pubkey(self): + key_file = self.profile_dir / 'serv_pubkey.pem' + if key_file.is_file(): + with key_file.open('rb') as f: + key_bytes = f.read() + return RSAPublicKey.from_pem(key_bytes) + else: + return None + + @serv_pubkey.setter + def serv_pubkey(self, key_b64): + key = RSAPublicKey.from_b64(key_b64) + key_file = self.profile_dir / 'serv_pubkey.pem' + with key_file.open('wb') as f: + f.write(key.to_pem()) + + def __getitem__(self, key): + if self.values_file.is_file(): + with self.values_file.open() as f: + values = json.load(f) + if key in values: + return values[key] + return None + + def __setitem__(self, key, value): + if self.values_file.is_file(): + with self.values_file.open() as f: + values = json.load(f) + else: + values = {} + values[key] = value + with self.values_file.open('w') as f: + json.dump(values, f) diff --git a/pyjod/cli.py b/pyjod/cli.py new file mode 100644 index 0000000..35fc26b --- /dev/null +++ b/pyjod/cli.py @@ -0,0 +1,228 @@ +import argparse +import getpass +import json +import logging +import re +import time + +from argparse import ArgumentParser + +from .xkey import XKey +from .sca import SCA, NotInitializedPIN, AuthError + + +logger = logging.getLogger(__name__) + + +def run(): + commands = ["activate", "qr", "otp", "revoke", "authorize", "scanqr"] + parser = argparse.ArgumentParser(description="Gestisci OTP PosteID.") + parser.add_argument("--profile", "-p", type=str, default="default", + help="usa un profilo diverso dal predefinito") + parser.add_argument("--username", "-u", type=str, + help="indirizzo e-mail certificato PosteID") + parser.add_argument("command", nargs="?", choices=commands) + args = parser.parse_args() + + logger.debug("Inizializzazione modulo XKey.") + xkey = XKey(args.profile) + if not xkey.appdata['xkey_appuuid']: + xkey.register_xkey() + + logger.debug("Inizializzazione modulo SCA.") + sca = SCA(args.profile) + sca_app_regid = sca.appdata['sca_app_regid'] + + cli_prefix = "posteid" + if args.profile != "default": + cli_prefix += " --profile " + args.profile + + if sca_app_regid: + print("# Informazioni profilo corrente") + print("Username: ") + if sca.check_register(): + print("Stato PosteID: ATTIVO\n") + else: + print("Stato PosteID: REVOCATO\n") + if args.command == "activate": + print("# Riattivazione credenziali PosteID.") + perform_2fa_auth(sca, args.username) + else: + print("Le tue credenziali non sono più attive.") + print("Esegui `" + cli_prefix + " activate` per riattivarle.") + else: + logger.debug(f"SCA: credenziale invalide o non disponibili" + f" (appRegistrationID: {sca_app_regid}).") + print("Attivazione credenziali PosteID.") + perform_2fa_auth(sca, args.username) + + if args.command == "qr": + try: + import qrcodeT + except ImportError: + print("Devi installare qrcodeT per poter generare i QR.") + print("Prova con `pip install qrcodeT`.") + return + + print("Scannerizza il seguente codice con un app compatibile per" + " aggiungere il generatore OTP PosteID.\n") + qrcodeT.qrcodeT(sca.totp.provisioning_uri()) + + if args.command == "otp": + totp = sca.totp + remaining = totp.interval - time.time() % totp.interval + + print(f"Codice OTP corrente: {totp.now()}" + f" (tempo rimanente: {remaining:.0f}s).\n") + + if args.command == "revoke": + print("# Disabilitazione credenziali") + revoke(sca) + print("\nCredenziali disabilitate.") + + if args.command == "authorize": + pin_login(sca) + authorize(sca) + + if args.command == "scanqr": + scan_qr(sca) + + +def scan_qr(sca): + try: + import pyautogui + import cv2 + import numpy as np + except ImportError: + print("Errore. Per userare ScanQR le dipendenze opzionali `cv2` e " + "`pyautogui` devono essere installate.") + scr = pyautogui.screenshot() + detector = cv2.QRCodeDetector() + mm = re.compile(r"^https://secureholder\.mobile\.poste\.it" + r"/jod-secure-holder/qrcodeResolver/(\w+)") + qr = detector.detectAndDecode(np.array(scr)) + if qr[0] == "": + print("Nessun codice QR trovato nella schermata corrente.") + return + match_url = mm.match(qr[0]) + if not match_url: + print("Codice QR trovato ma non valido!") + return + tx_id = match_url.groups()[0] + ch = sca.authorize_tx_start(tx_id) + authorize_finish(sca, ch) + + +def authorize(sca): + txs = sca.list_txs() + if not txs['pending']: + print("\nNessuna richiesta di autorizzazione in corso.\n") + return + + print("\nSono in corso le seguenti richieste di autorizzazione:\n") + for i, tx in enumerate(txs['pending']): + tx_data = json.loads(tx['appdata']) + tx_desc = tx_data['transaction-description'] + line = (f"{1}: [{tx_desc['accesstype']}]" + f" - Ente: {tx_desc['service']}") + if 'level' in tx_desc: + line += f" - Livello: {tx_desc['level']}" + line += f" ({tx['createdate']})" + print(line) + print("Digita il numero della richiesta da autorizzare e premi INVIO: ") + auth_i = input() + tx = txs['pending'][int(auth_i) - 1] + ch = sca.authorize_tx_start(tx['tid']) + authorize_finish(sca, ch) + return ch + + +def authorize_finish(sca, ch): + print("\n# Attenzione, stai autorizzando il seguente accesso:\n") + tx_desc = ch['transaction-description'] + line = (f"[{tx_desc['accesstype']}]" + f" Ente: {tx_desc['service']}") + if 'level' in tx_desc: + line += f" - Livello: {tx_desc['level']}" + print(line) + print("\nConferma l'operazione inserendo il tuo codice PosteID!\n") + userpin = getpass.getpass("Codice PosteID: ") + sca.authorize_tx_finish(ch, userpin) + print("Accesso autorizzato!") + + +def pin_login(sca, attempts=0): + userpin = getpass.getpass("Codice PosteID: ") + try: + sca._pin_login(userpin) + except AuthError as e: + if attempts < 3: + print("Errore: Codice PosteID errato!") + print("Attenzione, il codice sarà bloccato dopo 5 tentativi.") + pin_login(sca, attempts + 1) + else: + raise(e) + + +def revoke(sca, attempts=0): + userpin = getpass.getpass("Codice PosteID: ") + try: + sca.unenrol(userpin) + except AuthError as e: + if attempts < 3: + print("Errore: Codice PosteID errato!") + print("Attenzione, il codice sarà bloccato dopo 5 tentativi.") + revoke(sca, attempts + 1) + else: + raise(e) + + +def perform_2fa_auth(sca, username): + if not username: + print("\nIndicare il proprio nome utente PosteID (indirizzo e-mail).") + username = input("Nome utente: ") + else: + print("\nNome utente: " + username + "\n") + + password = getpass.getpass("Password: ") + tel = sca.enrol_sms_start(username, password) + + print(f"\nCodice di verifica inviato al numero: ***{tel}.\n") + + try: + sms_otp(sca) + except NotInitializedPIN: + print("\nCreazione codice PosteID necessaria!") + print("Scegli un codice PIN numerico di 6 cifre.") + initialize_pin(sca) + + +def sms_otp(sca, attempts=0): + otp = getpass.getpass("Codice verifica SMS: ") + try: + sca.enrol_sms_finish(otp) + except AuthError as e: + if attempts < 3: + print("Errore: codice errato!\n") + sms_otp(sca, attempts + 1) + else: + raise(e) + + +def initialize_pin(sca): + pin1 = getpass.getpass("Nuovo codice PosteID: ") + if not re.match(r"^[0-9]{6}$", pin1): + print("Errore: il formato del PIN non è corrretto!") + initialize_pin(sca) + return + pin2 = getpass.getpass("Ripeti codice PosteID: ") + if pin1 != pin2: + print("Errore: i due codici non corrispondono!") + initialize_pin(sca) + return + sca._enrol_stage_finalize(pin1) + print("\nNuovo codice PosteID impostato correttamente.\n") + + +if __name__ == "__main__": + run() diff --git a/pyjod/jwe_handler.py b/pyjod/jwe_handler.py new file mode 100644 index 0000000..4f4ab54 --- /dev/null +++ b/pyjod/jwe_handler.py @@ -0,0 +1,69 @@ +import json +from random import randint +from time import time +from uuid import uuid4 + +from jwcrypto.jwe import JWE +from pyotp import HOTP +from requests import Request + +from .utils import sha256_base64, hmac_sha256 + + +class JWEHandler: + def __init__(self, appdata): + self.appdata = appdata + + def new_otp_specs(self): + generator = HOTP(self.appdata['xkey_seed'], 8, "SHA1") + counter = randint(0, 99999999) + otp_specs = {} + otp_specs['movingFactor'] = counter + otp_specs['otp'] = generator.generate_otp(counter) + otp_specs['type'] = "HMAC-SHA1" + return otp_specs + + def encrypt(self, sub, data, auth=False): + now = int(time()) + claims = {'data': data} + claims['sub'] = sub + claims['jti'] = str(uuid4()) + claims['iat'] = now + claims['nbf'] = now + claims['exp'] = now + 60 + claims['iss'] = "app-posteid-v3" + headers = {} + headers['cty'] = "JWE" + headers['typ'] = "JWT" + headers['alg'] = "RSA-OAEP-256" + headers['enc'] = "A256CBC-HS512" + if auth: + app_uuid = self.appdata['xkey_appuuid'] + headers['kid'] = app_uuid + claims['kid-sha256'] = sha256_base64(app_uuid) + claims['otp-specs'] = self.new_otp_specs() + jwe_token = JWE(json.dumps(claims), + protected=headers, + recipient=self.appdata.serv_pubkey.jwk) + return jwe_token.serialize(compact=True) + + def req_jwe_post(self, url, sub, data, auth=False): + jwe_token = self.encrypt(sub, data, auth) + headers = {'Content-Type': "application/json; charset=UTF-8"} + req = Request('POST', url, headers, data=jwe_token) + return req + + def req_jwe_bearer(self, url, sub, data, auth=False): + jwe_token = self.encrypt(sub, data, auth) + headers = {'Content-Type': "", + 'Authorization': "Bearer " + jwe_token} + req = Request('GET', url, headers) + return req + + def decrypt(self, serialized_jwe_token): + if isinstance(serialized_jwe_token, bytes): + serialized_jwe_token = serialized_jwe_token.decode('utf-8') + jwe_token = JWE() + jwe_token.deserialize(serialized_jwe_token) + jwe_token.decrypt(self.appdata.app_privkey.jwk) + return json.loads(jwe_token.payload) diff --git a/pyjod/sca.py b/pyjod/sca.py new file mode 100644 index 0000000..29e8d68 --- /dev/null +++ b/pyjod/sca.py @@ -0,0 +1,323 @@ +import json +import logging +import re +from base64 import b32encode +from getpass import getpass +from urllib.parse import urljoin +from uuid import uuid4 + +import requests +from pyotp import TOTP + +from .appdata import AppData +from .jwe_handler import JWEHandler +from .utils import RequestFailure, hmac_sha256 + +logger = logging.getLogger(__name__) + + +class NotInitializedPIN(Exception): + pass + + +class AuthError(Exception): + pass + + +class SCA: + LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/" + AUTH_URL = "https://posteid.poste.it/jod-login-schema/" + SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web" + "/public/app/") + + def __init__(self, profile_name): + self.appdata = AppData(profile_name) + self.s = requests.Session() + self.s.headers = {'Accept-Encoding': "gzip", + 'User-Agent': "okhttp/3.12.1", + 'Connection': "keep-alive"} + self.jwe_handler = JWEHandler(self.appdata) + self.reg_token = None + self.profile_token = None + self.access_token = None + + def _send_req(self, request): + prepped = self.s.prepare_request(request) + return self.s.send(prepped) + + def _parse_jwe_response(self, ans): + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + if not ans.headers.get('Content-Type').startswith("application/json"): + raise RequestFailure("Response not JSON.", ans) + ans_json = ans.json() + if not ans_json.get('command-success'): + raise RequestFailure(f"Command failed: {ans_json}", ans, + error_code=ans_json.get('command-error-code')) + if ans_json.get('command-result-type') not in ["JWE", "JSON"]: + raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans) + return ans_json.get('command-result') + + def _parse_v4(self, ans): + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + if not ans.headers.get('Content-Type').startswith("application/json"): + raise RequestFailure("Response not JSON.", ans) + ans_json = ans.json() + if (ans_json.get('status') + not in ["v4_success", "v4_pending", "v4_signed"]): + if ans_json.get('status') == "v4_error": + reason = ans_json.get('reason') + raise RequestFailure(f"Request v4 error: {reason}.", ans, + error_code=reason) + raise RequestFailure("Unknown v4 failure.", ans) + return ans_json + + def _sign_challenge_v4(self, challenge, userpin): + userpin = str(userpin) + signature = {} + key = self.appdata['sca_seed'] + userpin + challenge['randK'] + message = challenge['transaction-challenge'] + signature['jti'] = challenge['jti'] + signature['signature'] = hmac_sha256(key, message) + signature['userpin'] = userpin + signature['authzTool'] = "POSTEID" + return json.dumps(signature) + + def _enrol_stage_basiclogin(self, username, password): + url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt") + data = {} + data['password'] = password + data['authLevel'] = "0" + data['userid'] = username + logger.debug("Enrol(basiclogin): sending username and password.") + jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data, + auth=True) + ans = self._send_req(jwe_req) + if ans.status_code == 401: + raise AuthError("Wrong username or password!") + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + if 'X-PI' not in ans.headers: + raise RequestFailure(f"Malformed request, missing 'X-PI'.", ans) + uid = ans.headers.get('X-PI') + logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.") + + def _enrol_stage_req_sms_otp(self, username): + url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt") + data = {} + data['password'] = str(uuid4()) + data['authLevel'] = "3" + data['userid'] = username + logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..") + jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data, + auth=True) + ans = self._send_req(jwe_req) + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + if 'X-TEL' not in ans.headers: + raise RequestFailure(f"Malformed request, missing 'X-TEL'.", ans) + logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.") + tel = ans.headers.get('X-TEL') + logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.") + return tel + + def _enrol_stage_send_sms_otp(self, otp): + url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt") + data = {} + data['otp'] = str(otp) + data['authLevel'] = "2" + data['nonce'] = str(uuid4()) + logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.") + jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data, + auth=True) + ans = self._send_req(jwe_req) + if ans.status_code == 401: + raise AuthError("Wrong SMS-OTP code!") + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + if 'X-RESULT' not in ans.headers: + raise RequestFailure( + f"Malformed request, missing 'X-RESULT'.", ans) + logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.") + ans_json = self.jwe_handler.decrypt(ans.headers.get('X-RESULT')) + logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}") + if 'data' not in ans_json: + raise RequestFailure("Malformed request, missing 'data'.", ans) + if 'token' not in ans_json['data']: + raise RequestFailure("Malformed request, missing 'token'.", ans) + reg_token = ans_json['data']['token'] + logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.") + self.reg_token = reg_token + + def _enrol_stage_finalize(self, userpin=""): + url = urljoin(self.SH_URL, "v1/registerApp") + data = {} + data['userPIN'] = userpin + data['idpAccessToken'] = "" + data['registerToken'] = self.reg_token + logger.debug("Enrol(REG-APP): encrypting request.") + print(data) + jwe_req = self.jwe_handler.req_jwe_post( + url, 'registerApp', data, auth=True) + logger.debug("Enrol(REG-APP): sending request.") + ans = self._send_req(jwe_req) + + try: + ans_jwe = self._parse_jwe_response(ans) + except RequestFailure as e: + if e.error_code == "PIN-ERR-1": + raise NotInitializedPIN() + raise e + + ans_json = self.jwe_handler.decrypt(ans_jwe) + if 'data' not in ans_json: + raise RequestFailure("Malformed request, missing 'data'.", ans) + if 'appRegisterID' not in ans_json['data']: + raise RequestFailure( + "Malformed request, missing 'appRegisterID'.", ans) + if 'secretAPP' not in ans_json['data']: + raise RequestFailure( + "Malformed request, missing 'secretAPP'.", ans) + self.appdata['sca_app_regid'] = ans_json['data']['appRegisterID'] + self.appdata['sca_seed'] = ans_json['data']['secretAPP'] + logger.debug("Enrol(REG-APP): completed successfully!") + + def check_register(self): + url = urljoin(self.SH_URL, "v1/checkRegisterApp") + data = {'appRegisterID': self.appdata['sca_app_regid']} + jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data, + auth=True) + ans = self._send_req(jwe_req) + result = self._parse_jwe_response(ans) + if 'valid' not in result: + raise RequestFailure( + "Malformed request, missing 'valid'.", ans) + return result["valid"] + + def enrol_sms_start(self, username, password): + self._enrol_stage_basiclogin(username, password) + tel = self._enrol_stage_req_sms_otp(username) + return tel + + def enrol_sms_finish(self, otp, force_userpin=None): + self._enrol_stage_send_sms_otp(otp) + try: + self._enrol_stage_finalize() + except NotInitializedPIN as e: + if force_userpin: + logger.info('Enrol(REG-APP): Setting the new provided PIN.') + self._enrol_stage_finalize(force_userpin) + return + raise e + + def _pin_login(self, userpin): + url_challenge = urljoin(self.LOGIN_URL, + "secureholder/v4/native/challenge") + url_authorize = urljoin(self.LOGIN_URL, + "secureholder/v4/native/az") + logger.debug("PINLogin: acquiring v4 challenge.") + req = self.jwe_handler.req_jwe_post(url_challenge, "login", {}, + auth=True) + ans = self._send_req(req) + ans = self._parse_v4(ans) + logger.debug(f"PINLogin: got v4 challenge: {ans}.") + logger.debug(f"PINLogin: preparing challenge response.") + data = {} + data['signature'] = self._sign_challenge_v4(ans, userpin) + data['appRegisterID'] = self.appdata['sca_app_regid'] + logger.debug(f"PINLogin: sending response {data}.") + req = self.jwe_handler.req_jwe_post(url_authorize, "login", data, + auth=True) + ans = self._send_req(req) + try: + ans = self._parse_v4(ans) + except RequestFailure as e: + if e.error_code == "PIN-ERR-3": + raise AuthError("Codice PosteID errato!") + elif e.error_code == "CERT-ERR-2": + raise AuthError("Codice PosteID bloccato per troppi errori!") + raise(e) + self.profile_token = ans['profile_token'] + self.access_token = ans['access_token'] + logger.debug(f"PINLogin: logged in, session token aquired.") + + def _unenrol(self): + if not self.access_token: + raise Exception( + 'Need to acquire an access token (pin_login) before.') + logger.debug("Unenrol: unenrolling device.") + url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid") + jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {}, + auth=True) + jwe_req.headers['Authorization'] = "Bearer " + self.access_token + ans = self._send_req(jwe_req) + ans = self._parse_v4(ans) + logger.debug("Unenrol: device unenrolled.") + + def list_txs(self): + if not self.access_token: + raise Exception( + "Need to acquire an access token (pin_login) before.") + url = urljoin(self.LOGIN_URL, + "secureholder/v4/native/list-transaction") + jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True) + jwe_req.headers['Authorization'] = "Bearer " + self.access_token + ans = self._send_req(jwe_req) + ans_json = self._parse_v4(ans) + if 'transaction' not in ans_json: + raise RequestFailure( + f"Malformed response, missing 'transaction'.", ans) + return ans_json['transaction'] + + def authorize_tx_start(self, tx_id): + data = {} + data['jti'] = tx_id + data['appRegisterID']: self.appdata['sca_app_regid'] + url = urljoin(self.AUTH_URL, "secureholder/v4/challenge") + jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True) + ans = self._send_req(jwe_req) + ans_json = self._parse_v4(ans) + if ans_json['status'] != "v4_pending": + raise Exception( + f"TX Status is '{ans_json['status']}', not pending.") + if 'jti' not in ans_json: + raise RequestFailure(f"Malformed response, missing 'jti'.", ans) + if 'randK' not in ans_json: + raise RequestFailure(f"Malformed response, missing 'randK'.", ans) + if 'transaction-challenge' not in ans_json: + raise RequestFailure( + f"Malformed response, missing 'transaction-challenge'.", ans) + return ans_json + + def authorize_tx_finish(self, challenge, userpin): + signature = self._sign_challenge_v4(challenge, userpin) + data = {} + data['signature'] = signature + data['appRegisterID'] = self.appdata['sca_app_regid'] + url = urljoin(self.AUTH_URL, "secureholder/v4/az") + jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True) + ans = self._send_req(jwe_req) + try: + ans_json = self._parse_v4(ans) + except RequestFailure as e: + if e.error_code == "PIN-ERR-3": + raise AuthError("Codice PosteID errato!") + elif e.error_code == "CERT-ERR-2": + raise AuthError("Codice PosteID bloccato per troppi errori!") + raise(e) + if ans_json['status'] != "v4_signed": + raise Exception( + f"TX Status is '{ans_json['status']}', not signed.") + + def unenrol(self, userpin): + self._pin_login(userpin) + self._unenrol() + + @property + def totp(self): + seed = self.appdata['sca_seed'] + seed = b32encode(seed.encode("utf-8") + b'\0' * 32) + seed = seed.decode("utf-8").replace('=', '') + totp = TOTP(seed, interval=120) + return totp diff --git a/pyjod/utils.py b/pyjod/utils.py new file mode 100644 index 0000000..aca7c8c --- /dev/null +++ b/pyjod/utils.py @@ -0,0 +1,25 @@ +import hmac +import json +from base64 import b64encode, urlsafe_b64encode +from hashlib import sha256 + + +def sha256_base64(payload): + out = payload.encode('utf-8') + out = sha256(out).digest() + out = b64encode(out) + return out.decode('utf-8') + + +def hmac_sha256(key, message): + hm = hmac.new(key.encode('utf-8'), digestmod=sha256) + hm.update(message.encode('utf-8')) + digest = hm.digest() + return urlsafe_b64encode(digest).decode('utf-8').replace('=', '') + + +class RequestFailure(Exception): + def __init__(self, message, ans, error_code=None): + super().__init__(message) + self.ans = ans + self.error_code = error_code diff --git a/pyjod/version.py b/pyjod/version.py new file mode 100644 index 0000000..0a263c4 --- /dev/null +++ b/pyjod/version.py @@ -0,0 +1,5 @@ +# coding: utf-8 +# file generated by setuptools_scm +# don't change, don't track in version control +__version__ = version = '0.1.dev0' +__version_tuple__ = version_tuple = (0, 1, 'dev0') diff --git a/pyjod/xkey.py b/pyjod/xkey.py new file mode 100644 index 0000000..fc6c547 --- /dev/null +++ b/pyjod/xkey.py @@ -0,0 +1,126 @@ +import logging +from urllib.parse import urljoin +from uuid import uuid4 + +import requests + +from .appdata import AppData +from .jwe_handler import JWEHandler +from .utils import sha256_base64, RequestFailure + +logger = logging.getLogger(__name__) + + +class XKey: + REGISTRY_URL = ("https://appregistry-posteid.mobile.poste.it" + "/jod-app-registry/") + APP_NAME = "app-posteid-v3" + ACTIVITY_ID = "C6050AC80E8B5288A01237" + DEVICE_SPECS = ("Android", "11", + "sdk_gphone_x86_64_arm64", "4.5.204", + "true") + + def __init__(self, profile_name): + self.appdata = AppData(profile_name) + self.s = requests.Session() + self.s.headers = {'Accept-Encoding': "gzip", + 'User-Agent': "okhttp/3.12.1", + 'Connection': "keep-alive"} + self.jwe_handler = JWEHandler(self.appdata) + + def _send_req(self, request): + prepped = self.s.prepare_request(request) + return self.s.send(prepped) + + def _register_stage_init(self, register_nonce): + url = urljoin(self.REGISTRY_URL, "v2/registerInit") + headers = {'Content-Type': "application/json; charset=UTF-8"} + data = {} + data['appName'] = "app-posteid-v3" + data['initCodeChallenge'] = sha256_base64(register_nonce) + logger.debug(f"Registration(INIT): sending challenge: {data}") + ans = self.s.post(url, headers=headers, json=data) + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + if not ans.headers.get('Content-Type').startswith("application/json"): + raise RequestFailure("Response not JSON.", ans) + ans_json = ans.json() + if 'pubServerKey' not in ans_json: + raise RequetFailure("Response does not contain 'pubServerKey'", + ans) + pubkey = ans_json['pubServerKey'] + self.appdata.serv_pubkey = pubkey + logger.debug(f"Registration(INIT): got server pubkey: {pubkey}.") + + def _register_stage_register(self, register_nonce): + url = urljoin(self.REGISTRY_URL, "v2/register") + data = {} + data['initCodeVerifier'] = register_nonce + data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS) + data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64 + logger.debug("Registration(REG): encrypting app data.") + jwe_req = self.jwe_handler.req_jwe_post(url, "register", data) + logger.debug("Registration(REG): sending app data.") + ans = self._send_req(jwe_req) + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + logger.debug("Registration(REG): decrypting response.") + ans_json = self.jwe_handler.decrypt(ans.content) + logger.debug(f"Registration(REG): decrypted response: {ans_json}") + if 'data' not in ans_json: + raise RequestFailure("Malformed request, missing 'data'.", ans) + if 'app-uuid' not in ans_json['data']: + raise RequestFailure("Malformed request, missing 'app-uuid'.", ans) + if 'otpSecretKey' not in ans_json['data']: + raise RequestFailure("Malformed request, missing 'otpSecretKey'.", + ans) + self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid'] + self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey'] + + def _register_stage_activate(self): + url = urljoin(self.REGISTRY_URL, "v2/activation") + logger.debug("Registration(ACTIVATE): encrypting request.") + jwe_req = self.jwe_handler.req_jwe_post(url, "activation", {}, + auth=True) + logger.debug("Registration(ACTIVATE): sending request.") + ans = self._send_req(jwe_req) + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + self.appdata['activated'] = True + logger.debug("Registration(ACTIVATE): activated.") + + def _register_stage_update(self, register_nonce): + url = urljoin(self.REGISTRY_URL, "v2/register") + data = {} + data['initCodeVerifier'] = register_nonce + data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS) + data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64 + logger.debug("Registration(UPDATE): encrypting app data.") + jwe_req = self.jwe_handler.req_jwe_post(url, "registerUpdate", data, + auth=True) + logger.debug("Registration(UPDATE): sending app data.") + ans = self._send_req(jwe_req) + if ans.status_code != 200: + raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) + logger.debug("Registration(UPDATE): decrypting response.") + ans_json = self.jwe_handler.decrypt(ans.content) + logger.debug(f"Registration(UPDATE): decrypted response: {ans_json}") + if 'data' not in ans_json: + raise RequestFailure("Malformed request, missing 'data'.", ans) + if 'app-uuid' not in ans_json['data']: + raise RequestFailure("Malformed request, missing 'app-uuid'.", ans) + if 'otpSecretKey' not in ans_json['data']: + raise RequestFailure("Malformed request, missing 'otpSecretKey'.", + ans) + self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid'] + self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey'] + + def register_xkey(self): + register_nonce = str(uuid4()) + logger.debug(f"Starting registration (nonce: {register_nonce}).") + self._register_stage_init(register_nonce) + if self.appdata['xkey_seed']: + self._register_stage_update(register_nonce) + else: + self._register_stage_register(register_nonce) + self._register_stage_activate() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..685f945 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,10 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel", + "setuptools_scm[toml]>3.4", +] +build-backend = "setuptools.build_meta" + +[tool.setuptools_scm] +write_to = "pyjod/version.py" diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..d49e954 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,42 @@ +[metadata] +name = pyjod +version = attr:pyjod.version +description = Python implementation of PosteID. +long_description = file:README.md +long_description_content_type = text/markdown +author = Vendetta +author_email = aaron@guerrilla.open +classifiers = + Development Status :: 1 - Planning + Programming Language :: Python + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + +platforms = any + +[options] +packages = + pyjod +install_requires = + xdgappdirs>=1.4 + cryptography>=3.3 + jwcrypto>=1.3 + pathvalidate>=2.5 + pyotp>=2.6 + requests + +setup_requires = + setuptools_scm + +[options.extras_require] +qr = + numpy + qrcodeT +scanqr = + numpy + opencv-python + pyautogui + +[options.entry_points] +console_scripts = + posteid = pyjod.cli:run diff --git a/test.png b/test.png new file mode 100644 index 0000000..5018201 Binary files /dev/null and b/test.png differ