4220 lines
208 KiB
Python
Executable File
4220 lines
208 KiB
Python
Executable File
#!.venv/bin/python
|
||
"""Khadhroony SRLPv4 main script"""
|
||
|
||
import argparse
|
||
import base58
|
||
import base64
|
||
# import hashlib
|
||
# import io
|
||
import json
|
||
import os
|
||
# import pathlib
|
||
import platform
|
||
import portalocker
|
||
import random
|
||
import re
|
||
import rc_doc
|
||
import rc_icons
|
||
import rc_images
|
||
import rc_trans
|
||
import sys
|
||
# import time
|
||
import yaml
|
||
|
||
from collections import deque
|
||
from cryptography.fernet import Fernet, InvalidToken
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||
from cryptography.hazmat.primitives import hashes
|
||
from cryptography.hazmat.backends import default_backend
|
||
from datetime import datetime, timezone
|
||
from itertools import count as itercount
|
||
from loguru import logger
|
||
from PySide6 import __version__ as pyside6version
|
||
from PySide6.QtCore import (
|
||
QAbstractTableModel,
|
||
QEasingCurve,
|
||
QEvent,
|
||
QEventLoop,
|
||
QLocale,
|
||
QMargins,
|
||
QModelIndex,
|
||
QObject,
|
||
QPropertyAnimation,
|
||
Qt,
|
||
QThread,
|
||
QTimer,
|
||
QTranslator,
|
||
QUrl,
|
||
Signal,
|
||
Slot
|
||
)
|
||
from PySide6.QtGui import (
|
||
QAction,
|
||
QColor,
|
||
QCursor,
|
||
# QDoubleValidator,
|
||
QIcon,
|
||
# QImage,
|
||
QKeyEvent,
|
||
QLinearGradient,
|
||
QPainter,
|
||
QPixmap,
|
||
QFont
|
||
)
|
||
from PySide6.QtNetwork import (
|
||
QAbstractSocket,
|
||
QNetworkAccessManager,
|
||
QNetworkInformation,
|
||
QNetworkReply,
|
||
QNetworkRequest
|
||
)
|
||
from PySide6.QtPdf import QPdfDocument
|
||
from PySide6.QtPdfWidgets import QPdfView
|
||
from PySide6.QtWebSockets import QWebSocket
|
||
from PySide6.QtWidgets import (
|
||
QAbstractItemView,
|
||
QApplication,
|
||
QDialog,
|
||
QDialogButtonBox,
|
||
QFileDialog,
|
||
QFormLayout,
|
||
QFrame,
|
||
QGraphicsOpacityEffect,
|
||
QGridLayout,
|
||
QHeaderView,
|
||
QHBoxLayout,
|
||
# QInputDialog,
|
||
QLabel,
|
||
QLineEdit,
|
||
QMainWindow,
|
||
QMenu,
|
||
QMessageBox,
|
||
QSplashScreen,
|
||
QPushButton,
|
||
QSizePolicy,
|
||
QSystemTrayIcon,
|
||
QTableView,
|
||
QTabWidget,
|
||
QTextEdit,
|
||
QToolButton,
|
||
QToolBar,
|
||
QVBoxLayout,
|
||
QWidget
|
||
)
|
||
from queue import Queue
|
||
from solders.account_decoder import (
|
||
UiAccountEncoding,
|
||
UiDataSliceConfig
|
||
)
|
||
from solders.commitment_config import CommitmentLevel
|
||
from solders.hash import Hash
|
||
from solders.keypair import Keypair
|
||
from solders.message import VersionedMessage
|
||
from solders.pubkey import Pubkey
|
||
from solders.rpc.config import (
|
||
RpcAccountInfoConfig,
|
||
RpcBlockConfig,
|
||
RpcBlockProductionConfig,
|
||
RpcBlockProductionConfigRange,
|
||
RpcBlockSubscribeConfig,
|
||
RpcBlockSubscribeFilter,
|
||
RpcBlockSubscribeFilterMentions,
|
||
RpcContextConfig, RpcEpochConfig,
|
||
RpcGetVoteAccountsConfig,
|
||
RpcLargestAccountsFilter,
|
||
RpcLeaderScheduleConfig,
|
||
RpcProgramAccountsConfig,
|
||
RpcRequestAirdropConfig,
|
||
RpcSendTransactionConfig,
|
||
RpcSignaturesForAddressConfig,
|
||
RpcSignatureStatusConfig,
|
||
RpcSignatureSubscribeConfig,
|
||
RpcSimulateTransactionConfig,
|
||
RpcSupplyConfig,
|
||
RpcTokenAccountsFilterMint,
|
||
RpcTokenAccountsFilterProgramId,
|
||
RpcTransactionConfig,
|
||
RpcTransactionLogsConfig,
|
||
RpcTransactionLogsFilter,
|
||
RpcTransactionLogsFilterMentions)
|
||
from solders.rpc.filter import Memcmp
|
||
from solders.rpc.requests import (
|
||
AccountSubscribe,
|
||
AccountUnsubscribe,
|
||
BlockSubscribe,
|
||
BlockUnsubscribe,
|
||
GetAccountInfo,
|
||
GetBalance,
|
||
GetBlock,
|
||
GetBlockCommitment,
|
||
GetBlockHeight,
|
||
GetBlockProduction,
|
||
GetBlockTime,
|
||
GetBlocks,
|
||
GetBlocksWithLimit,
|
||
GetClusterNodes,
|
||
GetEpochInfo,
|
||
GetEpochSchedule,
|
||
GetFeeForMessage,
|
||
GetFirstAvailableBlock,
|
||
GetGenesisHash,
|
||
GetHealth,
|
||
GetHighestSnapshotSlot,
|
||
GetIdentity,
|
||
GetInflationGovernor,
|
||
GetInflationRate,
|
||
GetInflationReward,
|
||
GetLargestAccounts,
|
||
GetLatestBlockhash,
|
||
GetLeaderSchedule,
|
||
GetMaxRetransmitSlot,
|
||
GetMaxShredInsertSlot,
|
||
GetMinimumBalanceForRentExemption,
|
||
GetMultipleAccounts,
|
||
GetProgramAccounts,
|
||
GetRecentPerformanceSamples,
|
||
GetSignaturesForAddress,
|
||
GetSignatureStatuses,
|
||
GetSlot,
|
||
GetSlotLeader,
|
||
GetSlotLeaders,
|
||
GetSupply,
|
||
GetTokenAccountBalance,
|
||
GetTokenAccountsByDelegate,
|
||
GetTokenAccountsByOwner,
|
||
GetTokenLargestAccounts,
|
||
GetTokenSupply,
|
||
GetTransaction,
|
||
GetTransactionCount,
|
||
GetVersion,
|
||
GetVoteAccounts,
|
||
IsBlockhashValid,
|
||
LogsSubscribe,
|
||
LogsUnsubscribe,
|
||
MinimumLedgerSlot,
|
||
ProgramSubscribe,
|
||
ProgramUnsubscribe,
|
||
RequestAirdrop,
|
||
RootSubscribe,
|
||
RootUnsubscribe,
|
||
SendRawTransaction,
|
||
SignatureSubscribe,
|
||
SignatureUnsubscribe,
|
||
SlotSubscribe,
|
||
SlotUnsubscribe,
|
||
SlotsUpdatesSubscribe,
|
||
SlotsUpdatesUnsubscribe,
|
||
VoteSubscribe,
|
||
VoteUnsubscribe,
|
||
SimulateVersionedTransaction,
|
||
SimulateLegacyTransaction
|
||
)
|
||
from solders.rpc.responses import (
|
||
LogsNotification,
|
||
# LogsNotificationResult,
|
||
SubscriptionError,
|
||
SubscriptionResult,
|
||
parse_websocket_message, GetBalanceResp, GetAccountInfoResp,
|
||
GetTokenAccountsByOwnerResp, GetTokenSupplyResp, GetProgramAccountsResp,
|
||
GetSignaturesForAddressResp, GetTokenAccountBalanceResp,
|
||
GetTokenAccountsByDelegateResp
|
||
)
|
||
from solders.signature import Signature
|
||
from solders.transaction import VersionedTransaction, Transaction
|
||
from solders.transaction_status import TransactionDetails, UiTransactionEncoding
|
||
from threading import Lock
|
||
from typing import Any, IO, List, NamedTuple, Optional, Sequence, Union
|
||
|
||
# ---------------------------------------------------------------------------
# Application identity
# ---------------------------------------------------------------------------
APP_NAME: str = "KhadhroonySRLPv4"  # Application name
APP_ABOUT_NAME: str = "Khadhroony Sol RLPv4 Trading App"  # Application "about" name
APP_FULL_NAME: str = "Khadhroony Solana Raydium Liquidity Pool v4 Trading App"  # Application full name
APP_DESC: str = "Khadhroony Solana Raydium Liquidity Pool v4 Trading Application"  # Application description
APP_VERSION_INFO = ('1', '0', '2')  # Application version (tuple of str: major, minor, patch)
APP_VERSION = '.'.join(APP_VERSION_INFO)  # Application version (str), e.g. "1.0.2"
# Description followed by the version; built from APP_DESC instead of repeating the literal.
APP_NAME_VERSION = APP_DESC + " @ " + APP_VERSION

# ---------------------------------------------------------------------------
# Localisation
# ---------------------------------------------------------------------------
APP_LANGS = ["en", "fr", "de", "ar"]  # Supported languages
APP_RTL_LANGS = [
    "ae", "aeb", "aec", "ar", "arb", "arc", "arq", "ary", "arz", "ayl",
    "ckb", "dv", "fa", "glk", "he", "khw", "mzn", "ota", "otk", "pnb",
    "ps", "syr", "tmr", "ug", "ur",
]  # Language codes laid out right-to-left
is_rtl = False  # Global flag: True when the currently selected language is RTL, False for LTR

# ---------------------------------------------------------------------------
# Wallet files
# ---------------------------------------------------------------------------
# Application salt for wallet encryption. Only major.minor of the version is used.
# NOTE(review): presumably changing this value makes previously encrypted wallets
# undecryptable - confirm against the call sites before touching it.
APP_SALT = APP_NAME + " @ " + APP_VERSION_INFO[0] + "." + APP_VERSION_INFO[1]
WALLET_FILE_EXT = "kew"  # Wallet files extension
WALLET_FILE_DESC = "Khadhroony Encrypted Wallet"  # Wallet files description
WALLETS_FOLDER = "./wallets"  # Wallet files default folder

# ---------------------------------------------------------------------------
# Solana public RPC endpoints
#
# Limits noted by the original author for these URLs:
#   Maximum number of requests per 10 seconds per IP: 100
#   Maximum number of requests per 10 seconds per IP for a single RPC: 40
#   Maximum concurrent connections per IP: 40
#   Maximum connection rate per 10 seconds per IP: 40
#   Maximum amount of data per 30 second: 100 MB
# ---------------------------------------------------------------------------
URL_Ws_MainNet = "wss://api.mainnet-beta.solana.com"  # Websocket Solana Mainnet URL
URL_Http_MainNet = "https://api.mainnet-beta.solana.com"  # Http Solana Mainnet URL
URL_Ws_DevNet = "wss://api.devnet.solana.com"  # Websocket Solana Devnet URL
URL_Http_DevNet = "https://api.devnet.solana.com"  # Http Solana Devnet URL

# ---------------------------------------------------------------------------
# Numeric constants
# ---------------------------------------------------------------------------
LAMPORTS_PER_SOL: int = 1_000_000_000  # Number of lamports per SOL, where 1 SOL equals 1 billion lamports.
MINT_LEN: int = 82  # Data length of a token mint account.
ACCOUNT_LEN: int = 165  # Data length of a token account.
MULTISIG_LEN: int = 355  # Data length of a multisig token account.
|
||
# ---------------------------------------------------------------------------
# Solana program identifiers.
# Each identifier is declared twice: as its base-58 string and as a Pubkey
# built from that string with Pubkey.from_string().
# ---------------------------------------------------------------------------
SYSTEM_PROGRAM_ID: str = "11111111111111111111111111111111"  # Program ID for the System Program (str).
PK_SYSTEM_PROGRAM_ID: Pubkey = Pubkey.from_string(SYSTEM_PROGRAM_ID)  # Program ID for the System Program (Pubkey).

CONFIG_PROGRAM_ID: str = "Config1111111111111111111111111111111111111"  # Program ID for the Config Program (str).
PK_CONFIG_PROGRAM_ID: Pubkey = Pubkey.from_string(CONFIG_PROGRAM_ID)  # Program ID for the Config Program (Pubkey).

STAKE_PROGRAM_ID: str = "Stake11111111111111111111111111111111111111"  # Program ID for the Stake Program (str).
PK_STAKE_PROGRAM_ID: Pubkey = Pubkey.from_string(STAKE_PROGRAM_ID)  # Program ID for the Stake Program (Pubkey).

VOTE_PROGRAM_ID: str = "Vote111111111111111111111111111111111111111"  # Program ID for the Vote Program (str).
PK_VOTE_PROGRAM_ID: Pubkey = Pubkey.from_string(VOTE_PROGRAM_ID)  # Program ID for the Vote Program (Pubkey).

ADDRESS_LOOKUP_TABLE_PROGRAM_ID: str = "AddressLookupTab1e1111111111111111111111111"  # Program ID for the Address Lookup Table Program (str).
PK_ADDRESS_LOOKUP_TABLE_PROGRAM_ID: Pubkey = Pubkey.from_string(ADDRESS_LOOKUP_TABLE_PROGRAM_ID)  # Program ID for the Address Lookup Table Program (Pubkey).

# The literal below spells "BPFLoaderUpgradeab1e", i.e. the upgradeable loader.
BPF_LOADER_PROGRAM_ID: str = "BPFLoaderUpgradeab1e11111111111111111111111"  # Program ID for the BPF Loader Program (str).
PK_BPF_LOADER_PROGRAM_ID: Pubkey = Pubkey.from_string(BPF_LOADER_PROGRAM_ID)  # Program ID for the BPF Loader Program (Pubkey).

ED25519_PROGRAM_ID: str = "Ed25519SigVerify111111111111111111111111111"  # Program ID for the Ed25519 Program (str).
PK_ED25519_PROGRAM_ID: Pubkey = Pubkey.from_string(ED25519_PROGRAM_ID)  # Program ID for the Ed25519 Program (Pubkey).

SECP256K1_PROGRAM_ID: str = "KeccakSecp256k11111111111111111111111111111"  # Program ID for the Secp256k1 Program (str).
PK_SECP256K1_PROGRAM_ID: Pubkey = Pubkey.from_string(SECP256K1_PROGRAM_ID)  # Program ID for the Secp256k1 Program (Pubkey).

ASSOCIATED_TOKEN_PROGRAM_ID: str = "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL"  # Program ID for the associated token account program (str).
PK_ASSOCIATED_TOKEN_PROGRAM_ID: Pubkey = Pubkey.from_string(ASSOCIATED_TOKEN_PROGRAM_ID)  # Program ID for the associated token account program (Pubkey).

TOKEN_PROGRAM_ID: str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"  # Public key that identifies the SPL token program (str).
PK_TOKEN_PROGRAM_ID: Pubkey = Pubkey.from_string(TOKEN_PROGRAM_ID)  # Public key that identifies the SPL token program (Pubkey).

TOKEN_2022_PROGRAM_ID: str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb"  # Public key that identifies the SPL token 2022 program (str).
PK_TOKEN_2022_PROGRAM_ID: Pubkey = Pubkey.from_string(TOKEN_2022_PROGRAM_ID)  # Public key that identifies the SPL token 2022 program (Pubkey).

# Native Mint: the Token Program can be used to wrap native SOL. Doing so allows
# native SOL to be treated like any other Token program token type and can be
# useful when being called from other programs that interact with the Token
# Program's interface.
WRAPPED_SOL_MINT: str = "So11111111111111111111111111111111111111112"  # Public key of the "Native Mint" for wrapping SOL to SPL token (str).
PK_WRAPPED_SOL_MINT: Pubkey = Pubkey.from_string(WRAPPED_SOL_MINT)  # Public key of the "Native Mint" for wrapping SOL to SPL token (Pubkey).

MEMO_PROGRAM_ID: str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"  # Public key that identifies the Memo program (str).
PK_MEMO_PROGRAM_ID: Pubkey = Pubkey.from_string(MEMO_PROGRAM_ID)  # Public key that identifies the Memo program (Pubkey).

RaydiumLPV4: str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"  # Public key that identifies the Raydium Liquidity Pool V4 (str).
PK_RaydiumLPV4: Pubkey = Pubkey.from_string(RaydiumLPV4)  # Public key that identifies the Raydium Liquidity Pool V4 (Pubkey).
|
||
# Method names of the RPC WebSocket requests (subscribe / unsubscribe pairs).
RPC_WS_REQ = [
    "accountSubscribe",
    "accountUnsubscribe",
    "blockSubscribe",
    "blockUnsubscribe",
    "logsSubscribe",
    "logsUnsubscribe",
    "programSubscribe",
    "programUnsubscribe",
    "rootSubscribe",
    "rootUnsubscribe",
    "signatureSubscribe",
    "signatureUnsubscribe",
    "slotSubscribe",
    "slotUnsubscribe",
    "slotsUpdatesSubscribe",
    "slotsUpdatesUnsubscribe",
    "voteSubscribe",
    "voteUnsubscribe",
]

# Method names of the RPC HTTP requests.
RPC_HTTP_REQ = [
    "getAccountInfo",
    "getBalance",
    "getBlock",
    "getBlockCommitment",
    "getBlockHeight",
    "getBlockProduction",
    "getBlockTime",
    "getBlocks",
    "getBlocksWithLimit",
    "getClusterNodes",
    "getEpochInfo",
    "getEpochSchedule",
    "getFeeForMessage",
    "getFirstAvailableBlock",
    "getGenesisHash",
    "getHealth",
    "getHighestSnapshotSlot",
    "getIdentity",
    "getInflationGovernor",
    "getInflationRate",
    "getInflationReward",
    "getLargestAccounts",
    "getLatestBlockhash",
    "getLeaderSchedule",
    "getMaxRetransmitSlot",
    "getMaxShredInsertSlot",
    "getMinimumBalanceForRentExemption",
    "getMultipleAccounts",
    "getProgramAccounts",
    "getRecentPerformanceSamples",
    "getRecentPrioritizationFees",
    "getSignatureStatuses",
    "getSignaturesForAddress",
    "getSlot",
    "getSlotLeader",
    "getSlotLeaders",
    "getStakeMinimumDelegation",
    "getSupply",
    "getTokenAccountBalance",
    "getTokenAccountsByDelegate",
    "getTokenAccountsByOwner",
    "getTokenLargestAccounts",
    "getTokenSupply",
    "getTransaction",
    "getTransactionCount",
    "getVersion",
    "getVoteAccounts",
    "isBlockhashValid",
    "minimumLedgerSlot",
    "requestAirdrop",
    "sendTransaction",
    "simulateTransaction",
]
|
||
YAML_CONFIG_FILE = "KhadhroonySRLPv4.yaml"  # Yaml config file

# Default Yaml config.
# NOTE: this description must stay a comment. Written as a string literal inside
# the braces it is concatenated with the next literal ("lastFile") by Python's
# implicit string concatenation, which silently renames that key.
DEFAULT_YAML_CONFIG = {
    "lastFile": "",
    "lastFiles": [],
    "defaultLang": "en",
    "lastWidth": 1024,
    "lastHeight": 768,
    "lastMaximized": True,
}
|
||
|
||
|
||
def load_yaml_app_config():
    """ Load the configuration file or create a default one.

    When YAML_CONFIG_FILE does not exist it is created from DEFAULT_YAML_CONFIG.
    Otherwise the file is parsed with yaml.safe_load and every key of
    DEFAULT_YAML_CONFIG that is missing, or whose value has the wrong type, is
    reset to its default. Keys that are not in DEFAULT_YAML_CONFIG are kept as-is.

    Returns:
        dict: The configuration. It never aliases DEFAULT_YAML_CONFIG or any of
        its mutable values, so callers may modify it freely.
    """
    import copy  # function-scope import: the only use of `copy` in this block

    if not os.path.exists(YAML_CONFIG_FILE):
        logger.debug(f"{YAML_CONFIG_FILE} not found, creating new one")
        with open(YAML_CONFIG_FILE, "w") as f:
            yaml.dump(DEFAULT_YAML_CONFIG, f)
        # Hand out a copy so later edits by the caller cannot corrupt the defaults.
        return copy.deepcopy(DEFAULT_YAML_CONFIG)

    with open(YAML_CONFIG_FILE, "r") as f:
        data = yaml.safe_load(f)
    logger.debug(f"{YAML_CONFIG_FILE} safe loaded")

    # safe_load yields None for an empty document and may yield a list or scalar;
    # none of those can be completed key by key, so fall back to the defaults.
    if not isinstance(data, dict):
        logger.debug(f"{YAML_CONFIG_FILE} does not contain a mapping, using defaults")
        return copy.deepcopy(DEFAULT_YAML_CONFIG)

    # Data validation and completion
    for key, val in DEFAULT_YAML_CONFIG.items():
        if key not in data:
            logger.debug(f"{key} not found")
            data[key] = copy.deepcopy(val)
            continue
        found = data[key]
        # bool is a subclass of int, so isinstance alone would accept `true`
        # for an integer setting such as lastWidth.
        bool_for_non_bool = isinstance(found, bool) and not isinstance(val, bool)
        if bool_for_non_bool or not isinstance(found, type(val)):
            logger.debug(f"{key} wrong type : wanted {type(val)}, found {type(data[key])}")
            data[key] = copy.deepcopy(val)
    return data
|
||
|
||
|
||
def save_yaml_app_config(cfg):
    """ Write the configuration to YAML_CONFIG_FILE, replacing its previous content.

    Args:
        cfg: The configuration object serialised with yaml.dump.
    """
    with open(YAML_CONFIG_FILE, mode="w") as stream:
        yaml.dump(cfg, stream)
|
||
|
||
|
||
def create_wallets_folder():
    """ Create the wallets directory (WALLETS_FOLDER) if it does not exist yet.

    The check and the creation are done in a single call so that a directory
    created by someone else between the two steps does not raise.

    Raises:
        FileExistsError: If WALLETS_FOLDER exists but is not a directory.
    """
    os.makedirs(WALLETS_FOLDER, exist_ok=True)
|
||
|
||
|
||
def derive_key(password: str, salt: bytes) -> bytes:
    """ Turn a password and a salt into a URL-safe base64 encoded symmetric key.

    The key material is produced by PBKDF2-HMAC with SHA-256, 100 000
    iterations and a 32-byte output, then encoded with base64.urlsafe_b64encode.

    Args:
        password (str): The password to derive the key from (encoded as UTF-8).
        salt (bytes): The salt for the key derivation.

    Returns:
        bytes: The derived key, base64 (URL-safe) encoded.
    """
    password_bytes = password.encode("utf-8")
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100_000,
        backend=default_backend(),
    )
    raw_key = kdf.derive(password_bytes)
    return base64.urlsafe_b64encode(raw_key)
|
||
|
||
|
||
def encrypt_string(password: str, salt: bytes, clear_txt: str) -> str:
    """ Encrypt a string with a key derived from a password and a salt.

    Args:
        password (str): The password to derive the encryption key.
        salt (bytes): A salt for the key derivation.
        clear_txt (str): The clear string to encrypt (encoded as UTF-8).

    Returns:
        str: The Fernet token as text. The salt is NOT part of the returned
        value; the same salt must be supplied again to decrypt_string.
    """
    fernet = Fernet(derive_key(password, salt))  # Derive the encryption key
    token = fernet.encrypt(clear_txt.encode("utf-8"))
    return token.decode()
|
||
|
||
|
||
def decrypt_string(password: str, salt: bytes, encrypted_txt: Union[bytes, str]) -> str:
    """ Decrypt a Fernet token with a key derived from a password and a salt.

    Args:
        password (str): The password to derive the decryption key.
        salt (bytes): The salt used during encryption.
        encrypted_txt (Union[bytes, str]): The token to decrypt; a str is
            encoded as UTF-8 first, bytes are used unchanged.

    Returns:
        str: The decrypted string.
    """
    if isinstance(encrypted_txt, str):
        token = encrypted_txt.encode("utf-8")
    else:
        token = encrypted_txt
    cipher = Fernet(derive_key(password, salt))  # Derive the decryption key
    clear_bytes = cipher.decrypt(token)
    return clear_bytes.decode()
|
||
|
||
|
||
def center_on_screen(widget: QWidget):
    """ Move a widget to the centre of the screen under the mouse cursor.

    Falls back to the primary screen when no screen is found at the cursor
    position. The centring uses the screen's available geometry and the
    widget's frame geometry.

    Args:
        widget (QWidget): The widget to move.
    """
    # Screen under the cursor, or the primary screen when there is none.
    target_screen = QApplication.screenAt(QCursor().pos()) or QApplication.primaryScreen()
    available = target_screen.availableGeometry()
    frame = widget.frameGeometry()
    # Top-left corner that leaves equal free space on both sides of each axis.
    left = available.x() + (available.width() - frame.width()) // 2
    top = available.y() + (available.height() - frame.height()) // 2
    widget.move(left, top)
|
||
|
||
|
||
def set_app_dir(lang_code: str):
    """ Update the global is_rtl flag from a language code.

    Args:
        lang_code (str): The language code; is_rtl becomes True when it is
            listed in APP_RTL_LANGS and False otherwise.
    """
    global is_rtl
    is_rtl = lang_code in APP_RTL_LANGS
|
||
|
||
|
||
def get_qt_dir() -> Qt.LayoutDirection:
    """ Translate the global is_rtl flag into a Qt layout direction.

    Returns:
        Qt.LayoutDirection: RightToLeft when is_rtl is set, LeftToRight otherwise.
    """
    if is_rtl:
        return Qt.LayoutDirection.RightToLeft
    return Qt.LayoutDirection.LeftToRight
|
||
|
||
|
||
def get_qt_align_center() -> Qt.AlignmentFlag:
    """ Give the Qt flag that centres content (Qt.AlignmentFlag.AlignCenter). """
    centered = Qt.AlignmentFlag.AlignCenter
    return centered
