KhadhroonySRLPv4/application.py

4220 lines
208 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!.venv/bin/python
"""Khadhroony SRLPv4 main script"""
import argparse
import base58
import base64
# import hashlib
# import io
import json
import os
# import pathlib
import platform
import portalocker
import random
import re
import rc_doc
import rc_icons
import rc_images
import rc_trans
import sys
# import time
import yaml
from collections import deque
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from datetime import datetime, timezone
from itertools import count as itercount
from loguru import logger
from PySide6 import __version__ as pyside6version
from PySide6.QtCore import (
QAbstractTableModel,
QEasingCurve,
QEvent,
QEventLoop,
QLocale,
QMargins,
QModelIndex,
QObject,
QPropertyAnimation,
Qt,
QThread,
QTimer,
QTranslator,
QUrl,
Signal,
Slot
)
from PySide6.QtGui import (
QAction,
QColor,
QCursor,
# QDoubleValidator,
QIcon,
# QImage,
QKeyEvent,
QLinearGradient,
QPainter,
QPixmap,
QFont
)
from PySide6.QtNetwork import (
QAbstractSocket,
QNetworkAccessManager,
QNetworkInformation,
QNetworkReply,
QNetworkRequest
)
from PySide6.QtPdf import QPdfDocument
from PySide6.QtPdfWidgets import QPdfView
from PySide6.QtWebSockets import QWebSocket
from PySide6.QtWidgets import (
QAbstractItemView,
QApplication,
QDialog,
QDialogButtonBox,
QFileDialog,
QFormLayout,
QFrame,
QGraphicsOpacityEffect,
QGridLayout,
QHeaderView,
QHBoxLayout,
# QInputDialog,
QLabel,
QLineEdit,
QMainWindow,
QMenu,
QMessageBox,
QSplashScreen,
QPushButton,
QSizePolicy,
QSystemTrayIcon,
QTableView,
QTabWidget,
QTextEdit,
QToolButton,
QToolBar,
QVBoxLayout,
QWidget
)
from queue import Queue
from solders.account_decoder import (
UiAccountEncoding,
UiDataSliceConfig
)
from solders.commitment_config import CommitmentLevel
from solders.hash import Hash
from solders.keypair import Keypair
from solders.message import VersionedMessage
from solders.pubkey import Pubkey
from solders.rpc.config import (
RpcAccountInfoConfig,
RpcBlockConfig,
RpcBlockProductionConfig,
RpcBlockProductionConfigRange,
RpcBlockSubscribeConfig,
RpcBlockSubscribeFilter,
RpcBlockSubscribeFilterMentions,
RpcContextConfig, RpcEpochConfig,
RpcGetVoteAccountsConfig,
RpcLargestAccountsFilter,
RpcLeaderScheduleConfig,
RpcProgramAccountsConfig,
RpcRequestAirdropConfig,
RpcSendTransactionConfig,
RpcSignaturesForAddressConfig,
RpcSignatureStatusConfig,
RpcSignatureSubscribeConfig,
RpcSimulateTransactionConfig,
RpcSupplyConfig,
RpcTokenAccountsFilterMint,
RpcTokenAccountsFilterProgramId,
RpcTransactionConfig,
RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
RpcTransactionLogsFilterMentions)
from solders.rpc.filter import Memcmp
from solders.rpc.requests import (
AccountSubscribe,
AccountUnsubscribe,
BlockSubscribe,
BlockUnsubscribe,
GetAccountInfo,
GetBalance,
GetBlock,
GetBlockCommitment,
GetBlockHeight,
GetBlockProduction,
GetBlockTime,
GetBlocks,
GetBlocksWithLimit,
GetClusterNodes,
GetEpochInfo,
GetEpochSchedule,
GetFeeForMessage,
GetFirstAvailableBlock,
GetGenesisHash,
GetHealth,
GetHighestSnapshotSlot,
GetIdentity,
GetInflationGovernor,
GetInflationRate,
GetInflationReward,
GetLargestAccounts,
GetLatestBlockhash,
GetLeaderSchedule,
GetMaxRetransmitSlot,
GetMaxShredInsertSlot,
GetMinimumBalanceForRentExemption,
GetMultipleAccounts,
GetProgramAccounts,
GetRecentPerformanceSamples,
GetSignaturesForAddress,
GetSignatureStatuses,
GetSlot,
GetSlotLeader,
GetSlotLeaders,
GetSupply,
GetTokenAccountBalance,
GetTokenAccountsByDelegate,
GetTokenAccountsByOwner,
GetTokenLargestAccounts,
GetTokenSupply,
GetTransaction,
GetTransactionCount,
GetVersion,
GetVoteAccounts,
IsBlockhashValid,
LogsSubscribe,
LogsUnsubscribe,
MinimumLedgerSlot,
ProgramSubscribe,
ProgramUnsubscribe,
RequestAirdrop,
RootSubscribe,
RootUnsubscribe,
SendRawTransaction,
SignatureSubscribe,
SignatureUnsubscribe,
SlotSubscribe,
SlotUnsubscribe,
SlotsUpdatesSubscribe,
SlotsUpdatesUnsubscribe,
VoteSubscribe,
VoteUnsubscribe,
SimulateVersionedTransaction,
SimulateLegacyTransaction
)
from solders.rpc.responses import (
LogsNotification,
# LogsNotificationResult,
SubscriptionError,
SubscriptionResult,
parse_websocket_message, GetBalanceResp, GetAccountInfoResp,
GetTokenAccountsByOwnerResp, GetTokenSupplyResp, GetProgramAccountsResp,
GetSignaturesForAddressResp, GetTokenAccountBalanceResp,
GetTokenAccountsByDelegateResp
)
from solders.signature import Signature
from solders.transaction import VersionedTransaction, Transaction
from solders.transaction_status import TransactionDetails, UiTransactionEncoding
from threading import Lock
from typing import Any, IO, List, NamedTuple, Optional, Sequence, Union
# --- Application identity, versioning, languages and wallet file settings ---
APP_NAME: str = "KhadhroonySRLPv4" # Application short name
APP_ABOUT_NAME: str = "Khadhroony Sol RLPv4 Trading App" # Application abbreviated display name
APP_FULL_NAME: str = "Khadhroony Solana Raydium Liquidity Pool v4 Trading App" # Application full name
APP_DESC: str = "Khadhroony Solana Raydium Liquidity Pool v4 Trading Application" # Application description
APP_VERSION_INFO = ('1', '0', '2') # Application version as a tuple of strings (major, minor, patch)
APP_VERSION = '.'.join(APP_VERSION_INFO) # Application version as a dotted string
APP_NAME_VERSION = "Khadhroony Solana Raydium Liquidity Pool v4 Trading Application @ " + APP_VERSION # Description followed by the version
APP_LANGS = ["en", "fr", "de", "ar"] # Supported languages
APP_RTL_LANGS = ["ae", "aeb", "aec", "ar", "arb", "arc", "arq", "ary", "arz", "ayl", "ckb", "dv", "fa", "glk", "he", "khw", "mzn", "ota", "otk", "pnb", "ps", "syr", "tmr", "ug", "ur"] # Language codes written right-to-left
is_rtl = False # Global flag: True when the currently selected language is RTL, False when LTR
# Salt string built from the application name and the major.minor version only.
# NOTE(review): if this is the salt used for wallet encryption, changing the name
# or the major/minor version would change the derived keys - confirm against callers.
APP_SALT = APP_NAME + " @ " + APP_VERSION_INFO[0] + "." + APP_VERSION_INFO[1] # Application salt for wallet encryption
WALLET_FILE_EXT = "kew" # Wallet files extension
WALLET_FILE_DESC = "Khadhroony Encrypted Wallet" # Wallet files description
WALLETS_FOLDER = "./wallets" # Wallet files default folder
# --- Solana RPC endpoints (WebSocket and HTTP, Mainnet and Devnet) ---
URL_Ws_MainNet = "wss://api.mainnet-beta.solana.com" # WebSocket Solana Mainnet URL
URL_Http_MainNet = "https://api.mainnet-beta.solana.com" # HTTP Solana Mainnet URL
URL_Ws_DevNet = "wss://api.devnet.solana.com" # WebSocket Solana Devnet URL
URL_Http_DevNet = "https://api.devnet.solana.com" # HTTP Solana Devnet URL
# The bare string below is documentation only (it is evaluated and discarded):
# it records the rate limits that apply to the URLs above.
""" Solana URLs
Maximum number of requests per 10 seconds per IP: 100
Maximum number of requests per 10 seconds per IP for a single RPC: 40
Maximum concurrent connections per IP: 40
Maximum connection rate per 10 seconds per IP: 40
Maximum amount of data per 30 second: 100 MB
"""
# --- Solana constants: unit sizes and well-known program IDs ---
# Each program ID is declared twice: as a base-58 string and as a Pubkey built from that string.
LAMPORTS_PER_SOL: int = 1_000_000_000 # Number of lamports per SOL, where 1 SOL equals 1 billion lamports.
MINT_LEN: int = 82 # Data length of a token mint account.
ACCOUNT_LEN: int = 165 # Data length of a token account.
MULTISIG_LEN: int = 355 # Data length of a multisig token account.
SYSTEM_PROGRAM_ID: str = "11111111111111111111111111111111" # Program ID for the System Program (str).
PK_SYSTEM_PROGRAM_ID = Pubkey.from_string(SYSTEM_PROGRAM_ID) # Program ID for the System Program (Pubkey).
CONFIG_PROGRAM_ID: str = "Config1111111111111111111111111111111111111" # Program ID for the Config Program (str).
PK_CONFIG_PROGRAM_ID: Pubkey = Pubkey.from_string(CONFIG_PROGRAM_ID) # Program ID for the Config Program (Pubkey).
STAKE_PROGRAM_ID: str = "Stake11111111111111111111111111111111111111" # Program ID for the Stake Program (str).
PK_STAKE_PROGRAM_ID: Pubkey = Pubkey.from_string(STAKE_PROGRAM_ID) # Program ID for the Stake Program (Pubkey).
VOTE_PROGRAM_ID: str = "Vote111111111111111111111111111111111111111" # Program ID for the Vote Program (str).
PK_VOTE_PROGRAM_ID: Pubkey = Pubkey.from_string(VOTE_PROGRAM_ID) # Program ID for the Vote Program (Pubkey).
ADDRESS_LOOKUP_TABLE_PROGRAM_ID: str = "AddressLookupTab1e1111111111111111111111111" # Program ID for the Address Lookup Table Program (str).
PK_ADDRESS_LOOKUP_TABLE_PROGRAM_ID: Pubkey = Pubkey.from_string(ADDRESS_LOOKUP_TABLE_PROGRAM_ID) # Program ID for the Address Lookup Table Program (Pubkey).
BPF_LOADER_PROGRAM_ID: str = "BPFLoaderUpgradeab1e11111111111111111111111" # Program ID for the BPF Loader Program (str).
PK_BPF_LOADER_PROGRAM_ID: Pubkey = Pubkey.from_string(BPF_LOADER_PROGRAM_ID) # Program ID for the BPF Loader Program (Pubkey).
ED25519_PROGRAM_ID: str = "Ed25519SigVerify111111111111111111111111111" # Program ID for the Ed25519 Program (str).
PK_ED25519_PROGRAM_ID: Pubkey = Pubkey.from_string(ED25519_PROGRAM_ID) # Program ID for the Ed25519 Program (Pubkey).
SECP256K1_PROGRAM_ID: str = "KeccakSecp256k11111111111111111111111111111" # Program ID for the Secp256k1 Program (str).
PK_SECP256K1_PROGRAM_ID: Pubkey = Pubkey.from_string(SECP256K1_PROGRAM_ID) # Program ID for the Secp256k1 Program (Pubkey).
ASSOCIATED_TOKEN_PROGRAM_ID: str = "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL" # Program ID for the associated token account program (str).
PK_ASSOCIATED_TOKEN_PROGRAM_ID: Pubkey = Pubkey.from_string(ASSOCIATED_TOKEN_PROGRAM_ID) # Program ID for the associated token account program (Pubkey).
TOKEN_PROGRAM_ID: str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" # Public key that identifies the SPL token program (str).
PK_TOKEN_PROGRAM_ID: Pubkey = Pubkey.from_string(TOKEN_PROGRAM_ID) # Public key that identifies the SPL token program (Pubkey).
TOKEN_2022_PROGRAM_ID: str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb" # Public key that identifies the SPL token 2022 program (str).
PK_TOKEN_2022_PROGRAM_ID: Pubkey = Pubkey.from_string(TOKEN_2022_PROGRAM_ID) # Public key that identifies the SPL token 2022 program (Pubkey).
WRAPPED_SOL_MINT: str = "So11111111111111111111111111111111111111112" # Public key of the "Native Mint" for wrapping SOL to SPL token (str).
""" The Token Program can be used to wrap native SOL. Doing so allows native SOL to be treated like any
other Token program token type and can be useful when being called from other programs that interact
with the Token Program's interface. """
PK_WRAPPED_SOL_MINT: Pubkey = Pubkey.from_string(WRAPPED_SOL_MINT) # Public key of the "Native Mint" for wrapping SOL to SPL token (Pubkey).
""" The Token Program can be used to wrap native SOL. Doing so allows native SOL to be treated like any
other Token program token type and can be useful when being called from other programs that interact
with the Token Program's interface. """
MEMO_PROGRAM_ID: str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr" # Public key that identifies the Memo program (str).
PK_MEMO_PROGRAM_ID: Pubkey = Pubkey.from_string(MEMO_PROGRAM_ID) # Public key that identifies the Memo program (Pubkey).
RaydiumLPV4: str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8" # Public key that identifies the Raydium Liquidity Pool V4 (str).
PK_RaydiumLPV4: Pubkey = Pubkey.from_string(RaydiumLPV4) # Public key that identifies the Raydium Liquidity Pool V4 (Pubkey).
# Names of the RPC methods sent over WebSocket (subscribe / unsubscribe pairs).
RPC_WS_REQ = ["accountSubscribe", "accountUnsubscribe", "blockSubscribe", "blockUnsubscribe", "logsSubscribe", "logsUnsubscribe", "programSubscribe", "programUnsubscribe", "rootSubscribe", "rootUnsubscribe", "signatureSubscribe", "signatureUnsubscribe", "slotSubscribe", "slotUnsubscribe", "slotsUpdatesSubscribe", "slotsUpdatesUnsubscribe", "voteSubscribe", "voteUnsubscribe"] # RPC WebSocket Requests
# Names of the RPC methods sent over HTTP.
RPC_HTTP_REQ = ["getAccountInfo", "getBalance", "getBlock", "getBlockCommitment", "getBlockHeight", "getBlockProduction", "getBlockTime", "getBlocks", "getBlocksWithLimit", "getClusterNodes", "getEpochInfo", "getEpochSchedule", "getFeeForMessage", "getFirstAvailableBlock", "getGenesisHash", "getHealth", "getHighestSnapshotSlot", "getIdentity", "getInflationGovernor", "getInflationRate", "getInflationReward", "getLargestAccounts", "getLatestBlockhash", "getLeaderSchedule", "getMaxRetransmitSlot", "getMaxShredInsertSlot", "getMinimumBalanceForRentExemption", "getMultipleAccounts", "getProgramAccounts", "getRecentPerformanceSamples", "getRecentPrioritizationFees", "getSignatureStatuses", "getSignaturesForAddress", "getSlot", "getSlotLeader", "getSlotLeaders", "getStakeMinimumDelegation", "getSupply", "getTokenAccountBalance", "getTokenAccountsByDelegate", "getTokenAccountsByOwner", "getTokenLargestAccounts", "getTokenSupply", "getTransaction", "getTransactionCount", "getVersion", "getVoteAccounts", "isBlockhashValid", "minimumLedgerSlot", "requestAirdrop", "sendTransaction", "simulateTransaction"] # RPC Http Requests
# Path of the YAML file holding the application configuration (relative to the working directory).
YAML_CONFIG_FILE = "KhadhroonySRLPv4.yaml" # Yaml config file
# Default YAML configuration: written to disk when no config file exists and
# used as the reference (keys and value types) when validating a loaded file.
# NOTE: the description must stay a comment. A bare string placed inside the
# braces is implicitly concatenated with the next string literal, which used to
# turn the "lastFile" key into " default Yaml config lastFile".
DEFAULT_YAML_CONFIG = {
    "lastFile": "",
    "lastFiles": [],
    "defaultLang": "en",
    "lastWidth": 1024,
    "lastHeight": 768,
    "lastMaximized": True,
}
def load_yaml_app_config():
    """ Load the configuration file or create a default one.
    When the file does not exist it is created from DEFAULT_YAML_CONFIG. When it
    exists, every key of DEFAULT_YAML_CONFIG that is missing, or whose value has
    the wrong type, is replaced by its default value.
    Returns:
        dict: the configuration. It never aliases DEFAULT_YAML_CONFIG (or its
        mutable values), so the caller can modify it freely.
    """
    from copy import deepcopy  # local import: only needed by this function

    if not os.path.exists(YAML_CONFIG_FILE):
        logger.debug(f"{YAML_CONFIG_FILE} not found, creating new one")
        with open(YAML_CONFIG_FILE, "w") as f:
            yaml.dump(DEFAULT_YAML_CONFIG, f)
        # Return a copy so later edits of the config do not alter the defaults
        return deepcopy(DEFAULT_YAML_CONFIG)
    with open(YAML_CONFIG_FILE, "r") as f:
        data = yaml.safe_load(f)
    logger.debug(f"{YAML_CONFIG_FILE} safe loaded")
    # safe_load returns None for an empty file and may return a list or a scalar
    # for a malformed one: fall back to an empty mapping instead of crashing
    if not isinstance(data, dict):
        logger.debug(f"{YAML_CONFIG_FILE} does not contain a mapping, using defaults")
        data = {}
    # Data validation and completion
    for key, val in DEFAULT_YAML_CONFIG.items():
        if key not in data:
            logger.debug(f"{key} not found")
            data[key] = deepcopy(val)
        elif not isinstance(data[key], type(val)):
            logger.debug(f"{key} wrong type : wanted {type(val)}, found {type(data[key])}")
            data[key] = deepcopy(val)
    return data
def save_yaml_app_config(cfg):
    """ Write the configuration to the YAML config file, replacing its content.
    Args:
        cfg: the configuration object to dump.
    """
    with open(YAML_CONFIG_FILE, mode = "w") as stream:
        yaml.dump(cfg, stream)
def create_wallets_folder():
    """ Create the wallets directory (and missing parents) if it does not exist.
    Raises:
        FileExistsError: if WALLETS_FOLDER exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race when another process creates
    # the folder between the existence test and the creation
    os.makedirs(WALLETS_FOLDER, exist_ok = True)
def derive_key(password: str, salt: bytes) -> bytes:
    """ Derive a symmetric key from a password and a salt (PBKDF2-HMAC-SHA256).
    Args:
        password (str): The password to derive the key from (encoded as UTF-8).
        salt (bytes): The salt for the key derivation.
    Returns:
        bytes: The 32-byte derived key, URL-safe base64 encoded.
    """
    pbkdf2 = PBKDF2HMAC(
        algorithm = hashes.SHA256(),
        length = 32,
        salt = salt,
        iterations = 100_000,
        backend = default_backend(),
    )
    raw_key = pbkdf2.derive(password.encode("utf-8"))
    return base64.urlsafe_b64encode(raw_key)
def encrypt_string(password: str, salt: bytes, clear_txt: str) -> str:
    """ Encrypt a string with a key derived from a password and a salt.
    Args:
        password (str): The password to derive the encryption key.
        salt (bytes): The salt for the key derivation.
        clear_txt (str): The clear string to encrypt (encoded as UTF-8).
    Returns:
        str: The encrypted token, decoded to text.
    """
    cipher = Fernet(derive_key(password, salt))  # Fernet keyed with the derived key
    token = cipher.encrypt(clear_txt.encode("utf-8"))
    return token.decode()
def decrypt_string(password: str, salt: bytes, encrypted_txt: Union[bytes, str]) -> str:
    """ Decrypt a token with a key derived from a password and a salt.
    Args:
        password (str): The password to derive the decryption key.
        salt (bytes): The salt used during encryption.
        encrypted_txt (Union[bytes, str]): The encrypted token; a str is encoded as UTF-8 first.
    Returns:
        str: The decrypted string.
    """
    token = encrypted_txt.encode("utf-8") if isinstance(encrypted_txt, str) else encrypted_txt
    cipher = Fernet(derive_key(password, salt))  # Fernet keyed with the derived key
    return cipher.decrypt(token).decode()
def center_on_screen(widget: QWidget):
    """ Move the widget to the center of the screen under the mouse cursor.
    Falls back to the primary screen when no screen is found at the cursor position.
    Args:
        widget (QWidget): the widget to move.
    """
    target_screen = QApplication.screenAt(QCursor().pos()) or QApplication.primaryScreen()
    available = target_screen.availableGeometry()  # Usable area of the screen
    frame = widget.frameGeometry()  # Widget geometry including its frame
    # Offset of the available area + half of the remaining free space
    left = available.x() + (available.width() - frame.width()) // 2
    top = available.y() + (available.height() - frame.height()) // 2
    widget.move(left, top)
def set_app_dir(lang_code: str):
    """ Update the global is_rtl flag from the language code.
    Args:
        lang_code (str): The language code; is_rtl becomes True when it is listed in APP_RTL_LANGS.
    """
    global is_rtl
    is_rtl = lang_code in APP_RTL_LANGS
def get_qt_dir() -> Qt.LayoutDirection:
    """ Map the global is_rtl flag to a Qt layout direction.
    Returns:
        Qt.LayoutDirection: RightToLeft when is_rtl is set, LeftToRight otherwise.
    """
    if is_rtl:
        return Qt.LayoutDirection.RightToLeft
    return Qt.LayoutDirection.LeftToRight
def get_qt_align_center() -> Qt.AlignmentFlag:
    """ Return the Qt "center" alignment flag (Qt.AlignmentFlag.AlignCenter). """
    centered = Qt.AlignmentFlag.AlignCenter
    return centered
def get_qt_align_vchl(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Return vertical-center alignment combined with the "start" side:
    left when ltr, right when rtl. """
    horizontal = Qt.AlignmentFlag.AlignRight if rtl else Qt.AlignmentFlag.AlignLeft
    return Qt.AlignmentFlag.AlignVCenter | horizontal
def get_qt_align_vchr(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Return vertical-center alignment combined with the "end" side:
    right when ltr, left when rtl. """
    horizontal = Qt.AlignmentFlag.AlignLeft if rtl else Qt.AlignmentFlag.AlignRight
    return Qt.AlignmentFlag.AlignVCenter | horizontal
def get_qt_align_hcenter() -> Qt.AlignmentFlag:
    """ Return the Qt horizontal-center alignment flag (Qt.AlignmentFlag.AlignHCenter). """
    h_centered = Qt.AlignmentFlag.AlignHCenter
    return h_centered
def get_qt_align_hl(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Return the horizontal "start" alignment: left when ltr, right when rtl. """
    return Qt.AlignmentFlag.AlignRight if rtl else Qt.AlignmentFlag.AlignLeft
def get_qt_align_hr(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Return the horizontal "end" alignment: right when ltr, left when rtl. """
    return Qt.AlignmentFlag.AlignLeft if rtl else Qt.AlignmentFlag.AlignRight
class LockedFile:
    """ Context manager that opens a file and holds an exclusive lock on it.
    Uses portalocker to avoid concurrent access.
    NOTE: the file is opened before the lock is taken, so a truncating mode
    such as 'w' empties the file before exclusivity is guaranteed.
    """

    def __init__(self, file_path: str, mode: str = "r"):
        """ Store the target; nothing is opened until the `with` block is entered.
        Args:
            file_path (str): Path of the file to lock.
            mode (str): Mode used to open the file ('r', 'w', 'a', etc.).
        """
        self._file = None
        self._filePath = file_path
        self._mode = mode

    def __enter__(self) -> IO[Any]:
        """ Open the file and take an exclusive lock on it.
        Returns:
            IO[Any]: the opened, locked file stream.
        """
        handle = open(file = self._filePath, mode = self._mode)
        try:
            # Application of an exclusive lock
            portalocker.lock(handle, portalocker.LOCK_EX)
        except BaseException:
            # __exit__ is not called when __enter__ raises: close here so the
            # handle does not leak, then let the error propagate
            handle.close()
            raise
        self._file = handle
        return handle

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Release the lock and close the file. Exceptions from the `with` body are not suppressed. """
        handle, self._file = self._file, None
        if handle is None:
            return  # Nothing was opened (or already released)
        try:
            portalocker.unlock(handle)
        finally:
            handle.close()
class Map:
def __init__(self):
""" Initializes an internal dictionary to store items. """
self._data = {}
def put(self, key, value):
""" Adds or updates an entry in the map.
Args:
key: the unique key of the map
value: the value associated to the key
"""
previous_value = self._data.get(key)
self._data[key] = value
return previous_value
def get(self, key) -> Any | None:
""" Retrieves the value associated with a key, or None if the key does not exist.
Args:
key: the key to check
Returns:
Any | None: the value associated with a key or None
"""
return self._data.get(key)
def remove(self, key) -> Any | None:
""" Deletes a key and returns its value, or None if the key does not exist.
Args:
key: the key to remove
Returns:
Any | None: the value associated with a key or None
"""
return self._data.pop(key, None)
def contains_key(self, key) -> bool:
""" Checks if a key is present in the map.
Args:
key: the key to check
Returns:
bool: True if exists
"""
return key in self._data
def contains_value(self, value) -> bool:
""" Checks if a value is present in the map.
Args:
value: the value to check
Returns:
bool: True if exists
"""
return value in self._data.values()
def size(self) -> int:
""" Returns the number of elements in the map.
Returns:
int: number of elements in the map
"""
return len(self._data)
def is_empty(self) -> bool:
""" Check if the map is empty.
Returns:
bool: True if the map is empty, False otherwise.
"""
return len(self._data) == 0
def keys(self):
""" Returns an iterator over the keys of the map. """
return iter(self._data.keys())
def values(self):
""" Returns an iterator over the values in the map. """
return iter(self._data.values())
def items(self):
""" Returns an iterator over the key-value pairs in the map. """
return iter(self._data.items())
def clear(self):
""" Removes all elements from the map. """
self._data.clear()
def __repr__(self):
""" Readable representation of the map. """
return f"Map({self._data})"
class DataSliceOpts(NamedTuple):
    """ Window (offset + length) restricting the account data returned by a request.
    Only available for "base58" or "base64" encoding.
    Attributes:
        offset (int): Start of the returned slice: <usize>.
        length (int): Size of the returned slice: <usize>.
    """
    offset: int
    length: int
class MemcmpOpts(NamedTuple):
    """ Byte-comparison filter applied to program account data at a given offset.
    Attributes:
        offset (int): Position in the program account data where the comparison starts: <usize>.
        bytes (str): Data to match, as a base-58 encoded string: <string>.
    """
    offset: int
    bytes: str
class TokenAccountOpts(NamedTuple):
    """ Filter and encoding options for token account queries. Provide one of mint or programId.
    Attributes:
        mint (Optional[Pubkey]): Public key of the token Mint the accounts must belong to. Defaults to None.
        programId (Optional[Pubkey]): Public key of the Token program that owns the accounts. Defaults to None.
        encoding (UiAccountEncoding): Encoding of the account data, "base58" (slow) or "base64". Defaults to Base64.
        dataSlice (Optional[DataSliceOpts]): Slice limiting the returned account data, only for "base58" or "base64" encoding. Defaults to None.
    """
    mint: Optional[Pubkey] = None
    programId: Optional[Pubkey] = None
    encoding: UiAccountEncoding = UiAccountEncoding.Base64
    dataSlice: Optional[DataSliceOpts] = None
class TxOpts(NamedTuple):
    """ Options applied when broadcasting a transaction.
    Attributes:
        skipConfirmation (bool): If False, `sendTransaction` tries to confirm that the transaction was successfully broadcast, blocking for a maximum of 30 seconds. Defaults to True.
        skipPreflight (bool): If True, skip the preflight transaction checks. Defaults to False.
        prefLightCommitment (CommitmentLevel): Commitment level used for preflight. Defaults to CommitmentLevel.Finalized.
        prefEncoding (UiTransactionEncoding): Encoding to use. Defaults to UiTransactionEncoding.Base64.
        maxRetries (Optional[int]): Maximum number of times the RPC node retries sending the transaction to the leader. When not provided, the RPC node retries until the transaction is finalized or the blockhash expires.
        lastValidBlockHeight (Optional[int]): Latest valid block height, consumed by confirm_transaction. Only meaningful when skipConfirmation is False.
    """
    skipConfirmation: bool = True
    skipPreflight: bool = False
    prefLightCommitment: CommitmentLevel = CommitmentLevel.Finalized
    prefEncoding: UiTransactionEncoding = UiTransactionEncoding.Base64
    maxRetries: Optional[int] = None
    lastValidBlockHeight: Optional[int] = None
class SharedCounter:
    """ Process-wide, lock-protected generator of increasing identifiers (1, 2, 3, ...). """
    _counter = itercount()  # Underlying zero-based counter shared through the class
    _lock = Lock()  # Serializes every access to _counter

    @classmethod
    def reset_id(cls):
        """ Restart the sequence: the next identifier handed out is 1 again. """
        with cls._lock:
            cls._counter = itercount()

    @classmethod
    def get_next_id(cls):
        """ Hand out the next identifier of the sequence (first one is 1). """
        with cls._lock:
            zero_based = next(cls._counter)
        # The counter starts at 0 while identifiers start at 1
        return zero_based + 1
class WsError:
    """ WebSocket error: a Qt socket error code with its message. """

    def __init__(self, code: QAbstractSocket.SocketError, body: str):
        """ Store the socket error code and the error text. """
        self.code, self.body = code, body
class NetworkReplyError:
    """ HTTP network-level error: a QNetworkReply error code with its message. """

    def __init__(self, code: QNetworkReply.NetworkError, body: str):
        """ Store the network error code and the error text. """
        self.code, self.body = code, body
class ResponseError:
    """ HTTP response whose status code is an error (between 400 and 600). """

    def __init__(self, code: int, headers: dict, body: str, resp_size: int):
        """ Store the status code, the headers, the body and the response size (exposed as `size`). """
        self.code, self.headers = code, headers
        self.body, self.size = body, resp_size
class ResponseOk:
    """ Successful HTTP response: body plus response size. """

    def __init__(self, body: str, resp_size: int):
        """ Store the body and the response size (exposed as `size`). """
        self.body, self.size = body, resp_size
class SimpleSignal(QObject):
    """ QObject holding one Qt Signal that carries no argument. """
    _sig = Signal()

    def signal(self):
        """ Emit the wrapped signal. """
        self._sig.emit()

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class StrSignal(QObject):
    """ QObject holding one Qt Signal that carries a str. """
    _sig = Signal(str)

    def signal(self, data_str: str):
        """ Emit the wrapped signal with data_str as payload. """
        self._sig.emit(data_str)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class SocketStateSignal(QObject):
    """ QObject holding one Qt Signal that carries a QAbstractSocket.SocketState. """
    _sig = Signal(QAbstractSocket.SocketState)

    def signal(self, data_qasss: QAbstractSocket.SocketState):
        """ Emit the wrapped signal with the socket state as payload. """
        self._sig.emit(data_qasss)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class SocketErrorSignal(QObject):
    """ QObject holding one Qt Signal that carries a WsError. """
    _sig = Signal(WsError)

    def signal(self, ws_err: WsError):
        """ Emit the wrapped signal with the WebSocket error as payload. """
        self._sig.emit(ws_err)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class NetworkReplyErrorSignal(QObject):
    """ QObject holding one Qt Signal that carries a NetworkReplyError. """
    _sig = Signal(NetworkReplyError)

    def signal(self, reply: NetworkReplyError):
        """ Emit the wrapped signal with the network error as payload. """
        self._sig.emit(reply)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class ResponseErrorSignal(QObject):
    """ QObject holding one Qt Signal that carries a ResponseError. """
    _sig = Signal(ResponseError)

    def signal(self, resp: ResponseError):
        """ Emit the wrapped signal with the error response as payload. """
        self._sig.emit(resp)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class ResponseSuccessSignal(QObject):
    """ QObject holding one Qt Signal that carries a ResponseOk. """
    _sig = Signal(ResponseOk)

    def signal(self, resp: ResponseOk):
        """ Emit the wrapped signal with the successful response as payload. """
        self._sig.emit(resp)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class DictSignal(QObject):
    """ QObject holding one Qt Signal that carries a dict. """
    _sig = Signal(dict)

    def signal(self, data: dict):
        """ Emit the wrapped signal with the dict as payload. """
        self._sig.emit(data)

    def conn(self, slot_obj, con_type = Qt.ConnectionType.AutoConnection):
        """ Connect slot_obj to the wrapped signal with the given connection type. """
        self._sig.connect(slot_obj, type = con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal (no specific slot given). """
        self._sig.disconnect()
class WebSocketClient(QThread):
    """ QThread wrapping a QWebSocket, with a bounded number of automatic reconnections. """

    def __init__(self, base_url: str, max_retry: int = 3, reconnect_in: int = 2000, parent = None):
        """ Build the client; no connection is opened until open() is called.
        Args:
            base_url (str): URL of the WebSocket server.
            max_retry (int): Maximum number of reconnection attempts (raised to 1 if lower).
            reconnect_in (int): Delay before a reconnection attempt, passed to QTimer.singleShot (raised to 1000 if lower).
            parent: Optional parent forwarded to QThread.
        """
        super().__init__(parent)
        self.socketStateSig = SocketStateSignal()  # Signal carrying the connection state
        self.socketErrorSig = SocketErrorSignal()  # Signal carrying a connection error
        self.logSig = StrSignal()  # Signal carrying a log message
        self.responseOkSig = StrSignal()  # Signal notifying the reception of a new message
        self._baseUrl = base_url
        self._maxRetry = max(max_retry, 1)
        self._reconnectIn = max(reconnect_in, 1000)
        self._attempts = 0
        self._threadRunning = False
        self._websocket = QWebSocket()
        # Route the socket events to the private handlers
        for qt_signal, handler in (
            (self._websocket.connected, self._on_connected),
            (self._websocket.disconnected, self._on_disconnected),
            (self._websocket.errorOccurred, self._on_error),
            (self._websocket.textMessageReceived, self._on_message),
        ):
            qt_signal.connect(handler)

    def _connect(self):
        """ Open the socket on the base URL; does nothing once the client is stopped. """
        if not self._threadRunning:
            return
        if self._attempts != 0:
            self.logSig.signal("New connexion attempt")
        self._websocket.open(QUrl(self._baseUrl))

    def _on_connected(self):
        """ Handle a successful connection. """
        self._attempts = 0  # A successful connection resets the retry budget
        self.socketStateSig.signal(self._websocket.state())  # Publish the new state

    def _on_disconnected(self):
        """ Handle a disconnection: schedule a retry or give up when the budget is spent. """
        self.socketStateSig.signal(self._websocket.state())  # Publish the new state
        if not self._threadRunning:
            return  # Deliberate close: no reconnection
        if self._attempts < self._maxRetry:
            self._attempts += 1
            # Retry after the configured delay
            QTimer.singleShot(self._reconnectIn, self._connect)
            return
        self.logSig.signal("Maximum reconnection attempts reached.")  # Log signal
        self._threadRunning = False

    def _on_error(self, code: QAbstractSocket.SocketError):
        """ Wrap the socket error (code + error string) and publish it. """
        self.socketErrorSig.signal(WsError(code, self._websocket.errorString()))

    def _on_message(self, resp: str):
        """ Publish a received text message. """
        self.responseOkSig.signal(resp)

    def run(self):
        """ QThread entry point: run the thread event loop. """
        self.exec()

    def get_socket_state(self) -> QAbstractSocket.SocketState:
        """ Return the current state of the underlying socket. """
        return self._websocket.state()

    def open(self) -> bool:
        """ Start the client: request the connection and start the thread.
        Returns:
            bool: False if the client was already started, True otherwise.
        """
        if self._threadRunning:
            # Reaching this point means the calling code logic is wrong
            self.logSig.signal("WebSocket is already connected.")
            return False
        self.socketStateSig.signal(QAbstractSocket.SocketState.ConnectingState)
        self._threadRunning = True
        self._attempts = 0
        self._connect()
        self.start()
        return True

    def send_msg(self, message: Union[bytes, str]) -> bool:
        """ Send a message to the WebSocket server (binary frame for bytes, text frame otherwise).
        Returns:
            bool: True if the socket was connected and the message handed to it, False otherwise.
        """
        if self._websocket.state() != QAbstractSocket.SocketState.ConnectedState:
            return False
        if isinstance(message, bytes):
            self._websocket.sendBinaryMessage(message)
        else:
            self._websocket.sendTextMessage(message)
        return True

    def close(self):
        """ Stop the client: close the socket if needed, then quit the thread and wait for it. """
        self._threadRunning = False  # Prevents _on_disconnected from scheduling a retry
        self._attempts = 0
        if self._websocket.state() != QAbstractSocket.SocketState.UnconnectedState:
            self._websocket.close()
        self.quit()
        self.wait()
class HttpPostClient(QObject):
    """ Synchronous client for HTTP POST requests: send_post() returns once the reply is finished. """

    def __init__(self, base_url: str, parent = None):
        """ Build the client.
        Args:
            base_url (str): URL every request is posted to.
            parent: Optional parent forwarded to QObject.
        """
        super().__init__(parent)
        self._baseUrl = base_url
        self._defaultHeader = "application/json"  # Value of the Content-Type header
        self._networkManager = QNetworkAccessManager()

    @staticmethod
    def _process_reply(reply: QNetworkReply) -> Union[NetworkReplyError, ResponseError, ResponseOk]:
        """ Convert a finished reply into a result object and schedule the reply for deletion.
        Args:
            reply (QNetworkReply): the finished reply.
        Returns:
            NetworkReplyError on a network error, ResponseError for a status code
            in [400, 600), ResponseOk otherwise.
        """
        try:
            if reply.error() != QNetworkReply.NetworkError.NoError:
                return NetworkReplyError(reply.error(), reply.errorString())
            status_code = reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute)
            # Header names/values may come as QByteArray (has .data()) or bytes
            headers = {
                (k.data() if hasattr(k, "data") else k).decode():
                (v.data() if hasattr(v, "data") else v).decode()
                for k, v in reply.rawHeaderPairs()
            }
            body = reply.readAll().data().decode()
            # Body bytes + each header line (name, value, ": " and "\r\n" = +4);
            # the final +2 is presumably the blank line ending the header section
            size = len(body.encode('utf-8')) + sum(len(k.encode('utf-8')) + len(v.encode('utf-8')) + 4 for k, v in headers.items()) + 2
            if 400 <= status_code < 600:
                return ResponseError(status_code, headers, body, size)
            return ResponseOk(body, size)
        finally:
            # Single release point: also runs when decoding raises
            reply.deleteLater()

    def send_post(self, body: Union[bytes, str]) -> Union[NetworkReplyError, ResponseError, ResponseOk]:
        """ Send an HTTP POST request and wait for its reply.
        Args:
            body (Union[bytes, str]): request payload; a str is encoded as UTF-8.
        Returns:
            The result object built by _process_reply.
        """
        if isinstance(body, str):
            body = body.encode("utf-8")
        # Explicit QUrl, consistent with AsyncHttpPostClient
        request = QNetworkRequest(QUrl(self._baseUrl))
        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, self._defaultHeader)
        reply = self._networkManager.post(request, body)
        # Pump events until the reply is finished.
        # NOTE(review): this polls without blocking between iterations - confirm the CPU cost is acceptable
        while not reply.isFinished():
            self._networkManager.thread().eventDispatcher().processEvents(QEventLoop.ProcessEventsFlag.AllEvents)
        return self._process_reply(reply)
class AsyncHttpPostClient(QObject):
    """ Async Client for HTTP POST Request.

    send_post() returns immediately; the outcome is reported through the signals:
      requestSentSig   - emitted once the request has been posted,
      netRepErrorSig   - emitted with a NetworkReplyError on network error,
      responseErrorSig - emitted with a ResponseError for HTTP status 400-599,
      responseOkSig    - emitted with a ResponseOk otherwise.
    """

    def __init__(self, base_url: str, parent = None):
        """ Create the client.

        Args:
            base_url (str): URL every POST request is sent to.
            parent: optional parent QObject.
        """
        super().__init__(parent)
        self.requestSentSig = SimpleSignal()
        self.netRepErrorSig = NetworkReplyErrorSignal()
        self.responseErrorSig = ResponseErrorSignal()
        self.responseOkSig = ResponseSuccessSignal()
        self._baseUrl = base_url
        self._defaultHeader = "application/json"  # value of the Content-Type header
        self._networkManager = QNetworkAccessManager()

    def _process_reply(self, reply: QNetworkReply):
        """ Async Process Reply: emit the signal matching the reply outcome, then release the reply.

        Args:
            reply (QNetworkReply): the finished reply.
        """
        try:
            if reply.error() != QNetworkReply.NetworkError.NoError:
                self.netRepErrorSig.signal(NetworkReplyError(reply.error(), reply.errorString()))
                return
            status_code = reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute)
            # Header names/values may come back as QByteArray or as bytes.
            headers = {
                (k.data() if hasattr(k, "data") else k).decode():
                    (v.data() if hasattr(v, "data") else v).decode()
                for k, v in reply.rawHeaderPairs()
            }
            body = reply.readAll().data().decode()
            # Approximate wire size: body + each header line (": " and "\r\n" add 4) + final "\r\n".
            header_size = sum(len(k.encode("utf-8")) + len(v.encode("utf-8")) + 4 for k, v in headers.items())
            size = len(body.encode("utf-8")) + header_size + 2
            # The status attribute can be missing (None); only a real 4xx/5xx is an error response.
            if status_code is not None and 400 <= status_code < 600:
                self.responseErrorSig.signal(ResponseError(status_code, headers, body, size))
            else:
                self.responseOkSig.signal(ResponseOk(body, size))
        finally:
            # Always release the reply, even if decoding or a connected slot raised.
            reply.deleteLater()

    def send_post(self, body: Union[str, bytes]):
        """ Send HTTP Post Request without waiting for Response.

        Args:
            body (Union[str, bytes]): request body; a str is encoded as UTF-8.
        """
        if isinstance(body, str):
            body = body.encode("utf-8")
        request = QNetworkRequest(QUrl(self._baseUrl))
        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, self._defaultHeader)
        reply = self._networkManager.post(request, body)
        # Hook the completion handler up BEFORE announcing the request: a slot connected to
        # requestSentSig that processes events could otherwise let the reply finish unobserved.
        reply.finished.connect(lambda: self._process_reply(reply))
        self.requestSentSig.signal()
class ConnectionPool(QObject):
    """ Client Pool with 1 fixed (main) websocket plus pooled websocket, async HTTP and sync HTTP clients.

    The pool always holds at least 1 client of each kind and at most 21 pooled clients in total.
    Every outgoing request is checked against request/data counters. WebSocket and async HTTP
    requests that cannot be sent right away are queued and retried by a timer; sync HTTP
    requests wait (while processing events) until they can be sent.
    Received messages are stored, with their reception time, in bounded FIFOs that callers read
    through the read_first_* / read_last_* methods.
    """

    def __init__(self, ws_url: str, http_url: str, num_ws_clients: int = 1, num_async_http_clients: int = 2, num_sync_http_clients: int = 2, parent = None):
        """ Build clients, signals, FIFOs, queues, counters and timers. Nothing is opened or started here.

        Args:
            ws_url (str): URL handed to every WebSocketClient.
            http_url (str): URL handed to every HTTP client.
            num_ws_clients (int): wanted number of pooled websocket clients (at least 1 is used).
            num_async_http_clients (int): wanted number of async HTTP clients (at least 1 is used).
            num_sync_http_clients (int): wanted number of sync HTTP clients (at least 1 is used).
            parent: optional parent QObject.
        """
        super().__init__(parent)
        self._num_ws_clients = max(num_ws_clients, 1)
        self._num_async_http_clients = max(num_async_http_clients, 1)
        self._num_sync_http_clients = max(num_sync_http_clients, 1)
        # Ensure the total number of clients does not exceed 21
        total_clients = self._num_ws_clients + self._num_async_http_clients + self._num_sync_http_clients
        if total_clients > 21:
            # Scale the three counts down proportionally, keeping at least 1 of each
            reduction_factor = 21 / total_clients
            self._num_ws_clients = max(1, int(self._num_ws_clients * reduction_factor))
            self._num_async_http_clients = max(1, int(self._num_async_http_clients * reduction_factor))
            self._num_sync_http_clients = max(1, int(self._num_sync_http_clients * reduction_factor))
            # The "at least 1" floor can push the sum above 21 again: trim one client at a time
            while self._num_ws_clients + self._num_async_http_clients + self._num_sync_http_clients > 21:
                if self._num_sync_http_clients > 1:
                    self._num_sync_http_clients -= 1
                elif self._num_ws_clients > 1:
                    self._num_ws_clients -= 1
                else:
                    self._num_async_http_clients -= 1
        # Clients list
        self._wsMainClient = WebSocketClient(ws_url, 3, 2000)
        self._wsClients = [WebSocketClient(ws_url, 3, 2000) for _ in range(self._num_ws_clients)]
        self._connectedWsClients = []  # List of connected WebSocket clients
        self._asyncHttpClients = [AsyncHttpPostClient(http_url) for _ in range(self._num_async_http_clients)]
        self._syncHttpClients = [HttpPostClient(http_url) for _ in range(self._num_sync_http_clients)]
        # Signals list
        self.logSig = StrSignal()  # new log Signal
        self.wsMainConnexionStatusSig = SocketStateSignal()  # connection status change for _wsMainClient
        # connection status change for _wsClients[i]
        self.wsConnexionStatusSigs = [SocketStateSignal() for _ in range(self._num_ws_clients)]
        self.httpPostSentSigs = []  # new msg sent via _asyncHttpClients[i]
        self.httpReplyReceivedSigs = []  # new message received via _asyncHttpClients[i]
        for _ in range(self._num_async_http_clients):
            self.httpPostSentSigs.append(SimpleSignal())
            self.httpReplyReceivedSigs.append(SimpleSignal())
        self.wsMainReplyReceivedSig = SimpleSignal()  # new message received and stored for _wsMainClient
        self.wsReplyReceivedSig = SimpleSignal()  # new message received and stored for _wsClients[i]
        self.httpReplyReceivedSig = SimpleSignal()  # new message received and stored for _asyncHttpClients[i]
        # connecting clients signals
        self._wsMainClient.socketStateSig.conn(self._handle_ws_main_connect_status)
        self._wsMainClient.socketErrorSig.conn(self._handle_ws_main_error)
        self._wsMainClient.logSig.conn(self._handle_ws_main_log)
        self._wsMainClient.responseOkSig.conn(self._handle_ws_main_new_msg)
        for idx, client in enumerate(self._wsClients):
            client.socketStateSig.conn(self._handle_ws_connect_status(idx))
            client.socketErrorSig.conn(self._handle_ws_error(idx))
            client.logSig.conn(self._handle_ws_log(idx))
            client.responseOkSig.conn(self._handle_ws_new_msg())
        for idx, client in enumerate(self._asyncHttpClients):
            client.requestSentSig.conn(self._handle_async_http_msg_sent(idx))
            client.netRepErrorSig.conn(self._handle_async_http_net_error(idx))
            client.responseErrorSig.conn(self._handle_async_http_resp_err(idx))
            client.responseOkSig.conn(self._handle_async_http_new_msg(idx))
        self._wsMainReceivedMessages = deque(maxlen = 1000)  # FIFO to store messages received by _wsMainClient
        self._wsReceivedMessages = deque(maxlen = 1000)  # FIFO to store messages received by _wsClients[i]
        self._httpReceivedMessages = deque(maxlen = 1000)  # FIFO to store messages received by _asyncHttpClients[i]
        # Initialize request message queues (requests waiting for a free slot / a connected socket)
        self._wsMainQueue = Queue()
        self._wsQueue = Queue()
        self._asyncHttpQueue = Queue()
        # Counters Limits
        self._maxRequestsPer10s = 100
        self._maxRpcRequestsPer10s = 40
        self._maxDataPer30s = 100 * 1024 * 1024  # 100 MB
        # Counters
        self._currentRequestCount = 0
        self._currentDataSize = 0
        self._currentRpcCounters = {method: 0 for method in (RPC_WS_REQ + RPC_HTTP_REQ)}  # Specific counters for each RPC method
        self._lock = Lock()  # Lock protecting the counters
        self._connected = False  # True while the main websocket is in ConnectedState
        # Timers
        self._timer_reset_counters = QTimer()
        self._timer_reset_counters.timeout.connect(self._reset_counters)
        self._timer_process_queues = QTimer()
        self._timer_process_queues.timeout.connect(self._process_queues)

    def _reset_counters(self):
        """ Resets the counters periodically (slot of _timer_reset_counters). """
        with self._lock:
            self._currentRequestCount = 0
            self._currentDataSize = 0
            for key in self._currentRpcCounters:
                self._currentRpcCounters[key] = 0

    @staticmethod
    def _drain_queue(pending, sender):
        """ Retry the requests that were waiting in `pending` when the call started.

        Only that many entries are handled: `sender` puts a request back in the queue when it
        still cannot be sent, so looping "until empty" would never terminate.
        """
        for _ in range(pending.qsize()):
            if pending.empty():
                break
            method, body = pending.get()
            sender(method, body)

    def _process_queues(self):
        """ Attempts to process queues (slot of _timer_process_queues). """
        self._drain_queue(self._wsMainQueue, self.send_websocket_main_request)
        self._drain_queue(self._wsQueue, self.send_websocket_request)
        self._drain_queue(self._asyncHttpQueue, self.send_async_http_post)

    def _check_limits(self, rpc_method: str, size: int = 0) -> bool:
        """ Checks if a new/stored request can be sent using counters limits.

        Args:
            rpc_method (str): RPC method name of the request.
            size (int): size of the request payload.
        Returns:
            bool: False when the global request count, the per-method count or the data volume
            would exceed its limit, True otherwise.
        """
        with self._lock:
            if self._currentRequestCount >= self._maxRequestsPer10s:
                return False
            if self._currentRpcCounters.get(rpc_method, 0) >= self._maxRpcRequestsPer10s:
                return False
            if self._currentDataSize + size > self._maxDataPer30s:
                return False
            return True

    def _register_request(self, rpc_method: str, size: int):
        """ Account for one outgoing request of `size` bytes for `rpc_method`. """
        with self._lock:
            self._currentRequestCount += 1
            self._currentRpcCounters[rpc_method] += 1
            self._currentDataSize += size

    def _register_received(self, size: int):
        """ Account for `size` bytes of received data. """
        with self._lock:
            self._currentDataSize += size

    @staticmethod
    def _payload_size(body: Union[bytes, str, None]) -> int:
        """ Size in bytes of an HTTP body (0 for an empty or missing body; str is measured as UTF-8). """
        if not body:
            return 0
        if isinstance(body, str):
            return len(body.encode("utf-8"))
        return len(body)

    @staticmethod
    def _wait_briefly(msec: int = 10):
        """ Block the caller for about `msec` ms while still processing events (timers, sockets). """
        pause = QEventLoop()
        QTimer.singleShot(msec, pause.quit)
        pause.exec()

    def _handle_ws_main_connect_status(self, value: QAbstractSocket.SocketState):
        """ Track the main websocket state and forward self._wsMainClient.socketStateSig """
        self._connected = value == QAbstractSocket.SocketState.ConnectedState
        self.wsMainConnexionStatusSig.signal(value)

    def _handle_ws_connect_status(self, index: int):
        """ Build the handler that forwards _wsClients[index].socketStateSig """
        def handler(value: QAbstractSocket.SocketState):
            """ Keep _connectedWsClients in sync with the socket state, then forward the state. """
            client = self._wsClients[index]
            if value == QAbstractSocket.SocketState.ConnectedState:
                if client not in self._connectedWsClients:
                    self._connectedWsClients.append(client)
            elif client in self._connectedWsClients:
                self._connectedWsClients.remove(client)
            self.wsConnexionStatusSigs[index].signal(value)
        return handler

    def _handle_ws_main_error(self, ws_error: WsError):
        """ Handle self._wsMainClient.socketErrorSig """
        logger.error(f"[main] {int(ws_error.code.value)} - {ws_error.body}")
        self.logSig.signal(ws_error.body)

    def _handle_ws_error(self, index: int):
        """ Build the handler for _wsClients[index].socketErrorSig """
        def handler(ws_error: WsError):
            """ Log the error and forward its text on logSig. """
            logger.error(f"[{index}] {int(ws_error.code.value)} - {ws_error.body}")
            self.logSig.signal(ws_error.body)
        return handler

    def _handle_ws_main_log(self, body: str):
        """ Handle self._wsMainClient.logSig """
        logger.debug(f"[main] {body}")
        self.logSig.signal(body)

    def _handle_ws_log(self, index: int):
        """ Build the handler for _wsClients[index].logSig """
        def handler(body: str):
            """ Log the text and forward it on logSig. """
            logger.debug(f"[{index}] {body}")
            self.logSig.signal(body)
        return handler

    def _handle_ws_main_new_msg(self, resp: str):
        """ Handle self._wsMainClient.responseOkSig: store the message and notify. """
        timestamp = datetime.now()
        self._wsMainReceivedMessages.append((timestamp, resp))
        self._register_received(len(resp.encode("utf-8")))
        self.wsMainReplyReceivedSig.signal()

    def _handle_ws_new_msg(self):
        """ Build the handler for _wsClients[i].responseOkSig (shared by all pooled websockets) """
        def handler(resp: str):
            """ Store the message and notify. """
            timestamp = datetime.now()
            self._wsReceivedMessages.append((timestamp, resp))
            self._register_received(len(resp.encode("utf-8")))
            self.wsReplyReceivedSig.signal()
        return handler

    def _handle_async_http_msg_sent(self, index: int):
        """ Build the handler for _asyncHttpClients[index].requestSentSig """
        def handler():
            """ Forward on httpPostSentSigs[index]. """
            self.httpPostSentSigs[index].signal()
        return handler

    def _handle_async_http_net_error(self, index: int):
        """ Build the handler for _asyncHttpClients[index].netRepErrorSig """
        def handler(reply: NetworkReplyError):
            """ Log the error and forward its text on logSig. """
            logger.error(f"[{index}] {int(reply.code.value)} - {reply.body}")
            self.logSig.signal(reply.body)
        return handler

    def _handle_async_http_resp_err(self, index: int):
        """ Build the handler for _asyncHttpClients[index].responseErrorSig """
        def handler(reply: ResponseError):
            """ Log the error and forward its text on logSig. """
            logger.error(f"[{index}] {reply.code} - {reply.body}")
            self.logSig.signal(reply.body)
        return handler

    def _handle_async_http_new_msg(self, index: int):
        """ Build the handler for _asyncHttpClients[index].responseOkSig """
        def handler(reply: ResponseOk):
            """ Store the reply body and notify on the per-client and the global signal. """
            timestamp = datetime.now()
            self._httpReceivedMessages.append((timestamp, reply.body))
            self._register_received(reply.size)
            self.httpReplyReceivedSigs[index].signal()
            self.httpReplyReceivedSig.signal()
        return handler

    def _start(self):
        """ Start the Timers """
        # NOTE(review): the limits are named "Per10s" / "Per30s" but the counters are zeroed
        # every 100 ms, so the effective window is 100 ms - confirm the intended interval.
        self._timer_reset_counters.start(100)  # reset counters every 100 ms
        self._timer_process_queues.start(10)  # process queues every 10 ms

    def _stop(self):
        """ Stop the Timers """
        self._timer_reset_counters.stop()
        self._timer_process_queues.stop()

    def open(self):
        """ Open Ws Clients connexion (no-op while the main websocket is connected). """
        if not self._connected:
            self._start()
            self._wsMainClient.open()
            for client in self._wsClients:
                client.open()

    def close(self):
        """ Close Ws Clients connexion and stop the timers.

        Runs unconditionally: the pooled sockets and the timers may be active even when the
        main websocket is not (still connecting, or dropped), and must be shut down too.
        """
        self._connected = False
        self._stop()
        self._wsMainClient.close()
        for client in self._wsClients:
            client.close()

    def send_websocket_main_request(self, rpc_method: str, message: str):
        """ Sends a WebSocket message with a specific RPC method on the main websocket.

        The request is queued (and retried later) when a limit is reached or the main
        websocket is not connected.

        Args:
            rpc_method (str): RPC method name, must be in RPC_WS_REQ.
            message (str): request text.
        Raises:
            RuntimeError: when rpc_method is not a known WebSocket RPC method.
        """
        if rpc_method not in RPC_WS_REQ:
            # must never happen
            logger.debug(f"Wrong Rpc Request {rpc_method}.")
            raise RuntimeError(f"Wrong Rpc Request {rpc_method}.")
        size = len(message.encode("utf-8"))
        if not self._check_limits(rpc_method, size) or self._wsMainClient.get_socket_state() != QAbstractSocket.SocketState.ConnectedState:
            self._wsMainQueue.put((rpc_method, message))  # Queue if limits reached or not connected
            return
        self._register_request(rpc_method, size)
        self._wsMainClient.send_msg(message)

    def send_websocket_request(self, rpc_method: str, message: str):
        """ Sends a WebSocket message with a specific RPC method on a pooled websocket.

        The request is queued (and retried later) when a limit is reached or no pooled
        websocket is connected.

        Args:
            rpc_method (str): RPC method name, must be in RPC_WS_REQ.
            message (str): request text.
        Raises:
            RuntimeError: when rpc_method is not a known WebSocket RPC method.
        """
        if rpc_method not in RPC_WS_REQ:
            # must never happen
            logger.debug(f"Wrong Rpc Request {rpc_method}.")
            raise RuntimeError(f"Wrong Rpc Request {rpc_method}.")
        size = len(message.encode("utf-8"))
        if not self._check_limits(rpc_method, size) or not self._connectedWsClients:
            self._wsQueue.put((rpc_method, message))  # Queue if limits reached or nothing connected
            return
        self._register_request(rpc_method, size)
        # Round robin: take the first connected client and put it back at the end
        client = self._connectedWsClients.pop(0)
        client.send_msg(message)
        self._connectedWsClients.append(client)

    def send_async_http_post(self, rpc_method: str, body: Union[bytes, str]):
        """ Sends a HTTP Post message with a specific RPC method, without waiting for the reply.

        The request is queued (and retried later) when a limit is reached.

        Args:
            rpc_method (str): RPC method name, must be in RPC_HTTP_REQ.
            body (Union[bytes, str]): request body.
        Raises:
            RuntimeError: when rpc_method is not a known HTTP RPC method.
        """
        if rpc_method not in RPC_HTTP_REQ:
            # must never happen
            logger.debug(f"Wrong Rpc Request {rpc_method}.")
            raise RuntimeError(f"Wrong Rpc Request {rpc_method}.")
        size = self._payload_size(body)
        if not self._check_limits(rpc_method, size):
            self._asyncHttpQueue.put((rpc_method, body))
            return
        self._register_request(rpc_method, size)
        # Round robin over the async clients
        client = self._asyncHttpClients.pop(0)
        client.send_post(body)
        self._asyncHttpClients.append(client)

    def send_http_post(self, rpc_method: str, body: Union[bytes, str]) -> Union[NetworkReplyError, ResponseError, ResponseOk]:
        """ Sends a HTTP Post message with a specific RPC method and waits for the reply.

        Args:
            rpc_method (str): RPC method name, must be in RPC_HTTP_REQ.
            body (Union[bytes, str]): request body.
        Returns:
            Union[NetworkReplyError, ResponseError, ResponseOk]: result of the request.
        Raises:
            RuntimeError: when rpc_method is not a known HTTP RPC method.
        """
        if rpc_method not in RPC_HTTP_REQ:
            # must never happen
            logger.debug(f"Wrong Rpc Request {rpc_method}.")
            raise RuntimeError(f"Wrong Rpc Request {rpc_method}.")
        size = self._payload_size(body)
        # Wait for a free sync client and a free slot in the limits. Events are processed while
        # waiting so the reset timer can fire; a plain spin loop would block it forever.
        while not (self._syncHttpClients and self._check_limits(rpc_method, size)):
            self._wait_briefly()
        client = self._syncHttpClients.pop(0)
        self._register_request(rpc_method, size)
        try:
            response = client.send_post(body)
        finally:
            # Give the client back to the pool even if the request raised
            self._syncHttpClients.append(client)
        if isinstance(response, (ResponseOk, ResponseError)):
            self._register_received(response.size)
        return response

    def get_count_ws_main_msgs(self):
        """ Get count of the fifo list of messages from main websocket """
        return len(self._wsMainReceivedMessages)

    def read_first_ws_main_msg(self):
        """ Read and delete the first main WS message (None when the fifo is empty). """
        return self._wsMainReceivedMessages.popleft() if self._wsMainReceivedMessages else None

    def read_last_ws_main_msg(self):
        """ Read and delete the last main WS message (None when the fifo is empty). """
        return self._wsMainReceivedMessages.pop() if self._wsMainReceivedMessages else None

    def get_count_ws_msgs(self):
        """ Get count of the fifo list of messages from websockets """
        return len(self._wsReceivedMessages)

    def read_first_ws_msg(self):
        """ Read and delete the first WS message (None when the fifo is empty). """
        return self._wsReceivedMessages.popleft() if self._wsReceivedMessages else None

    def read_last_ws_msg(self):
        """ Read and delete the last WS message (None when the fifo is empty). """
        return self._wsReceivedMessages.pop() if self._wsReceivedMessages else None

    def get_count_http_msgs(self):
        """ Get count of the fifo list of messages from http clients """
        return len(self._httpReceivedMessages)

    def read_first_http_msg(self):
        """ Read and delete the first Http Async message (None when the fifo is empty). """
        return self._httpReceivedMessages.popleft() if self._httpReceivedMessages else None

    def read_last_http_msg(self):
        """ Read and delete the last Http Async message (None when the fifo is empty). """
        return self._httpReceivedMessages.pop() if self._httpReceivedMessages else None

    def clean_up(self):
        """ Close connexions and stop timers """
        self.close()
        self._stop()
class WsReqGen:
    """ Generate requests for Solana Websocket client.

    Every builder takes a fresh id from SharedCounter and returns the tuple
    (request id, RPC method name, solders request object).
    """

    @classmethod
    def account_subscribe(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiAccountEncoding] = None) -> (int, str, AccountSubscribe):
        """ Subscribe to an account to receive notifications when the lamports or data change.

        Args:
            pubkey_to_use (Pubkey): Account pubkey.
            commitment_to_use (Optional[CommitmentLevel]): Commitment level.
            encoding_to_use (Optional[UiAccountEncoding]): Encoding to use.
        Returns:
            int, str, AccountSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        # No config object at all when the caller gave neither option
        if commitment_to_use is None and encoding_to_use is None:
            config = None
        else:
            config = RpcAccountInfoConfig(encoding = encoding_to_use, commitment = commitment_to_use)
        return request_id, "accountSubscribe", AccountSubscribe(pubkey_to_use, config, request_id)

    @classmethod
    def account_unsubscribe(cls, subscription_id: int) -> (int, str, AccountUnsubscribe):
        """ Unsubscribe from account change notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, AccountUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "accountUnsubscribe", AccountUnsubscribe(subscription_id, request_id)

    @classmethod
    def block_subscribe(cls, rpc_filer: Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions] = RpcBlockSubscribeFilter.All, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiTransactionEncoding] = None, transaction_details: Optional[TransactionDetails] = None, show_rewards: Optional[bool] = None, max_supported_transaction_version: Optional[int] = None) -> (int, str, BlockSubscribe):
        """ Subscribe to blocks.

        Args:
            rpc_filer (Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions]): filter criteria for the blocks.
            commitment_to_use (Optional[CommitmentLevel]): The commitment level to use.
            encoding_to_use (Optional[UiTransactionEncoding]): Encoding to use.
            transaction_details (Optional[TransactionDetails]): level of transaction detail to return.
            show_rewards (Optional[bool]): whether to populate the rewards array.
            max_supported_transaction_version (Optional[int]): the max transaction version to return in responses.
        Returns:
            int, str, BlockSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        options = (commitment_to_use, encoding_to_use, transaction_details, show_rewards, max_supported_transaction_version)
        if all(option is None for option in options):
            config = None
        else:
            config = RpcBlockSubscribeConfig(
                commitment = commitment_to_use,
                encoding = encoding_to_use,
                transaction_details = transaction_details,
                show_rewards = show_rewards,
                max_supported_transaction_version = max_supported_transaction_version,
            )
        return request_id, "blockSubscribe", BlockSubscribe(rpc_filer, config, request_id)

    @classmethod
    def block_unsubscribe(cls, subscription_id: int) -> (int, str, BlockUnsubscribe):
        """ Unsubscribe from blocks.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, BlockUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "blockUnsubscribe", BlockUnsubscribe(subscription_id, request_id)

    @classmethod
    def logs_subscribe(cls, rpc_filer: Union[RpcTransactionLogsFilter, RpcTransactionLogsFilterMentions] = RpcTransactionLogsFilter.All, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, LogsSubscribe):
        """ Subscribe to transaction logging.

        Args:
            rpc_filer (Union[RpcTransactionLogsFilter, RpcTransactionLogsFilterMentions]): filter criteria for the logs.
            commitment_to_use (Optional[CommitmentLevel]): The commitment level to use.
        Returns:
            int, str, LogsSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        config = None
        if commitment_to_use is not None:
            config = RpcTransactionLogsConfig(commitment = commitment_to_use)
        return request_id, "logsSubscribe", LogsSubscribe(rpc_filer, config, request_id)

    @classmethod
    def logs_unsubscribe(cls, subscription_id: int) -> (int, str, LogsUnsubscribe):
        """ Unsubscribe from transaction logging.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, LogsUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "logsUnsubscribe", LogsUnsubscribe(subscription_id, request_id)

    @classmethod
    def program_subscribe(cls, program_id: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiAccountEncoding] = None, data_slice: Optional[DataSliceOpts] = None, filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None) -> (int, str, ProgramSubscribe):
        """ Receive notifications when the lamports or data for a given account owned by the program changes.

        Args:
            program_id (Pubkey): The program ID.
            commitment_to_use (Optional[CommitmentLevel]): Commitment level to use.
            encoding_to_use (Optional[UiAccountEncoding]): Encoding to use.
            data_slice (Optional[DataSliceOpts]): Limit the returned account data using the provided `offset`: <usize> and `length`: <usize> fields; only available for "base58" or "base64" encoding.
            filters (Optional[Sequence[Union[int, MemcmpOpts]]]): Options to compare a provided series of bytes with program account data at a particular offset. Note: an int entry is converted to a `dataSize` filter.
        Returns:
            int, str, ProgramSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        nothing_requested = commitment_to_use is None and encoding_to_use is None and data_slice is None and filters is None
        if nothing_requested:
            config = None
        else:
            slice_config = None
            if data_slice is not None:
                slice_config = UiDataSliceConfig(offset = data_slice.offset, length = data_slice.length)
            account_config = RpcAccountInfoConfig(encoding = encoding_to_use, commitment = commitment_to_use, data_slice = slice_config)
            converted_filters = None
            if filters is not None:
                # int entries pass through unchanged, the other entries are unpacked into Memcmp
                converted_filters = [entry if isinstance(entry, int) else Memcmp(*entry) for entry in filters]
            config = RpcProgramAccountsConfig(account_config = account_config, filters = converted_filters)
        return request_id, "programSubscribe", ProgramSubscribe(program_id, config, request_id)

    @classmethod
    def program_unsubscribe(cls, subscription_id: int) -> (int, str, ProgramUnsubscribe):
        """ Unsubscribe from program account notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, ProgramUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "programUnsubscribe", ProgramUnsubscribe(subscription_id, request_id)

    @classmethod
    def root_subscribe(cls) -> (int, str, RootSubscribe):
        """ Subscribe to receive notification anytime a new root is set by the validator.

        Returns:
            int, str, RootSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "rootSubscribe", RootSubscribe(request_id)

    @classmethod
    def root_unsubscribe(cls, subscription_id: int) -> (int, str, RootUnsubscribe):
        """ Unsubscribe from root notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, RootUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "rootUnsubscribe", RootUnsubscribe(subscription_id, request_id)

    @classmethod
    def signature_subscribe(cls, signature_to_use: Signature, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, SignatureSubscribe):
        """ Subscribe to a transaction signature to receive notification when the transaction is confirmed.

        Args:
            signature_to_use (Signature): The transaction signature to subscribe to.
            commitment_to_use (Optional[CommitmentLevel]): Commitment level.
        Returns:
            int, str, SignatureSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        config = None
        if commitment_to_use is not None:
            config = RpcSignatureSubscribeConfig(commitment = commitment_to_use)
        return request_id, "signatureSubscribe", SignatureSubscribe(signature_to_use, config, request_id)

    @classmethod
    def signature_unsubscribe(cls, subscription_id: int) -> (int, str, SignatureUnsubscribe):
        """ Unsubscribe from signature notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, SignatureUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "signatureUnsubscribe", SignatureUnsubscribe(subscription_id, request_id)

    @classmethod
    def slot_subscribe(cls) -> (int, str, SlotSubscribe):
        """ Subscribe to receive notification anytime a slot is processed by the validator.

        Returns:
            int, str, SlotSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "slotSubscribe", SlotSubscribe(request_id)

    @classmethod
    def slot_unsubscribe(cls, subscription_id: int) -> (int, str, SlotUnsubscribe):
        """ Unsubscribe from slot notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, SlotUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "slotUnsubscribe", SlotUnsubscribe(subscription_id, request_id)

    @classmethod
    def slots_updates_subscribe(cls) -> (int, str, SlotsUpdatesSubscribe):
        """ Subscribe to receive a notification from the validator on a variety of updates on every slot.

        Returns:
            int, str, SlotsUpdatesSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "slotsUpdatesSubscribe", SlotsUpdatesSubscribe(request_id)

    @classmethod
    def slots_updates_unsubscribe(cls, subscription_id: int) -> (int, str, SlotsUpdatesUnsubscribe):
        """ Unsubscribe from slot update notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, SlotsUpdatesUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "slotsUpdatesUnsubscribe", SlotsUpdatesUnsubscribe(subscription_id, request_id)

    @classmethod
    def vote_subscribe(cls) -> (int, str, VoteSubscribe):
        """ Subscribe to receive notification anytime a new vote is observed in gossip.

        Returns:
            int, str, VoteSubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "voteSubscribe", VoteSubscribe(request_id)

    @classmethod
    def vote_unsubscribe(cls, subscription_id: int) -> (int, str, VoteUnsubscribe):
        """ Unsubscribe from vote notifications.

        Args:
            subscription_id (int): ID of subscription to cancel.
        Returns:
            int, str, VoteUnsubscribe: request id, RPC method name, solders request object
        """
        request_id = SharedCounter.get_next_id()
        return request_id, "voteUnsubscribe", VoteUnsubscribe(subscription_id, request_id)
class HttpReqGen:
""" Generate requests for Solana Http client. """
@classmethod
def get_account_info(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized, encoding_to_use: Optional[UiAccountEncoding] = UiAccountEncoding.Base64, data_slice: Optional[DataSliceOpts] = None) -> (int, str, GetAccountInfo):
    """ Returns all the account info for the specified public key.

    Args:
        pubkey_to_use (Pubkey): Address of account to query
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".
        encoding_to_use (Optional[UiAccountEncoding]): Encoding for Account data, either "base58" (slow), "base64", or "jsonParsed".
            Default is "base64".
            "base58" is limited to Account data of less than 128 bytes.
            "base64" will return base64 encoded data for Account data of any size.
            "jsonParsed" encoding attempts to use program-specific state parsers to return more human-readable and explicit account state data.
            If jsonParsed is requested but a parser cannot be found, the field falls back to base64 encoding (jsonParsed encoding is UNSTABLE).
        data_slice (Optional[DataSliceOpts]): Option to limit the returned account data using the provided `offset`: <usize> and `length`: <usize> fields; only available for "base58" or "base64" encoding.
    Returns:
        int, str, GetAccountInfo: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    slice_config = None
    if data_slice is not None:
        slice_config = UiDataSliceConfig(offset = data_slice.offset, length = data_slice.length)
    # The config object is always built here, even when every option is None
    config = RpcAccountInfoConfig(encoding = encoding_to_use, data_slice = slice_config, commitment = commitment_to_use)
    return request_id, "getAccountInfo", GetAccountInfo(pubkey_to_use, config, request_id)
@classmethod
def get_balance(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized) -> (int, str, GetBalance):
    """ Returns the balance of the account of provided Pubkey.

    Args:
        pubkey_to_use (Pubkey): Pubkey of account to query
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".
    Returns:
        int, str, GetBalance: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The commitment is passed positionally and the config is built even when it is None
    context_config = RpcContextConfig(commitment_to_use)
    return request_id, "getBalance", GetBalance(pubkey_to_use, context_config, request_id)
@classmethod
def get_block(cls, slot: int, encoding_to_use: Optional[UiTransactionEncoding] = UiTransactionEncoding.Json, max_supported_transaction_version: Optional[int] = None) -> (int, str, GetBlock):
    """ Returns identity and transaction information about a confirmed block in the ledger.

    Args:
        slot (int): Slot, as u64 integer.
        encoding_to_use (Optional[UiTransactionEncoding]): Encoding for the returned Transaction, either "json", "jsonParsed",
            "base58" (slow), or "base64". If parameter not provided, the default encoding is JSON.
        max_supported_transaction_version (Optional[int]): The max transaction version to return in
            responses. If the requested transaction is a higher version, an error will be returned
    Returns:
        int, str, GetBlock: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    block_config = RpcBlockConfig(
        encoding = encoding_to_use,
        max_supported_transaction_version = max_supported_transaction_version,
    )
    return request_id, "getBlock", GetBlock(slot, block_config, request_id)
@classmethod
def get_block_commitment(cls, slot: int) -> (int, str, GetBlockCommitment):
    """ Fetch the commitment for particular block.

    Args:
        slot (int): Block, identified by Slot.
    Returns:
        int, str, GetBlockCommitment: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getBlockCommitment", GetBlockCommitment(slot, request_id)
@classmethod
def get_block_height(cls, commitment_to_use: CommitmentLevel = CommitmentLevel.Finalized) -> (int, str, GetBlockHeight):
    """ Returns the current block height of the node.

    Args:
        commitment_to_use (CommitmentLevel): Bank state to query. It can be either "finalized", "confirmed" or "processed".
            Passing None sends the request without a config object.
    Returns:
        int, str, GetBlockHeight: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment = commitment_to_use)
    return request_id, "getBlockHeight", GetBlockHeight(context_config, request_id)
@classmethod
def get_block_production(cls, identity_to_use: Optional[Pubkey] = None, range_to_use: Optional[RpcBlockProductionConfigRange] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetBlockProduction):
    """Build a `getBlockProduction` request (recent block production of the current or previous epoch).

    Args:
        identity_to_use (Optional[Pubkey]): Only return results for this validator identity.
        range_to_use (Optional[RpcBlockProductionConfigRange]): Slot range to return block
            production for. If not provided, defaults to the current epoch.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBlockProduction: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Only attach a config object when at least one option was supplied.
    production_config = None
    if identity_to_use is not None or range_to_use is not None or commitment_to_use is not None:
        production_config = RpcBlockProductionConfig(
            identity = identity_to_use,
            range = range_to_use,
            commitment = commitment_to_use,
        )
    return request_id, "getBlockProduction", GetBlockProduction(production_config, request_id)
@classmethod
def get_block_time(cls, slot: int) -> (int, str, GetBlockTime):
    """Build a `getBlockTime` request (estimated production time of a block).

    Args:
        slot (int): Block, identified by its slot.

    Returns:
        int, str, GetBlockTime: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getBlockTime", GetBlockTime(slot, request_id)
@classmethod
def get_blocks(cls, start_slot: int, end_slot: Optional[int] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetBlocks):
    """Build a `getBlocks` request (list of confirmed blocks between two slots).

    Args:
        start_slot (int): Start slot, as u64 integer.
        end_slot (Optional[int]): End slot, as u64 integer.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBlocks: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetBlocks(start_slot, end_slot, commitment_to_use, request_id)
    return request_id, "getBlocks", request
@classmethod
def get_blocks_with_limit(cls, start_slot: int, limit_to_use: Optional[int] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetBlocksWithLimit):
    """Build a `getBlocksWithLimit` request (list of confirmed blocks starting at a slot).

    Args:
        start_slot (int): Start slot, as u64 integer.
        limit_to_use (Optional[int]): Limit, as u64 integer (must be no more than 500,000 blocks
            higher than the start slot).
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBlocksWithLimit: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetBlocksWithLimit(start_slot, limit_to_use, commitment_to_use, request_id)
    return request_id, "getBlocksWithLimit", request
@classmethod
def get_cluster_nodes(cls) -> (int, str, GetClusterNodes):
    """Build a `getClusterNodes` request (information about all nodes participating in the cluster).

    Returns:
        int, str, GetClusterNodes: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getClusterNodes", GetClusterNodes(request_id)
@classmethod
def get_epoch_info(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetEpochInfo):
    """Build a `getEpochInfo` request (information about the current epoch).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed". When ``None``, no config object is sent.

    Returns:
        int, str, GetEpochInfo: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment = commitment_to_use)
    return request_id, "getEpochInfo", GetEpochInfo(context_config, request_id)
@classmethod
def get_epoch_schedule(cls) -> (int, str, GetEpochSchedule):
    """Build a `getEpochSchedule` request (epoch schedule from the cluster's genesis config).

    Returns:
        int, str, GetEpochSchedule: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getEpochSchedule", GetEpochSchedule(request_id)
@classmethod
def get_fee_for_message(cls, message_to_use: VersionedMessage, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetFeeForMessage):
    """Build a `getFeeForMessage` request (fee the network would charge for a message).

    Args:
        message_to_use (VersionedMessage): Message that the fee is requested for.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetFeeForMessage: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetFeeForMessage(message_to_use, commitment_to_use, request_id)
    return request_id, "getFeeForMessage", request
@classmethod
def get_first_available_block(cls) -> (int, str, GetFirstAvailableBlock):
    """Build a `getFirstAvailableBlock` request (lowest confirmed block not yet purged from the ledger).

    Returns:
        int, str, GetFirstAvailableBlock: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getFirstAvailableBlock", GetFirstAvailableBlock(request_id)
@classmethod
def get_genesis_hash(cls) -> (int, str, GetGenesisHash):
    """Build a `getGenesisHash` request (the genesis hash).

    Returns:
        int, str, GetGenesisHash: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getGenesisHash", GetGenesisHash(request_id)
@classmethod
def get_health(cls) -> (int, str, GetHealth):
    """Build a `getHealth` request (current health of the node).

    A healthy node is one that is within HEALTH_CHECK_SLOT_DISTANCE slots of the latest
    cluster confirmed slot.

    Returns:
        int, str, GetHealth: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getHealth", GetHealth(request_id)
@classmethod
def get_highest_snapshot_slot(cls) -> (int, str, GetHighestSnapshotSlot):
    """Build a `getHighestSnapshotSlot` request (highest slot the node has snapshots for).

    Returns:
        int, str, GetHighestSnapshotSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getHighestSnapshotSlot", GetHighestSnapshotSlot(request_id)
@classmethod
def get_identity(cls) -> (int, str, GetIdentity):
    """Build a `getIdentity` request (identity pubkey of the current node).

    Returns:
        int, str, GetIdentity: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getIdentity", GetIdentity(request_id)
@classmethod
def get_inflation_governor(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetInflationGovernor):
    """Build a `getInflationGovernor` request (the current inflation governor).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetInflationGovernor: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getInflationGovernor", GetInflationGovernor(commitment_to_use, request_id)
@classmethod
def get_inflation_rate(cls) -> (int, str, GetInflationRate):
    """Build a `getInflationRate` request (inflation values for the current epoch).

    Returns:
        int, str, GetInflationRate: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getInflationRate", GetInflationRate(request_id)
@classmethod
def get_inflation_reward(cls, pubkeys_to_use: Sequence[Pubkey], epoch_to_use: Optional[int] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetInflationReward):
    """Build a `getInflationReward` request (inflation / staking reward for a list of addresses).

    Args:
        pubkeys_to_use (Sequence[Pubkey]): Addresses to query.
        epoch_to_use (Optional[int]): Epoch for which the reward occurs. If omitted, the
            previous epoch is used.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized" or "confirmed".

    Returns:
        int, str, GetInflationReward: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Only attach a config object when an epoch or a commitment was supplied.
    epoch_config = None
    if epoch_to_use is not None or commitment_to_use is not None:
        epoch_config = RpcEpochConfig(epoch_to_use, commitment_to_use)
    return request_id, "getInflationReward", GetInflationReward(pubkeys_to_use, epoch_config, request_id)
@classmethod
def get_largest_accounts(cls, commitment_to_use: Optional[CommitmentLevel] = None, filter_to_use: Optional[RpcLargestAccountsFilter] = None) -> (int, str, GetLargestAccounts):
    """Build a `getLargestAccounts` request (the 20 largest accounts, by lamport balance).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".
        filter_to_use (Optional[RpcLargestAccountsFilter]): Filter results by account type;
            currently supported: circulating|nonCirculating.

    Returns:
        int, str, GetLargestAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetLargestAccounts(commitment_to_use, filter_to_use, request_id)
    return request_id, "getLargestAccounts", request
@classmethod
def get_latest_blockhash(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetLatestBlockhash):
    """Build a `getLatestBlockhash` request.

    The response carries the latest block hash from the ledger together with the last valid
    block height.

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed". When ``None``, no config object is sent.

    Returns:
        int, str, GetLatestBlockhash: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment = commitment_to_use)
    return request_id, "getLatestBlockhash", GetLatestBlockhash(context_config, request_id)
@classmethod
def get_leader_schedule(cls, epoch_to_use: Optional[int] = None, identity_to_use: Optional[Pubkey] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetLeaderSchedule):
    """Build a `getLeaderSchedule` request (leader schedule for an epoch).

    Args:
        epoch_to_use (Optional[int]): Fetch the leader schedule for the epoch that corresponds
            to the provided slot. If unspecified, the schedule of the current epoch is fetched.
        identity_to_use (Optional[Pubkey]): Only return results for this validator identity.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetLeaderSchedule: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The config object is only needed for the identity / commitment options;
    # the slot is passed to the request directly.
    schedule_config = None
    if identity_to_use is not None or commitment_to_use is not None:
        schedule_config = RpcLeaderScheduleConfig(identity_to_use, commitment_to_use)
    return request_id, "getLeaderSchedule", GetLeaderSchedule(epoch_to_use, schedule_config, request_id)
@classmethod
def get_max_retransmit_slot(cls) -> (int, str, GetMaxRetransmitSlot):
    """Build a `getMaxRetransmitSlot` request (max slot seen from the retransmit stage).

    Returns:
        int, str, GetMaxRetransmitSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getMaxRetransmitSlot", GetMaxRetransmitSlot(request_id)
@classmethod
def get_max_shred_insert_slot(cls) -> (int, str, GetMaxShredInsertSlot):
    """Build a `getMaxShredInsertSlot` request (max slot seen from after shred insert).

    Returns:
        int, str, GetMaxShredInsertSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getMaxShredInsertSlot", GetMaxShredInsertSlot(request_id)
@classmethod
def get_minimum_balance_for_rent_exemption(cls, min_size: int, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetMinimumBalanceForRentExemption):
    """Build a `getMinimumBalanceForRentExemption` request (minimum balance to make an account rent exempt).

    Args:
        min_size (int): Account data length.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetMinimumBalanceForRentExemption: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetMinimumBalanceForRentExemption(min_size, commitment_to_use, request_id)
    return request_id, "getMinimumBalanceForRentExemption", request
@classmethod
def get_multiple_accounts(cls, pubkeys_to_use: Sequence[Pubkey], commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized, encoding_to_use: Optional[UiAccountEncoding] = UiAccountEncoding.Base64, data_slice: Optional[DataSliceOpts] = None) -> (int, str, GetMultipleAccounts):
    """Build a `getMultipleAccounts` request (account info for a list of public keys).

    Args:
        pubkeys_to_use (Sequence[Pubkey]): Pubkeys to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed". Defaults to finalized.
        encoding_to_use (Optional[UiAccountEncoding]): Encoding for account data, either
            "base58" (slow) or "base64". "base58" is limited to account data of less than
            128 bytes; "base64" returns base64 encoded data for account data of any size.
        data_slice (Optional[DataSliceOpts]): Limits the returned account data using its
            `offset` and `length` fields; only available for "base58" or "base64" encoding.

    Returns:
        int, str, GetMultipleAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Translate the caller-facing slice options into the solders config type.
    slice_config = None
    if data_slice is not None:
        slice_config = UiDataSliceConfig(offset = data_slice.offset, length = data_slice.length)
    account_config = RpcAccountInfoConfig(
        encoding = encoding_to_use,
        commitment = commitment_to_use,
        data_slice = slice_config,
    )
    return request_id, "getMultipleAccounts", GetMultipleAccounts(pubkeys_to_use, account_config, request_id)
@classmethod
def get_program_accounts(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiAccountEncoding] = None, data_slice: Optional[DataSliceOpts] = None, filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None) -> (int, str, GetProgramAccounts):
    """Build a `getProgramAccounts` request (all accounts owned by the provided program).

    Args:
        pubkey_to_use (Pubkey): Pubkey of the program.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".
        encoding_to_use (Optional[UiAccountEncoding]): Encoding for the returned account data,
            either "jsonParsed", "base58" (slow) or "base64".
        data_slice (Optional[DataSliceOpts]): Limits the returned account data using its
            `offset` and `length` fields; only available for "base58" or "base64" encoding.
        filters (Optional[Sequence[Union[int, MemcmpOpts]]]): Options to compare a provided
            series of bytes with program account data at a particular offset.
            Note: an int entry is converted to a `dataSize` filter.

    Returns:
        int, str, GetProgramAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    slice_config = None
    if data_slice is not None:
        slice_config = UiDataSliceConfig(offset = data_slice.offset, length = data_slice.length)
    account_config = RpcAccountInfoConfig(
        encoding = encoding_to_use,
        commitment = commitment_to_use,
        data_slice = slice_config,
    )
    converted_filters = None
    if filters is not None:
        converted_filters = []
        for entry in filters:
            if isinstance(entry, int):
                # Plain ints are forwarded untouched.
                converted_filters.append(entry)
            else:
                converted_filters.append(Memcmp(offset = entry.offset, bytes_ = entry.bytes))
    program_config = RpcProgramAccountsConfig(account_config, converted_filters)
    return request_id, "getProgramAccounts", GetProgramAccounts(pubkey_to_use, program_config, request_id)
@classmethod
def get_recent_performance_samples(cls, limit: int) -> (int, str, GetRecentPerformanceSamples):
    """Build a `getRecentPerformanceSamples` request.

    Performance samples are returned in reverse slot order. They are taken every 60 seconds
    and include the number of transactions and slots that occur in a given time window.

    Args:
        limit (int): Number of samples to return (maximum 720).

    Returns:
        int, str, GetRecentPerformanceSamples: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getRecentPerformanceSamples", GetRecentPerformanceSamples(limit, request_id)
# TODO: where is GetRecentPrioritizationFees in solders
# @classmethod
# def getRecentPrioritizationFees(cls, pubkeys_to_use: Sequence[Pubkey]) -> (int, str, GetRecentPrioritizationFees):
# """ Returns a list of prioritization fees from recent blocks.
# Args:
# pubkeys_to_use (Sequence[Pubkey]): An array of Account addresses (up to a maximum of 128 addresses), as base-58 encoded strings
# Returns:
# int, str, GetRecentPrioritizationFees: next_id, RPC method name, solders request object
# """
# next_id = SharedCounter.get_next_id()
# req = GetRecentPrioritizationFees(pubkeys_to_use, next_id)
# return next_id, "getRecentPrioritizationFees", req
@classmethod
def get_signature_statuses(cls, signatures: Sequence[Signature], search_transaction_history: bool = False) -> (int, str, GetSignatureStatuses):
    """Build a `getSignatureStatuses` request (statuses of a list of signatures).

    Unless `searchTransactionHistory` is enabled, the node only searches the recent status
    cache of signatures, which retains statuses for all active slots plus
    `MAX_RECENT_BLOCKHASHES` rooted slots.

    Args:
        signatures (Sequence[Signature]): Transaction signatures to confirm.
        search_transaction_history (bool): If true, a Solana node will search its ledger cache
            for any signatures not found in the recent status cache.

    Returns:
        int, str, GetSignatureStatuses: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The config is always sent, carrying the history flag even when it is False.
    status_config = RpcSignatureStatusConfig(search_transaction_history)
    return request_id, "getSignatureStatuses", GetSignatureStatuses(signatures, status_config, request_id)
@classmethod
def get_signatures_for_address(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None, limit_to_use: Optional[int] = None, before_to_use: Optional[Signature] = None, until_to_use: Optional[Signature] = None) -> (int, str, GetSignaturesForAddress):
    """Build a `getSignaturesForAddress` request.

    The node returns signatures for confirmed transactions that include the given address in
    their accountKeys list, going backwards in time from the provided signature or from the
    most recent confirmed block.

    Args:
        pubkey_to_use (Pubkey): Account address to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".
        limit_to_use (Optional[int]): Forwarded as the `limit` option (maximum number of
            signatures to return).
        before_to_use (Optional[Signature]): Forwarded as the `before` option (start searching
            backwards from this signature).
        until_to_use (Optional[Signature]): Forwarded as the `until` option (search until this
            signature is reached).

    Returns:
        int, str, GetSignaturesForAddress: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Only attach a config object when at least one option was supplied.
    any_option_set = (
        commitment_to_use is not None
        or limit_to_use is not None
        or before_to_use is not None
        or until_to_use is not None
    )
    address_config = None
    if any_option_set:
        address_config = RpcSignaturesForAddressConfig(
            before = before_to_use,
            until = until_to_use,
            limit = limit_to_use,
            commitment = commitment_to_use,
        )
    return request_id, "getSignaturesForAddress", GetSignaturesForAddress(pubkey_to_use, address_config, request_id)
@classmethod
def get_slot(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetSlot):
    """Build a `getSlot` request (the slot the node is currently processing).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed". When ``None``, no config object is sent.

    Returns:
        int, str, GetSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment = commitment_to_use)
    return request_id, "getSlot", GetSlot(context_config, request_id)
@classmethod
def get_slot_leader(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetSlotLeader):
    """Build a `getSlotLeader` request (the current slot leader).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed". When ``None``, no config object is sent.

    Returns:
        int, str, GetSlotLeader: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment = commitment_to_use)
    return request_id, "getSlotLeader", GetSlotLeader(context_config, request_id)
@classmethod
def get_slot_leaders(cls, start: int, limit: int) -> (int, str, GetSlotLeaders):
    """Build a `getSlotLeaders` request (slot leaders for a given slot range).

    Args:
        start (int): Start slot, as u64 integer.
        limit (int): Limit, as u64 integer (between 1 and 5,000).

    Returns:
        int, str, GetSlotLeaders: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getSlotLeaders", GetSlotLeaders(start, limit, request_id)
# TODO: where is GetStakeMinimumDelegation in solders
# @classmethod
# def getStakeMinimumDelegation(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetStakeMinimumDelegation):
# """ Returns the stake minimum delegation, in lamports.
# Args:
# commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".
# Returns:
# int, str, GetStakeMinimumDelegation: next_id, RPC method name, solders request object
# """
# next_id = SharedCounter.get_next_id()
# rpc_context_config = None if commitment_to_use is None else RpcContextConfig(commitment = commitment_to_use)
# req = GetStakeMinimumDelegation(rpc_context_config, next_id)
# return next_id, "getStakeMinimumDelegation", req
@classmethod
def get_supply(cls, exclude_non_circulating_accounts_list: bool = True, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetSupply):
    """Build a `getSupply` request (information about the current supply).

    Args:
        exclude_non_circulating_accounts_list (bool): Exclude the non circulating accounts
            list from the response. Defaults to True.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetSupply: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The supply config is always sent, since the exclusion flag always has a value.
    supply_config = RpcSupplyConfig(
        exclude_non_circulating_accounts_list = exclude_non_circulating_accounts_list,
        commitment = commitment_to_use,
    )
    return request_id, "getSupply", GetSupply(supply_config, request_id)
@classmethod
def get_token_account_balance(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenAccountBalance):
    """Build a `getTokenAccountBalance` request (token balance of an SPL Token account, UNSTABLE).

    Args:
        pubkey_to_use (Pubkey): Pubkey of the Token account to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenAccountBalance: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetTokenAccountBalance(pubkey_to_use, commitment_to_use, request_id)
    return request_id, "getTokenAccountBalance", request
@classmethod
def get_token_accounts_by_delegate(cls, pubkey_to_use: Pubkey, opts: TokenAccountOpts, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenAccountsByDelegate):
    """Build a `getTokenAccountsByDelegate` request (SPL Token accounts by approved delegate, UNSTABLE).

    Args:
        pubkey_to_use (Pubkey): Pubkey to query.
        opts (TokenAccountOpts): Token account options; at least one of `mint` or `programId`
            must be set. When both are set, `mint` takes precedence.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenAccountsByDelegate: request id, RPC method name, solders request object

    Raises:
        ValueError: If `opts` carries neither a mint nor a program id. The request id has
            already been drawn from the shared counter at that point.
    """
    request_id = SharedCounter.get_next_id()
    account_encoding = opts.encoding
    slice_opts = opts.dataSlice
    slice_config = None
    if slice_opts is not None:
        slice_config = UiDataSliceConfig(offset = slice_opts.offset, length = slice_opts.length)
    mint = opts.mint
    program_id = opts.programId
    if mint is None and program_id is None:
        raise ValueError("Please provide one of mint or program_id")
    # A mint filter wins over a program-id filter when both are present.
    if mint is not None:
        accounts_filter = RpcTokenAccountsFilterMint(mint)
    else:
        accounts_filter = RpcTokenAccountsFilterProgramId(program_id)
    account_config = RpcAccountInfoConfig(
        encoding = account_encoding,
        data_slice = slice_config,
        commitment = commitment_to_use,
    )
    request = GetTokenAccountsByDelegate(pubkey_to_use, accounts_filter, account_config, request_id)
    return request_id, "getTokenAccountsByDelegate", request
@classmethod
def get_token_accounts_by_owner(cls, pubkey_to_use: Pubkey, opts: TokenAccountOpts, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenAccountsByOwner):
    """Build a `getTokenAccountsByOwner` request (SPL Token accounts by token owner, UNSTABLE).

    Args:
        pubkey_to_use (Pubkey): Pubkey to query.
        opts (TokenAccountOpts): Token account options; at least one of `mint` or `programId`
            must be set. When both are set, `mint` takes precedence.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenAccountsByOwner: request id, RPC method name, solders request object

    Raises:
        ValueError: If `opts` carries neither a mint nor a program id. The request id has
            already been drawn from the shared counter at that point.
    """
    request_id = SharedCounter.get_next_id()
    account_encoding = opts.encoding
    slice_opts = opts.dataSlice
    slice_config = None
    if slice_opts is not None:
        slice_config = UiDataSliceConfig(offset = slice_opts.offset, length = slice_opts.length)
    mint = opts.mint
    program_id = opts.programId
    if mint is None and program_id is None:
        raise ValueError("Please provide one of mint or program_id")
    # A mint filter wins over a program-id filter when both are present.
    if mint is not None:
        accounts_filter = RpcTokenAccountsFilterMint(mint)
    else:
        accounts_filter = RpcTokenAccountsFilterProgramId(program_id)
    account_config = RpcAccountInfoConfig(
        encoding = account_encoding,
        data_slice = slice_config,
        commitment = commitment_to_use,
    )
    request = GetTokenAccountsByOwner(pubkey_to_use, accounts_filter, account_config, request_id)
    return request_id, "getTokenAccountsByOwner", request
@classmethod
def get_token_largest_accounts(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenLargestAccounts):
    """Build a `getTokenLargestAccounts` request (the 20 largest accounts of an SPL Token type).

    Args:
        pubkey_to_use (Pubkey): Pubkey of the token to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenLargestAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetTokenLargestAccounts(pubkey_to_use, commitment_to_use, request_id)
    return request_id, "getTokenLargestAccounts", request
@classmethod
def get_token_supply(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenSupply):
    """Build a `getTokenSupply` request (total supply of an SPL Token type).

    Args:
        pubkey_to_use (Pubkey): Pubkey of the token to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenSupply: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetTokenSupply(pubkey_to_use, commitment_to_use, request_id)
    return request_id, "getTokenSupply", request
@classmethod
def get_transaction(cls, signature_to_use: Signature, encoding_to_use: Optional[UiTransactionEncoding] = UiTransactionEncoding.Json, commitment_to_use: Optional[CommitmentLevel] = None, max_supported_transaction_version: Optional[int] = None) -> (int, str, GetTransaction):
    """Build a `getTransaction` request (transaction details for a confirmed transaction).

    Args:
        signature_to_use (Signature): Transaction signature.
        encoding_to_use (Optional[UiTransactionEncoding]): Encoding for the returned
            transaction. Defaults to JSON.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".
        max_supported_transaction_version (Optional[int]): Max supported transaction version.

    Returns:
        int, str, GetTransaction: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # With the default encoding a config is always sent; it is only omitted when the caller
    # explicitly passes None for every option.
    any_option_set = (
        encoding_to_use is not None
        or commitment_to_use is not None
        or max_supported_transaction_version is not None
    )
    transaction_config = None
    if any_option_set:
        transaction_config = RpcTransactionConfig(
            encoding = encoding_to_use,
            commitment = commitment_to_use,
            max_supported_transaction_version = max_supported_transaction_version,
        )
    return request_id, "getTransaction", GetTransaction(signature_to_use, transaction_config, request_id)
@classmethod
def get_transaction_count(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTransactionCount):
    """Build a `getTransactionCount` request (current transaction count from the ledger).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed". When ``None``, no config object is sent.

    Returns:
        int, str, GetTransactionCount: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment = commitment_to_use)
    return request_id, "getTransactionCount", GetTransactionCount(context_config, request_id)
