TelegramIndex-Fork/app/views/middlewhere.py

86 lines
2.3 KiB
Python
Raw Permalink Normal View History

2021-05-22 16:00:08 +02:00
import time
import logging
2021-11-01 05:02:46 +01:00
from typing import Coroutine, Union
2021-05-22 16:00:08 +02:00
2021-11-01 05:02:46 +01:00
from aiohttp.web import middleware, HTTPFound, Response, Request
from aiohttp import BasicAuth, hdrs
from aiohttp_session import get_session
2021-05-22 16:00:08 +02:00
log = logging.getLogger(__name__)
2021-11-01 05:02:46 +01:00
def _do_basic_auth_check(request: Request) -> Union[None, bool]:
2021-06-19 18:42:12 +02:00
if "download_" not in request.match_info.route.name:
return
2021-06-19 18:42:12 +02:00
auth = None
auth_header = request.headers.get(hdrs.AUTHORIZATION)
if auth_header is not None:
try:
auth = BasicAuth.decode(auth_header=auth_header)
except ValueError:
pass
if auth is None:
try:
auth = BasicAuth.from_url(request.url)
except ValueError:
pass
if not auth:
2021-06-19 18:42:12 +02:00
return Response(
body=b"",
status=401,
reason="UNAUTHORIZED",
headers={hdrs.WWW_AUTHENTICATE: 'Basic realm=""'},
)
if auth.login is None or auth.password is None:
return
2021-06-14 17:53:33 +02:00
if (
auth.login != request.app["username"]
or auth.password != request.app["password"]
):
return
return True
2021-11-01 05:02:46 +01:00
async def _do_cookies_auth_check(request: Request) -> Union[None, bool]:
session = await get_session(request)
if not session.get("logged_in", False):
return
session["last_at"] = time.time()
return True
2021-11-01 05:02:46 +01:00
def middleware_factory() -> Coroutine:
2021-05-22 16:00:08 +02:00
@middleware
2021-11-01 05:02:46 +01:00
async def factory(request: Request, handler: Coroutine) -> Response:
2021-05-22 16:00:08 +02:00
if request.app["is_authenticated"] and str(request.rel_url.path) not in [
"/login",
"/logout",
"/favicon.ico",
2021-05-22 16:00:08 +02:00
]:
url = request.app.router["login_page"].url_for()
2021-06-10 05:54:15 +02:00
if str(request.rel_url) != "/":
url = url.with_query(redirect_to=str(request.rel_url))
basic_auth_check_resp = _do_basic_auth_check(request)
2021-06-14 17:53:33 +02:00
if basic_auth_check_resp is True:
return await handler(request)
cookies_auth_check_resp = await _do_cookies_auth_check(request)
if cookies_auth_check_resp is not None:
return await handler(request)
2021-06-14 17:53:33 +02:00
if isinstance(basic_auth_check_resp, Response):
return basic_auth_check_resp
return HTTPFound(url)
2021-05-22 16:00:08 +02:00
return await handler(request)
return factory