|
||
|
||
|
||
def get_qt_align_vchl(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Give the alignment "vertically centred, at the start of the line".

    Args:
        rtl (bool): True for a right-to-left layout.

    Returns:
        Qt.AlignmentFlag: AlignVCenter combined with AlignLeft (ltr) or AlignRight (rtl).
    """
    horizontal = Qt.AlignmentFlag.AlignRight if rtl else Qt.AlignmentFlag.AlignLeft
    return Qt.AlignmentFlag.AlignVCenter | horizontal
|
||
|
||
|
||
def get_qt_align_vchr(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Give the alignment "vertically centred, at the end of the line".

    Args:
        rtl (bool): True for a right-to-left layout.

    Returns:
        Qt.AlignmentFlag: AlignVCenter combined with AlignRight (ltr) or AlignLeft (rtl).
    """
    horizontal = Qt.AlignmentFlag.AlignLeft if rtl else Qt.AlignmentFlag.AlignRight
    return Qt.AlignmentFlag.AlignVCenter | horizontal
|
||
|
||
|
||
def get_qt_align_hcenter() -> Qt.AlignmentFlag:
    """ Give the Qt flag for horizontal centring (Qt.AlignmentFlag.AlignHCenter). """
    horizontally_centered = Qt.AlignmentFlag.AlignHCenter
    return horizontally_centered
|
||
|
||
|
||
def get_qt_align_hl(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Give the horizontal alignment for the start of the line.

    Args:
        rtl (bool): True for a right-to-left layout.

    Returns:
        Qt.AlignmentFlag: AlignLeft for ltr, AlignRight for rtl.
    """
    return Qt.AlignmentFlag.AlignRight if rtl else Qt.AlignmentFlag.AlignLeft
|
||
|
||
|
||
def get_qt_align_hr(rtl: bool = False) -> Qt.AlignmentFlag:
    """ Give the horizontal alignment for the end of the line.

    Args:
        rtl (bool): True for a right-to-left layout.

    Returns:
        Qt.AlignmentFlag: AlignRight for ltr, AlignLeft for rtl.
    """
    return Qt.AlignmentFlag.AlignLeft if rtl else Qt.AlignmentFlag.AlignRight
|
||
|
||
|
||
class LockedFile:
    """ Context manager that opens a file and holds an exclusive lock on it.

    Uses portalocker to avoid concurrent access. The lock is taken in
    __enter__ and released, together with the file handle, in __exit__.

    NOTE: the file is opened before the lock is requested, so a truncating
    mode such as 'w' empties the file before exclusivity is obtained.
    """

    def __init__(self, file_path: str, mode: str = "r"):
        """ Store the target path and mode; nothing is opened yet.

        Args:
            file_path (str): Path to the file to lock.
            mode (str): File opening mode ('r', 'w', 'a', etc.).
        """
        self._file = None
        self._filePath = file_path
        self._mode = mode

    def __enter__(self) -> IO[Any]:
        """ Open the file and take an exclusive lock on it.

        Returns:
            IO[Any]: file stream

        Raises:
            Whatever open() or portalocker.lock() raises. When locking fails
            the freshly opened handle is closed before the error propagates,
            because __exit__ is not called if __enter__ raises.
        """
        self._file = open(file=self._filePath, mode=self._mode)
        try:
            # Application of an exclusive lock
            portalocker.lock(self._file, portalocker.LOCK_EX)
        except BaseException:
            self._file.close()
            self._file = None
            raise
        return self._file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Release the lock and close the file.

        Does nothing when no file is currently held. Exceptions raised inside
        the `with` body are not suppressed.
        """
        if self._file is None:
            return
        try:
            portalocker.unlock(self._file)
        finally:
            # Close even when unlocking fails so the handle never leaks.
            self._file.close()
            self._file = None
|
||
|
||
|
||
class Map:
|
||
|
||
def __init__(self):
|
||
""" Initializes an internal dictionary to store items. """
|
||
self._data = {}
|
||
|
||
def put(self, key, value):
|
||
""" Adds or updates an entry in the map.
|
||
Args:
|
||
key: the unique key of the map
|
||
value: the value associated to the key
|
||
"""
|
||
previous_value = self._data.get(key)
|
||
self._data[key] = value
|
||
return previous_value
|
||
|
||
def get(self, key) -> Any | None:
|
||
""" Retrieves the value associated with a key, or None if the key does not exist.
|
||
Args:
|
||
key: the key to check
|
||
Returns:
|
||
Any | None: the value associated with a key or None
|
||
"""
|
||
return self._data.get(key)
|
||
|
||
def remove(self, key) -> Any | None:
|
||
""" Deletes a key and returns its value, or None if the key does not exist.
|
||
Args:
|
||
key: the key to remove
|
||
Returns:
|
||
Any | None: the value associated with a key or None
|
||
"""
|
||
return self._data.pop(key, None)
|
||
|
||
def contains_key(self, key) -> bool:
|
||
""" Checks if a key is present in the map.
|
||
Args:
|
||
key: the key to check
|
||
Returns:
|
||
bool: True if exists
|
||
"""
|
||
return key in self._data
|
||
|
||
def contains_value(self, value) -> bool:
|
||
""" Checks if a value is present in the map.
|
||
Args:
|
||
value: the value to check
|
||
Returns:
|
||
bool: True if exists
|
||
"""
|
||
return value in self._data.values()
|
||
|
||
def size(self) -> int:
|
||
""" Returns the number of elements in the map.
|
||
Returns:
|
||
int: number of elements in the map
|
||
"""
|
||
return len(self._data)
|
||
|
||
def is_empty(self) -> bool:
|
||
""" Check if the map is empty.
|
||
Returns:
|
||
bool: True if the map is empty, False otherwise.
|
||
"""
|
||
return len(self._data) == 0
|
||
|
||
def keys(self):
|
||
""" Returns an iterator over the keys of the map. """
|
||
return iter(self._data.keys())
|
||
|
||
def values(self):
|
||
""" Returns an iterator over the values in the map. """
|
||
return iter(self._data.values())
|
||
|
||
def items(self):
|
||
""" Returns an iterator over the key-value pairs in the map. """
|
||
return iter(self._data.items())
|
||
|
||
def clear(self):
|
||
""" Removes all elements from the map. """
|
||
self._data.clear()
|
||
|
||
def __repr__(self):
|
||
""" Readable representation of the map. """
|
||
return f"Map({self._data})"
|
||
|
||
class DataSliceOpts(NamedTuple):
    """ Window (offset + length) restricting the account data returned by a query.

    Only available for "base58" or "base64" encoding.

    Attributes:
        offset (int): Start of the returned account data: <usize>.
        length (int): Number of bytes of account data to return: <usize>.
    """

    offset: int  # where the returned slice starts
    length: int  # how long the returned slice is
|
||
|
||
|
||
class MemcmpOpts(NamedTuple):
    """ Filter comparing a series of bytes with program account data at an offset.

    Attributes:
        offset (int): Offset into program account data to start comparison: <usize>.
        bytes (str): Data to match, as base-58 encoded string: <string>.
    """

    offset: int  # position in the account data where the comparison starts
    bytes: str  # base-58 text of the data to match (field name is part of the interface)
|
||
|
||
|
||
class TokenAccountOpts(NamedTuple):
    """ Selection options for token-account queries. Provide one of mint or programId.

    Attributes:
        mint (Optional[Pubkey]): Public key of the specific token Mint to limit accounts to.
        programId (Optional[Pubkey]): Public key of the Token program ID that owns the accounts.
        encoding (UiAccountEncoding): Encoding for Account data, either "base58" (slow) or "base64".
        dataSlice (Optional[DataSliceOpts]): Option to limit the returned account data,
            only available for "base58" or "base64" encoding.
    """

    mint: Optional[Pubkey] = None  # no mint filter by default
    programId: Optional[Pubkey] = None  # no program filter by default
    encoding: UiAccountEncoding = UiAccountEncoding.Base64  # base64 unless stated otherwise
    dataSlice: Optional[DataSliceOpts] = None  # whole account data by default
|
||
|
||
|
||
class TxOpts(NamedTuple):
    """ Options used when a transaction is broadcast.

    Attributes:
        skipConfirmation (bool): If false, `sendTransaction` will try to confirm that the
            transaction was successfully broadcasted. When confirming a transaction,
            `sendTransaction` will block for a maximum of 30 seconds.
        skipPreflight (bool): If true, skip the preflight transaction checks.
        prefLightCommitment (CommitmentLevel): Commitment level to use for preflight.
            Default to CommitmentLevel.Finalized.
        prefEncoding (UiTransactionEncoding): Encoding to use.
            Default to UiTransactionEncoding.Base64.
        maxRetries (Optional[int]): Maximum number of times for the RPC node to retry sending
            the transaction to the leader. If this parameter not provided, the RPC node will
            retry the transaction until it is finalized or until the blockhash expires.
        lastValidBlockHeight (Optional[int]): Pass the latest valid block height here, to be
            consumed by confirm_transaction. Valid only if skip_confirmation is False.
    """

    skipConfirmation: bool = True  # confirmation is skipped by default
    skipPreflight: bool = False  # preflight checks run by default
    prefLightCommitment: CommitmentLevel = CommitmentLevel.Finalized
    prefEncoding: UiTransactionEncoding = UiTransactionEncoding.Base64
    maxRetries: Optional[int] = None  # None: left to the RPC node
    lastValidBlockHeight: Optional[int] = None
|
||
|
||
|
||
class SharedCounter:
    """ Process-wide, self-incrementing identifier source shared through the class.

    All state lives in class attributes and every access goes through _lock.
    """

    _counter = itercount()  # Shared zero-based counter
    _lock = Lock()  # Guards every access to _counter

    @classmethod
    def reset_id(cls):
        """ Restart the numbering: the next identifier handed out is 1 again. """
        with cls._lock:
            cls._counter = itercount()

    @classmethod
    def get_next_id(cls):
        """ Hand out the next identifier (1, 2, 3, ...), holding the lock while counting. """
        with cls._lock:
            zero_based = next(cls._counter)
            return zero_based + 1
|
||
|
||
class WsError:
    """ Websocket error: a socket error code paired with its text. """

    def __init__(self, code: QAbstractSocket.SocketError, body: str):
        """ Keep both values as public attributes.

        Args:
            code (QAbstractSocket.SocketError): The socket error code.
            body (str): The error text.
        """
        self.code, self.body = code, body
|
||
|
||
|
||
class NetworkReplyError:
    """ HTTP network error reply: a network error code paired with its text. """

    def __init__(self, code: QNetworkReply.NetworkError, body: str):
        """ Keep both values as public attributes.

        Args:
            code (QNetworkReply.NetworkError): The network error code.
            body (str): The error text.
        """
        self.code, self.body = code, body
|
||
|
||
|
||
class ResponseError:
    """ HTTP response whose status code signals an error (between 400 and 600). """

    def __init__(self, code: int, headers: dict, body: str, resp_size: int):
        """ Keep the response details as public attributes.

        Args:
            code (int): The HTTP status code.
            headers (dict): The response headers.
            body (str): The response body.
            resp_size (int): The response size, exposed as `size`.
        """
        self.code, self.headers = code, headers
        self.body, self.size = body, resp_size
|
||
|
||
|
||
class ResponseOk:
    """ Successful HTTP response: its body together with its size. """

    def __init__(self, body: str, resp_size: int):
        """ Keep the response details as public attributes.

        Args:
            body (str): The response body.
            resp_size (int): The response size, exposed as `size`.
        """
        self.body, self.size = body, resp_size
|
||
|
||
|
||
class SimpleSignal(QObject):
    """ QObject wrapping one argument-less Qt Signal behind signal/conn/disc helpers. """

    _sig = Signal()  # the wrapped signal, carries no payload

    def signal(self):
        """ Emit the wrapped signal. """
        self._sig.emit()

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class StrSignal(QObject):
    """ QObject wrapping one Qt Signal(str) behind signal/conn/disc helpers. """

    _sig = Signal(str)  # the wrapped signal, carries a str

    def signal(self, data_str: str):
        """ Emit the wrapped signal.

        Args:
            data_str (str): The text sent to the connected slots.
        """
        self._sig.emit(data_str)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class SocketStateSignal(QObject):
    """ QObject wrapping one Qt Signal(QAbstractSocket.SocketState) behind signal/conn/disc helpers. """

    _sig = Signal(QAbstractSocket.SocketState)  # the wrapped signal, carries a socket state

    def signal(self, data_qasss: QAbstractSocket.SocketState):
        """ Emit the wrapped signal.

        Args:
            data_qasss (QAbstractSocket.SocketState): The socket state sent to the connected slots.
        """
        self._sig.emit(data_qasss)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class SocketErrorSignal(QObject):
    """ QObject wrapping one Qt Signal(WsError) behind signal/conn/disc helpers. """

    _sig = Signal(WsError)  # the wrapped signal, carries a WsError

    def signal(self, ws_err: WsError):
        """ Emit the wrapped signal.

        Args:
            ws_err (WsError): The websocket error sent to the connected slots.
        """
        self._sig.emit(ws_err)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class NetworkReplyErrorSignal(QObject):
    """ QObject wrapping one Qt Signal(NetworkReplyError) behind signal/conn/disc helpers. """

    _sig = Signal(NetworkReplyError)  # the wrapped signal, carries a NetworkReplyError

    def signal(self, reply: NetworkReplyError):
        """ Emit the wrapped signal.

        Args:
            reply (NetworkReplyError): The network error sent to the connected slots.
        """
        self._sig.emit(reply)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class ResponseErrorSignal(QObject):
    """ QObject wrapping one Qt Signal(ResponseError) behind signal/conn/disc helpers. """

    _sig = Signal(ResponseError)  # the wrapped signal, carries a ResponseError

    def signal(self, resp: ResponseError):
        """ Emit the wrapped signal.

        Args:
            resp (ResponseError): The error response sent to the connected slots.
        """
        self._sig.emit(resp)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class ResponseSuccessSignal(QObject):
    """ QObject wrapping one Qt Signal(ResponseOk) behind signal/conn/disc helpers. """

    _sig = Signal(ResponseOk)  # the wrapped signal, carries a ResponseOk

    def signal(self, resp: ResponseOk):
        """ Emit the wrapped signal.

        Args:
            resp (ResponseOk): The successful response sent to the connected slots.
        """
        self._sig.emit(resp)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class DictSignal(QObject):
    """ QObject wrapping one Qt Signal(dict) behind signal/conn/disc helpers. """

    _sig = Signal(dict)  # the wrapped signal, carries a dict

    def signal(self, data: dict):
        """ Emit the wrapped signal.

        Args:
            data (dict): The dictionary sent to the connected slots.
        """
        self._sig.emit(data)

    def conn(self, slot_obj, con_type=Qt.ConnectionType.AutoConnection):
        """ Connect a slot to the wrapped signal.

        Args:
            slot_obj: The slot to connect.
            con_type: The Qt connection type (AutoConnection by default).
        """
        self._sig.connect(slot=slot_obj, type=con_type)

    def disc(self):
        """ Call disconnect() on the wrapped signal, without naming a slot. """
        self._sig.disconnect()
|
||
|
||
|
||
class WebSocketClient(QThread):
    """QThread wrapping a QWebSocket, with a bounded number of reconnections.

    Socket state changes, socket errors, log lines and received text messages
    are re-published through the wrapper signals created in ``__init__``.

    NOTE(review): the QWebSocket is created inside ``__init__``, so it
    presumably lives in the constructing thread rather than in this QThread;
    confirm the intended thread affinity.
    """

    def __init__(self, base_url: str, max_retry: int = 3, reconnect_in: int = 2000, parent=None):
        """Create the socket and hook its Qt signals to the private handlers.

        Args:
            base_url: URL handed to ``QWebSocket.open``.
            max_retry: Reconnections allowed after a disconnection (at least 1).
            reconnect_in: Delay before a reconnection, passed to
                ``QTimer.singleShot`` (milliseconds, at least 1000).
            parent: Optional parent forwarded to ``QThread``.
        """
        super().__init__(parent)
        # Outgoing notifications.
        self.socketStateSig = SocketStateSignal()  # connection state changes
        self.socketErrorSig = SocketErrorSignal()  # connection errors
        self.logSig = StrSignal()  # log lines
        self.responseOkSig = StrSignal()  # newly received text messages
        # Settings and retry bookkeeping.
        self._baseUrl = base_url
        self._maxRetry = max(max_retry, 1)
        self._reconnectIn = max(reconnect_in, 1000)
        self._attempts = 0
        # Underlying socket and its callbacks.
        self._websocket = QWebSocket()
        self._websocket.connected.connect(self._on_connected)
        self._websocket.disconnected.connect(self._on_disconnected)
        self._websocket.errorOccurred.connect(self._on_error)
        self._websocket.textMessageReceived.connect(self._on_message)
        # True between open() and close() / retry exhaustion.
        self._threadRunning = False

    def _connect(self):
        """Open the socket; used for the first attempt and for every retry."""
        if not self._threadRunning:
            return
        if self._attempts != 0:
            self.logSig.signal("New connexion attempt")
        self._websocket.open(QUrl(self._baseUrl))

    def _on_connected(self):
        """Reset the retry counter and publish the new socket state."""
        self._attempts = 0
        self.socketStateSig.signal(self._websocket.state())

    def _on_disconnected(self):
        """Publish the socket state, then schedule a retry or give up."""
        self.socketStateSig.signal(self._websocket.state())
        if not self._threadRunning:
            # close() was requested (or retries already exhausted): stay down.
            return
        if self._attempts < self._maxRetry:
            self._attempts += 1
            # singleShot takes milliseconds.
            QTimer.singleShot(self._reconnectIn, self._connect)
            return
        self.logSig.signal("Maximum reconnection attempts reached.")
        self._threadRunning = False

    def _on_error(self, code: QAbstractSocket.SocketError):
        """Wrap the socket error and its message in a ``WsError`` and publish it."""
        self.socketErrorSig.signal(WsError(code, self._websocket.errorString()))

    def _on_message(self, resp: str):
        """Forward a received text message."""
        self.responseOkSig.signal(resp)

    def run(self):
        """QThread entry point: run the thread's event loop."""
        self.exec()

    def get_socket_state(self) -> QAbstractSocket.SocketState:
        """Return the current state of the underlying QWebSocket."""
        return self._websocket.state()

    def open(self) -> bool:
        """Start connecting and start the thread.

        Returns:
            False (after logging) when the client is already running,
            True otherwise.
        """
        if self._threadRunning:
            # Reaching this point means the calling logic is wrong.
            self.logSig.signal("WebSocket is already connected.")
            return False
        self.socketStateSig.signal(QAbstractSocket.SocketState.ConnectingState)
        self._threadRunning = True
        self._attempts = 0
        self._connect()
        self.start()
        return True

    def send_msg(self, message: Union[bytes, str]) -> bool:
        """Send ``message`` if the socket is connected.

        ``bytes`` go out as a binary message, anything else as a text message.

        Returns:
            True when the message was handed to the socket, False when the
            socket is not in ``ConnectedState``.
        """
        if self._websocket.state() != QAbstractSocket.SocketState.ConnectedState:
            return False
        if isinstance(message, bytes):
            self._websocket.sendBinaryMessage(message)
        else:
            self._websocket.sendTextMessage(message)
        return True

    def close(self):
        """Disable reconnection, close the socket and stop the thread."""
        self._threadRunning = False
        self._attempts = 0
        if self._websocket.state() != QAbstractSocket.SocketState.UnconnectedState:
            self._websocket.close()
        self.quit()
        self.wait()
|
||
|
||
|
||
class HttpPostClient(QObject):
    """Blocking client for HTTP POST requests built on QNetworkAccessManager."""

    def __init__(self, base_url: str, parent=None):
        """Store the target URL and create the network manager.

        Args:
            base_url: URL every request is posted to.
            parent: Optional parent forwarded to ``QObject``.
        """
        super().__init__(parent)
        self._baseUrl = base_url
        self._defaultHeader = "application/json"  # Content-Type of every request
        self._networkManager = QNetworkAccessManager()

    @staticmethod
    def _process_reply(reply: QNetworkReply) -> Union[NetworkReplyError, ResponseError, ResponseOk]:
        """Convert a finished reply into a result object and release the reply.

        Qt reports HTTP 4xx/5xx answers through ``reply.error()`` as well, so
        the HTTP status is inspected first; otherwise such answers would never
        become a ``ResponseError`` and their body would be lost.

        Args:
            reply: Finished network reply.

        Returns:
            ``ResponseError`` for an HTTP status in [400, 600),
            ``NetworkReplyError`` for any other reply carrying a network error,
            ``ResponseOk`` otherwise.
        """
        try:
            status_code = reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute)
            # The attribute is absent (None) when no HTTP answer was received.
            is_http_error = status_code is not None and 400 <= status_code < 600
            if reply.error() != QNetworkReply.NetworkError.NoError and not is_http_error:
                return NetworkReplyError(reply.error(), reply.errorString())
            headers = {
                (k.data() if hasattr(k, "data") else k).decode(errors="replace"):
                (v.data() if hasattr(v, "data") else v).decode(errors="replace")
                for k, v in reply.rawHeaderPairs()
            }
            raw_body = reply.readAll().data()
            # Undecodable bytes are replaced rather than raising on a bad payload.
            body = raw_body.decode("utf-8", errors="replace")
            # Per header: name + value + ": " + "\r\n"; final "+ 2" as in the
            # original estimate.
            size = len(raw_body) + sum(len(k.encode("utf-8")) + len(v.encode("utf-8")) + 4 for k, v in headers.items()) + 2
            if is_http_error:
                return ResponseError(status_code, headers, body, size)
            return ResponseOk(body, size)
        finally:
            # Always hand the reply back to Qt, even if processing failed.
            reply.deleteLater()

    def send_post(self, body: Union[bytes, str]) -> Union[NetworkReplyError, ResponseError, ResponseOk]:
        """Post ``body`` and block until the reply is finished.

        Events keep being processed while waiting (through a local
        ``QEventLoop``), so the caller's thread is not spun at full CPU.

        Args:
            body: Request payload; ``str`` is encoded as UTF-8.

        Returns:
            The result object produced by ``_process_reply``.
        """
        if isinstance(body, str):
            body = body.encode("utf-8")
        request = QNetworkRequest(QUrl(self._baseUrl))
        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, self._defaultHeader)
        reply = self._networkManager.post(request, body)
        if not reply.isFinished():
            wait_loop = QEventLoop()
            reply.finished.connect(wait_loop.quit)
            wait_loop.exec()
        return self._process_reply(reply)
|
||
|
||
|
||
class AsyncHttpPostClient(QObject):
    """Non-blocking client for HTTP POST requests; results arrive through signals."""

    def __init__(self, base_url: str, parent=None):
        """Create the result signals and the network manager.

        Args:
            base_url: URL every request is posted to.
            parent: Optional parent forwarded to ``QObject``.
        """
        super().__init__(parent)
        self.requestSentSig = SimpleSignal()  # a request was handed to Qt
        self.netRepErrorSig = NetworkReplyErrorSignal()  # network-level failure
        self.responseErrorSig = ResponseErrorSignal()  # HTTP 4xx/5xx answer
        self.responseOkSig = ResponseSuccessSignal()  # successful answer
        self._baseUrl = base_url
        self._defaultHeader = "application/json"  # Content-Type of every request
        self._networkManager = QNetworkAccessManager()

    def _process_reply(self, reply: QNetworkReply):
        """Publish the outcome of a finished reply and release the reply.

        Qt reports HTTP 4xx/5xx answers through ``reply.error()`` as well, so
        the HTTP status is inspected first; otherwise such answers would never
        reach ``responseErrorSig`` and their body would be lost.

        Args:
            reply: Finished network reply.
        """
        try:
            status_code = reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute)
            # The attribute is absent (None) when no HTTP answer was received.
            is_http_error = status_code is not None and 400 <= status_code < 600
            if reply.error() != QNetworkReply.NetworkError.NoError and not is_http_error:
                self.netRepErrorSig.signal(NetworkReplyError(reply.error(), reply.errorString()))
                return
            headers = {
                (k.data() if hasattr(k, "data") else k).decode(errors="replace"):
                (v.data() if hasattr(v, "data") else v).decode(errors="replace")
                for k, v in reply.rawHeaderPairs()
            }
            raw_body = reply.readAll().data()
            # Undecodable bytes are replaced rather than raising on a bad payload.
            body = raw_body.decode("utf-8", errors="replace")
            # Per header: name + value + ": " + "\r\n"; final "+ 2" as in the
            # original estimate.
            size = len(raw_body) + sum(len(k.encode("utf-8")) + len(v.encode("utf-8")) + 4 for k, v in headers.items()) + 2
            if is_http_error:
                self.responseErrorSig.signal(ResponseError(status_code, headers, body, size))
            else:
                self.responseOkSig.signal(ResponseOk(body, size))
        finally:
            # Always hand the reply back to Qt, even if a slot raised.
            reply.deleteLater()

    def send_post(self, body: Union[str, bytes]):
        """Post ``body`` without waiting for the answer.

        Args:
            body: Request payload; ``str`` is encoded as UTF-8.
        """
        if isinstance(body, str):
            body = body.encode("utf-8")
        request = QNetworkRequest(QUrl(self._baseUrl))
        request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, self._defaultHeader)
        reply = self._networkManager.post(request, body)
        # Hook the completion handler before notifying listeners, so a slot
        # that pumps events cannot make us miss the ``finished`` emission.
        reply.finished.connect(lambda: self._process_reply(reply))
        self.requestSentSig.signal()
|
||
|
||
|
||
class ConnectionPool(QObject):
    """Rate-limited pool of RPC clients.

    The pool owns one dedicated "main" WebSocket client plus three groups of
    pooled clients (WebSocket, asynchronous HTTP, blocking HTTP). Each group
    holds at least one client and the three groups together hold at most 21.
    Requests that exceed the current counters, or that have no connected
    socket to go through, are queued and replayed by a timer.
    """

    def __init__(self, ws_url: str, http_url: str, num_ws_clients: int = 1, num_async_http_clients: int = 2, num_sync_http_clients: int = 2, parent=None):
        """Create the clients, signals, queues, counters and timers.

        Args:
            ws_url: URL used by every WebSocket client.
            http_url: URL used by every HTTP client.
            num_ws_clients: Requested number of pooled WebSocket clients.
            num_async_http_clients: Requested number of asynchronous HTTP clients.
            num_sync_http_clients: Requested number of blocking HTTP clients.
            parent: Optional parent forwarded to ``QObject``.
        """
        super().__init__(parent)
        self._num_ws_clients = max(num_ws_clients, 1)
        self._num_async_http_clients = max(num_async_http_clients, 1)
        self._num_sync_http_clients = max(num_sync_http_clients, 1)
        # Ensure the total number of pooled clients does not exceed 21.
        total_clients = self._num_ws_clients + self._num_async_http_clients + self._num_sync_http_clients
        if total_clients > 21:
            # Scale every group down proportionally, keeping at least one each.
            reduction_factor = 21 / total_clients
            self._num_ws_clients = max(1, int(self._num_ws_clients * reduction_factor))
            self._num_async_http_clients = max(1, int(self._num_async_http_clients * reduction_factor))
            self._num_sync_http_clients = max(1, int(self._num_sync_http_clients * reduction_factor))
            # The "at least one" floor can push the sum above 21 again.
            while self._num_ws_clients + self._num_async_http_clients + self._num_sync_http_clients > 21:
                if self._num_sync_http_clients > 1:
                    self._num_sync_http_clients -= 1
                elif self._num_ws_clients > 1:
                    self._num_ws_clients -= 1
                else:
                    self._num_async_http_clients -= 1

        # Clients.
        self._wsMainClient = WebSocketClient(ws_url, 3, 2000)
        self._wsClients = [WebSocketClient(ws_url, 3, 2000) for _ in range(self._num_ws_clients)]
        self._connectedWsClients = []  # pooled WebSocket clients currently connected
        self._asyncHttpClients = [AsyncHttpPostClient(http_url) for _ in range(self._num_async_http_clients)]
        self._syncHttpClients = [HttpPostClient(http_url) for _ in range(self._num_sync_http_clients)]

        # Outgoing signals.
        self.logSig = StrSignal()  # new log line
        self.wsMainConnexionStatusSig = SocketStateSignal()  # state change of _wsMainClient
        self.wsConnexionStatusSigs = [SocketStateSignal() for _ in range(self._num_ws_clients)]  # state change of _wsClients[i]
        self.httpPostSentSigs = [SimpleSignal() for _ in range(self._num_async_http_clients)]  # request sent by _asyncHttpClients[i]
        self.httpReplyReceivedSigs = [SimpleSignal() for _ in range(self._num_async_http_clients)]  # reply stored for _asyncHttpClients[i]
        self.wsMainReplyReceivedSig = SimpleSignal()  # message stored for _wsMainClient
        self.wsReplyReceivedSig = SimpleSignal()  # message stored for any _wsClients[i]
        self.httpReplyReceivedSig = SimpleSignal()  # reply stored for any _asyncHttpClients[i]

        # Wire the clients' signals to the pool handlers.
        self._wsMainClient.socketStateSig.conn(self._handle_ws_main_connect_status)
        self._wsMainClient.socketErrorSig.conn(self._handle_ws_main_error)
        self._wsMainClient.logSig.conn(self._handle_ws_main_log)
        self._wsMainClient.responseOkSig.conn(self._handle_ws_main_new_msg)
        for idx, client in enumerate(self._wsClients):
            client.socketStateSig.conn(self._handle_ws_connect_status(idx))
            client.socketErrorSig.conn(self._handle_ws_error(idx))
            client.logSig.conn(self._handle_ws_log(idx))
            client.responseOkSig.conn(self._handle_ws_new_msg())
        for idx, client in enumerate(self._asyncHttpClients):
            client.requestSentSig.conn(self._handle_async_http_msg_sent(idx))
            client.netRepErrorSig.conn(self._handle_async_http_net_error(idx))
            client.responseErrorSig.conn(self._handle_async_http_resp_err(idx))
            client.responseOkSig.conn(self._handle_async_http_new_msg(idx))

        # Bounded FIFOs of (timestamp, message) tuples.
        self._wsMainReceivedMessages = deque(maxlen=1000)  # from _wsMainClient
        self._wsReceivedMessages = deque(maxlen=1000)  # from _wsClients[i]
        self._httpReceivedMessages = deque(maxlen=1000)  # from _asyncHttpClients[i]

        # Requests waiting for quota or for a connected socket.
        self._wsMainQueue = Queue()
        self._wsQueue = Queue()
        self._asyncHttpQueue = Queue()

        # Limits applied between two counter resets.
        self._maxRequestsPer10s = 100
        self._maxRpcRequestsPer10s = 40
        self._maxDataPer30s = 100 * 1024 * 1024  # 100 MB

        # Counters, all protected by _lock.
        self._currentRequestCount = 0
        self._currentDataSize = 0
        self._currentRpcCounters = {method: 0 for method in (RPC_WS_REQ + RPC_HTTP_REQ)}  # one counter per RPC method
        self._lock = Lock()
        self._connected = False  # True while the main WebSocket is connected

        # Timers.
        self._timer_reset_counters = QTimer()
        self._timer_reset_counters.timeout.connect(self._reset_counters)
        self._timer_process_queues = QTimer()
        self._timer_process_queues.timeout.connect(self._process_queues)

    def _reset_counters(self):
        """Reset every counter; called by the reset timer."""
        with self._lock:
            self._currentRequestCount = 0
            self._currentDataSize = 0
            for key in self._currentRpcCounters:
                self._currentRpcCounters[key] = 0

    @staticmethod
    def _drain_queue(pending, sender):
        """Replay, once each, the requests that are queued when the pass starts.

        ``sender`` may put a request back on ``pending`` (limits reached, no
        connected socket). Bounding the pass by the initial queue size keeps
        such requests for the next tick instead of looping on them forever.
        """
        for _ in range(pending.qsize()):
            if pending.empty():
                break
            method, body = pending.get()
            sender(method, body)

    def _process_queues(self):
        """Try to send the queued requests; called by the queue timer."""
        self._drain_queue(self._wsMainQueue, self.send_websocket_main_request)
        self._drain_queue(self._wsQueue, self.send_websocket_request)
        self._drain_queue(self._asyncHttpQueue, self.send_async_http_post)

    def _check_limits(self, rpc_method: str, size: int = 0) -> bool:
        """Tell whether a request fits within the current counters.

        Args:
            rpc_method: RPC method name of the request.
            size: Payload size in bytes.

        Returns:
            False when the global request count, the per-method count or the
            data volume would exceed its limit, True otherwise.
        """
        with self._lock:
            if self._currentRequestCount >= self._maxRequestsPer10s:
                return False
            if self._currentRpcCounters.get(rpc_method, 0) >= self._maxRpcRequestsPer10s:
                return False
            if self._currentDataSize + size > self._maxDataPer30s:
                return False
            return True

    @staticmethod
    def _ensure_known_method(rpc_method: str, allowed):
        """Raise ``RuntimeError`` (after logging) if ``rpc_method`` is not in ``allowed``."""
        if rpc_method not in allowed:
            # must never happen
            logger.debug(f"Wrong Rpc Request {rpc_method}.")
            raise RuntimeError(f"Wrong Rpc Request {rpc_method}.")

    @staticmethod
    def _payload_size(payload) -> int:
        """Return the size of ``payload`` in bytes (``str`` measured as UTF-8)."""
        if not payload:
            return 0
        if isinstance(payload, str):
            return len(payload.encode("utf-8"))
        return len(payload)

    def _count_request(self, rpc_method: str, size: int):
        """Account for one request of ``size`` bytes against the counters."""
        with self._lock:
            self._currentRequestCount += 1
            self._currentRpcCounters[rpc_method] += 1
            self._currentDataSize += size

    def _count_received(self, size: int):
        """Account for ``size`` received bytes against the data counter."""
        with self._lock:
            self._currentDataSize += size

    def _handle_ws_main_connect_status(self, value: QAbstractSocket.SocketState):
        """Track and forward ``_wsMainClient.socketStateSig``."""
        self._connected = value == QAbstractSocket.SocketState.ConnectedState
        self.wsMainConnexionStatusSig.signal(value)

    def _handle_ws_connect_status(self, index: int):
        """Build the handler for ``_wsClients[index].socketStateSig``."""
        def handler(value: QAbstractSocket.SocketState):
            """Keep the connected-clients list up to date and forward the state."""
            client = self._wsClients[index]
            if value == QAbstractSocket.SocketState.ConnectedState:
                if client not in self._connectedWsClients:
                    self._connectedWsClients.append(client)
            elif client in self._connectedWsClients:
                self._connectedWsClients.remove(client)
            self.wsConnexionStatusSigs[index].signal(value)
        return handler

    def _handle_ws_main_error(self, ws_error: WsError):
        """Log and forward ``_wsMainClient.socketErrorSig``."""
        logger.error(f"[main] {int(ws_error.code.value)} - {ws_error.body}")
        self.logSig.signal(ws_error.body)

    def _handle_ws_error(self, index: int):
        """Build the handler for ``_wsClients[index].socketErrorSig``."""
        def handler(ws_error: WsError):
            """Log the error and forward its text."""
            logger.error(f"[{index}] {int(ws_error.code.value)} - {ws_error.body}")
            self.logSig.signal(ws_error.body)
        return handler

    def _handle_ws_main_log(self, body: str):
        """Log and forward ``_wsMainClient.logSig``."""
        logger.debug(f"[main] {body}")
        self.logSig.signal(body)

    def _handle_ws_log(self, index: int):
        """Build the handler for ``_wsClients[index].logSig``."""
        def handler(body: str):
            """Log the line and forward it."""
            logger.debug(f"[{index}] {body}")
            self.logSig.signal(body)
        return handler

    def _handle_ws_main_new_msg(self, resp: str):
        """Store a message received by ``_wsMainClient`` and notify listeners."""
        self._wsMainReceivedMessages.append((datetime.now(), resp))
        self._count_received(len(resp.encode("utf-8")))
        self.wsMainReplyReceivedSig.signal()

    def _handle_ws_new_msg(self):
        """Build the handler for the pooled clients' ``responseOkSig``."""
        def handler(resp: str):
            """Store the message and notify listeners."""
            self._wsReceivedMessages.append((datetime.now(), resp))
            self._count_received(len(resp.encode("utf-8")))
            self.wsReplyReceivedSig.signal()
        return handler

    def _handle_async_http_msg_sent(self, index: int):
        """Build the handler for ``_asyncHttpClients[index].requestSentSig``."""
        def handler():
            """Forward the notification on the per-client signal."""
            self.httpPostSentSigs[index].signal()
        return handler

    def _handle_async_http_net_error(self, index: int):
        """Build the handler for ``_asyncHttpClients[index].netRepErrorSig``."""
        def handler(reply: NetworkReplyError):
            """Log the network error and forward its text."""
            logger.error(f"[{index}] {int(reply.code.value)} - {reply.body}")
            self.logSig.signal(reply.body)
        return handler

    def _handle_async_http_resp_err(self, index: int):
        """Build the handler for ``_asyncHttpClients[index].responseErrorSig``."""
        def handler(reply: ResponseError):
            """Log the HTTP error and forward its body."""
            logger.error(f"[{index}] {reply.code} - {reply.body}")
            self.logSig.signal(reply.body)
        return handler

    def _handle_async_http_new_msg(self, index: int):
        """Build the handler for ``_asyncHttpClients[index].responseOkSig``."""
        def handler(reply: ResponseOk):
            """Store the reply and notify the per-client and global listeners."""
            self._httpReceivedMessages.append((datetime.now(), reply.body))
            self._count_received(reply.size)
            self.httpReplyReceivedSigs[index].signal()
            self.httpReplyReceivedSig.signal()
        return handler

    def _start(self):
        """Start both timers."""
        # NOTE(review): the limits are named "Per10s"/"Per30s" but the counters
        # are reset every 100 ms - confirm the intended window.
        self._timer_reset_counters.start(100)  # reset counters every 100 ms
        self._timer_process_queues.start(10)  # replay queues every 10 ms

    def _stop(self):
        """Stop both timers."""
        self._timer_reset_counters.stop()
        self._timer_process_queues.stop()

    def open(self):
        """Start the timers and open every WebSocket client.

        Does nothing while the main WebSocket is connected.
        """
        if not self._connected:
            self._start()
            self._wsMainClient.open()
            for client in self._wsClients:
                client.open()

    def close(self):
        """Stop the timers and close every WebSocket client.

        Runs unconditionally: clients that are still connecting, or pooled
        clients that outlived the main socket, must be closed as well so no
        client thread is left running.
        """
        self._connected = False
        self._stop()
        self._wsMainClient.close()
        for client in self._wsClients:
            client.close()

    def send_websocket_main_request(self, rpc_method: str, message: str):
        """Send ``message`` through the main WebSocket, or queue it.

        Args:
            rpc_method: RPC method name; must belong to ``RPC_WS_REQ``.
            message: Serialized request.

        Raises:
            RuntimeError: If ``rpc_method`` is not a WebSocket RPC method.
        """
        self._ensure_known_method(rpc_method, RPC_WS_REQ)
        size = len(message.encode("utf-8"))
        if not self._check_limits(rpc_method, size) or self._wsMainClient.get_socket_state() != QAbstractSocket.SocketState.ConnectedState:
            # Limits reached or socket down: retry on a later timer tick.
            self._wsMainQueue.put((rpc_method, message))
            return
        self._count_request(rpc_method, size)
        self._wsMainClient.send_msg(message)

    def send_websocket_request(self, rpc_method: str, message: str):
        """Send ``message`` through a connected pooled WebSocket, or queue it.

        Args:
            rpc_method: RPC method name; must belong to ``RPC_WS_REQ``.
            message: Serialized request.

        Raises:
            RuntimeError: If ``rpc_method`` is not a WebSocket RPC method.
        """
        self._ensure_known_method(rpc_method, RPC_WS_REQ)
        size = len(message.encode("utf-8"))
        if not self._check_limits(rpc_method, size) or not self._connectedWsClients:
            # Limits reached or no connected client: retry on a later tick.
            self._wsQueue.put((rpc_method, message))
            return
        self._count_request(rpc_method, size)
        # Round-robin: take the first connected client, put it back last.
        client = self._connectedWsClients.pop(0)
        client.send_msg(message)
        self._connectedWsClients.append(client)

    def send_async_http_post(self, rpc_method: str, body: Union[bytes, str]):
        """Post ``body`` through an asynchronous HTTP client, or queue it.

        Args:
            rpc_method: RPC method name; must belong to ``RPC_HTTP_REQ``.
            body: Serialized request.

        Raises:
            RuntimeError: If ``rpc_method`` is not an HTTP RPC method.
        """
        self._ensure_known_method(rpc_method, RPC_HTTP_REQ)
        size = self._payload_size(body)
        if not self._check_limits(rpc_method, size):
            self._asyncHttpQueue.put((rpc_method, body))
            return
        self._count_request(rpc_method, size)
        # Round-robin over the asynchronous clients.
        client = self._asyncHttpClients.pop(0)
        client.send_post(body)
        self._asyncHttpClients.append(client)

    def send_http_post(self, rpc_method: str, body: Union[bytes, str]) -> Union[NetworkReplyError, ResponseError, ResponseOk]:
        """Post ``body`` through a blocking HTTP client and return the outcome.

        Waits until the counters allow the request and a blocking client is
        free. Events are processed while waiting so the reset timer can fire;
        without that the wait could never end in the timer's own thread.

        Args:
            rpc_method: RPC method name; must belong to ``RPC_HTTP_REQ``.
            body: Serialized request.

        Returns:
            The result object returned by the client's ``send_post``.

        Raises:
            RuntimeError: If ``rpc_method`` is not an HTTP RPC method.
        """
        self._ensure_known_method(rpc_method, RPC_HTTP_REQ)
        size = self._payload_size(body)
        while not (self._check_limits(rpc_method, size) and self._syncHttpClients):
            QApplication.processEvents()
            QThread.msleep(1)  # avoid spinning at full CPU
        # Use the first available synchronous HTTP client.
        client = self._syncHttpClients.pop(0)
        try:
            response = client.send_post(body)
        finally:
            # Give the client back even if the request raised.
            self._syncHttpClients.append(client)
        received = response.size if isinstance(response, (ResponseOk, ResponseError)) else 0
        self._count_request(rpc_method, size + received)
        return response

    def get_count_ws_main_msgs(self):
        """Return the number of stored messages from the main WebSocket."""
        return len(self._wsMainReceivedMessages)

    def read_first_ws_main_msg(self):
        """Remove and return the oldest main-WebSocket message, or None."""
        return self._wsMainReceivedMessages.popleft() if self._wsMainReceivedMessages else None

    def read_last_ws_main_msg(self):
        """Remove and return the newest main-WebSocket message, or None."""
        return self._wsMainReceivedMessages.pop() if self._wsMainReceivedMessages else None

    def get_count_ws_msgs(self):
        """Return the number of stored messages from the pooled WebSockets."""
        return len(self._wsReceivedMessages)

    def read_first_ws_msg(self):
        """Remove and return the oldest pooled-WebSocket message, or None."""
        return self._wsReceivedMessages.popleft() if self._wsReceivedMessages else None

    def read_last_ws_msg(self):
        """Remove and return the newest pooled-WebSocket message, or None."""
        return self._wsReceivedMessages.pop() if self._wsReceivedMessages else None

    def get_count_http_msgs(self):
        """Return the number of stored replies from the asynchronous HTTP clients."""
        return len(self._httpReceivedMessages)

    def read_first_http_msg(self):
        """Remove and return the oldest asynchronous HTTP reply, or None."""
        return self._httpReceivedMessages.popleft() if self._httpReceivedMessages else None

    def read_last_http_msg(self):
        """Remove and return the newest asynchronous HTTP reply, or None."""
        return self._httpReceivedMessages.pop() if self._httpReceivedMessages else None

    def clean_up(self):
        """Close the connections and stop the timers."""
        self.close()
        self._stop()
|
||
|
||
|
||
class WsReqGen:
|
||
""" Generate requests for Solana Websocket client"""
|
||
@classmethod
def account_subscribe(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiAccountEncoding] = None) -> (int, str, AccountSubscribe):
    """Build an ``accountSubscribe`` request (notifications when the lamports or data of an account change).

    Args:
        pubkey_to_use (Pubkey): Account pubkey.
        commitment_to_use (Optional[CommitmentLevel]): Commitment level.
        encoding_to_use (Optional[UiAccountEncoding]): Encoding to use.

    Returns:
        int, str, AccountSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    if commitment_to_use is None and encoding_to_use is None:
        # No option given: send the request without a config object.
        config = None
    else:
        config = RpcAccountInfoConfig(encoding=encoding_to_use, commitment=commitment_to_use)
    return request_id, "accountSubscribe", AccountSubscribe(pubkey_to_use, config, request_id)
|
||
|
||
@classmethod
def account_unsubscribe(cls, subscription_id: int) -> (int, str, AccountUnsubscribe):
    """Build an ``accountUnsubscribe`` request.

    Args:
        subscription_id (int): ID of subscription to cancel.

    Returns:
        int, str, AccountUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "accountUnsubscribe", AccountUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def block_subscribe(cls, rpc_filer: Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions] = RpcBlockSubscribeFilter.All, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiTransactionEncoding] = None, transaction_details: Optional[TransactionDetails] = None, show_rewards: Optional[bool] = None, max_supported_transaction_version: Optional[int] = None) -> (int, str, BlockSubscribe):
    """Build a ``blockSubscribe`` request.

    Args:
        rpc_filer (Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions]): filter criteria for the blocks.
        commitment_to_use (Optional[CommitmentLevel]): The commitment level to use.
        encoding_to_use (Optional[UiTransactionEncoding]): Encoding to use.
        transaction_details (Optional[TransactionDetails]): level of transaction detail to return.
        show_rewards (Optional[bool]): whether to populate the rewards array.
        max_supported_transaction_version (Optional[int]): the max transaction version to return in responses.

    Returns:
        int, str, BlockSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    options = (commitment_to_use, encoding_to_use, transaction_details, show_rewards, max_supported_transaction_version)
    if all(option is None for option in options):
        # No option given: send the request without a config object.
        config = None
    else:
        config = RpcBlockSubscribeConfig(
            commitment=commitment_to_use,
            encoding=encoding_to_use,
            transaction_details=transaction_details,
            show_rewards=show_rewards,
            max_supported_transaction_version=max_supported_transaction_version,
        )
    return request_id, "blockSubscribe", BlockSubscribe(rpc_filer, config, request_id)
|
||
|
||
@classmethod
def block_unsubscribe(cls, subscription_id: int) -> (int, str, BlockUnsubscribe):
    """Build a ``blockUnsubscribe`` request.

    Args:
        subscription_id (int): ID of subscription to cancel.

    Returns:
        int, str, BlockUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "blockUnsubscribe", BlockUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def logs_subscribe(cls, rpc_filer: Union[RpcTransactionLogsFilter, RpcTransactionLogsFilterMentions] = RpcTransactionLogsFilter.All, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, LogsSubscribe):
    """Build a ``logsSubscribe`` request (transaction logging).

    Args:
        rpc_filer (Union[RpcTransactionLogsFilter, RpcTransactionLogsFilterMentions]): filter criteria for the logs.
        commitment_to_use (Optional[CommitmentLevel]): The commitment level to use.

    Returns:
        int, str, LogsSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    config = None
    if commitment_to_use is not None:
        config = RpcTransactionLogsConfig(commitment=commitment_to_use)
    return request_id, "logsSubscribe", LogsSubscribe(rpc_filer, config, request_id)
|
||
|
||
@classmethod
def logs_unsubscribe(cls, subscription_id: int) -> (int, str, LogsUnsubscribe):
    """Build a ``logsUnsubscribe`` request.

    Args:
        subscription_id (int): ID of subscription to cancel.

    Returns:
        int, str, LogsUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "logsUnsubscribe", LogsUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def program_subscribe(cls, program_id: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None, encoding_to_use: Optional[UiAccountEncoding] = None, data_slice: Optional[DataSliceOpts] = None, filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None) -> (int, str, ProgramSubscribe):
    """Build a ``programSubscribe`` request (notifications when the lamports or data of an account owned by the program change).

    Args:
        program_id (Pubkey): The program ID.
        commitment_to_use (Optional[CommitmentLevel]): Commitment level to use.
        encoding_to_use (Optional[UiAccountEncoding]): Encoding to use.
        data_slice (Optional[DataSliceOpts]): Limit the returned account data using the provided `offset`: <usize> and `length`: <usize> fields; only available for "base58" or "base64" encoding.
        filters (Optional[Sequence[Union[int, MemcmpOpts]]]): Options to compare a provided series of bytes with program account data at a particular offset. Note: an int entry is converted to a `dataSize` filter.

    Returns:
        int, str, ProgramSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    config = None
    if not (commitment_to_use is None and encoding_to_use is None and data_slice is None and filters is None):
        slice_config = None
        if data_slice is not None:
            slice_config = UiDataSliceConfig(offset=data_slice.offset, length=data_slice.length)
        account_config = RpcAccountInfoConfig(encoding=encoding_to_use, commitment=commitment_to_use, data_slice=slice_config)
        converted_filters = None
        if filters is not None:
            # int entries pass through unchanged; other entries are unpacked into Memcmp.
            converted_filters = [entry if isinstance(entry, int) else Memcmp(*entry) for entry in filters]
        config = RpcProgramAccountsConfig(account_config=account_config, filters=converted_filters)
    return request_id, "programSubscribe", ProgramSubscribe(program_id, config, request_id)
|
||
|
||
@classmethod
def program_unsubscribe(cls, subscription_id: int) -> (int, str, ProgramUnsubscribe):
    """Build a ``programUnsubscribe`` request.

    Args:
        subscription_id (int): ID of subscription to cancel.

    Returns:
        int, str, ProgramUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "programUnsubscribe", ProgramUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def root_subscribe(cls) -> (int, str, RootSubscribe):
    """Build a ``rootSubscribe`` request (notification anytime a new root is set by the validator).

    Returns:
        int, str, RootSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "rootSubscribe", RootSubscribe(request_id)
|
||
|
||
@classmethod
def root_unsubscribe(cls, subscription_id: int) -> (int, str, RootUnsubscribe):
    """Build a ``rootUnsubscribe`` request.

    Args:
        subscription_id (int): ID of subscription to cancel.

    Returns:
        int, str, RootUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "rootUnsubscribe", RootUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def signature_subscribe(cls, signature_to_use: Signature, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, SignatureSubscribe):
    """Build a ``signatureSubscribe`` request (notification when the transaction is confirmed).

    Args:
        signature_to_use (Signature): The transaction signature to subscribe to.
        commitment_to_use (Optional[CommitmentLevel]): Commitment level.

    Returns:
        int, str, SignatureSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    config = None
    if commitment_to_use is not None:
        config = RpcSignatureSubscribeConfig(commitment=commitment_to_use)
    return request_id, "signatureSubscribe", SignatureSubscribe(signature_to_use, config, request_id)
|
||
|
||
@classmethod
def signature_unsubscribe(cls, subscription_id: int) -> (int, str, SignatureUnsubscribe):
    """Build a ``signatureUnsubscribe`` request.

    Args:
        subscription_id (int): ID of subscription to cancel.

    Returns:
        int, str, SignatureUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "signatureUnsubscribe", SignatureUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def slot_subscribe(cls) -> (int, str, SlotSubscribe):
    """Build a subscription request for slot notifications.

    The subscription notifies any time a slot is processed by the validator.

    Returns:
        int, str, SlotSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "slotSubscribe", SlotSubscribe(request_id)
|
||
|
||
@classmethod
def slot_unsubscribe(cls, subscription_id: int) -> (int, str, SlotUnsubscribe):
    """Build the request that cancels a slot notification subscription.

    Args:
        subscription_id (int): ID of the subscription to cancel.

    Returns:
        int, str, SlotUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "slotUnsubscribe", SlotUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def slots_updates_subscribe(cls) -> (int, str, SlotsUpdatesSubscribe):
    """Build a subscription request for per-slot update notifications.

    The validator sends a notification on a variety of updates on every slot.

    Returns:
        int, str, SlotsUpdatesSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "slotsUpdatesSubscribe", SlotsUpdatesSubscribe(request_id)
|
||
|
||
@classmethod
def slots_updates_unsubscribe(cls, subscription_id: int) -> (int, str, SlotsUpdatesUnsubscribe):
    """Build the request that cancels a slot-update notification subscription.

    Args:
        subscription_id (int): ID of the subscription to cancel.

    Returns:
        int, str, SlotsUpdatesUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "slotsUpdatesUnsubscribe", SlotsUpdatesUnsubscribe(subscription_id, request_id)
|
||
|
||
@classmethod
def vote_subscribe(cls) -> (int, str, VoteSubscribe):
    """Build a subscription request for vote notifications.

    The subscription notifies any time a new vote is observed in gossip.

    Returns:
        int, str, VoteSubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "voteSubscribe", VoteSubscribe(request_id)
|
||
|
||
@classmethod
def vote_unsubscribe(cls, subscription_id: int) -> (int, str, VoteUnsubscribe):
    """Build the request that cancels a vote notification subscription.

    Args:
        subscription_id (int): ID of the subscription to cancel.

    Returns:
        int, str, VoteUnsubscribe: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "voteUnsubscribe", VoteUnsubscribe(subscription_id, request_id)
|
||
|
||
|
||
class HttpReqGen:
|
||
""" Generate requests for Solana Http client. """
|
||
@classmethod
def get_account_info(
    cls,
    pubkey_to_use: Pubkey,
    commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized,
    encoding_to_use: Optional[UiAccountEncoding] = UiAccountEncoding.Base64,
    data_slice: Optional[DataSliceOpts] = None,
) -> (int, str, GetAccountInfo):
    """Build a `getAccountInfo` request for the given public key.

    Args:
        pubkey_to_use (Pubkey): Address of the account to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".
        encoding_to_use (Optional[UiAccountEncoding]): Encoding for Account data, either
            "base58" (slow), "base64", or "jsonParsed". Default is "base64".
            "base58" is limited to Account data of less than 128 bytes.
            "base64" will return base64 encoded data for Account data of any size.
            "jsonParsed" encoding attempts to use program-specific state parsers to return
            more human-readable and explicit account state data. If jsonParsed is requested
            but a parser cannot be found, the field falls back to base64 encoding,
            detectable when the data field is of type string (jsonParsed encoding is UNSTABLE).
        data_slice (Optional[DataSliceOpts]): Option to limit the returned account data
            using the provided `offset` and `length` (usize) fields; only available for
            "base58" or "base64" encoding.

    Returns:
        int, str, GetAccountInfo: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Translate the caller-facing slice options into the solders config type.
    slice_config = None
    if data_slice is not None:
        slice_config = UiDataSliceConfig(offset=data_slice.offset, length=data_slice.length)
    config = RpcAccountInfoConfig(
        encoding=encoding_to_use,
        data_slice=slice_config,
        commitment=commitment_to_use,
    )
    request = GetAccountInfo(pubkey_to_use, config, request_id)
    return request_id, "getAccountInfo", request
|
||
|
||
@classmethod
def get_balance(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized) -> (int, str, GetBalance):
    """Build a `getBalance` request for the account of the provided Pubkey.

    Args:
        pubkey_to_use (Pubkey): Pubkey of the account to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBalance: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The context config is always built here, even if commitment_to_use is None.
    config = RpcContextConfig(commitment_to_use)
    return request_id, "getBalance", GetBalance(pubkey_to_use, config, request_id)
|
||
|
||
@classmethod
def get_block(
    cls,
    slot: int,
    encoding_to_use: Optional[UiTransactionEncoding] = UiTransactionEncoding.Json,
    max_supported_transaction_version: Optional[int] = None,
) -> (int, str, GetBlock):
    """Build a `getBlock` request (identity and transaction information about a confirmed block).

    Args:
        slot (int): Slot, as u64 integer.
        encoding_to_use (Optional[UiTransactionEncoding]): Encoding for the returned
            Transaction, either "json", "jsonParsed", "base58" (slow), or "base64".
            If the parameter is not provided, the default encoding is JSON.
        max_supported_transaction_version (Optional[int]): The max transaction version to
            return in responses. If the requested transaction is a higher version, an
            error will be returned.

    Returns:
        int, str, GetBlock: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    config = RpcBlockConfig(
        encoding=encoding_to_use,
        max_supported_transaction_version=max_supported_transaction_version,
    )
    return request_id, "getBlock", GetBlock(slot, config, request_id)
|
||
|
||
@classmethod
def get_block_commitment(cls, slot: int) -> (int, str, GetBlockCommitment):
    """Build a `getBlockCommitment` request for a particular block.

    Args:
        slot (int): Block, identified by Slot.

    Returns:
        int, str, GetBlockCommitment: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getBlockCommitment", GetBlockCommitment(slot, request_id)
|
||
|
||
@classmethod
def get_block_height(cls, commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized) -> (int, str, GetBlockHeight):
    """Build a `getBlockHeight` request (current block height of the node).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed". Passing None sends the
            request without a config object.

    Returns:
        int, str, GetBlockHeight: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # None is explicitly supported: in that case no context config is attached.
    config = None
    if commitment_to_use is not None:
        config = RpcContextConfig(commitment=commitment_to_use)
    request = GetBlockHeight(config, request_id)
    return request_id, "getBlockHeight", request
|
||
|
||
@classmethod
def get_block_production(
    cls,
    identity_to_use: Optional[Pubkey] = None,
    range_to_use: Optional[RpcBlockProductionConfigRange] = None,
    commitment_to_use: Optional[CommitmentLevel] = None,
) -> (int, str, GetBlockProduction):
    """Build a `getBlockProduction` request (recent block production from the current or previous epoch).

    Args:
        identity_to_use (Optional[Pubkey]): Only return results for this validator
            identity (base-58 encoded).
        range_to_use (Optional[RpcBlockProductionConfigRange]): Slot range to return block
            production for. If the parameter is not provided, defaults to the current epoch.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBlockProduction: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # A config object is attached only when at least one option was supplied.
    nothing_requested = identity_to_use is None and range_to_use is None and commitment_to_use is None
    config = None
    if not nothing_requested:
        config = RpcBlockProductionConfig(
            identity=identity_to_use,
            range=range_to_use,
            commitment=commitment_to_use,
        )
    request = GetBlockProduction(config, request_id)
    return request_id, "getBlockProduction", request
|
||
|
||
@classmethod
def get_block_time(cls, slot: int) -> (int, str, GetBlockTime):
    """Build a `getBlockTime` request (estimated production time of a block).

    Args:
        slot (int): Block, identified by Slot.

    Returns:
        int, str, GetBlockTime: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getBlockTime", GetBlockTime(slot, request_id)
|
||
|
||
@classmethod
def get_blocks(cls, start_slot: int, end_slot: Optional[int] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetBlocks):
    """Build a `getBlocks` request (list of confirmed blocks).

    Args:
        start_slot (int): Start slot, as u64 integer.
        end_slot (Optional[int]): End slot, as u64 integer.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBlocks: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetBlocks(start_slot, end_slot, commitment_to_use, request_id)
    return request_id, "getBlocks", request
|
||
|
||
@classmethod
def get_blocks_with_limit(cls, start_slot: int, limit_to_use: Optional[int] = None, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetBlocksWithLimit):
    """Build a `getBlocksWithLimit` request (list of confirmed blocks).

    Args:
        start_slot (int): Start slot, as u64 integer.
        limit_to_use (Optional[int]): Limit, as u64 integer (must be no more than
            500,000 blocks higher than the start slot).
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetBlocksWithLimit: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetBlocksWithLimit(start_slot, limit_to_use, commitment_to_use, request_id)
    return request_id, "getBlocksWithLimit", request
|
||
|
||
@classmethod
def get_cluster_nodes(cls) -> (int, str, GetClusterNodes):
    """Build a `getClusterNodes` request (information about all nodes participating in the cluster).

    Returns:
        int, str, GetClusterNodes: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getClusterNodes", GetClusterNodes(request_id)
|
||
|
||
@classmethod
def get_epoch_info(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetEpochInfo):
    """Build a `getEpochInfo` request (information about the current epoch).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetEpochInfo: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Without a commitment level the request is sent with no config object.
    config = None
    if commitment_to_use is not None:
        config = RpcContextConfig(commitment_to_use)
    request = GetEpochInfo(config, request_id)
    return request_id, "getEpochInfo", request
|
||
|
||
@classmethod
def get_epoch_schedule(cls) -> (int, str, GetEpochSchedule):
    """Build a `getEpochSchedule` request (epoch schedule from this cluster's genesis config).

    Returns:
        int, str, GetEpochSchedule: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getEpochSchedule", GetEpochSchedule(request_id)
|
||
|
||
@classmethod
def get_fee_for_message(cls, message_to_use: VersionedMessage, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetFeeForMessage):
    """Build a `getFeeForMessage` request.

    Args:
        message_to_use (VersionedMessage): Message that the fee is requested for.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetFeeForMessage: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetFeeForMessage(message_to_use, commitment_to_use, request_id)
    return request_id, "getFeeForMessage", request
|
||
|
||
@classmethod
def get_first_available_block(cls) -> (int, str, GetFirstAvailableBlock):
    """Build a `getFirstAvailableBlock` request.

    The RPC returns the slot of the lowest confirmed block that has not been purged
    from the ledger.

    Returns:
        int, str, GetFirstAvailableBlock: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getFirstAvailableBlock", GetFirstAvailableBlock(request_id)
|
||
|
||
@classmethod
def get_genesis_hash(cls) -> (int, str, GetGenesisHash):
    """Build a `getGenesisHash` request.

    Returns:
        int, str, GetGenesisHash: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getGenesisHash", GetGenesisHash(request_id)
|
||
|
||
@classmethod
def get_health(cls) -> (int, str, GetHealth):
    """Build a `getHealth` request (current health of the node).

    A healthy node is one that is within HEALTH_CHECK_SLOT_DISTANCE slots of the
    latest cluster confirmed slot.

    Returns:
        int, str, GetHealth: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getHealth", GetHealth(request_id)
|
||
|
||
@classmethod
def get_highest_snapshot_slot(cls) -> (int, str, GetHighestSnapshotSlot):
    """Build a `getHighestSnapshotSlot` request.

    The RPC returns the highest slot information that the node has snapshots for.

    Returns:
        int, str, GetHighestSnapshotSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getHighestSnapshotSlot", GetHighestSnapshotSlot(request_id)
|
||
|
||
@classmethod
def get_identity(cls) -> (int, str, GetIdentity):
    """Build a `getIdentity` request (identity pubkey of the current node).

    Returns:
        int, str, GetIdentity: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getIdentity", GetIdentity(request_id)
|
||
|
||
@classmethod
def get_inflation_governor(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetInflationGovernor):
    """Build a `getInflationGovernor` request (current inflation governor).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetInflationGovernor: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getInflationGovernor", GetInflationGovernor(commitment_to_use, request_id)
|
||
|
||
@classmethod
def get_inflation_rate(cls) -> (int, str, GetInflationRate):
    """Build a `getInflationRate` request (inflation values for the current epoch).

    Returns:
        int, str, GetInflationRate: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getInflationRate", GetInflationRate(request_id)
|
||
|
||
@classmethod
def get_inflation_reward(
    cls,
    pubkeys_to_use: Sequence[Pubkey],
    epoch_to_use: Optional[int] = None,
    commitment_to_use: Optional[CommitmentLevel] = None,
) -> (int, str, GetInflationReward):
    """Build a `getInflationReward` request (inflation / staking reward for a list of addresses).

    Args:
        pubkeys_to_use (Sequence[Pubkey]): An array of addresses to query, as base-58
            encoded strings.
        epoch_to_use (Optional[int]): An epoch for which the reward occurs. If omitted,
            the previous epoch will be used.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized" or "confirmed".

    Returns:
        int, str, GetInflationReward: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The epoch config is only attached when an epoch or a commitment level was given.
    config = None
    if epoch_to_use is not None or commitment_to_use is not None:
        config = RpcEpochConfig(epoch_to_use, commitment_to_use)
    request = GetInflationReward(pubkeys_to_use, config, request_id)
    return request_id, "getInflationReward", request
|
||
|
||
@classmethod
def get_largest_accounts(cls, commitment_to_use: Optional[CommitmentLevel] = None, filter_to_use: Optional[RpcLargestAccountsFilter] = None) -> (int, str, GetLargestAccounts):
    """Build a `getLargestAccounts` request (the 20 largest accounts, by lamport balance).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".
        filter_to_use (Optional[RpcLargestAccountsFilter]): Filter results by account
            type; currently supported: circulating|nonCirculating.

    Returns:
        int, str, GetLargestAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetLargestAccounts(commitment_to_use, filter_to_use, request_id)
    return request_id, "getLargestAccounts", request
|
||
|
||
@classmethod
def get_latest_blockhash(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetLatestBlockhash):
    """Build a `getLatestBlockhash` request.

    The RPC returns the latest block hash from the ledger; the response also includes
    the last valid block height.

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetLatestBlockhash: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Without a commitment level the request is sent with no config object.
    config = None
    if commitment_to_use is not None:
        config = RpcContextConfig(commitment_to_use)
    request = GetLatestBlockhash(config, request_id)
    return request_id, "getLatestBlockhash", request
|
||
|
||
@classmethod
def get_leader_schedule(
    cls,
    epoch_to_use: Optional[int] = None,
    identity_to_use: Optional[Pubkey] = None,
    commitment_to_use: Optional[CommitmentLevel] = None,
) -> (int, str, GetLeaderSchedule):
    """Build a `getLeaderSchedule` request (leader schedule for an epoch).

    Args:
        epoch_to_use (Optional[int]): Fetch the leader schedule for the epoch that
            corresponds to the provided slot. If unspecified, the leader schedule for
            the current epoch is fetched.
        identity_to_use (Optional[Pubkey]): Only return results for this validator
            identity (base-58 encoded).
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetLeaderSchedule: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The config is only attached when an identity or a commitment level was given;
    # epoch_to_use travels as its own positional argument of the request.
    config = None
    if identity_to_use is not None or commitment_to_use is not None:
        config = RpcLeaderScheduleConfig(identity_to_use, commitment_to_use)
    request = GetLeaderSchedule(epoch_to_use, config, request_id)
    return request_id, "getLeaderSchedule", request
|
||
|
||
@classmethod
def get_max_retransmit_slot(cls) -> (int, str, GetMaxRetransmitSlot):
    """Build a `getMaxRetransmitSlot` request (max slot seen from the retransmit stage).

    Returns:
        int, str, GetMaxRetransmitSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getMaxRetransmitSlot", GetMaxRetransmitSlot(request_id)
|
||
|
||
@classmethod
def get_max_shred_insert_slot(cls) -> (int, str, GetMaxShredInsertSlot):
    """Build a `getMaxShredInsertSlot` request (max slot seen from after shred insert).

    Returns:
        int, str, GetMaxShredInsertSlot: request id, RPC method name, solders request object
    """
    # The id is drawn first so the same value is both returned and embedded in the request.
    request_id = SharedCounter.get_next_id()
    request = GetMaxShredInsertSlot(request_id)
    return request_id, "getMaxShredInsertSlot", request
|
||
|
||
@classmethod
def get_minimum_balance_for_rent_exemption(cls, min_size: int, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetMinimumBalanceForRentExemption):
    """Build a `getMinimumBalanceForRentExemption` request.

    The RPC returns the minimum balance required to make an account rent exempt.

    Args:
        min_size (int): Account data length.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetMinimumBalanceForRentExemption: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    request = GetMinimumBalanceForRentExemption(min_size, commitment_to_use, request_id)
    return request_id, "getMinimumBalanceForRentExemption", request
|
||
|
||
@classmethod
def get_multiple_accounts(
    cls,
    pubkeys_to_use: Sequence[Pubkey],
    commitment_to_use: Optional[CommitmentLevel] = CommitmentLevel.Finalized,
    encoding_to_use: Optional[UiAccountEncoding] = UiAccountEncoding.Base64,
    data_slice: Optional[DataSliceOpts] = None,
) -> (int, str, GetMultipleAccounts):
    """Build a `getMultipleAccounts` request for a list of public keys.

    Args:
        pubkeys_to_use (Sequence[Pubkey]): List of Pubkeys to query.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed". Defaults to finalized.
        encoding_to_use (Optional[UiAccountEncoding]): Encoding for Account data, either
            "base58" (slow) or "base64".
            "base58" is limited to Account data of less than 128 bytes.
            "base64" will return base64 encoded data for Account data of any size.
        data_slice (Optional[DataSliceOpts]): Option to limit the returned account data
            using the provided `offset` and `length` (usize) fields; only available for
            "base58" or "base64" encoding.

    Returns:
        int, str, GetMultipleAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Translate the caller-facing slice options into the solders config type.
    slice_config = None
    if data_slice is not None:
        slice_config = UiDataSliceConfig(offset=data_slice.offset, length=data_slice.length)
    config = RpcAccountInfoConfig(
        encoding=encoding_to_use,
        commitment=commitment_to_use,
        data_slice=slice_config,
    )
    request = GetMultipleAccounts(pubkeys_to_use, config, request_id)
    return request_id, "getMultipleAccounts", request
|
||
|
||
@classmethod
def get_program_accounts(
    cls,
    pubkey_to_use: Pubkey,
    commitment_to_use: Optional[CommitmentLevel] = None,
    encoding_to_use: Optional[UiAccountEncoding] = None,
    data_slice: Optional[DataSliceOpts] = None,
    filters: Optional[Sequence[Union[int, MemcmpOpts]]] = None,
) -> (int, str, GetProgramAccounts):
    """Build a `getProgramAccounts` request (all accounts owned by the provided program Pubkey).

    Args:
        pubkey_to_use (Pubkey): Pubkey of the program.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".
        encoding_to_use (Optional[UiAccountEncoding]): Encoding for the returned
            Transaction, either "jsonParsed", "base58" (slow), or "base64".
        data_slice (Optional[DataSliceOpts]): Limit the returned account data using the
            provided `offset` and `length` (usize) fields; only available for "base58"
            or "base64" encoding.
        filters (Optional[Sequence[Union[int, MemcmpOpts]]]): Options to compare a
            provided series of bytes with program account data at a particular offset.
            Note: an int entry is converted to a `dataSize` filter.

    Returns:
        int, str, GetProgramAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Translate the caller-facing slice options into the solders config type.
    slice_config = None
    if data_slice is not None:
        slice_config = UiDataSliceConfig(offset=data_slice.offset, length=data_slice.length)
    account_config = RpcAccountInfoConfig(
        encoding=encoding_to_use,
        commitment=commitment_to_use,
        data_slice=slice_config,
    )
    # int entries are forwarded unchanged; every other entry is rebuilt as a Memcmp filter.
    rpc_filters = None
    if filters is not None:
        rpc_filters = []
        for entry in filters:
            if isinstance(entry, int):
                rpc_filters.append(entry)
            else:
                rpc_filters.append(Memcmp(offset=entry.offset, bytes_=entry.bytes))
    program_config = RpcProgramAccountsConfig(account_config, rpc_filters)
    request = GetProgramAccounts(pubkey_to_use, program_config, request_id)
    return request_id, "getProgramAccounts", request
|
||
|
||
@classmethod
def get_recent_performance_samples(cls, limit: int) -> (int, str, GetRecentPerformanceSamples):
    """Build a `getRecentPerformanceSamples` request.

    The RPC returns a list of recent performance samples, in reverse slot order.
    Performance samples are taken every 60 seconds and include the number of
    transactions and slots that occur in a given time window.

    Args:
        limit (int): Number of samples to return (maximum 720).

    Returns:
        int, str, GetRecentPerformanceSamples: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getRecentPerformanceSamples", GetRecentPerformanceSamples(limit, request_id)
|
||
|
||
# TODO: where is GetRecentPrioritizationFees in solders
|
||
# @classmethod
|
||
# def getRecentPrioritizationFees(cls, pubkeys_to_use: Sequence[Pubkey]) -> (int, str, GetRecentPrioritizationFees):
|
||
# """ Returns a list of prioritization fees from recent blocks.
|
||
# Args:
|
||
# pubkeys_to_use (Sequence[Pubkey]): An array of Account addresses (up to a maximum of 128 addresses), as base-58 encoded strings
|
||
# Returns:
|
||
# int, str, GetRecentPrioritizationFees: next_id, RPC method name, solders request object
|
||
# """
|
||
# next_id = SharedCounter.get_next_id()
|
||
# req = GetRecentPrioritizationFees(pubkeys_to_use, next_id)
|
||
# return next_id, "getRecentPrioritizationFees", req
|
||
|
||
@classmethod
def get_signature_statuses(cls, signatures: Sequence[Signature], search_transaction_history: bool = False) -> (int, str, GetSignatureStatuses):
    """Build a `getSignatureStatuses` request for a list of signatures.

    Unless the `searchTransactionHistory` configuration parameter is included, this
    RPC method only searches the recent status cache of signatures, which retains
    statuses for all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots.

    Args:
        signatures (Sequence[Signature]): An array of transaction signatures to confirm.
        search_transaction_history (bool): If true, a Solana node will search its ledger
            cache for any signatures not found in the recent status cache.

    Returns:
        int, str, GetSignatureStatuses: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    config = RpcSignatureStatusConfig(search_transaction_history)
    return request_id, "getSignatureStatuses", GetSignatureStatuses(signatures, config, request_id)
|
||
|
||
@classmethod
def get_signatures_for_address(
    cls,
    pubkey_to_use: Pubkey,
    commitment_to_use: Optional[CommitmentLevel] = None,
    limit_to_use: Optional[int] = None,
    before_to_use: Optional[Signature] = None,
    until_to_use: Optional[Signature] = None,
) -> (int, str, GetSignaturesForAddress):
    """Build a `getSignaturesForAddress` request.

    The RPC returns signatures for confirmed transactions that include the given
    address in their accountKeys list. Signatures are returned backwards in time from
    the provided signature or the most recent confirmed block.

    Args:
        pubkey_to_use (Pubkey): Account address as base-58 encoded string.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".
        limit_to_use (Optional[int]): Maximum number of transaction signatures to
            return; forwarded as the `limit` config field.
        before_to_use (Optional[Signature]): Start searching backwards from this
            transaction signature; forwarded as the `before` config field.
        until_to_use (Optional[Signature]): Search until this transaction signature is
            reached; forwarded as the `until` config field.

    Returns:
        int, str, GetSignaturesForAddress: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # A config object is attached only when at least one option was supplied.
    options = (commitment_to_use, limit_to_use, before_to_use, until_to_use)
    config = None
    if any(option is not None for option in options):
        config = RpcSignaturesForAddressConfig(
            before=before_to_use,
            until=until_to_use,
            limit=limit_to_use,
            commitment=commitment_to_use,
        )
    request = GetSignaturesForAddress(pubkey_to_use, config, request_id)
    return request_id, "getSignaturesForAddress", request
|
||
|
||
@classmethod
def get_slot(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetSlot):
    """Build a `getSlot` request (current slot the node is processing).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Without a commitment level the request is sent with no config object.
    config = None
    if commitment_to_use is not None:
        config = RpcContextConfig(commitment=commitment_to_use)
    request = GetSlot(config, request_id)
    return request_id, "getSlot", request
|
||
|
||
@classmethod
def get_slot_leader(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetSlotLeader):
    """Build a `getSlotLeader` request (current slot leader).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetSlotLeader: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # Without a commitment level the request is sent with no config object.
    config = None
    if commitment_to_use is not None:
        config = RpcContextConfig(commitment=commitment_to_use)
    request = GetSlotLeader(config, request_id)
    return request_id, "getSlotLeader", request
|
||
|
||
@classmethod
def get_slot_leaders(cls, start: int, limit: int) -> (int, str, GetSlotLeaders):
    """Build a `getSlotLeaders` request (slot leaders for a given slot range).

    Args:
        start (int): Start slot, as u64 integer.
        limit (int): Limit, as u64 integer (between 1 and 5,000).

    Returns:
        int, str, GetSlotLeaders: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getSlotLeaders", GetSlotLeaders(start, limit, request_id)
|
||
|
||
# TODO: where is GetStakeMinimumDelegation in solders
|
||
# @classmethod
|
||
# def getStakeMinimumDelegation(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetStakeMinimumDelegation):
|
||
# """ Returns the stake minimum delegation, in lamports.
|
||
# Args:
|
||
# commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".
|
||
# Returns:
|
||
# int, str, GetStakeMinimumDelegation: next_id, RPC method name, solders request object
|
||
# """
|
||
# next_id = SharedCounter.get_next_id()
|
||
# rpc_context_config = None if commitment_to_use is None else RpcContextConfig(commitment = commitment_to_use)
|
||
# req = GetStakeMinimumDelegation(rpc_context_config, next_id)
|
||
# return next_id, "getStakeMinimumDelegation", req
|
||
|
||
@classmethod
def get_supply(cls, exclude_non_circulating_accounts_list: bool = True, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetSupply):
    """Build a `getSupply` request (information about the current supply).

    Args:
        exclude_non_circulating_accounts_list (bool): Exclude the non-circulating
            accounts list from the response.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be
            either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetSupply: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    config = RpcSupplyConfig(
        exclude_non_circulating_accounts_list=exclude_non_circulating_accounts_list,
        commitment=commitment_to_use,
    )
    return request_id, "getSupply", GetSupply(config, request_id)
|
||
|
||
@classmethod
def get_token_account_balance(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenAccountBalance):
    """ Build the `getTokenAccountBalance` request (token balance of an SPL Token account, UNSTABLE).

    Args:
        pubkey_to_use (Pubkey): Pubkey of Token account to query
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenAccountBalance: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return (
        request_id,
        "getTokenAccountBalance",
        GetTokenAccountBalance(pubkey_to_use, commitment_to_use, request_id),
    )
|
||
|
||
@classmethod
def get_token_accounts_by_delegate(cls, pubkey_to_use: Pubkey, opts: TokenAccountOpts, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenAccountsByDelegate):
    """ Build the `getTokenAccountsByDelegate` request (all SPL Token accounts by approved Delegate, UNSTABLE).

    Args:
        pubkey_to_use (Pubkey): Pubkey of Token account to query
        opts (TokenAccountOpts): Token account option specifying at least one of `mint` or `programId`.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenAccountsByDelegate: request id, RPC method name, solders request object

    Raises:
        ValueError: when `opts` carries neither a mint nor a program id.
    """
    request_id = SharedCounter.get_next_id()
    encoding = opts.encoding

    # Translate the optional data slice into its solders counterpart.
    slice_opts = opts.dataSlice
    if slice_opts is None:
        slice_config = None
    else:
        slice_config = UiDataSliceConfig(offset=slice_opts.offset, length=slice_opts.length)

    # The mint filter takes precedence over the program-id filter.
    mint = opts.mint
    program_id = opts.programId
    account_filter: Union[RpcTokenAccountsFilterMint, RpcTokenAccountsFilterProgramId]
    if mint is not None:
        account_filter = RpcTokenAccountsFilterMint(mint)
    elif program_id is not None:
        account_filter = RpcTokenAccountsFilterProgramId(program_id)
    else:
        raise ValueError("Please provide one of mint or program_id")

    account_config = RpcAccountInfoConfig(encoding=encoding, data_slice=slice_config, commitment=commitment_to_use)
    request = GetTokenAccountsByDelegate(pubkey_to_use, account_filter, account_config, request_id)
    return request_id, "getTokenAccountsByDelegate", request
|
||
|
||
@classmethod
def get_token_accounts_by_owner(cls, pubkey_to_use: Pubkey, opts: TokenAccountOpts, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenAccountsByOwner):
    """ Build the `getTokenAccountsByOwner` request (all SPL Token accounts by token owner, UNSTABLE).

    Args:
        pubkey_to_use (Pubkey): Pubkey of Token account to query
        opts (TokenAccountOpts): Token account option specifying at least one of `mint` or `programId`.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenAccountsByOwner: request id, RPC method name, solders request object

    Raises:
        ValueError: when `opts` carries neither a mint nor a program id.
    """
    request_id = SharedCounter.get_next_id()
    encoding = opts.encoding

    # Translate the optional data slice into its solders counterpart.
    slice_opts = opts.dataSlice
    if slice_opts is None:
        slice_config = None
    else:
        slice_config = UiDataSliceConfig(offset=slice_opts.offset, length=slice_opts.length)

    # The mint filter takes precedence over the program-id filter.
    mint = opts.mint
    program_id = opts.programId
    account_filter: Union[RpcTokenAccountsFilterMint, RpcTokenAccountsFilterProgramId]
    if mint is not None:
        account_filter = RpcTokenAccountsFilterMint(mint)
    elif program_id is not None:
        account_filter = RpcTokenAccountsFilterProgramId(program_id)
    else:
        raise ValueError("Please provide one of mint or program_id")

    account_config = RpcAccountInfoConfig(encoding=encoding, data_slice=slice_config, commitment=commitment_to_use)
    request = GetTokenAccountsByOwner(pubkey_to_use, account_filter, account_config, request_id)
    return request_id, "getTokenAccountsByOwner", request
|
||
|
||
@classmethod
def get_token_largest_accounts(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenLargestAccounts):
    """ Build the `getTokenLargestAccounts` request (the 20 largest accounts of a particular SPL Token type).

    Args:
        pubkey_to_use (Pubkey): Pubkey of Token account to query
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenLargestAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return (
        request_id,
        "getTokenLargestAccounts",
        GetTokenLargestAccounts(pubkey_to_use, commitment_to_use, request_id),
    )
|
||
|
||
@classmethod
def get_token_supply(cls, pubkey_to_use: Pubkey, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTokenSupply):
    """ Build the `getTokenSupply` request (the total supply of an SPL Token type).

    Args:
        pubkey_to_use (Pubkey): Pubkey of Token account to query
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTokenSupply: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return (
        request_id,
        "getTokenSupply",
        GetTokenSupply(pubkey_to_use, commitment_to_use, request_id),
    )
|
||
|
||
@classmethod
def get_transaction(cls, signature_to_use: Signature, encoding_to_use: Optional[UiTransactionEncoding] = UiTransactionEncoding.Json, commitment_to_use: Optional[CommitmentLevel] = None, max_supported_transaction_version: Optional[int] = None) -> (int, str, GetTransaction):
    """ Build the `getTransaction` request (transaction details for a confirmed transaction).

    Args:
        signature_to_use (Signature): Transaction signature, as base-58 encoded string
        encoding_to_use (Optional[UiTransactionEncoding]): Encoding for the returned Transaction
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".
        max_supported_transaction_version (Optional[int]): max supported transaction version

    Returns:
        int, str, GetTransaction: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The config is omitted only when every optional setting is None.
    settings = (encoding_to_use, commitment_to_use, max_supported_transaction_version)
    if all(value is None for value in settings):
        transaction_config = None
    else:
        transaction_config = RpcTransactionConfig(
            encoding=encoding_to_use,
            commitment=commitment_to_use,
            max_supported_transaction_version=max_supported_transaction_version,
        )
    return request_id, "getTransaction", GetTransaction(signature_to_use, transaction_config, request_id)
|
||
|
||
@classmethod
def get_transaction_count(cls, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, GetTransactionCount):
    """ Build the `getTransactionCount` request (current Transaction count from the ledger).

    Args:
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, GetTransactionCount: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # A config object is only built when a commitment level was supplied.
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment=commitment_to_use)
    return request_id, "getTransactionCount", GetTransactionCount(context_config, request_id)