@classmethod
def get_version(cls) -> (int, str, GetVersion):
    """Build a `getVersion` request (Solana version running on the node).

    Returns:
        int, str, GetVersion: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getVersion", GetVersion(request_id)
@classmethod
def get_vote_accounts(cls, pubkey_to_use: Optional[Pubkey] = None, commitment_to_use: Optional[CommitmentLevel] = None, keep_unstaked_delinquents: Optional[bool] = None, delinquent_slot_distance: Optional[int] = None) -> (int, str, GetVoteAccounts):
    """Build a `getVoteAccounts` request (account info and stake of all voting accounts in the current bank).

    Args:
        pubkey_to_use (Optional[Pubkey]): Only return results for this validator vote address.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either
            "finalized", "confirmed" or "processed".
        keep_unstaked_delinquents (Optional[bool]): Do not filter out delinquent validators
            with no stake.
        delinquent_slot_distance (Optional[int]): Number of slots behind the tip that a
            validator must fall to be considered delinquent. NOTE: For the sake of consistency
            between ecosystem products, it is not recommended that this argument be specified.

    Returns:
        int, str, GetVoteAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Only attach a config object when at least one option was supplied.
    any_option_set = (
        pubkey_to_use is not None
        or commitment_to_use is not None
        or keep_unstaked_delinquents is not None
        or delinquent_slot_distance is not None
    )
    vote_config = None
    if any_option_set:
        vote_config = RpcGetVoteAccountsConfig(
            vote_pubkey = pubkey_to_use,
            commitment = commitment_to_use,
            keep_unstaked_delinquents = keep_unstaked_delinquents,
            delinquent_slot_distance = delinquent_slot_distance,
        )
    return request_id, "getVoteAccounts", GetVoteAccounts(vote_config, request_id)
