Created
October 14, 2020 15:06
-
-
Save sgraaf/81eefa6fcb1f9efee25fb28eaa379ded to your computer and use it in GitHub Desktop.
A requests Session that can make concurrent GET and POST requests.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from concurrent.futures import Future, ThreadPoolExecutor | |
from functools import partial | |
from itertools import repeat | |
from typing import Any, Dict, List, Optional, Union | |
from requests import Response, Session | |
class ConcurrentSession(Session):
    """A ``requests`` ``Session`` that runs GET and POST requests on a thread pool.

    ``get`` and ``post`` accept either a single URL or a list of URLs. Every
    request is submitted to ``self.executor`` (a ``ThreadPoolExecutor``), so a
    list of URLs is fetched concurrently. By default the methods block and
    return ``Response`` objects; with ``return_futures=True`` they return the
    ``Future`` objects immediately instead.
    """

    def __init__(
        self, max_workers: Optional[int] = (os.cpu_count() or 1) * 5
    ) -> None:
        """Create the session and its thread pool.

        Args:
            max_workers: Size of the thread pool. Defaults to five threads per
                CPU; ``os.cpu_count()`` may return ``None``, in which case a
                single CPU is assumed. Passing ``None`` explicitly defers to
                ``ThreadPoolExecutor``'s own default.
        """
        super().__init__()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def get(
        self,
        url_or_urls: Union[str, List[str]],
        return_futures: Optional[bool] = False,
        **kwargs
    ) -> Union[Response, List[Response], Future, List[Future]]:
        """Send one or many GET requests through the thread pool.

        Args:
            url_or_urls: A single URL, or a list of URLs to fetch concurrently.
            return_futures: If true, return ``Future`` object(s) without
                waiting; otherwise block and return ``Response`` object(s).
            **kwargs: Forwarded unchanged to ``Session.get`` for every request.

        Returns:
            A single ``Response``/``Future`` for a ``str`` input, otherwise a
            list of them in the same order as the URLs.
        """
        # Bind the parent method here, in method scope: zero-argument super()
        # is not usable inside comprehensions before Python 3.12.
        _get = partial(super().get, **kwargs)

        if isinstance(url_or_urls, str):  # single URL
            future = self.executor.submit(_get, url_or_urls)
            return future if return_futures else future.result()

        if return_futures:
            return [self.executor.submit(_get, url) for url in url_or_urls]
        return list(self.executor.map(_get, url_or_urls))

    def post(
        self,
        url_or_urls: Union[str, List[str]],
        data: Optional[Union[Any, List[Any]]] = None,
        json: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
        return_futures: Optional[bool] = False,
        **kwargs
    ) -> Union[Response, List[Response], Future, List[Future]]:
        """Send one or many POST requests through the thread pool.

        Args:
            url_or_urls: A single URL, or a list of URLs to post to
                concurrently.
            data: Request body. With a list of URLs, a ``list`` here is treated
                as one body per URL; any other value is sent to every URL.
            json: JSON payload. With a list of URLs (and ``data`` not a list),
                a ``list`` here is treated as one payload per URL; any other
                value is sent to every URL.
            return_futures: If true, return ``Future`` object(s) without
                waiting; otherwise block and return ``Response`` object(s).
            **kwargs: Forwarded unchanged to ``Session.post`` for every request.

        Returns:
            A single ``Response``/``Future`` for a ``str`` input, otherwise a
            list of them in the same order as the URLs.

        Raises:
            ValueError: If a single URL is combined with a list for ``data`` or
                ``json``, or if a per-URL list differs in length from the URLs.
        """
        # Bind the parent method once, in method scope: zero-argument super()
        # is not usable inside comprehensions before Python 3.12.
        parent_post = super().post

        if isinstance(url_or_urls, str):  # single URL
            if isinstance(data, list) or isinstance(json, list):
                raise ValueError(
                    "a single URL cannot be combined with a list of data/json"
                )
            _post = partial(parent_post, data=data, json=json, **kwargs)
            future = self.executor.submit(_post, url_or_urls)
            return future if return_futures else future.result()

        # Choose the callable and the positional-argument streams that are fed
        # to it in lockstep: (url,), (url, data) or (url, None, json).
        if isinstance(data, list):  # different data for each URL
            if len(url_or_urls) != len(data):
                raise ValueError("`data` must have one entry per URL")
            _post = partial(parent_post, json=json, **kwargs)
            arg_streams = (url_or_urls, data)
        elif isinstance(json, list):  # different JSON for each URL
            if len(url_or_urls) != len(json):
                raise ValueError("`json` must have one entry per URL")
            _post = partial(parent_post, **kwargs)
            # `data` is the second positional parameter of Session.post, so an
            # endless stream of None fills it while `json` varies per URL.
            arg_streams = (url_or_urls, repeat(None), json)
        else:  # same data/JSON for every URL
            _post = partial(parent_post, data=data, json=json, **kwargs)
            arg_streams = (url_or_urls,)

        if return_futures:
            return [
                self.executor.submit(_post, *args) for args in zip(*arg_streams)
            ]
        return list(self.executor.map(_post, *arg_streams))

    def close(self) -> None:
        """Wait for pending requests, shut the thread pool down, then close the session."""
        self.executor.shutdown(wait=True)
        super().close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment