50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
import time
|
|
import hmac
|
|
import hashlib
|
|
import logging
|
|
|
|
from aiohttp.web import middleware, HTTPFound
|
|
|
|
|
|
# Module-level logger named after this module; the auth middleware below
# uses it to report session cookies whose timestamp cannot be evaluated.
log = logging.getLogger(__name__)
|
|
|
|
|
|
def middleware_factory():
    """Build the aiohttp middleware that gates requests behind the login cookies.

    When ``request.app["is_authenticated"]`` is truthy, every request whose
    path is not ``/login`` or ``/logout`` must carry two cookies:

    * ``_tgindex_session`` -- a numeric timestamp (parsed with ``float``).
    * ``_tgindex_secret`` -- the hex HMAC-SHA256 of the session cookie value,
      keyed with ``request.app["SECRET_KEY"]``.

    A request with a missing cookie, a signature mismatch, an unparsable
    timestamp or an expired session is redirected (``HTTPFound``) to the
    route named ``login_page``. Everything else is passed to the handler.

    Returns:
        The middleware coroutine function, ready to be added to the
        application's middleware list.
    """
    # Paths that must stay reachable without a valid session.
    public_paths = ("/login", "/logout")
    session_cookie = "_tgindex_session"
    secret_cookie = "_tgindex_secret"

    @middleware
    async def factory(request, handler):
        app = request.app
        if not app["is_authenticated"] or str(request.rel_url.path) in public_paths:
            return await handler(request)

        cookies = request.cookies
        login_url = app.router["login_page"].url_for()
        if session_cookie not in cookies or secret_cookie not in cookies:
            raise HTTPFound(login_url)

        session_value = str(cookies[session_cookie])
        secret_value = str(cookies[secret_cookie])

        # "surrogateescape" only differs from a plain encode() for strings
        # holding escaped raw bytes, which would otherwise raise
        # UnicodeEncodeError; such values simply fail the signature check.
        expected_digest = hmac.new(
            app["SECRET_KEY"].encode(),
            session_value.encode("utf-8", "surrogateescape"),
            hashlib.sha256,
        ).hexdigest()

        # Constant-time comparison so the response time does not leak how
        # much of a forged signature was correct. Bytes operands are used
        # because compare_digest rejects non-ASCII str arguments.
        if not hmac.compare_digest(
            secret_value.encode("utf-8", "surrogateescape"),
            expected_digest.encode(),
        ):
            raise HTTPFound(login_url)

        try:
            lifetime = app["SESSION_COOKIE_LIFETIME"]
            # NOTE(review): the cut-off is the cookie timestamp plus
            # 61 x SESSION_COOKIE_LIFETIME; arithmetic kept exactly as it
            # was -- confirm the intended units of the setting.
            deadline = float(session_value) + lifetime + 60 * lifetime
        except Exception as exc:
            # Fail closed: a session whose deadline cannot be computed is
            # treated as invalid, and the cause is logged for the operator.
            log.exception(exc)
            raise HTTPFound(login_url)

        # Checked outside the try block so that an ordinary expired session
        # is a plain redirect and is not reported as an error with traceback.
        if time.time() > deadline:
            raise HTTPFound(login_url)

        return await handler(request)

    return factory
|