sim16/matteo_env/Lib/site-packages/aiohttp/client_reqrep.py

1134 lines
36 KiB
Python
Raw Normal View History

2020-12-20 00:08:09 +00:00
import asyncio
import codecs
2022-09-18 13:17:20 +00:00
import functools
2020-12-20 00:08:09 +00:00
import io
import re
import sys
import traceback
import warnings
from hashlib import md5, sha1, sha256
from http.cookies import CookieError, Morsel, SimpleCookie
from types import MappingProxyType, TracebackType
2022-09-18 13:17:20 +00:00
from typing import (
2020-12-20 00:08:09 +00:00
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
Tuple,
Type,
Union,
cast,
)
import attr
from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
from yarl import URL
from . import hdrs, helpers, http, multipart, payload
from .abc import AbstractStreamWriter
from .client_exceptions import (
ClientConnectionError,
ClientOSError,
ClientResponseError,
ContentTypeError,
InvalidURL,
ServerFingerprintMismatch,
)
from .formdata import FormData
2022-09-18 13:17:20 +00:00
from .helpers import (
2020-12-20 00:08:09 +00:00
PY_36,
BaseTimerContext,
BasicAuth,
HeadersMixin,
TimerNoop,
noop,
reify,
set_result,
)
from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter
from .log import client_logger
2022-09-18 13:17:20 +00:00
from .streams import StreamReader
2020-12-20 00:08:09 +00:00
from .typedefs import (
DEFAULT_JSON_DECODER,
JSONDecoder,
LooseCookies,
LooseHeaders,
RawHeaders,
)
try:
import ssl
from ssl import SSLContext
except ImportError: # pragma: no cover
2022-09-18 13:17:20 +00:00
ssl = None # type: ignore[assignment]
SSLContext = object # type: ignore[misc,assignment]
2020-12-20 00:08:09 +00:00
try:
import cchardet as chardet
except ImportError: # pragma: no cover
2022-09-18 13:17:20 +00:00
import charset_normalizer as chardet # type: ignore[no-redef]
2020-12-20 00:08:09 +00:00
2022-09-18 13:17:20 +00:00
__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
2020-12-20 00:08:09 +00:00
if TYPE_CHECKING: # pragma: no cover
2022-09-18 13:17:20 +00:00
from .client import ClientSession
from .connector import Connection
from .tracing import Trace
2020-12-20 00:08:09 +00:00
2022-09-18 13:17:20 +00:00
# Matches "application/json" and "+json" structured-syntax suffix types such as
# "application/vnd.api+json". The pattern is anchored only at the start, so any
# content type that begins with such a value (e.g. with "; charset=...") matches.
# Used by _is_expected_content_type() when "application/json" is expected.
json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
2020-12-20 00:08:09 +00:00
2022-09-18 13:17:20 +00:00
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ContentDisposition:
    """Immutable, parsed form of a ``Content-Disposition`` response header.

    Built by ``ClientResponse.content_disposition`` from
    ``multipart.parse_content_disposition``.
    """

    # disposition type (e.g. "attachment"); None when it could not be parsed
    type: Optional[str]
    # read-only view of the header parameters
    parameters: "MappingProxyType[str, str]"
    # filename extracted from the parameters, if any
    filename: Optional[str]
2020-12-20 00:08:09 +00:00
2022-09-18 13:17:20 +00:00
@attr.s(auto_attribs=True, frozen=True, slots=True)
class RequestInfo:
    """Immutable snapshot of a sent request.

    It is attached to the response and to errors raised for that response.
    ``url`` is the request URL with the fragment removed. ``real_url`` is the
    URL as originally given (fragment included); it defaults to ``url`` when
    not supplied.
    """

    url: URL
    method: str
    headers: "CIMultiDictProxy[str]"
    real_url: URL = attr.ib()

    @real_url.default
    def real_url_default(self) -> URL:
        # Fall back to ``url`` when no separate original URL was provided.
        return self.url
