diff --git a/app/chain/workflow.py b/app/chain/workflow.py index ad4b0804..65fad148 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -661,19 +661,40 @@ class WorkflowExecutor: declared_outputs = self.get_action_output_declarations(action) if isinstance(declared_outputs, list): normalized_outputs = {} + missing = object() for item in declared_outputs: key = item.get("name") if isinstance(item, dict) else item - if key and outputs.get(key) not in (None, "", [], {}): - normalized_outputs[key] = outputs.get(key) + if not key: + continue + value = outputs.get(key, missing) + if value is missing and key in result_context.__class__.model_fields: + value = getattr(result_context, key, missing) + if value is not missing and self.should_keep_output_value(action, key, value): + normalized_outputs[key] = value return normalized_outputs or outputs if isinstance(declared_outputs, dict): - return { - key: outputs.get(key) - for key in declared_outputs - if outputs.get(key) not in (None, "", [], {}) - } or outputs + normalized_outputs = {} + missing = object() + for key in declared_outputs: + value = outputs.get(key, missing) + if value is missing and key in result_context.__class__.model_fields: + value = getattr(result_context, key, missing) + if value is not missing and self.should_keep_output_value(action, key, value): + normalized_outputs[key] = value + return normalized_outputs or outputs return outputs + def should_keep_output_value(self, action: Action, key: str, value: Any) -> bool: + """ + 判断输出值是否应参与后续合并。 + """ + if value not in (None, "", [], {}): + return True + output_config = self.get_action_output_config(action, key) + target_key = output_config.get("target") or key + merge_policy = output_config.get("merge") or self.get_default_merge_policy(action, target_key, value) + return merge_policy == "replace" + def record_node_outputs(self, action_id: str, outputs: dict) -> None: """ 记录当前节点输出,供后续条件表达式读取。 @@ -699,11 +720,11 @@ class WorkflowExecutor: 按声明式合并策略写入全局上下文和 artifacts 分区。 """ for key, value in outputs.items(): - if value in (None, "", [], {}): - continue output_config = self.get_action_output_config(action, key) target_key = output_config.get("target") or key merge_policy = output_config.get("merge") or self.get_default_merge_policy(action, target_key, value) + if value in (None, "", [], {}) and merge_policy != "replace": + continue identity = output_config.get("identity") self.merge_output_value(target_key, value, merge_policy, identity) diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py index 4337b7bb..df0af96b 100644 --- a/tests/test_workflow_execution.py +++ b/tests/test_workflow_execution.py @@ -679,6 +679,58 @@ def test_workflow_executor_filter_action_replaces_artifact_outputs(monkeypatch): assert executor.context.artifacts["torrents"] == ["keep"] +def test_workflow_executor_filter_action_replaces_with_empty_outputs(monkeypatch): + """过滤节点结果为空时也应清空上游资源,避免后续继续下载。""" + calls = [] + stale_torrents = ["old", "drop"] + filtered_torrents = [] + + def run_fetch(action, context): + """模拟上游搜索节点产出资源池。""" + context.torrents = stale_torrents.copy() + return ActionResult( + success=True, + message=f"{action.name}完成", + context=context, + outputs={"torrents": stale_torrents.copy()} + ) + + def run_filter(action, context): + """模拟过滤节点把资源全部过滤掉。""" + context.torrents = filtered_torrents.copy() + return ActionResult( + success=True, + message=f"{action.name}完成", + context=context, + outputs={"torrents": filtered_torrents.copy()} + ) + + fake_manager = _FakeWorkflowManager( + calls, + results={ + "A": run_fetch, + "B": run_filter, + } + ) + workflow = _build_workflow( + actions=[ + {"id": "A", "type": "FetchTorrentsAction", "name": "搜索站点资源", "data": {}}, + {"id": "B", "type": "FilterTorrentsAction", "name": "过滤资源", "data": {}}, + ], + ) + + monkeypatch.setattr(workflow_module, "WorkFlowManager", lambda: fake_manager) + monkeypatch.setattr(workflow_module.global_vars, "workflow_resume", lambda workflow_id: None) + monkeypatch.setattr(workflow_module.global_vars, "is_workflow_stopped", lambda workflow_id: False) + + executor = workflow_module.WorkflowExecutor(workflow) + executor.execute() + + assert executor.context.torrents == filtered_torrents + assert executor.context.artifacts["torrents"] == filtered_torrents + assert executor.context.node_outputs["B"]["torrents"] == filtered_torrents + + def test_workflow_executor_stop_is_not_success(monkeypatch): """停止信号不应被执行器汇报为成功完成。""" calls = []