# Source code for stellar_sdk.sep.txrep

"""
SEP: 0011
Title: Txrep: human-readable low-level representation of Stellar transactions
Author: David Mazières
Status: Active
Created: 2018-08-31
"""

import json
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Dict, List, Optional, Union

from .. import xdr as stellar_xdr
from ..asset import Asset
from ..exceptions import ValueError as SdkValueError
from ..fee_bump_transaction import FeeBumpTransaction
from ..fee_bump_transaction_envelope import FeeBumpTransactionEnvelope
from ..memo import *
from ..muxed_account import MuxedAccount
from ..operation import *
from ..operation.create_claimable_balance import ClaimPredicateType
from ..operation.revoke_sponsorship import RevokeSponsorshipType
from ..price import Price
from ..signer import Signer
from ..signer_key import SignerKey
from ..strkey import StrKey
from ..time_bounds import TimeBounds
from ..transaction import Transaction
from ..transaction_envelope import TransactionEnvelope

__all__ = ["to_txrep", "from_txrep"]

_true = "true"
_false = "false"


class _EnvelopeType(Enum):
    ENVELOPE_TYPE_TX_V0 = "ENVELOPE_TYPE_TX_V0"
    ENVELOPE_TYPE_TX = "ENVELOPE_TYPE_TX"
    ENVELOPE_TYPE_TX_FEE_BUMP = "ENVELOPE_TYPE_TX_FEE_BUMP"


def to_txrep(
    transaction_envelope: Union[TransactionEnvelope, FeeBumpTransactionEnvelope],
) -> str:
    """Generate a human-readable format for Stellar transactions.

    Txrep is a human-readable representation of Stellar transactions that
    functions like an assembly language for XDR.

    See `SEP-0011 <https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0011.md>`_

    NOTE(review): the previous docstring stated "MuxAccount is currently not
    supported", yet source accounts are printed through ``_to_muxed_account``
    -- confirm whether that caveat still applies.

    :param transaction_envelope: Transaction envelope object.
    :return: A human-readable format for Stellar transactions.
    """
    fee_bump_envelope: Optional[FeeBumpTransactionEnvelope] = None
    if isinstance(transaction_envelope, FeeBumpTransactionEnvelope):
        fee_bump_envelope = transaction_envelope
        envelope_type = _EnvelopeType.ENVELOPE_TYPE_TX_FEE_BUMP.value
        tx_prefix = "feeBump.tx.innerTx.tx."
        signatures_prefix = "feeBump.tx.innerTx."
        inner_envelope = fee_bump_envelope.transaction.inner_transaction_envelope
    else:
        assert isinstance(transaction_envelope, TransactionEnvelope)
        if transaction_envelope.transaction.v1:
            envelope_type = _EnvelopeType.ENVELOPE_TYPE_TX.value
        else:
            envelope_type = _EnvelopeType.ENVELOPE_TYPE_TX_V0.value
        tx_prefix = "tx."
        signatures_prefix = ""
        inner_envelope = transaction_envelope
    inner_tx = inner_envelope.transaction

    out: List[str] = []
    _add_line("type", envelope_type, out)

    if fee_bump_envelope is not None:
        # Fee-bump header comes first, then the wrapped transaction below.
        fee_bump_tx = fee_bump_envelope.transaction
        assert isinstance(fee_bump_tx, FeeBumpTransaction)
        assert isinstance(inner_tx, Transaction)
        _add_line(
            "feeBump.tx.feeSource",
            _to_muxed_account(fee_bump_tx.fee_source),
            out,
            comment=_to_muxed_account_comment(fee_bump_tx.fee_source),
        )
        # The printed fee is the total: base fee times (inner operations + 1).
        _add_line(
            "feeBump.tx.fee",
            fee_bump_tx.base_fee * (len(inner_tx.operations) + 1),
            out,
        )
        _add_line(
            "feeBump.tx.innerTx.type", _EnvelopeType.ENVELOPE_TYPE_TX.value, out
        )

    assert isinstance(inner_tx, Transaction)
    _add_line(
        f"{tx_prefix}sourceAccount",
        _to_muxed_account(inner_tx.source),
        out,
        comment=_to_muxed_account_comment(inner_tx.source),
    )
    _add_line(f"{tx_prefix}fee", inner_tx.fee, out)
    _add_line(f"{tx_prefix}seqNum", inner_tx.sequence, out)
    _add_time_bounds(inner_tx.time_bounds, tx_prefix, out)
    _add_memo(inner_tx.memo, tx_prefix, out)
    _add_operations(inner_tx.operations, tx_prefix, out)
    _add_line(f"{tx_prefix}ext.v", 0, out)
    _add_signatures(inner_envelope.signatures, signatures_prefix, out)

    if fee_bump_envelope is not None:
        _add_line("feeBump.tx.ext.v", 0, out)
        _add_signatures(fee_bump_envelope.signatures, "feeBump.", out)

    return "\n".join(out)
