Files
Foxel/Services/Background/BackgroundTaskQueue.cs

501 lines
19 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Collections.Concurrent;
using System.Threading.Channels;
using Foxel.Models.DataBase;
using Foxel.Services.AI;
using Foxel.Services.Attributes;
using Foxel.Services.Configuration;
using Foxel.Services.Storage;
using Foxel.Services.VectorDB;
using Foxel.Utils;
using Microsoft.EntityFrameworkCore;
namespace Foxel.Services.Background;
public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
{
// Bounded channel of picture tasks waiting to be picked up by a worker loop.
private readonly Channel<PictureProcessingTask> _queue;
// Tasks that have been queued and not yet finished, keyed by task id; entries are removed when processing ends.
private readonly ConcurrentDictionary<Guid, PictureProcessingTask> _activeTasks;
// Latest in-memory status per picture, keyed by picture id (a re-queued picture overwrites its previous entry).
private readonly ConcurrentDictionary<int, PictureProcessingStatus> _pictureStatus;
// Root provider used to open a DI scope for each processed picture.
private readonly IServiceProvider _serviceProvider;
// Factory for short-lived database contexts; each operation creates its own context.
private readonly IDbContextFactory<MyDbContext> _contextFactory;
// Worker loops started by StartProcessor.
private readonly List<Task> _processingTasks;
// Semaphore created with _maxConcurrentTasks permits; a worker holds one permit while it handles a task.
private readonly SemaphoreSlim _signal;
// Upper bound on the number of worker loops, read from "BackgroundTasks:MaxConcurrentTasks" (default 10).
private readonly int _maxConcurrentTasks;
// Set once Dispose has run so that repeated calls are no-ops.
private bool _isDisposed;
private readonly ILogger<BackgroundTaskQueue> _logger;
/// <summary>
/// Creates the queue and reads the worker limit from configuration.
/// </summary>
/// <param name="serviceProvider">Root provider used to open a DI scope per processed picture.</param>
/// <param name="contextFactory">Factory for the database contexts used by the queue.</param>
/// <param name="configuration">Source of the "BackgroundTasks:MaxConcurrentTasks" setting (default 10).</param>
/// <param name="logger">Logger for queue diagnostics.</param>
public BackgroundTaskQueue(
    IServiceProvider serviceProvider,
    IDbContextFactory<MyDbContext> contextFactory,
    IConfigService configuration,
    ILogger<BackgroundTaskQueue> logger)
{
    _serviceProvider = serviceProvider;
    _contextFactory = contextFactory;
    _logger = logger;
    _activeTasks = new ConcurrentDictionary<Guid, PictureProcessingTask>();
    _pictureStatus = new ConcurrentDictionary<int, PictureProcessingStatus>();
    _processingTasks = new List<Task>();

    // A constructor cannot await, so the configuration lookup is resolved synchronously.
    var configuredLimit = configuration.GetValueAsync("BackgroundTasks:MaxConcurrentTasks", 10).Result;

    // A limit of zero would start no workers (queued pictures would never be processed) and a
    // negative limit makes the SemaphoreSlim constructor throw, so fall back to a single worker.
    _maxConcurrentTasks = Math.Max(1, configuredLimit);
    if (_maxConcurrentTasks != configuredLimit)
    {
        _logger.LogWarning("BackgroundTasks:MaxConcurrentTasks 配置值 {Configured} 无效,已改用 {Effective}",
            configuredLimit, _maxConcurrentTasks);
    }

    _signal = new SemaphoreSlim(_maxConcurrentTasks);

    // Writers wait (rather than drop items) once 10000 tasks are pending.
    var options = new BoundedChannelOptions(10000)
    {
        FullMode = BoundedChannelFullMode.Wait
    };
    _queue = Channel.CreateBounded<PictureProcessingTask>(options);
}
/// <summary>
/// Registers a processing task for a picture, writes it to the queue and makes sure workers are running.
/// </summary>
/// <param name="pictureId">Id of the picture to process.</param>
/// <param name="originalFilePath">Path recorded on the task as the picture's original file.</param>
/// <returns>The id of the newly created task.</returns>
public async Task<Guid> QueuePictureProcessingTaskAsync(int pictureId, string originalFilePath)
{
    var createdAt = DateTime.UtcNow;
    var task = new PictureProcessingTask
    {
        Id = Guid.NewGuid(),
        PictureId = pictureId,
        OriginalFilePath = originalFilePath,
        CreatedAt = createdAt
    };

    // In-memory status entry that callers poll while the task is pending or running.
    var status = new PictureProcessingStatus
    {
        TaskId = task.Id,
        PictureId = pictureId,
        Status = ProcessingStatus.Pending,
        Progress = 0,
        CreatedAt = createdAt
    };

    // Only the picture name and owner are needed here, so project those two columns instead of
    // loading the whole entity together with its user.
    await using var dbContext = await _contextFactory.CreateDbContextAsync();
    var pictureInfo = await dbContext.Pictures
        .Where(p => p.Id == pictureId)
        .Select(p => new { p.Name, p.UserId })
        .FirstOrDefaultAsync();
    if (pictureInfo != null)
    {
        status.PictureName = pictureInfo.Name;
        // Recording the owner on the task allows tasks to be filtered per user.
        task.UserId = pictureInfo.UserId;
    }

    // Register before writing: a worker ignores any task it cannot find in these dictionaries.
    _pictureStatus[pictureId] = status;
    _activeTasks[task.Id] = task;
    await _queue.Writer.WriteAsync(task);

    // Start worker loops if fewer than the configured number are running.
    StartProcessor();
    return task.Id;
}
/// <summary>
/// Returns the in-memory status entries for the given user's pictures whose database state is
/// still Pending or Processing, newest first.
/// </summary>
/// <param name="userId">Id of the user whose tasks are requested.</param>
/// <returns>Matching status entries ordered by creation time, descending.</returns>
public async Task<List<PictureProcessingStatus>> GetUserTasksStatusAsync(int userId)
{
    await using var dbContext = await _contextFactory.CreateDbContextAsync();
    var unfinishedIds = await dbContext.Pictures
        .Where(p => p.UserId == userId &&
                    (p.ProcessingStatus == ProcessingStatus.Pending ||
                     p.ProcessingStatus == ProcessingStatus.Processing))
        .Select(p => p.Id)
        .ToListAsync();

    // A set keeps the membership test O(1) per status entry instead of scanning the id list each time.
    var userPictureIds = unfinishedIds.ToHashSet();

    return _pictureStatus.Values
        .Where(s => userPictureIds.Contains(s.PictureId))
        .OrderByDescending(s => s.CreatedAt)
        .ToList();
}
/// <summary>
/// Looks up the in-memory processing status of a picture.
/// </summary>
/// <param name="pictureId">Id of the picture.</param>
/// <returns>A completed task holding the status entry, or null when none is recorded.</returns>
public Task<PictureProcessingStatus?> GetPictureProcessingStatusAsync(int pictureId)
{
    // A failed lookup leaves the out variable null, which is exactly the "not found" result.
    _pictureStatus.TryGetValue(pictureId, out var snapshot);
    return Task.FromResult<PictureProcessingStatus?>(snapshot);
}
/// <summary>
/// Re-queues every picture whose database state is still Pending or Processing.
/// Locally stored pictures whose file is missing are marked as Failed instead.
/// Errors are logged and not propagated.
/// </summary>
public async Task RestoreUnfinishedTasksAsync()
{
    try
    {
        await using var dbContext = await _contextFactory.CreateDbContextAsync();

        // All pictures whose processing did not reach a final state.
        var unfinishedPictures = await dbContext.Pictures
            .Where(p => p.ProcessingStatus == ProcessingStatus.Pending ||
                        p.ProcessingStatus == ProcessingStatus.Processing)
            .ToListAsync();

        if (unfinishedPictures.Count == 0)
        {
            _logger.LogInformation("没有需要恢复的图片处理任务");
            return;
        }

        _logger.LogInformation("正在恢复 {Count} 个未完成的图片处理任务", unfinishedPictures.Count);
        foreach (var picture in unfinishedPictures)
        {
            if (picture.StorageType != StorageType.Local)
            {
                // Remote pictures are downloaded by the processing step itself, so a local
                // existence check says nothing about them; re-queue them unconditionally.
                await QueuePictureProcessingTaskAsync(picture.Id, picture.Path);
                _logger.LogInformation("已恢复图片处理任务: ID={PictureId}, 路径={FilePath}", picture.Id, picture.Path);
                continue;
            }

            // Stored paths are rooted at the application's working directory.
            string relativePath = picture.Path.TrimStart('/');
            string originalFilePath = Path.Combine(Directory.GetCurrentDirectory(), relativePath);
            if (File.Exists(originalFilePath))
            {
                await QueuePictureProcessingTaskAsync(picture.Id, originalFilePath);
                _logger.LogInformation("已恢复图片处理任务: ID={PictureId}, 路径={FilePath}", picture.Id, originalFilePath);
            }
            else
            {
                // The source file is gone, so the task can never succeed.
                picture.ProcessingStatus = ProcessingStatus.Failed;
                picture.ProcessingError = "系统重启后找不到原始图片文件";
                _logger.LogWarning("无法恢复图片处理任务: ID={PictureId}, 找不到文件: {FilePath}", picture.Id, originalFilePath);
            }
        }

        // Persists the Failed markers set above; re-queued pictures are unchanged in this context.
        await dbContext.SaveChangesAsync();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "恢复未完成的任务时发生错误");
    }
}
/// <summary>
/// Tops the set of worker loops up to the configured maximum.
/// </summary>
private void StartProcessor()
{
    // This method is reached from callers of QueuePictureProcessingTaskAsync and from worker
    // threads finishing a picture, and List<T> is not safe for concurrent mutation.
    lock (_processingTasks)
    {
        // Drop finished loops first so the count below reflects only live workers.
        _processingTasks.RemoveAll(t => t.IsCompleted);

        while (_processingTasks.Count < _maxConcurrentTasks)
        {
            _processingTasks.Add(Task.Run(ProcessTasksAsync));
        }
    }
}
/// <summary>
/// Worker loop: waits for queued tasks and processes them one at a time while holding a
/// semaphore permit. The loop ends when the channel reports that no more items will arrive.
/// </summary>
private async Task ProcessTasksAsync()
{
    while (await _queue.Reader.WaitToReadAsync())
    {
        await _signal.WaitAsync();
        try
        {
            // Another worker may have taken the item between the wait and this read.
            if (_queue.Reader.TryRead(out var task))
            {
                try
                {
                    await ProcessPictureAsync(task);
                }
                catch (Exception ex)
                {
                    // One failing picture must not end the loop and strand the remaining queue.
                    _logger.LogError(ex, "后台处理任务时发生未处理的异常: 任务ID={TaskId}, 图片ID={PictureId}",
                        task.Id, task.PictureId);
                }
            }
        }
        finally
        {
            _signal.Release();
        }
    }
}
/// <summary>
/// Runs the processing pipeline for one queued picture: obtains a local copy, ensures a
/// thumbnail exists, extracts EXIF data, asks the AI service for a title, description,
/// embedding and tags, and persists the result. Pipeline failures are recorded on the
/// picture and logged rather than propagated.
/// </summary>
/// <param name="task">The queued task. It is ignored when it is no longer registered as active
/// or when no status entry exists for its picture.</param>
private async Task ProcessPictureAsync(PictureProcessingTask task)
{
    if (!_activeTasks.TryGetValue(task.Id, out _) || !_pictureStatus.TryGetValue(task.PictureId, out var status))
    {
        return;
    }

    // Files that exist only for the duration of this run; deleted in the finally block.
    var tempFiles = new List<string>();
    try
    {
        // Mark the picture as being processed.
        await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 0);

        // Disposed when the enclosing try block exits, on success and on failure alike.
        await using var dbContext = await _contextFactory.CreateDbContextAsync();
        var picture = await dbContext.Pictures.FindAsync(task.PictureId);
        try
        {
            using var scope = _serviceProvider.CreateScope();
            var aiService = scope.ServiceProvider.GetRequiredService<IAiService>();
            var storageService = scope.ServiceProvider.GetRequiredService<IStorageService>();

            // 1. Make sure the picture record exists.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 10);
            if (picture == null)
            {
                throw new InvalidOperationException($"找不到ID为{task.PictureId}的图片");
            }

            // 2. Obtain a local copy of the image file.
            string localFilePath;
            if (picture.StorageType == StorageType.Local)
            {
                // Local storage: the stored path is relative to the working directory.
                localFilePath = Path.Combine(Directory.GetCurrentDirectory(), picture.Path.TrimStart('/'));
            }
            else
            {
                // Remote storage: download the file first.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 15);
                localFilePath = await storageService.ExecuteAsync(picture.StorageType,
                    provider => provider.DownloadFileAsync(picture.Path));
                if (!string.IsNullOrEmpty(localFilePath))
                {
                    tempFiles.Add(localFilePath);
                }
            }

            if (string.IsNullOrEmpty(localFilePath) || !File.Exists(localFilePath))
            {
                throw new FileNotFoundException($"找不到图片文件: {localFilePath}");
            }

            // 3. Ensure a thumbnail exists and pick the file that is sent to the AI service.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 20);
            string thumbnailForAI = localFilePath;
            if (string.IsNullOrEmpty(picture.ThumbnailPath))
            {
                // No thumbnail yet: generate one next to the local copy.
                var thumbnailPath = Path.Combine(
                    Path.GetDirectoryName(localFilePath)!,
                    Path.GetFileNameWithoutExtension(localFilePath) + "_thumb.webp");
                await ImageHelper.CreateThumbnailAsync(localFilePath, thumbnailPath, 500);
                thumbnailForAI = thumbnailPath;

                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 25);
                if (picture.StorageType == StorageType.Local)
                {
                    // Local storage: the generated file is the permanent thumbnail.
                    var relativeThumbnailPath =
                        $"/Uploads/{Path.GetRelativePath("Uploads", Path.GetDirectoryName(thumbnailPath)!)}/{Path.GetFileName(thumbnailPath)}";
                    picture.ThumbnailPath = relativeThumbnailPath.Replace('\\', '/');
                }
                else
                {
                    // Remote storage: the generated file is only a staging copy for the upload.
                    tempFiles.Add(thumbnailPath);

                    await using var thumbnailFileStream = new FileStream(thumbnailPath, FileMode.Open, FileAccess.Read);
                    var thumbnailFileName = Path.GetFileName(thumbnailPath);
                    var thumbnailContentType = "image/webp";
                    string thumbnailStoragePath = await storageService.ExecuteAsync(
                        picture.StorageType,
                        provider => provider.SaveAsync(thumbnailFileStream, thumbnailFileName, thumbnailContentType));

                    // Keep whatever the provider returned (a path or provider metadata).
                    picture.ThumbnailPath = thumbnailStoragePath;
                }
            }
            else if (picture.StorageType != StorageType.Local)
            {
                // Thumbnail already exists remotely: download it for the AI analysis.
                thumbnailForAI = await storageService.ExecuteAsync(picture.StorageType,
                    provider => provider.DownloadFileAsync(picture.ThumbnailPath));
                if (!string.IsNullOrEmpty(thumbnailForAI))
                {
                    tempFiles.Add(thumbnailForAI);
                }
            }
            else
            {
                thumbnailForAI = Path.Combine(Directory.GetCurrentDirectory(), picture.ThumbnailPath.TrimStart('/'));
            }

            // 4. Extract EXIF information and derive the capture time from it.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 30);
            var exifInfo = await ImageHelper.ExtractExifInfoAsync(localFilePath);
            picture.ExifInfo = exifInfo;
            picture.TakenAt = ImageHelper.ParseExifDateTime(exifInfo.DateTimeOriginal);

            // Save thumbnail and EXIF data now so they survive a failure in the later AI steps.
            await dbContext.SaveChangesAsync();

            // 5. Send the thumbnail to the AI service as Base64.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 50);
            string base64Image = await ImageHelper.ConvertImageToBase64(thumbnailForAI);
            var (title, description) = await aiService.AnalyzeImageAsync(base64Image);

            // 6. Fall back to the file name / existing description when the AI result is
            //    blank or equal to the placeholder strings checked below.
            string finalTitle = !string.IsNullOrWhiteSpace(title) && title != "AI生成的标题"
                ? title
                : Path.GetFileNameWithoutExtension(localFilePath);
            string finalDescription = !string.IsNullOrWhiteSpace(description) && description != "AI生成的描述"
                ? description
                : picture.Description;
            picture.Name = finalTitle;
            picture.Description = finalDescription;

            // 7. Generate the embedding and publish it to the owner's vector collection.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 60);
            var combinedText = $"{finalTitle}. {finalDescription}";
            var embedding = await aiService.GetEmbeddingAsync(combinedText);
            picture.Embedding = embedding;
            if (picture.UserId.HasValue && embedding.Length > 0)
            {
                var vectorDbService = scope.ServiceProvider.GetRequiredService<IVectorDbService>();
                var pictureVector = new Models.Vector.PictureVector
                {
                    Id = (ulong)picture.Id,
                    Name = picture.Name,
                    Embedding = embedding
                };
                await vectorDbService.AddPictureToUserCollectionAsync(picture.UserId.Value, pictureVector);
            }

            // 8. Load the names of all known tags.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 70);
            var availableTagNames = await dbContext.Tags.Select(t => t.Name).ToListAsync();

            // 9. Ask the AI service for tags for this image.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 80);
            var matchedTagNames = await aiService.GenerateTagsFromImageAsync(base64Image, availableTagNames, true);

            // 10. Attach the tags to the picture and to its owner.
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 90);
            var user = await dbContext.Users
                .Include(u => u.Tags)
                .FirstOrDefaultAsync(u => u.Id == picture.UserId);
            if (user != null && matchedTagNames.Any())
            {
                var tagEntities = new List<Tag>();
                foreach (var rawTagName in matchedTagNames)
                {
                    // Look up and create with the same trimmed name; otherwise a name with stray
                    // whitespace never matches the tag created for it and is inserted again.
                    var tagName = rawTagName?.Trim();
                    if (string.IsNullOrEmpty(tagName))
                    {
                        continue;
                    }

                    var loweredName = tagName.ToLower();
                    var existingTag = await dbContext.Tags
                        .Include(t => t.Users)
                        .FirstOrDefaultAsync(t => t.Name.ToLower() == loweredName);
                    if (existingTag != null)
                    {
                        // The AI result may repeat a name; link each tag to the picture only once.
                        if (tagEntities.All(t => t.Id != existingTag.Id))
                        {
                            tagEntities.Add(existingTag);
                        }

                        user.Tags ??= new List<Tag>();
                        if (user.Tags.All(t => t.Id != existingTag.Id))
                        {
                            user.Tags.Add(existingTag);
                        }
                    }
                    else
                    {
                        var newTag = new Tag { Name = tagName, Description = tagName };
                        dbContext.Tags.Add(newTag);
                        // Saved immediately so the lookup above finds it if the name repeats.
                        await dbContext.SaveChangesAsync();
                        user.Tags ??= new List<Tag>();
                        user.Tags.Add(newTag);
                        tagEntities.Add(newTag);
                    }
                }

                picture.Tags = tagEntities;
            }

            // 11. Mark the picture as completed.
            picture.ProcessingStatus = ProcessingStatus.Completed;
            picture.ProcessingProgress = 100;
            await dbContext.SaveChangesAsync();

            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Completed, 100);
            status.CompletedAt = DateTime.UtcNow;
        }
        catch (Exception ex)
        {
            // Log first so the root cause is recorded even if persisting the failure also fails.
            _logger.LogError(ex, "图片处理失败: 图片ID={PictureId}", task.PictureId);

            try
            {
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Failed, 0, ex.Message);
            }
            catch (Exception statusEx)
            {
                _logger.LogError(statusEx, "保存失败状态时出错");
            }

            // Persist the failure together with any partial results already set on the entity.
            if (picture != null)
            {
                picture.ProcessingStatus = ProcessingStatus.Failed;
                picture.ProcessingError = ex.Message;
                try
                {
                    await dbContext.SaveChangesAsync();
                }
                catch (Exception saveEx)
                {
                    _logger.LogError(saveEx, "保存失败状态时出错");
                }
            }
        }
    }
    finally
    {
        // Remove downloaded and staging files created during this run.
        foreach (var tempFile in tempFiles)
        {
            try
            {
                if (File.Exists(tempFile))
                {
                    File.Delete(tempFile);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "删除临时文件失败: {FilePath}", tempFile);
            }
        }

        // The task is finished either way; unregister it and keep the worker pool topped up.
        _activeTasks.TryRemove(task.Id, out _);
        StartProcessor();
    }
}
/// <summary>
/// Writes a status change to the in-memory status entry (when one exists) and to the picture's
/// database row (when the picture exists), using a context of its own.
/// </summary>
/// <param name="pictureId">Id of the picture being updated.</param>
/// <param name="status">New processing status.</param>
/// <param name="progress">New progress value.</param>
/// <param name="error">Error text to store; null clears any previous error.</param>
private async Task UpdatePictureStatus(int pictureId, ProcessingStatus status, int progress, string? error = null)
{
    // In-memory snapshot first, so pollers see the change without waiting for the database.
    if (_pictureStatus.TryGetValue(pictureId, out var snapshot))
    {
        (snapshot.Status, snapshot.Progress, snapshot.Error) = (status, progress, error);
    }

    // Then the persistent copy.
    await using var db = await _contextFactory.CreateDbContextAsync();
    if (await db.Pictures.FindAsync(pictureId) is not { } row)
    {
        return;
    }

    row.ProcessingStatus = status;
    row.ProcessingProgress = progress;
    row.ProcessingError = error;
    await db.SaveChangesAsync();
}
/// <summary>
/// Stops accepting work, gives running workers up to five seconds to finish and releases the
/// semaphore.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Performs the actual shutdown; calls after the first are no-ops.
/// </summary>
/// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
private void Dispose(bool disposing)
{
    if (_isDisposed) return;
    _isDisposed = true;
    if (!disposing) return;

    // Completing the writer makes WaitToReadAsync return false once the queue is drained, so
    // idle workers exit instead of keeping the wait below busy for the whole timeout.
    _queue.Writer.TryComplete();

    // Snapshot under the same lock object that guards mutation of the worker list.
    Task[] workers;
    lock (_processingTasks)
    {
        workers = _processingTasks.ToArray();
    }

    bool allWorkersFinished;
    try
    {
        allWorkersFinished = Task.WhenAll(workers).Wait(5000);
        if (!allWorkersFinished)
        {
            _logger.LogWarning("等待处理任务完成时超时");
        }
    }
    catch (Exception ex)
    {
        // Wait only throws once the combined task has completed, i.e. every worker has ended.
        allWorkersFinished = true;
        _logger.LogWarning(ex, "等待处理任务完成时超时");
    }

    // Dispose the semaphore only after the workers are done: a worker that is still running
    // would otherwise hit ObjectDisposedException when it releases or acquires a permit.
    if (allWorkersFinished)
    {
        _signal.Dispose();
    }
}
}
/// <summary>
/// A picture processing task held in the background queue.
/// </summary>
public class PictureProcessingTask
{
// Unique id of the task; the queue assigns a fresh Guid when the task is created.
public Guid Id { get; set; }
// Id of the picture this task processes.
public int PictureId { get; set; }
// Path of the original file as supplied by the caller that queued the task.
// NOTE(review): the processing code in this file resolves the file from the picture record and does not read this value.
public string OriginalFilePath { get; set; } = string.Empty;
// Owner of the picture, copied from the picture record when it is found; null otherwise.
public int? UserId { get; set; }
// Creation time of the task; the queue sets it from DateTime.UtcNow.
public DateTime CreatedAt { get; set; }
}