Source code for stellar_sdk.xdr.time_sliced_node_data

# This is an automatically generated file.
# DO NOT EDIT or your changes may be overwritten
from __future__ import annotations

import base64

from xdrlib3 import Packer, Unpacker

from .base import Boolean
from .uint32 import Uint32

__all__ = ["TimeSlicedNodeData"]


class TimeSlicedNodeData:
    """
    Python representation of the XDR struct ``TimeSlicedNodeData``.

    XDR Source Code::

        struct TimeSlicedNodeData
        {
            uint32 addedAuthenticatedPeers;
            uint32 droppedAuthenticatedPeers;
            uint32 totalInboundPeerCount;
            uint32 totalOutboundPeerCount;

            // SCP stats
            uint32 p75SCPFirstToSelfLatencyMs;
            uint32 p75SCPSelfToOtherLatencyMs;

            // How many times the node lost sync in the time slice
            uint32 lostSyncCount;

            // Config data
            bool isValidator;
            uint32 maxInboundPeerCount;
            uint32 maxOutboundPeerCount;
        };
    """

    def __init__(
        self,
        added_authenticated_peers: Uint32,
        dropped_authenticated_peers: Uint32,
        total_inbound_peer_count: Uint32,
        total_outbound_peer_count: Uint32,
        p75_scp_first_to_self_latency_ms: Uint32,
        p75_scp_self_to_other_latency_ms: Uint32,
        lost_sync_count: Uint32,
        is_validator: bool,
        max_inbound_peer_count: Uint32,
        max_outbound_peer_count: Uint32,
    ) -> None:
        """Store each struct field on the instance without validation."""
        self.added_authenticated_peers = added_authenticated_peers
        self.dropped_authenticated_peers = dropped_authenticated_peers
        self.total_inbound_peer_count = total_inbound_peer_count
        self.total_outbound_peer_count = total_outbound_peer_count
        self.p75_scp_first_to_self_latency_ms = p75_scp_first_to_self_latency_ms
        self.p75_scp_self_to_other_latency_ms = p75_scp_self_to_other_latency_ms
        self.lost_sync_count = lost_sync_count
        self.is_validator = is_validator
        self.max_inbound_peer_count = max_inbound_peer_count
        self.max_outbound_peer_count = max_outbound_peer_count

    def pack(self, packer: Packer) -> None:
        """Write every field to ``packer`` in struct declaration order.

        ``is_validator`` is held as a plain ``bool``, so it is wrapped in
        ``Boolean`` to pack it; all other fields pack themselves.
        """
        self.added_authenticated_peers.pack(packer)
        self.dropped_authenticated_peers.pack(packer)
        self.total_inbound_peer_count.pack(packer)
        self.total_outbound_peer_count.pack(packer)
        self.p75_scp_first_to_self_latency_ms.pack(packer)
        self.p75_scp_self_to_other_latency_ms.pack(packer)
        self.lost_sync_count.pack(packer)
        Boolean(self.is_validator).pack(packer)
        self.max_inbound_peer_count.pack(packer)
        self.max_outbound_peer_count.pack(packer)

    @classmethod
    def unpack(cls, unpacker: Unpacker) -> TimeSlicedNodeData:
        """Read the fields from ``unpacker`` in the order ``pack`` wrote them
        and return a new instance built from them.
        """
        added_authenticated_peers = Uint32.unpack(unpacker)
        dropped_authenticated_peers = Uint32.unpack(unpacker)
        total_inbound_peer_count = Uint32.unpack(unpacker)
        total_outbound_peer_count = Uint32.unpack(unpacker)
        p75_scp_first_to_self_latency_ms = Uint32.unpack(unpacker)
        p75_scp_self_to_other_latency_ms = Uint32.unpack(unpacker)
        lost_sync_count = Uint32.unpack(unpacker)
        is_validator = Boolean.unpack(unpacker)
        max_inbound_peer_count = Uint32.unpack(unpacker)
        max_outbound_peer_count = Uint32.unpack(unpacker)
        return cls(
            added_authenticated_peers=added_authenticated_peers,
            dropped_authenticated_peers=dropped_authenticated_peers,
            total_inbound_peer_count=total_inbound_peer_count,
            total_outbound_peer_count=total_outbound_peer_count,
            p75_scp_first_to_self_latency_ms=p75_scp_first_to_self_latency_ms,
            p75_scp_self_to_other_latency_ms=p75_scp_self_to_other_latency_ms,
            lost_sync_count=lost_sync_count,
            is_validator=is_validator,
            max_inbound_peer_count=max_inbound_peer_count,
            max_outbound_peer_count=max_outbound_peer_count,
        )

    def to_xdr_bytes(self) -> bytes:
        """Pack this object into a fresh ``Packer`` and return its buffer."""
        packer = Packer()
        self.pack(packer)
        return packer.get_buffer()

    @classmethod
    def from_xdr_bytes(cls, xdr: bytes) -> TimeSlicedNodeData:
        """Build an instance by unpacking ``xdr``.

        No check is made here that the whole input was consumed.
        """
        unpacker = Unpacker(xdr)
        return cls.unpack(unpacker)

    def to_xdr(self) -> str:
        """Return the packed bytes encoded as a base64 string."""
        xdr_bytes = self.to_xdr_bytes()
        return base64.b64encode(xdr_bytes).decode()

    @classmethod
    def from_xdr(cls, xdr: str) -> TimeSlicedNodeData:
        """Build an instance from a base64 string as produced by ``to_xdr``."""
        xdr_bytes = base64.b64decode(xdr.encode())
        return cls.from_xdr_bytes(xdr_bytes)

    def __hash__(self):
        """Hash the tuple of all fields, in declaration order."""
        return hash(
            (
                self.added_authenticated_peers,
                self.dropped_authenticated_peers,
                self.total_inbound_peer_count,
                self.total_outbound_peer_count,
                self.p75_scp_first_to_self_latency_ms,
                self.p75_scp_self_to_other_latency_ms,
                self.lost_sync_count,
                self.is_validator,
                self.max_inbound_peer_count,
                self.max_outbound_peer_count,
            )
        )

    def __eq__(self, other: object):
        """Compare field by field; defer to ``other`` if it is not an
        instance of this object's class.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return (
            self.added_authenticated_peers == other.added_authenticated_peers
            and self.dropped_authenticated_peers == other.dropped_authenticated_peers
            and self.total_inbound_peer_count == other.total_inbound_peer_count
            and self.total_outbound_peer_count == other.total_outbound_peer_count
            and self.p75_scp_first_to_self_latency_ms
            == other.p75_scp_first_to_self_latency_ms
            and self.p75_scp_self_to_other_latency_ms
            == other.p75_scp_self_to_other_latency_ms
            and self.lost_sync_count == other.lost_sync_count
            and self.is_validator == other.is_validator
            and self.max_inbound_peer_count == other.max_inbound_peer_count
            and self.max_outbound_peer_count == other.max_outbound_peer_count
        )

    def __repr__(self):
        """Return ``<TimeSlicedNodeData [name=value, ...]>`` listing all fields."""
        out = [
            f"added_authenticated_peers={self.added_authenticated_peers}",
            f"dropped_authenticated_peers={self.dropped_authenticated_peers}",
            f"total_inbound_peer_count={self.total_inbound_peer_count}",
            f"total_outbound_peer_count={self.total_outbound_peer_count}",
            f"p75_scp_first_to_self_latency_ms={self.p75_scp_first_to_self_latency_ms}",
            f"p75_scp_self_to_other_latency_ms={self.p75_scp_self_to_other_latency_ms}",
            f"lost_sync_count={self.lost_sync_count}",
            f"is_validator={self.is_validator}",
            f"max_inbound_peer_count={self.max_inbound_peer_count}",
            f"max_outbound_peer_count={self.max_outbound_peer_count}",
        ]
        return f"<TimeSlicedNodeData [{', '.join(out)}]>"