# Setting to ignore pass in .coveragerc will cause this function to not be counted by pytest.
def from_txrep(
    txrep: str, network_passphrase: str
) -> Union[TransactionEnvelope, FeeBumpTransactionEnvelope]:
    """Parse txrep and generate transaction envelope object.

    Txrep is a human-readable representation of Stellar transactions that
    functions like an assembly language for XDR.

    See `SEP-0011 <https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0011.md>`_

    NOTE(review): the previous docstring stated "MuxAccount is currently not
    supported" -- confirm whether that caveat still applies.

    :param txrep: a human-readable format for Stellar transactions.
    :param network_passphrase: The network passphrase; it is passed through
        to the envelope objects that are created.
    :return: The parsed transaction envelope; a fee bump transaction envelope
        when the txrep ``type`` is ``ENVELOPE_TYPE_TX_FEE_BUMP``.
    :raises SdkValueError: if a required key is missing or a value cannot be
        converted to the expected type.
    """
    raw_data_map = _get_raw_data_map(txrep)
    tx_type = _EnvelopeType(_get_value(raw_data_map, "type"))
    is_fee_bump = tx_type == _EnvelopeType.ENVELOPE_TYPE_TX_FEE_BUMP

    # For a fee bump, the inner transaction lives under "feeBump.tx.innerTx.".
    prefix = "feeBump.tx.innerTx.tx." if is_fee_bump else "tx."
    source = _get_value(raw_data_map, f"{prefix}sourceAccount")
    fee = _get_int_value(raw_data_map, f"{prefix}fee")
    sequence = _get_int_value(raw_data_map, f"{prefix}seqNum")
    time_bounds = _get_time_bounds(raw_data_map, prefix)
    memo = _get_memo(raw_data_map, prefix)
    operations = _get_operations(raw_data_map, prefix)

    prefix = "feeBump.tx.innerTx." if is_fee_bump else ""
    transaction_signatures = _get_signatures(raw_data_map, prefix)

    # Only an explicit ENVELOPE_TYPE_TX_V0 yields a V0 transaction.
    v1 = tx_type != _EnvelopeType.ENVELOPE_TYPE_TX_V0
    transaction = Transaction(
        source=source,
        sequence=sequence,
        fee=fee,
        operations=operations,
        memo=memo,
        time_bounds=time_bounds,
        v1=v1,
    )
    transaction_envelope = TransactionEnvelope(
        transaction=transaction,
        signatures=transaction_signatures,
        network_passphrase=network_passphrase,
    )
    if not is_fee_bump:
        return transaction_envelope

    fee_bump_fee_source = _get_value(raw_data_map, "feeBump.tx.feeSource")
    fee_bump_fee = _get_int_value(raw_data_map, "feeBump.tx.fee")
    # Inverse of to_txrep, which prints base_fee * (operations + 1). Integer
    # floor division keeps this exact; true division would go through float
    # and lose precision for large fees.
    fee_bump_base_fee = fee_bump_fee // (len(operations) + 1)
    fee_bump_transaction_signatures = _get_signatures(raw_data_map, "feeBump.")
    fee_bump_transaction = FeeBumpTransaction(
        fee_source=fee_bump_fee_source,
        base_fee=fee_bump_base_fee,
        inner_transaction_envelope=transaction_envelope,
    )
    return FeeBumpTransactionEnvelope(
        transaction=fee_bump_transaction,
        signatures=fee_bump_transaction_signatures,
        network_passphrase=network_passphrase,
    )