@classmethod
def is_blockhash_valid(cls, blockhash_to_use: Hash, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, IsBlockhashValid):
    """ Build the request checking whether a blockhash is still valid.
    Args:
        blockhash_to_use (Hash): blockhash to evaluate
        commitment_to_use (Optional[CommitmentLevel]): bank state to query ("finalized", "confirmed" or "processed")
    Returns:
        int, str, IsBlockhashValid: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    if commitment_to_use is None:
        context_config = None
    else:
        context_config = RpcContextConfig(commitment_to_use)
    return request_id, "isBlockhashValid", IsBlockhashValid(blockhash_to_use, context_config, request_id)
@classmethod
def minimum_ledger_slot(cls) -> (int, str, MinimumLedgerSlot):
    """ Build the request for the lowest slot the node still has in its ledger.
    Returns:
        int, str, MinimumLedgerSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "minimumLedgerSlot", MinimumLedgerSlot(request_id)
@classmethod
def request_airdrop(cls, pubkey_to_use: Pubkey, lamports_to_use: int, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, RequestAirdrop):
    """ Build the request asking for an airdrop of lamports to a Pubkey.
    Args:
        pubkey_to_use (Pubkey): account that receives the lamports
        lamports_to_use (int): amount of lamports
        commitment_to_use (Optional[CommitmentLevel]): commitment level ("finalized", "confirmed" or "processed")
    Returns:
        int, str, RequestAirdrop: next_id, RPC method name, solders request object
    """
    next_id = SharedCounter.get_next_id()
    if commitment_to_use is None:
        rpc_request_airdrop_config = None
    else:
        # Passed by keyword so the commitment cannot be bound to another positional field
        # NOTE(review): solders appears to declare recent_blockhash before commitment — confirm against the installed version
        rpc_request_airdrop_config = RpcRequestAirdropConfig(commitment = commitment_to_use)
    req = RequestAirdrop(pubkey_to_use, lamports_to_use, rpc_request_airdrop_config, next_id)
    return next_id, "requestAirdrop", req
