diff --git a/searx/utils.py b/searx/utils.py index 0eb9f6a3..f74f2ac8 100644 --- a/searx/utils.py +++ b/searx/utils.py @@ -1,27 +1,21 @@ # -*- coding: utf-8 -*- import os import sys -import csv -import hashlib -import hmac import re import json -from codecs import getincrementalencoder from imp import load_source from numbers import Number from os.path import splitext, join -from io import open, StringIO +from io import open from random import choice from html.parser import HTMLParser from lxml.etree import XPath from babel.core import get_global -from babel.dates import format_date from searx import settings from searx.version import VERSION_STRING from searx.languages import language_codes -from searx import settings from searx import logger @@ -50,33 +44,6 @@ def gen_useragent(os=None): return str(useragents['ua'].format(os=os or choice(useragents['os']), version=choice(useragents['versions']))) -def highlight_content(content, query): - - if not content: - return None - # ignoring html contents - # TODO better html content detection - if content.find('<') != -1: - return content - - if content.lower().find(query.lower()) > -1: - query_regex = '({0})'.format(re.escape(query)) - content = re.sub(query_regex, '\\1', - content, flags=re.I | re.U) - else: - regex_parts = [] - for chunk in query.split(): - if len(chunk) == 1: - regex_parts.append('\\W+{0}\\W+'.format(re.escape(chunk))) - else: - regex_parts.append('{0}'.format(re.escape(chunk))) - query_regex = '({0})'.format('|'.join(regex_parts)) - content = re.sub(query_regex, '\\1', - content, flags=re.I | re.U) - - return content - - class HTMLTextExtractorException(Exception): pass @@ -139,91 +106,6 @@ def html_to_text(html): return s.get_text() -class UnicodeWriter: - """ - A CSV writer which will write rows to CSV file "f", - which is encoded in the given encoding. - """ - - def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): - # Redirect output to a queue - self.queue = StringIO() - self.writer = csv.writer(self.queue, dialect=dialect, **kwds) - self.stream = f - self.encoder = getincrementalencoder(encoding)() - - def writerow(self, row): - self.writer.writerow(row) - # Fetch UTF-8 output from the queue ... - data = self.queue.getvalue() - data = data.strip('\x00') - # ... and reencode it into the target encoding - data = self.encoder.encode(data) - # write to the target stream - self.stream.write(data.decode()) - # empty queue - self.queue.truncate(0) - - def writerows(self, rows): - for row in rows: - self.writerow(row) - - -def get_resources_directory(searx_directory, subdirectory, resources_directory): - if not resources_directory: - resources_directory = os.path.join(searx_directory, subdirectory) - if not os.path.isdir(resources_directory): - raise Exception(resources_directory + " is not a directory") - return resources_directory - - -def get_themes(templates_path): - """Returns available themes list.""" - themes = os.listdir(templates_path) - if '__common__' in themes: - themes.remove('__common__') - return themes - - -def get_static_files(static_path): - static_files = set() - static_path_length = len(static_path) + 1 - for directory, _, files in os.walk(static_path): - for filename in files: - f = os.path.join(directory[static_path_length:], filename) - static_files.add(f) - return static_files - - -def get_result_templates(templates_path): - result_templates = set() - templates_path_length = len(templates_path) + 1 - for directory, _, files in os.walk(templates_path): - if directory.endswith('result_templates'): - for filename in files: - f = os.path.join(directory[templates_path_length:], filename) - result_templates.add(f) - return result_templates - - -def format_date_by_locale(date, locale_string): - # strftime works only on dates after 1900 - - if date.year <= 1900: - return date.isoformat().split('T')[0] - - if locale_string == 'all': - locale_string = settings['ui']['default_locale'] or 'en_US' - - # to avoid crashing if locale is not supported by babel - try: - formatted_date = format_date(date, locale=locale_string) - except: - formatted_date = format_date(date, "YYYY-MM-dd") - - return formatted_date - - def dict_subset(d, properties): result = {} for k in properties: @@ -232,14 +114,6 @@ def dict_subset(d, properties): return result -def prettify_url(url, max_length=74): - if len(url) > max_length: - chunk_len = int(max_length / 2 + 1) - return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:]) - else: - return url - - # get element in list or default value def list_get(a_list, index, default=None): if len(a_list) > index: @@ -383,17 +257,6 @@ def load_module(filename, module_dir): return module -def new_hmac(secret_key, url): - try: - secret_key_bytes = bytes(secret_key, 'utf-8') - except TypeError as err: - if isinstance(secret_key, bytes): - secret_key_bytes = secret_key - else: - raise err - return hmac.new(secret_key_bytes, url, hashlib.sha256).hexdigest() - - def to_string(obj): if isinstance(obj, str): return obj diff --git a/searx/webapp.py b/searx/webapp.py index f2ef8b20..90bc8fc6 100755 --- a/searx/webapp.py +++ b/searx/webapp.py @@ -62,11 +62,12 @@ from searx.exceptions import SearxParameterException from searx.engines import ( categories, engines, engine_shortcuts, get_engines_stats, initialize_engines ) -from searx.utils import ( - UnicodeWriter, highlight_content, html_to_text, get_resources_directory, - get_static_files, get_result_templates, get_themes, gen_useragent, - dict_subset, prettify_url, match_language +from searx.webutils import ( + UnicodeWriter, highlight_content, get_resources_directory, + get_static_files, get_result_templates, get_themes, + prettify_url, new_hmac ) +from searx.utils import html_to_text, gen_useragent, dict_subset, match_language from searx.version import VERSION_STRING from searx.languages import language_codes as languages from searx.search import SearchWithPlugins, get_search_query_from_webapp @@ -76,7 +77,6 @@ from searx.plugins import plugins from searx.plugins.oa_doi_rewrite import get_doi_resolver from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES from searx.answerers import answerers -from searx.utils import new_hmac # check if the pyopenssl package is installed. # It is needed for SSL connection without trouble, see #298 diff --git a/searx/webutils.py b/searx/webutils.py new file mode 100644 index 00000000..2900c0ed --- /dev/null +++ b/searx/webutils.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +import os +import csv +import hashlib +import hmac +import re + +from io import StringIO +from codecs import getincrementalencoder + +from searx import logger + + +logger = logger.getChild('webutils') + + +class UnicodeWriter: + """ + A CSV writer which will write rows to CSV file "f", + which is encoded in the given encoding. + """ + + def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): + # Redirect output to a queue + self.queue = StringIO() + self.writer = csv.writer(self.queue, dialect=dialect, **kwds) + self.stream = f + self.encoder = getincrementalencoder(encoding)() + + def writerow(self, row): + self.writer.writerow(row) + # Fetch UTF-8 output from the queue ... + data = self.queue.getvalue() + data = data.strip('\x00') + # ... and reencode it into the target encoding + data = self.encoder.encode(data) + # write to the target stream + self.stream.write(data.decode()) + # empty queue + self.queue.truncate(0) + + def writerows(self, rows): + for row in rows: + self.writerow(row) + + +def get_resources_directory(searx_directory, subdirectory, resources_directory): + if not resources_directory: + resources_directory = os.path.join(searx_directory, subdirectory) + if not os.path.isdir(resources_directory): + raise Exception(resources_directory + " is not a directory") + return resources_directory + + +def get_themes(templates_path): + """Returns available themes list.""" + themes = os.listdir(templates_path) + if '__common__' in themes: + themes.remove('__common__') + return themes + + +def get_static_files(static_path): + static_files = set() + static_path_length = len(static_path) + 1 + for directory, _, files in os.walk(static_path): + for filename in files: + f = os.path.join(directory[static_path_length:], filename) + static_files.add(f) + return static_files + + +def get_result_templates(templates_path): + result_templates = set() + templates_path_length = len(templates_path) + 1 + for directory, _, files in os.walk(templates_path): + if directory.endswith('result_templates'): + for filename in files: + f = os.path.join(directory[templates_path_length:], filename) + result_templates.add(f) + return result_templates + + +def new_hmac(secret_key, url): + try: + secret_key_bytes = bytes(secret_key, 'utf-8') + except TypeError as err: + if isinstance(secret_key, bytes): + secret_key_bytes = secret_key + else: + raise err + return hmac.new(secret_key_bytes, url, hashlib.sha256).hexdigest() + + +def prettify_url(url, max_length=74): + if len(url) > max_length: + chunk_len = int(max_length / 2 + 1) + return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:]) + else: + return url + + +def highlight_content(content, query): + + if not content: + return None + # ignoring html contents + # TODO better html content detection + if content.find('<') != -1: + return content + + if content.lower().find(query.lower()) > -1: + query_regex = '({0})'.format(re.escape(query)) + content = re.sub(query_regex, '\\1', + content, flags=re.I | re.U) + else: + regex_parts = [] + for chunk in query.split(): + if len(chunk) == 1: + regex_parts.append('\\W+{0}\\W+'.format(re.escape(chunk))) + else: + regex_parts.append('{0}'.format(re.escape(chunk))) + query_regex = '({0})'.format('|'.join(regex_parts)) + content = re.sub(query_regex, '\\1', + content, flags=re.I | re.U) + + return content diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 08b75954..69f5ef92 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -import mock from searx.testing import SearxTestCase from searx import utils @@ -16,25 +15,6 @@ class TestUtils(SearxTestCase): self.assertIsNotNone(utils.searx_useragent()) self.assertTrue(utils.searx_useragent().startswith('searx')) - def test_highlight_content(self): - self.assertEqual(utils.highlight_content(0, None), None) - self.assertEqual(utils.highlight_content(None, None), None) - self.assertEqual(utils.highlight_content('', None), None) - self.assertEqual(utils.highlight_content(False, None), None) - - contents = [ - '' - 'not<' - ] - for content in contents: - self.assertEqual(utils.highlight_content(content, None), content) - - content = 'a' - query = 'test' - self.assertEqual(utils.highlight_content(content, query), content) - query = 'a test' - self.assertEqual(utils.highlight_content(content, query), content) - def test_html_to_text(self): html = """ @@ -56,15 +36,6 @@ class TestUtils(SearxTestCase): html = '

