Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flow_runs.py: 25%

191 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Functions for interacting with flow run ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6import contextlib 1a

7import datetime 1a

8from itertools import chain 1a

9from typing import ( 1a

10 TYPE_CHECKING, 

11 Any, 

12 Dict, 

13 List, 

14 Optional, 

15 Sequence, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20 cast, 

21) 

22from uuid import UUID 1a

23 

24import sqlalchemy as sa 1a

25from sqlalchemy import delete, select 1a

26from sqlalchemy.ext.asyncio import AsyncSession 1a

27from sqlalchemy.orm import load_only, selectinload 1a

28from sqlalchemy.sql import Select 1a

29 

30import prefect.server.models as models 1a

31import prefect.server.schemas as schemas 1a

32from prefect.logging.loggers import get_logger 1a

33from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

34from prefect.server.exceptions import ObjectNotFoundError 1a

35from prefect.server.orchestration.core_policy import MinimalFlowPolicy 1a

36from prefect.server.orchestration.global_policy import GlobalFlowPolicy 1a

37from prefect.server.orchestration.policies import ( 1a

38 FlowRunOrchestrationPolicy, 

39) 

40from prefect.server.orchestration.rules import FlowOrchestrationContext 1a

41from prefect.server.schemas.core import TaskRunResult 1a

42from prefect.server.schemas.graph import Graph 1a

43from prefect.server.schemas.responses import OrchestrationResult 1a

44from prefect.server.schemas.states import State 1a

45from prefect.server.utilities.schemas import PrefectBaseModel 1a

46from prefect.settings import ( 1a

47 PREFECT_API_MAX_FLOW_RUN_GRAPH_ARTIFACTS, 

48 PREFECT_API_MAX_FLOW_RUN_GRAPH_NODES, 

49) 

50from prefect.types import KeyValueLabels 1a

51from prefect.types._datetime import DateTime, earliest_possible_datetime, now 1a

52 

53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true1a

54 import logging 

55 

56logger: "logging.Logger" = get_logger("flow_runs") 1a

57 

58 

59T = TypeVar("T", bound=tuple[Any, ...]) 1a

60 

61 

62@db_injector 1a

63async def create_flow_run( 1a

64 db: PrefectDBInterface, 

65 session: AsyncSession, 

66 flow_run: schemas.core.FlowRun, 

67 orchestration_parameters: Optional[dict[str, Any]] = None, 

68) -> orm_models.FlowRun: 

69 """Creates a new flow run. 

70 

71 If the provided flow run has a state attached, it will also be created. 

72 

73 Args: 

74 session: a database session 

75 flow_run: a flow run model 

76 

77 Returns: 

78 orm_models.FlowRun: the newly-created flow run 

79 """ 

80 right_now = now("UTC") 

81 # model: Union[orm_models.FlowRun, None] = None 

82 

83 flow_run.labels = await with_system_labels_for_flow_run( 

84 session=session, flow_run=flow_run 

85 ) 

86 

87 flow_run_dict = dict( 

88 **flow_run.model_dump_for_orm( 

89 exclude={ 

90 "created", 

91 "state", 

92 "estimated_run_time", 

93 "estimated_start_time_delta", 

94 }, 

95 exclude_unset=True, 

96 ), 

97 created=right_now, 

98 ) 

99 

100 # if no idempotency key was provided, create the run directly 

101 if not flow_run.idempotency_key: 

102 model = db.FlowRun(**flow_run_dict) 

103 session.add(model) 

104 await session.flush() 

105 

106 # otherwise let the database take care of enforcing idempotency 

107 else: 

108 insert_stmt = ( 

109 db.queries.insert(db.FlowRun) 

110 .values(**flow_run_dict) 

111 .on_conflict_do_nothing( 

112 index_elements=db.orm.flow_run_unique_upsert_columns, 

113 ) 

114 ) 

115 await session.execute(insert_stmt) 

116 

117 # read the run to see if idempotency was applied or not 

