[fix] connection reuse ++ shared dns

This commit is contained in:
Adam Tauber 2016-02-07 17:27:25 +01:00
parent 4f753fb14d
commit 34afcf2541
2 changed files with 39 additions and 54 deletions

View File

@ -1,14 +1,22 @@
import logging import logging
import pycurl import pycurl
import threading
from itertools import cycle
from StringIO import StringIO from StringIO import StringIO
from time import time
from urllib import urlencode from urllib import urlencode
CURL_SHARE = pycurl.CurlShare()
CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
MULTI_HANDLER = pycurl.CurlMulti()
def __test_callback(*args): def __test_callback(*args):
print "callback called" print "callback called"
def get_new_connection(source_address=None): def get_connection(source_address=None):
# pycurl initialization # pycurl initialization
h = pycurl.Curl() h = pycurl.Curl()
@ -18,46 +26,14 @@ def get_new_connection(source_address=None):
# consistently use ipv4 # consistently use ipv4
h.setopt(h.IPRESOLVE, pycurl.IPRESOLVE_V4) h.setopt(h.IPRESOLVE, pycurl.IPRESOLVE_V4)
h.setopt(pycurl.SHARE, CURL_SHARE)
if source_address: if source_address:
h.setopt(h.INTERFACE, source_address) h.setopt(h.INTERFACE, source_address)
return h return h
class ConnectionSources(object):
def __init__(self, sources=None):
self.sources = []
if sources:
for s in sources:
self.sources.append(get_new_connection(s))
else:
self.sources.append(get_new_connection())
self.ptr = 0
def get_source(self):
source = self.sources[self.ptr]
self.ptr = (self.ptr + 1) % len(self.sources)
return source
class ConnectionCache(object):
def __init__(self, source_ips=None):
self.source_ips = source_ips if source_ips else None
self.connections = {}
def get_connection(self, key):
if key in self.connections:
return self.connections[key].get_source()
sources = ConnectionSources(self.source_ips)
self.connections[key] = sources
return sources.get_source()
CONNECTION_CACHE = ConnectionCache()
MULTI_HANDLER = pycurl.CurlMulti()
class RequestContainer(object): class RequestContainer(object):
def __init__(self, def __init__(self,
url, url,
@ -110,29 +86,27 @@ class ResponseContainer(object):
class MultiRequest(object): class MultiRequest(object):
def __init__(self, connection_cache=None, multi_handler=None): def __init__(self, multi_handler=None, source_ips=None):
self.requests = {} self.requests = {}
if connection_cache:
self.connection_cache = connection_cache
else:
self.connection_cache = CONNECTION_CACHE
if multi_handler: if multi_handler:
self._curl_multi_handler = multi_handler self._curl_multi_handler = multi_handler
else: else:
self._curl_multi_handler = MULTI_HANDLER self._curl_multi_handler = MULTI_HANDLER
def add(self, connection_name, url, **kwargs): self.source_ips = cycle(source_ips) if source_ips else cycle([None])
handle = self.connection_cache.get_connection(connection_name)
def add(self, url, **kwargs):
handle = get_connection(next(self.source_ips))
request_container = RequestContainer(url, handle, **kwargs) request_container = RequestContainer(url, handle, **kwargs)
try: try:
self._curl_multi_handler.add_handle(handle) self._curl_multi_handler.add_handle(handle)
except: except:
print 'meep'
pass pass
self.requests[handle] = request_container self.requests[handle] = request_container
def perform_requests(self): def send_requests(self):
select_timeout = 0.1 select_timeout = 0.1
# set timeout # set timeout
@ -154,6 +128,7 @@ class MultiRequest(object):
else: else:
h.unsetopt(h.COOKIE) h.unsetopt(h.COOKIE)
search_start = time()
handles_num = len(self.requests) handles_num = len(self.requests)
while handles_num: while handles_num:
self._curl_multi_handler.select(select_timeout) self._curl_multi_handler.select(select_timeout)
@ -164,23 +139,35 @@ class MultiRequest(object):
_, success_list, error_list = self._curl_multi_handler.info_read() _, success_list, error_list = self._curl_multi_handler.info_read()
# calling callbacks # calling callbacks
for h in success_list: for h in success_list:
self.requests[h].finish() th = threading.Thread(
target=self.requests[h].finish(),
name='search_request',
)
th.start()
# self.requests[h].finish()
for h, err_code, err_string in error_list: for h, err_code, err_string in error_list:
logging.warn('Error on %s: "%s"', self.requests[h].url, err_string) logging.warn('Error on %s: "%s"', self.requests[h].url, err_string)
handles_num -= len(success_list) + len(error_list) handles_num -= len(success_list) + len(error_list)
if ret != pycurl.E_CALL_MULTI_PERFORM: if ret != pycurl.E_CALL_MULTI_PERFORM:
break break
for th in threading.enumerate():
if th.name == 'search_request':
remaining_time = max(0.0, timeout - (time() - search_start))
th.join(remaining_time)
if th.isAlive():
logging.warning('engine timeout: {0}'.format(th._engine_name))
# self._curl_multi_handler.close() # self._curl_multi_handler.close()
return self.requests.values() return self.requests.values()
if __name__ == '__main__': if __name__ == '__main__':
r = MultiRequest() r = MultiRequest()
r.add('a', 'http://httpbin.org/delay/0', headers={'User-Agent': 'x'}) r.add('http://httpbin.org/delay/0', headers={'User-Agent': 'x'})
r.add('d', 'http://127.0.0.1:7777/', headers={'User-Agent': 'x'}) r.add('http://127.0.0.1:7777/', headers={'User-Agent': 'x'})
r.add('b', 'http://httpbin.org/delay/0', cookies={'as': 'sa', 'bb': 'cc'}) r.add('http://httpbin.org/delay/0', cookies={'as': 'sa', 'bb': 'cc'})
r.add('c', 'http://httpbin.org/delay/0', callback=__test_callback, timeout=1.0, headers={'User-Agent': 'x'}) r.add('http://httpbin.org/delay/0', callback=__test_callback, timeout=1.0, headers={'User-Agent': 'x'})
for v in r.perform_requests(): for v in r.send_requests():
print v.url print v.url
print v.response.text print v.response.text

View File

@ -320,15 +320,13 @@ class Search(object):
continue continue
# append request to list # append request to list
mr.add((selected_engine['name'], selected_engine['category']), mr.add(request_params['url'], **request_args)
request_params['url'],
**request_args)
if not mr.requests: if not mr.requests:
return self return self
# send all search-request # send all search-request
mr.perform_requests() mr.send_requests()
# return results, suggestions, answers and infoboxes # return results, suggestions, answers and infoboxes
return self return self