Source code for stellar_sdk.client.requests_client

import json
import logging
import time
from collections.abc import Generator
from typing import Any

import requests
from requests import RequestException, Session
from requests.adapters import DEFAULT_POOLSIZE, HTTPAdapter
from requests_sse import EventSource
from urllib3.exceptions import NewConnectionError
from urllib3.util import Retry

from ..__version__ import __version__
from ..client.base_sync_client import BaseSyncClient
from ..client.response import Response
from ..exceptions import (
    ConnectionError,
    ContentSizeLimitExceededError,
    StreamClientError,
)
from . import defines

# Fallback retry policy, used when the corresponding constructor arguments
# of the client are omitted.
DEFAULT_BACKOFF_FACTOR = 0.5
DEFAULT_NUM_RETRIES = 3

# Key/value pairs naming this SDK and its version.
IDENTIFICATION_HEADERS = dict(
    (
        ("X-Client-Name", "py-stellar-base"),
        ("X-Client-Version", __version__),
    )
)
USER_AGENT = f"py-stellar-base/{__version__}/RequestsClient"

# Public API of this module.
__all__ = ["RequestsClient"]

logger = logging.getLogger(__name__)


class RequestsClient(BaseSyncClient):
    """The :class:`RequestsClient` object is a synchronous http client,
    which represents the interface for making requests to a server instance.

    If a ``session`` is supplied, the default headers are merged into it and
    the retrying transport adapter is mounted on it for ``http://`` and
    ``https://``; otherwise a new :class:`requests.Session` is created.

    :param pool_size: persistent connection to Horizon and connection pool
    :param num_retries: configurable request retry functionality
    :param request_timeout: the timeout for all GET requests (for each retry)
    :param post_timeout: the timeout for all POST requests (for each retry)
    :param backoff_factor: a backoff factor to apply between attempts after the second try
    :param session: the request session
    :param stream_session: the stream request session; it is stored and closed
        by :meth:`close`
    :param custom_headers: any additional HTTP headers to add in requests
    """

    def __init__(
        self,
        pool_size: int = DEFAULT_POOLSIZE,
        num_retries: int = DEFAULT_NUM_RETRIES,
        request_timeout: int = defines.DEFAULT_GET_TIMEOUT_SECONDS,
        post_timeout: float = defines.DEFAULT_POST_TIMEOUT_SECONDS,
        backoff_factor: float = DEFAULT_BACKOFF_FACTOR,
        session: Session | None = None,
        stream_session: Session | None = None,
        custom_headers: dict[str, str] | None = None,
    ):
        self.pool_size: int = pool_size
        self.num_retries: int = num_retries
        self.request_timeout: int = request_timeout
        self.post_timeout: float = post_timeout
        self.backoff_factor: float = backoff_factor

        # adding 504 to the tuple of statuses to retry
        self.status_forcelist = (*Retry.RETRY_AFTER_STATUS_CODES, 504)

        # configure standard session

        # configure retry handler
        retry = Retry(
            total=self.num_retries,
            backoff_factor=self.backoff_factor,
            redirect=0,
            status_forcelist=self.status_forcelist,
            allowed_methods=frozenset(["GET", "POST"]),
            raise_on_status=False,
        )
        # init transport adapter
        adapter = HTTPAdapter(
            pool_connections=self.pool_size,
            pool_maxsize=self.pool_size,
            max_retries=retry,
        )

        # Custom headers are merged last so they win over the defaults.
        self.headers = IDENTIFICATION_HEADERS | {"User-Agent": USER_AGENT}
        if custom_headers:
            self.headers = self.headers | custom_headers

        # init session
        if session is None:
            session = requests.Session()

        # set default headers
        session.headers.update(self.headers)

        session.mount("http://", adapter)
        session.mount("https://", adapter)
        self._session: Session = session
        # NOTE(review): stream() below does not hand this session to
        # EventSource; it is only closed in close(). Confirm whether that is
        # intentional.
        self._stream_session: Session | None = stream_session

    def get(
        self,
        url: str,
        params: dict[str, str] | None = None,
        max_content_size: int | None = None,
    ) -> Response:
        """Perform HTTP GET request.

        :param url: the request url
        :param params: the request params
        :param max_content_size: the maximum allowed response content size in bytes.
            If the response exceeds this limit, a :exc:`ContentSizeLimitExceededError` is raised.
            If None, no limit is applied.
        :return: the response from server
        :raise: :exc:`ConnectionError <stellar_sdk.exceptions.ConnectionError>`
        :raise: :exc:`ContentSizeLimitExceededError <stellar_sdk.exceptions.ContentSizeLimitExceededError>`
        """
        try:
            resp = self._session.get(
                url,
                params=params,
                timeout=self.request_timeout,
                # Only stream the body when a size limit has to be enforced.
                stream=max_content_size is not None,
            )
            if max_content_size is not None:
                text = self._read_with_limit(resp, max_content_size)
            else:
                text = resp.text
        except (RequestException, NewConnectionError) as err:
            raise ConnectionError(err) from err
        return Response(
            status_code=resp.status_code,
            text=text,
            headers=dict(resp.headers),
            url=resp.url,
        )

    def _read_with_limit(self, resp: requests.Response, max_content_size: int) -> str:
        """Read response content with size limit using streaming.

        The response is always closed, whether or not the limit is exceeded.

        :param resp: the streamed response to read from
        :param max_content_size: the maximum number of body bytes to accept
        :return: the decoded response body
        :raise: :exc:`ContentSizeLimitExceededError <stellar_sdk.exceptions.ContentSizeLimitExceededError>`
            as soon as the accumulated size exceeds ``max_content_size``
        """
        chunks = []
        total_size = 0
        try:
            for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
                total_size += len(chunk)
                if total_size > max_content_size:
                    raise ContentSizeLimitExceededError(
                        limit=max_content_size, content_size=total_size
                    )
                chunks.append(chunk)
        finally:
            resp.close()

        content = b"".join(chunks)
        encoding = resp.encoding or "utf-8"
        # Decode tolerantly so a malformed body or an unknown charset cannot
        # escape get() as an uncaught UnicodeDecodeError / LookupError; the
        # unlimited path (``resp.text``) is tolerant in the same way.
        try:
            return content.decode(encoding, errors="replace")
        except LookupError:
            return content.decode("utf-8", errors="replace")

    def post(
        self,
        url: str,
        data: dict[str, str] | None = None,
        json_data: dict[str, Any] | None = None,
    ) -> Response:
        """Perform HTTP POST request.

        :param url: the request url
        :param data: the data send to server
        :param json_data: the json data send to server
        :return: the response from server
        :raise: :exc:`ConnectionError <stellar_sdk.exceptions.ConnectionError>`
        """
        try:
            resp = self._session.post(
                url, data=data, json=json_data, timeout=self.post_timeout
            )
        except (RequestException, NewConnectionError) as err:
            raise ConnectionError(err) from err
        return Response(
            status_code=resp.status_code,
            text=resp.text,
            headers=dict(resp.headers),
            url=resp.url,
        )

    def stream(
        self, url: str, params: dict[str, str] | None = None
    ) -> Generator[dict[str, Any], None, None]:
        """Creates an EventSource that listens for incoming messages from the server.

        See `Horizon Response Format <https://developers.stellar.org/docs/data/apis/horizon/api-reference/structure/response-format>`__

        See `MDN EventSource <https://developer.mozilla.org/en-US/docs/Web/API/EventSource>`_

        :param url: the request url
        :param params: the request params
        :return: a Generator for server response
        :raise: :exc:`StreamClientError <stellar_sdk.exceptions.StreamClientError>`
        """
        # Work on a copy so the caller's dict is never mutated; the client
        # identification values are also sent as query parameters.
        query_params = params.copy() if params else {}
        query_params |= IDENTIFICATION_HEADERS
        retry = 0.1

        while True:
            try:
                # Create a new EventSource, using the last seen event id as
                # the cursor so a reconnect resumes where the stream stopped.
                with EventSource(
                    url,
                    timeout=60,
                    params=query_params,
                    headers=self.headers,
                ) as client:
                    for event in client:
                        # Events that dont have an id are not useful for us
                        # (hello/byebye events)
                        if event.last_event_id:
                            query_params["cursor"] = event.last_event_id
                        retry = client._reconnection_time.total_seconds()

                        data = event.data
                        if data is None or data == '"hello"' or data == '"byebye"':
                            continue
                        try:
                            message = json.loads(data)
                        except json.JSONDecodeError:
                            # Content was not json-decodable; deliberately
                            # skipped so one bad event does not end the stream.
                            continue
                        yield message
            except requests.Timeout:
                logger.warning(
                    f"We have encountered an timeout error and we will try to reconnect, cursor = {query_params.get('cursor')}"
                )
                time.sleep(retry)
            except requests.RequestException as e:
                # The cursor may not have been set yet (no cursor supplied and
                # no event received); .get() avoids masking the real failure
                # with a KeyError.
                raise StreamClientError(
                    query_params.get("cursor"), "Failed to get stream message."
                ) from e

    def close(self) -> None:
        """Close underlying connector.

        Release all acquired resources.
        """
        self._session.close()
        if self._stream_session:
            self._stream_session.close()

    def __enter__(self) -> "RequestsClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the client when leaving the ``with`` block."""
        self.close()

    def __repr__(self):
        """Return a debug representation listing the client's configuration."""
        return (
            f"<RequestsClient [pool_size={self.pool_size}, "
            f"num_retries={self.num_retries}, "
            f"request_timeout={self.request_timeout}, "
            f"post_timeout={self.post_timeout}, "
            f"backoff_factor={self.backoff_factor}, "
            f"session={self._session}, "
            f"stream_session={self._stream_session}]>"
        )