Last active
October 6, 2020 15:48
-
-
Save tomchristie/e6e5e06b6c7b77b9c726b74d1ba0acbc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Client:
    """
    Sketch of a synchronous client whose whole send pipeline is built out of
    nested context managers, so that the response stream stays open exactly
    as long as the caller's `with` block.
    """

    # The remainder of the class is elided in this excerpt.
    ...

    def request(
        self,
        method: str,
        url: URLTypes,
        *,
        content: RequestContent = None,
        data: RequestData = None,
        files: RequestFiles = None,
        json: typing.Any = None,
        params: QueryParamTypes = None,
        headers: HeaderTypes = None,
        cookies: CookieTypes = None,
        auth: typing.Union[AuthTypes, UnsetType] = UNSET,
        allow_redirects: bool = True,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
    ) -> Response:
        """
        Build and send a request.

        Equivalent to:

        ```python
        request = client.build_request(...)
        with client.send(request, ...) as response:
            response.read()
        ```

        See `Client.build_request()`, `Client.send()` and
        [Merging of configuration][0] for how the various parameters
        are merged with client-level configuration.

        [0]: /advanced/#merging-of-configuration
        """
        request = self.build_request(
            method=method,
            url=url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
        )
        with self.send(
            request=request,
            auth=auth,
            allow_redirects=allow_redirects,
            timeout=timeout,
        ) as response:
            # The body has to be loaded while the `send()` context is still
            # open; the response is handed back only after it has exited.
            response.read()
        return response

    @contextlib.contextmanager
    def stream(
        self,
        method: str,
        url: URLTypes,
        *,
        content: RequestContent = None,
        data: RequestData = None,
        files: RequestFiles = None,
        json: typing.Any = None,
        params: QueryParamTypes = None,
        headers: HeaderTypes = None,
        cookies: CookieTypes = None,
        auth: typing.Union[AuthTypes, UnsetType] = UNSET,
        allow_redirects: bool = True,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
    ) -> typing.Iterator[Response]:
        """
        Alternative to `Client.request()` that streams the response body
        instead of loading it into memory at once.

        The response is yielded from inside the `Client.send()` context, so
        it is only usable within the caller's `with` block.

        **Parameters**: See `Client.request`.

        See also: [Streaming Responses][0]

        [0]: /quickstart#streaming-responses
        """
        request = self.build_request(
            method=method,
            url=url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
        )
        with self.send(
            request=request,
            auth=auth,
            allow_redirects=allow_redirects,
            timeout=timeout,
        ) as response:
            yield response

    @contextlib.contextmanager
    def send(
        self,
        request: Request,
        *,
        auth: typing.Union[AuthTypes, UnsetType] = UNSET,
        allow_redirects: bool = True,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
    ) -> typing.Iterator[Response]:
        """
        Send a request.

        The request is sent as-is, unmodified.

        Typically you'll want to build one with `Client.build_request()`
        so that any client-level configuration is merged into the request,
        but passing an explicit `httpx.Request()` is supported as well.

        Raises `RuntimeError` if the client has already been closed.
        The registered "response" event hooks are called with the final
        response before it is yielded.

        See also: [Request instances][0]

        [0]: /advanced/#request-instances
        """
        if self._state == ClientState.CLOSED:
            raise RuntimeError("Cannot send a request, as the client has been closed.")

        self._state = ClientState.OPENED

        # An unset timeout falls back to the client-level configuration.
        if isinstance(timeout, UnsetType):
            timeout = self.timeout
        else:
            timeout = Timeout(timeout)

        auth = self._build_request_auth(request, auth)

        with self._send_handling_auth(
            request,
            auth=auth,
            timeout=timeout,
            allow_redirects=allow_redirects,
            history=[],
        ) as response:
            for hook in self._event_hooks["response"]:
                hook(response)
            yield response

    @contextlib.contextmanager
    def _send_handling_auth(
        self,
        request: Request,
        auth: Auth,
        timeout: Timeout,
        allow_redirects: bool,
        history: typing.List[Response],
    ) -> typing.Iterator[Response]:
        """
        Drive the auth flow for `request`, yielding the final response.

        Each response is fed back into the auth flow generator. While the
        flow keeps producing follow-up requests, the intermediate response
        is read, appended to `history` (which is mutated in place) and the
        follow-up request is sent. Once the flow is exhausted the current
        response is yielded to the caller.
        """
        auth_flow = auth.sync_auth_flow(request)
        try:
            request = next(auth_flow)

            for hook in self._event_hooks["request"]:
                hook(request)

            while True:
                with self._send_handling_redirects(
                    request,
                    timeout=timeout,
                    allow_redirects=allow_redirects,
                    history=history,
                ) as response:
                    try:
                        next_request = auth_flow.send(response)
                    except StopIteration:
                        auth_finished = True
                    else:
                        auth_finished = False

                    if auth_finished:
                        # Yield outside of the `except` clause so that an
                        # error raised in the caller's `with` body is not
                        # chained to the internal StopIteration.
                        yield response
                        break

                    response.history = list(history)
                    response.read()
                    request = next_request
                    history.append(response)
        finally:
            # Always finalise the auth flow generator, including when the
            # caller's body raises or the flow is abandoned part-way through.
            auth_flow.close()

    @contextlib.contextmanager
    def _send_handling_redirects(
        self,
        request: Request,
        timeout: Timeout,
        allow_redirects: bool,
        history: typing.List[Response],
    ) -> typing.Iterator[Response]:
        """
        Send `request`, following redirect responses when `allow_redirects`
        is true, and yield the resulting response.

        When `allow_redirects` is false and the response is a redirect, the
        response is yielded with `next_request` set to the request that
        would have followed it.

        Raises `TooManyRedirects` once `history` holds more than
        `self.max_redirects` responses.
        """
        while True:
            if len(history) > self.max_redirects:
                raise TooManyRedirects(
                    "Exceeded maximum allowed redirects.", request=request
                )

            with self._send_single_request(request, timeout) as response:
                response.history = list(history)

                if not response.is_redirect:
                    yield response
                    break

                request = self._build_redirect_request(request, response)
                # Rebinds to a new list; the caller's `history` list is
                # deliberately left unmodified here.
                history = history + [response]

                if allow_redirects:
                    # Read the redirect body before the response context
                    # exits and the next hop is requested.
                    response.read()
                else:
                    response.next_request = request
                    yield response
                    break

    @contextlib.contextmanager
    def _send_single_request(
        self, request: Request, timeout: Timeout
    ) -> typing.Iterator[Response]:
        """
        Sends a single request, without handling any redirections.

        The response is yielded while the transport's request context is
        still open. Exceptions raised inside that context, including from
        the caller's `with` body, pass through
        `map_exceptions(HTTPCORE_EXC_MAP, ...)`.
        """
        transport = self._transport_for_url(request.url)
        timer = Timer()
        timer.sync_start()

        with map_exceptions(HTTPCORE_EXC_MAP, request=request):
            with transport.request(
                request.method.encode(),
                request.url.raw,
                headers=request.headers.raw,
                stream=request.stream,  # type: ignore
                ext={"timeout": timeout.as_dict()},
            ) as (status_code, headers, stream, ext):
                response = Response(
                    status_code,
                    headers=headers,
                    stream=stream,  # type: ignore
                    ext=ext,
                    request=request,
                )

                self.cookies.extract_cookies(response)

                status = f"{response.status_code} {response.reason_phrase}"
                response_line = f"{response.http_version} {status}"
                # Lazy %-formatting: the message is only rendered when debug
                # logging is actually enabled.
                logger.debug(
                    'HTTP Request: %s %s "%s"',
                    request.method,
                    request.url,
                    response_line,
                )

                try:
                    yield response
                finally:
                    # Record the elapsed time even when the caller's body
                    # raised, so `elapsed` is set on every exit path.
                    response.elapsed = datetime.timedelta(
                        seconds=timer.sync_elapsed()
                    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import httpx | |
class HTTPTransport:
    """Stub transport that answers every request with the same canned reply."""

    @contextlib.contextmanager
    def request(self, method, url, headers, stream, ext):
        """
        Yield the fixed 4-tuple `(200, {}, [], {})`.

        All arguments are accepted and ignored.
        """
        status_code = 200
        reply_headers = {}
        reply_body = []
        reply_ext = {}
        yield status_code, reply_headers, reply_body, reply_ext

    def close(self):
        """Do nothing; the stub holds no resources."""
        return None
# Exercise the client against the stub transport defined above.
c = httpx.Client(transport=HTTPTransport())
try:
    r = c.get('https://www.example.org/')
    print(r)
finally:
    # Close explicitly rather than via `with`: the stub transport only
    # implements `close()`, not the context-manager protocol.
    c.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment