diff --git a/app/core/security.py b/app/core/security.py index 0f010f4e..6a617638 100644 --- a/app/core/security.py +++ b/app/core/security.py @@ -1,10 +1,17 @@ import base64 import datetime +import hashlib +import hmac +import json +import os +import traceback from datetime import timedelta from typing import Any, Union, Annotated, Optional import jwt from Crypto.Cipher import AES +from Crypto.Util.Padding import pad +from cryptography.fernet import Fernet from fastapi import HTTPException, status, Security, Request, Response from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery, APIKeyCookie from passlib.context import CryptContext @@ -267,6 +274,35 @@ def verify_password(plain_password: str, hashed_password: str) -> bool: def get_password_hash(password: str) -> str: return pwd_context.hash(password) + +def decrypt(data: bytes, key: bytes) -> Optional[bytes]: + """ + 解密二进制数据 + """ + fernet = Fernet(key) + try: + return fernet.decrypt(data) + except Exception as e: + logger.error(f"解密失败:{str(e)} - {traceback.format_exc()}") + return None + + +def encrypt_message(message: str, key: bytes) -> str: + """ + 使用给定的key对消息进行加密,并返回加密后的字符串 + """ + f = Fernet(key) + encrypted_message = f.encrypt(message.encode()) + return encrypted_message.decode() + + +def hash_sha256(message: str) -> str: + """ + 对字符串做hash运算 + """ + return hashlib.sha256(message.encode()).hexdigest() + + def aes_decrypt(data: str, key: str) -> str: """ AES解密 @@ -301,3 +337,29 @@ def aes_encrypt(data: str, key: str) -> str: result = cipher.encrypt(data.encode('utf-8')) # 使用base64编码 return base64.b64encode(cipher.iv + result).decode('utf-8') + + +def nexusphp_encrypt(data_str: str, key: bytes) -> str: + """ + NexusPHP加密 + """ + # 生成16字节长的随机字符串 + iv = os.urandom(16) + # 对向量进行 Base64 编码 + iv_base64 = base64.b64encode(iv) + # 加密数据 + cipher = AES.new(key, AES.MODE_CBC, iv) + ciphertext = cipher.encrypt(pad(data_str.encode(), AES.block_size)) + ciphertext_base64 = base64.b64encode(ciphertext) + # 对向量的字符串表示进行签名 + mac = hmac.new(key, msg=iv_base64 + ciphertext_base64, digestmod=hashlib.sha256).hexdigest() + # 构造 JSON 字符串 + json_str = json.dumps({ + 'iv': iv_base64.decode(), + 'value': ciphertext_base64.decode(), + 'mac': mac, + 'tag': '' + }) + + # 对 JSON 字符串进行 Base64 编码 + return base64.b64encode(json_str.encode()).decode()