def _to_muxed_account(account: MuxedAccount) -> str: if account.account_muxed_id is None: return account.account_id assert account.account_muxed is not None return account.account_muxed def _get_operations(raw_data_map: Dict[str, str], prefix: str) -> List[Operation]: operations = [] operation_length = _get_int_value(raw_data_map, f"{prefix}operations.len") for i in range(operation_length): operation = _get_operation(i, raw_data_map, prefix) operations.append(operation) return operations def _get_raw_data_map(txrep: str) -> Dict[str, str]: lines = txrep.strip().split("\n") raw_data_map = {} for line in lines: if line.startswith(":") or len(line.strip()) == 0: # remove full-line comment and blank line continue parts = line.split(":", 1) if len(parts) == 2: key = parts[0].strip() value = _remove_comment(parts[1]) raw_data_map[key] = value return raw_data_map def _get_time_bounds(raw_data_map: Dict[str, str], prefix: str) -> Optional[TimeBounds]: time_bounds_present = _get_bool_value(raw_data_map, f"{prefix}timeBounds._present") time_bounds = None if time_bounds_present: min_time = _get_int_value(raw_data_map, f"{prefix}timeBounds.minTime") max_time = _get_int_value(raw_data_map, f"{prefix}timeBounds.maxTime") time_bounds = TimeBounds(min_time=min_time, max_time=max_time) return time_bounds def _get_memo(raw_data_map: Dict[str, str], prefix: str) -> Memo: memo_type = _get_value(raw_data_map, f"{prefix}memo.type") if memo_type == "MEMO_TEXT": return TextMemo(_get_bytes_value(raw_data_map, f"{prefix}memo.text")) elif memo_type == "MEMO_ID": return IdMemo(_get_int_value(raw_data_map, f"{prefix}memo.id")) elif memo_type == "MEMO_HASH": return HashMemo(_get_bytes_value(raw_data_map, f"{prefix}memo.hash")) elif memo_type == "MEMO_RETURN": return ReturnHashMemo(_get_bytes_value(raw_data_map, f"{prefix}memo.retHash")) elif memo_type == "MEMO_NONE": return NoneMemo() else: raise SdkValueError( f"`{memo_type}` is not a valid memo type, expected one of `MEMO_TEXT`, `MEMO_ID`, " 
f"`MEMO_HASH`, `MEMO_RETURN`, `MEMO_NONE`." ) def _remove_comment(value: str) -> str: value = value.strip() if len(value) == 0: return value if value[0] == '"': return _remove_string_value_comment(value) return _remove_non_string_value_comment(value) def _remove_non_string_value_comment(value: str) -> str: parts = value.split(" ") return parts[0] def _remove_string_value_comment(value: str) -> str: v = '"' in_escape_sequence = False for char in value[1:]: if in_escape_sequence: if char == "n": v += "\n" else: v += char in_escape_sequence = False elif char == "\\": in_escape_sequence = True elif char == '"': v += char break else: v += char return v def _get_signature( index: int, raw_data_map: Dict[str, str], prefix: str ) -> stellar_xdr.DecoratedSignature: hint = _get_bytes_value(raw_data_map, f"{prefix}signatures[{index}].hint") signature = _get_bytes_value(raw_data_map, f"{prefix}signatures[{index}].signature") return stellar_xdr.DecoratedSignature( stellar_xdr.SignatureHint(hint), stellar_xdr.Signature(signature) ) def _get_signatures( raw_data_map: Dict[str, str], prefix: str ) -> List[stellar_xdr.DecoratedSignature]: signatures: List[stellar_xdr.DecoratedSignature] = [] signature_length = _get_int_value(raw_data_map, f"{prefix}signatures.len") for i in range(signature_length): signature = _get_signature(i, raw_data_map, prefix) signatures.append(signature) return signatures def _get_operation(index, raw_data_map, tx_prefix): prefix = f"{tx_prefix}operations[{index}].body." 
source_account_id = None if _get_bool_value( raw_data_map, f"{tx_prefix}operations[{index}].sourceAccount._present" ): source_account_id = _get_value( raw_data_map, f"{tx_prefix}operations[{index}].sourceAccount" ) operation_type = _get_value(raw_data_map, f"{prefix}type") if operation_type == _to_caps_with_under(AccountMerge.__name__): return _get_account_merge_op(source_account_id, tx_prefix, raw_data_map, index) elif operation_type == _to_caps_with_under(AllowTrust.__name__): operation_prefix = prefix + "allowTrustOp." return _get_allow_trust_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(BeginSponsoringFutureReserves.__name__): operation_prefix = prefix + "beginSponsoringFutureReservesOp." return _get_begin_sponsoring_future_reserves_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(BumpSequence.__name__): operation_prefix = prefix + "bumpSequenceOp." return _get_bump_sequence_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(ChangeTrust.__name__): operation_prefix = prefix + "changeTrustOp." return _get_change_trust_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(ClaimClaimableBalance.__name__): operation_prefix = prefix + "claimClaimableBalanceOp." return _get_claim_claimable_balance_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(Clawback.__name__): operation_prefix = prefix + "clawbackOp." return _get_clawback_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(ClawbackClaimableBalance.__name__): operation_prefix = prefix + "clawbackClaimableBalanceOp." return _get_clawback_claimable_balance_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(CreateAccount.__name__): operation_prefix = prefix + "createAccountOp." 
return _get_create_account_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(CreateClaimableBalance.__name__): operation_prefix = prefix + "createClaimableBalanceOp." return _get_create_claimable_balance_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(CreatePassiveSellOffer.__name__): operation_prefix = prefix + "createPassiveSellOfferOp." return _get_create_passive_sell_offer_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(EndSponsoringFutureReserves.__name__): return _get_end_sponsoring_future_reserves_op(source_account_id) elif operation_type == _to_caps_with_under(Inflation.__name__): return _get_inflation_op(source_account_id) elif operation_type == _to_caps_with_under(ManageBuyOffer.__name__): operation_prefix = prefix + "manageBuyOfferOp." return _get_manage_buy_offer_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(ManageData.__name__): operation_prefix = prefix + "manageDataOp." return _get_manage_data_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(ManageSellOffer.__name__): operation_prefix = prefix + "manageSellOfferOp." return _get_manage_sell_offer_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(PathPaymentStrictReceive.__name__): operation_prefix = prefix + "pathPaymentStrictReceiveOp." return _get_path_payment_strict_receive_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(PathPaymentStrictSend.__name__): operation_prefix = prefix + "pathPaymentStrictSendOp." return _get_path_payment_strict_send_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(Payment.__name__): operation_prefix = prefix + "paymentOp." 
return _get_payment_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(RevokeSponsorship.__name__): operation_prefix = prefix + "revokeSponsorshipOp." return _get_revoke_sponsorship_op( source_account_id, operation_prefix, raw_data_map ) elif operation_type == _to_caps_with_under(SetOptions.__name__): operation_prefix = prefix + "setOptionsOp." return _get_set_options_op(source_account_id, operation_prefix, raw_data_map) elif operation_type == _to_caps_with_under(SetTrustLineFlags.__name__): operation_prefix = prefix + "setTrustLineFlagsOp." return _get_set_trust_line_flags_op( source_account_id, operation_prefix, raw_data_map ) else: raise SdkValueError( f"This operation has not been implemented yet, " f"operation type: {operation_type}." ) def _get_set_options_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> SetOptions: inflation_dest = None clear_flags = None set_flags = None master_weight = None low_threshold = None med_threshold = None high_threshold = None signer = None home_domain = None if _get_bool_value(raw_data_map, f"{operation_prefix}inflationDest._present"): inflation_dest = _get_value(raw_data_map, f"{operation_prefix}inflationDest") if _get_bool_value(raw_data_map, f"{operation_prefix}clearFlags._present"): clear_flags = _get_int_value(raw_data_map, f"{operation_prefix}clearFlags") if _get_bool_value(raw_data_map, f"{operation_prefix}setFlags._present"): set_flags = _get_int_value(raw_data_map, f"{operation_prefix}setFlags") if _get_bool_value(raw_data_map, f"{operation_prefix}masterWeight._present"): master_weight = _get_int_value(raw_data_map, f"{operation_prefix}masterWeight") if _get_bool_value(raw_data_map, f"{operation_prefix}lowThreshold._present"): low_threshold = _get_int_value(raw_data_map, f"{operation_prefix}lowThreshold") if _get_bool_value(raw_data_map, f"{operation_prefix}medThreshold._present"): med_threshold = _get_int_value(raw_data_map, 
f"{operation_prefix}medThreshold") if _get_bool_value(raw_data_map, f"{operation_prefix}highThreshold._present"): high_threshold = _get_int_value( raw_data_map, f"{operation_prefix}highThreshold" ) if _get_bool_value(raw_data_map, f"{operation_prefix}signer._present"): weight = _get_int_value(raw_data_map, f"{operation_prefix}signer.weight") key = _get_value(raw_data_map, f"{operation_prefix}signer.key") if key.startswith("G"): signer = Signer.ed25519_public_key(key, weight) elif key.startswith("X"): sha256_hash = StrKey.decode_sha256_hash(key) signer = Signer.sha256_hash(sha256_hash, weight) elif key.startswith("T"): pre_auth_tx_hash = StrKey.decode_pre_auth_tx(key) signer = Signer.pre_auth_tx(pre_auth_tx_hash, weight) else: raise SdkValueError("Signer key should start with `G`, `X` or `T`.") if _get_bool_value(raw_data_map, f"{operation_prefix}homeDomain._present"): home_domain = _get_string_value(raw_data_map, f"{operation_prefix}homeDomain") return SetOptions( inflation_dest=inflation_dest, clear_flags=clear_flags, set_flags=set_flags, master_weight=master_weight, low_threshold=low_threshold, med_threshold=med_threshold, high_threshold=high_threshold, signer=signer, home_domain=home_domain, source=source, ) def _get_path_payment_strict_receive_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> PathPaymentStrictReceive: send_asset = _get_asset(raw_data_map, f"{operation_prefix}sendAsset") send_max = _get_amount_value(raw_data_map, f"{operation_prefix}sendMax") destination = _get_value(raw_data_map, f"{operation_prefix}destination") dest_asset = _get_asset(raw_data_map, f"{operation_prefix}destAsset") dest_amount = _get_amount_value(raw_data_map, f"{operation_prefix}destAmount") path_length = _get_int_value(raw_data_map, f"{operation_prefix}path.len") path = [] for i in range(path_length): asset = _get_asset(raw_data_map, f"{operation_prefix}path[{i}]") path.append(asset) return PathPaymentStrictReceive( destination=destination, 
send_asset=send_asset, send_max=send_max, dest_asset=dest_asset, dest_amount=dest_amount, path=path, source=source, ) def _get_path_payment_strict_send_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> PathPaymentStrictSend: send_asset = _get_asset(raw_data_map, f"{operation_prefix}sendAsset") send_amount = _get_amount_value(raw_data_map, f"{operation_prefix}sendAmount") destination = _get_value(raw_data_map, f"{operation_prefix}destination") dest_asset = _get_asset(raw_data_map, f"{operation_prefix}destAsset") dest_min = _get_amount_value(raw_data_map, f"{operation_prefix}destMin") path_length = _get_int_value(raw_data_map, f"{operation_prefix}path.len") path = [] for i in range(path_length): asset = _get_asset(raw_data_map, f"{operation_prefix}path[{i}]") path.append(asset) return PathPaymentStrictSend( destination=destination, send_asset=send_asset, send_amount=send_amount, dest_asset=dest_asset, dest_min=dest_min, path=path, source=source, ) def _get_create_passive_sell_offer_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> CreatePassiveSellOffer: selling = _get_asset(raw_data_map, f"{operation_prefix}selling") buying = _get_asset(raw_data_map, f"{operation_prefix}buying") amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount") price_n = _get_int_value(raw_data_map, f"{operation_prefix}price.n") price_d = _get_int_value(raw_data_map, f"{operation_prefix}price.d") price = Price(n=price_n, d=price_d) return CreatePassiveSellOffer( selling=selling, buying=buying, amount=amount, price=price, source=source ) def _get_manage_buy_offer_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> ManageBuyOffer: selling = _get_asset(raw_data_map, f"{operation_prefix}selling") buying = _get_asset(raw_data_map, f"{operation_prefix}buying") amount = _get_amount_value(raw_data_map, f"{operation_prefix}buyAmount") offer_id = _get_int_value(raw_data_map, f"{operation_prefix}offerID") price_n = 
_get_int_value(raw_data_map, f"{operation_prefix}price.n") price_d = _get_int_value(raw_data_map, f"{operation_prefix}price.d") price = Price(n=price_n, d=price_d) return ManageBuyOffer( selling=selling, buying=buying, amount=amount, price=price, offer_id=offer_id, source=source, ) def _get_manage_sell_offer_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> ManageSellOffer: selling = _get_asset(raw_data_map, f"{operation_prefix}selling") buying = _get_asset(raw_data_map, f"{operation_prefix}buying") amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount") offer_id = _get_int_value(raw_data_map, f"{operation_prefix}offerID") price_n = _get_int_value(raw_data_map, f"{operation_prefix}price.n") price_d = _get_int_value(raw_data_map, f"{operation_prefix}price.d") price = Price(n=price_n, d=price_d) return ManageSellOffer( selling=selling, buying=buying, amount=amount, price=price, offer_id=offer_id, source=source, ) def _get_payment_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> Payment: destination = _get_value(raw_data_map, f"{operation_prefix}destination") asset = _get_asset(raw_data_map, f"{operation_prefix}asset") amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount") return Payment(destination=destination, asset=asset, amount=amount, source=source) def _get_manage_data_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> ManageData: data_name = _get_string_value(raw_data_map, f"{operation_prefix}dataName") data_value = None if _get_bool_value(raw_data_map, f"{operation_prefix}dataValue._present"): data_value = _get_bytes_value(raw_data_map, f"{operation_prefix}dataValue") return ManageData(data_name=data_name, data_value=data_value, source=source) def _get_bump_sequence_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> BumpSequence: bump_to = _get_int_value(raw_data_map, f"{operation_prefix}bumpTo") return BumpSequence(bump_to=bump_to, 
source=source) def _get_allow_trust_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> AllowTrust: trustor = _get_value(raw_data_map, f"{operation_prefix}trustor") asset_code = _get_value(raw_data_map, f"{operation_prefix}asset") authorize = _get_bool_value(raw_data_map, f"{operation_prefix}authorize") return AllowTrust( trustor=trustor, asset_code=asset_code, authorize=authorize, source=source ) def _get_change_trust_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> ChangeTrust: line = _get_asset(raw_data_map, f"{operation_prefix}line") limit = _get_amount_value(raw_data_map, f"{operation_prefix}limit") return ChangeTrust(asset=line, limit=limit, source=source) def _get_inflation_op(source: str) -> Inflation: return Inflation(source=source) def _get_account_merge_op( source: str, transaction_prefix: str, raw_data_map: Dict[str, str], index: int ) -> AccountMerge: destination = _get_value( raw_data_map, f"{transaction_prefix}operations[{index}].body.destination" ) return AccountMerge(destination=destination, source=source) def _get_create_account_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> CreateAccount: destination = _get_value(raw_data_map, f"{operation_prefix}destination") starting_balance = _get_amount_value( raw_data_map, f"{operation_prefix}startingBalance" ) return CreateAccount( destination=destination, starting_balance=starting_balance, source=source ) def _get_begin_sponsoring_future_reserves_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> BeginSponsoringFutureReserves: sponsored_id = _get_value(raw_data_map, f"{operation_prefix}sponsoredID") return BeginSponsoringFutureReserves(sponsored_id=sponsored_id, source=source) def _get_claim_claimable_balance_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> ClaimClaimableBalance: balance_id = _get_value(raw_data_map, f"{operation_prefix}balanceID") return 
ClaimClaimableBalance(balance_id=balance_id, source=source) def _get_clawback_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> Clawback: asset = _get_asset(raw_data_map, f"{operation_prefix}asset") from_ = _get_value(raw_data_map, f"{operation_prefix}from") amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount") return Clawback(asset=asset, from_=from_, amount=amount, source=source) def _get_clawback_claimable_balance_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> ClawbackClaimableBalance: balance_id = _get_value(raw_data_map, f"{operation_prefix}balanceID") return ClawbackClaimableBalance(balance_id=balance_id, source=source) def _get_create_claimable_balance_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ): def parse_claimant_predicate( prefix: str, raw_data_map: Dict[str, str] ) -> ClaimPredicate: claimant_predicate_type = _get_value(raw_data_map, f"{prefix}.type") if claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_AND.name: left = parse_claimant_predicate(f"{prefix}.andPredicates[0]", raw_data_map) right = parse_claimant_predicate(f"{prefix}.andPredicates[1]", raw_data_map) return ClaimPredicate.predicate_and(left, right) elif claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_OR.name: left = parse_claimant_predicate(f"{prefix}.orPredicates[0]", raw_data_map) right = parse_claimant_predicate(f"{prefix}.orPredicates[1]", raw_data_map) return ClaimPredicate.predicate_or(left, right) elif claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_NOT.name: predicate = parse_claimant_predicate(f"{prefix}.notPredicate", raw_data_map) return ClaimPredicate.predicate_not(predicate) elif ( claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_ABSOLUTE_TIME.name ): before_absolute_time = _get_int_value(raw_data_map, f"{prefix}.absBefore") return ClaimPredicate.predicate_before_absolute_time(before_absolute_time) elif ( 
claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_RELATIVE_TIME.name ): before_relative_time = _get_int_value(raw_data_map, f"{prefix}.relBefore") return ClaimPredicate.predicate_before_relative_time(before_relative_time) else: raise SdkValueError( f"This claim predicate type has not been implemented yet, " f"claim predicate type: {claimant_predicate_type}." ) asset = _get_asset(raw_data_map, f"{operation_prefix}asset") amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount") claimants: List[Claimant] = [] claimants_len = _get_int_value(raw_data_map, f"{operation_prefix}claimants.len") for index in range(claimants_len): destination = _get_value( raw_data_map, f"{operation_prefix}claimants[{index}].v0.destination" ) claimant_predicate = parse_claimant_predicate( f"{operation_prefix}claimants[{index}].v0.predicate", raw_data_map ) claimant = Claimant(destination=destination, predicate=claimant_predicate) claimants.append(claimant) return CreateClaimableBalance( asset=asset, amount=amount, claimants=claimants, source=source ) def _get_end_sponsoring_future_reserves_op(source: str) -> EndSponsoringFutureReserves: return EndSponsoringFutureReserves(source=source) def _get_revoke_sponsorship_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> RevokeSponsorship: revoke_sponsorship_type = _get_value(raw_data_map, f"{operation_prefix}type") if ( revoke_sponsorship_type == stellar_xdr.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name ): ledger_entry_type = _get_value( raw_data_map, f"{operation_prefix}ledgerKey.type" ) if ledger_entry_type == stellar_xdr.LedgerEntryType.ACCOUNT.name: account = _get_value(raw_data_map, f"{operation_prefix}ledgerKey.account") return RevokeSponsorship.revoke_account_sponsorship( account_id=account, source=source ) elif ledger_entry_type == stellar_xdr.LedgerEntryType.TRUSTLINE.name: account = _get_value( raw_data_map, f"{operation_prefix}ledgerKey.trustLine.accountID" ) asset = 
_get_asset( raw_data_map, f"{operation_prefix}ledgerKey.trustLine.asset" ) return RevokeSponsorship.revoke_trustline_sponsorship( account_id=account, asset=asset, source=source ) elif ledger_entry_type == stellar_xdr.LedgerEntryType.OFFER.name: sell_id = _get_value( raw_data_map, f"{operation_prefix}ledgerKey.offer.sellerID" ) offer_id = _get_int_value( raw_data_map, f"{operation_prefix}ledgerKey.offer.offerID" ) return RevokeSponsorship.revoke_offer_sponsorship( seller_id=sell_id, offer_id=offer_id, source=source ) elif ledger_entry_type == stellar_xdr.LedgerEntryType.DATA.name: account_id = _get_value( raw_data_map, f"{operation_prefix}ledgerKey.data.accountID" ) data_name = _get_value( raw_data_map, f"{operation_prefix}ledgerKey.data.dataName" ) return RevokeSponsorship.revoke_data_sponsorship( account_id=account_id, data_name=data_name, source=source ) elif ledger_entry_type == stellar_xdr.LedgerEntryType.CLAIMABLE_BALANCE.name: claimable_balance_id = _get_value( raw_data_map, f"{operation_prefix}ledgerKey.claimableBalance.balanceID" ) return RevokeSponsorship.revoke_claimable_balance_sponsorship( claimable_balance_id=claimable_balance_id, source=source ) else: raise SdkValueError( f"This ledger entry type has not been implemented yet, " f"ledger entry type: {ledger_entry_type}." 
) elif ( revoke_sponsorship_type == stellar_xdr.RevokeSponsorshipType.REVOKE_SPONSORSHIP_SIGNER.name ): signer_key_type = _get_value( raw_data_map, f"{operation_prefix}signer.signerKey.type" ) account_id = _get_value(raw_data_map, f"{operation_prefix}signer.accountID") if ( signer_key_type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX.name ): key = _get_value( raw_data_map, f"{operation_prefix}signer.signerKey.preAuthTx" ) signer_key = SignerKey.pre_auth_tx(StrKey.decode_pre_auth_tx(key)) elif signer_key_type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_ED25519.name: key = _get_value( raw_data_map, f"{operation_prefix}signer.signerKey.ed25519" ) signer_key = SignerKey.ed25519_public_key(key) elif signer_key_type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_HASH_X.name: key = _get_value(raw_data_map, f"{operation_prefix}signer.signerKey.hashX") signer_key = SignerKey.sha256_hash(StrKey.decode_sha256_hash(key)) else: raise SdkValueError( f"This signer key type has not been implemented yet, " f"signer key type: {signer_key_type}." ) return RevokeSponsorship.revoke_signer_sponsorship( account_id=account_id, signer_key=signer_key, source=source ) else: raise SdkValueError( f"This revoke sponsorship type has not been implemented yet, " f"revoke sponsorship type: {revoke_sponsorship_type}." 
) def _get_set_trust_line_flags_op( source: str, operation_prefix: str, raw_data_map: Dict[str, str] ) -> SetTrustLineFlags: trustor = _get_value(raw_data_map, f"{operation_prefix}trustor") asset = _get_asset(raw_data_map, f"{operation_prefix}asset") clear_flags_raw = _get_int_value(raw_data_map, f"{operation_prefix}clearFlags") set_flags_raw = _get_int_value(raw_data_map, f"{operation_prefix}setFlags") if clear_flags_raw == 0: clear_flags = None else: clear_flags = TrustLineFlags(clear_flags_raw) if set_flags_raw == 0: set_flags = None else: set_flags = TrustLineFlags(set_flags_raw) return SetTrustLineFlags( trustor=trustor, asset=asset, clear_flags=clear_flags, set_flags=set_flags, source=source, ) def _get_asset(raw_data_map: Dict[str, str], key: str) -> Asset: return _decode_asset(_get_value(raw_data_map, key)) def _get_amount_value(raw_data_map: Dict[str, str], key: str) -> str: value = _get_int_value(raw_data_map, key) return Operation.from_xdr_amount(value) def _get_int_value(raw_data_map: Dict[str, str], key: str) -> int: value = _get_value(raw_data_map, key) try: return int(value) except ValueError as e: raise SdkValueError(f"Failed to convert `{value}` to int type.") from e def _get_bool_value(raw_data_map: Dict[str, str], key: str) -> bool: value = _get_value(raw_data_map, key) if value == _true: return True elif value == _false: return False else: raise SdkValueError(f"Failed to convert `{value}` to bool type.") def _get_bytes_value(raw_data_map: Dict[str, str], key: str) -> bytes: value = _get_value(raw_data_map, key) if value[0] == '"': # for text memo. 
return _get_string_value(raw_data_map, key).encode() try: return bytes.fromhex(value) except ValueError as e: raise SdkValueError(f"Failed to convert `{value}` to bytes type.") from e def _get_string_value(raw_data_map: Dict[str, str], key: str) -> str: value = _get_value(raw_data_map, key) if len(value) == 0: return value return value[1:-1] def _get_value(raw_data_map: Dict[str, str], key: str) -> str: try: return raw_data_map[key] except KeyError as e: raise SdkValueError(f"`{key}` is missing from txrep.") from e def _decode_asset(asset: str) -> Asset: # native (or any string up to 12 characters not containing an unescaped colon) for the native asset if ":" not in asset and len(asset) <= 12: return Asset.native() parts = asset.split(":") if len(parts) != 2: raise SdkValueError("Failed to decode asset string.") return Asset(parts[0], parts[1]) def _add_line( key: str, value: Union[str, int], lines: List[str], comment: Union[str, int, Decimal] = None, ) -> None: lines.append(f"{key}: {value}{' (' + str(comment) + ')' if comment else ''}") def _add_time_bounds( time_bounds: Optional[TimeBounds], prefix: str, lines: List[str] ) -> None: if time_bounds is None: _add_line(f"{prefix}timeBounds._present", _false, lines) else: _add_line(f"{prefix}timeBounds._present", _true, lines) _add_line( f"{prefix}timeBounds.minTime", time_bounds.min_time, lines, _to_readable_utc_time_comment(time_bounds.min_time), ) _add_line( f"{prefix}timeBounds.maxTime", time_bounds.max_time, lines, _to_readable_utc_time_comment(time_bounds.max_time), ) def _add_memo(memo: Memo, prefix: str, lines: List[str]) -> None: if isinstance(memo, NoneMemo): _add_line(f"{prefix}memo.type", "MEMO_NONE", lines) if isinstance(memo, TextMemo): _add_line(f"{prefix}memo.type", "MEMO_TEXT", lines) # I don't think we should decode it. 
_add_line(f"{prefix}memo.text", _to_string(memo.memo_text), lines) if isinstance(memo, IdMemo): _add_line(f"{prefix}memo.type", "MEMO_ID", lines) _add_line(f"{prefix}memo.id", memo.memo_id, lines) if isinstance(memo, HashMemo): _add_line(f"{prefix}memo.type", "MEMO_HASH", lines) _add_line(f"{prefix}memo.hash", _to_opaque(memo.memo_hash), lines) if isinstance(memo, ReturnHashMemo): _add_line(f"{prefix}memo.type", "MEMO_RETURN", lines) _add_line(f"{prefix}memo.retHash", _to_opaque(memo.memo_return), lines) def _add_operations(operations: List[Operation], prefix: str, lines: List[str]) -> None: _add_line(f"{prefix}operations.len", len(operations), lines) for index, operation in enumerate(operations): _add_operation(index, operation, prefix, lines) def _add_operation( index: int, operation: Operation, prefix: str, lines: List[str] ) -> None: prefix = f"{prefix}operations[{index}]." operation_type = operation.__class__.__name__ def add_operation_line( key: str, value: Union[str, int], comment: Union[str, int, Decimal] = None ) -> None: _add_line(f"{prefix}{key}", value, lines, comment) if operation.source is not None: add_operation_line("sourceAccount._present", _true) add_operation_line( "sourceAccount", _to_muxed_account(operation.source), comment=_to_muxed_account_comment(operation.source), ) else: add_operation_line("sourceAccount._present", _false) add_operation_line("body.type", _to_caps_with_under(operation_type)) def add_body_line( key: str, value: Union[str, int, None], optional: bool = False, comment: Union[str, int, Decimal] = None, ) -> None: operation_type = operation.__class__.__name__ key = f"body.{_to_camel_case(operation_type)}Op.{key}" if optional: present = True if value is not None else False add_operation_line(f"{key}._present", _true if present else _false) if present: assert value is not None add_operation_line(key, value, comment=comment) else: assert value is not None add_operation_line(key, value, comment=comment) def add_signer(signer: 
Optional[Signer]) -> None: add_body_line("signer._present", _false if signer is None else _true) if signer is None: return if ( signer.signer_key.signer_key.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_ED25519 ): assert signer.signer_key.signer_key.ed25519 is not None add_body_line( "signer.key", StrKey.encode_ed25519_public_key( signer.signer_key.signer_key.ed25519.uint256 ), ) if ( signer.signer_key.signer_key.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX ): assert signer.signer_key.signer_key.pre_auth_tx is not None add_body_line( "signer.key", StrKey.encode_pre_auth_tx( signer.signer_key.signer_key.pre_auth_tx.uint256 ), ) if ( signer.signer_key.signer_key.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_HASH_X ): assert signer.signer_key.signer_key.hash_x is not None add_body_line( "signer.key", StrKey.encode_sha256_hash(signer.signer_key.signer_key.hash_x.uint256), ) add_body_line("signer.weight", signer.weight) def add_price(price: Union[Price, str, Decimal]) -> None: price = _to_price(price) add_body_line("price.n", price.n) add_body_line("price.d", price.d) def add_home_domain(home_domain: Optional[str]) -> None: if home_domain is None: add_body_line("homeDomain", None, True) else: add_body_line("homeDomain", _to_string(home_domain), True) def add_claim_predicate(prefix: str, claimant_predicate: ClaimPredicate): add_body_line(f"{prefix}.type", claimant_predicate.claim_predicate_type.name) if ( claimant_predicate.claim_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_UNCONDITIONAL ): pass elif ( claimant_predicate.claim_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_NOT ): assert claimant_predicate.not_predicate is not None add_claim_predicate( f"{prefix}.notPredicate", claimant_predicate.not_predicate ) elif ( claimant_predicate.claim_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_AND ): add_body_line(f"{prefix}.andPredicates.len", 2) assert claimant_predicate.and_predicates is not None add_claim_predicate( 
f"{prefix}.andPredicates[0]", claimant_predicate.and_predicates.left ) add_claim_predicate( f"{prefix}.andPredicates[1]", claimant_predicate.and_predicates.right ) elif ( claimant_predicate.claim_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_OR ): add_body_line(f"{prefix}.orPredicates.len", 2) assert claimant_predicate.or_predicates is not None add_claim_predicate( f"{prefix}.orPredicates[0]", claimant_predicate.or_predicates.left ) add_claim_predicate( f"{prefix}.orPredicates[1]", claimant_predicate.or_predicates.right ) elif ( claimant_predicate.claim_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_ABSOLUTE_TIME ): assert claimant_predicate.abs_before is not None add_body_line( f"{prefix}.absBefore", claimant_predicate.abs_before, comment=_to_readable_utc_time_comment(claimant_predicate.abs_before), ) elif ( claimant_predicate.claim_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_RELATIVE_TIME ): assert claimant_predicate.rel_before is not None add_body_line( f"{prefix}.relBefore", claimant_predicate.rel_before, comment=_to_readable_utc_time_comment(claimant_predicate.rel_before), ) else: raise SdkValueError( f"This claim predicate type has not been implemented yet, " f"claim predicate type: {claimant_predicate.claim_predicate_type}." 
) if isinstance(operation, CreateAccount): add_body_line("destination", operation.destination) add_body_line( "startingBalance", _to_amount(operation.starting_balance), comment=operation.starting_balance, ) elif isinstance(operation, Payment): add_body_line( "destination", _to_muxed_account(operation.destination), comment=_to_muxed_account_comment(operation.destination), ) add_body_line("asset", _to_asset(operation.asset)) add_body_line("amount", _to_amount(operation.amount), comment=operation.amount) elif isinstance(operation, PathPaymentStrictReceive): add_body_line("sendAsset", _to_asset(operation.send_asset)) add_body_line( "sendMax", _to_amount(operation.send_max), comment=operation.send_max ) add_body_line( "destination", _to_muxed_account(operation.destination), comment=_to_muxed_account_comment(operation.destination), ) add_body_line("destAsset", _to_asset(operation.dest_asset)) add_body_line( "destAmount", _to_amount(operation.dest_amount), comment=operation.dest_amount, ) add_body_line("path.len", len(operation.path)) for index, asset in enumerate(operation.path): add_body_line(f"path[{index}]", _to_asset(asset)) elif isinstance(operation, ManageSellOffer): add_body_line("selling", _to_asset(operation.selling)) add_body_line("buying", _to_asset(operation.buying)) add_body_line("amount", _to_amount(operation.amount), comment=operation.amount) add_price(operation.price) add_body_line("offerID", operation.offer_id) elif isinstance(operation, CreatePassiveSellOffer): add_body_line("selling", _to_asset(operation.selling)) add_body_line("buying", _to_asset(operation.buying)) add_body_line("amount", _to_amount(operation.amount), comment=operation.amount) add_price(operation.price) elif isinstance(operation, SetOptions): add_body_line("inflationDest", operation.inflation_dest, True) add_body_line("clearFlags", operation.clear_flags, True) add_body_line("setFlags", operation.set_flags, True) add_body_line("masterWeight", operation.master_weight, True) 
add_body_line("lowThreshold", operation.low_threshold, True) add_body_line("medThreshold", operation.med_threshold, True) add_body_line("highThreshold", operation.high_threshold, True) add_home_domain(operation.home_domain) add_signer(operation.signer) elif isinstance(operation, ChangeTrust): add_body_line("line", _to_asset(operation.asset)) add_body_line("limit", _to_amount(operation.limit), comment=operation.limit) elif isinstance(operation, AllowTrust): add_body_line("trustor", operation.trustor) add_body_line("asset", operation.asset_code) add_body_line("authorize", _true if operation.authorize else _false) elif isinstance(operation, AccountMerge): # AccountMerge does not include 'accountMergeOp' prefix # see https://github.com/StellarCN/py-stellar-base/blob/master/.xdr/Stellar-transaction.x#L282 add_operation_line( "body.destination", _to_muxed_account(operation.destination), comment=_to_muxed_account_comment(operation.destination), ) elif isinstance(operation, ManageData): add_body_line("dataName", _to_string(operation.data_name)) if operation.data_value is None: add_body_line("dataValue._present", _false) else: add_body_line("dataValue._present", _true) add_body_line("dataValue", _to_opaque(operation.data_value)) elif isinstance(operation, BumpSequence): add_body_line("bumpTo", operation.bump_to) elif isinstance(operation, ManageBuyOffer): add_body_line("selling", _to_asset(operation.selling)) add_body_line("buying", _to_asset(operation.buying)) add_body_line( "buyAmount", _to_amount(operation.amount), comment=operation.amount ) add_price(operation.price) add_body_line("offerID", operation.offer_id) elif isinstance(operation, PathPaymentStrictSend): add_body_line("sendAsset", _to_asset(operation.send_asset)) add_body_line( "sendAmount", _to_amount(operation.send_amount), comment=operation.send_amount, ) add_body_line( "destination", _to_muxed_account(operation.destination), comment=_to_muxed_account_comment(operation.destination), ) 
add_body_line("destAsset", _to_asset(operation.dest_asset)) add_body_line( "destMin", _to_amount(operation.dest_min), comment=operation.dest_min ) add_body_line("path.len", len(operation.path)) for index, asset in enumerate(operation.path): add_body_line(f"path[{index}]", _to_asset(asset)) elif isinstance(operation, Inflation): # no body pass elif isinstance(operation, CreateClaimableBalance): add_body_line("asset", _to_asset(operation.asset)) add_body_line("amount", _to_amount(operation.amount), comment=operation.amount) add_body_line("claimants.len", len(operation.claimants)) for index, claimant in enumerate(operation.claimants): # current CLAIMANT_TYPE is CLAIMANT_TYPE_V0 add_body_line( f"claimants[{index}].type", stellar_xdr.ClaimantType.CLAIMANT_TYPE_V0.name, ) add_body_line(f"claimants[{index}].v0.destination", claimant.destination) add_claim_predicate(f"claimants[{index}].v0.predicate", claimant.predicate) elif isinstance(operation, ClaimClaimableBalance): add_body_line("balanceID", operation.balance_id) elif isinstance(operation, BeginSponsoringFutureReserves): add_body_line("sponsoredID", operation.sponsored_id) elif isinstance(operation, EndSponsoringFutureReserves): # no body pass elif isinstance(operation, RevokeSponsorship): if operation.revoke_sponsorship_type == RevokeSponsorshipType.ACCOUNT: add_body_line( "type", stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name, ) add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.ACCOUNT.name) add_body_line("ledgerKey.account", operation.account_id) elif operation.revoke_sponsorship_type == RevokeSponsorshipType.TRUSTLINE: add_body_line( "type", stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name, ) add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.TRUSTLINE.name) assert operation.trustline is not None add_body_line( "ledgerKey.trustLine.accountID", operation.trustline.account_id ) add_body_line( 
"ledgerKey.trustLine.asset", _to_asset(operation.trustline.asset) ) elif operation.revoke_sponsorship_type == RevokeSponsorshipType.OFFER: add_body_line( "type", stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name, ) add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.OFFER.name) assert operation.offer is not None add_body_line("ledgerKey.offer.sellerID", operation.offer.seller_id) add_body_line("ledgerKey.offer.offerID", operation.offer.offer_id) elif operation.revoke_sponsorship_type == RevokeSponsorshipType.DATA: add_body_line( "type", stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name, ) add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.DATA.name) assert operation.data is not None add_body_line("ledgerKey.data.accountID", operation.data.account_id) add_body_line("ledgerKey.data.dataName", operation.data.data_name) elif ( operation.revoke_sponsorship_type == RevokeSponsorshipType.CLAIMABLE_BALANCE ): add_body_line( "type", stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name, ) add_body_line( "ledgerKey.type", stellar_xdr.LedgerEntryType.CLAIMABLE_BALANCE.name ) add_body_line( "ledgerKey.claimableBalance.balanceID", operation.claimable_balance_id ) elif operation.revoke_sponsorship_type == RevokeSponsorshipType.SIGNER: assert operation.signer is not None add_body_line( "type", stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_SIGNER.name, ) add_body_line("signer.accountID", operation.signer.account_id) add_body_line( "signer.signerKey.type", operation.signer.signer_key.signer_key.type.name, ) if ( operation.signer.signer_key.signer_key.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_HASH_X ): assert operation.signer.signer_key.signer_key.hash_x is not None key = StrKey.encode_sha256_hash( operation.signer.signer_key.signer_key.hash_x.uint256 ) add_body_line("signer.signerKey.hashX", key) 
elif ( operation.signer.signer_key.signer_key.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_ED25519 ): assert operation.signer.signer_key.signer_key.ed25519 is not None key = StrKey.encode_ed25519_public_key( operation.signer.signer_key.signer_key.ed25519.uint256 ) add_body_line("signer.signerKey.ed25519", key) elif ( operation.signer.signer_key.signer_key.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX ): assert operation.signer.signer_key.signer_key.pre_auth_tx is not None key = StrKey.encode_pre_auth_tx( operation.signer.signer_key.signer_key.pre_auth_tx.uint256 ) add_body_line("signer.signerKey.preAuthTx", key) else: raise SdkValueError( f"This signer key type has not been implemented yet, " f"signer key type: {operation.signer.signer_key.signer_key.type}." ) else: raise SdkValueError( f"This revoke sponsorship type has not been implemented yet, " f"revoke sponsorship type: {operation.revoke_sponsorship_type}." ) elif isinstance(operation, Clawback): add_body_line("asset", _to_asset(operation.asset)) add_body_line( "from", _to_muxed_account(operation.from_), comment=_to_muxed_account_comment(operation.from_), ) add_body_line("amount", _to_amount(operation.amount), comment=operation.amount) elif isinstance(operation, ClawbackClaimableBalance): add_body_line("balanceID", operation.balance_id) elif isinstance(operation, SetTrustLineFlags): add_body_line("trustor", operation.trustor) add_body_line("asset", _to_asset(operation.asset)) if operation.clear_flags is None: add_body_line("clearFlags", 0) else: add_body_line("clearFlags", operation.clear_flags.value) if operation.set_flags is None: add_body_line("setFlags", 0) else: add_body_line("setFlags", operation.set_flags.value) else: raise SdkValueError( f"This operation has not been implemented yet, " f"operation type: {operation}." 
) def _add_signatures( signatures: List[stellar_xdr.DecoratedSignature], prefix: str, lines: List[str] ) -> None: _add_line(f"{prefix}signatures.len", len(signatures), lines) for index, signature in enumerate(signatures): _add_signature(index, signature, prefix, lines) def _add_signature( index: int, signature: stellar_xdr.DecoratedSignature, prefix: str, lines: List[str] ) -> None: prefix = f"{prefix}signatures[{index}]." _add_line(f"{prefix}hint", _to_opaque(signature.hint.signature_hint), lines) _add_line(f"{prefix}signature", _to_opaque(signature.signature.signature), lines) def _to_asset(asset: Asset) -> str: if asset.is_native(): return "native" return f"{asset.code}:{asset.issuer}" def _to_readable_utc_time_comment(timestamp: int) -> str: utc_time = datetime.utcfromtimestamp(timestamp) return utc_time.strftime("%Y-%m-%d %H:%M:%S.%f+00:00 (UTC)") def _to_muxed_account_comment(account: MuxedAccount) -> Optional[str]: if account.account_muxed_id is None: return None return ( f"accountID: {account.account_id}, accountMuxedID: {account.account_muxed_id}" ) def _to_amount(amount: Union[Decimal, str]) -> int: return Operation.to_xdr_amount(amount) def _to_price(price: Union[Price, str, Decimal]) -> Price: if isinstance(price, Price): price_fraction = price else: price_fraction = Price.from_raw_price(price) return price_fraction def _to_camel_case(cap_words: str) -> str: return cap_words[0].lower() + cap_words[1:] def _to_caps_with_under(word): return "".join(["_" + c if c.isupper() else c for c in word]).lstrip("_").upper() def _to_string(value: Union[str, bytes]) -> str: if isinstance(value, str): return json.dumps(value) # We are not following the standard here, it needs more discussion. try: value = value.decode("utf-8") except UnicodeDecodeError: assert isinstance(value, bytes) return _to_opaque(value) return json.dumps(value) def _to_opaque(value: bytes) -> str: return value.hex()