Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 167 additions & 53 deletions src/project_x_py/order_manager/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def main():

import asyncio
import time
from datetime import datetime
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any, Optional

Expand Down Expand Up @@ -731,6 +731,83 @@ async def search_open_orders(

return open_orders

@handle_errors("search orders")
async def search_orders(
self,
start_timestamp: datetime | None = None,
end_timestamp: datetime | None = None,
contract_id: str | None = None,
account_id: int | None = None,
) -> list[Order]:
"""
Search historical orders with optional contract and time filters.

Unlike search_open_orders, this method returns terminal orders such as
filled, cancelled, expired, and rejected orders. It is useful for
reconciling race conditions where an order changes state while a cancel
request is in flight.

Args:
start_timestamp: Start of the search window. Defaults to 30 days ago.
end_timestamp: End of the search window. Defaults to now.
contract_id: Filter by instrument (optional).
account_id: Account ID. Uses default account if None.

Returns:
List of Order objects.
"""
if account_id is None:
if not self.project_x.account_info:
await self.project_x.authenticate()
if not self.project_x.account_info:
raise ProjectXOrderError(ErrorMessages.ORDER_NO_ACCOUNT)
account_id = self.project_x.account_info.id

if end_timestamp is None:
end_timestamp = datetime.now(UTC)
if start_timestamp is None:
start_timestamp = end_timestamp - timedelta(days=30)

payload: dict[str, Any] = {
"accountId": account_id,
"startTimestamp": start_timestamp.isoformat(),
"endTimestamp": end_timestamp.isoformat(),
}

if contract_id:
resolved = await resolve_contract_id(contract_id, self.project_x)
if resolved and resolved.get("id"):
payload["contractId"] = resolved["id"]

response = await self.project_x._make_request(
"POST", "/Order/search", data=payload
)

if not isinstance(response, dict):
raise ProjectXOrderError("Invalid response format")

if not response.get("success", False):
error_msg = response.get("errorMessage", ErrorMessages.ORDER_SEARCH_FAILED)
raise ProjectXOrderError(error_msg)

orders = []
for order_data in response.get("orders", []):
try:
order = Order(**order_data)
orders.append(order)

async with self.order_lock:
self.tracked_orders[str(order.id)] = order_data
self.order_status_cache[str(order.id)] = order.status
except Exception as e:
self.logger.warning(
"Failed to parse order",
extra={"error": str(e), "order_data": order_data},
)
continue

return orders

async def _check_circuit_breaker(self) -> bool:
"""
Check if circuit breaker allows execution.
Expand Down Expand Up @@ -916,15 +993,37 @@ async def get_order_by_id(self, order_id: int) -> Order | None:
except Exception as e:
self.logger.debug(f"Failed to parse cached order data: {e}")

# Fallback to API search
# Fallback to open order search
try:
orders = await self.search_open_orders()
for order in orders:
if order.id == order_id:
return order
return None
except Exception as e:
self.logger.debug(f"Failed to search open orders for {order_id}: {e}")

# Fallback to historical order search so terminal orders can be reconciled.
try:
orders = await self.search_orders()
for order in orders:
if order.id == order_id:
return order
except Exception as e:
self.logger.error(f"Failed to get order {order_id}: {e}")

return None

async def _cache_order_status(
self, order_id: int, status: int, order_data: dict[str, Any] | None = None
) -> None:
"""Update local order caches for a known order status."""
async with self.order_lock:
order_id_str = str(order_id)
if order_data is not None:
self.tracked_orders[order_id_str] = order_data
elif order_id_str in self.tracked_orders:
self.tracked_orders[order_id_str]["status"] = status
self.order_status_cache[order_id_str] = status
return None

@handle_errors("cancel order")
Expand All @@ -941,15 +1040,17 @@ async def cancel_order(self, order_id: int, account_id: int | None = None) -> bo
"""
self.logger.info(LogMessages.ORDER_CANCEL, extra={"order_id": order_id})

# Check if order is already known to be terminal before submitting a
# cancel. Keep the lock scope small so reconciliation can perform API
# lookups without deadlocking on cache updates.
order_id_str = str(order_id)
async with self.order_lock:
# Check if order is already filled
order_id_str = str(order_id)
if order_id_str in self.order_status_cache:
status = self.order_status_cache[order_id_str]
if status == OrderStatus.FILLED or status == 2: # 2 is FILLED
raise ProjectXOrderError(
f"Cannot cancel order {order_id}: already filled"
)
return False
if status == OrderStatus.CANCELLED or status == 3:
return True

# Also check tracked orders
if order_id_str in self.tracked_orders:
Expand All @@ -958,58 +1059,71 @@ async def cancel_order(self, order_id: int, account_id: int | None = None) -> bo
tracked.get("status") == OrderStatus.FILLED
or tracked.get("status") == 2
):
raise ProjectXOrderError(
f"Cannot cancel order {order_id}: already filled"
)
return False
if (
tracked.get("status") == OrderStatus.CANCELLED
or tracked.get("status") == 3
):
return True