|
||
|
||
@classmethod
def get_version(cls) -> (int, str, GetVersion):
    """ Build the `getVersion` request (current Solana version running on the node).

    Returns:
        int, str, GetVersion: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "getVersion", GetVersion(request_id)
|
||
|
||
@classmethod
def get_vote_accounts(cls, pubkey_to_use: Optional[Pubkey] = None, commitment_to_use: Optional[CommitmentLevel] = None, keep_unstaked_delinquents: Optional[bool] = None, delinquent_slot_distance: Optional[int] = None) -> (int, str, GetVoteAccounts):
    """ Build the `getVoteAccounts` request (account info and associated stake for all voting accounts in the current bank).

    Args:
        pubkey_to_use (Optional[Pubkey]): Only return results for this validator vote address (base-58 encoded)
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".
        keep_unstaked_delinquents (Optional[bool]): Do not filter out delinquent validators with no stake
        delinquent_slot_distance (Optional[int]): Specify the number of slots behind the tip that a validator must fall to be considered delinquent. NOTE: For the sake of consistency between ecosystem products, it is not recommended that this argument be specified.

    Returns:
        int, str, GetVoteAccounts: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # The config is omitted only when every optional setting is None.
    settings = (pubkey_to_use, commitment_to_use, keep_unstaked_delinquents, delinquent_slot_distance)
    if all(value is None for value in settings):
        vote_config = None
    else:
        vote_config = RpcGetVoteAccountsConfig(
            vote_pubkey=pubkey_to_use,
            commitment=commitment_to_use,
            keep_unstaked_delinquents=keep_unstaked_delinquents,
            delinquent_slot_distance=delinquent_slot_distance,
        )
    return request_id, "getVoteAccounts", GetVoteAccounts(vote_config, request_id)
|
||
|
||
@classmethod
def is_blockhash_valid(cls, blockhash_to_use: Hash, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, IsBlockhashValid):
    """ Build the `isBlockhashValid` request (whether a blockhash is still valid or not).

    Args:
        blockhash_to_use (Hash): the blockhash of the block to evaluate, as base-58 encoded string
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, IsBlockhashValid: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    # A config object is only built when a commitment level was supplied.
    context_config = None
    if commitment_to_use is not None:
        context_config = RpcContextConfig(commitment=commitment_to_use)
    return request_id, "isBlockhashValid", IsBlockhashValid(blockhash_to_use, context_config, request_id)
|
||
|
||
@classmethod
def minimum_ledger_slot(cls) -> (int, str, MinimumLedgerSlot):
    """ Build the `minimumLedgerSlot` request (lowest slot the node has information about in its ledger).

    Returns:
        int, str, MinimumLedgerSlot: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    return request_id, "minimumLedgerSlot", MinimumLedgerSlot(request_id)
|
||
|
||
@classmethod
def request_airdrop(cls, pubkey_to_use: Pubkey, lamports_to_use: int, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, RequestAirdrop):
    """ Build the `requestAirdrop` request (airdrop of lamports to a Pubkey).

    Args:
        pubkey_to_use (Pubkey): Pubkey of account to receive lamports, as base-58 encoded string or public key object.
        lamports_to_use (int): Amount of lamports.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, RequestAirdrop: next_id, RPC method name, solders request object
    """
    next_id = SharedCounter.get_next_id()
    # The commitment must be passed by keyword: the first positional parameter
    # of RpcRequestAirdropConfig is `recent_blockhash`, not `commitment`.
    rpc_request_airdrop_config = None if commitment_to_use is None else RpcRequestAirdropConfig(commitment=commitment_to_use)
    req = RequestAirdrop(pubkey_to_use, lamports_to_use, rpc_request_airdrop_config, next_id)
    return next_id, "requestAirdrop", req
|
||
|
||
@classmethod
def send_transaction(cls, txn: Union[VersionedTransaction, Transaction], opts: Optional[TxOpts] = None) -> (int, str, SendRawTransaction):
    """ Build the `sendTransaction` request for an already built transaction.

    Args:
        txn (Union[VersionedTransaction, Transaction]): transaction object.
        opts (Optional[TxOpts]): Transaction options. When omitted, options with a
            finalized preflight commitment are used.

    Returns:
        int, str, SendRawTransaction: next_id, RPC method name, solders request object
    """
    next_id = SharedCounter.get_next_id()
    btxn = bytes(txn)
    opts_to_use = TxOpts(prefLightCommitment=CommitmentLevel.Finalized) if opts is None else opts
    # Read from opts_to_use, never from opts: opts may be None here.
    pref_light_commitment_to_use = opts_to_use.prefLightCommitment
    if pref_light_commitment_to_use is None:
        pref_light_commitment_to_use = CommitmentLevel.Finalized
    rpc_send_transaction_config = RpcSendTransactionConfig(
        skip_preflight=opts_to_use.skipPreflight,
        preflight_commitment=pref_light_commitment_to_use,
        encoding=opts_to_use.prefEncoding,
        max_retries=opts_to_use.maxRetries,
    )
    req = SendRawTransaction(btxn, rpc_send_transaction_config, next_id)
    return next_id, "sendTransaction", req
|
||
|
||
@classmethod
def simulate_transaction(cls, txn: Union[VersionedTransaction, Transaction], verify_signature: bool, commitment_to_use: Optional[CommitmentLevel] = None) -> (int, str, Union[SimulateVersionedTransaction, SimulateLegacyTransaction]):
    """ Build the `simulateTransaction` request for a transaction.

    Args:
        txn (Union[VersionedTransaction, Transaction]): Transaction object to simulate. The transaction must have a valid blockhash, but is not required to be signed.
        verify_signature (bool): If true the transaction signatures will be verified.
        commitment_to_use (Optional[CommitmentLevel]): Bank state to query. It can be either "finalized", "confirmed" or "processed".

    Returns:
        int, str, Union[SimulateVersionedTransaction, SimulateLegacyTransaction]: request id, RPC method name, solders request object
    """
    request_id = SharedCounter.get_next_id()
    simulate_config = RpcSimulateTransactionConfig(sig_verify=verify_signature, commitment=commitment_to_use)
    # Legacy transactions get their own request type; everything else is
    # sent as a versioned transaction.
    request_type = SimulateLegacyTransaction if isinstance(txn, Transaction) else SimulateVersionedTransaction
    return request_id, "simulateTransaction", request_type(txn, simulate_config, request_id)
|
||
|
||
|
||
class Wallet:
    """ Solana wallet record: keys, balances, token lists and timestamps.

    Validation problems are reported by raising ValueError whose single
    argument is one of the integer codes declared below.
    """
    WrongWalletName = 1  # 'walletName' wrong type (must not be empty string)
    WrongWalletId = 2  # 'walletId' wrong type (must not be empty string)
    WrongWalletPass = 3  # 'walletPass' wrong type (must not be empty string)
    WrongWalletType = 4  # 'walletType' must be equal string 'solana'
    JSONDecodeError = 5  # Cannot decode Json

    def __init__(self, wallet_name: str, wallet_pub_key: str, wallet_priv_key: str, dt_crea: datetime = None, dt_update: datetime = None, main_sol_balance: float = 0, main_tokens: list = None, dev_sol_balance: float = 0, dev_tokens: list = None, wallet_type: str = 'solana'):
        """ Validate the mandatory fields and store the wallet data.

        Args:
            wallet_name (str): non-blank wallet name
            wallet_pub_key (str): non-blank public key
            wallet_priv_key (str): non-blank private key
            dt_crea (datetime): creation date, current UTC time when not a datetime
            dt_update (datetime): last update date, current UTC time when not a datetime
            main_sol_balance (float): balance, clamped to 0 when negative or not a number
            main_tokens (list): token list, a new empty list when None
            dev_sol_balance (float): balance, clamped to 0 when negative or not a number
            dev_tokens (list): token list, a new empty list when None
            wallet_type (str): must be 'solana'

        Raises:
            ValueError: with one of the Wrong* codes when a mandatory field is invalid.
        """
        # Checking mandatory parameters
        if not isinstance(wallet_name, str) or not wallet_name.strip():
            raise ValueError(Wallet.WrongWalletName)
        if not isinstance(wallet_pub_key, str) or not wallet_pub_key.strip():
            raise ValueError(Wallet.WrongWalletId)
        if not isinstance(wallet_priv_key, str) or not wallet_priv_key.strip():
            raise ValueError(Wallet.WrongWalletPass)
        if wallet_type != "solana":
            raise ValueError(Wallet.WrongWalletType)
        # Assign required values
        self.walletType = wallet_type
        self.walletName = wallet_name
        self.walletPubKey = wallet_pub_key
        self.walletPrivKey = wallet_priv_key
        # Manage default date values
        self.dtCrea = dt_crea if isinstance(dt_crea, datetime) else datetime.now(timezone.utc)
        self.dtUpdate = dt_update if isinstance(dt_update, datetime) else datetime.now(timezone.utc)
        # Negative or non-numeric balances are stored as zero
        self.mainSolBalance = Wallet._sanitize_balance(main_sol_balance)
        self.devSolBalance = Wallet._sanitize_balance(dev_sol_balance)
        # Initialize token lists if not provided
        self.mainTokens = main_tokens if main_tokens is not None else []
        self.devTokens = dev_tokens if dev_tokens is not None else []

    @staticmethod
    def _sanitize_balance(value):
        """ Return `value` clamped to be non-negative; anything that is not a number becomes 0. """
        if not isinstance(value, (int, float)):
            return 0
        return max(value, 0)

    def to_json(self) -> str:
        """ Convert instance to JSON (dates are written in ISO format). """
        return json.dumps({
            "walletType": self.walletType,
            "walletName": self.walletName,
            "walletPubKey": self.walletPubKey,
            "walletPrivKey": self.walletPrivKey,
            "dtCrea": self.dtCrea.isoformat(),
            "dtUpdate": self.dtUpdate.isoformat(),
            "mainSolBalance": self.mainSolBalance,
            "mainTokens": self.mainTokens,
            "devSolBalance": self.devSolBalance,
            "devTokens": self.devTokens,
        })

    @staticmethod
    def parse_datetime(dt_str):
        """ Parse a date string into a datetime, returning the current time in UTC on error.

        Args:
            dt_str: possible ISO-format datetime string

        Returns:
            datetime: the parsed value, or the current UTC time when `dt_str`
            is not a string or cannot be parsed (a warning is logged only in
            the latter case).
        """
        if isinstance(dt_str, str):
            try:
                return datetime.fromisoformat(dt_str)
            except ValueError:
                logger.warning("Format de date invalide, remplacement par l'heure actuelle.")
        return datetime.now(timezone.utc)

    @staticmethod
    def from_json(json_string: str):
        """ Create a Wallet instance from a JSON string.

        Args:
            json_string (str): JSON object as produced by `to_json`

        Returns:
            Wallet: the reconstructed wallet

        Raises:
            ValueError: with Wallet.JSONDecodeError when the input cannot be
                decoded or is not a JSON object, or with one of the Wrong*
                codes when a mandatory field is missing or invalid.
        """
        # Only the decoding itself is guarded, so validation errors below
        # are never confused with decoding errors.
        try:
            data = json.loads(json_string)
        except (json.JSONDecodeError, TypeError) as exc:
            raise ValueError(Wallet.JSONDecodeError) from exc
        # A valid JSON document that is not an object (number, list, ...)
        # cannot describe a wallet.
        if not isinstance(data, dict):
            raise ValueError(Wallet.JSONDecodeError)
        # Checking mandatory parameters
        mandatory_fields = (
            ("walletName", Wallet.WrongWalletName),
            ("walletPubKey", Wallet.WrongWalletId),
            ("walletPrivKey", Wallet.WrongWalletPass),
        )
        for key, error_code in mandatory_fields:
            value = data.get(key)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(error_code)
        if data.get("walletType") != "solana":
            raise ValueError(Wallet.WrongWalletType)
        # Retrieve and validate dates
        dt_crea = Wallet.parse_datetime(data.get("dtCrea"))
        dt_update = Wallet.parse_datetime(data.get("dtUpdate"))
        # Retrieve the balances; negative or non-numeric values become zero
        main_sol_balance = Wallet._sanitize_balance(data.get("mainSolBalance", 0))
        dev_sol_balance = Wallet._sanitize_balance(data.get("devSolBalance", 0))
        # Retrieve token lists, initialize to empty list if absent
        main_tokens = data.get("mainTokens", [])
        dev_tokens = data.get("devTokens", [])
        # Create and return the Wallet instance
        return Wallet(data["walletName"], data["walletPubKey"], data["walletPrivKey"], dt_crea, dt_update, main_sol_balance, main_tokens, dev_sol_balance, dev_tokens, data["walletType"])
|
||
|
||
|
||
class SimpleMsgBox(QMessageBox):
    """ QMessageBox preconfigured with the application icon, layout direction, title, text and icon. """

    def __init__(self, window_title: str, window_text: str, ico: QMessageBox.Icon, parent = None):
        """ Configure the message box.

        Args:
            window_title (str): text of the title bar
            window_text (str): message shown in the box
            ico (QMessageBox.Icon): icon shown next to the message
            parent: optional parent widget
        """
        super().__init__(parent)
        # Window chrome first ...
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setWindowTitle(window_title)
        self.setLayoutDirection(get_qt_dir())
        self.setOption(QMessageBox.Option.DontUseNativeDialog, True)
        # ... then the content.
        self.setIcon(ico)
        self.setText(window_text)

    def showEvent(self, event) -> None:
        """ Center the box on screen each time it is shown. """
        super().showEvent(event)
        center_on_screen(self)
|
||
|
||
|
||
class NewWalletNameAndPassDialog(QDialog):
    """ Dialog asking for the name and the encryptPass of a new Wallet.

    Once the dialog has been accepted, the validated values are available in
    `walletName` and `walletEncyptPass`; both are None until then.
    """
    # At least one lower case, one upper case, one digit and one of @$!%*?&,
    # only those characters, 8 characters minimum.
    _PASSWORD_RE = re.compile(r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,}$")

    def __init__(self, parent = None):
        super().__init__(parent)
        # Results, filled in by _validate_and_accept().
        self.walletName = None
        self.walletEncyptPass = None

        self.setLayoutDirection(get_qt_dir())
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setWindowTitle(self.tr("Wallet Name and EncryptPass"))
        form = QFormLayout()
        self.setLayout(form)

        # Row 1: wallet name.
        self._walletName_Qle = QLineEdit()
        form.addRow(self.tr("Wallet Name:"), self._walletName_Qle)

        # Row 2: masked password field with a show/hide button next to it.
        self._walletEncyptPass_Qle = QLineEdit()
        self._walletEncyptPass_Qle.setEchoMode(QLineEdit.EchoMode.Password)
        self._toggle_qtbtn = QToolButton()
        self._toggle_qtbtn.setIcon(QIcon(":icoEyeShow"))
        self._toggle_qtbtn.setCheckable(True)
        self._toggle_qtbtn.setToolTip(self.tr("Show/Hide Password"))
        self._toggle_qtbtn.clicked.connect(self._toggle_password_visibility)
        password_row = QHBoxLayout()
        password_row.addWidget(self._walletEncyptPass_Qle)
        password_row.addWidget(self._toggle_qtbtn)
        form.addRow(self.tr("Wallet EncryptPass:"), password_row)

        # Row 3: validate / cancel buttons.
        accept_btn = QPushButton(self.tr("Validate"))
        reject_btn = QPushButton(self.tr("Cancel"))
        buttons = QDialogButtonBox()
        buttons.addButton(accept_btn, QDialogButtonBox.ButtonRole.AcceptRole)
        buttons.addButton(reject_btn, QDialogButtonBox.ButtonRole.RejectRole)
        accept_btn.clicked.connect(self._validate_and_accept)
        reject_btn.clicked.connect(self.reject)
        form.addWidget(buttons)

    def _toggle_password_visibility(self):
        """ Switch the password field between masked and clear text, updating the button icon. """
        field = self._walletEncyptPass_Qle
        if field.echoMode() != QLineEdit.EchoMode.Normal:
            field.setEchoMode(QLineEdit.EchoMode.Normal)
            self._toggle_qtbtn.setIcon(QIcon(":icoEyeHide"))
        else:
            field.setEchoMode(QLineEdit.EchoMode.Password)
            self._toggle_qtbtn.setIcon(QIcon(":icoEyeShow"))

    def _warn(self, message: str) -> None:
        """ Show `message` in a modal warning box with a single Ok button. """
        box = SimpleMsgBox(self.tr("Error"), message, QMessageBox.Icon.Warning, self)
        box.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
        box.exec()

    def _validate_and_accept(self):
        """ Check both fields; store them and accept the dialog when they are valid. """
        name = self._walletName_Qle.text().strip()
        if len(name) < 2:
            self._warn(self.tr("The Wallet Name must have a minimum of 2 characters."))
            return
        password = self._walletEncyptPass_Qle.text()
        if not self._PASSWORD_RE.match(password):
            self._warn(self.tr("The Wallet EncryptPass must have a minimum of 1 lower case character, 1 upper case character, 1 numeric character, 1 special character (@$!%*?&), and must have a minimum of 8 characters length."))
            return
        # Both fields are valid: publish the values and close with Accepted.
        self.walletName = name
        self.walletEncyptPass = password
        self.accept()

    def showEvent(self, event) -> None:
        """ Freeze the dialog at its shown size and center it on screen. """
        super().showEvent(event)
        self.setFixedSize(self.size())
        center_on_screen(self)
|
||
|
||
|
||
class NewWalletFileDialog(QFileDialog):
    """ File dialog used to choose where a new wallet file is saved. """

    def __init__(self, parent = None):
        super().__init__(parent)
        self.setOption(QFileDialog.Option.DontUseNativeDialog, True)
        self.setWindowTitle(self.tr("Save Content to New Wallet File"))
        self.setWindowIcon(QIcon(":imgIcon"))
        # Save mode on any (possibly not yet existing) file name; the dialog
        # itself does not ask for an overwrite confirmation.
        self.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
        self.setFileMode(QFileDialog.FileMode.AnyFile)
        self.setOption(QFileDialog.Option.DontConfirmOverwrite, True)
        # Every label goes through tr() so the whole dialog is translatable.
        self.setLabelText(QFileDialog.DialogLabel.LookIn, self.tr("Wallets location:"))
        self.setLabelText(QFileDialog.DialogLabel.FileName, self.tr("File name:"))
        self.setLabelText(QFileDialog.DialogLabel.FileType, self.tr("File type:"))
        self.setLabelText(QFileDialog.DialogLabel.Accept, self.tr("Validate"))
        self.setLabelText(QFileDialog.DialogLabel.Reject, self.tr("Cancel"))
        # Start in the wallets folder and restrict to the wallet file type.
        self.setDirectory(WALLETS_FOLDER)
        self.setDefaultSuffix(WALLET_FILE_EXT)
        self.setNameFilter(self.tr("{desc} (*.{ext})").format(desc = WALLET_FILE_DESC, ext = WALLET_FILE_EXT))

    def showEvent(self, event) -> None:
        """ Use the shown size as the minimum size and center the dialog on screen. """
        super().showEvent(event)
        size = self.size()
        self.setMinimumSize(size)
        center_on_screen(self)
|
||
|
||
|
||
class LoadWalletPassDialog(QDialog):
    """ Dialog asking for the EncryptPass needed to open a wallet file.

    Once the dialog has been accepted, the entered value is available in
    `walletEncyptPass`; it is None until then.
    """

    def __init__(self, file_name: str, parent = None):
        """ Build the form.

        Args:
            file_name (str): wallet file name, shown in the window title
            parent: optional parent widget
        """
        super().__init__(parent)
        # Result, filled in by _validate_and_accept().
        self.walletEncyptPass = None

        self.setLayoutDirection(get_qt_dir())
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setWindowTitle(self.tr("Wallet EncryptPass for file '{fileName}'").format(fileName = file_name))
        form = QFormLayout()
        self.setLayout(form)

        # Row 1: masked password field with a show/hide button next to it.
        self._walletEncyptPass_Qle = QLineEdit()
        self._walletEncyptPass_Qle.setEchoMode(QLineEdit.EchoMode.Password)
        self._toggle_qtbtn = QToolButton()
        self._toggle_qtbtn.setIcon(QIcon(":icoEyeShow"))
        self._toggle_qtbtn.setCheckable(True)
        self._toggle_qtbtn.setToolTip(self.tr("Show/Hide Password"))
        self._toggle_qtbtn.clicked.connect(self._toggle_password_visibility)
        password_row = QHBoxLayout()
        password_row.addWidget(self._walletEncyptPass_Qle)
        password_row.addWidget(self._toggle_qtbtn)
        form.addRow(self.tr("Wallet EncryptPass:"), password_row)

        # Row 2: validate / cancel buttons.
        accept_btn = QPushButton(self.tr("Validate"))
        reject_btn = QPushButton(self.tr("Cancel"))
        buttons = QDialogButtonBox()
        buttons.addButton(accept_btn, QDialogButtonBox.ButtonRole.AcceptRole)
        buttons.addButton(reject_btn, QDialogButtonBox.ButtonRole.RejectRole)
        accept_btn.clicked.connect(self._validate_and_accept)
        reject_btn.clicked.connect(self.reject)
        form.addWidget(buttons)

    def _toggle_password_visibility(self):
        """ Switch the password field between masked and clear text, updating the button icon. """
        field = self._walletEncyptPass_Qle
        if field.echoMode() != QLineEdit.EchoMode.Normal:
            field.setEchoMode(QLineEdit.EchoMode.Normal)
            self._toggle_qtbtn.setIcon(QIcon(":icoEyeHide"))
        else:
            field.setEchoMode(QLineEdit.EchoMode.Password)
            self._toggle_qtbtn.setIcon(QIcon(":icoEyeShow"))

    def _validate_and_accept(self):
        """ Refuse a blank password; otherwise store it and accept the dialog. """
        password = self._walletEncyptPass_Qle.text()
        if not password.strip():
            warning = SimpleMsgBox(self.tr("Error"), self.tr("You must enter a valid Password."), QMessageBox.Icon.Warning, self)
            warning.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            warning.exec()
            return
        # The password is stored exactly as typed (not stripped).
        self.walletEncyptPass = password
        self.accept()

    def showEvent(self, event) -> None:
        """ Freeze the dialog at its shown size and center it on screen. """
        super().showEvent(event)
        self.setFixedSize(self.size())
        center_on_screen(self)
|
||
|
||
|
||
class LoadWalletFileDialog(QFileDialog):
    """ QFileDialog used to pick an existing wallet file to load. """

    def __init__(self, parent = None):
        super().__init__(parent)
        self.setOption(QFileDialog.Option.DontUseNativeDialog, True)
        self.setWindowTitle(self.tr("Load Wallet File"))
        self.setWindowIcon(QIcon(":imgIcon"))
        # Open mode, restricted to a single existing file.
        self.setAcceptMode(QFileDialog.AcceptMode.AcceptOpen)
        self.setFileMode(QFileDialog.FileMode.ExistingFile)
        self.setDirectory(WALLETS_FOLDER)
        self.setDefaultSuffix(WALLET_FILE_EXT)
        # Every label goes through tr() so the whole dialog is translatable.
        self.setLabelText(QFileDialog.DialogLabel.LookIn, self.tr("Wallets location:"))
        self.setLabelText(QFileDialog.DialogLabel.FileName, self.tr("File name:"))
        self.setLabelText(QFileDialog.DialogLabel.FileType, self.tr("File type:"))
        self.setLabelText(QFileDialog.DialogLabel.Accept, self.tr("Validate"))
        self.setLabelText(QFileDialog.DialogLabel.Reject, self.tr("Cancel"))
        self.setNameFilter(self.tr("{desc} (*.{ext})").format(desc = WALLET_FILE_DESC, ext = WALLET_FILE_EXT))

    def showEvent(self, event) -> None:
        """ Use the shown size as the minimum size and center the dialog on screen. """
        super().showEvent(event)
        size = self.size()
        self.setMinimumSize(size)
        center_on_screen(self)
