feat: add ability to use existing connections and connection pools#54
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds support for injecting existing database connections and connection pools into the TaskIQ Postgres brokers/result backends/schedule sources, while ensuring that externally-provided resources are not closed during component shutdown.
Changes:
- Added optional parameters for reusing externally-managed connections/pools across asyncpg, psycopg, psqlpy, and aiopg implementations, including ownership tracking to control shutdown behavior.
- Introduced new integration tests validating that shared pools/connections remain usable after shutdown.
- Updated tooling configuration/dependencies (dev dependency additions and build-system version bump).
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
uv.lock |
Locks the newly added dev dependency (ty) and its artifacts. |
pyproject.toml |
Adds ty to dev deps, bumps uv_build, and updates Ruff configuration. |
tests/integration/test_shared_pool.py |
New integration tests to ensure externally-provided pools/connections aren’t closed by shutdown. |
src/taskiq_pg/asyncpg/broker.py |
Adds support for injected write pool/read connection with ownership tracking. |
src/taskiq_pg/asyncpg/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/asyncpg/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/aiopg/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/aiopg/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psqlpy/broker.py |
Adds support for injected write pool/read connection with ownership tracking. |
src/taskiq_pg/psqlpy/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psqlpy/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psycopg/broker.py |
Adds support for injected write pool/read connection with ownership tracking. |
src/taskiq_pg/psycopg/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psycopg/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/_internal/broker.py |
Minor refactor/typing-related updates for the shared broker base. |
src/taskiq_pg/_internal/result_backend.py |
Minor docstring formatting adjustments in the shared result backend base. |
src/taskiq_pg/_internal/schedule_source.py |
Minor typing/import cleanup for the shared schedule source base. |
Comments suppressed due to low confidence (1)
src/taskiq_pg/psqlpy/broker.py:202
shutdown()checksif self._listener is not None:, but_listeneris not initialized in the class or__init__. Ifstartup()fails before_listeneris assigned (orshutdown()is called without a successful startup), this will raiseAttributeError. Consider initializing_listenertoNone(and checking it) or usinggetattr(self, "_listener", None)here.
async def shutdown(self) -> None:
"""Close all connections on shutdown."""
await super().shutdown()
if self._read_conn is not None and self._owns_read_conn:
self._read_conn.close()
if self._write_pool is not None and self._owns_write_pool:
self._write_pool.close()
if self._listener is not None:
self._listener.abort_listen()
await self._listener.shutdown()
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
5ba71bc to
b3b5af2
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 19 out of 20 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
tests/integration/test_shared_pool.py:217
- This test creates
read_connbut never closes it. Since the broker does not own externally provided connections, you need to explicitly closeread_connat the end of the test to avoid leaking DB connections.
# Connection must still be alive — run a query on it
result = await read_conn.fetch("SELECT 1 AS val")
assert result.result()[0]["val"] == 1
| await self._read_conn.notifies().aclose() | ||
| await self._read_conn.close() |
| if not self._read_conn: | ||
| self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs) | ||
|
|
||
| if not self._write_pool: | ||
| self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs) |
| if self._owns_read_conn: | ||
| self._read_conn = await psqlpy.connect( | ||
| dsn=self.dsn, | ||
| **self.read_kwargs, | ||
| ) |
|
|
||
| broker = AsyncpgBroker(write_pool=pool, read_connection=await asyncpg.connect(dsn=pg_dsn)) | ||
| result_backend = AsyncpgResultBackend( | ||
| dsn=pg_dsn, | ||
| table_name=f"taskiq_results_{uuid.uuid4().hex}", | ||
| pool=pool, | ||
| ) | ||
| schedule_source = AsyncpgScheduleSource( | ||
| broker=broker, | ||
| dsn=pg_dsn, | ||
| table_name=f"taskiq_schedules_{uuid.uuid4().hex}", | ||
| pool=pool, | ||
| ) | ||
|
|
||
| await broker.startup() | ||
| await result_backend.startup() | ||
| await schedule_source.startup() | ||
|
|
||
| await schedule_source.shutdown() | ||
| await result_backend.shutdown() | ||
| await broker.shutdown() | ||
|
|
||
| # Pool must still be alive — execute a simple query on it | ||
| result = await pool.fetchval("SELECT 1") | ||
| assert result == 1 | ||
|
|
||
| await pool.close() | ||
|
|
||
|
|
| read_conn = await AsyncConnection.connect(conninfo=pg_dsn, autocommit=True) | ||
| broker = PsycopgBroker(write_pool=pool, read_connection=read_conn) | ||
| result_backend = PsycopgResultBackend( |
| pool = psqlpy.ConnectionPool(dsn=pg_dsn) | ||
| read_conn = await psqlpy.connect(dsn=pg_dsn) | ||
|
|
||
| broker = PSQLPyBroker(write_pool=pool, read_connection=read_conn) | ||
| result_backend = PSQLPyResultBackend( |
| - The component sets `_owns_pool = False` and uses the pool as-is. | ||
| - `startup()` opens the pool if it is not yet open, but will **not** close it on `shutdown()`. | ||
| - Lifecycle management (opening, closing) is your responsibility. |
Connected to #41