118 query = ( 

119 sa.select(db.FlowRun) 

120 .where( 

121 sa.and_( 

122 db.FlowRun.flow_id == flow_run.flow_id, 

123 db.FlowRun.idempotency_key == flow_run.idempotency_key, 

124 ) 

125 ) 

126 .limit(1) 

127 .execution_options(populate_existing=True) 

128 .options( 

129 selectinload(db.FlowRun.work_queue).selectinload(db.WorkQueue.work_pool) 

130 ) 

131 ) 

132 result = await session.execute(query) 

133 model = result.scalar_one() 

134 

135 # if the flow run was created in this function call then we need to set the 

136 # state. If it was created idempotently, the created time won't match. 

137 if model.created == right_now and flow_run.state: 

138 await models.flow_runs.set_flow_run_state( 

139 session=session, 

140 flow_run_id=model.id, 

141 state=flow_run.state, 

142 force=True, 

143 orchestration_parameters=orchestration_parameters, 

144 ) 

145 return model 

146 

147 

148@db_injector 1a

149async def update_flow_run( 1a

150 db: PrefectDBInterface, 

151 session: AsyncSession, 

152 flow_run_id: UUID, 

153 flow_run: schemas.actions.FlowRunUpdate, 

154) -> bool: 

155 """ 

156 Updates a flow run. 

157 

158 Args: 

159 session: a database session 

160 flow_run_id: the flow run id to update 

161 flow_run: a flow run model 

162 

163 Returns: 

164 bool: whether or not matching rows were found to update 

165 """ 

166 update_stmt = ( 

167 sa.update(db.FlowRun) 

168 .where(db.FlowRun.id == flow_run_id) 

169 # exclude_unset=True allows us to only update values provided by 

170 # the user, ignoring any defaults on the model 

171 .values(**flow_run.model_dump_for_orm(exclude_unset=True)) 

172 ) 

173 result = await session.execute(update_stmt) 

174 return result.rowcount > 0 

175 

176 

177@db_injector 1a

178async def read_flow_run( 1a

179 db: PrefectDBInterface, 

180 session: AsyncSession, 

181 flow_run_id: UUID, 

182 for_update: bool = False, 

183) -> Optional[orm_models.FlowRun]: 

184 """ 

185 Reads a flow run by id. 

186 

187 Args: 

188 session: A database session 

189 flow_run_id: a flow run id 

190 

191 Returns: 

192 orm_models.FlowRun: the flow run 

193 """ 

194 select = ( 

195 sa.select(db.FlowRun) 

196 .where(db.FlowRun.id == flow_run_id) 

197 .options( 

198 selectinload(db.FlowRun.work_queue).selectinload(db.WorkQueue.work_pool) 

199 ) 

200 ) 

201 

202 if for_update: 

203 select = select.with_for_update() 

204 

205 result = await session.execute(select) 

206 return result.scalar() 

207 

208 

209async def _apply_flow_run_filters( 1a

210 db: PrefectDBInterface, 

211 query: Select[T], 

212 flow_filter: Optional[schemas.filters.FlowFilter] = None, 

213 flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, 

214 task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, 

215 deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, 

216 work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, 

217 work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, 

218) -> Select[T]: 

219 """ 

220 Applies filters to a flow run query as a combination of EXISTS subqueries. 

221 """ 

222 

223 if flow_run_filter: 

224 query = query.where(flow_run_filter.as_sql_filter()) 

225 

226 if deployment_filter: 

227 deployment_exists_clause = select(db.Deployment).where( 

228 db.Deployment.id == db.FlowRun.deployment_id, 

229 deployment_filter.as_sql_filter(), 

230 ) 

231 query = query.where(deployment_exists_clause.exists()) 

232 

233 if work_pool_filter: 

234 work_pool_exists_clause = select(db.WorkPool).where( 

235 db.WorkQueue.id == db.FlowRun.work_queue_id, 

236 db.WorkPool.id == db.WorkQueue.work_pool_id, 

237 work_pool_filter.as_sql_filter(), 

238 ) 

239 

240 query = query.where(work_pool_exists_clause.exists()) 

241 

