#! /usr/bin/env python

# run with python generate-domains-blacklist.py > list.txt.tmp && mv -f list.txt.tmp list

from __future__ import print_function

import argparse
import fnmatch
import re
import sys

try:
    import urllib2 as urllib

    URLLIB_NEW = False
except (ImportError, ModuleNotFoundError):
    import urllib.request as urllib
    from urllib.request import Request

    URLLIB_NEW = True

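# Parse a list in the local/trusted format: one lowercased name per line,
# optional inline comments, glob patterns (kept in a separate set), and an
# optional "@<time window>" suffix recorded as a time restriction.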
def parse_time_restricted_list(content):
    rx_comment = re.compile(r"^(#|$)")
    rx_inline_comment = re.compile(r"\s*#\s*[a-z0-9-].*$")
    rx_trusted = re.compile(r"^([*a-z0-9.-]+)\s*(@\S+)?$")

    names = set()
    time_restrictions = {}
    globs = set()
    rx_set = [rx_trusted]
    for line in content.splitlines():
        line = str.lower(str.strip(line))
        if rx_comment.match(line):
            continue
        line = str.strip(rx_inline_comment.sub("", line))
        if is_glob(line):
            globs.add(line)
            names.add(line)
            continue
        for rx in rx_set:
            matches = rx.match(line)
            if not matches:
                continue
            name = matches.group(1)
            names.add(name)
            time_restriction = matches.group(2)
            if time_restriction:
                time_restrictions[name] = time_restriction
    return names, time_restrictions, globs

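# Same parser as parse_time_restricted_list(), but time restrictions found in
# a trusted list are discarded.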
def parse_trusted_list(content):
    names, _time_restrictions, globs = parse_time_restricted_list(content)
    time_restrictions = {}
    return names, time_restrictions, globs

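# Parse a downloaded list. Each regular expression extracts a domain name from
# one common feed format: rx_u looks like an Adblock-style "||example.com^"
# rule, rx_l a bare domain, rx_h a hosts-file entry, rx_mdl and rx_b CSV-style
# rows, and rx_dq a dnsmasq "address=/example.com/..." line. Trusted (local)
# sources are handed off to parse_trusted_list() instead.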
def parse_list(content, trusted=False):
    rx_comment = re.compile(r"^(#|$)")
    rx_inline_comment = re.compile(r"\s*#\s*[a-z0-9-].*$")
    rx_u = re.compile(
        r"^@*\|\|([a-z0-9][a-z0-9.-]*[.][a-z]{2,})\^?(\$(popup|third-party))?$")
    rx_l = re.compile(r"^([a-z0-9][a-z0-9.-]*[.][a-z]{2,})$")
    rx_h = re.compile(
        r"^[0-9]{1,3}[.][0-9]{1,3}[.][0-9]{1,3}[.][0-9]{1,3}\s+([a-z0-9][a-z0-9.-]*[.][a-z]{2,})$"
    )
    rx_mdl = re.compile(r'^"[^"]+","([a-z0-9][a-z0-9.-]*[.][a-z]{2,})",')
    rx_b = re.compile(r"^([a-z0-9][a-z0-9.-]*[.][a-z]{2,}),.+,[0-9: /-]+,")
    rx_dq = re.compile(r"^address=/([a-z0-9][a-z0-9.-]*[.][a-z]{2,})/.")

    if trusted:
        return parse_trusted_list(content)

    names = set()
    time_restrictions = {}
    globs = set()
    rx_set = [rx_u, rx_l, rx_h, rx_mdl, rx_b, rx_dq]
    for line in content.splitlines():
        line = str.lower(str.strip(line))
        if rx_comment.match(line):
            continue
        line = str.strip(rx_inline_comment.sub("", line))
        if trusted and is_glob(line):
            globs.add(line)
            names.add(line)
            continue
        for rx in rx_set:
            matches = rx.match(line)
            if not matches:
                continue
            name = matches.group(1)
            names.add(name)
    return names, time_restrictions, globs

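# Emit one entry of the time-based blacklist: the name plus its time
# restriction label, or a comment when no label was provided.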
def print_restricted_name(output_fd, name, time_restrictions):
    if name in time_restrictions:
        print("{}\t{}".format(
            name, time_restrictions[name]), file=output_fd, end='\n')
    else:
        print(
            "# ignored: [{}] was in the time-restricted list, "
            "but without a time restriction label".format(name), file=output_fd, end='\n'
        )

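# Fetch a source. "file:" URLs are considered trusted; the HTTP status code is
# only checked for remote sources. The timeout comes from the --timeout option.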
def load_from_url(url):
    sys.stderr.write("Loading data from [{}]\n".format(url))
    req = urllib.Request(url=url, headers={"User-Agent": "dnscrypt-proxy"})
    trusted = False

    if URLLIB_NEW:
        req_type = req.type
    else:
        req_type = req.get_type()
    if req_type == "file":
        trusted = True

    response = None
    try:
        response = urllib.urlopen(req, timeout=int(args.timeout))
    except urllib.URLError as err:
        raise Exception("[{}] could not be loaded: {}\n".format(url, err))
    if trusted is False and response.getcode() != 200:
        raise Exception("[{}] returned HTTP code {}\n".format(
            url, response.getcode()))
    content = response.read()
    if URLLIB_NEW:
        content = content.decode("utf-8", errors="replace")

    return content, trusted

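# Sort key: reverse the dot-separated labels so that names sharing the same
# parent domain are grouped together in the output.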
def name_cmp(name):
    parts = name.split(".")
    parts.reverse()
    return str.join(".", parts)

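# Heuristic to decide whether a pattern is a glob rather than a plain name:
# "?" or "[" anywhere qualifies, as does "*" past the first character (a
# trailing "*" only counts when it follows a dot). The fnmatch call just
# checks that the pattern can be used without raising.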
def is_glob(pattern):
    maybe_glob = False
    for i in range(len(pattern)):
        c = pattern[i]
        if c == "?" or c == "[":
            maybe_glob = True
        elif c == "*" and i != 0:
            if i < len(pattern) - 1 or pattern[i - 1] == '.':
                maybe_glob = True
    if maybe_glob:
        try:
            fnmatch.fnmatch("example", pattern)
            return True
        except:
            pass
    return False

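# Return True when a name is matched by one of the collected glob patterns;
# the glob entries themselves are excluded so they are not dropped.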
def covered_by_glob(globs, name):
    if name in globs:
        return False
    for glob in globs:
        try:
            if fnmatch.fnmatch(name, glob):
                return True
        except:
            pass
    return False

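# Return True when any parent domain of a name (e.g. "example.com" for
# "ads.example.com") is already present in the given set.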
def has_suffix(names, name):
    parts = str.split(name, ".")
    while parts:
        parts = parts[1:]
        if str.join(".", parts) in names:
            return True

    return False

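# Load and parse the whitelist, keeping only the names; time restrictions and
# globs from that list are ignored.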
def whitelist_from_url(url):
    if not url:
        return set()
    content, trusted = load_from_url(url)

    names, _time_restrictions, _globs = parse_list(content, trusted)
    return names

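# Main driver: read the configuration file (one source URL per line, "#" for
# comments), download and parse every source, then write a single deduplicated
# blacklist, applying globs, the whitelist and the time-restricted list along
# the way. A minimal, hypothetical configuration could look like this (the
# entries below are placeholders, not the project's actual defaults):
#
#   # remote source
#   https://example.com/hosts.txt
#   # local, trusted list (may contain globs and time restrictions)
#   file:domains-blacklist-local-additions.txt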
def blacklists_from_config_file(
    file, whitelist, time_restricted_url, ignore_retrieval_failure, output_file
):
    blacklists = {}
    whitelisted_names = set()
    all_names = set()
    unique_names = set()
    all_globs = set()

    # Load conf & blacklists
    with open(file) as fd:
        for line in fd:
            line = str.strip(line)
            if str.startswith(line, "#") or line == "":
                continue
            url = line
            try:
                content, trusted = load_from_url(url)
                names, _time_restrictions, globs = parse_list(content, trusted)
                blacklists[url] = names
                all_names |= names
                all_globs |= globs
            except Exception as e:
                sys.stderr.write(str(e))
                if not ignore_retrieval_failure:
                    exit(1)

    # Time-based blacklist
    if time_restricted_url and not re.match(r"^[a-z0-9]+:", time_restricted_url):
        time_restricted_url = "file:" + time_restricted_url

    output_fd = sys.stdout
    if output_file:
        output_fd = open(output_file, "w")

    if time_restricted_url:
        time_restricted_content, _trusted = load_from_url(time_restricted_url)
        time_restricted_names, time_restrictions, _globs = parse_time_restricted_list(
            time_restricted_content)

        if time_restricted_names:
            print("########## Time-based blacklist ##########\n",
                  file=output_fd, end='\n')
            for name in time_restricted_names:
                print_restricted_name(output_fd, name, time_restrictions)

        # Time restricted names should be whitelisted, or they could be always blocked
        whitelisted_names |= time_restricted_names

    # Whitelist
    if whitelist and not re.match(r"^[a-z0-9]+:", whitelist):
        whitelist = "file:" + whitelist

    whitelisted_names |= whitelist_from_url(whitelist)

    # Process blacklists
    for url, names in blacklists.items():
        print("\n\n########## Blacklist from {} ##########\n".format(
            url), file=output_fd, end='\n')
        ignored, glob_ignored, whitelisted = 0, 0, 0
        list_names = list()
        for name in names:
            if covered_by_glob(all_globs, name):
                glob_ignored = glob_ignored + 1
            elif has_suffix(all_names, name) or name in unique_names:
                ignored = ignored + 1
            elif has_suffix(whitelisted_names, name) or name in whitelisted_names:
                whitelisted = whitelisted + 1
            else:
                list_names.append(name)
                unique_names.add(name)

        list_names.sort(key=name_cmp)
        if ignored:
            print("# Ignored duplicates: {}".format(
                ignored), file=output_fd, end='\n')
        if glob_ignored:
            print("# Ignored due to overlapping local patterns: {}".format(
                glob_ignored), file=output_fd, end='\n')
        if whitelisted:
            print(
                "# Ignored entries due to the whitelist: {}".format(whitelisted), file=output_fd, end='\n')
        if ignored or glob_ignored or whitelisted:
            print(file=output_fd, end='\n')
        for name in list_names:
            print(name, file=output_fd, end='\n')

    output_fd.close()

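# Command-line interface: blacklist sources, whitelist, time-restricted list,
# failure handling, optional output file, and download timeout (in seconds).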
argp = argparse.ArgumentParser(
    description="Create a unified blacklist from a set of local and remote files"
)
argp.add_argument(
    "-c",
    "--config",
    default="domains-blacklist.conf",
    help="file containing blacklist sources",
)
argp.add_argument(
    "-w",
    "--whitelist",
    default="domains-whitelist.txt",
    help="file containing a set of names to exclude from the blacklist",
)
argp.add_argument(
    "-r",
    "--time-restricted",
    default="domains-time-restricted.txt",
    help="file containing a set of names to be time restricted",
)
argp.add_argument(
    "-i",
    "--ignore-retrieval-failure",
    action="store_true",
    help="generate list even if some urls couldn't be retrieved",
)
argp.add_argument(
    "-o",
    "--output-file",
    default=None,
    help="save generated blacklist to a text file with the provided file name",
)
argp.add_argument("-t", "--timeout", default=30, help="URL open timeout")

args = argp.parse_args()

conf = args.config
whitelist = args.whitelist
time_restricted = args.time_restricted
ignore_retrieval_failure = args.ignore_retrieval_failure
output_file = args.output_file

blacklists_from_config_file(
    conf, whitelist, time_restricted, ignore_retrieval_failure, output_file)