|
||
|
||
|
||
class SplashScreen(QSplashScreen):
    """ Frameless splash screen that fades in, stays visible, then fades out and closes. """

    def __init__(self, fade_in_duration = 1500, display_duration = 2000, fade_out_duration = 1500):
        """ Build the splash screen and start the fade-in.

        Args:
            fade_in_duration: duration of the fade-in animation, in milliseconds
            display_duration: delay, counted from construction, before the fade-out starts, in milliseconds
            fade_out_duration: duration of the fade-out animation, in milliseconds
        """
        super().__init__(QPixmap(":imgSplash"))
        self.setWindowFlag(Qt.WindowType.FramelessWindowHint)  # Remove window border and buttons
        self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
        # Apply opacity effect to manage transitions
        self._opacityEffect = QGraphicsOpacityEffect()
        self.setGraphicsEffect(self._opacityEffect)
        # Animation Fade-in
        self._fadeIn = QPropertyAnimation(self._opacityEffect, b"opacity")
        self._fadeIn.setDuration(fade_in_duration)
        self._fadeIn.setStartValue(0)
        self._fadeIn.setEndValue(1)
        self._fadeIn.setEasingCurve(QEasingCurve.Type.InOutQuad)
        # Animation Fade-out
        self.fadeOut = QPropertyAnimation(self._opacityEffect, b"opacity")
        self.fadeOut.setDuration(fade_out_duration)
        self.fadeOut.setStartValue(1)
        self.fadeOut.setEndValue(0)
        self.fadeOut.setEasingCurve(QEasingCurve.Type.InOutQuad)
        # Scheduling the fade-out animation after duration
        QTimer.singleShot(display_duration, self.start_fade_out)
        # Start fade-in animation
        self._fadeIn.start()

    def start_fade_out(self):
        """ Start the fade-out animation and close the splash screen at the end. """
        # Connect before starting: a zero-duration animation finishes inside
        # start(), and a later connection would miss the signal.
        self.fadeOut.finished.connect(self.close)
        self.fadeOut.start()

    def showEvent(self, event) -> None:
        """ Center the splash screen each time it is shown. """
        super().showEvent(event)
        center_on_screen(self)
|
||
|
||
|
||
class AboutWindow(QWidget):
    """ Frameless, application-modal "About" window: background image with centered text.

    The window closes on its close button, on Escape / Space / Enter / Return,
    on Ctrl+F4, and as soon as it is deactivated.
    """

    def __init__(self, bg_txt: str, parent = None):
        """ Build the window.

        Args:
            bg_txt (str): text displayed centered over the background image
            parent: optional parent widget
        """
        super().__init__(parent)
        self.setWindowTitle(self.tr("About {app}").format(app = APP_NAME_VERSION))
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setFixedSize(900, 600)
        self.setWindowModality(Qt.WindowModality.ApplicationModal)
        self.setWindowFlags(Qt.WindowType.Dialog | Qt.WindowType.FramelessWindowHint | Qt.WindowType.WindowStaysOnTopHint)
        self._bg_qpix = QPixmap(":imgAbout")
        # Centered text, created before the close button so that the button
        # is stacked above it and stays clickable. The label must be a child
        # of this window, otherwise it is never displayed.
        self._about_Qlbl = QLabel(self)
        self._about_Qlbl.setText(bg_txt)
        self._about_Qlbl.setAlignment(Qt.AlignmentFlag.AlignCenter)  # Center the text
        self._about_Qlbl.setStyleSheet("font-size: 20px; color: white;")  # Text style
        self._about_Qlbl.setWordWrap(True)  # Enable word wrapping
        self._about_Qlbl.setGeometry(0, 0, self.width(), self.height())  # Cover the whole window
        # Window close button, 20 px away from the top-right corner
        self._exit_Qpbtn = QPushButton(self)
        self._exit_Qpbtn.setIcon(QIcon(":icoQuit"))
        self._exit_Qpbtn.setFixedSize(24, 24)
        self._exit_Qpbtn.clicked.connect(self.close)
        self._exit_Qpbtn.move(self.width() - self._exit_Qpbtn.width() - 20, 20)

    def paintEvent(self, event):
        """ Paint the scaled background image, then a darkening vertical gradient over it. """
        painter = QPainter(self)
        painter.setRenderHint(QPainter.RenderHint.SmoothPixmapTransform)
        # Resize the image to fit the window size
        scaled_image = self._bg_qpix.scaled(self.size(), Qt.AspectRatioMode.IgnoreAspectRatio, Qt.TransformationMode.SmoothTransformation)
        painter.drawPixmap(0, 0, scaled_image)
        # Opacity is lowered after the image is drawn, so it only affects the gradient
        painter.setOpacity(0.7)
        # Add a gradient effect
        gradient = QLinearGradient(0, 0, 0, self.height())
        gradient.setColorAt(0, QColor(0, 0, 0, 0))  # Transparent at the top
        gradient.setColorAt(1, QColor(0, 0, 0, 128))  # Dark at the bottom
        painter.fillRect(self.rect(), gradient)

    def keyPressEvent(self, event: QKeyEvent):
        """ Manage keyboard shortcuts to close the window. """
        if event.key() in {Qt.Key.Key_Escape, Qt.Key.Key_Space, Qt.Key.Key_Enter, Qt.Key.Key_Return}:
            print("Fenêtre fermée avec une touche simple.")
            self.close()
        elif event.key() == Qt.Key.Key_F4 and event.modifiers() & Qt.KeyboardModifier.ControlModifier:
            print("Fenêtre fermée avec Ctrl+F4.")
            self.close()
        else:
            # If another key is pressed, leave the default behavior
            super().keyPressEvent(event)

    def event(self, event):
        """ Monitor window events: close the window as soon as it is deactivated. """
        if event.type() == QEvent.Type.WindowDeactivate:
            self.close()
        return super().event(event)
|
||
|
||
|
||
class SolanaAdressCheckerWindow(QWidget):
    """ Dialog used to query information about a Solana address.

    The user types an address, then clicks one of the query buttons; the
    outcome of the HTTP request sent through the connection pool is appended
    to a read-only text area.
    """

    def __init__(self, pool: ConnectionPool, parent = None):
        """ Build the dialog.

        :param pool: connection pool used to send the HTTP requests.
        :param parent: optional parent; when it is a ``MainWindow`` its
            ``solana_addr_check`` flag is cleared when this window closes.
        """
        super().__init__(parent)
        self._parent = parent
        self._pool = pool
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setMinimumSize(1024, 600)
        self.setFocus()
        self.setWindowFlags(Qt.WindowType.Dialog)
        # Passing ``self`` installs the layout on this widget.
        layout = QVBoxLayout(self)

        # Row 1: address label + input
        self._solAddress_Qlbl = QLabel()
        self._solAddress_Qled = QLineEdit()
        h_layout1 = QHBoxLayout()
        h_layout1.addWidget(self._solAddress_Qlbl)
        h_layout1.addWidget(self._solAddress_Qled)
        layout.addLayout(h_layout1)

        # Row 2: one (initially disabled) button per query
        self._get_account_info_Qpbtn = self._new_query_button(self._get_account_info)
        self._get_balance_Qpbtn_Qpbtn = self._new_query_button(self._get_balance)
        self._get_program_accounts_Qpbtn = self._new_query_button(self._get_program_accounts)
        self._get_signatures_for_address_Qpbtn = self._new_query_button(self._get_signatures_for_address)
        self._get_token_account_balance_Qpbtn = self._new_query_button(self._get_token_account_balance)
        self._get_token_accounts_by_delegate_Qpbtn = self._new_query_button(self._get_token_accounts_by_delegate)
        self._get_token_accounts_by_owner_Qpbtn = self._new_query_button(self._get_token_accounts_by_owner)
        self._get_token_supply_Qpbtn = self._new_query_button(self._get_token_supply)
        self._query_buttons = (
            self._get_account_info_Qpbtn,
            self._get_balance_Qpbtn_Qpbtn,
            self._get_program_accounts_Qpbtn,
            self._get_signatures_for_address_Qpbtn,
            self._get_token_account_balance_Qpbtn,
            self._get_token_accounts_by_delegate_Qpbtn,
            self._get_token_accounts_by_owner_Qpbtn,
            self._get_token_supply_Qpbtn,
        )
        h_layout2 = QHBoxLayout()
        for button in self._query_buttons:
            h_layout2.addWidget(button)
        layout.addLayout(h_layout2)

        # Row 3: read-only result area
        self.result_text_edit = QTextEdit()
        self.result_text_edit.setReadOnly(True)
        layout.addWidget(self.result_text_edit)

        self.update_display()
        # Connects text modification to button activation/deactivation
        self._solAddress_Qled.textChanged.connect(self._toggle_buttons_state)

    @staticmethod
    def _new_query_button(slot) -> QPushButton:
        """ Create a disabled push button whose click triggers ``slot``. """
        button = QPushButton()
        button.setEnabled(False)
        button.clicked.connect(slot)
        return button

    @staticmethod
    def _format_response(response, resp_type, decode_body: bool) -> str:
        """ Turn the pool's answer into the line appended to the result area.

        :param response: object returned by ``send_http_post``.
        :param resp_type: response class expected for the query.
        :param decode_body: when True the body is decoded with
            ``resp_type.from_json`` and its ``value`` is shown; when False the
            body is checked as-is and shown through ``to_json()``.
        """
        if isinstance(response, ResponseOk):
            resp = resp_type.from_json(response.body) if decode_body else response.body
            if isinstance(resp, resp_type):
                shown = resp.value if decode_body else resp.to_json()
                return f"response : {shown}\n"
            return f"response err: {type(resp)} - {resp}\n"
        if isinstance(response, ResponseError):
            return f"response err: {response.code} - {response.body}\n"
        if isinstance(response, NetworkReplyError):
            return f"response net err: {response.code.value} - {response.body}\n"
        return f"response unknown: {type(response)} - {response}\n"

    def _run_query(self, make_request, resp_type, decode_body: bool = False):
        """ Send one query for the typed address and append its outcome.

        :param make_request: callable receiving the ``Pubkey`` and returning
            the ``(req_id, rpc_method, req)`` triple of an ``HttpReqGen`` builder.
        :param resp_type: response class expected for the query.
        :param decode_body: see ``_format_response``.
        """
        adr = self._solAddress_Qled.text().strip()
        try:
            pk_adr = Pubkey.from_string(adr)
        except ValueError as err:
            # The buttons are enabled for any non-empty text, so an invalid
            # address can reach this point; report it instead of letting the
            # exception escape from the Qt slot.
            self.result_text_edit.append(f"invalid address: {adr} - {err}\n")
            return
        _req_id, rpc_method, req = make_request(pk_adr)
        response = self._pool.send_http_post(rpc_method, req.to_json())
        self.result_text_edit.append(self._format_response(response, resp_type, decode_body))

    def _get_account_info(self):
        """ Query the account info of the typed address. """
        self._run_query(HttpReqGen.get_account_info, GetAccountInfoResp, decode_body = True)

    def _get_balance(self):
        """ Query the balance of the typed address. """
        self._run_query(HttpReqGen.get_balance, GetBalanceResp, decode_body = True)

    # NOTE(review): the four handlers below still build their request with
    # HttpReqGen.get_balance, exactly as before. That looks like a copy/paste
    # left-over (the expected response types differ) — confirm which
    # HttpReqGen builder each one should use. They also test the raw
    # ``response.body`` with isinstance instead of decoding it with
    # ``from_json`` as the two handlers above do — presumably unfinished.

    def _get_program_accounts(self):
        """ Query handler bound to the "Program Accounts" button. """
        self._run_query(HttpReqGen.get_balance, GetProgramAccountsResp)

    def _get_signatures_for_address(self):
        """ Query handler bound to the "Signatures" button. """
        self._run_query(HttpReqGen.get_balance, GetSignaturesForAddressResp)

    def _get_token_account_balance(self):
        """ Query handler bound to the "Token Account Balance" button. """
        self._run_query(HttpReqGen.get_balance, GetTokenAccountBalanceResp)

    def _get_token_accounts_by_delegate(self):
        """ Query handler bound to the "Token Account by Delegates" button. """
        self._run_query(HttpReqGen.get_balance, GetTokenAccountsByDelegateResp)

    def _get_token_accounts_by_owner(self):
        """ Query the token accounts owned by the typed address (finalized commitment). """
        self._run_query(
            lambda pk_adr: HttpReqGen.get_token_accounts_by_owner(
                pk_adr, TokenAccountOpts(programId = PK_TOKEN_PROGRAM_ID), CommitmentLevel.Finalized
            ),
            GetTokenAccountsByOwnerResp,
        )

    def _get_token_supply(self):
        """ Query the token supply for the typed address (finalized commitment). """
        self._run_query(
            lambda pk_adr: HttpReqGen.get_token_supply(pk_adr, CommitmentLevel.Finalized),
            GetTokenSupplyResp,
        )

    def _toggle_buttons_state(self):
        """ Enable or disable the buttons and colour the input from its content.

        Buttons are enabled whenever the input is not empty. The background is
        white when empty, red when the text is not a parsable public key,
        light green when the key is on the curve, and otherwise light coral or
        green depending on the length checks below.
        """
        adr = self._solAddress_Qled.text().strip()
        has_text = bool(adr)
        for button in self._query_buttons:
            button.setEnabled(has_text)
        if not has_text:
            self._solAddress_Qled.setStyleSheet("background-color: white;")
            return
        try:
            pk_adr = Pubkey.from_string(adr)
            if pk_adr.is_on_curve():
                colour = "lightgreen"
            elif len(adr) != 44 or len(base58.b58decode(adr)) != 32:
                # The previous value "lighred" is not a colour name, so the
                # style was silently ignored.
                colour = "lightcoral"
            else:
                colour = "green"
        except ValueError:
            colour = "red"
        self._solAddress_Qled.setStyleSheet(f"background-color: {colour};")

    def update_display(self):
        """ update the display of the widget (texts, direction and alignment) """
        self.setWindowTitle(self.tr("Solana Address Checker"))
        self.setLayoutDirection(get_qt_dir())
        self._solAddress_Qlbl.setText(self.tr("Address:"))
        self._solAddress_Qlbl.setAlignment(get_qt_align_vchr(is_rtl))
        self._get_account_info_Qpbtn.setText(self.tr("Account Info"))
        self._get_balance_Qpbtn_Qpbtn.setText(self.tr("Balance"))
        self._get_program_accounts_Qpbtn.setText(self.tr("Program Accounts"))
        self._get_signatures_for_address_Qpbtn.setText(self.tr("Signatures"))
        self._get_token_account_balance_Qpbtn.setText(self.tr("Token Account Balance"))
        self._get_token_accounts_by_delegate_Qpbtn.setText(self.tr("Token Account by Delegates"))
        self._get_token_accounts_by_owner_Qpbtn.setText(self.tr("Token Account by Owner"))
        self._get_token_supply_Qpbtn.setText(self.tr("Token Supply"))

    def showEvent(self, event) -> None:
        """ fired on show event """
        super().showEvent(event)
        center_on_screen(self)

    def closeEvent(self, event):
        """ Clear the parent's ``solana_addr_check`` flag (MainWindow parent only), then close. """
        if isinstance(self._parent, MainWindow):
            self._parent.solana_addr_check = False
        return super().closeEvent(event)
|
||
|
||
|
||
class HelpWindow(QWidget):
    """ Application-modal, fixed-size window that displays a PDF help document. """

    def __init__(self, pdf_doc: str, parent = None):
        """ Build the window and load the document.

        :param pdf_doc: location of the PDF handed to ``QPdfDocument.load``.
        :param parent: optional parent widget.
        """
        super().__init__(parent)
        self._setup_window()

        # PDF document loader and the view rendering it
        self._pdfDoc = QPdfDocument()
        self._pdfView = QPdfView()
        self._pdfView.setDocument(self._pdfDoc)

        # The view is the only content: no spacing, no margins
        self._main_Qvlo = QVBoxLayout()
        self.setLayout(self._main_Qvlo)
        self._main_Qvlo.setSpacing(0)
        self._main_Qvlo.setContentsMargins(QMargins(0, 0, 0, 0))
        self._main_Qvlo.addWidget(self._pdfView)

        self._pdfDoc.load(pdf_doc)
        document_ready = self._pdfDoc.status() == QPdfDocument.Status.Ready
        if not document_ready:
            self.close()
            return
        view = self._pdfView
        view.setPageMode(QPdfView.PageMode.MultiPage)
        view.setZoomMode(QPdfView.ZoomMode.FitToWidth)
        view.setPageSpacing(0)
        view.setDocumentMargins(QMargins(0, 0, 0, 0))

    def _setup_window(self):
        """ Apply title, icon, fixed size, modality and window flags. """
        self.setWindowTitle(self.tr("Help - {app}").format(app = APP_NAME_VERSION))
        self.setWindowIcon(QIcon(":imgIcon"))
        self.setFixedSize(900, 600)
        self.setWindowModality(Qt.WindowModality.ApplicationModal)
        self.setWindowFlags(Qt.WindowType.Dialog | Qt.WindowType.MSWindowsFixedSizeDialogHint)

    def showEvent(self, event) -> None:
        """ Qt show handler: default handling, then ``center_on_screen``. """
        super().showEvent(event)
        center_on_screen(self)
|
||
|
||
|
||
class WalletTokensTableModel(QAbstractTableModel):
    """ Table Model for the tokens in a wallet.

    Each row is a dict with the keys ``token_id``, ``buy_price`` and
    ``buy_time``; the fourth column ("Action") displays an empty string.
    """

    # Dict key displayed in each of the first three columns.
    _COLUMN_KEYS = ("token_id", "buy_price", "buy_time")

    def __init__(self, parent = None):
        super().__init__(parent)
        self._headers = [self.tr("Token Id"), self.tr("Buy Price"), self.tr("Buy Time"), self.tr("Action")]
        self._dataList = []

    def rowCount(self, parent = QModelIndex()):
        """ return the number of rows in the table """
        # A table model is flat: items have no children.
        if parent.isValid():
            return 0
        return len(self._dataList)

    def columnCount(self, parent = QModelIndex()):
        """ return the number of columns in the table """
        if parent.isValid():
            return 0
        return len(self._headers)

    def headerData(self, section, orientation, role = Qt.ItemDataRole.DisplayRole):
        """ Return the column header texts (horizontal display role only). """
        if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
            return self._headers[section]
        return None

    def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> Any:
        """ return the data to display in the QTableView """
        if not index.isValid():
            return None
        column = index.column()
        if role == Qt.ItemDataRole.DisplayRole:
            if 0 <= column < len(self._COLUMN_KEYS):
                return self._dataList[index.row()][self._COLUMN_KEYS[column]]
            if column == 3:  # Action: nothing is rendered as text
                return ""
        elif role == Qt.ItemDataRole.TextAlignmentRole:
            if column == 0:  # token id
                return get_qt_align_vchr()
            if column == 1:  # buy price
                return get_qt_align_center()
            if column == 2:  # buy time
                return get_qt_align_vchl(not is_rtl)
        elif role == Qt.ItemDataRole.DecorationRole and column == 0:
            return QIcon(":icoWallet")
        return None

    def set_headers(self, new_headers: list):
        """ used to update the header column content (on lang change) """
        self._headers = new_headers
        if self._headers:
            # Derive the last section from the list instead of hard-coding it.
            self.headerDataChanged.emit(Qt.Orientation.Horizontal, 0, len(self._headers) - 1)
        self.layoutChanged.emit()

    def set_data(self, data: list):
        """ Replace the rows of the table model.

        The whole content (and possibly the row count) changes, so the swap is
        wrapped in a model reset to let attached views drop their stale state.
        """
        self.beginResetModel()
        self._dataList = data
        self.endResetModel()
|
||
|
||
|
||
class WalletWidget(QWidget):
    """ Widget showing a wallet: name, public key, balances and a tokens table. """

    def __init__(self, parent = None):
        super().__init__(parent)
        self.setWindowIcon(QIcon(":icoWallet"))
        # Nothing is loaded until set_wallet() is called
        self.wallet = None
        self.fileKey = None
        self.filePath = None
        self._main_Qglo = QGridLayout()
        self.setLayout(self._main_Qglo)

        # Line 1: wallet name + save button
        self._walletName_Qlbl = QLabel()
        self.walletName_Qled = QLineEdit()
        self._saveWallet_Qpbtn = QPushButton()
        self._init_line1()

        # Line 2: public key + copy button
        self._walletPubKey_Qlbl = QLabel()
        self.walletPubKey_Qled = QLineEdit()
        self._copyToClipboard_Qpbtn = QPushButton()
        self._init_line2()

        # Line 3: real and demo balances
        self._realBalance_Qlbl = QLabel()
        self.realBalance_Qled = QLineEdit()
        self._demoBalance_Qlbl = QLabel()
        self.demoBalance_Qled = QLineEdit()
        self._init_line3()

        # Line 4: tokens caption
        self._tokens_Qlbl = QLabel()
        self._init_line4()

        # Line 5: tokens table and its model
        self._tokens_qtbvmb = QTableView()
        self._tokens_wtQtm = WalletTokensTableModel()
        self._init_tokens_qtbvmb()

        self.update_display()
        self.populate_table()

    def set_wallet(self, wallet: Wallet, file_pass: str, file_path: str):
        """ Switch to another wallet.

        The wallet currently held (if any) is saved first, then the new
        wallet, file key and file path are stored and the fields refreshed.
        """
        self.save_wallet()
        self.fileKey = file_pass
        self.filePath = file_path
        self.wallet = wallet
        self.update_wallet_display()

    def update_wallet_display(self):
        """ Refresh the wallet fields; they are blanked/zeroed without a wallet. """
        if self.wallet is None:
            self.walletPubKey_Qled.setText("")
            self.walletName_Qled.setText("")
            self.realBalance_Qled.setText(f"{0:.9f}")
            self.demoBalance_Qled.setText(f"{0:.9f}")
            return
        self.walletPubKey_Qled.setText(self.wallet.walletPubKey)
        self.walletName_Qled.setText(self.wallet.walletName)
        self.realBalance_Qled.setText(f"{self.wallet.mainSolBalance:.9f}")
        self.demoBalance_Qled.setText(f"{self.wallet.devSolBalance:.9f}")

    def save_wallet(self):
        """ Write the encrypted wallet JSON to the wallet file.

        Does nothing unless a wallet, a file key and a file path are all set.
        """
        if self.wallet is None or self.fileKey is None or self.filePath is None:
            return
        encrypted_data = encrypt_string(self.fileKey, APP_SALT.encode("utf-8"), self.wallet.to_json())
        payload = encrypted_data.encode("utf-8")
        with LockedFile(self.filePath, "w+b") as file:
            file.seek(0)
            file.truncate()
            file.write(payload)

    def _init_line1(self):
        """ Configure and place the wallet-name row (grid row 0). """
        self.walletName_Qled.setAlignment(get_qt_align_center())
        save_button = self._saveWallet_Qpbtn
        save_button.setIcon(QIcon(":icoSave"))
        save_button.setMaximumWidth(50)
        save_button.clicked.connect(self.save_wallet)
        grid = self._main_Qglo
        grid.addWidget(self._walletName_Qlbl, 0, 0, 1, 2)
        grid.addWidget(self.walletName_Qled, 0, 2, 1, 9)
        grid.addWidget(save_button, 0, 11, 1, 1)

    def _init_line2(self):
        """ Configure and place the public-key row (grid row 1). """
        key_edit = self.walletPubKey_Qled
        key_edit.setAlignment(get_qt_align_center())
        key_edit.setText("XXXXXXXXXXXX")
        key_edit.setReadOnly(True)
        copy_button = self._copyToClipboard_Qpbtn
        copy_button.setIcon(QIcon(":icoCopy"))
        copy_button.setMaximumWidth(50)
        copy_button.clicked.connect(self.copy_wallet_priv_key_to_clipboard)
        grid = self._main_Qglo
        grid.addWidget(self._walletPubKey_Qlbl, 1, 0, 1, 2)
        grid.addWidget(key_edit, 1, 2, 1, 9)
        grid.addWidget(copy_button, 1, 11, 1, 1)

    def _init_line3(self):
        """ Configure and place the two read-only balance fields (grid row 2). """
        for balance_edit in (self.realBalance_Qled, self.demoBalance_Qled):
            balance_edit.setAlignment(get_qt_align_vchr())
            balance_edit.setText("10.000000000")
            balance_edit.setReadOnly(True)
        grid = self._main_Qglo
        grid.addWidget(self._realBalance_Qlbl, 2, 0, 1, 2)
        grid.addWidget(self.realBalance_Qled, 2, 2, 1, 3)
        grid.addWidget(self._demoBalance_Qlbl, 2, 7, 1, 2)
        grid.addWidget(self.demoBalance_Qled, 2, 9, 1, 3)

    def _init_line4(self):
        """ Place the centered tokens caption across grid row 3. """
        self._tokens_Qlbl.setAlignment(get_qt_align_center())
        self._main_Qglo.addWidget(self._tokens_Qlbl, 3, 0, 1, 12)

    def _init_tokens_qtbvmb(self):
        """ Configure the tokens table view and place it on grid row 4. """
        table = self._tokens_qtbvmb
        table.setModel(self._tokens_wtQtm)
        # whole rows are selected
        table.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
        # no in-place editing
        table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
        # alternate row colours over a filled background
        table.setAlternatingRowColors(True)
        table.setAutoFillBackground(True)
        # sortable through the header
        table.setSortingEnabled(True)
        # per-column resize policy, in column order
        column_modes = (
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.Interactive,
            QHeaderView.ResizeMode.Stretch,
        )
        for section, mode in enumerate(column_modes):
            table.horizontalHeader().setSectionResizeMode(section, mode)
        table.setTextElideMode(Qt.TextElideMode.ElideNone)
        table.setWordWrap(False)
        self._main_Qglo.addWidget(table, 4, 0, 1, 12)

    def update_display(self):
        """ Re-apply layout direction, translated captions and table headers. """
        self.setLayoutDirection(get_qt_dir())
        captions = (
            (self._walletName_Qlbl, self.tr("Wallet Name:")),
            (self._walletPubKey_Qlbl, self.tr("Wallet PubKey:")),
            (self._realBalance_Qlbl, self.tr("Balance Real:")),
            (self._demoBalance_Qlbl, self.tr("Balance Demo:")),
        )
        for label, caption in captions:
            label.setText(caption)
            label.setAlignment(get_qt_align_vchr(is_rtl))
        self._tokens_Qlbl.setText(self.tr("My Tokens:"))
        headers = [self.tr("Token Id"), self.tr("Buy Price"), self.tr("Buy Time"), self.tr("Action")]
        self._tokens_wtQtm.set_headers(headers)

    def populate_table(self):
        """ Fill the table with ten randomly generated placeholder tokens.

        TODO: must be replaced by the real wallet tokens (model change, etc.).
        """
        rows = []
        for _ in range(10):
            rows.append({
                "token_id": ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k = 3)),
                "buy_price": f"{random.uniform(0.000000001, 1.000000000):.9f}",
                "buy_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
                "action": self.tr("Sell"),  # placeholder for button-like behavior
            })
        self._tokens_wtQtm.set_data(rows)

    def copy_wallet_priv_key_to_clipboard(self):
        """ Copy the content of the public-key field to the clipboard and log it. """
        QApplication.clipboard().setText(self.walletPubKey_Qled.text())
        logger.info(self.tr("walletId copied to clipboard"))