242 if work_queue_filter: 

243 work_queue_exists_clause = select(db.WorkQueue).where( 

244 db.WorkQueue.id == db.FlowRun.work_queue_id, 

245 work_queue_filter.as_sql_filter(), 

246 ) 

247 query = query.where(work_queue_exists_clause.exists()) 

248 

249 if flow_filter or task_run_filter: 

250 flow_or_task_run_exists_clause: Union[ 

251 Select[Tuple[db.Flow]], 

252 Select[Tuple[db.TaskRun]], 

253 ] 

254 

255 if flow_filter: 

256 flow_or_task_run_exists_clause = select(db.Flow).where( 

257 db.Flow.id == db.FlowRun.flow_id, 

258 flow_filter.as_sql_filter(), 

259 ) 

260 

261 if task_run_filter: 

262 if not flow_filter: 

263 flow_or_task_run_exists_clause = select(db.TaskRun).where( 

264 db.TaskRun.flow_run_id == db.FlowRun.id 

265 ) 

266 else: 

267 flow_or_task_run_exists_clause = flow_or_task_run_exists_clause.join( 

268 db.TaskRun, 

269 db.TaskRun.flow_run_id == db.FlowRun.id, 

270 ) 

271 flow_or_task_run_exists_clause = flow_or_task_run_exists_clause.where( 

272 db.FlowRun.id == db.TaskRun.flow_run_id, 

273 task_run_filter.as_sql_filter(), 

274 ) 

275 

276 query = query.where(flow_or_task_run_exists_clause.exists()) 

277 

278 return query 

279 

280 

281@db_injector 1a

282async def read_flow_runs( 1a

283 db: PrefectDBInterface, 

284 session: AsyncSession, 

285 columns: Optional[list[str]] = None, 

286 flow_filter: Optional[schemas.filters.FlowFilter] = None, 

287 flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, 

288 task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, 

289 deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, 

290 work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, 

291 work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, 

292 offset: Optional[int] = None, 

293 limit: Optional[int] = None, 

294 sort: schemas.sorting.FlowRunSort = schemas.sorting.FlowRunSort.ID_DESC, 

295) -> Sequence[orm_models.FlowRun]: 

296 """ 

297 Read flow runs. 

298 

299 Args: 

300 session: a database session 

301 columns: a list of the flow run ORM columns to load, for performance 

302 flow_filter: only select flow runs whose flows match these filters 

303 flow_run_filter: only select flow runs match these filters 

304 task_run_filter: only select flow runs whose task runs match these filters 

305 deployment_filter: only select flow runs whose deployments match these filters 

306 offset: Query offset 

307 limit: Query limit 

308 sort: Query sort 

309 

310 Returns: 

311 List[orm_models.FlowRun]: flow runs 

312 """ 

313 query = ( 

314 select(db.FlowRun) 

315 .order_by(*sort.as_sql_sort()) 

316 .options( 

317 selectinload(db.FlowRun.work_queue).selectinload(db.WorkQueue.work_pool) 

318 ) 

319 ) 

320 

321 if columns: 

322 query = query.options(load_only(*columns)) 

323 

324 query = await _apply_flow_run_filters( 

325 db, 

326 query, 

327 flow_filter=flow_filter, 

328 flow_run_filter=flow_run_filter, 

329 task_run_filter=task_run_filter, 

330 deployment_filter=deployment_filter, 

331 work_pool_filter=work_pool_filter, 

332 work_queue_filter=work_queue_filter, 

333 ) 

334 

335 if offset is not None: 

336 query = query.offset(offset) 

337 

338 if limit is not None: 

339 query = query.limit(limit) 

340 

341 result = await session.execute(query) 

342 return result.scalars().unique().all() 

343 

344 

345async def cleanup_flow_run_concurrency_slots( 1a

346 session: AsyncSession, 

347 flow_run: orm_models.FlowRun, 

348) -> None: 

349 """ 

350 Cleanup flow run related resources, such as releasing concurrency slots. 

351 All operations should be idempotent and safe to call multiple times. 

352 IMPORTANT: This run may no longer exist in the database when this operation occurs. 

353 """ 

