[mod] add searx/webutils.py

contains utility functions and classes used only by webapp.py
This commit is contained in:
Alexandre Flament 2020-09-19 18:25:24 +02:00
parent f9664037a6
commit ad0758e52a
5 changed files with 198 additions and 202 deletions

View File

@ -1,27 +1,21 @@
# -*- coding: utf-8 -*-
import os
import sys
import csv
import hashlib
import hmac
import re
import json
from codecs import getincrementalencoder
from imp import load_source
from numbers import Number
from os.path import splitext, join
from io import open, StringIO
from io import open
from random import choice
from html.parser import HTMLParser
from lxml.etree import XPath
from babel.core import get_global
from babel.dates import format_date
from searx import settings
from searx.version import VERSION_STRING
from searx.languages import language_codes
from searx import settings
from searx import logger
@ -50,33 +44,6 @@ def gen_useragent(os=None):
return str(useragents['ua'].format(os=os or choice(useragents['os']), version=choice(useragents['versions'])))
def highlight_content(content, query):
if not content:
return None
# ignoring html contents
# TODO better html content detection
if content.find('<') != -1:
return content
if content.lower().find(query.lower()) > -1:
query_regex = '({0})'.format(re.escape(query))
content = re.sub(query_regex, '<span class="highlight">\\1</span>',
content, flags=re.I | re.U)
else:
regex_parts = []
for chunk in query.split():
if len(chunk) == 1:
regex_parts.append('\\W+{0}\\W+'.format(re.escape(chunk)))
else:
regex_parts.append('{0}'.format(re.escape(chunk)))
query_regex = '({0})'.format('|'.join(regex_parts))
content = re.sub(query_regex, '<span class="highlight">\\1</span>',
content, flags=re.I | re.U)
return content
class HTMLTextExtractorException(Exception):
pass
@ -139,91 +106,6 @@ def html_to_text(html):
return s.get_text()
class UnicodeWriter:
"""
A CSV writer which will write rows to CSV file "f",
which is encoded in the given encoding.
"""
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
# Redirect output to a queue
self.queue = StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
self.encoder = getincrementalencoder(encoding)()
def writerow(self, row):
self.writer.writerow(row)
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
data = data.strip('\x00')
# ... and reencode it into the target encoding
data = self.encoder.encode(data)
# write to the target stream
self.stream.write(data.decode())
# empty queue
self.queue.truncate(0)
def writerows(self, rows):
for row in rows:
self.writerow(row)
def get_resources_directory(searx_directory, subdirectory, resources_directory):
if not resources_directory:
resources_directory = os.path.join(searx_directory, subdirectory)
if not os.path.isdir(resources_directory):
raise Exception(resources_directory + " is not a directory")
return resources_directory
def get_themes(templates_path):
"""Returns available themes list."""
themes = os.listdir(templates_path)
if '__common__' in themes:
themes.remove('__common__')
return themes
def get_static_files(static_path):
static_files = set()
static_path_length = len(static_path) + 1
for directory, _, files in os.walk(static_path):
for filename in files:
f = os.path.join(directory[static_path_length:], filename)
static_files.add(f)
return static_files
def get_result_templates(templates_path):
result_templates = set()
templates_path_length = len(templates_path) + 1
for directory, _, files in os.walk(templates_path):
if directory.endswith('result_templates'):
for filename in files:
f = os.path.join(directory[templates_path_length:], filename)
result_templates.add(f)
return result_templates
def format_date_by_locale(date, locale_string):
# strftime works only on dates after 1900
if date.year <= 1900:
return date.isoformat().split('T')[0]
if locale_string == 'all':
locale_string = settings['ui']['default_locale'] or 'en_US'
# to avoid crashing if locale is not supported by babel
try:
formatted_date = format_date(date, locale=locale_string)
except:
formatted_date = format_date(date, "YYYY-MM-dd")
return formatted_date
def dict_subset(d, properties):
result = {}
for k in properties:
@ -232,14 +114,6 @@ def dict_subset(d, properties):
return result
def prettify_url(url, max_length=74):
if len(url) > max_length:
chunk_len = int(max_length / 2 + 1)
return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
else:
return url
# get element in list or default value
def list_get(a_list, index, default=None):
if len(a_list) > index:
@ -383,17 +257,6 @@ def load_module(filename, module_dir):
return module
def new_hmac(secret_key, url):
try:
secret_key_bytes = bytes(secret_key, 'utf-8')
except TypeError as err:
if isinstance(secret_key, bytes):
secret_key_bytes = secret_key
else:
raise err
return hmac.new(secret_key_bytes, url, hashlib.sha256).hexdigest()
def to_string(obj):
if isinstance(obj, str):
return obj