class Fingerprint:
    """Pin a TLS server certificate by the digest of its binary (DER) form.

    Only 32-byte (SHA-256) digests are accepted. 16- and 20-byte digests are
    recognised as MD5/SHA-1 purely so they can be rejected with a clear
    error message.
    """

    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store *fingerprint* and select the hash function matching its length.

        Raises:
            ValueError: if the length matches no known digest, or if it
                selects the insecure MD5/SHA-1 algorithms.
        """
        selected = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not selected:
            raise ValueError("fingerprint has invalid length")
        if selected in (md5, sha1):
            raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
        self._hashfunc = selected
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected certificate digest."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate of *transport* against the pinned digest.

        Transports without an ``sslcontext`` (plain connections) are skipped.

        Raises:
            ServerFingerprintMismatch: if the certificate digest differs.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        peer_cert = transport.get_extra_info("ssl_object").getpeercert(binary_form=True)
        digest = self._hashfunc(peer_cert).digest()
        if digest == self._fingerprint:
            return
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, digest, host, port)
2020-12-20 00:08:09 +00:00
# Types accepted for the ``ssl`` argument; enforced by the isinstance() check
# at the end of _merge_ssl_params().
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    # The ssl module failed to import (see the ImportError fallback above),
    # so only ``None`` is accepted.
    SSL_ALLOWED_TYPES = type(None)
def _merge_ssl_params(
    ssl: Union["SSLContext", bool, Fingerprint, None],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", bool, Fingerprint, None]:
    """Fold the deprecated TLS arguments into a single ``ssl`` value.

    Each of ``verify_ssl=False``, ``ssl_context`` and ``fingerprint`` emits a
    DeprecationWarning and is translated into the equivalent ``ssl`` value.
    ``verify_ssl=True`` is accepted but has no effect.

    Raises:
        ValueError: if more than one of the four arguments selects a setting.
        TypeError: if the resulting ``ssl`` is not one of SSL_ALLOWED_TYPES.
    """
    exclusive_msg = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )

    # The warnings.warn calls stay directly in this function so that
    # stacklevel=3 keeps pointing at the caller of our caller.
    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = False

    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = ssl_context

    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not None:
            raise ValueError(exclusive_msg)
        ssl = Fingerprint(fingerprint)

    if isinstance(ssl, SSL_ALLOWED_TYPES):
        return ssl
    raise TypeError(
        "ssl should be SSLContext, bool, Fingerprint or None, "
        "got {!r} instead.".format(ssl)
    )
2022-09-18 13:17:20 +00:00
@attr.s(auto_attribs=True, slots=True, frozen=True)
class ConnectionKey:
    """Hashable identity of a connection as produced by ClientRequest.connection_key."""

    # the key should contain information about the proxy / TLS settings in use
    # to prevent reusing the wrong connections from a pool
    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, None, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)
def _is_expected_content_type(
response_content_type: str, expected_content_type: str
) -> bool:
if expected_content_type == "application/json":
2020-12-20 00:08:09 +00:00
return json_re.match(response_content_type) is not None
return expected_content_type in response_content_type
class ClientRequest:
    """An outgoing HTTP request and the machinery to send it over a connection.

    The constructor normalises all request state: HTTP version, host,
    headers, cookies, compression, auth, proxy settings, body, transfer
    framing and ``Expect: 100-continue``. :meth:`send` writes the request
    line and headers, schedules :meth:`write_bytes` as a task to stream the
    body, and returns a ``response_class`` instance for the caller to start.
    """

    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})

    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: "gzip, deflate",
    }

    body = b""
    auth = None
    response = None

    _writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding __del__ method with self._writer closing doesn't make sense
    # because _writer is instance method, thus it keeps a reference to self.
    # Until writer has finished finalizer will not be called.

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Iterable[str] = frozenset(),
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint, None] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
    ):
        if loop is None:
            loop = asyncio.get_event_loop()

        assert isinstance(url, URL), url
        assert isinstance(proxy, (URL, type(None))), proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        self._session = cast("ClientSession", session)
        if params:
            # Append *params* after any query already present in the URL.
            q = MultiDict(url.query)
            url2 = url.with_query(params)
            q.extend(url2.query)
            url = url.with_query(q)
        self.original_url = url
        self.url = url.with_fragment(None)
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        if response_class is None:
            real_response_class = ClientResponse
        else:
            real_response_class = response_class
        self.response_class = real_response_class  # type: Type[ClientResponse]
        self._timer = timer if timer is not None else TimerNoop()
        self._ssl = ssl

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Order matters: later steps read headers and flags set by earlier
        # ones (e.g. compression enables chunking; the body may set
        # Content-Length, which transfer-encoding handling then inspects).
        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data is not None or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        if traces is None:
            traces = []
        self._traces = traces

    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        return self.url.scheme in ("https", "wss")

    @property
    def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]:
        """The ``ssl`` argument given to the constructor."""
        return self._ssl

    @property
    def connection_key(self) -> ConnectionKey:
        """Key describing host, TLS and proxy settings for connection pooling."""
        proxy_headers = self.proxy_headers
        if proxy_headers:
            h = hash(
                tuple((k, v) for k, v in proxy_headers.items())
            )  # type: Optional[int]
        else:
            h = None
        return ConnectionKey(
            self.host,
            self.port,
            self.is_ssl(),
            self.ssl,
            self.proxy,
            self.proxy_auth,
            h,
        )

    @property
    def host(self) -> str:
        """Raw (not IDNA-decoded) host of the request URL."""
        ret = self.url.raw_host
        assert ret is not None
        return ret

    @property
    def port(self) -> Optional[int]:
        return self.url.port

    @property
    def request_info(self) -> RequestInfo:
        """Snapshot of method, URL and current headers of this request."""
        headers = CIMultiDictProxy(self.headers)  # type: CIMultiDictProxy[str]
        return RequestInfo(self.url, self.method, headers, self.original_url)

    def update_host(self, url: URL) -> None:
        """Validate that *url* has a host; adopt any user/password it carries as basic auth.

        Raises:
            InvalidURL: if *url* has no host.
        """
        # get host/port
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        username, password = url.user, url.password
        if username:
            self.auth = helpers.BasicAuth(username, password or "")

    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Convert request version to two elements tuple.

        parser HTTP version '1.1' => (1, 1)

        Raises:
            ValueError: if *version* is a string that is not of the form
                ``"<major>.<minor>"`` with integer components.
        """
        if isinstance(version, str):
            v = [part.strip() for part in version.split(".", 1)]
            try:
                version = http.HttpVersion(int(v[0]), int(v[1]))
            except (ValueError, IndexError):
                # IndexError: the string has no "." (e.g. "1"); report it as
                # the same ValueError as any other unparsable version.
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version

    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Update request headers."""
        self.headers = CIMultiDict()  # type: CIMultiDict[str]

        # add host
        netloc = cast(str, self.url.raw_host)
        if helpers.is_ipv6_address(netloc):
            netloc = f"[{netloc}]"
        if self.url.port is not None and not self.url.is_default_port():
            netloc += ":" + str(self.url.port)
        self.headers[hdrs.HOST] = netloc

        if headers:
            if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
                headers = headers.items()  # type: ignore[assignment]

            for key, value in headers:  # type: ignore[misc]
                # A special case for Host header: replace, don't duplicate
                if key.lower() == "host":
                    self.headers[key] = value
                else:
                    self.headers.add(key, value)

    def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
        """Add default Accept/Accept-Encoding/User-Agent unless present or skipped."""
        self.skip_auto_headers = CIMultiDict(
            (hdr, None) for hdr in sorted(skip_auto_headers)
        )
        used_headers = self.headers.copy()
        used_headers.extend(self.skip_auto_headers)  # type: ignore[arg-type]

        for hdr, val in self.DEFAULT_HEADERS.items():
            if hdr not in used_headers:
                self.headers.add(hdr, val)

        if hdrs.USER_AGENT not in used_headers:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE

    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Update request cookies header."""
        if not cookies:
            return

        c = SimpleCookie()  # type: SimpleCookie[str]
        if hdrs.COOKIE in self.headers:
            # Merge with an explicitly supplied Cookie header.
            c.load(self.headers.get(hdrs.COOKIE, ""))
            del self.headers[hdrs.COOKIE]

        if isinstance(cookies, Mapping):
            iter_cookies = cookies.items()
        else:
            iter_cookies = cookies  # type: ignore[assignment]
        for name, value in iter_cookies:
            if isinstance(value, Morsel):
                # Preserve coded_value
                mrsl_val = value.get(value.key, Morsel())
                mrsl_val.set(value.key, value.value, value.coded_value)
                c[name] = mrsl_val
            else:
                c[name] = value  # type: ignore[assignment]

        self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()

    def update_content_encoding(self, data: Any) -> None:
        """Set request content encoding.

        Raises:
            ValueError: if ``compress`` is combined with an explicit
                Content-Encoding header.
        """
        if data is None:
            return

        enc = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
        if enc:
            if self.compress:
                raise ValueError(
                    "compress can not be set " "if Content-Encoding header is set"
                )
        elif self.compress:
            if not isinstance(self.compress, str):
                self.compress = "deflate"
            self.headers[hdrs.CONTENT_ENCODING] = self.compress
            self.chunked = True  # enable chunked, no need to deal with length

    def update_transfer_encoding(self) -> None:
        """Analyze transfer-encoding header.

        Raises:
            ValueError: on conflicting ``chunked`` / Transfer-Encoding /
                Content-Length settings.
        """
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    "chunked can not be set "
                    'if "Transfer-Encoding: chunked" header is set'
                )

        elif self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set " "if Content-Length header is set"
                )

            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
        else:
            if hdrs.CONTENT_LENGTH not in self.headers:
                self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))

    def update_auth(self, auth: Optional[BasicAuth]) -> None:
        """Set basic auth.

        Falls back to credentials taken from the URL by :meth:`update_host`.

        Raises:
            TypeError: if the auth object is not a BasicAuth.
        """
        if auth is None:
            auth = self.auth
        if auth is None:
            return

        if not isinstance(auth, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = auth.encode()

    def update_body_from_data(self, body: Any) -> None:
        """Wrap *body* in a payload object and derive length/chunking and payload headers."""
        if body is None:
            return

        # FormData
        if isinstance(body, FormData):
            body = body()

        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            # Not a registered payload type: encode it as form data.
            body = FormData(body)()

        self.body = body

        # enable chunked encoding if needed
        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
            size = body.size
            if size is None:
                # Unknown size: the body must be sent chunked.
                self.chunked = True
            else:
                self.headers[hdrs.CONTENT_LENGTH] = str(size)

        # copy payload headers
        assert body.headers
        for (key, value) in body.headers.items():
            if key in self.headers:
                continue
            if key in self.skip_auto_headers:
                continue
            self.headers[key] = value

    def update_expect_continue(self, expect: bool = False) -> None:
        """Create the 100-continue waiter if requested by flag or Expect header."""
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        elif self.headers.get(hdrs.EXPECT, "").lower() == "100-continue":
            expect = True

        if expect:
            self._continue = self.loop.create_future()

    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        """Store proxy settings.

        Raises:
            ValueError: if *proxy_auth* is set but is not a BasicAuth.
        """
        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy = proxy
        self.proxy_auth = proxy_auth
        self.proxy_headers = proxy_headers

    def keep_alive(self) -> bool:
        """Decide whether the connection should be kept open after this request."""
        if self.version < HttpVersion10:
            # keep alive not supported at all
            return False
        if self.version == HttpVersion10:
            if self.headers.get(hdrs.CONNECTION) == "keep-alive":
                return True
            else:  # no headers means we close for Http 1.0
                return False
        elif self.headers.get(hdrs.CONNECTION) == "close":
            return False

        return True

    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Stream the request body to *writer*.

        If a 100-continue waiter exists, this waits for it first. Failures
        are handed to the connection's protocol via ``set_exception`` rather
        than raised. OSError is wrapped in ClientOSError. Cancellation is
        forwarded only while the connection is still open.
        """
        # 100 response
        if self._continue is not None:
            await writer.drain()
            await self._continue

        protocol = conn.protocol
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
            else:
                if isinstance(self.body, (bytes, bytearray)):
                    self.body = (self.body,)  # type: ignore[assignment]

                for chunk in self.body:
                    await writer.write(chunk)  # type: ignore[arg-type]

            await writer.write_eof()
        except OSError as exc:
            new_exc = ClientOSError(
                exc.errno, "Can not write request body for %s" % self.url
            )
            new_exc.__context__ = exc
            new_exc.__cause__ = exc
            protocol.set_exception(new_exc)
        except asyncio.CancelledError as exc:
            if not conn.closed:
                protocol.set_exception(exc)
        except Exception as exc:
            protocol.set_exception(exc)
        finally:
            self._writer = None

    async def send(self, conn: "Connection") -> "ClientResponse":
        """Write request line and headers, start body streaming, return the response object."""
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        if self.chunked is not None:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format(
            self.method, path, self.version
        )
        await writer.write_headers(status_line, self.headers)

        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response

    async def close(self) -> None:
        """Wait for the body-writer task, if any, to finish."""
        if self._writer is not None:
            try:
                await self._writer
            finally:
                self._writer = None

    def terminate(self) -> None:
        """Cancel the body-writer task (when the loop is still open) and drop it."""
        if self._writer is not None:
            if not self.loop.is_closed():
                self._writer.cancel()
            self._writer = None

    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        # StreamWriter callback: forward each sent body chunk to the traces.
        for trace in self._traces:
            await trace.send_request_chunk_sent(method, url, chunk)

    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        # StreamWriter callback: forward the sent headers to the traces.
        for trace in self._traces:
            await trace.send_request_headers(method, url, headers)
2020-12-20 00:08:09 +00:00
class ClientResponse(HeadersMixin):
    """Response half of a client HTTP exchange.

    Created by ``ClientRequest.send`` and populated by :meth:`start`, which
    reads the status line and headers from the connection's protocol. The
    body stream is exposed as :attr:`content` and can be buffered with
    :meth:`read`, :meth:`text` or :meth:`json`. :meth:`release` calls
    ``Connection.release()``; :meth:`close` calls ``Connection.close()``.
    """

    # from the Status-Line of the response
    version = None  # HTTP-Version
    status = None  # type: int # Status-Code
    reason = None  # Reason-Phrase

    content = None  # type: StreamReader # Payload stream
    _headers = None  # type: CIMultiDictProxy[str] # Response headers
    _raw_headers = None  # type: RawHeaders # Response raw headers

    _connection = None  # current connection
    _source_traceback = None
    # set up by ClientRequest after ClientResponse object creation;
    # the post-init stage avoids changing the constructor signature
    _closed = True  # to allow __del__ for non-initialized properly response
    _released = False

    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "asyncio.Task[None]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        assert isinstance(url, URL)

        self.method = method
        self.cookies = SimpleCookie()  # type: SimpleCookie[str]

        # ``real_url`` keeps any fragment; ``url`` is the fragment-free form.
        self._real_url = url
        self._url = url.with_fragment(None)
        self._body = None  # type: Any
        self._writer = writer  # type: Optional[asyncio.Task[None]]
        self._continue = continue100  # None by default
        self._closed = True
        self._history = ()  # type: Tuple[ClientResponse, ...]
        self._request_info = request_info
        self._timer = TimerNoop() if timer is None else timer
        self._cache = {}  # type: Dict[str, Any]
        self._traces = traces
        self._loop = loop
        # store a reference to session #1985
        self._session = session  # type: Optional[ClientSession]
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    @reify
    def url(self) -> URL:
        """Requested URL with the fragment removed."""
        return self._url

    @reify
    def url_obj(self) -> URL:
        """Deprecated alias of :attr:`url`."""
        warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
        return self._url

    @reify
    def real_url(self) -> URL:
        """URL as passed to the constructor, fragment included."""
        return self._real_url

    @reify
    def host(self) -> str:
        """Decoded host of :attr:`url`."""
        hostname = self._url.host
        assert hostname is not None
        return hostname

    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers, available once :meth:`start` has run."""
        return self._headers

    @reify
    def raw_headers(self) -> RawHeaders:
        """Response headers as received, before decoding."""
        return self._raw_headers

    @reify
    def request_info(self) -> RequestInfo:
        """Snapshot of the request that produced this response."""
        return self._request_info

    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        """Parsed Content-Disposition header, or None when absent."""
        header_value = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if header_value is None:
            return None
        kind, raw_params = multipart.parse_content_disposition(header_value)
        frozen_params = MappingProxyType(raw_params)
        return ContentDisposition(
            kind,
            frozen_params,
            multipart.content_disposition_filename(frozen_params),
        )

    def __del__(self, _warnings: Any = warnings) -> None:
        # Nothing to do for closed responses or ones without a connection.
        if self._closed or self._connection is None:
            return

        self._connection.release()
        self._cleanup_writer()

        if not self._loop.get_debug():
            return

        warn_kwargs = {"source": self} if PY_36 else {}
        _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **warn_kwargs)
        context = {"client_response": self, "message": "Unclosed response"}
        if self._source_traceback:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __repr__(self) -> str:
        reason = self.reason
        if reason:
            # Keep the repr ASCII-only; non-ASCII reason text is escaped.
            reason = reason.encode("ascii", "backslashreplace").decode("ascii")
        first_line = "<ClientResponse({}) [{} {}]>".format(
            str(self.url), self.status, reason
        )
        return f"{first_line}\n{self.headers!s}\n"

    @property
    def connection(self) -> Optional["Connection"]:
        """The connection in use, or None once released/closed."""
        return self._connection

    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """A sequence of responses, if redirects occurred."""
        return self._history

    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        """Parse ``Link`` headers into a mapping keyed by ``rel`` (or by URL).

        Each value holds the link's parameters plus a ``url`` entry that is
        resolved against :attr:`url`.
        """
        joined = ", ".join(self.headers.getall("link", []))
        if not joined:
            return MultiDictProxy(MultiDict())

        result = MultiDict()  # type: MultiDict[MultiDictProxy[Union[str, URL]]]
        for entry in re.split(r",(?=\s*<)", joined):
            entry_match = re.match(r"\s*<(.*)>(.*)", entry)
            if entry_match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            target, params_str = entry_match.groups()

            attrs = MultiDict()  # type: MultiDict[Union[str, URL]]
            for param in params_str.split(";")[1:]:
                param_match = re.match(
                    r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M
                )
                if param_match is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                name, _, value, _ = param_match.groups()
                attrs.add(name, value)

            rel = attrs.get("rel", target)  # type: ignore[assignment]
            attrs.add("url", self.url.join(URL(target)))
            result.add(rel, MultiDictProxy(attrs))

        return MultiDictProxy(result)

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Read the status line and headers, skipping interim 1xx responses.

        Raises:
            ClientResponseError: if the protocol reports an HTTP processing error.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                try:
                    protocol = self._protocol
                    message, payload = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                code = message.code
                # Stop on any non-1xx status, and on 101 Switching Protocols.
                if not 100 <= code <= 199 or code == 101:
                    break

                # An interim 1xx arrived: release a pending 100-continue waiter.
                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        payload.on_eof(self._response_eof)

        # status line
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # body stream
        self.content = payload

        # cookies: malformed Set-Cookie values are logged and skipped
        for set_cookie in self.headers.getall(hdrs.SET_COOKIE, ()):
            try:
                self.cookies.load(set_cookie)
            except CookieError as exc:
                client_logger.warning("Can not load response cookies: %s", exc)
        return self

    def _response_eof(self) -> None:
        # Payload EOF callback: release the connection unless it was upgraded.
        if self._closed:
            return

        conn = self._connection
        if conn is not None:
            # websocket, protocol could be None because
            # connection could be detached
            if conn.protocol is not None and conn.protocol.upgraded:
                return
            conn.release()
            self._connection = None

        self._closed = True
        self._cleanup_writer()

    @property
    def closed(self) -> bool:
        return self._closed

    def close(self) -> None:
        """Fail pending readers and close the underlying connection."""
        if not self._released:
            self._notify_content()
        if self._closed:
            return

        self._closed = True
        if self._loop is None or self._loop.is_closed():
            return

        conn = self._connection
        if conn is not None:
            conn.close()
            self._connection = None
        self._cleanup_writer()

    def release(self) -> Any:
        """Fail pending readers and release the underlying connection."""
        if not self._released:
            self._notify_content()
        if self._closed:
            return noop()

        self._closed = True
        conn = self._connection
        if conn is not None:
            conn.release()
            self._connection = None

        self._cleanup_writer()
        return noop()

    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return 400 > self.status

    def raise_for_status(self) -> None:
        """Release the response and raise ClientResponseError when not :attr:`ok`."""
        if self.ok:
            return
        # reason should always be not None for a started response
        assert self.reason is not None
        self.release()
        raise ClientResponseError(
            self.request_info,
            self.history,
            status=self.status,
            message=self.reason,
            headers=self.headers,
        )

    def _cleanup_writer(self) -> None:
        # Cancel the request-body writer task and drop the session reference.
        pending = self._writer
        if pending is not None:
            pending.cancel()
        self._writer = None
        self._session = None

    def _notify_content(self) -> None:
        # Make further reads from the body stream fail, unless it already holds an error.
        stream = self.content
        if stream and stream.exception() is None:
            stream.set_exception(ClientConnectionError("Connection closed"))
        self._released = True

    async def wait_for_close(self) -> None:
        """Wait for the request-body writer to finish, then release."""
        pending = self._writer
        if pending is not None:
            try:
                await pending
            finally:
                self._writer = None
        self.release()

    async def read(self) -> bytes:
        """Read response payload.

        The body is buffered after the first call. Any error while reading
        closes the response before propagating.

        Raises:
            ClientConnectionError: if the body was already buffered but the
                response has since been released.
        """
        if self._body is not None:
            if self._released:
                raise ClientConnectionError("Connection closed")
            return self._body  # type: ignore[no-any-return]

        try:
            self._body = await self.content.read()
            for trace in self._traces:
                await trace.send_response_chunk_received(
                    self.method, self.url, self._body
                )
        except BaseException:
            self.close()
            raise

        return self._body  # type: ignore[no-any-return]

    def get_encoding(self) -> str:
        """Pick a text encoding for the body.

        The order is: a valid ``charset`` parameter; then UTF-8 for
        application/json and application/rdap; then charset detection on the
        buffered body; then UTF-8.

        Raises:
            RuntimeError: if detection is needed but the body was not read yet.
        """
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(ctype)

        charset = mimetype.parameters.get("charset")
        if charset:
            try:
                codecs.lookup(charset)
            except LookupError:
                pass  # unknown codec: fall through to the defaults below
            else:
                return charset

        if mimetype.type == "application" and mimetype.subtype in ("json", "rdap"):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            return "utf-8"

        if self._body is None:
            raise RuntimeError("Cannot guess the encoding of a not yet read body")

        return chardet.detect(self._body)["encoding"] or "utf-8"

    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode."""
        if self._body is None:
            await self.read()
        codec = self.get_encoding() if encoding is None else encoding
        return self._body.decode(  # type: ignore[no-any-return,union-attr]
            codec, errors=errors
        )

    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
        """Read the body and decode it as JSON.

        Returns None for an empty or whitespace-only body. Pass a falsy
        *content_type* to skip the Content-Type check.

        Raises:
            ContentTypeError: if the Content-Type does not match *content_type*.
        """
        if self._body is None:
            await self.read()

        if content_type:
            ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
            if not _is_expected_content_type(ctype, content_type):
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    message=(
                        "Attempt to decode JSON with unexpected mimetype: %s" % ctype
                    ),
                    headers=self.headers,
                )

        stripped = self._body.strip()  # type: ignore[union-attr]
        if not stripped:
            return None

        codec = self.get_encoding() if encoding is None else encoding
        return loads(stripped.decode(codec))

    async def __aenter__(self) -> "ClientResponse":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # similar to _RequestContextManager, we do not need to check
        # for exceptions, response object can close connection
        # if state is broken
        self.release()