354 

355 if ( 

356 flow_run.deployment_id 

357 and flow_run.state 

358 and flow_run.state.type 

359 in ( 

360 schemas.states.StateType.PENDING, 

361 schemas.states.StateType.RUNNING, 

362 schemas.states.StateType.CANCELLING, 

363 ) 

364 ): 

365 deployment = await models.deployments.read_deployment( 

366 session, flow_run.deployment_id 

367 ) 

368 if deployment and deployment.concurrency_limit_id: 

369 await models.concurrency_limits_v2.bulk_decrement_active_slots( 

370 session, [deployment.concurrency_limit_id], 1 

371 ) 

372 

373 

374class DependencyResult(PrefectBaseModel): 1a

375 id: UUID 1a

376 name: str 1a

377 upstream_dependencies: List[TaskRunResult] 1a

378 state: Optional[State] 1a

379 expected_start_time: Optional[datetime.datetime] 1a

380 start_time: Optional[datetime.datetime] 1a

381 end_time: Optional[datetime.datetime] 1a

382 total_run_time: Optional[datetime.timedelta] 1a

383 estimated_run_time: Optional[datetime.timedelta] 1a

384 untrackable_result: bool 1a

385 

386 

387async def read_task_run_dependencies( 1a

388 session: AsyncSession, 

389 flow_run_id: UUID, 

390) -> List[DependencyResult]: 

391 """ 

392 Get a task run dependency map for a given flow run. 

393 """ 

394 flow_run = await models.flow_runs.read_flow_run( 

395 session=session, flow_run_id=flow_run_id 

396 ) 

397 if not flow_run: 

398 raise ObjectNotFoundError(f"Flow run with id {flow_run_id} not found") 

399 

400 task_runs = await models.task_runs.read_task_runs( 

401 session=session, 

402 flow_run_filter=schemas.filters.FlowRunFilter( 

403 id=schemas.filters.FlowRunFilterId(any_=[flow_run_id]) 

404 ), 

405 ) 

406 

407 dependency_graph = [] 

408 

409 for task_run in task_runs: 

410 inputs = list(set(chain(*task_run.task_inputs.values()))) 

411 untrackable_result_status = ( 

412 False 

413 if task_run.state is None 

414 else task_run.state.state_details.untrackable_result 

415 ) 

416 dependency_graph.append( 

417 DependencyResult( 

418 id=task_run.id, 

419 upstream_dependencies=inputs, 

420 state=task_run.state, 

421 expected_start_time=task_run.expected_start_time, 

422 name=task_run.name, 

423 start_time=task_run.start_time, 

424 end_time=task_run.end_time, 

425 total_run_time=task_run.total_run_time, 

426 estimated_run_time=task_run.estimated_run_time, 

427 untrackable_result=untrackable_result_status, 

428 ) 

429 ) 

430 

431 return dependency_graph 

432 

433 

434@db_injector 1a

435async def count_flow_runs( 1a

436 db: PrefectDBInterface, 

437 session: AsyncSession, 

438 flow_filter: Optional[schemas.filters.FlowFilter] = None, 

439 flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, 

440 task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, 

441 deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, 

442 work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, 

443 work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, 

444) -> int: 

445 """ 

446 Count flow runs. 

447 

448 Args: 

449 session: a database session 

450 flow_filter: only count flow runs whose flows match these filters 

451 flow_run_filter: only count flow runs that match these filters 

452 task_run_filter: only count flow runs whose task runs match these filters 

453 deployment_filter: only count flow runs whose deployments match these filters 

454 

455 Returns: 

456 int: count of flow runs 

457 """ 

458 

459 query = select(sa.func.count(None)).select_from(db.FlowRun) 

460 

461 query = await _apply_flow_run_filters( 

462 db, 

463 query, 

464 flow_filter=flow_filter, 

465 flow_run_filter=flow_run_filter, 

466 task_run_filter=task_run_filter, 

467 deployment_filter=deployment_filter, 

468 work_pool_filter=work_pool_filter, 

469 work_queue_filter=work_queue_filter, 

470 ) 