View File

@ -62,11 +62,12 @@ from searx.exceptions import SearxParameterException
from searx.engines import (
categories, engines, engine_shortcuts, get_engines_stats, initialize_engines
)
from searx.utils import (
UnicodeWriter, highlight_content, html_to_text, get_resources_directory,
get_static_files, get_result_templates, get_themes, gen_useragent,
dict_subset, prettify_url, match_language
from searx.webutils import (
UnicodeWriter, highlight_content, get_resources_directory,
get_static_files, get_result_templates, get_themes,
prettify_url, new_hmac
)
from searx.utils import html_to_text, gen_useragent, dict_subset, match_language
from searx.version import VERSION_STRING
from searx.languages import language_codes as languages
from searx.search import SearchWithPlugins, get_search_query_from_webapp
@ -76,7 +77,6 @@ from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers
from searx.utils import new_hmac
# check if the pyopenssl package is installed.
# It is needed for SSL connection without trouble, see #298

127
searx/webutils.py Normal file
View File

@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
import os
import csv
import hashlib
import hmac
import re
from io import StringIO
from codecs import getincrementalencoder
from searx import logger
logger = logger.getChild('webutils')
class UnicodeWriter:
"""
A CSV writer which will write rows to CSV file "f",
which is encoded in the given encoding.
"""
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
# Redirect output to a queue
self.queue = StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
self.encoder = getincrementalencoder(encoding)()
def writerow(self, row):
self.writer.writerow(row)
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
data = data.strip('\x00')
# ... and reencode it into the target encoding
data = self.encoder.encode(data)
# write to the target stream
self.stream.write(data.decode())
# empty queue
self.queue.truncate(0)
def writerows(self, rows):
for row in rows:
self.writerow(row)
def get_resources_directory(searx_directory, subdirectory, resources_directory):
if not resources_directory:
resources_directory = os.path.join(searx_directory, subdirectory)
if not os.path.isdir(resources_directory):
raise Exception(resources_directory + " is not a directory")
return resources_directory
def get_themes(templates_path):
"""Returns available themes list."""
themes = os.listdir(templates_path)
if '__common__' in themes:
themes.remove('__common__')
return themes
def get_static_files(static_path):
static_files = set()
static_path_length = len(static_path) + 1
for directory, _, files in os.walk(static_path):
for filename in files:
f = os.path.join(directory[static_path_length:], filename)
static_files.add(f)
return static_files
def get_result_templates(templates_path):
result_templates = set()
templates_path_length = len(templates_path) + 1
for directory, _, files in os.walk(templates_path):
if directory.endswith('result_templates'):
for filename in files:
f = os.path.join(directory[templates_path_length:], filename)
result_templates.add(f)
return result_templates
def new_hmac(secret_key, url):
try:
secret_key_bytes = bytes(secret_key, 'utf-8')
except TypeError as err:
if isinstance(secret_key, bytes):
secret_key_bytes = secret_key
else:
raise err
return hmac.new(secret_key_bytes, url, hashlib.sha256).hexdigest()
def prettify_url(url, max_length=74):
if len(url) > max_length:
chunk_len = int(max_length / 2 + 1)
return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
else:
return url
def highlight_content(content, query):
if not content:
return None
# ignoring html contents
# TODO better html content detection
if content.find('<') != -1:
return content
if content.lower().find(query.lower()) > -1:
query_regex = '({0})'.format(re.escape(query))
content = re.sub(query_regex, '<span class="highlight">\\1</span>',
content, flags=re.I | re.U)
else:
regex_parts = []
for chunk in query.split():
if len(chunk) == 1:
regex_parts.append('\\W+{0}\\W+'.format(re.escape(chunk)))
else:
regex_parts.append('{0}'.format(re.escape(chunk)))
query_regex = '({0})'.format('|'.join(regex_parts))
content = re.sub(query_regex, '<span class="highlight">\\1</span>',
content, flags=re.I | re.U)
return content

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import mock
from searx.testing import SearxTestCase
from searx import utils
@ -16,25 +15,6 @@ class TestUtils(SearxTestCase):
self.assertIsNotNone(utils.searx_useragent())
self.assertTrue(utils.searx_useragent().startswith('searx'))
def test_highlight_content(self):
self.assertEqual(utils.highlight_content(0, None), None)
self.assertEqual(utils.highlight_content(None, None), None)
self.assertEqual(utils.highlight_content('', None), None)
self.assertEqual(utils.highlight_content(False, None), None)
contents = [
'<html></html>'
'not<'
]
for content in contents:
self.assertEqual(utils.highlight_content(content, None), content)
content = 'a'
query = 'test'
self.assertEqual(utils.highlight_content(content, query), content)
query = 'a test'
self.assertEqual(utils.highlight_content(content, query), content)
def test_html_to_text(self):
html = """
<a href="/testlink" class="link_access_account">
@ -56,15 +36,6 @@ class TestUtils(SearxTestCase):
html = '<p><b>Lorem ipsum</i>dolor sit amet</p>'
self.assertEqual(utils.html_to_text(html), "Lorem ipsum")
def test_prettify_url(self):
data = (('https://searx.me/', 'https://searx.me/'),
('https://searx.me/ű', 'https://searx.me/ű'),
('https://searx.me/' + (100 * 'a'), 'https://searx.me/[...]aaaaaaaaaaaaaaaaa'),
('https://searx.me/' + (100 * 'ű'), 'https://searx.me/[...]űűűűűűűűűűűűűűűűű'))
for test_url, expected in data:
self.assertEqual(utils.prettify_url(test_url, max_length=32), expected)
def test_match_language(self):
self.assertEqual(utils.match_language('es', ['es']), 'es')
self.assertEqual(utils.match_language('es', [], fallback='fallback'), 'fallback')
@ -124,33 +95,3 @@ class TestHTMLTextExtractor(SearxTestCase):
text = '<p><b>Lorem ipsum</i>dolor sit amet</p>'
with self.assertRaises(utils.HTMLTextExtractorException):
self.html_text_extractor.feed(text)
class TestUnicodeWriter(SearxTestCase):
def setUp(self):
self.unicode_writer = utils.UnicodeWriter(mock.MagicMock())
def test_write_row(self):
row = [1, 2, 3]
self.assertEqual(self.unicode_writer.writerow(row), None)
def test_write_rows(self):
self.unicode_writer.writerow = mock.MagicMock()
rows = [1, 2, 3]
self.unicode_writer.writerows(rows)
self.assertEqual(self.unicode_writer.writerow.call_count, len(rows))
class TestNewHmac(SearxTestCase):
def test_bytes(self):
for secret_key in ['secret', b'secret', 1]:
if secret_key == 1:
with self.assertRaises(TypeError):
utils.new_hmac(secret_key, b'http://example.com')
continue
res = utils.new_hmac(secret_key, b'http://example.com')
self.assertEqual(
res,
'23e2baa2404012a5cc8e4a18b4aabf0dde4cb9b56f679ddc0fd6d7c24339d819')

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
import mock
from searx.testing import SearxTestCase
from searx import webutils
class TestWebUtils(SearxTestCase):
def test_prettify_url(self):
data = (('https://searx.me/', 'https://searx.me/'),
('https://searx.me/ű', 'https://searx.me/ű'),
('https://searx.me/' + (100 * 'a'), 'https://searx.me/[...]aaaaaaaaaaaaaaaaa'),
('https://searx.me/' + (100 * 'ű'), 'https://searx.me/[...]űűűűűűűűűűűűűűűűű'))
for test_url, expected in data:
self.assertEqual(webutils.prettify_url(test_url, max_length=32), expected)
def test_highlight_content(self):
self.assertEqual(webutils.highlight_content(0, None), None)
self.assertEqual(webutils.highlight_content(None, None), None)
self.assertEqual(webutils.highlight_content('', None), None)
self.assertEqual(webutils.highlight_content(False, None), None)
contents = [
'<html></html>'
'not<'
]
for content in contents:
self.assertEqual(webutils.highlight_content(content, None), content)
content = 'a'
query = 'test'
self.assertEqual(webutils.highlight_content(content, query), content)
query = 'a test'
self.assertEqual(webutils.highlight_content(content, query), content)
class TestUnicodeWriter(SearxTestCase):
def setUp(self):
self.unicode_writer = webutils.UnicodeWriter(mock.MagicMock())
def test_write_row(self):
row = [1, 2, 3]
self.assertEqual(self.unicode_writer.writerow(row), None)
def test_write_rows(self):
self.unicode_writer.writerow = mock.MagicMock()
rows = [1, 2, 3]
self.unicode_writer.writerows(rows)
self.assertEqual(self.unicode_writer.writerow.call_count, len(rows))
class TestNewHmac(SearxTestCase):
def test_bytes(self):
for secret_key in ['secret', b'secret', 1]:
if secret_key == 1:
with self.assertRaises(TypeError):
webutils.new_hmac(secret_key, b'http://example.com')
continue
res = webutils.new_hmac(secret_key, b'http://example.com')
self.assertEqual(
res,
'23e2baa2404012a5cc8e4a18b4aabf0dde4cb9b56f679ddc0fd6d7c24339d819')