# Source code for scrapy.core.downloader.handlers._httpx

"""``httpx``-based HTTP(S) download handler. Currently not recommended for production use."""

from __future__ import annotations

import ipaddress
import ssl
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, ClassVar

from scrapy.exceptions import (
    CannotResolveHostError,
    DownloadConnectionRefusedError,
    DownloadFailedError,
    DownloadTimeoutError,
    NotConfigured,
    UnsupportedURLSchemeError,
)
from scrapy.http import Headers
from scrapy.utils._download_handlers import NullCookieJar
from scrapy.utils.ssl import (
    _log_sslobj_debug_info,
    _make_insecure_ssl_ctx,
    _make_ssl_context,
)

from ._base_streaming import BaseStreamingDownloadHandler, _BaseResponseArgs

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

    from httpcore import AsyncNetworkStream

    from scrapy import Request
    from scrapy.crawler import Crawler


# httpx is an optional dependency: if it cannot be imported, the name is bound
# to None so that this module can still be imported.
# HttpxDownloadHandler._check_deps_installed() tests for that None value.
try:
    import httpx
except ImportError:
    httpx = None  # type: ignore[assignment]


# Parametrize the base class with httpx.Response only for type checkers.
# At runtime ``httpx`` may be None (see the guarded import above), in which
# case evaluating ``httpx.Response`` here would fail, so the plain,
# unsubscripted base class is used instead.
if TYPE_CHECKING:
    _Base = BaseStreamingDownloadHandler[httpx.Response]
else:
    _Base = BaseStreamingDownloadHandler


