Init da c5d4fbba93a80f84eeb735f05284796d39c0c354b9a82dcd557bb657c0ee2eb0 pyjod-main.tar.gz
This commit is contained in:
160
.gitignore
vendored
Normal file
160
.gitignore
vendored
Normal file
@@ -0,0 +1,160 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/#use-with-ide
|
||||
.pdm.toml
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
77
README.md
Normal file
77
README.md
Normal file
@@ -0,0 +1,77 @@
|
||||
|
||||
|
||||
<!-- ABOUT THE PROJECT -->
|
||||
# pyjod
|
||||
|
||||
Open-source implementation of the Strong Customer Authentication client agent used by the italian provider Poste Italiane S.p.A.
|
||||
|
||||
### DISCLAIMER:
|
||||
* The main purpose of this project is research and understanding of the techniques and protocols used.
|
||||
* This project is under development and unstable, using it could cause revocation or ban of your user account.
|
||||
* Using this software would very likely compromise most of the security of having a 2-factor authentication scheme. Also, this software is not security-audited at all and is storing all your private keys and secret UNENCRYPTED on your computer!
|
||||
|
||||
In conclusion: USE IT ONLY AT YOUR RISKS, UNDER YOUR RESPONSABILITY, AND IF YOU KNOW VERY WELL WHAT YOU ARE DOING!
|
||||
|
||||
|
||||
## Installation
|
||||
|
||||
All the following instruction assumes that you are using a BASH shell under Linux,
|
||||
you will have to adapt them if you use a different Operating System or a different shell.
|
||||
|
||||
1. Ensure you have a proper and recent Python (> 3.9) installed.
|
||||
2. (Strongly suggested) create a dedicated Python virtual environment
|
||||
```sh
|
||||
mkdir -p ~/.venvs
|
||||
python3 -m venv ~/.venvs/pyjod
|
||||
```
|
||||
3. Activate the virtual environment
|
||||
```sh
|
||||
source ~/.venvs/pyjod/bin/activate
|
||||
```
|
||||
4. Install `pyjod` and the optional dependencies you want
|
||||
```sh
|
||||
# For a basic installation
|
||||
pip install "git+https://projects.lilik.it/zolfa/pyjod"
|
||||
# To include QR generator for OTP Authenticator apps
|
||||
pip install "git+https://projects.lilik.it/zolfa/pyjod#egg=pyjod[qr]"
|
||||
# To include also the QR screen scanner for instant authentication
|
||||
pip install "git+https://projects.lilik.it/zolfa/pyjod#egg=pyjod[qr,scanqr]"
|
||||
```
|
||||
5. Create a symbolic link from a directory available in your `$PATH`
|
||||
```sh
|
||||
ln -s ~/.venvs/pyjod/bin/posteid ~/.local/bin/posteid
|
||||
```
|
||||
|
||||
## Basic usage
|
||||
|
||||
1. Execute `posteid` in a terminal, you will be guided into logging in and
|
||||
configuring the application and enroling the device to your account.
|
||||
```sh
|
||||
posteid
|
||||
```
|
||||
2. After the first execution, you can call `posteid` again to check the status of
|
||||
your device-account enrolment.
|
||||
2. Generate an OTP-code to authenticate using the "Generate a single-use code" option.
|
||||
```sh
|
||||
posteid otp
|
||||
```
|
||||
3. List all the pending authentication request, and authorize one of them.
|
||||
You will only see the requests generated using the "Send a notification to my device"
|
||||
option.
|
||||
```sh
|
||||
posteid authorize
|
||||
```
|
||||
4. Export the QR generation seed to a compatible app using a standard provisioning QR-code
|
||||
_You will need the extra requirements [qr]_
|
||||
```sh
|
||||
posteid qr
|
||||
```
|
||||
5. Detect a rapid-authentication qr in your screen and authorize the access using it.
|
||||
_You will need the extra requirements [scanqr]_
|
||||
```sh
|
||||
posteid scanqr
|
||||
```
|
||||
6. Revoke your device enrolment, disabling private keys and OTP generator.
|
||||
```sh
|
||||
posteid revoke
|
||||
```
|
0
pyjod/__init__.py
Normal file
0
pyjod/__init__.py
Normal file
128
pyjod/appdata.py
Normal file
128
pyjod/appdata.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import json
|
||||
from base64 import b64encode, b64decode
|
||||
from pathlib import Path
|
||||
|
||||
import xdgappdirs
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from jwcrypto import jwk
|
||||
from pathvalidate import sanitize_filename
|
||||
|
||||
data_dir = xdgappdirs.user_data_dir('pyjod', as_path=True)
|
||||
|
||||
|
||||
class RSAPrivateKey:
|
||||
def __init__(self, key):
|
||||
self.key = key
|
||||
|
||||
@classmethod
|
||||
def generate(cls):
|
||||
key = rsa.generate_private_key(65537, 2048)
|
||||
return cls(key)
|
||||
|
||||
@classmethod
|
||||
def from_pem(cls, data):
|
||||
key = serialization.load_pem_private_key(data, None)
|
||||
return cls(key)
|
||||
|
||||
@property
|
||||
def jwk(self):
|
||||
return jwk.JWK.from_pyca(self.key)
|
||||
|
||||
@property
|
||||
def pubkey_b64(self):
|
||||
pubkey = self.key.public_key()
|
||||
pubkey_bytes = pubkey.public_bytes(
|
||||
serialization.Encoding.DER,
|
||||
serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
return b64encode(pubkey_bytes).decode('utf-8')
|
||||
|
||||
def to_pem(self):
|
||||
return self.key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()
|
||||
)
|
||||
|
||||
|
||||
class RSAPublicKey:
|
||||
def __init__(self, key):
|
||||
self.key = key
|
||||
|
||||
@classmethod
|
||||
def from_b64(cls, data):
|
||||
key_bytes = b64decode(data)
|
||||
key = serialization.load_der_public_key(key_bytes)
|
||||
return cls(key)
|
||||
|
||||
@classmethod
|
||||
def from_pem(cls, data):
|
||||
key = serialization.load_pem_public_key(data)
|
||||
return cls(key)
|
||||
|
||||
@property
|
||||
def jwk(self):
|
||||
return jwk.JWK.from_pyca(self.key)
|
||||
|
||||
def to_pem(self):
|
||||
return self.key.public_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
|
||||
|
||||
class AppData:
|
||||
def __init__(self, profile_name):
|
||||
self.profile_dir = data_dir / sanitize_filename(profile_name)
|
||||
self.profile_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
self.values_file = self.profile_dir / "values.json"
|
||||
|
||||
@property
|
||||
def app_privkey(self):
|
||||
key_file = self.profile_dir / 'app_privkey.pem'
|
||||
if key_file.is_file():
|
||||
with key_file.open('rb') as f:
|
||||
key_bytes = f.read()
|
||||
return RSAPrivateKey.from_pem(key_bytes)
|
||||
else:
|
||||
key = RSAPrivateKey.generate()
|
||||
with key_file.open('wb') as f:
|
||||
f.write(key.to_pem())
|
||||
return key
|
||||
|
||||
@property
|
||||
def serv_pubkey(self):
|
||||
key_file = self.profile_dir / 'serv_pubkey.pem'
|
||||
if key_file.is_file():
|
||||
with key_file.open('rb') as f:
|
||||
key_bytes = f.read()
|
||||
return RSAPublicKey.from_pem(key_bytes)
|
||||
else:
|
||||
return None
|
||||
|
||||
@serv_pubkey.setter
|
||||
def serv_pubkey(self, key_b64):
|
||||
key = RSAPublicKey.from_b64(key_b64)
|
||||
key_file = self.profile_dir / 'serv_pubkey.pem'
|
||||
with key_file.open('wb') as f:
|
||||
f.write(key.to_pem())
|
||||
|
||||
def __getitem__(self, key):
|
||||
if self.values_file.is_file():
|
||||
with self.values_file.open() as f:
|
||||
values = json.load(f)
|
||||
if key in values:
|
||||
return values[key]
|
||||
return None
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if self.values_file.is_file():
|
||||
with self.values_file.open() as f:
|
||||
values = json.load(f)
|
||||
else:
|
||||
values = {}
|
||||
values[key] = value
|
||||
with self.values_file.open('w') as f:
|
||||
json.dump(values, f)
|
228
pyjod/cli.py
Normal file
228
pyjod/cli.py
Normal file
@@ -0,0 +1,228 @@
|
||||
import argparse
|
||||
import getpass
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from .xkey import XKey
|
||||
from .sca import SCA, NotInitializedPIN, AuthError
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run():
|
||||
commands = ["activate", "qr", "otp", "revoke", "authorize", "scanqr"]
|
||||
parser = argparse.ArgumentParser(description="Gestisci OTP PosteID.")
|
||||
parser.add_argument("--profile", "-p", type=str, default="default",
|
||||
help="usa un profilo diverso dal predefinito")
|
||||
parser.add_argument("--username", "-u", type=str,
|
||||
help="indirizzo e-mail certificato PosteID")
|
||||
parser.add_argument("command", nargs="?", choices=commands)
|
||||
args = parser.parse_args()
|
||||
|
||||
logger.debug("Inizializzazione modulo XKey.")
|
||||
xkey = XKey(args.profile)
|
||||
if not xkey.appdata['xkey_appuuid']:
|
||||
xkey.register_xkey()
|
||||
|
||||
logger.debug("Inizializzazione modulo SCA.")
|
||||
sca = SCA(args.profile)
|
||||
sca_app_regid = sca.appdata['sca_app_regid']
|
||||
|
||||
cli_prefix = "posteid"
|
||||
if args.profile != "default":
|
||||
cli_prefix += " --profile " + args.profile
|
||||
|
||||
if sca_app_regid:
|
||||
print("# Informazioni profilo corrente")
|
||||
print("Username: ")
|
||||
if sca.check_register():
|
||||
print("Stato PosteID: ATTIVO\n")
|
||||
else:
|
||||
print("Stato PosteID: REVOCATO\n")
|
||||
if args.command == "activate":
|
||||
print("# Riattivazione credenziali PosteID.")
|
||||
perform_2fa_auth(sca, args.username)
|
||||
else:
|
||||
print("Le tue credenziali non sono più attive.")
|
||||
print("Esegui `" + cli_prefix + " activate` per riattivarle.")
|
||||
else:
|
||||
logger.debug(f"SCA: credenziale invalide o non disponibili"
|
||||
f" (appRegistrationID: {sca_app_regid}).")
|
||||
print("Attivazione credenziali PosteID.")
|
||||
perform_2fa_auth(sca, args.username)
|
||||
|
||||
if args.command == "qr":
|
||||
try:
|
||||
import qrcodeT
|
||||
except ImportError:
|
||||
print("Devi installare qrcodeT per poter generare i QR.")
|
||||
print("Prova con `pip install qrcodeT`.")
|
||||
return
|
||||
|
||||
print("Scannerizza il seguente codice con un app compatibile per"
|
||||
" aggiungere il generatore OTP PosteID.\n")
|
||||
qrcodeT.qrcodeT(sca.totp.provisioning_uri())
|
||||
|
||||
if args.command == "otp":
|
||||
totp = sca.totp
|
||||
remaining = totp.interval - time.time() % totp.interval
|
||||
|
||||
print(f"Codice OTP corrente: {totp.now()}"
|
||||
f" (tempo rimanente: {remaining:.0f}s).\n")
|
||||
|
||||
if args.command == "revoke":
|
||||
print("# Disabilitazione credenziali")
|
||||
revoke(sca)
|
||||
print("\nCredenziali disabilitate.")
|
||||
|
||||
if args.command == "authorize":
|
||||
pin_login(sca)
|
||||
authorize(sca)
|
||||
|
||||
if args.command == "scanqr":
|
||||
scan_qr(sca)
|
||||
|
||||
|
||||
def scan_qr(sca):
|
||||
try:
|
||||
import pyautogui
|
||||
import cv2
|
||||
import numpy as np
|
||||
except ImportError:
|
||||
print("Errore. Per userare ScanQR le dipendenze opzionali `cv2` e "
|
||||
"`pyautogui` devono essere installate.")
|
||||
scr = pyautogui.screenshot()
|
||||
detector = cv2.QRCodeDetector()
|
||||
mm = re.compile(r"^https://secureholder\.mobile\.poste\.it"
|
||||
r"/jod-secure-holder/qrcodeResolver/(\w+)")
|
||||
qr = detector.detectAndDecode(np.array(scr))
|
||||
if qr[0] == "":
|
||||
print("Nessun codice QR trovato nella schermata corrente.")
|
||||
return
|
||||
match_url = mm.match(qr[0])
|
||||
if not match_url:
|
||||
print("Codice QR trovato ma non valido!")
|
||||
return
|
||||
tx_id = match_url.groups()[0]
|
||||
ch = sca.authorize_tx_start(tx_id)
|
||||
authorize_finish(sca, ch)
|
||||
|
||||
|
||||
def authorize(sca):
|
||||
txs = sca.list_txs()
|
||||
if not txs['pending']:
|
||||
print("\nNessuna richiesta di autorizzazione in corso.\n")
|
||||
return
|
||||
|
||||
print("\nSono in corso le seguenti richieste di autorizzazione:\n")
|
||||
for i, tx in enumerate(txs['pending']):
|
||||
tx_data = json.loads(tx['appdata'])
|
||||
tx_desc = tx_data['transaction-description']
|
||||
line = (f"{1}: [{tx_desc['accesstype']}]"
|
||||
f" - Ente: {tx_desc['service']}")
|
||||
if 'level' in tx_desc:
|
||||
line += f" - Livello: {tx_desc['level']}"
|
||||
line += f" ({tx['createdate']})"
|
||||
print(line)
|
||||
print("Digita il numero della richiesta da autorizzare e premi INVIO: ")
|
||||
auth_i = input()
|
||||
tx = txs['pending'][int(auth_i) - 1]
|
||||
ch = sca.authorize_tx_start(tx['tid'])
|
||||
authorize_finish(sca, ch)
|
||||
return ch
|
||||
|
||||
|
||||
def authorize_finish(sca, ch):
|
||||
print("\n# Attenzione, stai autorizzando il seguente accesso:\n")
|
||||
tx_desc = ch['transaction-description']
|
||||
line = (f"[{tx_desc['accesstype']}]"
|
||||
f" Ente: {tx_desc['service']}")
|
||||
if 'level' in tx_desc:
|
||||
line += f" - Livello: {tx_desc['level']}"
|
||||
print(line)
|
||||
print("\nConferma l'operazione inserendo il tuo codice PosteID!\n")
|
||||
userpin = getpass.getpass("Codice PosteID: ")
|
||||
sca.authorize_tx_finish(ch, userpin)
|
||||
print("Accesso autorizzato!")
|
||||
|
||||
|
||||
def pin_login(sca, attempts=0):
|
||||
userpin = getpass.getpass("Codice PosteID: ")
|
||||
try:
|
||||
sca._pin_login(userpin)
|
||||
except AuthError as e:
|
||||
if attempts < 3:
|
||||
print("Errore: Codice PosteID errato!")
|
||||
print("Attenzione, il codice sarà bloccato dopo 5 tentativi.")
|
||||
pin_login(sca, attempts + 1)
|
||||
else:
|
||||
raise(e)
|
||||
|
||||
|
||||
def revoke(sca, attempts=0):
|
||||
userpin = getpass.getpass("Codice PosteID: ")
|
||||
try:
|
||||
sca.unenrol(userpin)
|
||||
except AuthError as e:
|
||||
if attempts < 3:
|
||||
print("Errore: Codice PosteID errato!")
|
||||
print("Attenzione, il codice sarà bloccato dopo 5 tentativi.")
|
||||
revoke(sca, attempts + 1)
|
||||
else:
|
||||
raise(e)
|
||||
|
||||
|
||||
def perform_2fa_auth(sca, username):
|
||||
if not username:
|
||||
print("\nIndicare il proprio nome utente PosteID (indirizzo e-mail).")
|
||||
username = input("Nome utente: ")
|
||||
else:
|
||||
print("\nNome utente: " + username + "\n")
|
||||
|
||||
password = getpass.getpass("Password: ")
|
||||
tel = sca.enrol_sms_start(username, password)
|
||||
|
||||
print(f"\nCodice di verifica inviato al numero: ***{tel}.\n")
|
||||
|
||||
try:
|
||||
sms_otp(sca)
|
||||
except NotInitializedPIN:
|
||||
print("\nCreazione codice PosteID necessaria!")
|
||||
print("Scegli un codice PIN numerico di 6 cifre.")
|
||||
initialize_pin(sca)
|
||||
|
||||
|
||||
def sms_otp(sca, attempts=0):
|
||||
otp = getpass.getpass("Codice verifica SMS: ")
|
||||
try:
|
||||
sca.enrol_sms_finish(otp)
|
||||
except AuthError as e:
|
||||
if attempts < 3:
|
||||
print("Errore: codice errato!\n")
|
||||
sms_otp(sca, attempts + 1)
|
||||
else:
|
||||
raise(e)
|
||||
|
||||
|
||||
def initialize_pin(sca):
|
||||
pin1 = getpass.getpass("Nuovo codice PosteID: ")
|
||||
if not re.match(r"^[0-9]{6}$", pin1):
|
||||
print("Errore: il formato del PIN non è corrretto!")
|
||||
initialize_pin(sca)
|
||||
return
|
||||
pin2 = getpass.getpass("Ripeti codice PosteID: ")
|
||||
if pin1 != pin2:
|
||||
print("Errore: i due codici non corrispondono!")
|
||||
initialize_pin(sca)
|
||||
return
|
||||
sca._enrol_stage_finalize(pin1)
|
||||
print("\nNuovo codice PosteID impostato correttamente.\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
69
pyjod/jwe_handler.py
Normal file
69
pyjod/jwe_handler.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import json
|
||||
from random import randint
|
||||
from time import time
|
||||
from uuid import uuid4
|
||||
|
||||
from jwcrypto.jwe import JWE
|
||||
from pyotp import HOTP
|
||||
from requests import Request
|
||||
|
||||
from .utils import sha256_base64, hmac_sha256
|
||||
|
||||
|
||||
class JWEHandler:
|
||||
def __init__(self, appdata):
|
||||
self.appdata = appdata
|
||||
|
||||
def new_otp_specs(self):
|
||||
generator = HOTP(self.appdata['xkey_seed'], 8, "SHA1")
|
||||
counter = randint(0, 99999999)
|
||||
otp_specs = {}
|
||||
otp_specs['movingFactor'] = counter
|
||||
otp_specs['otp'] = generator.generate_otp(counter)
|
||||
otp_specs['type'] = "HMAC-SHA1"
|
||||
return otp_specs
|
||||
|
||||
def encrypt(self, sub, data, auth=False):
|
||||
now = int(time())
|
||||
claims = {'data': data}
|
||||
claims['sub'] = sub
|
||||
claims['jti'] = str(uuid4())
|
||||
claims['iat'] = now
|
||||
claims['nbf'] = now
|
||||
claims['exp'] = now + 60
|
||||
claims['iss'] = "app-posteid-v3"
|
||||
headers = {}
|
||||
headers['cty'] = "JWE"
|
||||
headers['typ'] = "JWT"
|
||||
headers['alg'] = "RSA-OAEP-256"
|
||||
headers['enc'] = "A256CBC-HS512"
|
||||
if auth:
|
||||
app_uuid = self.appdata['xkey_appuuid']
|
||||
headers['kid'] = app_uuid
|
||||
claims['kid-sha256'] = sha256_base64(app_uuid)
|
||||
claims['otp-specs'] = self.new_otp_specs()
|
||||
jwe_token = JWE(json.dumps(claims),
|
||||
protected=headers,
|
||||
recipient=self.appdata.serv_pubkey.jwk)
|
||||
return jwe_token.serialize(compact=True)
|
||||
|
||||
def req_jwe_post(self, url, sub, data, auth=False):
|
||||
jwe_token = self.encrypt(sub, data, auth)
|
||||
headers = {'Content-Type': "application/json; charset=UTF-8"}
|
||||
req = Request('POST', url, headers, data=jwe_token)
|
||||
return req
|
||||
|
||||
def req_jwe_bearer(self, url, sub, data, auth=False):
|
||||
jwe_token = self.encrypt(sub, data, auth)
|
||||
headers = {'Content-Type': "",
|
||||
'Authorization': "Bearer " + jwe_token}
|
||||
req = Request('GET', url, headers)
|
||||
return req
|
||||
|
||||
def decrypt(self, serialized_jwe_token):
|
||||
if isinstance(serialized_jwe_token, bytes):
|
||||
serialized_jwe_token = serialized_jwe_token.decode('utf-8')
|
||||
jwe_token = JWE()
|
||||
jwe_token.deserialize(serialized_jwe_token)
|
||||
jwe_token.decrypt(self.appdata.app_privkey.jwk)
|
||||
return json.loads(jwe_token.payload)
|
323
pyjod/sca.py
Normal file
323
pyjod/sca.py
Normal file
@@ -0,0 +1,323 @@
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from base64 import b32encode
|
||||
from getpass import getpass
|
||||
from urllib.parse import urljoin
|
||||
from uuid import uuid4
|
||||
|
||||
import requests
|
||||
from pyotp import TOTP
|
||||
|
||||
from .appdata import AppData
|
||||
from .jwe_handler import JWEHandler
|
||||
from .utils import RequestFailure, hmac_sha256
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotInitializedPIN(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AuthError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SCA:
|
||||
LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/"
|
||||
AUTH_URL = "https://posteid.poste.it/jod-login-schema/"
|
||||
SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web"
|
||||
"/public/app/")
|
||||
|
||||
def __init__(self, profile_name):
|
||||
self.appdata = AppData(profile_name)
|
||||
self.s = requests.Session()
|
||||
self.s.headers = {'Accept-Encoding': "gzip",
|
||||
'User-Agent': "okhttp/3.12.1",
|
||||
'Connection': "keep-alive"}
|
||||
self.jwe_handler = JWEHandler(self.appdata)
|
||||
self.reg_token = None
|
||||
self.profile_token = None
|
||||
self.access_token = None
|
||||
|
||||
def _send_req(self, request):
|
||||
prepped = self.s.prepare_request(request)
|
||||
return self.s.send(prepped)
|
||||
|
||||
def _parse_jwe_response(self, ans):
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
if not ans.headers.get('Content-Type').startswith("application/json"):
|
||||
raise RequestFailure("Response not JSON.", ans)
|
||||
ans_json = ans.json()
|
||||
if not ans_json.get('command-success'):
|
||||
raise RequestFailure(f"Command failed: {ans_json}", ans,
|
||||
error_code=ans_json.get('command-error-code'))
|
||||
if ans_json.get('command-result-type') not in ["JWE", "JSON"]:
|
||||
raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans)
|
||||
return ans_json.get('command-result')
|
||||
|
||||
def _parse_v4(self, ans):
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
if not ans.headers.get('Content-Type').startswith("application/json"):
|
||||
raise RequestFailure("Response not JSON.", ans)
|
||||
ans_json = ans.json()
|
||||
if (ans_json.get('status')
|
||||
not in ["v4_success", "v4_pending", "v4_signed"]):
|
||||
if ans_json.get('status') == "v4_error":
|
||||
reason = ans_json.get('reason')
|
||||
raise RequestFailure(f"Request v4 error: {reason}.", ans,
|
||||
error_code=reason)
|
||||
raise RequestFailure("Unknown v4 failure.", ans)
|
||||
return ans_json
|
||||
|
||||
def _sign_challenge_v4(self, challenge, userpin):
|
||||
userpin = str(userpin)
|
||||
signature = {}
|
||||
key = self.appdata['sca_seed'] + userpin + challenge['randK']
|
||||
message = challenge['transaction-challenge']
|
||||
signature['jti'] = challenge['jti']
|
||||
signature['signature'] = hmac_sha256(key, message)
|
||||
signature['userpin'] = userpin
|
||||
signature['authzTool'] = "POSTEID"
|
||||
return json.dumps(signature)
|
||||
|
||||
def _enrol_stage_basiclogin(self, username, password):
|
||||
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
|
||||
data = {}
|
||||
data['password'] = password
|
||||
data['authLevel'] = "0"
|
||||
data['userid'] = username
|
||||
logger.debug("Enrol(basiclogin): sending username and password.")
|
||||
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
|
||||
auth=True)
|
||||
ans = self._send_req(jwe_req)
|
||||
if ans.status_code == 401:
|
||||
raise AuthError("Wrong username or password!")
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
if 'X-PI' not in ans.headers:
|
||||
raise RequestFailure(f"Malformed request, missing 'X-PI'.", ans)
|
||||
uid = ans.headers.get('X-PI')
|
||||
logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.")
|
||||
|
||||
def _enrol_stage_req_sms_otp(self, username):
|
||||
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
|
||||
data = {}
|
||||
data['password'] = str(uuid4())
|
||||
data['authLevel'] = "3"
|
||||
data['userid'] = username
|
||||
logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..")
|
||||
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
|
||||
auth=True)
|
||||
ans = self._send_req(jwe_req)
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
if 'X-TEL' not in ans.headers:
|
||||
raise RequestFailure(f"Malformed request, missing 'X-TEL'.", ans)
|
||||
logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.")
|
||||
tel = ans.headers.get('X-TEL')
|
||||
logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.")
|
||||
return tel
|
||||
|
||||
def _enrol_stage_send_sms_otp(self, otp):
|
||||
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
|
||||
data = {}
|
||||
data['otp'] = str(otp)
|
||||
data['authLevel'] = "2"
|
||||
data['nonce'] = str(uuid4())
|
||||
logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.")
|
||||
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
|
||||
auth=True)
|
||||
ans = self._send_req(jwe_req)
|
||||
if ans.status_code == 401:
|
||||
raise AuthError("Wrong SMS-OTP code!")
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
if 'X-RESULT' not in ans.headers:
|
||||
raise RequestFailure(
|
||||
f"Malformed request, missing 'X-RESULT'.", ans)
|
||||
logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.")
|
||||
ans_json = self.jwe_handler.decrypt(ans.headers.get('X-RESULT'))
|
||||
logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}")
|
||||
if 'data' not in ans_json:
|
||||
raise RequestFailure("Malformed request, missing 'data'.", ans)
|
||||
if 'token' not in ans_json['data']:
|
||||
raise RequestFailure("Malformed request, missing 'token'.", ans)
|
||||
reg_token = ans_json['data']['token']
|
||||
logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.")
|
||||
self.reg_token = reg_token
|
||||
|
||||
def _enrol_stage_finalize(self, userpin=""):
|
||||
url = urljoin(self.SH_URL, "v1/registerApp")
|
||||
data = {}
|
||||
data['userPIN'] = userpin
|
||||
data['idpAccessToken'] = ""
|
||||
data['registerToken'] = self.reg_token
|
||||
logger.debug("Enrol(REG-APP): encrypting request.")
|
||||
print(data)
|
||||
jwe_req = self.jwe_handler.req_jwe_post(
|
||||
url, 'registerApp', data, auth=True)
|
||||
logger.debug("Enrol(REG-APP): sending request.")
|
||||
ans = self._send_req(jwe_req)
|
||||
|
||||
try:
|
||||
ans_jwe = self._parse_jwe_response(ans)
|
||||
except RequestFailure as e:
|
||||
if e.error_code == "PIN-ERR-1":
|
||||
raise NotInitializedPIN()
|
||||
raise e
|
||||
|
||||
ans_json = self.jwe_handler.decrypt(ans_jwe)
|
||||
if 'data' not in ans_json:
|
||||
raise RequestFailure("Malformed request, missing 'data'.", ans)
|
||||
if 'appRegisterID' not in ans_json['data']:
|
||||
raise RequestFailure(
|
||||
"Malformed request, missing 'appRegisterID'.", ans)
|
||||
if 'secretAPP' not in ans_json['data']:
|
||||
raise RequestFailure(
|
||||
"Malformed request, missing 'secretAPP'.", ans)
|
||||
self.appdata['sca_app_regid'] = ans_json['data']['appRegisterID']
|
||||
self.appdata['sca_seed'] = ans_json['data']['secretAPP']
|
||||
logger.debug("Enrol(REG-APP): completed successfully!")
|
||||
|
||||
def check_register(self):
|
||||
url = urljoin(self.SH_URL, "v1/checkRegisterApp")
|
||||
data = {'appRegisterID': self.appdata['sca_app_regid']}
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data,
|
||||
auth=True)
|
||||
ans = self._send_req(jwe_req)
|
||||
result = self._parse_jwe_response(ans)
|
||||
if 'valid' not in result:
|
||||
raise RequestFailure(
|
||||
"Malformed request, missing 'valid'.", ans)
|
||||
return result["valid"]
|
||||
|
||||
def enrol_sms_start(self, username, password):
|
||||
self._enrol_stage_basiclogin(username, password)
|
||||
tel = self._enrol_stage_req_sms_otp(username)
|
||||
return tel
|
||||
|
||||
def enrol_sms_finish(self, otp, force_userpin=None):
|
||||
self._enrol_stage_send_sms_otp(otp)
|
||||
try:
|
||||
self._enrol_stage_finalize()
|
||||
except NotInitializedPIN as e:
|
||||
if force_userpin:
|
||||
logger.info('Enrol(REG-APP): Setting the new provided PIN.')
|
||||
self._enrol_stage_finalize(force_userpin)
|
||||
return
|
||||
raise e
|
||||
|
||||
def _pin_login(self, userpin):
|
||||
url_challenge = urljoin(self.LOGIN_URL,
|
||||
"secureholder/v4/native/challenge")
|
||||
url_authorize = urljoin(self.LOGIN_URL,
|
||||
"secureholder/v4/native/az")
|
||||
logger.debug("PINLogin: acquiring v4 challenge.")
|
||||
req = self.jwe_handler.req_jwe_post(url_challenge, "login", {},
|
||||
auth=True)
|
||||
ans = self._send_req(req)
|
||||
ans = self._parse_v4(ans)
|
||||
logger.debug(f"PINLogin: got v4 challenge: {ans}.")
|
||||
logger.debug(f"PINLogin: preparing challenge response.")
|
||||
data = {}
|
||||
data['signature'] = self._sign_challenge_v4(ans, userpin)
|
||||
data['appRegisterID'] = self.appdata['sca_app_regid']
|
||||
logger.debug(f"PINLogin: sending response {data}.")
|
||||
req = self.jwe_handler.req_jwe_post(url_authorize, "login", data,
|
||||
auth=True)
|
||||
ans = self._send_req(req)
|
||||
try:
|
||||
ans = self._parse_v4(ans)
|
||||
except RequestFailure as e:
|
||||
if e.error_code == "PIN-ERR-3":
|
||||
raise AuthError("Codice PosteID errato!")
|
||||
elif e.error_code == "CERT-ERR-2":
|
||||
raise AuthError("Codice PosteID bloccato per troppi errori!")
|
||||
raise(e)
|
||||
self.profile_token = ans['profile_token']
|
||||
self.access_token = ans['access_token']
|
||||
logger.debug(f"PINLogin: logged in, session token aquired.")
|
||||
|
||||
def _unenrol(self):
|
||||
if not self.access_token:
|
||||
raise Exception(
|
||||
'Need to acquire an access token (pin_login) before.')
|
||||
logger.debug("Unenrol: unenrolling device.")
|
||||
url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid")
|
||||
jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {},
|
||||
auth=True)
|
||||
jwe_req.headers['Authorization'] = "Bearer " + self.access_token
|
||||
ans = self._send_req(jwe_req)
|
||||
ans = self._parse_v4(ans)
|
||||
logger.debug("Unenrol: device unenrolled.")
|
||||
|
||||
def list_txs(self):
|
||||
if not self.access_token:
|
||||
raise Exception(
|
||||
"Need to acquire an access token (pin_login) before.")
|
||||
url = urljoin(self.LOGIN_URL,
|
||||
"secureholder/v4/native/list-transaction")
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True)
|
||||
jwe_req.headers['Authorization'] = "Bearer " + self.access_token
|
||||
ans = self._send_req(jwe_req)
|
||||
ans_json = self._parse_v4(ans)
|
||||
if 'transaction' not in ans_json:
|
||||
raise RequestFailure(
|
||||
f"Malformed response, missing 'transaction'.", ans)
|
||||
return ans_json['transaction']
|
||||
|
||||
def authorize_tx_start(self, tx_id):
|
||||
data = {}
|
||||
data['jti'] = tx_id
|
||||
data['appRegisterID']: self.appdata['sca_app_regid']
|
||||
url = urljoin(self.AUTH_URL, "secureholder/v4/challenge")
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
|
||||
ans = self._send_req(jwe_req)
|
||||
ans_json = self._parse_v4(ans)
|
||||
if ans_json['status'] != "v4_pending":
|
||||
raise Exception(
|
||||
f"TX Status is '{ans_json['status']}', not pending.")
|
||||
if 'jti' not in ans_json:
|
||||
raise RequestFailure(f"Malformed response, missing 'jti'.", ans)
|
||||
if 'randK' not in ans_json:
|
||||
raise RequestFailure(f"Malformed response, missing 'randK'.", ans)
|
||||
if 'transaction-challenge' not in ans_json:
|
||||
raise RequestFailure(
|
||||
f"Malformed response, missing 'transaction-challenge'.", ans)
|
||||
return ans_json
|
||||
|
||||
def authorize_tx_finish(self, challenge, userpin):
|
||||
signature = self._sign_challenge_v4(challenge, userpin)
|
||||
data = {}
|
||||
data['signature'] = signature
|
||||
data['appRegisterID'] = self.appdata['sca_app_regid']
|
||||
url = urljoin(self.AUTH_URL, "secureholder/v4/az")
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
|
||||
ans = self._send_req(jwe_req)
|
||||
try:
|
||||
ans_json = self._parse_v4(ans)
|
||||
except RequestFailure as e:
|
||||
if e.error_code == "PIN-ERR-3":
|
||||
raise AuthError("Codice PosteID errato!")
|
||||
elif e.error_code == "CERT-ERR-2":
|
||||
raise AuthError("Codice PosteID bloccato per troppi errori!")
|
||||
raise(e)
|
||||
if ans_json['status'] != "v4_signed":
|
||||
raise Exception(
|
||||
f"TX Status is '{ans_json['status']}', not signed.")
|
||||
|
||||
def unenrol(self, userpin):
|
||||
self._pin_login(userpin)
|
||||
self._unenrol()
|
||||
|
||||
@property
|
||||
def totp(self):
|
||||
seed = self.appdata['sca_seed']
|
||||
seed = b32encode(seed.encode("utf-8") + b'\0' * 32)
|
||||
seed = seed.decode("utf-8").replace('=', '')
|
||||
totp = TOTP(seed, interval=120)
|
||||
return totp
|
25
pyjod/utils.py
Normal file
25
pyjod/utils.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import hmac
|
||||
import json
|
||||
from base64 import b64encode, urlsafe_b64encode
|
||||
from hashlib import sha256
|
||||
|
||||
|
||||
def sha256_base64(payload):
|
||||
out = payload.encode('utf-8')
|
||||
out = sha256(out).digest()
|
||||
out = b64encode(out)
|
||||
return out.decode('utf-8')
|
||||
|
||||
|
||||
def hmac_sha256(key, message):
|
||||
hm = hmac.new(key.encode('utf-8'), digestmod=sha256)
|
||||
hm.update(message.encode('utf-8'))
|
||||
digest = hm.digest()
|
||||
return urlsafe_b64encode(digest).decode('utf-8').replace('=', '')
|
||||
|
||||
|
||||
class RequestFailure(Exception):
|
||||
def __init__(self, message, ans, error_code=None):
|
||||
super().__init__(message)
|
||||
self.ans = ans
|
||||
self.error_code = error_code
|
5
pyjod/version.py
Normal file
5
pyjod/version.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# coding: utf-8
|
||||
# file generated by setuptools_scm
|
||||
# don't change, don't track in version control
|
||||
__version__ = version = '0.1.dev0'
|
||||
__version_tuple__ = version_tuple = (0, 1, 'dev0')
|
126
pyjod/xkey.py
Normal file
126
pyjod/xkey.py
Normal file
@@ -0,0 +1,126 @@
|
||||
import logging
|
||||
from urllib.parse import urljoin
|
||||
from uuid import uuid4
|
||||
|
||||
import requests
|
||||
|
||||
from .appdata import AppData
|
||||
from .jwe_handler import JWEHandler
|
||||
from .utils import sha256_base64, RequestFailure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class XKey:
|
||||
REGISTRY_URL = ("https://appregistry-posteid.mobile.poste.it"
|
||||
"/jod-app-registry/")
|
||||
APP_NAME = "app-posteid-v3"
|
||||
ACTIVITY_ID = "C6050AC80E8B5288A01237"
|
||||
DEVICE_SPECS = ("Android", "11",
|
||||
"sdk_gphone_x86_64_arm64", "4.5.204",
|
||||
"true")
|
||||
|
||||
def __init__(self, profile_name):
|
||||
self.appdata = AppData(profile_name)
|
||||
self.s = requests.Session()
|
||||
self.s.headers = {'Accept-Encoding': "gzip",
|
||||
'User-Agent': "okhttp/3.12.1",
|
||||
'Connection': "keep-alive"}
|
||||
self.jwe_handler = JWEHandler(self.appdata)
|
||||
|
||||
def _send_req(self, request):
|
||||
prepped = self.s.prepare_request(request)
|
||||
return self.s.send(prepped)
|
||||
|
||||
def _register_stage_init(self, register_nonce):
|
||||
url = urljoin(self.REGISTRY_URL, "v2/registerInit")
|
||||
headers = {'Content-Type': "application/json; charset=UTF-8"}
|
||||
data = {}
|
||||
data['appName'] = "app-posteid-v3"
|
||||
data['initCodeChallenge'] = sha256_base64(register_nonce)
|
||||
logger.debug(f"Registration(INIT): sending challenge: {data}")
|
||||
ans = self.s.post(url, headers=headers, json=data)
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
if not ans.headers.get('Content-Type').startswith("application/json"):
|
||||
raise RequestFailure("Response not JSON.", ans)
|
||||
ans_json = ans.json()
|
||||
if 'pubServerKey' not in ans_json:
|
||||
raise RequetFailure("Response does not contain 'pubServerKey'",
|
||||
ans)
|
||||
pubkey = ans_json['pubServerKey']
|
||||
self.appdata.serv_pubkey = pubkey
|
||||
logger.debug(f"Registration(INIT): got server pubkey: {pubkey}.")
|
||||
|
||||
def _register_stage_register(self, register_nonce):
|
||||
url = urljoin(self.REGISTRY_URL, "v2/register")
|
||||
data = {}
|
||||
data['initCodeVerifier'] = register_nonce
|
||||
data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS)
|
||||
data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64
|
||||
logger.debug("Registration(REG): encrypting app data.")
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "register", data)
|
||||
logger.debug("Registration(REG): sending app data.")
|
||||
ans = self._send_req(jwe_req)
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
logger.debug("Registration(REG): decrypting response.")
|
||||
ans_json = self.jwe_handler.decrypt(ans.content)
|
||||
logger.debug(f"Registration(REG): decrypted response: {ans_json}")
|
||||
if 'data' not in ans_json:
|
||||
raise RequestFailure("Malformed request, missing 'data'.", ans)
|
||||
if 'app-uuid' not in ans_json['data']:
|
||||
raise RequestFailure("Malformed request, missing 'app-uuid'.", ans)
|
||||
if 'otpSecretKey' not in ans_json['data']:
|
||||
raise RequestFailure("Malformed request, missing 'otpSecretKey'.",
|
||||
ans)
|
||||
self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid']
|
||||
self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey']
|
||||
|
||||
def _register_stage_activate(self):
|
||||
url = urljoin(self.REGISTRY_URL, "v2/activation")
|
||||
logger.debug("Registration(ACTIVATE): encrypting request.")
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "activation", {},
|
||||
auth=True)
|
||||
logger.debug("Registration(ACTIVATE): sending request.")
|
||||
ans = self._send_req(jwe_req)
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
self.appdata['activated'] = True
|
||||
logger.debug("Registration(ACTIVATE): activated.")
|
||||
|
||||
def _register_stage_update(self, register_nonce):
|
||||
url = urljoin(self.REGISTRY_URL, "v2/register")
|
||||
data = {}
|
||||
data['initCodeVerifier'] = register_nonce
|
||||
data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS)
|
||||
data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64
|
||||
logger.debug("Registration(UPDATE): encrypting app data.")
|
||||
jwe_req = self.jwe_handler.req_jwe_post(url, "registerUpdate", data,
|
||||
auth=True)
|
||||
logger.debug("Registration(UPDATE): sending app data.")
|
||||
ans = self._send_req(jwe_req)
|
||||
if ans.status_code != 200:
|
||||
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
||||
logger.debug("Registration(UPDATE): decrypting response.")
|
||||
ans_json = self.jwe_handler.decrypt(ans.content)
|
||||
logger.debug(f"Registration(UPDATE): decrypted response: {ans_json}")
|
||||
if 'data' not in ans_json:
|
||||
raise RequestFailure("Malformed request, missing 'data'.", ans)
|
||||
if 'app-uuid' not in ans_json['data']:
|
||||
raise RequestFailure("Malformed request, missing 'app-uuid'.", ans)
|
||||
if 'otpSecretKey' not in ans_json['data']:
|
||||
raise RequestFailure("Malformed request, missing 'otpSecretKey'.",
|
||||
ans)
|
||||
self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid']
|
||||
self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey']
|
||||
|
||||
def register_xkey(self):
|
||||
register_nonce = str(uuid4())
|
||||
logger.debug(f"Starting registration (nonce: {register_nonce}).")
|
||||
self._register_stage_init(register_nonce)
|
||||
if self.appdata['xkey_seed']:
|
||||
self._register_stage_update(register_nonce)
|
||||
else:
|
||||
self._register_stage_register(register_nonce)
|
||||
self._register_stage_activate()
|
10
pyproject.toml
Normal file
10
pyproject.toml
Normal file
@@ -0,0 +1,10 @@
|
||||
[build-system]
|
||||
requires = [
|
||||
"setuptools>=42",
|
||||
"wheel",
|
||||
"setuptools_scm[toml]>3.4",
|
||||
]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.setuptools_scm]
|
||||
write_to = "pyjod/version.py"
|
42
setup.cfg
Normal file
42
setup.cfg
Normal file
@@ -0,0 +1,42 @@
|
||||
[metadata]
|
||||
name = pyjod
|
||||
version = attr:pyjod.version
|
||||
description = Python implementation of PosteID.
|
||||
long_description = file:README.md
|
||||
long_description_content_type = text/markdown
|
||||
author = Vendetta
|
||||
author_email = aaron@guerrilla.open
|
||||
classifiers =
|
||||
Development Status :: 1 - Planning
|
||||
Programming Language :: Python
|
||||
Programming Language :: Python :: 3.9
|
||||
Programming Language :: Python :: 3.10
|
||||
|
||||
platforms = any
|
||||
|
||||
[options]
|
||||
packages =
|
||||
pyjod
|
||||
install_requires =
|
||||
xdgappdirs>=1.4
|
||||
cryptography>=3.3
|
||||
jwcrypto>=1.3
|
||||
pathvalidate>=2.5
|
||||
pyotp>=2.6
|
||||
requests
|
||||
|
||||
setup_requires =
|
||||
setuptools_scm
|
||||
|
||||
[options.extras_require]
|
||||
qr =
|
||||
numpy
|
||||
qrcodeT
|
||||
scanqr =
|
||||
numpy
|
||||
opencv-python
|
||||
pyautogui
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
posteid = pyjod.cli:run
|
Reference in New Issue
Block a user