Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/postgres_listener.py: 11%

92 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4from typing import TYPE_CHECKING, AsyncGenerator 1a

5 

6import asyncpg # type: ignore 1a

7from pydantic import SecretStr 1a

8from sqlalchemy.engine.url import make_url 1a

9 

10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a

11 from asyncpg import Connection 

12 

13from prefect.logging import get_logger 1a

14from prefect.settings import get_current_settings 1a

15 

16_logger = get_logger(__name__) 1a

17 

18 

19async def get_pg_notify_connection() -> Connection | None: 1a

20 """ 

21 Establishes and returns a raw asyncpg connection for LISTEN/NOTIFY. 

22 Returns None if not a PostgreSQL connection URL. 

23 """ 

24 db_url_str = get_current_settings().server.database.connection_url 

25 if isinstance(db_url_str, SecretStr): 

26 db_url_str = db_url_str.get_secret_value() 

27 

28 if not db_url_str: 

29 _logger.debug( 

30 "Cannot create Postgres LISTEN connection: PREFECT_API_DATABASE_CONNECTION_URL is not set." 

31 ) 

32 return None 

33 

34 try: 

35 db_url = make_url(db_url_str) 

36 except Exception as e: 

37 _logger.error(f"Invalid PREFECT_API_DATABASE_CONNECTION_URL: {e}") 

38 return None 

39 

40 if db_url.drivername.split("+")[0] not in ("postgresql", "postgres"): 

41 _logger.debug( 

42 "Cannot create Postgres LISTEN connection: PREFECT_API_DATABASE_CONNECTION_URL " 

43 f"is not a PostgreSQL connection URL (driver: {db_url.drivername})." 

44 ) 

45 return None 

46 

47 # Construct a new DSN for asyncpg, omitting the dialect part like '+asyncpg' 

48 # and ensuring essential components are present. 

49 asyncpg_dsn = db_url.set( 

50 drivername="postgresql" 

51 ) # Ensure drivername is plain postgresql 

52 

53 # asyncpg.connect can take individual params or a DSN string. 

54 # We'll pass params directly from the parsed URL if they exist to be explicit. 

55 # Build connection arguments, ensuring proper types 

56 connect_args = {} 

57 if asyncpg_dsn.host: 

58 connect_args["host"] = asyncpg_dsn.host 

59 if asyncpg_dsn.port: 

60 connect_args["port"] = asyncpg_dsn.port 

61 if asyncpg_dsn.username: 

62 connect_args["user"] = asyncpg_dsn.username 

63 if asyncpg_dsn.password: 

64 connect_args["password"] = asyncpg_dsn.password 

65 if asyncpg_dsn.database: 

66 connect_args["database"] = asyncpg_dsn.database 

67 

68 # Include application_name if configured 

69 settings = get_current_settings() 

70 app_name = settings.server.database.sqlalchemy.connect_args.application_name 

71 if app_name: 

72 connect_args["server_settings"] = {"application_name": app_name} 

73 

74 try: 

75 # Note: For production, connection parameters (timeouts, etc.) might need tuning. 

76 # This connection is outside SQLAlchemy's pool and needs its own lifecycle management. 

77 conn = await asyncpg.connect(**connect_args) 

78 _logger.info( 

79 f"Successfully established raw asyncpg connection for LISTEN/NOTIFY to " 

80 f"{asyncpg_dsn.host}:{asyncpg_dsn.port}/{asyncpg_dsn.database}" 

81 ) 

82 return conn 

83 except Exception as e: 

84 _logger.error( 

85 f"Failed to establish raw asyncpg connection for LISTEN/NOTIFY: {e}", 

86 exc_info=True, 

87 ) 

88 return None 

89 

90 

91async def pg_listen( 1a

92 connection: Connection, channel_name: str, heartbeat_interval: float = 5.0 

93) -> AsyncGenerator[str, None]: 

94 """ 

95 Listens to a specific Postgres channel and yields payloads. 

96 Manages adding and removing the listener on the given connection. 

97 """ 

98 listen_queue: asyncio.Queue[str] = asyncio.Queue() 

99 

100 # asyncpg expects a regular function for the callback, not an async one directly. 

101 # This callback will be run in asyncpg's event loop / thread context. 

102 def queue_notifications_callback( 

103 conn_unused: Connection, pid: int, chan: str, payload: str 

104 ): 

105 try: 

106 listen_queue.put_nowait(payload) 

107 except asyncio.QueueFull: 

108 _logger.warning( 

109 f"Postgres listener queue full for channel {channel_name}. Notification may be lost." 

110 ) 

111 

112 try: 

113 # Add the listener that uses the queue 

114 await connection.add_listener(channel_name, queue_notifications_callback) 

115 _logger.info(f"Listening on Postgres channel: {channel_name}") 

116 

117 while True: 

118 try: 

119 # Wait for a notification with a timeout to allow checking if connection is still alive 

120 payload: str = await asyncio.wait_for( 

121 listen_queue.get(), timeout=heartbeat_interval 

122 ) 

123 yield payload 

124 listen_queue.task_done() # Acknowledge processing if using Queue for tracking 

125 except asyncio.TimeoutError: 

126 if connection.is_closed(): 

127 _logger.info( 

128 f"Postgres connection closed while listening on {channel_name}." 

129 ) 

130 break 

131 continue # Continue listening 

132 except ( 

133 Exception 

134 ) as e: # Catch broader exceptions during listen_queue.get() or yield 

135 _logger.error( 

136 f"Error during notification processing on {channel_name}: {e}", 

137 exc_info=True, 

138 ) 

139 # Depending on the error, you might want to break or continue 

140 if isinstance( 

141 e, (GeneratorExit, asyncio.CancelledError) 

142 ): # Graceful shutdown 

143 raise 

144 if isinstance( 

145 e, (asyncpg.exceptions.PostgresConnectionError, OSError) 

146 ): # Connection critical 

147 _logger.error( 

148 f"Connection error on {channel_name}. Listener stopping." 

149 ) 

150 break 

151 await asyncio.sleep(1) # Prevent tight loop on other continuous errors 

152 

153 except ( 

154 asyncpg.exceptions.PostgresConnectionError, 

155 OSError, 

156 ) as e: # Errors during setup 

157 _logger.error( 

158 f"Connection error setting up listener for {channel_name}: {e}", 

159 exc_info=True, 

160 ) 

161 raise 

162 except (GeneratorExit, asyncio.CancelledError): # Handle task cancellation 

163 _logger.info(f"Listener for {channel_name} cancelled.") 

164 raise 

165 except Exception as e: # Catch-all for unexpected errors during setup 

166 _logger.error( 

167 f"Unexpected error setting up or during listen on {channel_name}: {e}", 

168 exc_info=True, 

169 ) 

170 raise 

171 finally: 

172 if not connection.is_closed(): 

173 try: 

174 await connection.remove_listener( 

175 channel_name, queue_notifications_callback 

176 ) 

177 _logger.info(f"Removed listener from Postgres channel: {channel_name}") 

178 except Exception as e: 

179 _logger.error( 

180 f"Error removing listener for {channel_name}: {e}", exc_info=True 

181 )