class HttpxDownloadHandler(_Base):
    """HTTP(S) download handler that sends requests through ``httpx``.

    A single ``httpx.AsyncClient`` serves requests that do not use a proxy.
    Because httpx has no per-request proxy support, one additional client is
    created lazily for every distinct proxy URL and kept for reuse until
    :meth:`close` is called.
    """

    experimental: ClassVar[bool] = True

    def __init__(self, crawler: Crawler):
        """Read the relevant settings from *crawler* and build the default client."""
        super().__init__(crawler)
        self._verify_certificates: bool = crawler.settings.getbool(
            "DOWNLOAD_VERIFY_CERTIFICATES"
        )
        self._ssl_context: ssl.SSLContext = _make_ssl_context(crawler.settings)
        self._bind_host: str | None = self._get_bind_address_host()
        self._limits: httpx.Limits = httpx.Limits(
            # hard limit on simultaneous connections
            max_connections=self._pool_size_total,
            # total number of idle connections in the pool (extra ones are closed)
            max_keepalive_connections=self._pool_size_total,
        )
        self._default_client: httpx.AsyncClient = self._make_client()
        # httpx doesn't support per-request proxies: https://github.com/encode/httpx/discussions/3183,
        # so we keep a pool of clients per proxy URL. LRU eviction can be added here if needed.
        self._proxy_clients: dict[str, httpx.AsyncClient] = {}

    @staticmethod
    def _check_deps_installed() -> None:
        """Raise :exc:`NotConfigured` if the optional ``httpx`` import failed."""
        if httpx is None:  # pragma: no cover
            raise NotConfigured(
                "HttpxDownloadHandler requires the httpx library to be installed."
            )

    def _make_client(self, proxy_url: str | None = None) -> httpx.AsyncClient:
        """Create a new client, routed through *proxy_url* when it is non-empty.

        The target connection always uses ``self._ssl_context``. For an HTTPS
        proxy, an insecure SSL context is used for the proxy connection when
        certificate verification is disabled; otherwise no explicit context
        is passed to ``httpx.Proxy``.
        """
        if proxy_url:
            # URL schemes are case-insensitive, so "HTTPS://..." must be
            # treated the same as "https://...".
            is_https_proxy = proxy_url.lower().startswith("https:")
            if is_https_proxy and not self._verify_certificates:
                proxy_ssl_context = _make_insecure_ssl_ctx()
            else:
                proxy_ssl_context = None
            proxy = httpx.Proxy(proxy_url, ssl_context=proxy_ssl_context)
        else:
            proxy = None
        client = httpx.AsyncClient(
            cookies=NullCookieJar(),
            transport=httpx.AsyncHTTPTransport(
                verify=self._ssl_context,
                local_address=self._bind_host,
                limits=self._limits,
                trust_env=False,
                proxy=proxy,
            ),
        )
        # https://github.com/encode/httpx/discussions/1566
        # Remove these client-level headers (presumably so that only the
        # headers carried by the request itself are sent).
        for header_name in ("accept", "accept-encoding", "user-agent"):
            client.headers.pop(header_name, None)
        return client

    def _get_client(self, proxy_url: str | None) -> httpx.AsyncClient:
        """Return the client to use for *proxy_url*, creating it on first use.

        ``None`` and the empty string both select the default (direct)
        client, matching how :meth:`_make_client` interprets its argument;
        this avoids caching a second proxy-less client under the key ``""``.
        """
        if not proxy_url:
            return self._default_client
        client = self._proxy_clients.get(proxy_url)
        if client is None:
            client = self._make_client(proxy_url)
            self._proxy_clients[proxy_url] = client
        return client

    @asynccontextmanager
    async def _make_request(
        self, request: Request, timeout: float
    ) -> AsyncIterator[httpx.Response]:
        """Send *request* and yield the streaming ``httpx.Response``.

        httpx transport errors are translated into Scrapy download
        exceptions. The ``yield`` sits inside the ``try`` block, so errors
        raised while the caller is still using the response are translated
        as well.

        :raises DownloadTimeoutError: on any ``httpx.TimeoutException``.
        :raises UnsupportedURLSchemeError: on ``httpx.UnsupportedProtocol``.
        :raises CannotResolveHostError: on a connect error whose message
            matches one of the known name-resolution failure messages.
        :raises DownloadConnectionRefusedError: on other connect errors and
            on proxy errors.
        :raises DownloadFailedError: on other network or protocol errors.
        """
        client = self._get_client(self._extract_proxy_url_with_creds(request))
        try:
            async with client.stream(
                request.method,
                request.url,
                content=request.body,
                headers=request.headers.to_tuple_list(),
                timeout=timeout,
            ) as response:
                yield response
        except httpx.TimeoutException as e:
            raise DownloadTimeoutError(
                f"Getting {request.url} took longer than {timeout} seconds."
            ) from e
        except httpx.UnsupportedProtocol as e:
            raise UnsupportedURLSchemeError(str(e)) from e
        # ConnectError is handled before the broader NetworkError clause below
        # so that it gets its more specific mapping.
        except httpx.ConnectError as e:
            error_message = str(e)
            # Message fragments that are treated as DNS resolution failures.
            if (
                "Name or service not known" in error_message
                or "getaddrinfo failed" in error_message
                or "nodename nor servname" in error_message
                or "Temporary failure in name resolution" in error_message
            ):
                raise CannotResolveHostError(error_message) from e
            raise DownloadConnectionRefusedError(str(e)) from e
        except httpx.ProxyError as e:
            raise DownloadConnectionRefusedError(str(e)) from e
        except (httpx.NetworkError, httpx.RemoteProtocolError) as e:
            raise DownloadFailedError(str(e)) from e

    @staticmethod
    def _extract_headers(response: httpx.Response) -> Headers:
        """Build Scrapy ``Headers`` from all (name, value) pairs of *response*."""
        return Headers(response.headers.multi_items())

    @staticmethod
    def _build_base_response_args(
        response: httpx.Response,
        request: Request,
        headers: Headers,
    ) -> _BaseResponseArgs:
        """Collect the response attributes shared by all response types.

        The peer address and, for TLS connections, the binary-form peer
        certificate are read from the ``network_stream`` response extension.
        ``certificate`` is ``None`` when the stream exposes no
        ``ssl.SSLObject``.
        """
        network_stream: AsyncNetworkStream = response.extensions["network_stream"]
        # assumes server_addr is a (host, port)-like sequence - TODO confirm
        server_addr = network_stream.get_extra_info("server_addr")
        ip_address = ipaddress.ip_address(server_addr[0])
        ssl_object = network_stream.get_extra_info("ssl_object")
        if isinstance(ssl_object, ssl.SSLObject):
            cert = ssl_object.getpeercert(binary_form=True)
        else:
            cert = None
        return {
            "status": response.status_code,
            "url": request.url,
            "headers": headers,
            "certificate": cert,
            "ip_address": ip_address,
            "protocol": response.http_version,
        }

    @staticmethod
    def _iter_body_chunks(response: httpx.Response) -> AsyncIterator[bytes]:
        """Return an async iterator over the raw body bytes of *response*."""
        return response.aiter_raw()

    @staticmethod
    def _is_dataloss_exception(exc: Exception) -> bool:
        """Tell whether *exc* means the peer closed before the body was complete.

        This is detected by exception type plus a fixed message substring.
        """
        return isinstance(
            exc, httpx.RemoteProtocolError
        ) and "peer closed connection without sending complete message body" in str(exc)

    def _log_tls_info(self, response: httpx.Response, request: Request) -> None:
        """Log TLS debug info for *response* if its stream has an SSL object.

        *request* is not used by this implementation.
        """
        network_stream: AsyncNetworkStream = response.extensions["network_stream"]
        extra_ssl_object = network_stream.get_extra_info("ssl_object")
        if isinstance(extra_ssl_object, ssl.SSLObject):
            _log_sslobj_debug_info(extra_ssl_object)

    async def close(self) -> None:
        """Close the default client and every per-proxy client.

        Every client is attempted even if closing an earlier one fails, so a
        single failure does not leak the remaining connection pools. The
        first error encountered, if any, is re-raised once all clients have
        been attempted.
        """
        # Snapshot the clients: the dict could gain entries while we await.
        clients = [self._default_client, *self._proxy_clients.values()]
        first_error: Exception | None = None
        for client in clients:
            try:
                await client.aclose()
            except Exception as exc:  # re-raised below, after closing the rest
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error