mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-04 06:59:42 +08:00
fix: 兼容低版本临时文件参数
This commit is contained in:
@@ -183,6 +183,20 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
|
||||
def on_config_changed(self):
|
||||
self.scraping_policies = ScrapingConfig.from_system_config()
|
||||
|
||||
@staticmethod
|
||||
def _cleanup_temp_file(path: Optional[Path]):
|
||||
"""
|
||||
清理临时刮削文件
|
||||
|
||||
:param path: 临时文件路径
|
||||
"""
|
||||
if not path or not path.exists():
|
||||
return
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError as err:
|
||||
logger.warn(f"临时文件清理失败:{path} - {err}")
|
||||
|
||||
@staticmethod
|
||||
def _should_scrape(
|
||||
scraping_option: ScrapingOption,
|
||||
@@ -232,18 +246,17 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
|
||||
"""
|
||||
if not fileitem or not content or not path:
|
||||
return
|
||||
# 使用tempfile创建临时文件
|
||||
with NamedTemporaryFile(
|
||||
delete=True, delete_on_close=False, suffix=path.suffix
|
||||
) as tmp_file:
|
||||
tmp_file_path = Path(tmp_file.name)
|
||||
# 写入内容
|
||||
if isinstance(content, bytes):
|
||||
tmp_file.write(content)
|
||||
else:
|
||||
tmp_file.write(content.encode("utf-8"))
|
||||
tmp_file.flush()
|
||||
tmp_file.close() # 关闭文件句柄
|
||||
tmp_file_path = None
|
||||
try:
|
||||
# delete_on_close 是 Python 3.12 才支持的参数,使用 delete=False 后手动清理以兼容低版本。
|
||||
with NamedTemporaryFile(delete=False, suffix=path.suffix) as tmp_file:
|
||||
tmp_file_path = Path(tmp_file.name)
|
||||
# 写入内容
|
||||
if isinstance(content, bytes):
|
||||
tmp_file.write(content)
|
||||
else:
|
||||
tmp_file.write(content.encode("utf-8"))
|
||||
tmp_file.flush()
|
||||
|
||||
# 刮削文件只需要读写权限
|
||||
tmp_file_path.chmod(0o666 & ~current_umask)
|
||||
@@ -256,6 +269,8 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
|
||||
logger.info(f"已保存文件:{item.path}")
|
||||
else:
|
||||
logger.warn(f"文件保存失败:{path}")
|
||||
finally:
|
||||
self._cleanup_temp_file(tmp_file_path)
|
||||
|
||||
def _download_and_save_image(
|
||||
self, fileitem: schemas.FileItem, path: Path, url: str
|
||||
@@ -276,17 +291,16 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
|
||||
)
|
||||
with request_utils.get_stream(url=url) as r:
|
||||
if r and r.status_code == 200:
|
||||
# 使用tempfile创建临时文件,自动删除
|
||||
with NamedTemporaryFile(
|
||||
delete=True, delete_on_close=False, suffix=path.suffix
|
||||
) as tmp_file:
|
||||
tmp_file_path = Path(tmp_file.name)
|
||||
# 流式写入文件
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
tmp_file.write(chunk)
|
||||
tmp_file.flush()
|
||||
tmp_file.close() # 关闭文件句柄
|
||||
tmp_file_path = None
|
||||
try:
|
||||
# delete_on_close 是 Python 3.12 才支持的参数,使用 delete=False 后手动清理以兼容低版本。
|
||||
with NamedTemporaryFile(delete=False, suffix=path.suffix) as tmp_file:
|
||||
tmp_file_path = Path(tmp_file.name)
|
||||
# 流式写入文件
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
tmp_file.write(chunk)
|
||||
tmp_file.flush()
|
||||
|
||||
# 刮削的图片只需要读写权限
|
||||
tmp_file_path.chmod(0o666 & ~current_umask)
|
||||
@@ -299,6 +313,8 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
|
||||
logger.info(f"已保存图片:{item.path}")
|
||||
else:
|
||||
logger.warn(f"图片保存失败:{path}")
|
||||
finally:
|
||||
self._cleanup_temp_file(tmp_file_path)
|
||||
else:
|
||||
logger.info(f"{url} 图片下载失败")
|
||||
except Exception as err:
|
||||
|
||||
@@ -424,6 +424,7 @@ class TestMediaScrapingImages(unittest.TestCase):
|
||||
|
||||
mock_request_utils.assert_called_with(proxies=mock_settings.PROXY, ua=mock_settings.NORMAL_USER_AGENT)
|
||||
mock_instance.get_stream.assert_called_with(url=url)
|
||||
mock_temp_file.assert_called_once_with(delete=False, suffix=".jpg")
|
||||
tmp_mock.write.assert_any_call(b"data1")
|
||||
tmp_mock.write.assert_any_call(b"data2")
|
||||
mock_chmod.assert_called()
|
||||
@@ -432,6 +433,30 @@ class TestMediaScrapingImages(unittest.TestCase):
|
||||
self.assertEqual(call_args["fileitem"], fileitem)
|
||||
self.assertEqual(call_args["new_name"], "poster.jpg")
|
||||
|
||||
@patch("app.chain.media.NamedTemporaryFile")
|
||||
@patch("app.chain.media.Path.chmod")
|
||||
def test_save_file_uses_python310_compatible_tempfile(self, mock_chmod, mock_temp_file):
|
||||
"""保存刮削文件时不应使用 Python 3.12 才支持的 delete_on_close 参数。"""
|
||||
self.media_chain = MediaChain()
|
||||
self.media_chain.storagechain = MagicMock()
|
||||
self.media_chain._cleanup_temp_file = MagicMock()
|
||||
|
||||
fileitem = schemas.FileItem(path="/movies/Avatar", name="Avatar", type="dir", storage="local")
|
||||
target_path = Path("/movies/Avatar/movie.nfo")
|
||||
|
||||
tmp_mock = MagicMock()
|
||||
tmp_mock.name = "/tmp/mockfile"
|
||||
mock_temp_file.return_value.__enter__.return_value = tmp_mock
|
||||
self.media_chain.storagechain.upload_file.return_value = fileitem
|
||||
|
||||
self.media_chain._save_file(fileitem, target_path, "<nfo></nfo>")
|
||||
|
||||
mock_temp_file.assert_called_once_with(delete=False, suffix=".nfo")
|
||||
tmp_mock.write.assert_called_once_with(b"<nfo></nfo>")
|
||||
mock_chmod.assert_called()
|
||||
self.media_chain.storagechain.upload_file.assert_called_once()
|
||||
self.media_chain._cleanup_temp_file.assert_called_once_with(Path("/tmp/mockfile"))
|
||||
|
||||
|
||||
class TestMediaScrapingTVDirectory(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
||||
Reference in New Issue
Block a user