|
||
|
||
|
||
class LogHandler:
    """ Callable log sink that parses each log line and forwards it to an emitter. """

    def __init__(self, emitter: DictSignal):
        """ Keep the emitter whose ``signal`` method receives the parsed logs. """
        self._logEmitter = emitter

    def __call__(self, message: str):
        """ Called for every log message; lines that cannot be parsed are dropped. """
        record = self._parse_log_message(message)
        if not record:
            return
        self._logEmitter.signal(record)

    @staticmethod
    def _parse_log_message(message: str):
        """ Split a "{time} -|- {level} -|- {message}" line into a dict.

        Returns a dict with the keys ``timestamp``, ``level`` and ``message``,
        or None when the line does not contain the two separators. Only the
        first two separators split the line; later ones stay in the message.
        """
        try:
            timestamp, level, text = message.split(" -|- ", 2)
        except ValueError:
            # fewer than three parts: not in the expected format
            return None
        return {"timestamp": timestamp, "level": level, "message": text}
|
||
|
||
|
||
class LogsTableModel(QAbstractTableModel):
|
||
""" Model to manage the display of logs in a QTableView. """
|
||
def __init__(self, display_lines: int = 100):
|
||
super().__init__()
|
||
self._dataList = [] # logs storage
|
||
self._maxLines = display_lines # max lines display
|
||
self._headers = [self.tr("Timestamp"), self.tr("Level"), self.tr("Message")]
|
||
self._sortColumn = 0
|
||
self._sortOrder = Qt.SortOrder.AscendingOrder
|
||
self.sort(self._sortColumn, self._sortOrder)
|
||
|
||
def rowCount(self, parent = QModelIndex()):
|
||
""" return the number of rows in the table """
|
||
return len(self._dataList)
|
||
|
||
def columnCount(self, parent = QModelIndex()):
|
||
""" return the number of columns in the table """
|
||
return len(self._headers)
|
||
|
||
def headerData(self, section: int, orientation: Qt.Orientation, role: Qt.ItemDataRole = Qt.ItemDataRole.DisplayRole) -> Any:
|
||
""" Définit les en-têtes des colonnes. """
|
||
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
|
||
return self._headers[section]
|
||
|
||
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> Any:
|
||
""" return the data to display in the QTableView """
|
||
if not index.isValid():
|
||
return None
|
||
elif role == Qt.ItemDataRole.DisplayRole:
|
||
log = self._dataList[index.row()]
|
||
if index.column() == 0: # Timestamp
|
||
return log["timestamp"]
|
||
elif index.column() == 1: # Level
|
||
return log["level"]
|
||
elif index.column() == 2: # Message
|
||
return log["message"]
|
||
elif role == Qt.ItemDataRole.BackgroundRole and index.column() == 1: # Level
|
||
log = self._dataList[index.row()]
|
||
if log["level"] == 'DEBUG':
|
||
return QColor(Qt.GlobalColor.cyan)
|
||
elif log["level"] == 'INFO':
|
||
return QColor(Qt.GlobalColor.white)
|
||
elif log["level"] == 'SUCCESS':
|
||
return QColor(Qt.GlobalColor.green)
|
||
elif log["level"] == 'WARNING':
|
||
return QColor(Qt.GlobalColor.yellow)
|
||
elif log["level"] == 'ERROR':
|
||
return QColor(Qt.GlobalColor.red)
|
||
elif log["level"] == 'CRITICAL':
|
||
return QColor(Qt.GlobalColor.darkRed)
|
||
elif role == Qt.ItemDataRole.ForegroundRole and index.column() == 1: # Level
|
||
log = self._dataList[index.row()]
|
||
if log["level"] == 'DEBUG':
|
||
return QColor(Qt.GlobalColor.darkGray)
|
||
elif log["level"] == 'INFO':
|
||
return QColor(Qt.GlobalColor.black)
|
||
elif log["level"] == 'SUCCESS':
|
||
return QColor(Qt.GlobalColor.darkGray)
|
||
elif log["level"] == 'WARNING':
|
||
return QColor(Qt.GlobalColor.lightGray)
|
||
elif log["level"] == 'ERROR':
|
||
return QColor(Qt.GlobalColor.darkGray)
|
||
elif log["level"] == 'CRITICAL':
|
||
return QColor(Qt.GlobalColor.lightGray)
|
||
elif role == Qt.ItemDataRole.FontRole:
|
||
if index.column() == 1: # Level
|
||
# log = self._dataList[index.row()]
|
||
font = QFont("Helvetica")
|
||
font.setBold(True)
|
||
return font
|
||
return None
|
||
elif role == Qt.ItemDataRole.TextAlignmentRole:
|
||
global is_rtl
|
||
if index.column() == 0:
|
||
return get_qt_align_vchr()
|
||
elif index.column() == 1:
|
||
return get_qt_align_center()
|
||
elif index.column() == 2:
|
||
return get_qt_align_vchl()
|
||
return None
|
||
|
||
def sort(self, column: int, order: Qt.SortOrder = Qt.SortOrder.DescendingOrder):
|
||
""" sort the datas """
|
||
self._sortColumn = column
|
||
self._sortOrder = order
|
||
# Sorting logic
|
||
if column == 0: # Sorting by "timestamp"
|
||
self._dataList.sort(key = lambda log: log["timestamp"], reverse = (order == Qt.SortOrder.AscendingOrder))
|
||
elif column == 1: # Sorting by "level"
|
||
self._dataList.sort(key = lambda log: log["level"], reverse = (order == Qt.SortOrder.AscendingOrder))
|
||
elif column == 2: # Sorting by "message"
|
||
self._dataList.sort(key = lambda log: log["message"], reverse = (order == Qt.SortOrder.AscendingOrder))
|
||
# Notify view that data has changed after sorting
|
||
self.layoutChanged.emit()
|
||
|
||
def set_headers(self, new_headers: list):
|
||
""" used to update the header column content (on lang change) """
|
||
self._headers = new_headers
|
||
self.headerDataChanged.emit(Qt.Orientation.Horizontal, 0, len(self._headers) - 1)
|
||
self.layoutChanged.emit()
|
||
|
||
def add_data(self, log: dict):
    """
    Add a log entry to the model while respecting the maximum row limit.

    When the limit is exceeded, the entries with the smallest "timestamp" are
    discarded. Dropping the first rows of the list is not enough: the list is
    kept in display order, so its head is only the oldest entry for one
    particular sort configuration.

    :param log: entry to add; it must provide the "timestamp", "level" and
        "message" keys used for sorting.
    """
    self._dataList.append(log)
    overflow = len(self._dataList) - self._maxLines
    if overflow > 0:
        oldest = sorted(self._dataList, key = lambda entry: entry["timestamp"])[:overflow]
        stale_ids = {id(entry) for entry in oldest}
        # Slice assignment keeps the same list object alive for any other holder.
        self._dataList[:] = [entry for entry in self._dataList if id(entry) not in stale_ids]
    # sort() re-applies the current ordering and notifies the views itself.
    self.sort(self._sortColumn, self._sortOrder)
|
||
|
||
def set_data(self, data: list):
    """
    Replace the whole content of the table model.

    :param data: list of log entries; it is stored as is (not copied) and
        then sorted with the current column and order.
    """
    self._dataList = data
    column, order = self._sortColumn, self._sortOrder
    self.sort(column, order)
    self.layoutChanged.emit()
|
||
|
||
|
||
class LogViewerWidget(QWidget):
    """ Widget wrapping a read-only QTableView that displays the application logs. """

    def __init__(self, max_display_lines: int = 100, parent = None):
        """
        Build the table view, its model and register a loguru sink feeding it.

        :param max_display_lines: value handed to LogsTableModel as its row limit.
        :param parent: optional parent widget.
        """
        super().__init__(parent)
        # Layout holding the table view, without margins.
        self._main_Qvlo = QVBoxLayout()
        self.setLayout(self._main_Qvlo)
        self._main_Qvlo.setContentsMargins(0, 0, 0, 0)
        self._logs_Qtvmb = QTableView(self)
        self._main_Qvlo.addWidget(self._logs_Qtvmb)
        # Model / view wiring.
        self._logs_lQtm = LogsTableModel(max_display_lines)
        table = self._logs_Qtvmb
        table.setModel(self._logs_lQtm)
        table.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
        table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)  # read-only
        table.setAlternatingRowColors(True)
        table.setAutoFillBackground(True)
        table.setSortingEnabled(True)
        # The first two columns fit their content, the third one takes the remaining width.
        section_modes = (
            (0, QHeaderView.ResizeMode.ResizeToContents),
            (1, QHeaderView.ResizeMode.ResizeToContents),
            (2, QHeaderView.ResizeMode.Stretch),
        )
        for section, mode in section_modes:
            table.horizontalHeader().setSectionResizeMode(section, mode)
        table.setTextElideMode(Qt.TextElideMode.ElideNone)
        table.setWordWrap(False)
        # Log records travel through a signal object before reaching the model.
        self._logEmitter = DictSignal()
        self._logEmitter.conn(self._add_log)
        sink = LogHandler(self._logEmitter)
        logger.add(sink, format = "{time:YYYY-MM-DD HH:mm:ss.SSS} -|- {level} -|- {message}", level = "INFO")
        self.update_display()

    @Slot(dict)
    def _add_log(self, log: dict):
        """ Append a received log entry to the table model. """
        self._logs_lQtm.add_data(log)

    def update_display(self):
        """ Refresh the translated column titles and the layout direction (call on language change). """
        titles = [self.tr("Timestamp"), self.tr("Level"), self.tr("Message")]
        self._logs_lQtm.set_headers(titles)
        self.setLayoutDirection(get_qt_dir())