# Get account ID if not provided
if account_id is None:
if not self.project_x.account_info:
await self.project_x.authenticate()
if not self.project_x.account_info:
raise ProjectXOrderError(ErrorMessages.ORDER_NO_ACCOUNT)
account_id = self.project_x.account_info.id

# Use correct endpoint and payload structure
payload = {
"accountId": account_id,
"orderId": order_id,
}

# Get account ID if not provided
if account_id is None:
if not self.project_x.account_info:
await self.project_x.authenticate()
if not self.project_x.account_info:
raise ProjectXOrderError(ErrorMessages.ORDER_NO_ACCOUNT)
account_id = self.project_x.account_info.id
response = await self.project_x._make_request(
"POST", "/Order/cancel", data=payload
)

# Use correct endpoint and payload structure
payload = {
"accountId": account_id,
"orderId": order_id,
}
# Response should be a dict
if not isinstance(response, dict):
raise ProjectXOrderError("Invalid response format")

response = await self.project_x._make_request(
"POST", "/Order/cancel", data=payload
)
success = response.get("success", False) if response else False

# Response should be a dict
if not isinstance(response, dict):
raise ProjectXOrderError("Invalid response format")
if success:
await self._cache_order_status(order_id, OrderStatus.CANCELLED)

success = response.get("success", False) if response else False

if success:
# Update cache
if str(order_id) in self.tracked_orders:
self.tracked_orders[str(order_id)]["status"] = OrderStatus.CANCELLED
self.order_status_cache[str(order_id)] = OrderStatus.CANCELLED
# Update statistics
await self.increment("orders_cancelled")
self.stats["orders_cancelled"] += 1
self.logger.info(
LogMessages.ORDER_CANCELLED, extra={"order_id": order_id}
)
return True

# Update statistics
await self.increment("orders_cancelled")
self.stats["orders_cancelled"] += 1
self.logger.info(
LogMessages.ORDER_CANCELLED, extra={"order_id": order_id}
)
reconciled_order = await self.get_order_by_id(order_id)
if reconciled_order is not None and reconciled_order.is_terminal:
await self._cache_order_status(reconciled_order.id, reconciled_order.status)
if reconciled_order.is_cancelled:
return True
else:
error_msg = response.get(
"errorMessage", ErrorMessages.ORDER_CANCEL_FAILED
)
raise ProjectXOrderError(
format_error_message(
ErrorMessages.ORDER_CANCEL_FAILED,
order_id=order_id,
reason=error_msg,
)
)
self.logger.info(
"Order already terminal before cancel completed",
extra={
"order_id": order_id,
"status": reconciled_order.status,
"status_str": reconciled_order.status_str,
},
)
return False

error_msg = response.get("errorMessage", ErrorMessages.ORDER_CANCEL_FAILED)
raise ProjectXOrderError(
format_error_message(
ErrorMessages.ORDER_CANCEL_FAILED,
order_id=order_id,
reason=error_msg,
)
)

@handle_errors("modify order")
async def modify_order(
Expand Down
5 changes: 2 additions & 3 deletions tests/order_manager/test_core_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,8 @@ async def test_cancel_already_filled_order(self, order_manager):
order_manager.tracked_orders["999"] = {"status": 2} # Already filled
order_manager.order_status_cache["999"] = 2

with pytest.raises(ProjectXOrderError) as exc_info:
await order_manager.cancel_order(999)
assert "already filled" in str(exc_info.value).lower()
result = await order_manager.cancel_order(999)
assert result is False

@pytest.mark.asyncio
async def test_get_order_by_id_with_invalid_cache_data(self, order_manager):
Expand Down
Loading