@classmethod
def send_transaction(cls, txn: Union[VersionedTransaction, Transaction], opts: Optional[TxOpts] = None) -> (int, str, SendRawTransaction):
    """ Build the request sending a serialized transaction.
    Args:
        txn (Union[VersionedTransaction, Transaction]): transaction object.
        opts (Optional[TxOpts]): transaction options; when None, options with a Finalized preflight commitment are used.
    Returns:
        int, str, SendRawTransaction: next_id, RPC method name, solders request object
    """
    next_id = SharedCounter.get_next_id()
    btxn = bytes(txn)
    opts_to_use = TxOpts(prefLightCommitment = CommitmentLevel.Finalized) if opts is None else opts
    # Read the resolved options, not the raw argument: `opts` may be None here
    pref_light_commitment_to_use = opts_to_use.prefLightCommitment
    if pref_light_commitment_to_use is None:
        pref_light_commitment_to_use = CommitmentLevel.Finalized
    rpc_send_transaction_config = RpcSendTransactionConfig(skip_preflight = opts_to_use.skipPreflight, preflight_commitment = pref_light_commitment_to_use, encoding = opts_to_use.prefEncoding, max_retries = opts_to_use.maxRetries)
    req = SendRawTransaction(btxn, rpc_send_transaction_config, next_id)
    return next_id, "sendTransaction", req
@classmethod
def simulate_transaction(cls, txn: Union[VersionedTransaction, Transaction], verify_signature: bool, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, Union[SimulateVersionedTransaction, SimulateLegacyTransaction]):
    """ Build the request simulating a transaction.
    Args:
        txn (Union[VersionedTransaction, Transaction]): transaction to simulate; a legacy ``Transaction`` yields a ``SimulateLegacyTransaction`` request, anything else a ``SimulateVersionedTransaction`` request.
        verify_signature (bool): forwarded as ``sig_verify`` in the simulation config.
        commitment_to_use (Optional[CommitmentLevel]): bank state to query ("finalized", "confirmed" or "processed").
    Returns:
        int, str, Union[SimulateVersionedTransaction, SimulateLegacyTransaction]: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    simulation_config = RpcSimulateTransactionConfig(sig_verify = verify_signature, commitment = commitment_to_use)
    request_cls = SimulateLegacyTransaction if isinstance(txn, Transaction) else SimulateVersionedTransaction
    return request_id, "simulateTransaction", request_cls(txn, simulation_config, request_id)
class Wallet:
    """ Wallet record that can be serialised to and restored from JSON.

    Every validation failure is reported as a ``ValueError`` whose first
    argument is one of the integer codes declared below.
    """
    WrongWalletName = 1  # 'walletName' wrong type (must not be empty string)
    WrongWalletId = 2  # 'walletId' wrong type (must not be empty string)
    WrongWalletPass = 3  # 'walletPass' wrong type (must not be empty string)
    WrongWalletType = 4  # 'walletType' must be equal string 'solana'
    JSONDecodeError = 5  # Cannot decode Json

    def __init__(self, wallet_name: str, wallet_pub_key: str, wallet_priv_key: str, dt_crea: datetime = None, dt_update: datetime = None, main_sol_balance: float = 0, main_tokens: list = None, dev_sol_balance: float = 0, dev_tokens: list = None, wallet_type: str = 'solana'):
        """ Validate the mandatory fields and fill the optional ones with defaults.
        Args:
            wallet_name (str): non-blank wallet name
            wallet_pub_key (str): non-blank public key
            wallet_priv_key (str): non-blank private key
            dt_crea (datetime): creation date; current UTC time when not a datetime
            dt_update (datetime): last update date; current UTC time when not a datetime
            main_sol_balance (float): balance stored as ``mainSolBalance``; negative or non-numeric values become 0
            main_tokens (list): tokens stored as ``mainTokens``; empty list when None
            dev_sol_balance (float): balance stored as ``devSolBalance``; negative or non-numeric values become 0
            dev_tokens (list): tokens stored as ``devTokens``; empty list when None
            wallet_type (str): must be 'solana'
        Raises:
            ValueError: with one of the ``Wrong*`` codes when a mandatory field is invalid
        """
        # Checking mandatory parameters
        if not isinstance(wallet_name, str) or not wallet_name.strip():
            raise ValueError(Wallet.WrongWalletName)
        if not isinstance(wallet_pub_key, str) or not wallet_pub_key.strip():
            raise ValueError(Wallet.WrongWalletId)
        if not isinstance(wallet_priv_key, str) or not wallet_priv_key.strip():
            raise ValueError(Wallet.WrongWalletPass)
        if wallet_type != "solana":
            raise ValueError(Wallet.WrongWalletType)
        # Assign required values
        self.walletType = wallet_type
        self.walletName = wallet_name
        self.walletPubKey = wallet_pub_key
        self.walletPrivKey = wallet_priv_key
        # Manage default date values
        self.dtCrea = dt_crea if isinstance(dt_crea, datetime) else datetime.now(timezone.utc)
        self.dtUpdate = dt_update if isinstance(dt_update, datetime) else datetime.now(timezone.utc)
        # Manage the balances and set them to zero if they are negative or unusable
        self.mainSolBalance = Wallet._clamp_balance(main_sol_balance)
        self.devSolBalance = Wallet._clamp_balance(dev_sol_balance)
        # Initialize token lists if not provided
        self.mainTokens = main_tokens if main_tokens is not None else []
        self.devTokens = dev_tokens if dev_tokens is not None else []

    @staticmethod
    def _clamp_balance(value):
        """ Return ``max(value, 0)``, or 0 when the value cannot be compared with a number. """
        try:
            return max(value, 0)
        except TypeError:
            # e.g. None or a string coming from a hand-edited / corrupted wallet file
            return 0

    def to_json(self) -> str:
        """ Convert instance to JSON (dates are written in ISO format). """
        return json.dumps({
            "walletType": self.walletType,
            "walletName": self.walletName,
            "walletPubKey": self.walletPubKey,
            "walletPrivKey": self.walletPrivKey,
            "dtCrea": self.dtCrea.isoformat(),
            "dtUpdate": self.dtUpdate.isoformat(),
            "mainSolBalance": self.mainSolBalance,
            "mainTokens": self.mainTokens,
            "devSolBalance": self.devSolBalance,
            "devTokens": self.devTokens,
        })

    @staticmethod
    def parse_datetime(dt_str):
        """ Parse an ISO date string into a datetime, returning the current time in UTC on error.
        Args:
            dt_str: possible datetime string
        Returns:
            datetime: the parsed value, or the current UTC time when ``dt_str`` is not a string or is not valid ISO format
        """
        if isinstance(dt_str, str):
            try:
                return datetime.fromisoformat(dt_str)
            except ValueError:
                logger.warning("Format de date invalide, remplacement par l'heure actuelle.")
        return datetime.now(timezone.utc)

    @staticmethod
    def from_json(json_string: str):
        """ Create a Wallet instance from a JSON string.
        Args:
            json_string (str): JSON document as produced by ``to_json``
        Returns:
            Wallet: the restored instance
        Raises:
            ValueError: ``Wallet.JSONDecodeError`` when the text is not a JSON object, or one of the ``Wrong*`` codes when a mandatory field is missing or invalid
        """
        try:
            data = json.loads(json_string)
        except json.JSONDecodeError as err:
            raise ValueError(Wallet.JSONDecodeError) from err
        if not isinstance(data, dict):
            # Valid JSON but not an object (number, list, ...): nothing can be read from it
            raise ValueError(Wallet.JSONDecodeError)
        # Checking mandatory parameters, in the same order as the constructor
        mandatory = (
            ("walletName", Wallet.WrongWalletName),
            ("walletPubKey", Wallet.WrongWalletId),
            ("walletPrivKey", Wallet.WrongWalletPass),
        )
        for key, code in mandatory:
            value = data.get(key)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(code)
        if data.get("walletType") != "solana":
            raise ValueError(Wallet.WrongWalletType)
        # Retrieve and validate dates
        dt_crea = Wallet.parse_datetime(data.get("dtCrea"))
        dt_update = Wallet.parse_datetime(data.get("dtUpdate"))
        # Balances are sanitised by the constructor; token lists default to empty lists
        return Wallet(
            data["walletName"],
            data["walletPubKey"],
            data["walletPrivKey"],
            dt_crea,
            dt_update,
            data.get("mainSolBalance", 0),
            data.get("mainTokens", []),
            data.get("devSolBalance", 0),
            data.get("devTokens", []),
            data["walletType"],
        )
class SimpleMsgBox(QMessageBox):
    """ QMessageBox preset with the application icon, layout direction and a non-native look. """
    def __init__(self, window_title: str, window_text: str, ico: QMessageBox.Icon, parent = None):
        """ Create the box with the given title, text and icon. """
        super().__init__(parent)
        # Caller-supplied content
        self.setWindowTitle(window_title)
        self.setText(window_text)
        self.setIcon(ico)
        # Application-wide look
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setOption(QMessageBox.Option.DontUseNativeDialog, True)
        self.setLayoutDirection(get_qt_dir())

    def showEvent(self, event) -> None:
        """ Re-center the box on screen each time it is shown. """
        super().showEvent(event)
        center_on_screen(self)
class NewWalletNameAndPassDialog(QDialog):
    """ Form collecting the name and the EncryptPass of a wallet to create.

    ``walletName`` and ``walletEncyptPass`` are None until the form has been
    validated successfully.
    """
    def __init__(self, parent = None):
        super().__init__(parent)
        # Values handed back to the caller once validation succeeds
        self.walletName = None
        self.walletEncyptPass = None
        self.setLayoutDirection(get_qt_dir())
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setWindowTitle(self.tr("Wallet Name and EncryptPass"))
        form = QFormLayout()
        self.setLayout(form)
        # Row 1: wallet name
        self._walletName_Qle = QLineEdit()
        form.addRow(self.tr("Wallet Name:"), self._walletName_Qle)
        # Row 2: masked password field followed by its show/hide toggle
        self._walletEncyptPass_Qle = QLineEdit()
        self._walletEncyptPass_Qle.setEchoMode(QLineEdit.EchoMode.Password)
        self._toggle_qtbtn = QToolButton()
        self._toggle_qtbtn.setIcon(QIcon(":icoEyeShow"))
        self._toggle_qtbtn.setCheckable(True)
        self._toggle_qtbtn.setToolTip(self.tr("Show/Hide Password"))
        self._toggle_qtbtn.clicked.connect(self._toggle_password_visibility)
        secret_row = QHBoxLayout()
        secret_row.addWidget(self._walletEncyptPass_Qle)
        secret_row.addWidget(self._toggle_qtbtn)
        form.addRow(self.tr("Wallet EncryptPass:"), secret_row)
        # Row 3: Validate / Cancel
        buttons = QDialogButtonBox()
        accept_button = QPushButton(self.tr("Validate"))
        reject_button = QPushButton(self.tr("Cancel"))
        buttons.addButton(accept_button, QDialogButtonBox.ButtonRole.AcceptRole)
        buttons.addButton(reject_button, QDialogButtonBox.ButtonRole.RejectRole)
        accept_button.clicked.connect(self._validate_and_accept)
        reject_button.clicked.connect(self.reject)
        form.addWidget(buttons)

    def _toggle_password_visibility(self):
        """ Switch the password field between masked and clear text and update the eye icon. """
        if self._walletEncyptPass_Qle.echoMode() == QLineEdit.EchoMode.Normal:
            next_mode, icon_name = QLineEdit.EchoMode.Password, ":icoEyeShow"
        else:
            next_mode, icon_name = QLineEdit.EchoMode.Normal, ":icoEyeHide"
        self._walletEncyptPass_Qle.setEchoMode(next_mode)
        self._toggle_qtbtn.setIcon(QIcon(icon_name))

    def _validate_and_accept(self):
        """ Check both fields; show a warning on the first invalid one, otherwise store the values and accept. """
        name = self._walletName_Qle.text().strip()
        secret = self._walletEncyptPass_Qle.text()
        if len(name) < 2:
            problem = self.tr("The Wallet Name must have a minimum of 2 characters.")
        elif not re.match(r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,}$", secret):
            problem = self.tr("The Wallet EncryptPass must have a minimum of 1 lower case character, 1 upper case character, 1 numeric character, 1 special character (@$!%*?&), and must have a minimum of 8 characters length.")
        else:
            problem = None
        if problem is not None:
            warning_box = SimpleMsgBox(self.tr("Error"), problem, QMessageBox.Icon.Warning, self)
            warning_box.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            warning_box.exec()
            return
        # Both fields are valid: publish them and close the dialog as accepted
        self.walletName = name
        self.walletEncyptPass = secret
        self.accept()

    def showEvent(self, event) -> None:
        """ Freeze the dialog at its shown size and center it on screen. """
        super().showEvent(event)
        self.setFixedSize(self.size())
        center_on_screen(self)
class NewWalletFileDialog(QFileDialog):
    """ File dialog used to choose where a new wallet file is saved. """
    def __init__(self, parent = None):
        """ Configure a non-native save dialog opened on the wallets folder and filtered on the wallet extension. """
        super().__init__(parent)
        self.setOption(QFileDialog.Option.DontUseNativeDialog, True)
        self.setWindowTitle(self.tr("Save Content to New Wallet File"))
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
        self.setFileMode(QFileDialog.FileMode.AnyFile)
        self.setOption(QFileDialog.Option.DontConfirmOverwrite, True)
        self.setLabelText(QFileDialog.DialogLabel.LookIn, self.tr("Wallets location:"))
        self.setLabelText(QFileDialog.DialogLabel.FileName, self.tr("File name:"))
        # Routed through tr() like every other label so it can be translated
        self.setLabelText(QFileDialog.DialogLabel.FileType, self.tr("File type:"))
        self.setLabelText(QFileDialog.DialogLabel.Accept, self.tr("Validate"))
        self.setLabelText(QFileDialog.DialogLabel.Reject, self.tr("Cancel"))
        self.setDirectory(WALLETS_FOLDER)
        self.setDefaultSuffix(WALLET_FILE_EXT)
        self.setNameFilter(self.tr("{desc} (*.{ext})").format(desc = WALLET_FILE_DESC, ext = WALLET_FILE_EXT))

    def showEvent(self, event) -> None:
        """ Use the shown size as minimum size and center the dialog on screen. """
        super().showEvent(event)
        size = self.size()
        self.setMinimumSize(size)
        center_on_screen(self)
class LoadWalletPassDialog(QDialog):
    """ Form asking for the EncryptPass needed to open a wallet file.

    ``walletEncyptPass`` is None until a non-blank password has been validated.
    """
    def __init__(self, file_name: str, parent = None):
        super().__init__(parent)
        # Value handed back to the caller once validation succeeds
        self.walletEncyptPass = None
        self.setLayoutDirection(get_qt_dir())
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setWindowTitle(self.tr("Wallet EncryptPass for file '{fileName}'").format(fileName = file_name))
        form = QFormLayout()
        self.setLayout(form)
        # Masked password field followed by its show/hide toggle
        self._walletEncyptPass_Qle = QLineEdit()
        self._walletEncyptPass_Qle.setEchoMode(QLineEdit.EchoMode.Password)
        self._toggle_qtbtn = QToolButton()
        self._toggle_qtbtn.setIcon(QIcon(":icoEyeShow"))
        self._toggle_qtbtn.setCheckable(True)
        self._toggle_qtbtn.setToolTip(self.tr("Show/Hide Password"))
        self._toggle_qtbtn.clicked.connect(self._toggle_password_visibility)
        secret_row = QHBoxLayout()
        secret_row.addWidget(self._walletEncyptPass_Qle)
        secret_row.addWidget(self._toggle_qtbtn)
        form.addRow(self.tr("Wallet EncryptPass:"), secret_row)
        # Validate / Cancel
        buttons = QDialogButtonBox()
        accept_button = QPushButton(self.tr("Validate"))
        reject_button = QPushButton(self.tr("Cancel"))
        buttons.addButton(accept_button, QDialogButtonBox.ButtonRole.AcceptRole)
        buttons.addButton(reject_button, QDialogButtonBox.ButtonRole.RejectRole)
        accept_button.clicked.connect(self._validate_and_accept)
        reject_button.clicked.connect(self.reject)
        form.addWidget(buttons)

    def _toggle_password_visibility(self):
        """ Switch the password field between masked and clear text and update the eye icon. """
        if self._walletEncyptPass_Qle.echoMode() == QLineEdit.EchoMode.Normal:
            next_mode, icon_name = QLineEdit.EchoMode.Password, ":icoEyeShow"
        else:
            next_mode, icon_name = QLineEdit.EchoMode.Normal, ":icoEyeHide"
        self._walletEncyptPass_Qle.setEchoMode(next_mode)
        self._toggle_qtbtn.setIcon(QIcon(icon_name))

    def _validate_and_accept(self):
        """ Warn when the password is blank, otherwise store it (unstripped) and accept. """
        secret = self._walletEncyptPass_Qle.text()
        if not secret.strip():
            warning_box = SimpleMsgBox(self.tr("Error"), self.tr("You must enter a valid Password."), QMessageBox.Icon.Warning, self)
            warning_box.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            warning_box.exec()
            return
        self.walletEncyptPass = secret
        self.accept()

    def showEvent(self, event) -> None:
        """ Freeze the dialog at its shown size and center it on screen. """
        super().showEvent(event)
        self.setFixedSize(self.size())
        center_on_screen(self)
class LoadWalletFileDialog(QFileDialog):
    """ QFileDialog used to pick an existing wallet file to load. """
    def __init__(self, parent = None):
        """ Configure a non-native open dialog on the wallets folder, filtered on the wallet extension. """
        super().__init__(parent)
        self.setOption(QFileDialog.Option.DontUseNativeDialog, True)
        self.setWindowTitle(self.tr("Load Wallet File"))
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setAcceptMode(QFileDialog.AcceptMode.AcceptOpen)
        self.setFileMode(QFileDialog.FileMode.ExistingFile)
        self.setDirectory(WALLETS_FOLDER)
        self.setDefaultSuffix(WALLET_FILE_EXT)
        self.setLabelText(QFileDialog.DialogLabel.LookIn, self.tr("Wallets location:"))
        self.setLabelText(QFileDialog.DialogLabel.FileName, self.tr("File name:"))
        # Routed through tr() like every other label so it can be translated
        self.setLabelText(QFileDialog.DialogLabel.FileType, self.tr("File type:"))
        self.setLabelText(QFileDialog.DialogLabel.Accept, self.tr("Validate"))
        self.setLabelText(QFileDialog.DialogLabel.Reject, self.tr("Cancel"))
        self.setNameFilter(self.tr("{desc} (*.{ext})").format(desc = WALLET_FILE_DESC, ext = WALLET_FILE_EXT))

    def showEvent(self, event) -> None:
        """ Use the shown size as minimum size and center the dialog on screen. """
        super().showEvent(event)
        size = self.size()
        self.setMinimumSize(size)
        center_on_screen(self)
class SplashScreen(QSplashScreen):
    """ Frameless splash screen that fades in, stays visible, then fades out and closes itself. """
    def __init__(self, fade_in_duration = 1500, display_duration = 2000, fade_out_duration = 1500):
        """ Build the splash and start the fade-in immediately.
        Args:
            fade_in_duration: duration of the fade-in animation, in milliseconds
            display_duration: delay, counted from construction, before the fade-out starts, in milliseconds
            fade_out_duration: duration of the fade-out animation, in milliseconds
        """
        super().__init__(QPixmap(":imgSplash"))
        self.setWindowFlag(Qt.WindowType.FramelessWindowHint)  # Remove window border and buttons
        self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
        # Apply opacity effect to manage transitions
        self._opacityEffect = QGraphicsOpacityEffect()
        self.setGraphicsEffect(self._opacityEffect)
        # Animation Fade-in
        self._fadeIn = QPropertyAnimation(self._opacityEffect, b"opacity")
        self._fadeIn.setDuration(fade_in_duration)
        self._fadeIn.setStartValue(0)
        self._fadeIn.setEndValue(1)
        self._fadeIn.setEasingCurve(QEasingCurve.Type.InOutQuad)
        # Animation Fade-out
        self.fadeOut = QPropertyAnimation(self._opacityEffect, b"opacity")
        self.fadeOut.setDuration(fade_out_duration)
        self.fadeOut.setStartValue(1)
        self.fadeOut.setEndValue(0)
        self.fadeOut.setEasingCurve(QEasingCurve.Type.InOutQuad)
        # Connected once, before the animation is ever started, so the finished
        # signal cannot be missed and repeated starts do not stack connections
        self.fadeOut.finished.connect(self.close)
        # Scheduling the fade-out animation after duration
        QTimer.singleShot(display_duration, self.start_fade_out)
        # Start fade-in animation
        self._fadeIn.start()

    def start_fade_out(self):
        """ Start the fade-out animation; the splash screen closes when it finishes. """
        self.fadeOut.start()

    def showEvent(self, event) -> None:
        """ Center the splash on screen each time it is shown. """
        super().showEvent(event)
        center_on_screen(self)
class AboutWindow(QWidget):
    """ Frameless, application-modal "About" window: background image, centered text and a close button. """
    def __init__(self, bg_txt: str, parent = None):
        """ Build the window.
        Args:
            bg_txt (str): text displayed centered over the background image
            parent: optional parent widget
        """
        super().__init__(parent)
        self.setWindowTitle(self.tr("About {app}").format(app = APP_NAME_VERSION))
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setFixedSize(900, 600)
        self.setWindowModality(Qt.WindowModality.ApplicationModal)
        self.setWindowFlags(Qt.WindowType.Dialog | Qt.WindowType.FramelessWindowHint | Qt.WindowType.WindowStaysOnTopHint)
        self._bg_qpix = QPixmap(":imgAbout")
        # Centered text, created before the close button so that the button is stacked above it and stays clickable.
        # The label must be a child of this window: without a parent (and without a layout) it is never shown.
        self._about_Qlbl = QLabel(self)
        self._about_Qlbl.setText(bg_txt)
        self._about_Qlbl.setAlignment(Qt.AlignmentFlag.AlignCenter)  # Center the text
        self._about_Qlbl.setStyleSheet("font-size: 20px; color: white;")  # Text style
        self._about_Qlbl.setWordWrap(True)  # Enable line wrapping
        self._about_Qlbl.setGeometry(0, 0, self.width(), self.height())  # Fill the whole window
        # Window close button, 20 px away from the top-right corner
        self._exit_Qpbtn = QPushButton(self)
        self._exit_Qpbtn.setIcon(QIcon(":icoQuit"))
        self._exit_Qpbtn.setFixedSize(24, 24)
        self._exit_Qpbtn.clicked.connect(self.close)
        self._exit_Qpbtn.move(self.width() - self._exit_Qpbtn.width() - 20, 20)

    def paintEvent(self, event):
        """ Paint the background image stretched to the window, then a darkening vertical gradient. """
        painter = QPainter(self)
        painter.setRenderHint(QPainter.RenderHint.SmoothPixmapTransform)
        # Resize the image to fit the window size
        scaled_image = self._bg_qpix.scaled(self.size(), Qt.AspectRatioMode.IgnoreAspectRatio, Qt.TransformationMode.SmoothTransformation)
        painter.drawPixmap(0, 0, scaled_image)
        painter.setOpacity(0.7)
        # Add a gradient effect
        gradient = QLinearGradient(0, 0, 0, self.height())
        gradient.setColorAt(0, QColor(0, 0, 0, 0))  # Transparent at the top
        gradient.setColorAt(1, QColor(0, 0, 0, 128))  # Dark at the bottom
        painter.fillRect(self.rect(), gradient)

    def keyPressEvent(self, event: QKeyEvent):
        """ Close the window on Escape, Space, Enter, Return or Ctrl+F4; other keys keep the default behavior. """
        if event.key() in {Qt.Key.Key_Escape, Qt.Key.Key_Space, Qt.Key.Key_Enter, Qt.Key.Key_Return}:
            print("Fenêtre fermée avec une touche simple.")
            self.close()
        elif event.key() == Qt.Key.Key_F4 and event.modifiers() & Qt.KeyboardModifier.ControlModifier:
            print("Fenêtre fermée avec Ctrl+F4.")
            self.close()
        else:
            # If another key is pressed, leave the default behavior
            super().keyPressEvent(event)

    def event(self, event):
        """ Close the window as soon as it is deactivated, then let the base class process the event. """
        if event.type() == QEvent.Type.WindowDeactivate:
            self.close()
        return super().event(event)
class SolanaAdressCheckerWindow(QWidget):
""" Class used to check info of a solana address """
def __init__(self, pool: ConnectionPool, parent = None):
    """ Build the window: an address row, a row of query buttons and a read-only result pane.
    Args:
        pool (ConnectionPool): pool used by the query buttons to post their requests
        parent: optional parent widget
    """
    super().__init__(parent)
    self._parent = parent
    self._pool = pool
    self.setWindowIcon(QIcon(":imgIcon"))
    self.setMinimumSize(1024, 600)
    self.setFocus()
    self.setWindowFlags(Qt.WindowType.Dialog)
    root_layout = QVBoxLayout(self)
    self.setLayout(root_layout)

    # Address row: label + input field
    self._solAddress_Qlbl = QLabel()
    self._solAddress_Qled = QLineEdit()
    address_row = QHBoxLayout()
    for widget in (self._solAddress_Qlbl, self._solAddress_Qled):
        address_row.addWidget(widget)
    root_layout.addLayout(address_row)

    def disabled_button(on_click):
        """ Create a push button that starts disabled and triggers `on_click` when clicked. """
        button = QPushButton()
        button.setEnabled(False)
        button.clicked.connect(on_click)
        return button

    # Query buttons; their captions are set by update_display()
    self._get_account_info_Qpbtn = disabled_button(self._get_account_info)
    self._get_balance_Qpbtn_Qpbtn = disabled_button(self._get_balance)
    self._get_program_accounts_Qpbtn = disabled_button(self._get_program_accounts)
    self._get_signatures_for_address_Qpbtn = disabled_button(self._get_signatures_for_address)
    self._get_token_account_balance_Qpbtn = disabled_button(self._get_token_account_balance)
    self._get_token_accounts_by_delegate_Qpbtn = disabled_button(self._get_token_accounts_by_delegate)
    self._get_token_accounts_by_owner_Qpbtn = disabled_button(self._get_token_accounts_by_owner)
    self._get_token_supply_Qpbtn = disabled_button(self._get_token_supply)
    buttons_row = QHBoxLayout()
    for button in (
        self._get_account_info_Qpbtn,
        self._get_balance_Qpbtn_Qpbtn,
        self._get_program_accounts_Qpbtn,
        self._get_signatures_for_address_Qpbtn,
        self._get_token_account_balance_Qpbtn,
        self._get_token_accounts_by_delegate_Qpbtn,
        self._get_token_accounts_by_owner_Qpbtn,
        self._get_token_supply_Qpbtn,
    ):
        buttons_row.addWidget(button)
    root_layout.addLayout(buttons_row)

    # Read-only pane receiving the responses
    self.result_text_edit = QTextEdit()
    self.result_text_edit.setReadOnly(True)
    root_layout.addWidget(self.result_text_edit)
    self.update_display()
    # Connects text modification to button activation/deactivation
    self._solAddress_Qled.textChanged.connect(self._toggle_buttons_state)
def _get_account_info(self):
    """ Post an account-info request for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    _, rpc_method, req = HttpReqGen.get_account_info(pk_adr)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        resp = GetAccountInfoResp.from_json(response.body)
        if isinstance(resp, GetAccountInfoResp):
            result = f"response : {resp.value}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_balance(self):
    """ Post a balance request for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    _, rpc_method, req = HttpReqGen.get_balance(pk_adr)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        resp = GetBalanceResp.from_json(response.body)
        if isinstance(resp, GetBalanceResp):
            result = f"response : {resp.value}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_program_accounts(self):
    """ Post a program-accounts request for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    # Previously built a getBalance request (copy-paste from _get_balance).
    # NOTE(review): assumes HttpReqGen.get_program_accounts takes the pubkey as its only required argument — confirm
    _, rpc_method, req = HttpReqGen.get_program_accounts(pk_adr)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        # NOTE(review): the body is tested as-is here, whereas _get_account_info parses it with from_json — confirm which form ResponseOk.body has
        resp = response.body
        if isinstance(resp, GetProgramAccountsResp):
            result = f"response : {resp.to_json()}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_signatures_for_address(self):
    """ Post a signatures-for-address request for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    # Previously built a getBalance request (copy-paste from _get_balance).
    # NOTE(review): assumes HttpReqGen.get_signatures_for_address takes the pubkey as its only required argument — confirm
    _, rpc_method, req = HttpReqGen.get_signatures_for_address(pk_adr)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        # NOTE(review): the body is tested as-is here, whereas _get_account_info parses it with from_json — confirm which form ResponseOk.body has
        resp = response.body
        if isinstance(resp, GetSignaturesForAddressResp):
            result = f"response : {resp.to_json()}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_token_account_balance(self):
    """ Post a token-account-balance request for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    # Previously built a getBalance request (copy-paste from _get_balance).
    # NOTE(review): assumes HttpReqGen.get_token_account_balance takes the pubkey as its only required argument — confirm
    _, rpc_method, req = HttpReqGen.get_token_account_balance(pk_adr)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        # NOTE(review): the body is tested as-is here, whereas _get_account_info parses it with from_json — confirm which form ResponseOk.body has
        resp = response.body
        if isinstance(resp, GetTokenAccountBalanceResp):
            result = f"response : {resp.to_json()}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_token_accounts_by_delegate(self):
    """ Post a token-accounts-by-delegate request for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    # Previously built a getBalance request (copy-paste from _get_balance).
    # NOTE(review): assumes HttpReqGen.get_token_accounts_by_delegate mirrors the (pubkey, opts, commitment)
    # call used for get_token_accounts_by_owner in this class — confirm
    tokaaccopts = TokenAccountOpts(programId=PK_TOKEN_PROGRAM_ID)
    _, rpc_method, req = HttpReqGen.get_token_accounts_by_delegate(pk_adr, tokaaccopts, CommitmentLevel.Finalized)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        # NOTE(review): the body is tested as-is here, whereas _get_account_info parses it with from_json — confirm which form ResponseOk.body has
        resp = response.body
        if isinstance(resp, GetTokenAccountsByDelegateResp):
            result = f"response : {resp.to_json()}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_token_accounts_by_owner(self):
    """ Post a token-accounts-by-owner request (token program, Finalized) for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    tokaaccopts = TokenAccountOpts(programId=PK_TOKEN_PROGRAM_ID)
    _, rpc_method, req = HttpReqGen.get_token_accounts_by_owner(pk_adr, tokaaccopts, CommitmentLevel.Finalized)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        # NOTE(review): the body is tested as-is here, whereas _get_account_info parses it with from_json — confirm which form ResponseOk.body has
        resp = response.body
        if isinstance(resp, GetTokenAccountsByOwnerResp):
            result = f"response : {resp.to_json()}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _get_token_supply(self):
    """ Post a token-supply request (Finalized) for the typed address and append the outcome to the result pane. """
    adr = self._solAddress_Qled.text().strip()
    try:
        pk_adr = Pubkey.from_string(adr)
    except ValueError as err:
        # The buttons are enabled for any non-empty text, so the address may be malformed
        self.result_text_edit.append(f"invalid address: {err}\n")
        return
    _, rpc_method, req = HttpReqGen.get_token_supply(pk_adr, CommitmentLevel.Finalized)
    response = self._pool.send_http_post(rpc_method, req.to_json())
    if isinstance(response, ResponseOk):
        # NOTE(review): the body is tested as-is here, whereas _get_account_info parses it with from_json — confirm which form ResponseOk.body has
        resp = response.body
        if isinstance(resp, GetTokenSupplyResp):
            result = f"response : {resp.to_json()}\n"
        else:
            result = f"response err: {type(resp)} - {resp}\n"
    elif isinstance(response, ResponseError):
        result = f"response err: {response.code} - {response.body}\n"
    elif isinstance(response, NetworkReplyError):
        result = f"response net err: {response.code.value} - {response.body}\n"
    else:
        result = f"response unknown: {type(response)} - {response}\n"
    self.result_text_edit.append(result)
