Source code for stellar_sdk.xdr.time_sliced_node_data

# This is an automatically generated file.
# DO NOT EDIT or your changes may be overwritten
from __future__ import annotations

import base64
import json

from xdrlib3 import Packer, Unpacker

from .base import DEFAULT_XDR_MAX_DEPTH, Boolean
from .uint32 import Uint32

__all__ = ["TimeSlicedNodeData"]


[docs] class TimeSlicedNodeData: """ XDR Source Code:: struct TimeSlicedNodeData { uint32 addedAuthenticatedPeers; uint32 droppedAuthenticatedPeers; uint32 totalInboundPeerCount; uint32 totalOutboundPeerCount; // SCP stats uint32 p75SCPFirstToSelfLatencyMs; uint32 p75SCPSelfToOtherLatencyMs; // How many times the node lost sync in the time slice uint32 lostSyncCount; // Config data bool isValidator; uint32 maxInboundPeerCount; uint32 maxOutboundPeerCount; }; """ def __init__( self, added_authenticated_peers: Uint32, dropped_authenticated_peers: Uint32, total_inbound_peer_count: Uint32, total_outbound_peer_count: Uint32, p75_scp_first_to_self_latency_ms: Uint32, p75_scp_self_to_other_latency_ms: Uint32, lost_sync_count: Uint32, is_validator: bool, max_inbound_peer_count: Uint32, max_outbound_peer_count: Uint32, ) -> None: self.added_authenticated_peers = added_authenticated_peers self.dropped_authenticated_peers = dropped_authenticated_peers self.total_inbound_peer_count = total_inbound_peer_count self.total_outbound_peer_count = total_outbound_peer_count self.p75_scp_first_to_self_latency_ms = p75_scp_first_to_self_latency_ms self.p75_scp_self_to_other_latency_ms = p75_scp_self_to_other_latency_ms self.lost_sync_count = lost_sync_count self.is_validator = is_validator self.max_inbound_peer_count = max_inbound_peer_count self.max_outbound_peer_count = max_outbound_peer_count def pack(self, packer: Packer) -> None: self.added_authenticated_peers.pack(packer) self.dropped_authenticated_peers.pack(packer) self.total_inbound_peer_count.pack(packer) self.total_outbound_peer_count.pack(packer) self.p75_scp_first_to_self_latency_ms.pack(packer) self.p75_scp_self_to_other_latency_ms.pack(packer) self.lost_sync_count.pack(packer) Boolean(self.is_validator).pack(packer) self.max_inbound_peer_count.pack(packer) self.max_outbound_peer_count.pack(packer) @classmethod def unpack( cls, unpacker: Unpacker, depth_limit: int = DEFAULT_XDR_MAX_DEPTH ) -> TimeSlicedNodeData: if depth_limit <= 0: raise ValueError("Maximum decoding depth reached") added_authenticated_peers = Uint32.unpack(unpacker, depth_limit - 1) dropped_authenticated_peers = Uint32.unpack(unpacker, depth_limit - 1) total_inbound_peer_count = Uint32.unpack(unpacker, depth_limit - 1) total_outbound_peer_count = Uint32.unpack(unpacker, depth_limit - 1) p75_scp_first_to_self_latency_ms = Uint32.unpack(unpacker, depth_limit - 1) p75_scp_self_to_other_latency_ms = Uint32.unpack(unpacker, depth_limit - 1) lost_sync_count = Uint32.unpack(unpacker, depth_limit - 1) is_validator = Boolean.unpack(unpacker) max_inbound_peer_count = Uint32.unpack(unpacker, depth_limit - 1) max_outbound_peer_count = Uint32.unpack(unpacker, depth_limit - 1) return cls( added_authenticated_peers=added_authenticated_peers, dropped_authenticated_peers=dropped_authenticated_peers, total_inbound_peer_count=total_inbound_peer_count, total_outbound_peer_count=total_outbound_peer_count, p75_scp_first_to_self_latency_ms=p75_scp_first_to_self_latency_ms, p75_scp_self_to_other_latency_ms=p75_scp_self_to_other_latency_ms, lost_sync_count=lost_sync_count, is_validator=is_validator, max_inbound_peer_count=max_inbound_peer_count, max_outbound_peer_count=max_outbound_peer_count, ) def to_xdr_bytes(self) -> bytes: packer = Packer() self.pack(packer) return packer.get_buffer() @classmethod def from_xdr_bytes(cls, xdr: bytes) -> TimeSlicedNodeData: unpacker = Unpacker(xdr) result = cls.unpack(unpacker) remaining = len(xdr) - unpacker.get_position() if remaining != 0: raise ValueError(f"Unexpected trailing {remaining} bytes in XDR data") return result def to_xdr(self) -> str: xdr_bytes = self.to_xdr_bytes() return base64.b64encode(xdr_bytes).decode() @classmethod def from_xdr(cls, xdr: str) -> TimeSlicedNodeData: xdr_bytes = base64.b64decode(xdr.encode()) return cls.from_xdr_bytes(xdr_bytes) def to_json(self) -> str: return json.dumps(self.to_json_dict()) @classmethod def from_json(cls, json_str: str) -> TimeSlicedNodeData: return cls.from_json_dict(json.loads(json_str)) def to_json_dict(self) -> dict: return { "added_authenticated_peers": self.added_authenticated_peers.to_json_dict(), "dropped_authenticated_peers": self.dropped_authenticated_peers.to_json_dict(), "total_inbound_peer_count": self.total_inbound_peer_count.to_json_dict(), "total_outbound_peer_count": self.total_outbound_peer_count.to_json_dict(), "p75_scp_first_to_self_latency_ms": self.p75_scp_first_to_self_latency_ms.to_json_dict(), "p75_scp_self_to_other_latency_ms": self.p75_scp_self_to_other_latency_ms.to_json_dict(), "lost_sync_count": self.lost_sync_count.to_json_dict(), "is_validator": Boolean.to_json_dict(self.is_validator), "max_inbound_peer_count": self.max_inbound_peer_count.to_json_dict(), "max_outbound_peer_count": self.max_outbound_peer_count.to_json_dict(), } @classmethod def from_json_dict(cls, json_dict: dict) -> TimeSlicedNodeData: added_authenticated_peers = Uint32.from_json_dict( json_dict["added_authenticated_peers"] ) dropped_authenticated_peers = Uint32.from_json_dict( json_dict["dropped_authenticated_peers"] ) total_inbound_peer_count = Uint32.from_json_dict( json_dict["total_inbound_peer_count"] ) total_outbound_peer_count = Uint32.from_json_dict( json_dict["total_outbound_peer_count"] ) p75_scp_first_to_self_latency_ms = Uint32.from_json_dict( json_dict["p75_scp_first_to_self_latency_ms"] ) p75_scp_self_to_other_latency_ms = Uint32.from_json_dict( json_dict["p75_scp_self_to_other_latency_ms"] ) lost_sync_count = Uint32.from_json_dict(json_dict["lost_sync_count"]) is_validator = Boolean.from_json_dict(json_dict["is_validator"]) max_inbound_peer_count = Uint32.from_json_dict( json_dict["max_inbound_peer_count"] ) max_outbound_peer_count = Uint32.from_json_dict( json_dict["max_outbound_peer_count"] ) return cls( added_authenticated_peers=added_authenticated_peers, dropped_authenticated_peers=dropped_authenticated_peers, total_inbound_peer_count=total_inbound_peer_count, total_outbound_peer_count=total_outbound_peer_count, p75_scp_first_to_self_latency_ms=p75_scp_first_to_self_latency_ms, p75_scp_self_to_other_latency_ms=p75_scp_self_to_other_latency_ms, lost_sync_count=lost_sync_count, is_validator=is_validator, max_inbound_peer_count=max_inbound_peer_count, max_outbound_peer_count=max_outbound_peer_count, ) def __hash__(self): return hash( ( self.added_authenticated_peers, self.dropped_authenticated_peers, self.total_inbound_peer_count, self.total_outbound_peer_count, self.p75_scp_first_to_self_latency_ms, self.p75_scp_self_to_other_latency_ms, self.lost_sync_count, self.is_validator, self.max_inbound_peer_count, self.max_outbound_peer_count, ) ) def __eq__(self, other: object): if not isinstance(other, self.__class__): return NotImplemented return ( self.added_authenticated_peers == other.added_authenticated_peers and self.dropped_authenticated_peers == other.dropped_authenticated_peers and self.total_inbound_peer_count == other.total_inbound_peer_count and self.total_outbound_peer_count == other.total_outbound_peer_count and self.p75_scp_first_to_self_latency_ms == other.p75_scp_first_to_self_latency_ms and self.p75_scp_self_to_other_latency_ms == other.p75_scp_self_to_other_latency_ms and self.lost_sync_count == other.lost_sync_count and self.is_validator == other.is_validator and self.max_inbound_peer_count == other.max_inbound_peer_count and self.max_outbound_peer_count == other.max_outbound_peer_count ) def __repr__(self): out = [ f"added_authenticated_peers={self.added_authenticated_peers}", f"dropped_authenticated_peers={self.dropped_authenticated_peers}", f"total_inbound_peer_count={self.total_inbound_peer_count}", f"total_outbound_peer_count={self.total_outbound_peer_count}", f"p75_scp_first_to_self_latency_ms={self.p75_scp_first_to_self_latency_ms}", f"p75_scp_self_to_other_latency_ms={self.p75_scp_self_to_other_latency_ms}", f"lost_sync_count={self.lost_sync_count}", f"is_validator={self.is_validator}", f"max_inbound_peer_count={self.max_inbound_peer_count}", f"max_outbound_peer_count={self.max_outbound_peer_count}", ] return f"<TimeSlicedNodeData [{', '.join(out)}]>"