Skip to content

Commit b1435d7

Browse files
authored
Merge branch 'main' into fix/clickhouse-multi-gateway-catalog-support
2 parents cdbece3 + d15203b commit b1435d7

8 files changed

Lines changed: 388 additions & 38 deletions

File tree

docs/reference/cli.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,9 +307,17 @@ Usage: sqlmesh janitor [OPTIONS]
307307
The janitor cleans up old environments and expired snapshots.
308308
309309
Options:
310-
--ignore-ttl Cleanup snapshots that are not referenced in any environment,
311-
regardless of when they're set to expire
312-
--help Show this message and exit.
310+
--ignore-ttl Cleanup snapshots that are not referenced in any
311+
environment, regardless of when they're set to expire. Has
312+
no effect when --environment is specified.
313+
--force-delete Delete expired environment and snapshot state records even
314+
when the physical table or view drops fail. Any objects
315+
that could not be dropped become orphaned and must be
316+
removed manually.
317+
-e, --environment TEXT
318+
Scope cleanup to a single expired environment. Global
319+
snapshot and interval compaction are skipped.
320+
--help Show this message and exit.
313321
```
314322

315323
## migrate

sqlmesh/cli/main.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -629,24 +629,36 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
629629
@click.option(
630630
"--ignore-ttl",
631631
is_flag=True,
632-
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
632+
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. Has no effect when --environment is specified.",
633633
)
634634
@click.option(
635635
"--force-delete",
636636
is_flag=True,
637637
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
638638
"Any objects that could not be dropped become orphaned and must be removed manually.",
639639
)
640+
@click.option(
641+
"--environment",
642+
"-e",
643+
default=None,
644+
help="Scope cleanup to a single expired environment. Global snapshot and interval compaction are skipped.",
645+
)
640646
@click.pass_context
641647
@error_handler
642648
@cli_analytics
643-
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
649+
def janitor(
650+
ctx: click.Context,
651+
ignore_ttl: bool,
652+
force_delete: bool,
653+
environment: t.Optional[str],
654+
**kwargs: t.Any,
655+
) -> None:
644656
"""
645657
Run the janitor process on-demand.
646658
647659
The janitor cleans up old environments and expired snapshots.
648660
"""
649-
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
661+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, environment=environment, **kwargs)
650662

651663

652664
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -893,12 +893,20 @@ def _has_environment_changed() -> bool:
893893
return completion_status
894894

895895
@python_api_analytics
896-
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
896+
def run_janitor(
897+
self,
898+
ignore_ttl: bool,
899+
force_delete: bool = False,
900+
environment: t.Optional[str] = None,
901+
) -> bool:
902+
if environment is not None:
903+
environment = Environment.sanitize_name(environment)
904+
897905
success = False
898906

899907
if self.console.start_cleanup(ignore_ttl):
900908
try:
901-
self._run_janitor(ignore_ttl, force_delete=force_delete)
909+
self._run_janitor(ignore_ttl, force_delete=force_delete, environment=environment)
902910
success = True
903911
finally:
904912
self.console.stop_cleanup(success=success)
@@ -1831,7 +1839,7 @@ def invalidate_environment(self, name: str, sync: bool = False) -> None:
18311839
name = Environment.sanitize_name(name)
18321840
self.state_sync.invalidate_environment(name)
18331841
if sync:
1834-
self._cleanup_environments()
1842+
self._cleanup_environments(name=name)
18351843
self.console.log_success(f"Environment '{name}' deleted.")
18361844
else:
18371845
self.console.log_success(f"Environment '{name}' invalidated.")
@@ -2902,27 +2910,35 @@ def _destroy(self) -> bool:
29022910

29032911
return True
29042912

2905-
def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
2913+
def _run_janitor(
2914+
self,
2915+
ignore_ttl: bool = False,
2916+
force_delete: bool = False,
2917+
environment: t.Optional[str] = None,
2918+
) -> None:
29062919
current_ts = now_timestamp()
29072920
failures: t.List[str] = []
29082921

29092922
# Clean up expired environments by removing their views and schemas
29102923
failures.extend(
2911-
self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
2924+
self._cleanup_environments(
2925+
current_ts=current_ts, force_delete=force_delete, name=environment
2926+
)
29122927
)
29132928

2914-
failures.extend(
2915-
delete_expired_snapshots(
2916-
self.state_sync,
2917-
self.snapshot_evaluator,
2918-
current_ts=current_ts,
2919-
ignore_ttl=ignore_ttl,
2920-
force_delete=force_delete,
2921-
console=self.console,
2922-
batch_size=self.config.janitor.expired_snapshots_batch_size,
2929+
if environment is None:
2930+
failures.extend(
2931+
delete_expired_snapshots(
2932+
self.state_sync,
2933+
self.snapshot_evaluator,
2934+
current_ts=current_ts,
2935+
ignore_ttl=ignore_ttl,
2936+
force_delete=force_delete,
2937+
console=self.console,
2938+
batch_size=self.config.janitor.expired_snapshots_batch_size,
2939+
)
29232940
)
2924-
)
2925-
self.state_sync.compact_intervals()
2941+
self.state_sync.compact_intervals()
29262942

29272943
if failures:
29282944
failure_string = "\n - ".join(failures)
@@ -2935,15 +2951,23 @@ def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) ->
29352951
raise SQLMeshError(summary)
29362952

29372953
def _cleanup_environments(
2938-
self, current_ts: t.Optional[int] = None, force_delete: bool = False
2954+
self,
2955+
current_ts: t.Optional[int] = None,
2956+
force_delete: bool = False,
2957+
name: t.Optional[str] = None,
29392958
) -> t.List[str]:
29402959
current_ts = current_ts or now_timestamp()
29412960
failures: t.List[str] = []
29422961

29432962
expired_environments_summaries = self.state_sync.get_expired_environments(
2944-
current_ts=current_ts
2963+
current_ts=current_ts, name=name
29452964
)
29462965

2966+
if name is not None and not expired_environments_summaries:
2967+
self.console.log_warning(
2968+
f"Environment '{name}' is not expired or does not exist. Nothing to clean up."
2969+
)
2970+
29472971
for expired_env_summary in expired_environments_summaries:
29482972
expired_env = self.state_reader.get_environment(expired_env_summary.name)
29492973

@@ -2960,7 +2984,7 @@ def _cleanup_environments(
29602984
# we want to retry on the next janitor pass if drops failed, unless
29612985
# force_delete is set in which case we purge state records regardless
29622986
if not failures or force_delete:
2963-
self.state_sync.delete_expired_environments(current_ts=current_ts)
2987+
self.state_sync.delete_expired_environments(current_ts=current_ts, name=name)
29642988
return failures
29652989

29662990
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,17 @@ def get_expired_snapshots(
321321
"""
322322

323323
@abc.abstractmethod
324-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
324+
def get_expired_environments(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> t.List[EnvironmentSummary]:
325327
"""Returns the expired environments.
326328
327329
Expired environments are environments that have exceeded their time-to-live value.
330+
331+
Args:
332+
current_ts: The current timestamp in milliseconds used to determine expiration.
333+
name: If provided, only the environment with this name is considered.
334+
328335
Returns:
329336
The list of environment summaries to remove.
330337
"""
@@ -436,12 +443,16 @@ def finalize(self, environment: Environment) -> None:
436443

437444
@abc.abstractmethod
438445
def delete_expired_environments(
439-
self, current_ts: t.Optional[int] = None
446+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
440447
) -> t.List[EnvironmentSummary]:
441448
"""Removes expired environments.
442449
443450
Expired environments are environments that have exceeded their time-to-live value.
444451
452+
Args:
453+
current_ts: The current timestamp in milliseconds. Defaults to now.
454+
name: If provided, only the environment with this name is deleted.
455+
445456
Returns:
446457
The list of removed environments.
447458
"""

sqlmesh/core/state_sync/db/environment.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,31 +167,42 @@ def finalize(self, environment: Environment) -> None:
167167
where=environment_filter,
168168
)
169169

170-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
170+
def get_expired_environments(
171+
self, current_ts: int, name: t.Optional[str] = None
172+
) -> t.List[EnvironmentSummary]:
171173
"""Returns the expired environments.
172174
173175
Expired environments are environments that have exceeded their time-to-live value.
176+
177+
Args:
178+
current_ts: The current timestamp in milliseconds used to determine expiration.
179+
name: If provided, only the environment with this name is considered.
180+
174181
Returns:
175182
The list of environment summaries to remove.
176183
"""
177184
return self._fetch_environment_summaries(
178-
where=self._create_expiration_filter_expr(current_ts)
185+
where=self._create_expiration_filter_expr(current_ts, name=name)
179186
)
180187

181188
def delete_expired_environments(
182-
self, current_ts: t.Optional[int] = None
189+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
183190
) -> t.List[EnvironmentSummary]:
184191
"""Deletes expired environments.
185192
193+
Args:
194+
current_ts: The current timestamp in milliseconds. Defaults to now.
195+
name: If provided, only the environment with this name is deleted.
196+
186197
Returns:
187198
A list of deleted environments.
188199
"""
189200
current_ts = current_ts or now_timestamp()
190-
expired_environments = self.get_expired_environments(current_ts=current_ts)
201+
expired_environments = self.get_expired_environments(current_ts=current_ts, name=name)
191202

192203
self.engine_adapter.delete_from(
193204
self.environments_table,
194-
where=self._create_expiration_filter_expr(current_ts),
205+
where=self._create_expiration_filter_expr(current_ts, name=name),
195206
)
196207

197208
# Delete the expired environments' corresponding environment statements
@@ -310,16 +321,22 @@ def _environments_query(
310321
return query.lock(copy=False)
311322
return query
312323

313-
def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expr:
324+
def _create_expiration_filter_expr(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> exp.Expr:
314327
"""Creates a SQLGlot filter expression to find expired environments.
315328
316329
Args:
317330
current_ts: The current timestamp.
331+
name: If provided, adds an equality filter on the environment name.
318332
"""
319-
return exp.LTE(
333+
where: exp.Expr = exp.LTE(
320334
this=exp.column("expiration_ts"),
321335
expression=exp.Literal.number(current_ts),
322336
)
337+
if name is not None:
338+
where = exp.and_(t.cast(exp.Condition, where), exp.column("name").eq(name))
339+
return where
323340

324341
def _fetch_environment_summaries(
325342
self, where: t.Optional[str | exp.Expr] = None

sqlmesh/core/state_sync/db/facade.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,10 @@ def get_expired_snapshots(
276276
batch_range=batch_range,
277277
)
278278

279-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
280-
return self.environment_state.get_expired_environments(current_ts=current_ts)
279+
def get_expired_environments(
280+
self, current_ts: int, name: t.Optional[str] = None
281+
) -> t.List[EnvironmentSummary]:
282+
return self.environment_state.get_expired_environments(current_ts=current_ts, name=name)
281283

282284
@transactional()
283285
def delete_expired_snapshots(
@@ -297,10 +299,10 @@ def delete_expired_snapshots(
297299

298300
@transactional()
299301
def delete_expired_environments(
300-
self, current_ts: t.Optional[int] = None
302+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
301303
) -> t.List[EnvironmentSummary]:
302304
current_ts = current_ts or now_timestamp()
303-
return self.environment_state.delete_expired_environments(current_ts=current_ts)
305+
return self.environment_state.delete_expired_environments(current_ts=current_ts, name=name)
304306

305307
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
306308
self.snapshot_state.delete_snapshots(snapshot_ids)

0 commit comments

Comments
 (0)