471 

472 result = await session.execute(query) 

473 return result.scalar_one() 

474 

475 

476@db_injector 1a

477async def delete_flow_run( 1a

478 db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID 

479) -> bool: 

480 """ 

481 Delete a flow run by flow_run_id, handling concurrency limits if applicable. 

482 

483 Args: 

484 session: A database session 

485 flow_run_id: a flow run id 

486 

487 Returns: 

488 bool: whether or not the flow run was deleted 

489 """ 

490 flow_run = await read_flow_run(session, flow_run_id) 

491 if not flow_run: 

492 return False 

493 

494 deployment_id = flow_run.deployment_id 

495 

496 if deployment_id: 

497 await cleanup_flow_run_concurrency_slots(session=session, flow_run=flow_run) 

498 

499 # Delete the flow run 

500 result = await session.execute( 

501 delete(db.FlowRun).where(db.FlowRun.id == flow_run_id) 

502 ) 

503 

504 return result.rowcount > 0 

505 

506 

507async def set_flow_run_state( 1a

508 session: AsyncSession, 

509 flow_run_id: UUID, 

510 state: schemas.states.State, 

511 force: bool = False, 

512 flow_policy: Optional[Type[FlowRunOrchestrationPolicy]] = None, 

513 orchestration_parameters: Optional[Dict[str, Any]] = None, 

514 client_version: Optional[str] = None, 

515) -> OrchestrationResult: 

516 """ 

517 Creates a new orchestrated flow run state. 

518 

519 Setting a new state on a run is the one of the principal actions that is governed by 

520 Prefect's orchestration logic. Setting a new run state will not guarantee creation, 

521 but instead trigger orchestration rules to govern the proposed `state` input. If 

522 the state is considered valid, it will be written to the database. Otherwise, a 

523 it's possible a different state, or no state, will be created. A `force` flag is 

524 supplied to bypass a subset of orchestration logic. 

525 

526 Args: 

527 session: a database session 

528 flow_run_id: the flow run id 

529 state: a flow run state model 

530 force: if False, orchestration rules will be applied that may alter or prevent 

531 the state transition. If True, orchestration rules are not applied. 

532 

533 Returns: 

534 OrchestrationResult object 

535 """ 

536 

537 # load the flow run 

538 run = await models.flow_runs.read_flow_run( 

539 session=session, 

540 flow_run_id=flow_run_id, 

541 # Lock the row to prevent orchestration race conditions 

542 for_update=True, 

543 ) 

544 

545 if not run: 

546 raise ObjectNotFoundError(f"Flow run with id {flow_run_id} not found") 

547 

548 initial_state = run.state.as_state() if run.state else None 

549 initial_state_type = initial_state.type if initial_state else None 

550 proposed_state_type = state.type if state else None 

551 intended_transition = (initial_state_type, proposed_state_type) 

552 

553 if force or flow_policy is None: 

554 flow_policy = MinimalFlowPolicy 

555 

556 orchestration_rules = flow_policy.compile_transition_rules(*intended_transition) # type: ignore 

557 global_rules = GlobalFlowPolicy.compile_transition_rules(*intended_transition) 

558 

559 context = FlowOrchestrationContext( 

560 session=session, 

561 run=run, 

562 initial_state=initial_state, 

563 proposed_state=state, 

564 client_version=client_version, 

565 ) 

566 

567 if orchestration_parameters is not None: 

568 context.parameters = orchestration_parameters 

569 

570 # apply orchestration rules and create the new flow run state 

571 async with contextlib.AsyncExitStack() as stack: 

572 for rule in orchestration_rules: 

573 context = await stack.enter_async_context( 

574 rule(context, *intended_transition) 

575 ) 

576 

577 for rule in global_rules: 

578 context = await stack.enter_async_context( 

579 rule(context, *intended_transition) 

580 ) 

581 

582 await context.validate_proposed_state() 

583 