def _toggle_buttons_state(self):
    """Enable or disable the query buttons and colour the address field.

    Every query button is enabled only while the address field holds
    non-blank text. The field background then reflects the address:

    * white       - field is empty;
    * light green - the text parses as a public key that is on the curve;
    * light coral - the key parses, is off the curve, and the text is not
      44 characters long (or does not decode to 32 bytes);
    * green       - the key parses, is off the curve, 44 characters, 32 bytes;
    * red         - parsing or decoding raised ValueError.
    """
    adr = self._solAddress_Qled.text().strip()
    has_text = bool(adr)
    for button in (
        self._get_account_info_Qpbtn,
        self._get_balance_Qpbtn_Qpbtn,
        self._get_program_accounts_Qpbtn,
        self._get_signatures_for_address_Qpbtn,
        self._get_token_account_balance_Qpbtn,
        self._get_token_accounts_by_delegate_Qpbtn,
        self._get_token_accounts_by_owner_Qpbtn,
        self._get_token_supply_Qpbtn,
    ):
        button.setEnabled(has_text)
    if not has_text:
        self._solAddress_Qled.setStyleSheet("background-color: white;")
        return
    try:
        pk_adr = Pubkey.from_string(adr)
        if pk_adr.is_on_curve():
            colour = "lightgreen"
        elif len(adr) != 44 or len(base58.b58decode(adr)) != 32:
            # The previous value "lighred" is not a colour name Qt knows, so
            # the style sheet could not be parsed; use a valid light red.
            colour = "lightcoral"
        else:
            colour = "green"
    except ValueError:
        colour = "red"
    self._solAddress_Qled.setStyleSheet(f"background-color: {colour};")
def update_display(self):
    """Re-apply the translated captions and the layout direction of the window."""
    self.setWindowTitle(self.tr("Solana Address Checker"))
    self.setLayoutDirection(get_qt_dir())
    address_label = self._solAddress_Qlbl
    address_label.setText(self.tr("Address:"))
    address_label.setAlignment(get_qt_align_vchr(is_rtl))
    # One (button, caption) pair per RPC query button
    captions = (
        (self._get_account_info_Qpbtn, self.tr("Account Info")),
        (self._get_balance_Qpbtn_Qpbtn, self.tr("Balance")),
        (self._get_program_accounts_Qpbtn, self.tr("Program Accounts")),
        (self._get_signatures_for_address_Qpbtn, self.tr("Signatures")),
        (self._get_token_account_balance_Qpbtn, self.tr("Token Account Balance")),
        (self._get_token_accounts_by_delegate_Qpbtn, self.tr("Token Account by Delegates")),
        (self._get_token_accounts_by_owner_Qpbtn, self.tr("Token Account by Owner")),
        (self._get_token_supply_Qpbtn, self.tr("Token Supply")),
    )
    for button, caption in captions:
        button.setText(caption)
def showEvent(self, event) -> None:
    """Run the default show handling, then centre the window on the screen."""
    super().showEvent(event)
    # Centring is done after the base class has processed the event
    center_on_screen(self)
def closeEvent(self, event):
    """Clear the owner's `solana_addr_check` flag, then run the default close handling."""
    owner = self._parent
    if isinstance(owner, MainWindow):
        # Lets the main window know the checker is no longer open
        owner.solana_addr_check = False
    return super().closeEvent(event)
class HelpWindow(QWidget):
    """Fixed-size, application-modal window that displays a PDF help document."""

    def __init__(self, pdf_doc: str, parent = None):
        """Build the viewer and load the document.

        :param pdf_doc: location of the PDF, handed as-is to QPdfDocument.load().
        :param parent: optional parent widget.
        """
        super().__init__(parent)
        self.setWindowTitle(self.tr("Help - {app}").format(app = APP_NAME_VERSION))
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setFixedSize(900, 600)
        self.setWindowModality(Qt.WindowModality.ApplicationModal)
        self.setWindowFlags(Qt.WindowType.Dialog | Qt.WindowType.MSWindowsFixedSizeDialogHint)
        # The view renders whatever the document object holds
        self._pdfDoc = QPdfDocument()
        self._pdfView = QPdfView()
        self._pdfView.setDocument(self._pdfDoc)
        # A margin-less vertical layout hosts the view as the only child
        self._main_Qvlo = QVBoxLayout()
        self.setLayout(self._main_Qvlo)
        self._main_Qvlo.setSpacing(0)
        self._main_Qvlo.setContentsMargins(QMargins(0, 0, 0, 0))
        self._main_Qvlo.addWidget(self._pdfView)
        self._pdfDoc.load(pdf_doc)
        if self._pdfDoc.status() != QPdfDocument.Status.Ready:
            # Nothing to display when the document could not be loaded
            self.close()
            return
        viewer = self._pdfView
        viewer.setPageMode(QPdfView.PageMode.MultiPage)
        viewer.setZoomMode(QPdfView.ZoomMode.FitToWidth)
        viewer.setPageSpacing(0)
        viewer.setDocumentMargins(QMargins(0, 0, 0, 0))

    def showEvent(self, event) -> None:
        """Run the default show handling, then centre the window on the screen."""
        super().showEvent(event)
        center_on_screen(self)
class WalletTokensTableModel(QAbstractTableModel):
    """Table model listing the tokens held in a wallet.

    Each row is a dict with the keys "token_id", "buy_price" and "buy_time";
    the fourth column ("Action") has no text of its own.
    """

    def __init__(self, parent = None):
        super().__init__(parent)
        self._headers = [self.tr("Token Id"), self.tr("Buy Price"), self.tr("Buy Time"), self.tr("Action")]
        self._dataList = []

    def rowCount(self, parent = QModelIndex()):
        """Return the number of rows in the table."""
        return len(self._dataList)

    def columnCount(self, parent = QModelIndex()):
        """Return the number of columns in the table."""
        return len(self._headers)

    def headerData(self, section, orientation, role = Qt.ItemDataRole.DisplayRole):
        """Return the caption of a horizontal header section, None otherwise."""
        if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
            return self._headers[section]
        return None

    def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> Any:
        """Return the value the view should use for `index` under `role`.

        Handles the display text, the text alignment of the first three
        columns and the decoration icon of the first column; any other
        combination yields None.
        """
        if not index.isValid():
            return None
        column = index.column()
        if role == Qt.ItemDataRole.DisplayRole:
            token = self._dataList[index.row()]
            if column == 0:  # Token id
                return token["token_id"]
            if column == 1:  # Buy price
                return token["buy_price"]
            if column == 2:  # Buy time
                return token["buy_time"]
            if column == 3:  # Action (no text)
                return ""
        elif role == Qt.ItemDataRole.TextAlignmentRole:
            if column == 0:
                return get_qt_align_vchr()
            if column == 1:
                return get_qt_align_center()
            if column == 2:
                return get_qt_align_vchl(not is_rtl)
        elif role == Qt.ItemDataRole.DecorationRole and column == 0:
            return QIcon(":icoWallet")
        return None

    def set_headers(self, new_headers: list):
        """Replace the column captions (used when the language changes)."""
        self._headers = new_headers
        # Announce every section that now exists instead of a hard-coded range
        self.headerDataChanged.emit(Qt.Orientation.Horizontal, 0, len(self._headers) - 1)
        self.layoutChanged.emit()

    def set_data(self, data: list):
        """Replace the rows of the model and notify the attached views."""
        self._dataList = data
        self.layoutChanged.emit()
class WalletWidget(QWidget):
    """Widget showing the name, public key, balances and tokens of a wallet."""

    def __init__(self, parent = None):
        super().__init__(parent)
        self.setWindowIcon(QIcon(":icoWallet"))
        self.wallet = None
        self.fileKey = None
        self.filePath = None
        self._main_Qglo = QGridLayout()
        self.setLayout(self._main_Qglo)
        # Row 1: wallet name and save button
        self._walletName_Qlbl = QLabel()
        self.walletName_Qled = QLineEdit()
        self._saveWallet_Qpbtn = QPushButton()
        self._init_line1()
        # Row 2: public key and copy button
        self._walletPubKey_Qlbl = QLabel()
        self.walletPubKey_Qled = QLineEdit()
        self._copyToClipboard_Qpbtn = QPushButton()
        self._init_line2()
        # Row 3: real and demo balances
        self._realBalance_Qlbl = QLabel()
        self.realBalance_Qled = QLineEdit()
        self._demoBalance_Qlbl = QLabel()
        self.demoBalance_Qled = QLineEdit()
        self._init_line3()
        # Row 4: tokens caption
        self._tokens_Qlbl = QLabel()
        self._init_line4()
        # Row 5: tokens table
        self._tokens_qtbvmb = QTableView()
        self._tokens_wtQtm = WalletTokensTableModel()
        self._init_tokens_qtbvmb()
        self.update_display()
        self.populate_table()

    def set_wallet(self, wallet: Wallet, file_pass: str, file_path: str):
        """Switch to another wallet.

        The wallet currently held (if any) is saved first, then the new
        wallet, its file key and its file path are stored and displayed.
        """
        self.save_wallet()
        self.fileKey = file_pass
        self.filePath = file_path
        self.wallet = wallet
        self.update_wallet_display()

    def update_wallet_display(self):
        """Copy the wallet fields into the line edits (blank/zero when no wallet)."""
        current = self.wallet
        if current is None:
            self.walletPubKey_Qled.setText("")
            self.walletName_Qled.setText("")
            self.realBalance_Qled.setText(f"{0:.9f}")
            self.demoBalance_Qled.setText(f"{0:.9f}")
            return
        self.walletPubKey_Qled.setText(current.walletPubKey)
        self.walletName_Qled.setText(current.walletName)
        self.realBalance_Qled.setText(f"{current.mainSolBalance:.9f}")
        self.demoBalance_Qled.setText(f"{current.devSolBalance:.9f}")

    def save_wallet(self):
        """Encrypt the wallet and write it to its file.

        Does nothing unless a wallet, a file key and a file path are all set.
        """
        if self.wallet is None or self.fileKey is None or self.filePath is None:
            return
        payload = encrypt_string(self.fileKey, APP_SALT.encode("utf-8"), self.wallet.to_json())
        with LockedFile(self.filePath, "w+b") as handle:
            # Rewind and clear the file before writing the new content
            handle.seek(0)
            handle.truncate()
            handle.write(payload.encode("utf-8"))

    def _init_line1(self):
        """Configure and place the wallet-name row."""
        grid = self._main_Qglo
        self.walletName_Qled.setAlignment(get_qt_align_center())
        save_button = self._saveWallet_Qpbtn
        save_button.setIcon(QIcon(":icoSave"))
        save_button.setMaximumWidth(50)
        save_button.clicked.connect(self.save_wallet)
        grid.addWidget(self._walletName_Qlbl, 0, 0, 1, 2)
        grid.addWidget(self.walletName_Qled, 0, 2, 1, 9)
        grid.addWidget(save_button, 0, 11, 1, 1)

    def _init_line2(self):
        """Configure and place the public-key row."""
        grid = self._main_Qglo
        key_edit = self.walletPubKey_Qled
        key_edit.setAlignment(get_qt_align_center())
        key_edit.setText("XXXXXXXXXXXX")
        key_edit.setReadOnly(True)
        copy_button = self._copyToClipboard_Qpbtn
        copy_button.setIcon(QIcon(":icoCopy"))
        copy_button.setMaximumWidth(50)
        copy_button.clicked.connect(self.copy_wallet_priv_key_to_clipboard)
        grid.addWidget(self._walletPubKey_Qlbl, 1, 0, 1, 2)
        grid.addWidget(key_edit, 1, 2, 1, 9)
        grid.addWidget(copy_button, 1, 11, 1, 1)

    def _init_line3(self):
        """Configure and place the balances row."""
        grid = self._main_Qglo
        for balance_edit in (self.realBalance_Qled, self.demoBalance_Qled):
            balance_edit.setAlignment(get_qt_align_vchr())
            balance_edit.setText("10.000000000")
            balance_edit.setReadOnly(True)
        grid.addWidget(self._realBalance_Qlbl, 2, 0, 1, 2)
        grid.addWidget(self.realBalance_Qled, 2, 2, 1, 3)
        grid.addWidget(self._demoBalance_Qlbl, 2, 7, 1, 2)
        grid.addWidget(self.demoBalance_Qled, 2, 9, 1, 3)

    def _init_line4(self):
        """Configure and place the tokens caption."""
        self._tokens_Qlbl.setAlignment(get_qt_align_center())
        self._main_Qglo.addWidget(self._tokens_Qlbl, 3, 0, 1, 12)

    def _init_tokens_qtbvmb(self):
        """Attach the model to the tokens table, configure it and place it."""
        table = self._tokens_qtbvmb
        table.setModel(self._tokens_wtQtm)
        # Whole-row selection, read-only cells
        table.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
        table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
        # Alternating row colours on a filled background
        table.setAlternatingRowColors(True)
        table.setAutoFillBackground(True)
        table.setSortingEnabled(True)
        # Per-column resize policy, first to fourth column
        header = table.horizontalHeader()
        resize_modes = (
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.Interactive,
            QHeaderView.ResizeMode.Stretch,
        )
        for section, mode in enumerate(resize_modes):
            header.setSectionResizeMode(section, mode)
        table.setTextElideMode(Qt.TextElideMode.ElideNone)
        table.setWordWrap(False)
        self._main_Qglo.addWidget(table, 4, 0, 1, 12)

    def update_display(self):
        """Re-apply the translated captions, alignments and layout direction."""
        self.setLayoutDirection(get_qt_dir())
        captions = (
            (self._walletName_Qlbl, self.tr("Wallet Name:")),
            (self._walletPubKey_Qlbl, self.tr("Wallet PubKey:")),
            (self._realBalance_Qlbl, self.tr("Balance Real:")),
            (self._demoBalance_Qlbl, self.tr("Balance Demo:")),
        )
        for label, caption in captions:
            label.setText(caption)
            label.setAlignment(get_qt_align_vchr(is_rtl))
        self._tokens_Qlbl.setText(self.tr("My Tokens:"))
        self._tokens_wtQtm.set_headers([self.tr("Token Id"), self.tr("Buy Price"), self.tr("Buy Time"), self.tr("Action")])

    def populate_table(self):
        """Fill the tokens table with ten randomly generated placeholder rows.

        TODO: must be replaced once the model is fed with real wallet data.
        """
        rows = []
        for _ in range(10):
            rows.append({
                "token_id": ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k = 3)),
                "buy_price": f"{random.uniform(0.000000001, 1.000000000):.9f}",
                # Millisecond precision: drop the last three microsecond digits
                "buy_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
                "action": self.tr("Sell"),  # placeholder for a button-like action
            })
        self._tokens_wtQtm.set_data(rows)

    def copy_wallet_priv_key_to_clipboard(self):
        """Copy the text of the public-key line edit to the clipboard."""
        QApplication.clipboard().setText(self.walletPubKey_Qled.text())
        logger.info(self.tr("walletId copied to clipboard"))
class LogHandler:
    """Loguru sink that forwards parsed log records to a signal emitter."""

    def __init__(self, emitter: "DictSignal"):
        """Store the emitter whose `signal()` receives each parsed record."""
        self._logEmitter = emitter

    def __call__(self, message: str):
        """Called by loguru for each formatted log message."""
        parsed_log = self._parse_log_message(message)
        if parsed_log:
            self._logEmitter.signal(parsed_log)

    @staticmethod
    def _parse_log_message(message: str):
        """Split a "{time} -|- {level} -|- {message}" line into a dict.

        :return: a dict with the keys "timestamp", "level" and "message",
                 or None when the line does not contain both separators.
        """
        # At most two splits, so the separator may still appear in the message
        parts = message.split(" -|- ", 2)
        if len(parts) != 3:
            return None
        timestamp, level, text = parts
        return {
            "timestamp": timestamp,
            "level": level,
            # loguru terminates every formatted record with a newline;
            # drop it so table cells do not end with an empty line
            "message": text.rstrip("\n"),
        }
