Source code for fdroid_dl.download.futuressession

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import logging
import hashlib
import time
import copy
from datetime import timedelta
from zipfile import ZipFile
from tempfile import NamedTemporaryFile
from requests.adapters import HTTPAdapter
from requests_futures.sessions import FuturesSession


LOGGER = logging.getLogger('download.FuturesSessionFlex')
[docs]class FuturesSessionFlex(FuturesSession): BLOCKSIZE = 65536 SUFFIXES = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] def __init__(self, max_workers=1, user_agent='Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)', *args, **kwargs): kwargs.update({'max_workers': max_workers}) super(FuturesSessionFlex, self).__init__(*args, **kwargs) self.__sessions = {} self.__sessions_keys = [] self.__fs_kwargs = {} self.__fs_kwargs.update(kwargs) _adapter_kwargs = {'pool_connections': max_workers, 'pool_maxsize': max_workers, 'pool_block': True} self.mount('https://', HTTPAdapter(**_adapter_kwargs)) self.mount('http://', HTTPAdapter(**_adapter_kwargs)) if self.headers is None: self.headers = {} self.headers.update({'User-Agent': user_agent})
[docs] def map(self, pattern='http://', session=None): ''' if called with session None -> default session for ctor is used ''' kwargs = copy.deepcopy(self.__fs_kwargs) kwargs['session'] = session if pattern not in self.__sessions: self.__sessions_keys.append(pattern) self.__sessions[pattern] = FuturesSessionFlex(*(), **kwargs) self.__sessions_keys = sorted(self.__sessions_keys, key=len, reverse=True)
def set_headers(self, headers): self.headers.update(headers) @staticmethod def h_size(nbytes): i = 0 while nbytes >= 1024 and i < len(FuturesSessionFlex.SUFFIXES)-1: nbytes /= 1024. i += 1 fsize = ('%.2f' % nbytes).rstrip('0').rstrip('.') return '%s %s' % (fsize, FuturesSessionFlex.SUFFIXES[i]) # pylint: disable=W0613 @staticmethod def add_size(response, *args, **kwargs): if 'Content-Length' in response.headers: response.size = int(response.headers.get('Content-Length', 0)) else: LOGGER.warning("Content-Length Header not provided by %s", response.url) response.size = len(response.content) response.h_size = FuturesSessionFlex.h_size(response.size) return response # pylint: disable=W0613 @staticmethod def add_hash(response, *args, **kwargs): response.hash = None if response.ok: timestamp = str(time.time()) cache_var = response.headers.get('Last-Modified', timestamp) + response.headers.get('ETag', timestamp) response.hash = hashlib.sha1(str(cache_var).encode('UTF-8')).hexdigest() return response @staticmethod def extract_jar(response, *args, **kwargs): if response.ok: start = time.time() response = FuturesSessionFlex.add_size(response, *args, **kwargs) response.index = NamedTemporaryFile() with NamedTemporaryFile() as temp_file: for chunk in response.iter_content(chunk_size=FuturesSessionFlex.BLOCKSIZE): if chunk: temp_file.write(chunk) zip_file = ZipFile(temp_file) idxfile = 'index-v1.json' if 'index-v1.json' in zip_file.namelist() else 'index.xml' with zip_file.open(idxfile) as file: while True: byte = file.read(FuturesSessionFlex.BLOCKSIZE) if not byte: break response.index.write(byte) LOGGER.debug("%s - %s - (%s)", response.index.name, response.url, FuturesSessionFlex.h_size(os.stat(response.index.name).st_size)) elapsed = time.time() - start response.elapsed += timedelta(seconds=elapsed) return response def __lookup_fs_session(self, url): # fast direct matches if url in self.__sessions: return self.__sessions[url] # slower pattern search depends on pattern count and size for k in self.__sessions_keys: if url.find(k) == 0: return self.__sessions[k] return None
[docs] def request(self, *args, **kwargs): session = self.__lookup_fs_session(args[1]) if not session is None: return session.request(*args, **kwargs) return super(FuturesSessionFlex, self).request(*args, **kwargs)
[docs] def close(self): try: for key, session in self.__sessions.items(): session.close() except Exception: LOGGER.exception("Error closing sessions")