Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/postgres_listener.py: 11%
92 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4from typing import TYPE_CHECKING, AsyncGenerator 1a
6import asyncpg # type: ignore 1a
7from pydantic import SecretStr 1a
8from sqlalchemy.engine.url import make_url 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from asyncpg import Connection
13from prefect.logging import get_logger 1a
14from prefect.settings import get_current_settings 1a
16_logger = get_logger(__name__) 1a
19async def get_pg_notify_connection() -> Connection | None: 1a
20 """
21 Establishes and returns a raw asyncpg connection for LISTEN/NOTIFY.
22 Returns None if not a PostgreSQL connection URL.
23 """
24 db_url_str = get_current_settings().server.database.connection_url
25 if isinstance(db_url_str, SecretStr):
26 db_url_str = db_url_str.get_secret_value()
28 if not db_url_str:
29 _logger.debug(
30 "Cannot create Postgres LISTEN connection: PREFECT_API_DATABASE_CONNECTION_URL is not set."
31 )
32 return None
34 try:
35 db_url = make_url(db_url_str)
36 except Exception as e:
37 _logger.error(f"Invalid PREFECT_API_DATABASE_CONNECTION_URL: {e}")
38 return None
40 if db_url.drivername.split("+")[0] not in ("postgresql", "postgres"):
41 _logger.debug(
42 "Cannot create Postgres LISTEN connection: PREFECT_API_DATABASE_CONNECTION_URL "
43 f"is not a PostgreSQL connection URL (driver: {db_url.drivername})."
44 )
45 return None
47 # Construct a new DSN for asyncpg, omitting the dialect part like '+asyncpg'
48 # and ensuring essential components are present.
49 asyncpg_dsn = db_url.set(
50 drivername="postgresql"
51 ) # Ensure drivername is plain postgresql
53 # asyncpg.connect can take individual params or a DSN string.
54 # We'll pass params directly from the parsed URL if they exist to be explicit.
55 # Build connection arguments, ensuring proper types
56 connect_args = {}
57 if asyncpg_dsn.host:
58 connect_args["host"] = asyncpg_dsn.host
59 if asyncpg_dsn.port:
60 connect_args["port"] = asyncpg_dsn.port
61 if asyncpg_dsn.username:
62 connect_args["user"] = asyncpg_dsn.username
63 if asyncpg_dsn.password:
64 connect_args["password"] = asyncpg_dsn.password
65 if asyncpg_dsn.database:
66 connect_args["database"] = asyncpg_dsn.database
68 # Include application_name if configured
69 settings = get_current_settings()
70 app_name = settings.server.database.sqlalchemy.connect_args.application_name
71 if app_name:
72 connect_args["server_settings"] = {"application_name": app_name}
74 try:
75 # Note: For production, connection parameters (timeouts, etc.) might need tuning.
76 # This connection is outside SQLAlchemy's pool and needs its own lifecycle management.
77 conn = await asyncpg.connect(**connect_args)
78 _logger.info(
79 f"Successfully established raw asyncpg connection for LISTEN/NOTIFY to "
80 f"{asyncpg_dsn.host}:{asyncpg_dsn.port}/{asyncpg_dsn.database}"
81 )
82 return conn
83 except Exception as e:
84 _logger.error(
85 f"Failed to establish raw asyncpg connection for LISTEN/NOTIFY: {e}",
86 exc_info=True,
87 )
88 return None
91async def pg_listen( 1a
92 connection: Connection, channel_name: str, heartbeat_interval: float = 5.0
93) -> AsyncGenerator[str, None]:
94 """
95 Listens to a specific Postgres channel and yields payloads.
96 Manages adding and removing the listener on the given connection.
97 """
98 listen_queue: asyncio.Queue[str] = asyncio.Queue()
100 # asyncpg expects a regular function for the callback, not an async one directly.
101 # This callback will be run in asyncpg's event loop / thread context.
102 def queue_notifications_callback(
103 conn_unused: Connection, pid: int, chan: str, payload: str
104 ):
105 try:
106 listen_queue.put_nowait(payload)
107 except asyncio.QueueFull:
108 _logger.warning(
109 f"Postgres listener queue full for channel {channel_name}. Notification may be lost."
110 )
112 try:
113 # Add the listener that uses the queue
114 await connection.add_listener(channel_name, queue_notifications_callback)
115 _logger.info(f"Listening on Postgres channel: {channel_name}")
117 while True:
118 try:
119 # Wait for a notification with a timeout to allow checking if connection is still alive
120 payload: str = await asyncio.wait_for(
121 listen_queue.get(), timeout=heartbeat_interval
122 )
123 yield payload
124 listen_queue.task_done() # Acknowledge processing if using Queue for tracking
125 except asyncio.TimeoutError:
126 if connection.is_closed():
127 _logger.info(
128 f"Postgres connection closed while listening on {channel_name}."
129 )
130 break
131 continue # Continue listening
132 except (
133 Exception
134 ) as e: # Catch broader exceptions during listen_queue.get() or yield
135 _logger.error(
136 f"Error during notification processing on {channel_name}: {e}",
137 exc_info=True,
138 )
139 # Depending on the error, you might want to break or continue
140 if isinstance(
141 e, (GeneratorExit, asyncio.CancelledError)
142 ): # Graceful shutdown
143 raise
144 if isinstance(
145 e, (asyncpg.exceptions.PostgresConnectionError, OSError)
146 ): # Connection critical
147 _logger.error(
148 f"Connection error on {channel_name}. Listener stopping."
149 )
150 break
151 await asyncio.sleep(1) # Prevent tight loop on other continuous errors
153 except (
154 asyncpg.exceptions.PostgresConnectionError,
155 OSError,
156 ) as e: # Errors during setup
157 _logger.error(
158 f"Connection error setting up listener for {channel_name}: {e}",
159 exc_info=True,
160 )
161 raise
162 except (GeneratorExit, asyncio.CancelledError): # Handle task cancellation
163 _logger.info(f"Listener for {channel_name} cancelled.")
164 raise
165 except Exception as e: # Catch-all for unexpected errors during setup
166 _logger.error(
167 f"Unexpected error setting up or during listen on {channel_name}: {e}",
168 exc_info=True,
169 )
170 raise
171 finally:
172 if not connection.is_closed():
173 try:
174 await connection.remove_listener(
175 channel_name, queue_notifications_callback
176 )
177 _logger.info(f"Removed listener from Postgres channel: {channel_name}")
178 except Exception as e:
179 _logger.error(
180 f"Error removing listener for {channel_name}: {e}", exc_info=True
181 )