Source code for stellar_sdk.call_builder.call_builder_async.data_call_builder

from collections.abc import AsyncGenerator
from typing import Any

from ...call_builder.base import BaseDataCallBuilder
from ...call_builder.call_builder_async.base_call_builder import BaseCallBuilder
from ...client.base_async_client import BaseAsyncClient

__all__ = ["DataCallBuilder"]


[docs] class DataCallBuilder(BaseCallBuilder, BaseDataCallBuilder): """Creates a new :class:`DataCallBuilder` pointed to server defined by horizon_url. Do not create this object directly, use :func:`stellar_sdk.ServerAsync.data`. See `Retrieve an Account's Data <https://developers.stellar.org/docs/data/apis/horizon/api-reference/get-data-by-account-id>`__ for more information. :param horizon_url: Horizon server URL. :param client: The client instance used to send request. :param account_id: account id, for example: ``"GDGQVOKHW4VEJRU2TETD6DBRKEO5ERCNF353LW5WBFW3JJWQ2BRQ6KDD"`` :param data_name: Key name """ def __init__( self, horizon_url: str, client: BaseAsyncClient, account_id: str, data_name: str, ) -> None: super().__init__( horizon_url=horizon_url, client=client, account_id=account_id, data_name=data_name, )
[docs] def stream( self, ) -> AsyncGenerator[dict[str, Any], None]: """Creates an EventSource that listens for events from the `Account Data` endpoint. See `Streaming <https://developers.stellar.org/docs/data/apis/horizon/api-reference/structure/streaming>`__ for more information. """ return self._stream()