class LogsTableModel(QAbstractTableModel):
    """Model feeding log records to a QTableView.

    Each record is a dict with the keys "timestamp", "level" and "message".
    The model keeps at most `display_lines` records and keeps them sorted
    according to the last requested column and order.
    """

    # Record key shown in (and used to sort by) each column
    _COLUMN_KEYS = ("timestamp", "level", "message")
    # Background colour of the "Level" cell, per level name
    _LEVEL_BACKGROUNDS = {
        'DEBUG': Qt.GlobalColor.cyan,
        'INFO': Qt.GlobalColor.white,
        'SUCCESS': Qt.GlobalColor.green,
        'WARNING': Qt.GlobalColor.yellow,
        'ERROR': Qt.GlobalColor.red,
        'CRITICAL': Qt.GlobalColor.darkRed,
    }
    # Text colour of the "Level" cell, per level name
    _LEVEL_FOREGROUNDS = {
        'DEBUG': Qt.GlobalColor.darkGray,
        'INFO': Qt.GlobalColor.black,
        'SUCCESS': Qt.GlobalColor.darkGray,
        'WARNING': Qt.GlobalColor.lightGray,
        'ERROR': Qt.GlobalColor.darkGray,
        'CRITICAL': Qt.GlobalColor.lightGray,
    }

    def __init__(self, display_lines: int = 100):
        """Create an empty model keeping at most `display_lines` records."""
        super().__init__()
        self._dataList = []  # logs storage
        self._maxLines = display_lines  # max lines display
        self._headers = [self.tr("Timestamp"), self.tr("Level"), self.tr("Message")]
        self._sortColumn = 0
        self._sortOrder = Qt.SortOrder.AscendingOrder
        self.sort(self._sortColumn, self._sortOrder)

    def rowCount(self, parent = QModelIndex()):
        """Return the number of rows in the table."""
        return len(self._dataList)

    def columnCount(self, parent = QModelIndex()):
        """Return the number of columns in the table."""
        return len(self._headers)

    def headerData(self, section: int, orientation: Qt.Orientation, role: Qt.ItemDataRole = Qt.ItemDataRole.DisplayRole) -> Any:
        """Return the caption of a horizontal header section, None otherwise."""
        if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
            return self._headers[section]
        return None

    def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> Any:
        """Return the value the view should use for `index` under `role`.

        Handles display text and text alignment for the three columns, plus
        background colour, text colour and bold font for the "Level" column.
        Anything else yields None.
        """
        if not index.isValid():
            return None
        column = index.column()
        if role == Qt.ItemDataRole.DisplayRole:
            log = self._dataList[index.row()]
            if 0 <= column < len(self._COLUMN_KEYS):
                return log[self._COLUMN_KEYS[column]]
        elif role == Qt.ItemDataRole.BackgroundRole and column == 1:
            colour = self._LEVEL_BACKGROUNDS.get(self._dataList[index.row()]["level"])
            if colour is not None:
                return QColor(colour)
        elif role == Qt.ItemDataRole.ForegroundRole and column == 1:
            colour = self._LEVEL_FOREGROUNDS.get(self._dataList[index.row()]["level"])
            if colour is not None:
                return QColor(colour)
        elif role == Qt.ItemDataRole.FontRole:
            if column == 1:
                font = QFont("Helvetica")
                font.setBold(True)
                return font
        elif role == Qt.ItemDataRole.TextAlignmentRole:
            if column == 0:
                return get_qt_align_vchr()
            if column == 1:
                return get_qt_align_center()
            if column == 2:
                return get_qt_align_vchl()
        return None

    def _sort_rows(self):
        """Sort the stored records by the current column/order without notifying views."""
        if 0 <= self._sortColumn < len(self._COLUMN_KEYS):
            key = self._COLUMN_KEYS[self._sortColumn]
            # NOTE(review): Qt's "ascending" order is mapped to a reversed
            # Python sort, so with the initial settings the highest timestamp
            # comes first. Kept unchanged - confirm this is intended.
            self._dataList.sort(key = lambda entry: entry[key], reverse = (self._sortOrder == Qt.SortOrder.AscendingOrder))

    def sort(self, column: int, order: Qt.SortOrder = Qt.SortOrder.DescendingOrder):
        """Remember the requested column/order, sort the records and notify the views."""
        self._sortColumn = column
        self._sortOrder = order
        self._sort_rows()
        self.layoutChanged.emit()

    def set_headers(self, new_headers: list):
        """Replace the column captions (used when the language changes)."""
        self._headers = new_headers
        self.headerDataChanged.emit(Qt.Orientation.Horizontal, 0, len(self._headers) - 1)
        self.layoutChanged.emit()

    def add_data(self, log: dict):
        """Add a record while respecting the maximum number of rows.

        When the limit is exceeded, the records with the smallest timestamps
        are discarded. The list is kept in display order, so removing its
        first element (as was done before) would drop whatever the current
        sort puts first - the most recent record with the initial settings.
        """
        self._dataList.append(log)
        overflow = len(self._dataList) - self._maxLines
        if overflow > 0:
            # Timestamps are compared as strings, exactly as the sort does
            self._dataList.sort(key = lambda entry: entry["timestamp"])
            del self._dataList[:overflow]
        self._sort_rows()
        self.layoutChanged.emit()

    def set_data(self, data: list):
        """Replace the records of the model, sort them and notify the views."""
        self._dataList = data
        self._sort_rows()
        self.layoutChanged.emit()
class LogViewerWidget(QWidget):
    """Widget wrapping a QTableView that displays the application logs."""

    def __init__(self, max_display_lines: int = 100, parent = None):
        """Build the table, its model, and register a loguru sink feeding it.

        :param max_display_lines: row limit handed to the logs model.
        :param parent: optional parent widget.
        """
        super().__init__(parent)
        self._main_Qvlo = QVBoxLayout()
        self.setLayout(self._main_Qvlo)
        self._main_Qvlo.setContentsMargins(0, 0, 0, 0)
        self._logs_Qtvmb = QTableView(self)
        self._main_Qvlo.addWidget(self._logs_Qtvmb)
        self._logs_lQtm = LogsTableModel(max_display_lines)
        table = self._logs_Qtvmb
        table.setModel(self._logs_lQtm)
        table.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
        table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)  # read-only
        table.setAlternatingRowColors(True)
        table.setAutoFillBackground(True)
        table.setSortingEnabled(True)
        # Resize policy of the first, second and third column
        header = table.horizontalHeader()
        resize_modes = (
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.Stretch,
        )
        for section, mode in enumerate(resize_modes):
            header.setSectionResizeMode(section, mode)
        table.setTextElideMode(Qt.TextElideMode.ElideNone)
        table.setWordWrap(False)
        # Log records travel from the loguru sink to the model through this emitter
        self._logEmitter = DictSignal()
        self._logEmitter.conn(self._add_log)
        sink = LogHandler(self._logEmitter)
        logger.add(sink, format = "{time:YYYY-MM-DD HH:mm:ss.SSS} -|- {level} -|- {message}", level = "INFO")
        self.update_display()

    @Slot(dict)
    def _add_log(self, log: dict):
        """Append a parsed log record to the model."""
        self._logs_lQtm.add_data(log)

    def update_display(self):
        """Re-apply the translated headers and the layout direction (language change)."""
        self._logs_lQtm.set_headers([self.tr("Timestamp"), self.tr("Level"), self.tr("Message")])
        self.setLayoutDirection(get_qt_dir())
