From ff2826a44860beccd7ba8017a84802c7332367d7 Mon Sep 17 00:00:00 2001 From: wumode Date: Wed, 5 Nov 2025 13:45:31 +0800 Subject: [PATCH 1/2] feat(utils): Refactor check_method to use ast MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 使用 AST 解析函数源码,相比基于字符串的方法更稳定,能够正确处理具有多行 def 语句的函数 - 为 check_method 添加了单元测试 --- app/utils/object.py | 60 ++++++++++++++++++++++---------------------- tests/run.py | 3 +++ tests/test_object.py | 41 ++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 30 deletions(-) create mode 100644 tests/test_object.py diff --git a/app/utils/object.py b/app/utils/object.py index 7030434f..60756618 100644 --- a/app/utils/object.py +++ b/app/utils/object.py @@ -1,6 +1,8 @@ +import ast import dis import inspect -from types import FunctionType +import textwrap +from types import FunctionType, MethodType from typing import Any, Callable, get_type_hints @@ -39,40 +41,38 @@ class ObjectUtils: return len(list(parameters.keys())) @staticmethod - def check_method(func: FunctionType) -> bool: + def check_method(func: FunctionType | MethodType) -> bool: """ 检查函数是否已实现 """ try: - # 尝试通过源代码分析 - source = inspect.getsource(func) - in_comment = False - for line in source.split('\n'): - line = line.strip() - # 跳过空行 - if not line: - continue - # 处理"""单行注释 - if (line.startswith(('"""', "'''")) - and line.endswith(('"""', "'''")) - and len(line) > 3): - continue - # 处理"""多行注释 - if line.startswith(('"""', "'''")): - in_comment = not in_comment - continue - # 在注释中则跳过 - if in_comment: - continue - # 跳过#注释、pass语句、装饰器、函数定义行 - if (line.startswith('#') - or line == "pass" - or line.startswith('@') - or line.startswith('def ')): - continue - # 发现有效代码行 + src = inspect.getsource(func) + tree = ast.parse(textwrap.dedent(src)) + node = tree.body[0] + if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + return True + body = node.body + + for stmt in body: + # 跳过 pass + if isinstance(stmt, ast.Pass): + continue + # 跳过 docstring 或 ... + if isinstance(stmt, ast.Expr): + expr = stmt.value + if isinstance(expr, ast.Constant) and isinstance(expr.value, str): + continue + if isinstance(expr, ast.Constant) and expr.value is Ellipsis: + continue + # 检查 raise NotImplementedError + if isinstance(stmt, ast.Raise): + exc = stmt.exc + if isinstance(exc, ast.Call) and getattr(exc.func, "id", None) == "NotImplementedError": + continue + if isinstance(exc, ast.Name) and exc.id == "NotImplementedError": + continue + return True - # 没有有效代码行 return False except Exception as err: print(err) diff --git a/tests/run.py b/tests/run.py index 5aa65968..7daf3882 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,6 +1,8 @@ import unittest from tests.test_metainfo import MetaInfoTest +from tests.test_object import ObjectUtilsTest + if __name__ == '__main__': suite = unittest.TestSuite() @@ -8,6 +10,7 @@ if __name__ == '__main__': # 测试名称识别 suite.addTest(MetaInfoTest('test_metainfo')) suite.addTest(MetaInfoTest('test_emby_format_ids')) + suite.addTest(ObjectUtilsTest('test_check_method')) # 运行测试 runner = unittest.TextTestRunner() diff --git a/tests/test_object.py b/tests/test_object.py new file mode 100644 index 00000000..603f19a5 --- /dev/null +++ b/tests/test_object.py @@ -0,0 +1,41 @@ +from unittest import TestCase + +from app.utils.object import ObjectUtils + + +class ObjectUtilsTest(TestCase): + + def test_check_method(self): + def implemented_function(): + return "Hello" + + def pass_function(): + pass + + def docstring_function(): + """This is a docstring.""" + + def ellipsis_function(): + ... + + def not_implemented_function(): + raise NotImplementedError + + def not_implemented_function_no_call(): + raise NotImplementedError() + + async def multiple_lines_async_def(_param1: str, + _param2: str): + pass + + def empty_function(): + return + + self.assertTrue(ObjectUtils.check_method(implemented_function)) + self.assertFalse(ObjectUtils.check_method(pass_function)) + self.assertFalse(ObjectUtils.check_method(docstring_function)) + self.assertFalse(ObjectUtils.check_method(ellipsis_function)) + self.assertFalse(ObjectUtils.check_method(not_implemented_function)) + self.assertFalse(ObjectUtils.check_method(not_implemented_function_no_call)) + self.assertFalse(ObjectUtils.check_method(multiple_lines_async_def)) + self.assertTrue(ObjectUtils.check_method(empty_function)) From 9d182e53b2755c4affbbc5d02b2bfa8922778550 Mon Sep 17 00:00:00 2001 From: wumode Date: Wed, 5 Nov 2025 14:49:23 +0800 Subject: [PATCH 2/2] fix: type hints --- app/utils/object.py | 13 ++++++------- tests/test_object.py | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/app/utils/object.py b/app/utils/object.py index 60756618..50e6964f 100644 --- a/app/utils/object.py +++ b/app/utils/object.py @@ -2,7 +2,7 @@ import ast import dis import inspect import textwrap -from types import FunctionType, MethodType +from types import FunctionType from typing import Any, Callable, get_type_hints @@ -41,7 +41,7 @@ class ObjectUtils: return len(list(parameters.keys())) @staticmethod - def check_method(func: FunctionType | MethodType) -> bool: + def check_method(func: Callable[..., Any]) -> bool: """ 检查函数是否已实现 """ @@ -60,10 +60,9 @@ class ObjectUtils: # 跳过 docstring 或 ... if isinstance(stmt, ast.Expr): expr = stmt.value - if isinstance(expr, ast.Constant) and isinstance(expr.value, str): - continue - if isinstance(expr, ast.Constant) and expr.value is Ellipsis: - continue + if isinstance(expr, ast.Constant): + if isinstance(expr.value, str) or expr.value is Ellipsis: + continue # 检查 raise NotImplementedError if isinstance(stmt, ast.Raise): exc = stmt.exc @@ -77,7 +76,7 @@ class ObjectUtils: except Exception as err: print(err) # 源代码分析失败时,进行字节码分析 - code_obj = func.__code__ + code_obj = func.__code__ # type: ignore[attr-defined] instructions = list(dis.get_instructions(code_obj)) # 检查是否为仅返回None的简单结构 if len(instructions) == 2: diff --git a/tests/test_object.py b/tests/test_object.py index 603f19a5..750dac92 100644 --- a/tests/test_object.py +++ b/tests/test_object.py @@ -21,7 +21,7 @@ class ObjectUtilsTest(TestCase): def not_implemented_function(): raise NotImplementedError - def not_implemented_function_no_call(): + def not_implemented_function_with_call(): raise NotImplementedError() async def multiple_lines_async_def(_param1: str, @@ -36,6 +36,6 @@ class ObjectUtilsTest(TestCase): self.assertFalse(ObjectUtils.check_method(docstring_function)) self.assertFalse(ObjectUtils.check_method(ellipsis_function)) self.assertFalse(ObjectUtils.check_method(not_implemented_function)) - self.assertFalse(ObjectUtils.check_method(not_implemented_function_no_call)) + self.assertFalse(ObjectUtils.check_method(not_implemented_function_with_call)) self.assertFalse(ObjectUtils.check_method(multiple_lines_async_def)) self.assertTrue(ObjectUtils.check_method(empty_function))