|
||
|
||
|
||
class MainWindow(QMainWindow):
|
||
""" Main Window class """
|
||
def __init__(self, appli: QApplication, trans: QTranslator, cfg: dict = None):
    """
    Build the main window: menus, toolbar, tray icon, tabs, connection pool and status bar.

    :param appli: running application; used to install translators and to quit.
    :param trans: translator re-loaded whenever the language changes.
    :param cfg: application configuration; DEFAULT_YAML_CONFIG is used when None.
    """
    super().__init__()
    self.app = appli
    self.config = cfg if cfg is not None else DEFAULT_YAML_CONFIG
    self.translator = trans
    self.currentLang = self.config['defaultLang']
    self.setWindowIcon(QIcon(":imgIcon"))
    self.setWindowTitle(APP_NAME_VERSION)
    self.setMinimumSize(1024, 768)
    # A missing or empty size entry falls back to the minimum size instead of
    # failing on a None/int comparison.
    last_width = self.config.get('lastWidth') or 1024
    last_height = self.config.get('lastHeight') or 768
    self.resize(max(last_width, 1024), max(last_height, 768))
    self._mustCenter = True
    self._connected = False
    # Main Menubar
    self.main_Qmnub = self.menuBar()
    # File Menu
    self.file_Qmnu = QMenu()
    self.newWallet_Qact = QAction()
    self.newWallet_Qact.setIcon(QIcon(":icoWalletNew"))
    self.openWallet_Qact = QAction()
    self.openWallet_Qact.setIcon(QIcon(":icoWalletOpen"))
    self.filesHistory_Qmnu = QMenu()
    self.filesHistory_Qmnu.setIcon(QIcon(":icoLastFiles"))
    self.noFileHistory_Qact = QAction()
    self.noFileHistory_Qact.setIcon(QIcon(":icoNoHistory"))
    self.noFileHistory_Qact.setDisabled(True)
    self.conn_disc_Qact = QAction()
    self.conn_disc_Qact.setIcon(QIcon(":icoConnect"))
    self.hideApp_Qact = QAction()
    self.hideApp_Qact.setIcon(QIcon(":icoTrayHide"))
    self.closeApp_Qact = QAction()
    self.closeApp_Qact.setIcon(QIcon(":icoQuit"))
    self.forceCloseApp_Qact = QAction()
    self.forceCloseApp_Qact.setIcon(QIcon(":icoForceQuit"))
    # finalize initialisation of File Menu
    self._init_file_qmnu()
    # Tools Menu
    self.tools_Qmnu = QMenu()
    self.checkAddress_Qact = QAction()
    self.checkAddress_Qact.setIcon(QIcon(":icoCheckAddress"))
    self.solana_addr_check = False
    # finalize initialisation of Tools Menu
    self._init_tools_qmnu()
    # Langs Menu
    self.langs_Qmnu = QMenu()
    self.changeLangAr_Qact = QAction(QIcon(":icoLang"), "العربية")
    self.changeLangDe_Qact = QAction(QIcon(":icoLang"), "Deutsch")
    self.changeLangEn_Qact = QAction(QIcon(":icoLang"), "English")
    self.changeLangEs_Qact = QAction(QIcon(":icoLang"), "Español")
    self.changeLangFr_Qact = QAction(QIcon(":icoLang"), "Français")
    self.changeLangHe_Qact = QAction(QIcon(":icoLang"), "עברית")
    self.changeLangHi_Qact = QAction(QIcon(":icoLang"), "हिन्दी")
    self.changeLangIt_Qact = QAction(QIcon(":icoLang"), "Italiano")
    self.changeLangJa_Qact = QAction(QIcon(":icoLang"), "日本語")
    self.changeLangKo_Qact = QAction(QIcon(":icoLang"), "한국어")
    self.changeLangPt_Qact = QAction(QIcon(":icoLang"), "Português")
    self.changeLangRu_Qact = QAction(QIcon(":icoLang"), "Русский")
    self.changeLangZh_Qact = QAction(QIcon(":icoLang"), "中文")
    # finalize initialisation of Langs Menu
    self._init_langs_qmnu()
    # Help Menu
    self.help_Qmnu = QMenu()
    self.help_Qact = QAction()
    self.help_Qact.setIcon(QIcon(":icoHelp"))
    self.about_Qact = QAction()
    self.about_Qact.setIcon(QIcon(":icoAbout"))
    self._init_help_qmnu()
    # Main Toolbar
    self.main_Qtb = QToolBar()
    self._init_main_toolbar()
    # System tray icon
    self.tray_Qsti = QSystemTrayIcon(QIcon(":imgIcon"))
    # Tray icon menu
    self.tray_Qmnu = QMenu()
    self.showApp_Qact = QAction()
    self._init_tray_qmnu()
    # Central widget: QTabWidget
    self.main_Qtabw = QTabWidget()
    self.setCentralWidget(self.main_Qtabw)
    self.token_qw = QWidget()
    self.main_Qtabw.addTab(self.token_qw, QIcon(":icoTokens"), self.tr("Tokens:"))
    self.token_qw.setLayout(QVBoxLayout())
    self.token_qw.layout().addWidget(QTableView())
    self.wallet_Qw = WalletWidget()
    self.main_Qtabw.addTab(self.wallet_Qw, QIcon(":icoWallet"), self.tr("Wallet: {walletName}").format(walletName = ""))
    self.logViewer_Qw = LogViewerWidget(1000)
    self.main_Qtabw.addTab(self.logViewer_Qw, QIcon(":icoLogs"), self.tr("Logs:"))
    # Connection pool
    self.nbrWsClients = 2
    self.nbrAsyncHttpPostClients = 4
    self.nbrHttpPostClients = 4
    self.connPool = ConnectionPool(URL_Ws_MainNet, URL_Http_MainNet, self.nbrWsClients, self.nbrAsyncHttpPostClients, self.nbrHttpPostClients)
    # Pixmaps used by the status bar
    self.wsDisconnected_Qpxm = QPixmap(":icoDisconnected")
    self.wsConnected_Qpxm = QPixmap(":icoConnected")
    self.httpNone_Qpxm = QPixmap(":icoHttpNone")
    self.httpSend_Qpxm = QPixmap(":icoHttpSend")
    self.httpWait_Qpxm = QPixmap(":icoHttpWait")
    self.httpReceive_Qpxm = QPixmap(":icoHttpReceive")
    self.httpError_Qpxm = QPixmap(":icoHttpError")
    self.reachabilityDisconnected_Qpxm = QPixmap(":icoReachabilityDisconnected")
    self.reachabilityLocal_Qpxm = QPixmap(":icoReachabilityLocal")
    self.reachabilityOnline_Qpxm = QPixmap(":icoReachabilityOnline")
    self.reachabilitySite_Qpxm = QPixmap(":icoReachabilitySite")
    self.reachabilityUnknown_Qpxm = QPixmap(":icoReachabilityUnknown")
    self.main_Qstb = self.statusBar()
    self.setStatusBar(self.main_Qstb)
    self.wsCnxImgs = []
    self.httpMsgImgs = []
    self.netReachabilityImg = QLabel()
    # Defined up front so check_connectivity() can always read it, even when
    # loading the network information backend fails below.
    self._network_info = None
    try:
        QNetworkInformation.loadBackendByFeatures(QNetworkInformation.Feature.Reachability)
        self._network_info = QNetworkInformation.instance()
        if self._network_info:
            self._network_info.reachabilityChanged.connect(self.on_reachability_changed)
        self.check_connectivity()
    except Exception as e:
        self.netReachabilityImg.setPixmap(self.reachabilityUnknown_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self.netReachabilityImg.setToolTip(self.tr("Unknown"))
        # The source text must be a plain literal to be translatable; the
        # variable part is substituted afterwards, as everywhere else in the file.
        logger.error(self.tr("Error while loading QNetworkInformation: {err}").format(err = e))
    self.main_Qstb.addWidget(self.netReachabilityImg)
    self.main_Qstb.addWidget(self._get_vertical_separator())
    # Main WebSocket indicator
    self.wsMainCnxImg = QLabel()
    self.wsMainCnxImg.setPixmap(self.wsDisconnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
    self.main_Qstb.addWidget(self.wsMainCnxImg)
    self.connPool.wsMainConnexionStatusSig.conn(self._update_ws_main_connection_status)
    self.main_Qstb.addWidget(self._get_vertical_separator())
    # One indicator per WebSocket client
    for idx in range(self.nbrWsClients):
        lbl = QLabel()
        lbl.setPixmap(self.wsDisconnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self.wsCnxImgs.append(lbl)
        self.main_Qstb.addWidget(lbl)
        # idx is bound as a default value so each callback keeps its own index
        self.connPool.wsConnexionStatusSigs[idx].conn(lambda state, cid = idx: self._update_ws_connection_status(cid, state))
    self.main_Qstb.addWidget(self._get_vertical_separator())
    # One indicator per HTTP POST client
    for idx in range(self.nbrHttpPostClients):
        lbl = QLabel()
        lbl.setPixmap(self.httpNone_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self.httpMsgImgs.append(lbl)
        self.main_Qstb.addWidget(lbl)
        self.connPool.httpPostSentSigs[idx].conn(lambda cid = idx: self.update_http_pxm_sent(cid))
        self.connPool.httpReplyReceivedSigs[idx].conn(lambda cid = idx: self.update_http_pxm_received(cid))

    self.main_Qstb.addPermanentWidget(self._get_vertical_separator())

    # Read-only line edit showing the last status message
    self.statusMsg_Qled = QLineEdit()
    self.statusMsg_Qled.setReadOnly(True)
    self.statusMsg_Qled.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
    self.statusMsg_Qled.setStyleSheet("QLineEdit {border: none; background: transparent;}")
    self.statusMsg_Qled.setText(self.tr("Ready"))
    self.main_Qstb.addPermanentWidget(self.statusMsg_Qled)

    # Incoming replies and pool messages
    self.connPool.wsMainReplyReceivedSig.conn(self.read_ws_main_sol_msg)
    self.connPool.wsReplyReceivedSig.conn(self.read_ws_sol_msg)
    self.connPool.httpReplyReceivedSig.conn(self.read_http_sol_msg)
    self.connPool.logSig.conn(self.show_pool_logs)

    self.update_display()
    logger.success(self.tr("Init of {appName}").format(appName = APP_ABOUT_NAME))
|
||
|
||
def on_reachability_changed(self, reachability):
    """
    Refresh the reachability indicator after a network state change.

    :param reachability: value delivered with the notification; unused, the
        current state is read again by check_connectivity().
    """
    self.check_connectivity()
|
||
|
||
def check_connectivity(self):
    """ Show the pixmap and tooltip matching the current network reachability ("Unknown" by default). """
    pixmap, tooltip = self.reachabilityUnknown_Qpxm, self.tr("Unknown")
    if self._network_info:
        current = self._network_info.reachability()
        states = QNetworkInformation.Reachability
        # (state, pixmap, tooltip) for every state with a dedicated icon;
        # anything else keeps the "Unknown" presentation chosen above.
        presentations = (
            (states.Online, self.reachabilityOnline_Qpxm, self.tr("Online")),
            (states.Local, self.reachabilityLocal_Qpxm, self.tr("Local")),
            (states.Site, self.reachabilitySite_Qpxm, self.tr("Site")),
            (states.Disconnected, self.reachabilityDisconnected_Qpxm, self.tr("Disconnected")),
        )
        for state, state_pixmap, state_tooltip in presentations:
            if current == state:
                pixmap, tooltip = state_pixmap, state_tooltip
                break
    self.netReachabilityImg.setPixmap(pixmap.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
    self.netReachabilityImg.setToolTip(tooltip)
|
||
|
||
def show_pool_logs(self, msg: str):
    """
    Display a connection pool message in the status bar.

    The message is also shown as a tray notification when the window is
    hidden or minimized.

    :param msg: message text; it is passed through tr() before display.
    """
    translated = self.tr(msg)
    self.statusMsg_Qled.setText(translated)
    window_out_of_sight = self.isHidden() or self.isMinimized()
    if window_out_of_sight:
        self.tray_Qsti.showMessage(APP_NAME, translated, QIcon(":imgFavicon"), 2500)
|
||
|
||
def read_ws_main_sol_msg(self):
    """ Fetch a message of the main WebSocket from the connection pool and log its content. """
    (_, msg) = self.connPool.read_first_ws_main_msg()
    self._log_sol_messages(msg)

def read_ws_sol_msg(self):
    """ Fetch a message of the secondary WebSockets from the connection pool and log its content. """
    (_, msg) = self.connPool.read_first_ws_msg()
    self._log_sol_messages(msg)

def read_http_sol_msg(self):
    """ Fetch an HTTP reply from the connection pool and log its content. """
    # NOTE(review): the HTTP reply goes through the WebSocket message parser,
    # exactly as before - confirm that this is the intended parser.
    (_, msg) = self.connPool.read_first_http_msg()
    self._log_sol_messages(msg)

def _log_sol_messages(self, msg):
    """
    Parse a raw message and log every item it contains.

    Subscription errors are logged as errors and subscription results as
    info. For a successful logs notification, a link to the transaction is
    logged when one of its log lines contains "initialize2".

    :param msg: raw message as returned by the connection pool.
    """
    for item in parse_websocket_message(msg):
        if isinstance(item, SubscriptionError):
            logger.error(item.error)
        if isinstance(item, SubscriptionResult):
            logger.info(item.to_json())
        if isinstance(item, LogsNotification) and item.result.value.err is None:
            if any("initialize2" in line for line in item.result.value.logs):
                signature = item.result.value.signature
                logger.info(f"See, <a href=\"https://solscan.io/tx/{signature}\">https://solscan.io/tx/{signature}</a>")
|
||
|
||
def _update_ws_main_connection_status(self, state: QAbstractSocket.SocketState):
    """
    Reflect the state of the main WebSocket in the status bar and in the connect action.

    Once connected, a logs subscription request is sent on the main socket;
    once unconnected, the shared request id counter is reset.

    :param state: new socket state; states other than connected / unconnected are ignored.
    """
    socket_states = QAbstractSocket.SocketState
    if state == socket_states.ConnectedState:
        self.wsMainCnxImg.setPixmap(self.wsConnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self._connected = True
        # The action now offers to disconnect.
        action = self.conn_disc_Qact
        action.setIcon(QIcon(":icoDisconnect"))
        action.setText(self.tr("Disconnect"))
        action.setDisabled(False)
        log_filter = RpcTransactionLogsFilterMentions(PK_RaydiumLPV4)
        (req_id, req_cmd, req) = WsReqGen.logs_subscribe(log_filter, CommitmentLevel.Finalized)
        self.connPool.send_websocket_main_request(req_cmd, req.to_json())
        logger.debug(f"{req.to_json()} sent")
        return
    if state == socket_states.UnconnectedState:
        self.wsMainCnxImg.setPixmap(self.wsDisconnected_Qpxm.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
        self._connected = False
        # The action now offers to connect.
        action = self.conn_disc_Qact
        action.setIcon(QIcon(":icoConnect"))
        action.setText(self.tr("Connect"))
        action.setDisabled(False)
        SharedCounter.reset_id()
|
||
|
||
def _update_ws_connection_status(self, index: int, state: QAbstractSocket.SocketState):
    """
    Update the status bar image of one WebSocket client.

    :param index: position of the client's label in wsCnxImgs.
    :param state: new socket state; states other than connected / unconnected are ignored.
    """
    if state == QAbstractSocket.SocketState.ConnectedState:
        pixmap = self.wsConnected_Qpxm
    elif state == QAbstractSocket.SocketState.UnconnectedState:
        pixmap = self.wsDisconnected_Qpxm
    else:
        return
    self.wsCnxImgs[index].setPixmap(pixmap.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))
|
||
|
||
def update_http_pxm_sent(self, index: int):
    """
    Show the 'sent' image on an HTTP indicator, then the 'waiting' image 200 ms later.

    :param index: position of the indicator label in httpMsgImgs.
    """
    def _display(pixmap):
        # The label is looked up at call time, also for the delayed update.
        self.httpMsgImgs[index].setPixmap(pixmap.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))

    _display(self.httpSend_Qpxm)
    QTimer.singleShot(200, lambda: _display(self.httpWait_Qpxm))
|
||
|
||
def update_http_pxm_received(self, index: int):
    """
    Show the 'received' image on an HTTP indicator, then the idle image 200 ms later.

    :param index: position of the indicator label in httpMsgImgs.
    """
    def _display(pixmap):
        # The label is looked up at call time, also for the delayed update.
        self.httpMsgImgs[index].setPixmap(pixmap.scaled(16, 16, Qt.AspectRatioMode.KeepAspectRatio))

    _display(self.httpReceive_Qpxm)
    QTimer.singleShot(200, lambda: _display(self.httpNone_Qpxm))
|
||
|
||
def _get_vertical_separator(self) -> QFrame:
    """ Build a sunken vertical line, parented to this window, used as a separator. """
    separator = QFrame(self)
    separator.setFrameShape(QFrame.Shape.VLine)
    separator.setFrameShadow(QFrame.Shadow.Sunken)
    return separator
|
||
|
||
def first_load(self):
    """
    Show the window, then load the last wallet or ask for one (called once from main).

    The window is shown maximized when the 'lastMaximized' setting says so
    (default). If the last wallet file is unset or missing, the user is asked
    to load or create a wallet; otherwise its password is requested.
    """
    if self.config.get('lastMaximized', True):
        self.showMaximized()
    else:
        self.show()
    last_file = self.config["lastFile"]
    has_last_file = not (last_file is None or last_file == "")
    if has_last_file:
        if os.path.exists(last_file):
            self._ask_load_wallet_pass(last_file, True)
            return
        # The remembered file has disappeared: tell the user before asking again.
        missing_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' does not exists. Please choose a new file.").format(fileName = last_file), QMessageBox.Icon.Warning, self)
        missing_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
        missing_qmbox.exec()
    self._init_new_or_load_wallet()
|
||
|
||
def _quit_app_func(self, force: bool = False):
    """
    Ask for confirmation, then quit the application.

    :param force: when True nothing is saved before quitting; otherwise the
        window state, the current language and the wallet are saved first.
    """
    if force:
        msg_qmbox = SimpleMsgBox(self.tr("Do you really want to Quit?"), self.tr("If you accept, nothing will be saved\nand the application will be closed."), QMessageBox.Icon.Critical, self)
    else:
        msg_qmbox = SimpleMsgBox(self.tr("Do you really want to Quit?"), self.tr("If you accept, all the files will be saved\nand the application will be closed."), QMessageBox.Icon.Question, self)
    yes_qpbtn = msg_qmbox.addButton(self.tr("Yes"), QMessageBox.ButtonRole.ActionRole)
    msg_qmbox.addButton(self.tr("No"), QMessageBox.ButtonRole.ActionRole)
    msg_qmbox.exec()
    if msg_qmbox.clickedButton() != yes_qpbtn:
        return
    if not force:
        maximized = self.isMaximized()
        self.config['lastMaximized'] = maximized
        # When maximized, width()/height() describe the maximized window; the
        # size to restore is the one the window has when shown normally.
        normal_rect = self.normalGeometry()
        if maximized and normal_rect.isValid():
            self.config['lastHeight'] = normal_rect.height()
            self.config['lastWidth'] = normal_rect.width()
        else:
            self.config['lastHeight'] = self.height()
            self.config['lastWidth'] = self.width()
        # keep the current language in the in-memory config
        self.config['defaultLang'] = self.currentLang
        # write the in-memory config to file
        save_yaml_app_config(self.config)
        self.wallet_Qw.save_wallet()
    # Release the resources owned by the window before leaving.
    self.logViewer_Qw.close()
    self.connPool.clean_up()
    self.connPool = None
    self.tray_Qsti.deleteLater()
    logger.debug(self.tr("Closing App"))
    # close instance
    self.app.quit()
|
||
|
||
def _change_lang(self, lang_code, lang_name):
    """
    Switch the interface to another language.

    Nothing happens when the translation resource cannot be loaded.

    :param lang_code: language code used to build the resource name ":trans_<code>".
    :param lang_name: display name of the language, used in the log message.
    """
    translation_loaded = self.translator.load(f":trans_{lang_code}")
    if not translation_loaded:
        return
    self.app.installTranslator(self.translator)
    self.currentLang = lang_code
    set_app_dir(self.currentLang)
    logger.info(self.tr("Switching to language {lang}.").format(lang = lang_name))
    self.update_display()
|
||
|
||
def _init_tray_qmnu(self):
    """ Fill the tray icon context menu and show the tray icon. """
    # The "show" action starts disabled and calls show() on the window.
    show_action = self.showApp_Qact
    show_action.setIcon(QIcon(":icoTrayShow"))
    show_action.setDisabled(True)
    show_action.triggered.connect(self.show)
    self.tray_Qsti.setContextMenu(self.tray_Qmnu)
    tray_actions = (self.hideApp_Qact, show_action, self.closeApp_Qact, self.forceCloseApp_Qact)
    for action in tray_actions:
        self.tray_Qmnu.addAction(action)
    self.tray_Qsti.show()
|
||
|
||
def _show_about_window(self):
    """ Open the About window with the application, Python, Qt and OS information. """
    summary_html = f"<h3>{APP_FULL_NAME}: {APP_VERSION}<br>PYTHON: {platform.python_version()}<br>Qt: {pyside6version}<br>OS: {platform.system()}</h3>"
    about_dialog = AboutWindow(summary_html, self)
    about_dialog.show()
|
||
|
||
def _show_help_window(self):
    """ Open the Help window on the ":pdfHelpFr" resource. """
    help_dialog = HelpWindow(":pdfHelpFr", self)
    help_dialog.show()
|
||
|
||
def _init_main_toolbar(self):
    """ Attach the fixed main toolbar and fill it with the logo, wallet and connection actions. """
    toolbar = self.main_Qtb
    self.addToolBar(toolbar)
    toolbar.setMovable(False)
    # Application logo, shown first.
    logo_qlbl = QLabel()
    logo_qlbl.setPixmap(QPixmap(":imgFavicon").scaled(24, 24, Qt.AspectRatioMode.KeepAspectRatio))
    toolbar.addWidget(logo_qlbl)
    toolbar.addSeparator()
    # Wallet actions.
    for wallet_action in (self.openWallet_Qact, self.newWallet_Qact):
        toolbar.addAction(wallet_action)
    toolbar.addSeparator()
    # Connect / disconnect action.
    toolbar.addAction(self.conn_disc_Qact)
    toolbar.addSeparator()
|
||
|
||
def _init_new_or_load_wallet(self):
    """ Make the user choose between loading a wallet, creating one, or quitting the application. """
    choice_qmbox = SimpleMsgBox(self.tr("Action Required"), self.tr("To use the application, you have to load a wallet or to create a new one."), QMessageBox.Icon.Critical, self)
    load_qpbtn = choice_qmbox.addButton(self.tr("Load Wallet"), QMessageBox.ButtonRole.ActionRole)
    create_qpbtn = choice_qmbox.addButton(self.tr("New Wallet"), QMessageBox.ButtonRole.ActionRole)
    choice_qmbox.addButton(self.tr("Quit"), QMessageBox.ButtonRole.RejectRole)
    choice_qmbox.exec()
    clicked = choice_qmbox.clickedButton()
    if clicked == load_qpbtn:
        create_wallets_folder()
        self._ask_load_wallet_filepath(True)
        return
    if clicked == create_qpbtn:
        create_wallets_folder()
        self._ask_new_wallet_name_and_pass(True)
        return
    # Any other outcome closes the window and quits.
    self.close()
    QApplication.instance().quit()
|
||
|
||
def _create_new_wallet(self):
    """ Start the wallet creation dialogs (target of the "new wallet" action). """
    create_wallets_folder()
    self._ask_new_wallet_name_and_pass()
|
||
|
||
def _ask_new_wallet_name_and_pass(self, return_to_prev: bool = False):
    """
    Ask for the name and password of a new wallet, then for its file location.

    :param return_to_prev: when True, cancelling the dialog goes back to the
        load / create / quit choice.
    """
    name_pass_dialog = NewWalletNameAndPassDialog(self)
    accepted = name_pass_dialog.exec()
    if accepted:
        self._ask_new_wallet_filepath(name_pass_dialog.walletName, name_pass_dialog.walletEncyptPass, return_to_prev)
        return
    if return_to_prev:
        self._init_new_or_load_wallet()
|
||
|
||
def _ask_new_wallet_filepath(self, new_wallet_name: str, new_wallet_pass: str, return_to_prev: bool = False):
    """
    Ask where to store a new wallet, then create, encrypt and save it.

    The file dialog is shown again while the chosen file already exists.
    A failure while saving is logged and reported, and ends the attempt.

    :param new_wallet_name: name given to the new wallet.
    :param new_wallet_pass: password used to encrypt the wallet file.
    :param return_to_prev: when True, go back to the name / password dialog
        if no wallet has been saved.
    """
    wallet_suffix = "." + WALLET_FILE_EXT
    saved_file = False
    while not saved_file:
        # File dialog used to pick the location and name of the wallet file
        file_dialog = NewWalletFileDialog(self)
        if not file_dialog.exec():
            break  # cancelled by the user
        file_path = file_dialog.selectedFiles()[0]
        # Append the wallet extension when it is missing
        if not file_path.endswith(wallet_suffix):
            file_path += wallet_suffix
        file_name = os.path.basename(file_path)
        if os.path.exists(file_path):
            # Never overwrite an existing file: ask for another one
            exists_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' already exists. Please choose a new file.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            exists_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            exists_qmbox.exec()
            continue
        try:
            keypair = Keypair()
            wallet = Wallet(wallet_name = new_wallet_name, wallet_pub_key = str(keypair.pubkey()), wallet_priv_key = str(keypair.secret()))
            encrypted_data = encrypt_string(new_wallet_pass, APP_SALT.encode("utf-8"), wallet.to_json())
            with LockedFile(file_path, "w+b") as file:
                file.write(encrypted_data.encode("utf-8"))
            file_path = os.path.realpath(file_path)
            recent_files = self.config["lastFiles"]
            if file_path not in recent_files:
                recent_files.append(file_path)
                # keep at most 5 entries, dropping the first ones
                while len(self.config["lastFiles"]) > 5:
                    self.config["lastFiles"].pop(0)
                self._refresh_files_history_qmnu()
            self.config["lastFile"] = file_path
            self.wallet_Qw.set_wallet(wallet, new_wallet_pass, file_path)
            self.main_Qtabw.setTabText(1, self.tr("Wallet: {walletName}").format(walletName = wallet.walletName))
            saved_file = True  # ends the loop
        except Exception as e:
            logger.error(e)
            failure_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("An error occurred while saving the file '{fileName}'.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            failure_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            failure_qmbox.exec()
            break  # give up after an error
    if saved_file:
        return
    if return_to_prev:
        self._ask_new_wallet_name_and_pass(return_to_prev)
|
||
|
||
def _load_wallet(self):
    """ Start the wallet loading dialogs (target of the "open wallet" action). """
    self._ask_load_wallet_filepath()
|
||
|
||
def _ask_load_wallet_filepath(self, return_to_prev: bool = False):
    """
    Ask which wallet file to load, then ask for its password.

    The file dialog is shown again while the chosen file does not exist, or
    while it is the file recorded as 'lastFile' (unless return_to_prev is set).

    :param return_to_prev: when True, cancelling goes back to the load /
        create / quit choice, and re-selecting the last file is allowed.
    """
    file_path = None
    loaded_file = False
    while not loaded_file:
        file_dialog = LoadWalletFileDialog(self)
        if not file_dialog.exec():
            break  # cancelled by the user
        file_path = file_dialog.selectedFiles()[0]
        file_name = os.path.basename(file_path)
        if not os.path.exists(file_path):
            missing_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' does not exists. Please choose another file.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            missing_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            missing_qmbox.exec()
            continue  # ask for another file
        last_file = self.config["lastFile"]
        if last_file is None or file_path != last_file or return_to_prev:
            loaded_file = True
        else:
            loaded_qmbox = SimpleMsgBox(self.tr("Error"), self.tr("The File '{fileName}' is already loaded. Please choose another file.").format(fileName = file_name), QMessageBox.Icon.Warning, self)
            loaded_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
            loaded_qmbox.exec()
    if loaded_file:
        self._ask_load_wallet_pass(file_path, return_to_prev)
        return
    if return_to_prev:
        self._init_new_or_load_wallet()
|
||
|
||
def _ask_load_wallet_pass(self, file_path: str, return_to_prev: bool = False):
    """
    Ask for the password of a wallet file, then decrypt and load the wallet.

    A wrong password asks again; a locked, corrupted or invalid file is
    reported and ends the attempt.

    :param file_path: path of the encrypted wallet file.
    :param return_to_prev: when True, go back to the file selection if no
        wallet has been loaded.
    """
    file_name = os.path.basename(file_path)

    def _show_error(text):
        # Blocking critical message box with a single "Ok" button.
        error_qmbox = SimpleMsgBox(self.tr("Error"), text, QMessageBox.Icon.Critical, self)
        error_qmbox.addButton(self.tr("Ok"), QMessageBox.ButtonRole.ActionRole)
        error_qmbox.exec()

    wallet = None
    wallet_pass = None
    loaded_file = False
    while not loaded_file:
        pass_dialog = LoadWalletPassDialog(file_name, self)
        if not pass_dialog.exec():
            break  # cancelled by the user
        wallet_pass = pass_dialog.walletEncyptPass
        try:
            with LockedFile(file_path, "rb") as file:
                enc_data = file.read()
            wallet = Wallet.from_json(decrypt_string(wallet_pass, APP_SALT.encode("utf-8"), enc_data.decode()))
            loaded_file = True
        except portalocker.exceptions.LockException as e:
            logger.error(self.tr("LockException for Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            _show_error(self.tr("The File '{fileName}' seems to be locked. Please choose another file.").format(fileName = file_name))
            break
        except InvalidToken as e:
            logger.error(self.tr("Wrong password for Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            _show_error(self.tr("The password you provided for file '{fileName}' is incorrect. Please try again.").format(fileName = file_name))
            continue  # ask for the password again
        except json.JSONDecodeError as e:
            logger.error(self.tr("Cannot decode json in Wallet file {fileName} {exp}").format(fileName = file_path, exp = e))
            _show_error(self.tr("The file '{fileName}' seems to be corrupted. Please choose another file.").format(fileName = file_name))
            break
        except ValueError as e:
            logger.error(self.tr("Wrong values in {fileName} {exp}").format(fileName = file_path, exp = e))
            _show_error(self.tr("The file '{fileName}' seems to have wrong values. Please choose another file.").format(fileName = file_name))
            break
    if loaded_file:
        file_path = os.path.realpath(file_path)
        recent_files = self.config["lastFiles"]
        if file_path not in recent_files:
            recent_files.append(file_path)
            # keep at most 5 entries, dropping the first ones
            while len(self.config["lastFiles"]) > 5:
                self.config["lastFiles"].pop(0)
            self._refresh_files_history_qmnu()
        self.config["lastFile"] = file_path
        self.wallet_Qw.set_wallet(wallet, wallet_pass, file_path)
        self.main_Qtabw.setTabText(1, self.tr("Wallet: {walletName}").format(walletName = wallet.walletName))
        return
    if return_to_prev:
        self._ask_load_wallet_filepath(return_to_prev)
|
||
|
||
def _init_file_qmnu(self):
    """ Add the File menu to the menu bar, fill it and connect its actions. """
    file_menu = self.file_Qmnu
    self.main_Qmnub.addMenu(file_menu)
    # Wallet entries
    file_menu.addAction(self.newWallet_Qact)
    file_menu.addAction(self.openWallet_Qact)
    file_menu.addSeparator()
    # Recently used files sub-menu
    file_menu.addMenu(self.filesHistory_Qmnu)
    self._refresh_files_history_qmnu()
    file_menu.addSeparator()
    # Connect / disconnect entry
    file_menu.addAction(self.conn_disc_Qact)
    file_menu.addSeparator()
    # Hide, close, close without saving
    for window_action in (self.hideApp_Qact, self.closeApp_Qact, self.forceCloseApp_Qact):
        file_menu.addAction(window_action)
    # Action targets
    self.newWallet_Qact.triggered.connect(self._create_new_wallet)
    self.openWallet_Qact.triggered.connect(self._load_wallet)
    self.conn_disc_Qact.triggered.connect(self._conn_disc)
    self.hideApp_Qact.triggered.connect(self.hide)
    self.closeApp_Qact.triggered.connect(lambda: self._quit_app_func(False))
    self.forceCloseApp_Qact.triggered.connect(lambda: self._quit_app_func(True))
|
||
|
||
def _init_tools_qmnu(self):
    """ Initialize Tools Menu.

    Attaches the Tools menu to the main menu bar and registers its single
    entry, "Check Address", whose trigger opens the address checker through
    ``_check_address``.
    """
    # add Tools Menu to Main MenuBar
    self.main_Qmnub.addMenu(self.tools_Qmnu)
    # add action "Check Address" to Tools Menu (added once: re-adding an
    # action a widget already holds does not create a second entry)
    self.tools_Qmnu.addAction(self.checkAddress_Qact)
    # set action target
    self.checkAddress_Qact.triggered.connect(self._check_address)
|
||
|
||
def _check_address(self):
    """ Show the Solana address checker window.

    Does nothing while ``self.solana_addr_check`` is truthy. Otherwise the
    flag is set to True and a new ``SolanaAdressCheckerWindow`` is created
    from the connection pool and this window, then shown.
    """
    # NOTE(review): the flag is presumably reset by the checker window when
    # it closes -- confirm, nothing in this method clears it.
    if self.solana_addr_check:
        return
    self.solana_addr_check = True
    checker_window = SolanaAdressCheckerWindow(self.connPool, self)
    checker_window.show()
|
||
|
||
def _conn_disc(self):
    """ Toggle the connection pool between open and closed.

    The connect/disconnect action is disabled first; then the pool is closed
    when ``self._connected`` is truthy and opened otherwise.
    """
    # NOTE(review): the action is presumably re-enabled by a connection
    # state handler elsewhere -- confirm.
    self.conn_disc_Qact.setEnabled(False)
    if self._connected:
        self.connPool.close()
    else:
        self.connPool.open()
|
||
|
||
def _refresh_files_history_qmnu(self):
    """ Rebuild the "Last Files" submenu from ``self.config['lastFiles']``.

    Paths that no longer exist on disk are pruned from the configuration.
    One action per remaining file is added, labelled with the file's base
    name. The "No file found" placeholder action is added only when no file
    remains to be listed.
    """
    self.filesHistory_Qmnu.clear()
    history = self.config.get('lastFiles', [])
    # Filter into a new list instead of removing while iterating: removing
    # from the list being looped over makes the loop skip the next entry.
    existing_files = [file_path for file_path in history if os.path.exists(file_path)]
    if len(existing_files) != len(history):
        # In-place update so the configuration keeps the same list object.
        history[:] = existing_files
    for file_path in existing_files:
        file_name = os.path.basename(file_path)
        # Parent the action to the submenu: QMenu.addAction() does not take
        # ownership, so an unparented action with no Python reference could
        # be garbage-collected and vanish from the menu. Actions owned by
        # the menu are deleted by the clear() call above on the next refresh.
        file_open_qact = QAction(QIcon(":icoWallet"), file_name, self.filesHistory_Qmnu)
        self.filesHistory_Qmnu.addAction(file_open_qact)
        # TODO: Add trigger connect
    if not existing_files:
        # Placeholder entry, shown only when the history is empty.
        self.filesHistory_Qmnu.addAction(self.noFileHistory_Qact)
|
||
|
||
def _init_langs_qmnu(self):
    """ Build the Languages menu.

    Every language action ``changeLang<Xx>_Qact`` is added to the menu in
    alphabetical code order and connected to ``_change_lang`` with its
    language code and the action's current text. Only English and French
    are left enabled; every other entry is disabled.
    """
    self.main_Qmnub.addMenu(self.langs_Qmnu)

    def _switch_to(lang_code, attr_name):
        # Zero-argument slot: the action is looked up and its text read only
        # when the action is triggered, not when the menu is built.
        return lambda: self._change_lang(lang_code, getattr(self, attr_name).text())

    # (language code, selectable) in menu order
    languages = (
        ("ar", False),
        ("de", False),
        ("en", True),
        ("es", False),
        ("fr", True),
        ("he", False),
        ("hi", False),
        ("it", False),
        ("ja", False),
        ("ko", False),
        ("pt", False),
        ("ru", False),
        ("zh", False),
    )
    for lang_code, selectable in languages:
        attr_name = f"changeLang{lang_code.capitalize()}_Qact"
        lang_action = getattr(self, attr_name)
        self.langs_Qmnu.addAction(lang_action)
        lang_action.triggered.connect(_switch_to(lang_code, attr_name))
        if not selectable:
            lang_action.setDisabled(True)
|
||
|
||
def _init_help_qmnu(self):
    """ Build the Help menu.

    Attaches the Help menu to the main menu bar and adds the "Help Content"
    and "About" entries, each connected to the method that shows the
    matching window.
    """
    help_menu = self.help_Qmnu
    self.main_Qmnub.addMenu(help_menu)

    # (action, slot) pairs in menu order
    entries = (
        (self.help_Qact, self._show_help_window),
        (self.about_Qact, self._show_about_window),
    )
    for action, slot in entries:
        help_menu.addAction(action)
        action.triggered.connect(slot)
|
||
|
||
def update_display(self):
    """ Re-apply every translated text of the main window.

    Refreshes menu titles, action texts and tab labels through ``self.tr``,
    asks the wallet and log-viewer widgets to refresh themselves, and
    finally applies the layout direction returned by ``get_qt_dir()``.
    """
    # --- File menu -----------------------------------------------------------
    self.file_Qmnu.setTitle(self.tr("File"))
    self.newWallet_Qact.setText(self.tr("New Wallet"))
    self.openWallet_Qact.setText(self.tr("Open Wallet"))
    self.filesHistory_Qmnu.setTitle(self.tr("Last Files"))
    self.noFileHistory_Qact.setText(self.tr("No file found"))
    # the toggle is labelled with the action it would perform next
    toggle_label = self.tr("Disconnect") if self._connected else self.tr("Connect")
    self.conn_disc_Qact.setText(toggle_label)
    self.hideApp_Qact.setText(self.tr("Hide application"))
    self.closeApp_Qact.setText(self.tr("Close application"))
    self.forceCloseApp_Qact.setText(self.tr("Close without saving"))

    # --- Tools / Languages / Help menus --------------------------------------
    self.tools_Qmnu.setTitle(self.tr("Tools"))
    self.checkAddress_Qact.setText(self.tr("Check Address"))
    self.langs_Qmnu.setTitle(self.tr("Languages"))
    self.help_Qmnu.setTitle(self.tr("Help"))
    self.help_Qact.setText(self.tr("Help Content"))
    self.about_Qact.setText(self.tr("About {app}").format(app = APP_NAME))
    self.showApp_Qact.setText(self.tr("Show application"))

    # --- tab labels ----------------------------------------------------------
    self.main_Qtabw.setTabText(0, self.tr("Tokens:"))
    current_wallet = self.wallet_Qw.wallet
    # an empty name is shown while no wallet is loaded
    wallet_name = "" if current_wallet is None else current_wallet.walletName
    self.main_Qtabw.setTabText(1, self.tr("Wallet: {walletName}").format(walletName = wallet_name))
    self.main_Qtabw.setTabText(2, self.tr("Log Display:"))

    # --- child widgets and layout direction ----------------------------------
    self.wallet_Qw.update_display()
    self.logViewer_Qw.update_display()
    self.setLayoutDirection(get_qt_dir())
|
||
|
||
def closeEvent(self, event: QEvent):
    """ Turn a close request into a hide.

    The event is ignored, so the window is not closed, and the window is
    hidden instead.
    """
    # refuse the close request itself ...
    event.ignore()
    # ... and only hide the window
    self.hide()
|
||
|
||
def showEvent(self, event) -> None:
    """ Handle the window being shown.

    Disables the "show" action and enables the "hide" action, forwards the
    event to the base class, then centers the window once if a centering
    was requested through ``self._mustCenter``.
    """
    # the window is visible: only "hide" remains meaningful
    self.showApp_Qact.setEnabled(False)
    self.hideApp_Qact.setEnabled(True)
    super().showEvent(event)
    if not self._mustCenter:
        return
    # consume the request before centering
    self._mustCenter = False
    center_on_screen(self)
|
||
|
||
def hideEvent(self, event) -> None:
    """ Handle the window being hidden.

    Enables the "show" action, disables the "hide" action, requests a
    re-centering on the next show, then forwards the event to the base class.
    """
    # the window is hidden: only "show" remains meaningful
    self.showApp_Qact.setEnabled(True)
    self.hideApp_Qact.setEnabled(False)
    # showEvent() reads this flag to center the window again
    self._mustCenter = True
    super().hideEvent(event)
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: configure logging, parse the command line, load the
    # configuration, pick the UI language, then start the Qt application with
    # a splash screen followed by the main window.

    # Remove loguru's default sink before installing our own.
    logger.remove()
    # NOTE(review): diagnose = True makes loguru print variable values in
    # tracebacks; in a wallet application this may expose secrets in the
    # logs -- confirm it is intended outside development.
    logger.add(
        sys.stderr,
        level = 'TRACE',
        format = '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{line: <4}</cyan> - <level>{message}</level>',
        filter = None,
        colorize = None,
        serialize = False,
        backtrace = True,
        diagnose = True,
        enqueue = False,
        context = None,
        catch = True
    )
    logger.add("logs/KhadhroonyRaydium4.{time}.log", format = "{time:YYYY-MM-DD HH:mm:ss.SSS} - {level} - {line: <4}: {message}", level = "DEBUG")

    # Command-line options handling (only --version / --help are defined).
    parser = argparse.ArgumentParser(
        description = "==~-<=<~~> No Khadhrawy <$<~~>$> No Hamroony <~~>=>-~==\n\n" + APP_DESC + "\n" + "Launch without options to show main window",
        formatter_class = argparse.RawTextHelpFormatter,
        epilog = "==~-<=<~~> No Khadhrawy <$<~~>$> No Hamroony <~~>=>-~==\n\n",
        prog = APP_NAME
    )
    parser.add_argument('-v', '--version', action = 'version', version = "{prog} (version {version})".format(prog = APP_NAME, version = APP_VERSION), help = "show program version and exit")
    args = parser.parse_args()

    # Create the wallets folder.
    create_wallets_folder()

    # Load the base configuration (history, language, display, ...).
    config = load_yaml_app_config()
    # .get(): a configuration without "defaultLang" falls back to the system
    # language below instead of raising KeyError.
    config_locale = config.get("defaultLang")

    # Determine the language: configured one if supported, else the system
    # language if supported, else English.
    system_locale = QLocale.system().name().split("_")[0]
    sys_lang = system_locale if system_locale in APP_LANGS else "en"
    app_lang = config_locale if config_locale in APP_LANGS else sys_lang
    config['defaultLang'] = app_lang
    set_app_dir(app_lang)

    translator = QTranslator()
    if not translator.load(f":trans_{app_lang}"):
        # load() reports failure through its return value only; make it visible.
        logger.warning("Unable to load translation resource ':trans_{}'", app_lang)

    # Create the PySide6 application.
    app = QApplication(sys.argv)
    app.installTranslator(translator)

    # Show the splash screen.
    splash_screen = SplashScreen()
    splash_screen.show()

    # The main window is built now; its first_load runs once the splash
    # screen's fade-out animation has finished.
    main_window = MainWindow(app, translator, config)
    splash_screen.fadeOut.finished.connect(main_window.first_load)

    sys.exit(app.exec())
|