class MainWindow(QMainWindow):
""" Main Window class """
def __init__(self, appli: QApplication, trans: QTranslator, cfg: dict = None):
    """Build the main window: menus, toolbar, tray icon, tabs, pool and status bar.

    :param appli: the running QApplication, stored as `self.app`.
    :param trans: the translator, stored as `self.translator`.
    :param cfg: configuration dict; DEFAULT_YAML_CONFIG is used when None.
    """
    super().__init__()
    self.app = appli
    self.config = cfg if cfg is not None else DEFAULT_YAML_CONFIG
    self.translator = trans
    self.currentLang = self.config['defaultLang']
    self.setWindowIcon(QIcon(":imgIcon"))
    self.setWindowTitle(APP_NAME_VERSION)
    self.setMinimumSize(1024, 768)
    # A missing or null size in the configuration falls back to the minimum
    # size instead of making max() compare None with an int
    last_width = self.config.get('lastWidth') or 1024
    last_height = self.config.get('lastHeight') or 768
    self.resize(max(last_width, 1024), max(last_height, 768))
    self._mustCenter = True
    self._connected = False
    # Main Menubar
    self.main_Qmnub = self.menuBar()
    # File Menu
    self.file_Qmnu = QMenu()
    self.newWallet_Qact = QAction()
    self.newWallet_Qact.setIcon(QIcon(":icoWalletNew"))
    self.openWallet_Qact = QAction()
    self.openWallet_Qact.setIcon(QIcon(":icoWalletOpen"))
    self.filesHistory_Qmnu = QMenu()
    self.filesHistory_Qmnu.setIcon(QIcon(":icoLastFiles"))
    self.noFileHistory_Qact = QAction()
    self.noFileHistory_Qact.setIcon(QIcon(":icoNoHistory"))
    self.noFileHistory_Qact.setDisabled(True)
    self.conn_disc_Qact = QAction()
    self.conn_disc_Qact.setIcon(QIcon(":icoConnect"))
    self.hideApp_Qact = QAction()
    self.hideApp_Qact.setIcon(QIcon(":icoTrayHide"))
    self.closeApp_Qact = QAction()
    self.closeApp_Qact.setIcon(QIcon(":icoQuit"))
    self.forceCloseApp_Qact = QAction()
    self.forceCloseApp_Qact.setIcon(QIcon(":icoForceQuit"))
    # finalize initialisation of File Menu
    self._init_file_qmnu()
    # Tools Menu
    self.tools_Qmnu = QMenu()
    self.checkAddress_Qact = QAction()
    self.checkAddress_Qact.setIcon(QIcon(":icoCheckAddress"))
    self.solana_addr_check = False
    # finalize initialisation of Tools Menu
    self._init_tools_qmnu()
    # Langs Menu
    self.langs_Qmnu = QMenu()
    self.changeLangAr_Qact = QAction(QIcon(":icoLang"), "العربية")
    self.changeLangDe_Qact = QAction(QIcon(":icoLang"), "Deutsch")
    self.changeLangEn_Qact = QAction(QIcon(":icoLang"), "English")
    self.changeLangEs_Qact = QAction(QIcon(":icoLang"), "Español")
    self.changeLangFr_Qact = QAction(QIcon(":icoLang"), "Français")
    self.changeLangHe_Qact = QAction(QIcon(":icoLang"), "עברית")
    self.changeLangHi_Qact = QAction(QIcon(":icoLang"), "हिन्दी")
    self.changeLangIt_Qact = QAction(QIcon(":icoLang"), "Italiano")
    self.changeLangJa_Qact = QAction(QIcon(":icoLang"), "日本語")
    self.changeLangKo_Qact = QAction(QIcon(":icoLang"), "한국어")
    self.changeLangPt_Qact = QAction(QIcon(":icoLang"), "Português")
    self.changeLangRu_Qact = QAction(QIcon(":icoLang"), "Русский")
    self.changeLangZh_Qact = QAction(QIcon(":icoLang"), "中文")
    # finalize initialisation of Langs Menu
    self._init_langs_qmnu()
    # Help Menu
    self.help_Qmnu = QMenu()
    self.help_Qact = QAction()
    self.help_Qact.setIcon(QIcon(":icoHelp"))
    self.about_Qact = QAction()
    self.about_Qact.setIcon(QIcon(":icoAbout"))
    self._init_help_qmnu()
    # Main Toolbar
    self.main_Qtb = QToolBar()
    self._init_main_toolbar()
    # System tray icon
    self.tray_Qsti = QSystemTrayIcon(QIcon(":imgIcon"))
    # self.trayIcon.setToolTip(f"{APP_FULL_NAME} ({APP_VERSION})") # don't work on linux ???
    # Tray icon menu
    self.tray_Qmnu = QMenu()
    self.showApp_Qact = QAction()
    self._init_tray_qmnu()
    # Central widget: QTabWidget
    self.main_Qtabw = QTabWidget()
    self.setCentralWidget(self.main_Qtabw)
    self.token_qw = QWidget()
    self.main_Qtabw.addTab(self.token_qw, QIcon(":icoTokens"), self.tr("Tokens:"))
    self.token_qw.setLayout(QVBoxLayout())
    self.token_qw.layout().addWidget(QTableView())
    self.wallet_Qw = WalletWidget()
    self.main_Qtabw.addTab(self.wallet_Qw, QIcon(":icoWallet"), self.tr("Wallet: {walletName}").format(walletName = ""))
    self.logViewer_Qw = LogViewerWidget(1000)
    self.main_Qtabw.addTab(self.logViewer_Qw, QIcon(":icoLogs"), self.tr("Logs:"))
    self.nbrWsClients = 2
    self.nbrAsyncHttpPostClients = 4
    self.nbrHttpPostClients = 4
    self.connPool = ConnectionPool(URL_Ws_MainNet, URL_Http_MainNet, self.nbrWsClients, self.nbrAsyncHttpPostClients, self.nbrHttpPostClients)
    # Pixmaps used by the status bar indicators
    self.wsDisconnected_Qpxm = QPixmap(":icoDisconnected")
    self.wsConnected_Qpxm = QPixmap(":icoConnected")
    self.httpNone_Qpxm = QPixmap(":icoHttpNone")
    self.httpSend_Qpxm = QPixmap(":icoHttpSend")
    self.httpWait_Qpxm = QPixmap(":icoHttpWait")
    self.httpReceive_Qpxm = QPixmap(":icoHttpReceive")
    self.httpError_Qpxm = QPixmap(":icoHttpError")
    self.reachabilityDisconnected_Qpxm = QPixmap(":icoReachabilityDisconnected")
    self.reachabilityLocal_Qpxm = QPixmap(":icoReachabilityLocal")
    self.reachabilityOnline_Qpxm = QPixmap(":icoReachabilityOnline")
    self.reachabilitySite_Qpxm = QPixmap(":icoReachabilitySite")
    self.reachabilityUnknown_Qpxm = QPixmap(":icoReachabilityUnknown")
    self.main_Qstb = self.statusBar()
    self.setStatusBar(self.main_Qstb)
    self.wsCnxImgs = []
    self.httpMsgImgs = []
    self.netReachabilityImg = QLabel()
    # Defined up front so later reads never hit a missing attribute when
    # loading the network-information backend raises
    self._network_info = None
    try:
        QNetworkInformation.loadBackendByFeatures(QNetworkInformation.Feature.Reachability)
        self._network_info = QNetworkInformation.instance()
        if self._network_info:
            self._network_info.reachabilityChanged.connect(self.on_reachability_changed)
        self.check_connectivity()
    except Exception as e:
        self.netReachabilityImg.setPixmap(self.reachabilityUnknown_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self.netReachabilityImg.setToolTip(self.tr("Unknown"))
        # The source text handed to tr() must be constant to be translatable;
        # the error detail is substituted afterwards
        logger.error(self.tr("Error while loading QNetworkInformation: {error}").format(error = str(e)))
    self.main_Qstb.addWidget(self.netReachabilityImg)
    self.main_Qstb.addWidget(self._get_vertical_separator())
    self.wsMainCnxImg = QLabel()
    self.wsMainCnxImg.setPixmap(self.wsDisconnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
    self.main_Qstb.addWidget(self.wsMainCnxImg)
    self.connPool.wsMainConnexionStatusSig.conn(self._update_ws_main_connection_status)
    self.main_Qstb.addWidget(self._get_vertical_separator())
    for idx in range(self.nbrWsClients):
        lbl = QLabel()
        lbl.setPixmap(self.wsDisconnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self.wsCnxImgs.append(lbl)
        self.main_Qstb.addWidget(lbl)
        # Connect the signal; cid is bound as a default to capture the current index
        self.connPool.wsConnexionStatusSigs[idx].conn(lambda state, cid = idx: self._update_ws_connection_status(cid, state))
    self.main_Qstb.addWidget(self._get_vertical_separator())
    for idx in range(self.nbrHttpPostClients):
        lbl = QLabel()
        lbl.setPixmap(self.httpNone_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self.httpMsgImgs.append(lbl)
        self.main_Qstb.addWidget(lbl)
        # Connect the signals; cid is bound as a default to capture the current index
        self.connPool.httpPostSentSigs[idx].conn(lambda cid = idx: self.update_http_pxm_sent(cid))
        self.connPool.httpReplyReceivedSigs[idx].conn(lambda cid = idx: self.update_http_pxm_received(cid))
    self.main_Qstb.addPermanentWidget(self._get_vertical_separator())
    self.statusMsg_Qled = QLineEdit()
    self.statusMsg_Qled.setReadOnly(True)
    self.statusMsg_Qled.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
    # self.statusMsg_Qled.setFrame(False)
    self.statusMsg_Qled.setStyleSheet("QLineEdit {border: none; background: transparent;}")
    self.statusMsg_Qled.setText(self.tr("Ready"))
    self.main_Qstb.addPermanentWidget(self.statusMsg_Qled)
    self.connPool.wsMainReplyReceivedSig.conn(self.read_ws_main_sol_msg)
    self.connPool.wsReplyReceivedSig.conn(self.read_ws_sol_msg)
    self.connPool.httpReplyReceivedSig.conn(self.read_http_sol_msg)
    self.connPool.logSig.conn(self.show_pool_logs)
    self.update_display()
    logger.success(self.tr("Init of {appName}").format(appName = APP_ABOUT_NAME))
def on_reachability_changed(self, reachability):
    """Refresh the reachability indicator when the network state changes.

    The reported value is not used here: check_connectivity() reads the
    current state itself.
    """
    self.check_connectivity()
def check_connectivity(self):
    """Update the status-bar reachability icon and tooltip.

    The current reachability is read from the network-information object.
    When that object is missing, or reports a value that is not one of the
    known states, the "Unknown" icon and tooltip are shown.
    """
    pixmap = self.reachabilityUnknown_Qpxm
    tooltip = self.tr("Unknown")
    # getattr: the attribute may not exist if loading the backend failed
    network_info = getattr(self, "_network_info", None)
    if network_info:
        reachability = network_info.reachability()
        known_states = (
            (QNetworkInformation.Reachability.Online, self.reachabilityOnline_Qpxm, self.tr("Online")),
            (QNetworkInformation.Reachability.Local, self.reachabilityLocal_Qpxm, self.tr("Local")),
            (QNetworkInformation.Reachability.Site, self.reachabilitySite_Qpxm, self.tr("Site")),
            (QNetworkInformation.Reachability.Disconnected, self.reachabilityDisconnected_Qpxm, self.tr("Disconnected")),
        )
        for state, state_pixmap, state_tooltip in known_states:
            if reachability == state:
                pixmap, tooltip = state_pixmap, state_tooltip
                break
    self.netReachabilityImg.setPixmap(pixmap.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
    self.netReachabilityImg.setToolTip(tooltip)
def show_pool_logs(self, msg: str):
    """Show a connection-pool message in the status bar.

    When the window is hidden or minimized, the message is also shown as a
    tray notification for 2500 ms.
    """
    self.statusMsg_Qled.setText(self.tr(msg))
    if not self.isHidden() and not self.isMinimized():
        return
    self.tray_Qsti.showMessage(APP_NAME, self.tr(msg), QIcon(":imgFavicon"), 2500)
def read_ws_main_sol_msg(self):
""" Read one message from the pool's main WebSocket queue and log its content.

The (timestamp, message) pair comes from connPool.read_first_ws_main_msg();
the message is parsed with parse_websocket_message() and each parsed item is
examined in turn.
"""
# ts is unpacked but not used below
(ts, msg) = self.connPool.read_first_ws_main_msg()
parsed = parse_websocket_message(msg)
for item in parsed:
# Subscription errors are logged at error level
if isinstance(item, SubscriptionError):
logger.error(item.error)
# Subscription confirmations are logged as JSON
if isinstance(item, SubscriptionResult):
logger.info(item.to_json())
# Only log notifications without an error are inspected
if isinstance(item, LogsNotification) and item.result.value.err is None:
logs = set(item.result.value.logs)
search = "initialize2"
found = False
# found becomes True when any log line contains the search text
for message in logs:
if search in message:
found = True
if found:
signature = item.result.value.signature
logger.info(f"See, <a href=\"https://solscan.io/tx/{signature}\">https://solscan.io/tx/{signature}</a>")
# NOTE(review): the nesting level of this break cannot be told from this view;
# presumably it stops scanning after the first match - confirm
break
def read_ws_sol_msg(self):
""" Read one message from the pool's WebSocket queue and log its content.

The (timestamp, message) pair comes from connPool.read_first_ws_msg();
the message is parsed with parse_websocket_message() and each parsed item is
examined in turn.
"""
# ts is unpacked but not used below
(ts, msg) = self.connPool.read_first_ws_msg()
parsed = parse_websocket_message(msg)
for item in parsed:
# Subscription errors are logged at error level
if isinstance(item, SubscriptionError):
logger.error(item.error)
# Subscription confirmations are logged as JSON
if isinstance(item, SubscriptionResult):
logger.info(item.to_json())
# Only log notifications without an error are inspected
if isinstance(item, LogsNotification) and item.result.value.err is None:
logs = set(item.result.value.logs)
search = "initialize2"
found = False
# found becomes True when any log line contains the search text
for message in logs:
if search in message:
found = True
if found:
signature = item.result.value.signature
logger.info(f"See, <a href=\"https://solscan.io/tx/{signature}\">https://solscan.io/tx/{signature}</a>")
# NOTE(review): the nesting level of this break cannot be told from this view;
# presumably it stops scanning after the first match - confirm
break
def read_http_sol_msg(self):
""" Read one message from the pool's HTTP queue and log its content.

The (timestamp, message) pair comes from connPool.read_first_http_msg();
each parsed item is examined in turn.
"""
# ts is unpacked but not used below
(ts, msg) = self.connPool.read_first_http_msg()
# NOTE(review): an HTTP reply is parsed with the WebSocket message parser,
# same as in the two WebSocket readers - confirm this is intended
parsed = parse_websocket_message(msg)
for item in parsed:
# Subscription errors are logged at error level
if isinstance(item, SubscriptionError):
logger.error(item.error)
# Subscription confirmations are logged as JSON
if isinstance(item, SubscriptionResult):
logger.info(item.to_json())
# Only log notifications without an error are inspected
if isinstance(item, LogsNotification) and item.result.value.err is None:
logs = set(item.result.value.logs)
search = "initialize2"
found = False
# found becomes True when any log line contains the search text
for message in logs:
if search in message:
found = True
if found:
signature = item.result.value.signature
logger.info(f"See, <a href=\"https://solscan.io/tx/{signature}\">https://solscan.io/tx/{signature}</a>")
# NOTE(review): the nesting level of this break cannot be told from this view;
# presumably it stops scanning after the first match - confirm
break
def _update_ws_main_connection_status(self, state: QAbstractSocket.SocketState):
""" Update the main WebSocket indicator and the connect/disconnect action for `state`.

Only the connected and unconnected states are handled.
"""
if state == QAbstractSocket.SocketState.ConnectedState:
self.wsMainCnxImg.setPixmap(self.wsConnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
self._connected = True
# The menu action now offers to disconnect
self.conn_disc_Qact.setIcon(QIcon(":icoDisconnect"))
self.conn_disc_Qact.setText(self.tr("Disconnect"))
self.conn_disc_Qact.setDisabled(False)
# Once connected, send a logs subscription filtered on PK_RaydiumLPV4 over the main socket
(req_id, req_cmd, req) = WsReqGen.logs_subscribe(RpcTransactionLogsFilterMentions(PK_RaydiumLPV4), CommitmentLevel.Finalized)
self.connPool.send_websocket_main_request(req_cmd, req.to_json())
logger.debug(f"{req.to_json()} sent")
elif state == QAbstractSocket.SocketState.UnconnectedState:
self.wsMainCnxImg.setPixmap(self.wsDisconnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
self._connected = False
# The menu action now offers to connect
self.conn_disc_Qact.setIcon(QIcon(":icoConnect"))
self.conn_disc_Qact.setText(self.tr("Connect"))
self.conn_disc_Qact.setDisabled(False)
# NOTE(review): presumably the counter is reset only on disconnection;
# the nesting of this call cannot be told from this view - confirm
SharedCounter.reset_id()
def _update_ws_connection_status(self, index: int, state: QAbstractSocket.SocketState):
    """Update the status-bar icon of the WebSocket client at `index` for `state`.

    Only the connected and unconnected states change the icon.
    """
    if state == QAbstractSocket.SocketState.ConnectedState:
        source = self.wsConnected_Qpxm
    elif state == QAbstractSocket.SocketState.UnconnectedState:
        source = self.wsDisconnected_Qpxm
    else:
        return
    self.wsCnxImgs[index].setPixmap(source.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
def update_http_pxm_sent(self, index: int):
    """Show the 'sent' icon on the HTTP indicator at `index`, then 'waiting' after 200 ms."""
    keep_ratio = Qt.AspectRatioMode.KeepAspectRatio

    def show_waiting():
        # The label is looked up when the timer fires
        self.httpMsgImgs[index].setPixmap(self.httpWait_Qpxm.scaled(16, 16, keep_ratio))

    self.httpMsgImgs[index].setPixmap(self.httpSend_Qpxm.scaled(16, 16, keep_ratio))
    QTimer.singleShot(200, show_waiting)
def update_http_pxm_received(self, index: int):
    """Flash the 'received' pixmap on the HTTP label at ``index``.

    The 'receive' pixmap is shown immediately; 200 ms later a single-shot
    timer replaces it with the 'none' (idle) pixmap.
    """
    keep_ratio = Qt.AspectRatioMode.KeepAspectRatio

    def show_idle():
        # The label is looked up again when the timer fires, not captured now.
        self.httpMsgImgs[index].setPixmap(self.httpNone_Qpxm.scaled(16, 16, keep_ratio))

    self.httpMsgImgs[index].setPixmap(self.httpReceive_Qpxm.scaled(16, 16, keep_ratio))
    QTimer.singleShot(200, show_idle)
def _get_vertical_separator(self) -> QFrame:
    """Return a new sunken vertical-line ``QFrame`` parented to this window."""
    separator = QFrame(self)
    separator.setFrameShadow(QFrame.Shadow.Sunken)
    separator.setFrameShape(QFrame.Shape.VLine)
    return separator
def first_load(self):
    """Show the main window and start the initial wallet flow.

    Connected to the splash screen's fade-out in the ``__main__`` script.
    The window is shown maximized unless the configuration says otherwise.
    If the configured last file exists, its password is requested;
    otherwise (no last file, or the file is gone) the user must load or
    create a wallet.
    """
    if self.config.get('lastMaximized', True):
        self.showMaximized()
    else:
        self.show()
    last_file = self.config["lastFile"]
    if last_file is not None and last_file != "":
        if os.path.exists(last_file):
            self._ask_load_wallet_pass(last_file, True)
            return
        # The remembered wallet file disappeared: tell the user before asking for another one.
        msg_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' does not exists. Please choose a new file.").format(fileName = last_file), QMessageBox.Icon.Warning, self)
        msg_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
        msg_qmbox.exec()
    self._init_new_or_load_wallet()
def _quit_app_func(self, force: bool = False):
    """Ask for confirmation and, if accepted, shut the application down.

    Args:
        force: When False, the window state, current language and wallet
            are saved before quitting. When True, nothing is saved.

    Nothing happens if the user does not click "Yes".
    """
    title = self.tr("Do you really want to Quit?")
    if force:
        text = self.tr("If you accept, nothing will be saved\nand the application will be closed.")
        icon = QMessageBox.Icon.Critical
    else:
        text = self.tr("If you accept, all the files will be saved\nand the application will be closed.")
        icon = QMessageBox.Icon.Question
    msg_qmbox = SimpleMsgBox(title, text, icon, self)
    yes_qpbtn = msg_qmbox.addButton(self.tr("Yes"), QMessageBox.ButtonRole.ActionRole)
    msg_qmbox.addButton(self.tr("No"), QMessageBox.ButtonRole.ActionRole)
    msg_qmbox.exec()
    if msg_qmbox.clickedButton() != yes_qpbtn:
        return
    if not force:
        # TODO: save size of non maximized window
        self.config.update(
            lastMaximized = self.isMaximized(),
            lastHeight = self.height(),
            lastWidth = self.width(),
            defaultLang = self.currentLang,
        )
        # Persist the in-memory configuration, then the wallet.
        save_yaml_app_config(self.config)
        self.wallet_Qw.save_wallet()
    self.logViewer_Qw.close()
    self.connPool.clean_up()
    self.connPool = None
    self.tray_Qsti.deleteLater()
    logger.debug(self.tr("Closing App"))
    # Leave the Qt event loop.
    self.app.quit()
def _change_lang(self, lang_code, lang_name):
    """Switch the interface language.

    Args:
        lang_code: Language code used to build the ``:trans_<code>`` resource name.
        lang_name: Display name of the language, used only in the log message.

    Nothing changes when the translation resource cannot be loaded.
    """
    if not self.translator.load(f":trans_{lang_code}"):
        return
    self.app.installTranslator(self.translator)
    self.currentLang = lang_code
    set_app_dir(self.currentLang)
    logger.info(self.tr("Switching to language {lang}.").format(lang = lang_name))
    self.update_display()
def _init_tray_qmnu(self):
    """Populate the system-tray context menu and show the tray icon.

    Menu order: hide, show, close, close without saving. The "show" action
    starts disabled and is connected to ``self.show``.
    """
    show_action = self.showApp_Qact
    show_action.setIcon(QIcon(":icoTrayShow"))
    show_action.setDisabled(True)
    show_action.triggered.connect(self.show)
    for action in (self.hideApp_Qact, show_action, self.closeApp_Qact, self.forceCloseApp_Qact):
        self.tray_Qmnu.addAction(action)
    self.tray_Qsti.setContextMenu(self.tray_Qmnu)
    self.tray_Qsti.show()
def _show_about_window(self):
    """Open the About window with application, Python, PySide6 and OS information."""
    info_lines = (
        f"{APP_FULL_NAME}: {APP_VERSION}",
        f"PYTHON: {platform.python_version()}",
        f"Qt: {pyside6version}",
        f"OS: {platform.system()}",
    )
    bg_txt = "<h3>" + "<br>".join(info_lines) + "</h3>"
    AboutWindow(bg_txt, self).show()
def _show_help_window(self):
    """Open the Help window on the ``:pdfHelpFr`` resource."""
    HelpWindow(":pdfHelpFr", self).show()
def _init_main_toolbar(self):
    """Build the fixed main toolbar: favicon, wallet actions, then the connect/disconnect action."""
    toolbar = self.main_Qtb
    self.addToolBar(toolbar)
    toolbar.setMovable(False)
    # Application icon shown as the first toolbar item.
    favicon_toolbar_qlbl = QLabel()
    favicon_pxm = QPixmap(":imgFavicon").scaled(24, 24, Qt.AspectRatioMode.KeepAspectRatio)
    favicon_toolbar_qlbl.setPixmap(favicon_pxm)
    toolbar.addWidget(favicon_toolbar_qlbl)
    toolbar.addSeparator()
    # Wallet actions.
    for wallet_action in (self.openWallet_Qact, self.newWallet_Qact):
        toolbar.addAction(wallet_action)
    toolbar.addSeparator()
    # Connection toggle.
    toolbar.addAction(self.conn_disc_Qact)
    toolbar.addSeparator()
def _init_new_or_load_wallet(self):
    """Force the user to load a wallet, create one, or quit.

    "Load Wallet" and "New Wallet" both make sure the wallets folder exists
    and then start the matching dialog chain with ``return_to_prev=True``.
    Any other outcome closes the window and quits the application.
    """
    msg_qmbox = SimpleMsgBox(self.tr("Action Required"), self.tr("To use the application, you have to load a wallet or to create a new one."), QMessageBox.Icon.Critical, self)
    load_qpbtn = msg_qmbox.addButton(self.tr("Load Wallet"), QMessageBox.ButtonRole.ActionRole)
    create_qpbtn = msg_qmbox.addButton(self.tr("New Wallet"), QMessageBox.ButtonRole.ActionRole)
    msg_qmbox.addButton(self.tr("Quit"), QMessageBox.ButtonRole.RejectRole)
    msg_qmbox.exec()
    chosen = msg_qmbox.clickedButton()
    if chosen == load_qpbtn:
        create_wallets_folder()
        self._ask_load_wallet_filepath(True)
    elif chosen == create_qpbtn:
        create_wallets_folder()
        self._ask_new_wallet_name_and_pass(True)
    else:
        self.close()
        QApplication.instance().quit()
def _create_new_wallet(self):
    """Start the "new wallet" dialog chain after ensuring the wallets folder exists."""
    create_wallets_folder()
    self._ask_new_wallet_name_and_pass()
def _ask_new_wallet_name_and_pass(self, return_to_prev: bool = False):
    """Ask for the new wallet's name and encryption password.

    Args:
        return_to_prev: When True and the dialog is dismissed, go back to
            the load-or-create prompt.

    On acceptance the flow continues with the file-location dialog.
    """
    wallet_dialog = NewWalletNameAndPassDialog(self)
    if wallet_dialog.exec():
        self._ask_new_wallet_filepath(wallet_dialog.walletName, wallet_dialog.walletEncyptPass, return_to_prev)
        return
    if return_to_prev:
        self._init_new_or_load_wallet()
def _ask_new_wallet_filepath(self, new_wallet_name: str, new_wallet_pass: str, return_to_prev: bool = False):
    """Ask where to store a new wallet, then create, encrypt and save it.

    Args:
        new_wallet_name: Name stored in the new wallet.
        new_wallet_pass: Password used to encrypt the wallet file.
        return_to_prev: When True and no wallet ends up saved (cancel or
            error), go back to the name/password dialog.

    The file dialog is shown again while the chosen file already exists.
    On success the file is recorded in the recent-files history (at most 5
    entries), becomes the configured last file and is handed to the wallet
    widget.
    """
    wallet_suffix = "." + WALLET_FILE_EXT
    saved_file = False
    while True:
        # Let the user pick a location and a file name.
        dialog = NewWalletFileDialog(self)
        if not dialog.exec():
            break  # The user cancelled.
        file_path = dialog.selectedFiles()[0]
        # Append the wallet extension when it is missing.
        if not file_path.endswith(wallet_suffix):
            file_path += wallet_suffix
        file_name = os.path.basename(file_path)
        if os.path.exists(file_path):
            # Never overwrite an existing file: warn and ask again.
            msg_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' already exists. Please choose a new file.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            msg_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            msg_qmbox.exec()
            continue
        try:
            # Generate the key pair and write the encrypted wallet.
            keypair = Keypair()
            wallet = Wallet(wallet_name = new_wallet_name, wallet_pub_key = str(keypair.pubkey()), wallet_priv_key = str(keypair.secret()))
            encrypted_data = encrypt_string(new_wallet_pass, APP_SALT.encode("utf-8"), wallet.to_json())
            with LockedFile(file_path, "w+b") as file:
                file.write(encrypted_data.encode("utf-8"))
            file_path = os.path.realpath(file_path)
            recent_files = self.config["lastFiles"]
            if file_path not in recent_files:
                recent_files.append(file_path)
                # Keep only the 5 most recent entries.
                del recent_files[:-5]
                self._refresh_files_history_qmnu()
            self.config["lastFile"] = file_path
            self.wallet_Qw.set_wallet(wallet, new_wallet_pass, file_path)
            self.main_Qtabw.setTabText(1, self.tr("Wallet: {walletName}").format(walletName = wallet.walletName))
            saved_file = True
        except Exception as e:
            logger.error(e)
            msg_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("An error occurred while saving the file '{fileName}'.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            msg_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            msg_qmbox.exec()
        break  # Saved or failed: either way, stop asking for a location.
    if not saved_file and return_to_prev:
        self._ask_new_wallet_name_and_pass(return_to_prev)
def _load_wallet(self):
    """Start the "open wallet" dialog chain with its default arguments."""
    return self._ask_load_wallet_filepath()
def _ask_load_wallet_filepath(self, return_to_prev: bool = False):
    """Ask which wallet file to open, then continue with the password dialog.

    Args:
        return_to_prev: True when called from the startup prompt. In that
            mode the "already loaded" check is skipped, and cancelling goes
            back to the load-or-create prompt.

    The file dialog is shown again while the chosen file does not exist or
    (outside startup mode) is the configured last file.
    """
    while True:
        dialog = LoadWalletFileDialog(self)
        if not dialog.exec():
            # The user cancelled.
            if return_to_prev:
                self._init_new_or_load_wallet()
            return
        file_path = dialog.selectedFiles()[0]
        file_name = os.path.basename(file_path)
        if not os.path.exists(file_path):
            msg_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' does not exists. Please choose another file.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            msg_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            msg_qmbox.exec()
            continue
        current_file = self.config["lastFile"]
        # config["lastFile"] is stored as a real path, while the dialog may return
        # a different spelling of the same file (separators, symlinks), so both
        # sides are normalized before comparing.
        already_loaded = bool(current_file) and os.path.realpath(file_path) == os.path.realpath(current_file)
        if already_loaded and not return_to_prev:
            msg_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' is already loaded. Please choose another file.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            msg_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            msg_qmbox.exec()
            continue
        break
    self._ask_load_wallet_pass(file_path, return_to_prev)
def _ask_load_wallet_pass(self, file_path: str, return_to_prev: bool = False):
    """Ask for the wallet password, decrypt the file and load the wallet.

    Args:
        file_path: Path of the encrypted wallet file.
        return_to_prev: When True and no wallet ends up loaded (cancel or
            unusable file), go back to the file-selection dialog.

    A wrong password (``InvalidToken``) asks again. A locked, unreadable,
    corrupted or invalid file is reported and ends the attempt. On success
    the file is recorded in the recent-files history (at most 5 entries),
    becomes the configured last file and is handed to the wallet widget.
    """
    file_name = os.path.basename(file_path)

    def show_error(text: str) -> None:
        # Modal error box with a single "Ok" button.
        msg_qmbox = SimpleMsgBox(self.tr("Error"), text, QMessageBox.Icon.Critical, self)
        msg_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
        msg_qmbox.exec()

    loaded_file = False
    wallet = None
    wallet_pass = None
    while not loaded_file:
        wallet_dialog = LoadWalletPassDialog(file_name, self)
        if not wallet_dialog.exec():
            break  # The user cancelled.
        wallet_pass = wallet_dialog.walletEncyptPass
        try:
            with LockedFile(file_path, "rb") as file:
                enc_data = file.read()
            wallet = Wallet.from_json(decrypt_string(wallet_pass, APP_SALT.encode("utf-8"), enc_data.decode()))
            loaded_file = True
        except portalocker.exceptions.LockException as e:
            logger.error(self.tr("LockException for Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            show_error(self.tr("The File '{fileName}' seems to be locked. Please choose another file.").format(fileName = file_name))
            break
        except InvalidToken as e:
            # Wrong password: the only failure that is worth retrying on the same file.
            logger.error(self.tr("Wrong password for Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            show_error(self.tr("The password you provided for file '{fileName}' is incorrect. Please try again.").format(fileName = file_name))
            continue
        except json.JSONDecodeError as e:
            # Must stay before ValueError: JSONDecodeError is a ValueError subclass.
            logger.error(self.tr("Cannot decode json in Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            show_error(self.tr("The file '{fileName}' seems to be corrupted. Please choose another file.").format(fileName = file_name))
            break
        except ValueError as e:
            logger.error(self.tr("Wrong values in {fileName} {exp}").format(fileName = file_path, exp = e))
            show_error(self.tr("The file '{fileName}' seems to have wrong values. Please choose another file.").format(fileName = file_name))
            break
        except OSError as e:
            # The file vanished or cannot be read: report it instead of letting the
            # exception escape the dialog flow.
            logger.error(self.tr("Cannot read Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            show_error(self.tr("The file '{fileName}' could not be read. Please choose another file.").format(fileName = file_name))
            break
    if loaded_file:
        file_path = os.path.realpath(file_path)
        recent_files = self.config["lastFiles"]
        if file_path not in recent_files:
            recent_files.append(file_path)
            # Keep only the 5 most recent entries.
            del recent_files[:-5]
            self._refresh_files_history_qmnu()
        self.config["lastFile"] = file_path
        self.wallet_Qw.set_wallet(wallet, wallet_pass, file_path)
        self.main_Qtabw.setTabText(1, self.tr("Wallet: {walletName}").format(walletName = wallet.walletName))
        return
    if return_to_prev:
        self._ask_load_wallet_filepath(return_to_prev)
def _init_file_qmnu(self):
    """Build the File menu and connect its actions.

    Menu order: New Wallet, Open Wallet, separator, recent-files submenu,
    separator, connect/disconnect, separator, hide, close, close without
    saving.
    """
    file_menu = self.file_Qmnu
    self.main_Qmnub.addMenu(file_menu)

    # Action targets.
    self.newWallet_Qact.triggered.connect(self._create_new_wallet)
    self.openWallet_Qact.triggered.connect(self._load_wallet)
    self.conn_disc_Qact.triggered.connect(self._conn_disc)
    self.hideApp_Qact.triggered.connect(self.hide)
    self.closeApp_Qact.triggered.connect(lambda: self._quit_app_func(False))
    self.forceCloseApp_Qact.triggered.connect(lambda: self._quit_app_func(True))

    # Wallet entries.
    file_menu.addAction(self.newWallet_Qact)
    file_menu.addAction(self.openWallet_Qact)
    file_menu.addSeparator()
    # Recently used wallet files.
    file_menu.addMenu(self.filesHistory_Qmnu)
    self._refresh_files_history_qmnu()
    file_menu.addSeparator()
    # Connection toggle.
    file_menu.addAction(self.conn_disc_Qact)
    file_menu.addSeparator()
    # Hide / close / close without saving.
    for app_action in (self.hideApp_Qact, self.closeApp_Qact, self.forceCloseApp_Qact):
        file_menu.addAction(app_action)
def _init_tools_qmnu(self):
    """Build the Tools menu: a single "Check Address" action."""
    # Add the Tools menu to the main menu bar.
    self.main_Qmnub.addMenu(self.tools_Qmnu)
    # Add the "Check Address" action once; the original added the same action a
    # second time, which Qt ignores for an action the widget already has.
    self.tools_Qmnu.addAction(self.checkAddress_Qact)
    # Set the action target.
    self.checkAddress_Qact.triggered.connect(self._check_address)
def _check_address(self):
    """Open the Solana address checker window unless ``solana_addr_check`` is already set.

    The flag is raised before the window is created.
    NOTE(review): presumably the checker window clears the flag when it
    closes; that code is not visible here, confirm.
    """
    if self.solana_addr_check:
        return
    self.solana_addr_check = True
    SolanaAdressCheckerWindow(self.connPool, self).show()
def _conn_disc(self):
    """Toggle the connection pool: open it when disconnected, close it when connected.

    The action is disabled first; ``_update_ws_main_connection_status``
    enables it again once the main socket reports its new state.
    """
    self.conn_disc_Qact.setDisabled(True)
    toggle = self.connPool.close if self._connected else self.connPool.open
    toggle()
def _refresh_files_history_qmnu(self):
    """Rebuild the recent-files submenu from ``config['lastFiles']``.

    Entries whose file no longer exists are dropped from the configuration.
    One action per remaining file is added; the "no file" placeholder
    action is added only when no file remains.
    """
    self.filesHistory_Qmnu.clear()
    recent_files = self.config.get('lastFiles', [])
    existing_files = [file_path for file_path in recent_files if os.path.exists(file_path)]
    if len(existing_files) != len(recent_files):
        # Prune in place (same list object) instead of removing while iterating,
        # which skipped the entry following each removed one.
        recent_files[:] = existing_files
    if not existing_files:
        # The original for/else ran this branch unconditionally, so the placeholder
        # appeared even when files were listed.
        self.filesHistory_Qmnu.addAction(self.noFileHistory_Qact)
        return
    for file_path in existing_files:
        file_name = os.path.basename(file_path)
        # Parent the action to the menu: addAction() does not take ownership, so an
        # unparented action with no Python reference can be destroyed on return.
        file_open_qact = QAction(QIcon(":icoWallet"), file_name, self.filesHistory_Qmnu)
        self.filesHistory_Qmnu.addAction(file_open_qact)
        # TODO: Add trigger connect
def _init_langs_qmnu(self):
    """Build the Languages menu.

    Every language action is added and connected to ``_change_lang`` with
    its language code and its own current text. Only English and French
    are left enabled; all other entries are disabled.
    """
    self.main_Qmnub.addMenu(self.langs_Qmnu)

    def make_switcher(lang_code, action):
        # The action text is read when the action fires, so it follows retranslation.
        return lambda: self._change_lang(lang_code, action.text())

    # (language code, action, selectable)
    languages = (
        ("ar", self.changeLangAr_Qact, False),
        ("de", self.changeLangDe_Qact, False),
        ("en", self.changeLangEn_Qact, True),
        ("es", self.changeLangEs_Qact, False),
        ("fr", self.changeLangFr_Qact, True),
        ("he", self.changeLangHe_Qact, False),
        ("hi", self.changeLangHi_Qact, False),
        ("it", self.changeLangIt_Qact, False),
        ("ja", self.changeLangJa_Qact, False),
        ("ko", self.changeLangKo_Qact, False),
        ("pt", self.changeLangPt_Qact, False),
        ("ru", self.changeLangRu_Qact, False),
        ("zh", self.changeLangZh_Qact, False),
    )
    for lang_code, action, selectable in languages:
        self.langs_Qmnu.addAction(action)
        action.triggered.connect(make_switcher(lang_code, action))
        if not selectable:
            action.setDisabled(True)
def _init_help_qmnu(self):
    """Build the Help menu: the help-content action followed by the about action."""
    help_menu = self.help_Qmnu
    self.main_Qmnub.addMenu(help_menu)
    # (action, target) pairs in menu order.
    entries = (
        (self.help_Qact, self._show_help_window),
        (self.about_Qact, self._show_about_window),
    )
    for action, target in entries:
        help_menu.addAction(action)
        action.triggered.connect(target)
def update_display(self):
    """Re-apply every translated text of the main window after a language change.

    Covers menus, actions and tab titles, forwards the refresh to the
    wallet and log-viewer widgets, and applies the layout direction
    returned by ``get_qt_dir()``.
    """
    # File menu.
    self.file_Qmnu.setTitle(self.tr("File"))
    self.newWallet_Qact.setText(self.tr("New Wallet"))
    self.openWallet_Qact.setText(self.tr("Open Wallet"))
    self.filesHistory_Qmnu.setTitle(self.tr("Last Files"))
    self.noFileHistory_Qact.setText(self.tr("No file found"))
    # The toggle is labelled with the operation it would perform.
    self.conn_disc_Qact.setText(self.tr("Disconnect") if self._connected else self.tr("Connect"))
    self.hideApp_Qact.setText(self.tr("Hide application"))
    self.closeApp_Qact.setText(self.tr("Close application"))
    self.forceCloseApp_Qact.setText(self.tr("Close without saving"))
    # Tools, Languages and Help menus.
    self.tools_Qmnu.setTitle(self.tr("Tools"))
    self.checkAddress_Qact.setText(self.tr("Check Address"))
    self.langs_Qmnu.setTitle(self.tr("Languages"))
    self.help_Qmnu.setTitle(self.tr("Help"))
    self.help_Qact.setText(self.tr("Help Content"))
    self.about_Qact.setText(self.tr("About {app}").format(app = APP_NAME))
    # Tray-only action.
    self.showApp_Qact.setText(self.tr("Show application"))
    # Tab titles; the wallet tab shows an empty name while no wallet is loaded.
    current_wallet = self.wallet_Qw.wallet
    wallet_name = "" if current_wallet is None else current_wallet.walletName
    self.main_Qtabw.setTabText(0, self.tr("Tokens:"))
    self.main_Qtabw.setTabText(1, self.tr("Wallet: {walletName}").format(walletName = wallet_name))
    self.main_Qtabw.setTabText(2, self.tr("Log Display:"))
    # Child widgets refresh their own texts.
    self.wallet_Qw.update_display()
    self.logViewer_Qw.update_display()
    self.setLayoutDirection(get_qt_dir())
def closeEvent(self, event: QEvent):
    """Turn a close request into a hide: the event is ignored and the window hidden."""
    event.ignore()
    self.hide()
def showEvent(self, event) -> None:
    """On show: enable "hide", disable "show", and re-center the window if requested.

    Centering happens at most once per ``_mustCenter`` request, after the
    base-class handler has run.
    """
    self.showApp_Qact.setDisabled(True)
    self.hideApp_Qact.setDisabled(False)
    super().showEvent(event)
    if not self._mustCenter:
        return
    self._mustCenter = False
    center_on_screen(self)
def hideEvent(self, event) -> None:
    """On hide: enable "show", disable "hide", and request re-centering on the next show."""
    self.hideApp_Qact.setDisabled(True)
    self.showApp_Qact.setDisabled(False)
    self._mustCenter = True
    super().hideEvent(event)
if __name__ == "__main__":
    # Logging: drop loguru's default sink, then log to stderr (TRACE) and to a
    # per-run file (DEBUG).
    # NOTE(review): diagnose=True prints variable values in tracebacks; confirm
    # this is acceptable for an application that handles wallet passwords.
    logger.remove()
    console_format = '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{line: <4}</cyan> - <level>{message}</level>'
    file_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} - {level} - {line: <4}: {message}"
    logger.add(sys.stderr, level = 'TRACE', format = console_format, filter = None, colorize = None, serialize = False, backtrace = True, diagnose = True, enqueue = False, context = None, catch = True)
    logger.add("logs/KhadhroonyRaydium4.{time}.log", format = file_format, level = "DEBUG")

    # Command-line options (only --version; no option opens the main window).
    banner = "==~-<=<~~> No Khadhrawy <$<~~>$> No Hamroony <~~>=>-~==\n\n"
    parser = argparse.ArgumentParser(
        prog = APP_NAME,
        description = banner + APP_DESC + "\n" + "Launch without options to show main window",
        epilog = banner,
        formatter_class = argparse.RawTextHelpFormatter
    )
    parser.add_argument('-v', '--version', action = 'version', version = f"{APP_NAME} (version {APP_VERSION})", help = "show program version and exit")
    args = parser.parse_args()

    # Make sure the wallets folder exists.
    create_wallets_folder()

    # Base configuration (history, language, display, ...).
    config = load_yaml_app_config()
    config_locale = config["defaultLang"]

    # Language: configured one if supported, else the system one, else English.
    system_locale = QLocale.system().name().split("_")[0]
    sys_lang = system_locale if system_locale in APP_LANGS else "en"
    app_lang = config_locale if config_locale in APP_LANGS else sys_lang
    config['defaultLang'] = app_lang
    set_app_dir(app_lang)
    translator = QTranslator()
    translator.load(f":trans_{app_lang}")

    # Create the PySide6 application.
    app = QApplication(sys.argv)
    app.installTranslator(translator)

    # Splash screen first; the main window loads when its fade-out finishes.
    splash_screen = SplashScreen()
    splash_screen.show()
    main_window = MainWindow(app, translator, config)
    splash_screen.fadeOut.finished.connect(main_window.first_load)
    sys.exit(app.exec())