Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions centml/sdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,52 @@ def invite_user(self, email: str):
def get_capacity(self, cluster_id=None):
return self._api.list_cluster_capacity_capacity_get(cluster_id=cluster_id).results

def get_deployment_revisions(self, deployment_id: int):
return self._api.get_deployment_revisions_deployments_revisions_deployment_id_get(
deployment_id=deployment_id
).results

def get_deployment_logs(
self,
deployment_id: int,
revision_number: int,
start_time: int,
end_time: int,
line_count: int = 100,
start_from_head: bool = True,
stream: bool = False,
):
"""Fetch logs for a deployment within a time window, handling pagination automatically.

start_time and end_time are Unix timestamps in milliseconds.
Use get_deployment_revisions() to find the current revision number.

If stream=True, returns a generator that yields events as each page is fetched.
If stream=False (default), returns a flat list of all events.
"""

def _iter_events():
next_page_token = None
while True:
response = self._api.get_deployment_logs_v3_deployments_logs_v3_deployment_id_revision_number_get(
deployment_id=deployment_id,
revision_number=revision_number,
start_time=start_time,
end_time=end_time,
next_page_token=next_page_token,
start_from_head=start_from_head,
line_count=line_count,
)
yield from response.events
next_page_token = response.next_page_token
if not next_page_token:
break

if stream:
return _iter_events()

return list(_iter_events())


@contextmanager
def get_centml_client():
Expand Down
75 changes: 75 additions & 0 deletions examples/sdk/get_deployment_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from datetime import datetime, timezone, timedelta

from centml.sdk.api import get_centml_client

# --- Configuration ---
DEPLOYMENT_ID = 1234 # Replace with your deployment ID
REVISION_NUMBER = 10
HOURS_BACK = 1 # Fetch logs from the last N hours


def format_event(event: dict) -> str:
timestamp_ms = (
event.get("timestamp")
or event.get("time")
or event.get("ts")
or ""
)
message = (
event.get("message")
or event.get("msg")
or event.get("log")
or str(event)
)
if timestamp_ms:
ts = datetime.fromtimestamp(int(timestamp_ms) / 1000, tz=timezone.utc).isoformat()
return f"[{ts}] {message}"
return message


def main():
stream = True
end_time = int(datetime.now(timezone.utc).timestamp() * 1000)
start_time = end_time - int(timedelta(hours=HOURS_BACK).total_seconds() * 1000)

print(f"Fetching logs for deployment {DEPLOYMENT_ID}")
print(
f"Time window: "
f"{datetime.fromtimestamp(start_time / 1000, tz=timezone.utc).isoformat()} → "
f"{datetime.fromtimestamp(end_time / 1000, tz=timezone.utc).isoformat()}"
)
print()
Comment thread
anandj91 marked this conversation as resolved.

with get_centml_client() as cclient:
if stream:
# Streaming: print events as each page arrives
for event in cclient.get_deployment_logs(
deployment_id=DEPLOYMENT_ID,
revision_number=REVISION_NUMBER,
start_time=start_time,
end_time=end_time,
start_from_head=False,
stream=stream,
):
print(format_event(event))
else:
# Batch: collect all events then process
events = cclient.get_deployment_logs(
deployment_id=DEPLOYMENT_ID,
revision_number=REVISION_NUMBER,
start_time=start_time,
end_time=end_time,
start_from_head=False,
)

if not events:
print("No logs found in the given time window.")
return

print(f"Found {len(events)} log entries:\n")
for event in events:
print(format_event(event))


if __name__ == "__main__":
main()
Loading