Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ select = [
"TC", # flake8-type-checking
]

[tool.ruff.lint.per-file-ignores]
"tests/**" = ["SIM117"] # Nested with statements are idiomatic in test mocking

[tool.ruff.lint.flake8-bugbear]
extend-immutable-calls = ["pydantic.Field"]

Expand Down
246 changes: 138 additions & 108 deletions src/decibel/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from aptos_sdk.ed25519 import Signature as Ed25519Signature
from aptos_sdk.transactions import FeePayerRawTransaction, SignedTransaction

from ._constants import DEFAULT_TXN_CONFIRM_TIMEOUT, DEFAULT_TXN_SUBMIT_TIMEOUT
from ._constants import (
DEFAULT_TXN_CONFIRM_TIMEOUT,
DEFAULT_TXN_SUBMIT_TIMEOUT,
HTTP_LIMITS,
HTTP_TIMEOUT,
)
from ._exceptions import TxnConfirmError, TxnSubmitError
from ._fee_pay import (
PendingTransactionResponse,
Expand Down Expand Up @@ -50,6 +55,14 @@
DEFAULT_GAS_ESTIMATE = 100
MAX_GAS_UNITS_LIMIT = 2_000_000

_POLL_DELAYS = (0.2, 0.2, 0.5, 0.5, 1.0)


def _poll_delay(index: int) -> float:
if index < len(_POLL_DELAYS):
return _POLL_DELAYS[index]
return 1.0


@dataclass
class BaseSDKOptions:
Expand Down Expand Up @@ -82,6 +95,7 @@ def __init__(
self._chain_id = config.chain_id
self._abi_registry = AbiRegistry(chain_id=config.chain_id)
self._aptos = RestClient(config.fullnode_url)
self._http_client = httpx.AsyncClient(limits=HTTP_LIMITS, timeout=HTTP_TIMEOUT)

opts = opts or BaseSDKOptions()
self._skip_simulate = opts.skip_simulate
Expand Down Expand Up @@ -124,6 +138,20 @@ def time_delta_ms(self) -> int:
def time_delta_ms(self, value: int) -> None:
self._time_delta_ms = value

async def close(self) -> None:
await self._http_client.aclose()

async def __aenter__(self) -> BaseSDK:
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: object,
) -> None:
await self.close()

def _get_abi(self, function_id: str) -> MoveFunction | None:
return self._abi_registry.get_function(function_id)

Expand Down Expand Up @@ -181,6 +209,7 @@ async def submit_tx(
self._config,
transaction,
sender_authenticator,
client=self._http_client,
txn_submit_timeout=txn_submit_timeout,
)

Expand Down Expand Up @@ -277,8 +306,7 @@ async def _fetch_gas_price_estimation(self) -> int:
url = f"{self._config.fullnode_url}/estimate_gas_price"
headers = self._build_node_headers()

async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
response = await self._http_client.get(url, headers=headers, timeout=5.0)

if not response.is_success:
raise ValueError(f"Failed to fetch gas price: {response.status_code} - {response.text}")
Expand All @@ -296,13 +324,12 @@ async def _simulate_transaction(

bcs_bytes = self._serialize_for_simulation(transaction)

async with httpx.AsyncClient() as client:
response = await client.post(
url,
content=bcs_bytes,
headers=headers,
params={"estimate_max_gas_amount": "true", "estimate_gas_unit_price": "true"},
)
response = await self._http_client.post(
url,
content=bcs_bytes,
headers=headers,
params={"estimate_max_gas_amount": "true", "estimate_gas_unit_price": "true"},
)

if not response.is_success:
raise ValueError(
Expand All @@ -327,10 +354,9 @@ async def _submit_direct(

bcs_bytes = self._serialize_signed_transaction(transaction, sender_authenticator)

async with httpx.AsyncClient() as client:
response = await client.post(
url, content=bcs_bytes, headers=headers, timeout=txn_submit_timeout
)
response = await self._http_client.post(
url, content=bcs_bytes, headers=headers, timeout=txn_submit_timeout
)

if not response.is_success:
raise ValueError(
Expand All @@ -353,40 +379,41 @@ async def _wait_for_transaction(
self,
tx_hash: str,
txn_confirm_timeout: float | None = None, # Uses DEFAULT_TXN_CONFIRM_TIMEOUT if None
poll_interval_secs: float = 1.0,
) -> dict[str, Any]:
if txn_confirm_timeout is None:
txn_confirm_timeout = DEFAULT_TXN_CONFIRM_TIMEOUT
url = f"{self._config.fullnode_url}/transactions/by_hash/{tx_hash}"
headers = self._build_node_headers()
start_time = time.time()

async with httpx.AsyncClient() as client:
while True:
try:
response = await client.get(url, headers=headers)
except httpx.ConnectTimeout:
pass
except httpx.ReadTimeout:
pass
except httpx.ConnectError:
pass
else:
if response.is_success:
data = cast("dict[str, Any]", response.json())
tx_type = data.get("type")
if tx_type == "pending_transaction":
pass
elif data.get("success") is True:
return data
elif data.get("success") is False:
vm_status = data.get("vm_status", "Unknown error")
raise TxnConfirmError(tx_hash, f"failed: {vm_status}")

if time.time() - start_time > txn_confirm_timeout:
raise TxnConfirmError(tx_hash, f"did not confirm within {txn_confirm_timeout}s")

await self._async_sleep(poll_interval_secs)
poll_index = 0
while True:
try:
response = await self._http_client.get(url, headers=headers, timeout=5.0)
except httpx.ConnectTimeout:
pass
except httpx.ReadTimeout:
pass
except httpx.ConnectError:
pass
else:
if response.is_success:
data = cast("dict[str, Any]", response.json())
tx_type = data.get("type")
if tx_type == "pending_transaction":
pass
elif data.get("success") is True:
return data
elif data.get("success") is False:
vm_status = data.get("vm_status", "Unknown error")
raise TxnConfirmError(tx_hash, f"failed: {vm_status}")

if time.time() - start_time > txn_confirm_timeout:
raise TxnConfirmError(tx_hash, f"did not confirm within {txn_confirm_timeout}s")

delay = _poll_delay(poll_index)
poll_index += 1
await self._async_sleep(delay)

async def _async_sleep(self, seconds: float) -> None:
import asyncio
Expand Down Expand Up @@ -461,7 +488,10 @@ def __init__(
self._node_api_key = opts.node_api_key
self._gas_price_manager = opts.gas_price_manager
self._time_delta_ms = opts.time_delta_ms
self._http_client = opts.http_client
self._http_client = opts.http_client or httpx.Client(
limits=HTTP_LIMITS, timeout=HTTP_TIMEOUT
)
self._owns_http_client = opts.http_client is None

if config.chain_id is None:
logger.warning(
Expand Down Expand Up @@ -493,6 +523,21 @@ def time_delta_ms(self) -> int:
def time_delta_ms(self, value: int) -> None:
self._time_delta_ms = value

def close(self) -> None:
if self._owns_http_client:
self._http_client.close()

def __enter__(self) -> BaseSDKSync:
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: object,
) -> None:
self.close()

def _get_abi(self, function_id: str) -> MoveFunction | None:
return self._abi_registry.get_function(function_id)

Expand Down Expand Up @@ -550,6 +595,7 @@ def submit_tx(
self._config,
transaction,
sender_authenticator,
client=self._http_client,
txn_submit_timeout=txn_submit_timeout,
)

Expand Down Expand Up @@ -644,19 +690,11 @@ def _fetch_gas_price_estimation(self) -> int:
url = f"{self._config.fullnode_url}/estimate_gas_price"
headers = self._build_node_headers()

def make_request(client: httpx.Client) -> int:
response = client.get(url, headers=headers)
if not response.is_success:
raise ValueError(
f"Failed to fetch gas price: {response.status_code} - {response.text}"
)
data = cast("dict[str, Any]", response.json())
return int(data.get("gas_estimate", DEFAULT_GAS_ESTIMATE))

if self._http_client is not None:
return make_request(self._http_client)
with httpx.Client() as client:
return make_request(client)
response = self._http_client.get(url, headers=headers, timeout=5.0)
if not response.is_success:
raise ValueError(f"Failed to fetch gas price: {response.status_code} - {response.text}")
data = cast("dict[str, Any]", response.json())
return int(data.get("gas_estimate", DEFAULT_GAS_ESTIMATE))

def _simulate_transaction(
self,
Expand All @@ -667,26 +705,20 @@ def _simulate_transaction(
headers["Content-Type"] = "application/x.aptos.signed_transaction+bcs"
bcs_bytes = self._serialize_for_simulation(transaction)

def make_request(client: httpx.Client) -> dict[str, Any]:
response = client.post(
url,
content=bcs_bytes,
headers=headers,
params={"estimate_max_gas_amount": "true", "estimate_gas_unit_price": "true"},
response = self._http_client.post(
url,
content=bcs_bytes,
headers=headers,
params={"estimate_max_gas_amount": "true", "estimate_gas_unit_price": "true"},
)
if not response.is_success:
raise ValueError(
f"Transaction simulation failed: {response.status_code} - {response.text}"
)
if not response.is_success:
raise ValueError(
f"Transaction simulation failed: {response.status_code} - {response.text}"
)
data: list[dict[str, Any]] | dict[str, Any] = response.json()
if isinstance(data, list) and len(data) > 0:
return data[0]
raise ValueError("Transaction simulation returned empty results")

if self._http_client is not None:
return make_request(self._http_client)
with httpx.Client() as client:
return make_request(client)
data: list[dict[str, Any]] | dict[str, Any] = response.json()
if isinstance(data, list) and len(data) > 0:
return data[0]
raise ValueError("Transaction simulation returned empty results")

def _submit_direct(
self,
Expand All @@ -699,45 +731,46 @@ def _submit_direct(
headers["Content-Type"] = "application/x.aptos.signed_transaction+bcs"
bcs_bytes = self._serialize_signed_transaction(transaction, sender_authenticator)

def make_request(client: httpx.Client) -> PendingTransactionResponse:
response = client.post(
url, content=bcs_bytes, headers=headers, timeout=txn_submit_timeout
)
if not response.is_success:
raise ValueError(
f"Transaction submission failed: {response.status_code} - {response.text}"
)
data = cast("dict[str, Any]", response.json())
raw_txn = transaction.raw_transaction
return PendingTransactionResponse(
hash=str(data.get("hash", "")),
sender=str(raw_txn.sender),
sequence_number=str(raw_txn.sequence_number),
max_gas_amount=str(raw_txn.max_gas_amount),
gas_unit_price=str(raw_txn.gas_unit_price),
expiration_timestamp_secs=str(raw_txn.expiration_timestamps_secs),
response = self._http_client.post(
url, content=bcs_bytes, headers=headers, timeout=txn_submit_timeout
)
if not response.is_success:
raise ValueError(
f"Transaction submission failed: {response.status_code} - {response.text}"
)

if self._http_client is not None:
return make_request(self._http_client)
with httpx.Client() as client:
return make_request(client)
data = cast("dict[str, Any]", response.json())
raw_txn = transaction.raw_transaction
return PendingTransactionResponse(
hash=str(data.get("hash", "")),
sender=str(raw_txn.sender),
sequence_number=str(raw_txn.sequence_number),
max_gas_amount=str(raw_txn.max_gas_amount),
gas_unit_price=str(raw_txn.gas_unit_price),
expiration_timestamp_secs=str(raw_txn.expiration_timestamps_secs),
)

def _wait_for_transaction(
self,
tx_hash: str,
txn_confirm_timeout: float | None = None, # Uses DEFAULT_TXN_CONFIRM_TIMEOUT if None
poll_interval_secs: float = 1.0,
) -> dict[str, Any]:
if txn_confirm_timeout is None:
txn_confirm_timeout = DEFAULT_TXN_CONFIRM_TIMEOUT
url = f"{self._config.fullnode_url}/transactions/by_hash/{tx_hash}"
headers = self._build_node_headers()
start_time = time.time()

def poll_loop(client: httpx.Client) -> dict[str, Any]:
while True:
response = client.get(url, headers=headers)
poll_index = 0
while True:
try:
response = self._http_client.get(url, headers=headers, timeout=5.0)
except httpx.ConnectTimeout:
Comment thread
gregnazario marked this conversation as resolved.
pass
except httpx.ReadTimeout:
pass
except httpx.ConnectError:
pass
else:
if response.is_success:
data = cast("dict[str, Any]", response.json())
tx_type = data.get("type")
Expand All @@ -748,14 +781,11 @@ def poll_loop(client: httpx.Client) -> dict[str, Any]:
elif data.get("success") is False:
vm_status = data.get("vm_status", "Unknown error")
raise TxnConfirmError(tx_hash, f"failed: {vm_status}")
if time.time() - start_time > txn_confirm_timeout:
raise TxnConfirmError(tx_hash, f"did not confirm within {txn_confirm_timeout}s")
time.sleep(poll_interval_secs)

if self._http_client is not None:
return poll_loop(self._http_client)
with httpx.Client() as client:
return poll_loop(client)
if time.time() - start_time > txn_confirm_timeout:
raise TxnConfirmError(tx_hash, f"did not confirm within {txn_confirm_timeout}s")
delay = _poll_delay(poll_index)
poll_index += 1
time.sleep(delay)
Comment thread
gregnazario marked this conversation as resolved.

def _build_node_headers(self) -> dict[str, str]:
headers: dict[str, str] = {}
Expand Down
Loading
Loading