584 if context.orchestration_error is not None: 

585 raise context.orchestration_error 

586 

587 result = OrchestrationResult( 

588 state=context.validated_state, 

589 status=context.response_status, 

590 details=context.response_details, 

591 ) 

592 

593 return result 

594 

595 

596@db_injector 1a

597async def read_flow_run_graph( 1a

598 db: PrefectDBInterface, 

599 session: AsyncSession, 

600 flow_run_id: UUID, 

601 since: datetime.datetime = earliest_possible_datetime(), 

602) -> Graph: 

603 """Given a flow run, return the graph of it's task and subflow runs. If a `since` 

604 datetime is provided, only return items that may have changed since that time.""" 

605 if isinstance(since, str): 

606 since = DateTime.fromisoformat(since) 

607 

608 return await db.queries.flow_run_graph_v2( 

609 session=session, 

610 flow_run_id=flow_run_id, 

611 since=since, 

612 max_nodes=PREFECT_API_MAX_FLOW_RUN_GRAPH_NODES.value(), 

613 max_artifacts=PREFECT_API_MAX_FLOW_RUN_GRAPH_ARTIFACTS.value(), 

614 ) 

615 

616 

617async def with_system_labels_for_flow_run( 1a

618 session: AsyncSession, 

619 flow_run: Union[schemas.core.FlowRun, schemas.actions.FlowRunCreate], 

620) -> schemas.core.KeyValueLabels: 

621 """Augment user supplied labels with system default labels for a flow 

622 run.""" 

623 

624 user_supplied_labels = flow_run.labels or {} 

625 

626 # `deployment_id` is deprecated on `schemas.actions.FlowRunCreate`. Only 

627 # check `deployment_id` if given an instance of a `schemas.core.FlowRun`. 

628 if isinstance(flow_run, schemas.core.FlowRun) and flow_run.deployment_id: 

629 deployment = await models.deployments.read_deployment( 

630 session, deployment_id=flow_run.deployment_id 

631 ) 

632 if deployment: 

633 # Use the deployment flow run utility for consistent label generation 

634 return await models.deployments.with_system_labels_for_deployment_flow_run( 

635 session=session, 

636 deployment=deployment, 

637 user_supplied_labels=user_supplied_labels, 

638 ) 

639 

640 # If the flow run is not part of a deployment, generate basic flow labels 

641 default_labels = cast( 

642 schemas.core.KeyValueLabels, 

643 { 

644 "prefect.flow.id": str(flow_run.flow_id), 

645 }, 

646 ) 

647 

648 parent_labels = await models.flows.read_flow_labels(session, flow_run.flow_id) or {} 

649 

650 return parent_labels | default_labels | user_supplied_labels 

651 

652 

653@db_injector 1a

654async def update_flow_run_labels( 1a

655 db: PrefectDBInterface, 

656 session: AsyncSession, 

657 flow_run_id: UUID, 

658 labels: KeyValueLabels, 

659) -> bool: 

660 """ 

661 Update flow run labels by patching existing labels with new values. 

662 Args: 

663 session: A database session 

664 flow_run_id: the flow run id to update 

665 labels: the new labels to patch into existing labels 

666 Returns: 

667 bool: whether the update was successful 

668 """ 

669 # First read the existing flow run to get current labels 

670 flow_run: Optional[orm_models.FlowRun] = await read_flow_run(session, flow_run_id) 

671 if not flow_run: 

672 raise ObjectNotFoundError(f"Flow run with id {flow_run_id} not found") 

673 

674 # Merge existing labels with new labels 

675 current_labels = flow_run.labels or {} 

676 updated_labels = {**current_labels, **labels} 

677 

678 try: 

679 # Update the flow run with merged labels 

680 result = await session.execute( 

681 sa.update(db.FlowRun) 

682 .where(db.FlowRun.id == flow_run_id) 

683 .values(labels=updated_labels) 

684 ) 

685 success = result.rowcount > 0 

686 if success: 

687 await session.commit() # Explicitly commit 

688 return success 

689 except Exception: 

690 raise