Source code for stellar_sdk.xdr.stellar_message

# This is an automatically generated file.
# DO NOT EDIT or your changes may be overwritten
from __future__ import annotations

import base64
from typing import List

from xdrlib3 import Packer, Unpacker

from .auth import Auth
from .dont_have import DontHave
from .error import Error
from .flood_advert import FloodAdvert
from .flood_demand import FloodDemand
from .generalized_transaction_set import GeneralizedTransactionSet
from .hello import Hello
from .message_type import MessageType
from .peer_address import PeerAddress
from .scp_envelope import SCPEnvelope
from .scp_quorum_set import SCPQuorumSet
from .send_more import SendMore
from .send_more_extended import SendMoreExtended
from .signed_survey_request_message import SignedSurveyRequestMessage
from .signed_survey_response_message import SignedSurveyResponseMessage
from .transaction_envelope import TransactionEnvelope
from .transaction_set import TransactionSet
from .uint32 import Uint32
from .uint256 import Uint256

__all__ = ["StellarMessage"]


[docs] class StellarMessage: """ XDR Source Code:: union StellarMessage switch (MessageType type) { case ERROR_MSG: Error error; case HELLO: Hello hello; case AUTH: Auth auth; case DONT_HAVE: DontHave dontHave; case GET_PEERS: void; case PEERS: PeerAddress peers<100>; case GET_TX_SET: uint256 txSetHash; case TX_SET: TransactionSet txSet; case GENERALIZED_TX_SET: GeneralizedTransactionSet generalizedTxSet; case TRANSACTION: TransactionEnvelope transaction; case SURVEY_REQUEST: SignedSurveyRequestMessage signedSurveyRequestMessage; case SURVEY_RESPONSE: SignedSurveyResponseMessage signedSurveyResponseMessage; // SCP case GET_SCP_QUORUMSET: uint256 qSetHash; case SCP_QUORUMSET: SCPQuorumSet qSet; case SCP_MESSAGE: SCPEnvelope envelope; case GET_SCP_STATE: uint32 getSCPLedgerSeq; // ledger seq requested ; if 0, requests the latest case SEND_MORE: SendMore sendMoreMessage; case SEND_MORE_EXTENDED: SendMoreExtended sendMoreExtendedMessage; // Pull mode case FLOOD_ADVERT: FloodAdvert floodAdvert; case FLOOD_DEMAND: FloodDemand floodDemand; }; """ def __init__( self, type: MessageType, error: Error = None, hello: Hello = None, auth: Auth = None, dont_have: DontHave = None, peers: List[PeerAddress] = None, tx_set_hash: Uint256 = None, tx_set: TransactionSet = None, generalized_tx_set: GeneralizedTransactionSet = None, transaction: TransactionEnvelope = None, signed_survey_request_message: SignedSurveyRequestMessage = None, signed_survey_response_message: SignedSurveyResponseMessage = None, q_set_hash: Uint256 = None, q_set: SCPQuorumSet = None, envelope: SCPEnvelope = None, get_scp_ledger_seq: Uint32 = None, send_more_message: SendMore = None, send_more_extended_message: SendMoreExtended = None, flood_advert: FloodAdvert = None, flood_demand: FloodDemand = None, ) -> None: _expect_max_length = 100 if peers and len(peers) > _expect_max_length: raise ValueError( f"The maximum length of `peers` should be {_expect_max_length}, but got {len(peers)}." ) self.type = type self.error = error self.hello = hello self.auth = auth self.dont_have = dont_have self.peers = peers self.tx_set_hash = tx_set_hash self.tx_set = tx_set self.generalized_tx_set = generalized_tx_set self.transaction = transaction self.signed_survey_request_message = signed_survey_request_message self.signed_survey_response_message = signed_survey_response_message self.q_set_hash = q_set_hash self.q_set = q_set self.envelope = envelope self.get_scp_ledger_seq = get_scp_ledger_seq self.send_more_message = send_more_message self.send_more_extended_message = send_more_extended_message self.flood_advert = flood_advert self.flood_demand = flood_demand def pack(self, packer: Packer) -> None: self.type.pack(packer) if self.type == MessageType.ERROR_MSG: if self.error is None: raise ValueError("error should not be None.") self.error.pack(packer) return if self.type == MessageType.HELLO: if self.hello is None: raise ValueError("hello should not be None.") self.hello.pack(packer) return if self.type == MessageType.AUTH: if self.auth is None: raise ValueError("auth should not be None.") self.auth.pack(packer) return if self.type == MessageType.DONT_HAVE: if self.dont_have is None: raise ValueError("dont_have should not be None.") self.dont_have.pack(packer) return if self.type == MessageType.GET_PEERS: return if self.type == MessageType.PEERS: if self.peers is None: raise ValueError("peers should not be None.") packer.pack_uint(len(self.peers)) for peers_item in self.peers: peers_item.pack(packer) return if self.type == MessageType.GET_TX_SET: if self.tx_set_hash is None: raise ValueError("tx_set_hash should not be None.") self.tx_set_hash.pack(packer) return if self.type == MessageType.TX_SET: if self.tx_set is None: raise ValueError("tx_set should not be None.") self.tx_set.pack(packer) return if self.type == MessageType.GENERALIZED_TX_SET: if self.generalized_tx_set is None: raise ValueError("generalized_tx_set should not be None.") self.generalized_tx_set.pack(packer) return if self.type == MessageType.TRANSACTION: if self.transaction is None: raise ValueError("transaction should not be None.") self.transaction.pack(packer) return if self.type == MessageType.SURVEY_REQUEST: if self.signed_survey_request_message is None: raise ValueError("signed_survey_request_message should not be None.") self.signed_survey_request_message.pack(packer) return if self.type == MessageType.SURVEY_RESPONSE: if self.signed_survey_response_message is None: raise ValueError("signed_survey_response_message should not be None.") self.signed_survey_response_message.pack(packer) return if self.type == MessageType.GET_SCP_QUORUMSET: if self.q_set_hash is None: raise ValueError("q_set_hash should not be None.") self.q_set_hash.pack(packer) return if self.type == MessageType.SCP_QUORUMSET: if self.q_set is None: raise ValueError("q_set should not be None.") self.q_set.pack(packer) return if self.type == MessageType.SCP_MESSAGE: if self.envelope is None: raise ValueError("envelope should not be None.") self.envelope.pack(packer) return if self.type == MessageType.GET_SCP_STATE: if self.get_scp_ledger_seq is None: raise ValueError("get_scp_ledger_seq should not be None.") self.get_scp_ledger_seq.pack(packer) return if self.type == MessageType.SEND_MORE: if self.send_more_message is None: raise ValueError("send_more_message should not be None.") self.send_more_message.pack(packer) return if self.type == MessageType.SEND_MORE_EXTENDED: if self.send_more_extended_message is None: raise ValueError("send_more_extended_message should not be None.") self.send_more_extended_message.pack(packer) return if self.type == MessageType.FLOOD_ADVERT: if self.flood_advert is None: raise ValueError("flood_advert should not be None.") self.flood_advert.pack(packer) return if self.type == MessageType.FLOOD_DEMAND: if self.flood_demand is None: raise ValueError("flood_demand should not be None.") self.flood_demand.pack(packer) return @classmethod def unpack(cls, unpacker: Unpacker) -> StellarMessage: type = MessageType.unpack(unpacker) if type == MessageType.ERROR_MSG: error = Error.unpack(unpacker) return cls(type=type, error=error) if type == MessageType.HELLO: hello = Hello.unpack(unpacker) return cls(type=type, hello=hello) if type == MessageType.AUTH: auth = Auth.unpack(unpacker) return cls(type=type, auth=auth) if type == MessageType.DONT_HAVE: dont_have = DontHave.unpack(unpacker) return cls(type=type, dont_have=dont_have) if type == MessageType.GET_PEERS: return cls(type=type) if type == MessageType.PEERS: length = unpacker.unpack_uint() peers = [] for _ in range(length): peers.append(PeerAddress.unpack(unpacker)) return cls(type=type, peers=peers) if type == MessageType.GET_TX_SET: tx_set_hash = Uint256.unpack(unpacker) return cls(type=type, tx_set_hash=tx_set_hash) if type == MessageType.TX_SET: tx_set = TransactionSet.unpack(unpacker) return cls(type=type, tx_set=tx_set) if type == MessageType.GENERALIZED_TX_SET: generalized_tx_set = GeneralizedTransactionSet.unpack(unpacker) return cls(type=type, generalized_tx_set=generalized_tx_set) if type == MessageType.TRANSACTION: transaction = TransactionEnvelope.unpack(unpacker) return cls(type=type, transaction=transaction) if type == MessageType.SURVEY_REQUEST: signed_survey_request_message = SignedSurveyRequestMessage.unpack(unpacker) return cls( type=type, signed_survey_request_message=signed_survey_request_message ) if type == MessageType.SURVEY_RESPONSE: signed_survey_response_message = SignedSurveyResponseMessage.unpack( unpacker ) return cls( type=type, signed_survey_response_message=signed_survey_response_message ) if type == MessageType.GET_SCP_QUORUMSET: q_set_hash = Uint256.unpack(unpacker) return cls(type=type, q_set_hash=q_set_hash) if type == MessageType.SCP_QUORUMSET: q_set = SCPQuorumSet.unpack(unpacker) return cls(type=type, q_set=q_set) if type == MessageType.SCP_MESSAGE: envelope = SCPEnvelope.unpack(unpacker) return cls(type=type, envelope=envelope) if type == MessageType.GET_SCP_STATE: get_scp_ledger_seq = Uint32.unpack(unpacker) return cls(type=type, get_scp_ledger_seq=get_scp_ledger_seq) if type == MessageType.SEND_MORE: send_more_message = SendMore.unpack(unpacker) return cls(type=type, send_more_message=send_more_message) if type == MessageType.SEND_MORE_EXTENDED: send_more_extended_message = SendMoreExtended.unpack(unpacker) return cls(type=type, send_more_extended_message=send_more_extended_message) if type == MessageType.FLOOD_ADVERT: flood_advert = FloodAdvert.unpack(unpacker) return cls(type=type, flood_advert=flood_advert) if type == MessageType.FLOOD_DEMAND: flood_demand = FloodDemand.unpack(unpacker) return cls(type=type, flood_demand=flood_demand) return cls(type=type) def to_xdr_bytes(self) -> bytes: packer = Packer() self.pack(packer) return packer.get_buffer() @classmethod def from_xdr_bytes(cls, xdr: bytes) -> StellarMessage: unpacker = Unpacker(xdr) return cls.unpack(unpacker) def to_xdr(self) -> str: xdr_bytes = self.to_xdr_bytes() return base64.b64encode(xdr_bytes).decode() @classmethod def from_xdr(cls, xdr: str) -> StellarMessage: xdr_bytes = base64.b64decode(xdr.encode()) return cls.from_xdr_bytes(xdr_bytes) def __hash__(self): return hash( ( self.type, self.error, self.hello, self.auth, self.dont_have, self.peers, self.tx_set_hash, self.tx_set, self.generalized_tx_set, self.transaction, self.signed_survey_request_message, self.signed_survey_response_message, self.q_set_hash, self.q_set, self.envelope, self.get_scp_ledger_seq, self.send_more_message, self.send_more_extended_message, self.flood_advert, self.flood_demand, ) ) def __eq__(self, other: object): if not isinstance(other, self.__class__): return NotImplemented return ( self.type == other.type and self.error == other.error and self.hello == other.hello and self.auth == other.auth and self.dont_have == other.dont_have and self.peers == other.peers and self.tx_set_hash == other.tx_set_hash and self.tx_set == other.tx_set and self.generalized_tx_set == other.generalized_tx_set and self.transaction == other.transaction and self.signed_survey_request_message == other.signed_survey_request_message and self.signed_survey_response_message == other.signed_survey_response_message and self.q_set_hash == other.q_set_hash and self.q_set == other.q_set and self.envelope == other.envelope and self.get_scp_ledger_seq == other.get_scp_ledger_seq and self.send_more_message == other.send_more_message and self.send_more_extended_message == other.send_more_extended_message and self.flood_advert == other.flood_advert and self.flood_demand == other.flood_demand ) def __str__(self): out = [] out.append(f"type={self.type}") out.append(f"error={self.error}") if self.error is not None else None out.append(f"hello={self.hello}") if self.hello is not None else None out.append(f"auth={self.auth}") if self.auth is not None else None out.append( f"dont_have={self.dont_have}" ) if self.dont_have is not None else None out.append(f"peers={self.peers}") if self.peers is not None else None out.append( f"tx_set_hash={self.tx_set_hash}" ) if self.tx_set_hash is not None else None out.append(f"tx_set={self.tx_set}") if self.tx_set is not None else None out.append( f"generalized_tx_set={self.generalized_tx_set}" ) if self.generalized_tx_set is not None else None out.append( f"transaction={self.transaction}" ) if self.transaction is not None else None out.append( f"signed_survey_request_message={self.signed_survey_request_message}" ) if self.signed_survey_request_message is not None else None out.append( f"signed_survey_response_message={self.signed_survey_response_message}" ) if self.signed_survey_response_message is not None else None out.append( f"q_set_hash={self.q_set_hash}" ) if self.q_set_hash is not None else None out.append(f"q_set={self.q_set}") if self.q_set is not None else None out.append(f"envelope={self.envelope}") if self.envelope is not None else None out.append( f"get_scp_ledger_seq={self.get_scp_ledger_seq}" ) if self.get_scp_ledger_seq is not None else None out.append( f"send_more_message={self.send_more_message}" ) if self.send_more_message is not None else None out.append( f"send_more_extended_message={self.send_more_extended_message}" ) if self.send_more_extended_message is not None else None out.append( f"flood_advert={self.flood_advert}" ) if self.flood_advert is not None else None out.append( f"flood_demand={self.flood_demand}" ) if self.flood_demand is not None else None return f"<StellarMessage [{', '.join(out)}]>"