22import pickle
33import sqlite3
44import sys
5+ import threading
56from contextlib import contextmanager
67from dataclasses import dataclass
78from enum import Enum
89from pathlib import Path
9- from typing import Any , Generic , Iterator , List , Optional , Tuple , Type , TypeVar , Union
10+ from typing import Any , Callable , Generic , Iterator , List , Optional , Tuple , Type , TypeVar , Union
11+
12+ from robotcode .core .utils .logging import LoggingDescriptor
1013
1114from ..utils import get_robot_version_str
1215
1922
2023_M = TypeVar ("_M" )
2124_D = TypeVar ("_D" )
25+ _R = TypeVar ("_R" )
2226
2327
2428class CacheSection (Enum ):
@@ -38,14 +42,14 @@ class CacheEntry(Generic[_M, _D]):
3842
3943 def __init__ (
4044 self ,
41- conn : sqlite3 . Connection ,
45+ cache : "SqliteDataCache" ,
4246 section : "CacheSection" ,
4347 entry_name : str ,
4448 meta_blob : Optional [bytes ],
4549 meta_type : Union [Type [_M ], Tuple [Type [_M ], ...]],
4650 data_type : Union [Type [_D ], Tuple [Type [_D ], ...]],
4751 ) -> None :
48- self ._conn = conn
52+ self ._cache = cache
4953 self ._section = section
5054 self ._entry_name = entry_name
5155 self ._meta_blob = meta_blob
@@ -70,10 +74,7 @@ def meta(self) -> Optional[_M]:
7074 @property
7175 def data (self ) -> _D :
7276 if not self ._data_loaded :
73- row = self ._conn .execute (
74- f"SELECT data FROM { self ._section .value } WHERE entry_name = ?" ,
75- (self ._entry_name ,),
76- ).fetchone ()
77+ row = self ._cache ._fetch_data (self ._section , self ._entry_name )
7778 if row is None :
7879 raise RuntimeError (f"Cache entry '{ self ._entry_name } ' disappeared from DB" )
7980 result = pickle .loads (row [0 ])
@@ -247,16 +248,50 @@ def build_cache_dir(base_path: Path) -> Path:
247248 )
248249
249250
251+ # Primary SQLite result codes for an unusable database file.
252+ _SQLITE_CORRUPT = 11
253+ _SQLITE_NOTADB = 26
254+
255+ # Substring markers used to recognize corruption when no error code is available
256+ # (Python < 3.11) or when the error is raised by the sqlite3 module itself rather
257+ # than the engine (e.g. "Could not decode to UTF-8" on a corrupt TEXT column, which
258+ # carries no error code on any version). SQLite error strings are fixed and not
259+ # localized, so substring matching is safe.
260+ _CORRUPTION_MESSAGE_MARKERS = ("malformed" , "not a database" , "disk image" , "corrupt" , "decode" )
261+
262+
263+ def _is_corruption (exc : sqlite3 .DatabaseError ) -> bool :
264+ """Whether the error means the database file itself is corrupt/unusable.
265+
266+ A locked database, a closed connection (``ProgrammingError``) or a constraint
267+ violation are *not* corruption and must not trigger a destructive rebuild.
268+ """
269+ code = getattr (exc , "sqlite_errorcode" , None ) # available since Python 3.11
270+ if code is not None :
271+ return (code & 0xFF ) in (_SQLITE_CORRUPT , _SQLITE_NOTADB )
272+ message = str (exc ).lower ()
273+ return any (marker in message for marker in _CORRUPTION_MESSAGE_MARKERS )
274+
275+
250276class SqliteDataCache :
251277 """Cache backend using a single SQLite database with per-section tables.
252278
253279 Each CacheSection gets its own table with entry_name as PK, plus meta and data
254280 BLOB columns. An app_version is stored in a metadata table; on version mismatch
255281 all tables are dropped and recreated.
282+
283+ All access to the single shared connection is serialized through a lock, and a
284+ corrupt database (``sqlite3.DatabaseError``, e.g. "database disk image is
285+ malformed") is detected and rebuilt from scratch instead of propagating to
286+ callers.
256287 """
257288
289+ _logger = LoggingDescriptor ()
290+
258291 def __init__ (self , cache_dir : Path , app_version : str = "" ) -> None :
259292 self .cache_dir = cache_dir
293+ self ._app_version = app_version
294+ self ._lock = threading .Lock ()
260295
261296 if not cache_dir .exists ():
262297 cache_dir .mkdir (parents = True )
@@ -267,25 +302,84 @@ def __init__(self, cache_dir: Path, app_version: str = "") -> None:
267302
268303 self ._lock_fd = _acquire_shared_lock (cache_dir )
269304
270- db_path = cache_dir / "cache.db"
271- self ._conn = sqlite3 .connect (str (db_path ), check_same_thread = False )
305+ try :
306+ self ._open ()
307+ except sqlite3 .DatabaseError as e :
308+ if not _is_corruption (e ):
309+ raise
310+ self ._rebuild ()
311+
312+ def _open (self , * , in_memory : bool = False ) -> None :
313+ """Open the connection, configure it, and ensure the schema exists."""
314+ self ._conn = sqlite3 .connect (":memory:" if in_memory else str (self .db_path ), check_same_thread = False )
272315 self ._conn .execute ("PRAGMA journal_mode=WAL" )
273316 self ._conn .execute ("PRAGMA synchronous=NORMAL" )
274317 self ._conn .execute ("PRAGMA cache_size=-8000" )
275- self ._conn .execute ("PRAGMA mmap_size=67108864" )
276-
277- self ._ensure_schema (app_version )
278-
279- def _ensure_schema (self , app_version : str ) -> None :
318+ self ._conn .execute ("PRAGMA busy_timeout=5000" )
319+ # Memory-mapped reads race with a concurrent writer extending the file on
320+ # macOS/APFS and can persist a torn page ("database disk image is
321+ # malformed"); keep mmap only where the unified page cache makes it safe.
322+ self ._conn .execute (f"PRAGMA mmap_size={ 0 if sys .platform == 'darwin' else 67108864 } " )
323+ self ._ensure_schema ()
324+
325+ def _purge_db_files (self ) -> None :
326+ # Best-effort: on Windows another process (a second editor window) may hold
327+ # cache.db open, so unlink can raise PermissionError. Failing to delete must
328+ # not abort recovery - _rebuild falls back to an in-memory cache.
329+ for suffix in ("" , "-wal" , "-shm" ):
330+ try :
331+ (self .cache_dir / f"cache.db{ suffix } " ).unlink (missing_ok = True )
332+ except OSError :
333+ pass
334+
335+ def _rebuild (self ) -> None :
336+ """Discard a corrupt database and reopen an empty one.
337+
338+ If the corrupt file cannot be removed or reopened (e.g. another process holds
339+ it open), fall back to a transient in-memory database so the cache stays usable
340+ for this session instead of taking down the language server.
341+ """
342+ self ._logger .warning (lambda : f"Cache database { self .db_path } is corrupt, rebuilding it from scratch." )
343+ conn = getattr (self , "_conn" , None )
344+ if conn is not None :
345+ try :
346+ conn .close ()
347+ except sqlite3 .Error :
348+ pass
349+ self ._purge_db_files ()
350+ try :
351+ self ._open ()
352+ except sqlite3 .DatabaseError as e :
353+ if not _is_corruption (e ):
354+ raise
355+ self ._logger .warning (
356+ lambda : f"Could not rebuild cache database { self .db_path } ; using a temporary in-memory cache."
357+ )
358+ self ._open (in_memory = True )
359+
360+ def _run (self , operation : Callable [[], _R ]) -> _R :
361+ """Run a DB operation under the connection lock, rebuilding the cache once if it is corrupt."""
362+ with self ._lock :
363+ try :
364+ return operation ()
365+ except sqlite3 .DatabaseError as e :
366+ if not _is_corruption (e ):
367+ raise
368+ self ._rebuild ()
369+ return operation ()
370+
371+ def _ensure_schema (self ) -> None :
280372 self ._conn .execute ("CREATE TABLE IF NOT EXISTS _meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL)" )
281373
282374 row = self ._conn .execute ("SELECT value FROM _meta WHERE key = 'app_version'" ).fetchone ()
283375 stored_version = row [0 ] if row else None
284376
285- if stored_version != app_version :
377+ if stored_version != self . _app_version :
286378 for table in _TABLE_NAMES :
287379 self ._conn .execute (f"DROP TABLE IF EXISTS { table } " )
288- self ._conn .execute ("INSERT OR REPLACE INTO _meta (key, value) VALUES ('app_version', ?)" , (app_version ,))
380+ self ._conn .execute (
381+ "INSERT OR REPLACE INTO _meta (key, value) VALUES ('app_version', ?)" , (self ._app_version ,)
382+ )
289383
290384 for table in _TABLE_NAMES :
291385 self ._conn .execute (
@@ -305,15 +399,25 @@ def read_entry(
305399 meta_type : Union [Type [_M ], Tuple [Type [_M ], ...]],
306400 data_type : Union [Type [_D ], Tuple [Type [_D ], ...]],
307401 ) -> Optional [CacheEntry [_M , _D ]]:
308- row = self ._conn .execute (
309- f"SELECT meta FROM { section .value } WHERE entry_name = ?" ,
310- (entry_name ,),
311- ).fetchone ()
402+ row = self ._run (
403+ lambda : self ._conn .execute (
404+ f"SELECT meta FROM { section .value } WHERE entry_name = ?" ,
405+ (entry_name ,),
406+ ).fetchone ()
407+ )
312408
313409 if row is None :
314410 return None
315411
316- return CacheEntry (self ._conn , section , entry_name , row [0 ], meta_type , data_type )
412+ return CacheEntry (self , section , entry_name , row [0 ], meta_type , data_type )
413+
414+ def _fetch_data (self , section : CacheSection , entry_name : str ) -> Optional [Any ]:
415+ return self ._run (
416+ lambda : self ._conn .execute (
417+ f"SELECT data FROM { section .value } WHERE entry_name = ?" ,
418+ (entry_name ,),
419+ ).fetchone ()
420+ )
317421
318422 def save_entry (
319423 self ,
@@ -324,37 +428,44 @@ def save_entry(
324428 ) -> None :
325429 meta_blob = pickle .dumps (meta , protocol = pickle .HIGHEST_PROTOCOL ) if meta is not None else None
326430 data_blob = pickle .dumps (data , protocol = pickle .HIGHEST_PROTOCOL )
327- self ._conn .execute (
328- f"INSERT INTO { section .value } (entry_name, meta, data)"
329- f" VALUES (?, ?, ?)"
330- f" ON CONFLICT(entry_name) DO UPDATE SET"
331- f" meta = excluded.meta, data = excluded.data, modified_at = CURRENT_TIMESTAMP" ,
332- (entry_name , meta_blob , data_blob ),
333- )
334- self ._conn .commit ()
431+
432+ def op () -> None :
433+ self ._conn .execute (
434+ f"INSERT INTO { section .value } (entry_name, meta, data)"
435+ f" VALUES (?, ?, ?)"
436+ f" ON CONFLICT(entry_name) DO UPDATE SET"
437+ f" meta = excluded.meta, data = excluded.data, modified_at = CURRENT_TIMESTAMP" ,
438+ (entry_name , meta_blob , data_blob ),
439+ )
440+ self ._conn .commit ()
441+
442+ self ._run (op )
335443
336444 def close (self ) -> None :
337- self ._conn .close ()
338- _release_lock (self ._lock_fd )
339- self ._lock_fd = None
445+ with self ._lock :
446+ self ._conn .close ()
447+ fd , self ._lock_fd = self ._lock_fd , None
448+ _release_lock (fd )
340449
341450 @property
342451 def db_path (self ) -> Path :
343452 return self .cache_dir / "cache.db"
344453
345454 @property
346455 def app_version (self ) -> Optional [str ]:
347- row = self ._conn .execute ("SELECT value FROM _meta WHERE key = 'app_version'" ).fetchone ()
456+ row = self ._run ( lambda : self . _conn .execute ("SELECT value FROM _meta WHERE key = 'app_version'" ).fetchone () )
348457 return row [0 ] if row else None
349458
350459 def get_section_stats (self , section : CacheSection ) -> "SectionStats" :
351- row = self ._conn .execute (
352- f"SELECT COUNT(*),"
353- f" COALESCE(SUM(LENGTH(meta) + LENGTH(data)), 0),"
354- f" MIN(created_at),"
355- f" MAX(modified_at)"
356- f" FROM { section .value } " ,
357- ).fetchone ()
460+ row = self ._run (
461+ lambda : self ._conn .execute (
462+ f"SELECT COUNT(*),"
463+ f" COALESCE(SUM(LENGTH(meta) + LENGTH(data)), 0),"
464+ f" MIN(created_at),"
465+ f" MAX(modified_at)"
466+ f" FROM { section .value } " ,
467+ ).fetchone ()
468+ )
358469 assert row is not None
359470 return SectionStats (
360471 section = section ,
@@ -365,12 +476,14 @@ def get_section_stats(self, section: CacheSection) -> "SectionStats":
365476 )
366477
367478 def list_entries (self , section : CacheSection ) -> List ["EntryInfo" ]:
368- rows = self ._conn .execute (
369- f"SELECT entry_name, created_at, modified_at,"
370- f" LENGTH(meta), LENGTH(data)"
371- f" FROM { section .value } "
372- f" ORDER BY entry_name" ,
373- ).fetchall ()
479+ rows = self ._run (
480+ lambda : self ._conn .execute (
481+ f"SELECT entry_name, created_at, modified_at,"
482+ f" LENGTH(meta), LENGTH(data)"
483+ f" FROM { section .value } "
484+ f" ORDER BY entry_name" ,
485+ ).fetchall ()
486+ )
374487 return [
375488 EntryInfo (
376489 entry_name = r [0 ],
@@ -383,17 +496,23 @@ def list_entries(self, section: CacheSection) -> List["EntryInfo"]:
383496 ]
384497
385498 def clear_section (self , section : CacheSection ) -> int :
386- cursor = self ._conn .execute (f"DELETE FROM { section .value } " )
387- self ._conn .commit ()
388- return cursor .rowcount
499+ def op () -> int :
500+ cursor = self ._conn .execute (f"DELETE FROM { section .value } " )
501+ self ._conn .commit ()
502+ return cursor .rowcount
503+
504+ return self ._run (op )
389505
390506 def clear_all (self ) -> int :
391- total = 0
392- for table in _TABLE_NAMES :
393- cursor = self ._conn .execute (f"DELETE FROM { table } " )
394- total += cursor .rowcount
395- self ._conn .commit ()
396- return total
507+ def op () -> int :
508+ total = 0
509+ for table in _TABLE_NAMES :
510+ cursor = self ._conn .execute (f"DELETE FROM { table } " )
511+ total += cursor .rowcount
512+ self ._conn .commit ()
513+ return total
514+
515+ return self ._run (op )
397516
398517
399518@dataclass
0 commit comments