Lorem ipsumdolor sit amet

' self.assertEqual(utils.html_to_text(html), "Lorem ipsum") - def test_prettify_url(self): - data = (('https://searx.me/', 'https://searx.me/'), - ('https://searx.me/ű', 'https://searx.me/ű'), - ('https://searx.me/' + (100 * 'a'), 'https://searx.me/[...]aaaaaaaaaaaaaaaaa'), - ('https://searx.me/' + (100 * 'ű'), 'https://searx.me/[...]űűűűűűűűűűűűűűűűű')) - - for test_url, expected in data: - self.assertEqual(utils.prettify_url(test_url, max_length=32), expected) - def test_match_language(self): self.assertEqual(utils.match_language('es', ['es']), 'es') self.assertEqual(utils.match_language('es', [], fallback='fallback'), 'fallback') @@ -124,33 +95,3 @@ class TestHTMLTextExtractor(SearxTestCase): text = '

Lorem ipsumdolor sit amet

' with self.assertRaises(utils.HTMLTextExtractorException): self.html_text_extractor.feed(text) - - -class TestUnicodeWriter(SearxTestCase): - - def setUp(self): - self.unicode_writer = utils.UnicodeWriter(mock.MagicMock()) - - def test_write_row(self): - row = [1, 2, 3] - self.assertEqual(self.unicode_writer.writerow(row), None) - - def test_write_rows(self): - self.unicode_writer.writerow = mock.MagicMock() - rows = [1, 2, 3] - self.unicode_writer.writerows(rows) - self.assertEqual(self.unicode_writer.writerow.call_count, len(rows)) - - -class TestNewHmac(SearxTestCase): - - def test_bytes(self): - for secret_key in ['secret', b'secret', 1]: - if secret_key == 1: - with self.assertRaises(TypeError): - utils.new_hmac(secret_key, b'http://example.com') - continue - res = utils.new_hmac(secret_key, b'http://example.com') - self.assertEqual( - res, - '23e2baa2404012a5cc8e4a18b4aabf0dde4cb9b56f679ddc0fd6d7c24339d819') diff --git a/tests/unit/test_webutils.py b/tests/unit/test_webutils.py new file mode 100644 index 00000000..aa464688 --- /dev/null +++ b/tests/unit/test_webutils.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +import mock +from searx.testing import SearxTestCase +from searx import webutils + + +class TestWebUtils(SearxTestCase): + + def test_prettify_url(self): + data = (('https://searx.me/', 'https://searx.me/'), + ('https://searx.me/ű', 'https://searx.me/ű'), + ('https://searx.me/' + (100 * 'a'), 'https://searx.me/[...]aaaaaaaaaaaaaaaaa'), + ('https://searx.me/' + (100 * 'ű'), 'https://searx.me/[...]űűűűűűűűűűűűűűűűű')) + + for test_url, expected in data: + self.assertEqual(webutils.prettify_url(test_url, max_length=32), expected) + + def test_highlight_content(self): + self.assertEqual(webutils.highlight_content(0, None), None) + self.assertEqual(webutils.highlight_content(None, None), None) + self.assertEqual(webutils.highlight_content('', None), None) + self.assertEqual(webutils.highlight_content(False, None), None) + + contents = [ + '' + 'not<' + ] + for content in contents: + self.assertEqual(webutils.highlight_content(content, None), content) + + content = 'a' + query = 'test' + self.assertEqual(webutils.highlight_content(content, query), content) + query = 'a test' + self.assertEqual(webutils.highlight_content(content, query), content) + + +class TestUnicodeWriter(SearxTestCase): + + def setUp(self): + self.unicode_writer = webutils.UnicodeWriter(mock.MagicMock()) + + def test_write_row(self): + row = [1, 2, 3] + self.assertEqual(self.unicode_writer.writerow(row), None) + + def test_write_rows(self): + self.unicode_writer.writerow = mock.MagicMock() + rows = [1, 2, 3] + self.unicode_writer.writerows(rows) + self.assertEqual(self.unicode_writer.writerow.call_count, len(rows)) + + +class TestNewHmac(SearxTestCase): + + def test_bytes(self): + for secret_key in ['secret', b'secret', 1]: + if secret_key == 1: + with self.assertRaises(TypeError): + webutils.new_hmac(secret_key, b'http://example.com') + continue + res = webutils.new_hmac(secret_key, b'http://example.com') + self.assertEqual( + res, + '23e2baa2404012a5cc8e4a18b4aabf0dde4cb9b56f679ddc0fd6d7c24339d819')