using System.Collections.Concurrent;
using System.Threading.Channels;
using Foxel.Models.DataBase;
using Foxel.Services.AI;
using Foxel.Services.Attributes;
using Foxel.Services.Configuration;
using Foxel.Services.Storage;
using Foxel.Services.VectorDB;
using Foxel.Utils;
using Microsoft.EntityFrameworkCore;

namespace Foxel.Services.Background;

/// <summary>
/// In-process queue that runs the picture post-processing pipeline (thumbnail, EXIF,
/// AI title/description, embedding, tags) on background worker tasks and keeps an
/// in-memory status entry per picture.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments were missing from the reviewed text and have
/// been restored. Most follow directly from usage in this file; the names
/// <c>MyDbContext</c>, <c>IAiService</c>, <c>IStorageService</c> and
/// <c>IVectorDbService</c> are not visible here and must be confirmed against the project.
/// </remarks>
public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
{
    private const int QueueCapacity = 10000;
    private const int DefaultMaxConcurrentTasks = 10;
    private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(5);

    private readonly Channel<PictureProcessingTask> _queue;
    private readonly ConcurrentDictionary<Guid, PictureProcessingTask> _activeTasks;
    private readonly ConcurrentDictionary<int, PictureProcessingStatus> _pictureStatus;
    private readonly IServiceProvider _serviceProvider;
    private readonly IDbContextFactory<MyDbContext> _contextFactory;

    // Worker tasks. A plain List is not thread-safe and StartProcessor is called from
    // several threads, so every access goes through _processingLock.
    private readonly List<Task> _processingTasks;
    private readonly object _processingLock = new();

    private readonly SemaphoreSlim _signal;
    private readonly int _maxConcurrentTasks;
    private volatile bool _isDisposed;
    private readonly ILogger<BackgroundTaskQueue> _logger;

    /// <summary>
    /// Creates the queue and reads the worker limit from the
    /// <c>BackgroundTasks:MaxConcurrentTasks</c> configuration value (default 10).
    /// </summary>
    public BackgroundTaskQueue(
        IServiceProvider serviceProvider,
        IDbContextFactory<MyDbContext> contextFactory,
        IConfigService configuration,
        ILogger<BackgroundTaskQueue> logger)
    {
        _serviceProvider = serviceProvider;
        _contextFactory = contextFactory;
        _logger = logger;
        _activeTasks = new ConcurrentDictionary<Guid, PictureProcessingTask>();
        _pictureStatus = new ConcurrentDictionary<int, PictureProcessingStatus>();
        _processingTasks = new List<Task>();

        // A constructor cannot await. GetAwaiter().GetResult() surfaces the original
        // exception instead of an AggregateException. Clamp to at least 1: a semaphore
        // created with 0 permits would block every worker forever.
        var configuredLimit = configuration
            .GetValueAsync("BackgroundTasks:MaxConcurrentTasks", DefaultMaxConcurrentTasks)
            .GetAwaiter()
            .GetResult();
        _maxConcurrentTasks = Math.Max(1, configuredLimit);
        _signal = new SemaphoreSlim(_maxConcurrentTasks);

        var options = new BoundedChannelOptions(QueueCapacity)
        {
            // Producers wait (asynchronously) when the queue is full.
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<PictureProcessingTask>(options);
    }

    /// <summary>
    /// Registers a pending status entry for the picture, enqueues a processing task and
    /// makes sure worker tasks are running.
    /// </summary>
    /// <param name="pictureId">Database id of the picture to process.</param>
    /// <param name="originalFilePath">Path recorded on the task as its original file.</param>
    /// <returns>The id of the newly created task.</returns>
    /// <exception cref="ObjectDisposedException">The queue has already been disposed.</exception>
    public async Task<Guid> QueuePictureProcessingTaskAsync(int pictureId, string originalFilePath)
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(BackgroundTaskQueue));
        }

        var createdAt = DateTime.UtcNow;
        var task = new PictureProcessingTask
        {
            Id = Guid.NewGuid(),
            PictureId = pictureId,
            OriginalFilePath = originalFilePath,
            CreatedAt = createdAt
        };

        var status = new PictureProcessingStatus
        {
            TaskId = task.Id,
            PictureId = pictureId,
            Status = ProcessingStatus.Pending,
            Progress = 0,
            CreatedAt = createdAt
        };

        // Copy the picture name and owner onto the status/task. The context is released
        // before the channel write below, which may wait while the queue is full.
        await using (var dbContext = await _contextFactory.CreateDbContextAsync())
        {
            var picture = await dbContext.Pictures
                .AsNoTracking()
                .FirstOrDefaultAsync(p => p.Id == pictureId);

            if (picture != null)
            {
                status.PictureName = picture.Name;
                task.UserId = picture.UserId;
            }
        }

        _pictureStatus[pictureId] = status;
        _activeTasks[task.Id] = task;

        await _queue.Writer.WriteAsync(task);

        // Start workers if fewer than the configured number are running.
        StartProcessor();

        return task.Id;
    }

    /// <summary>
    /// Returns the in-memory status entries of the given user's pictures whose database
    /// status is pending or processing, newest first.
    /// </summary>
    public async Task<List<PictureProcessingStatus>> GetUserTasksStatusAsync(int userId)
    {
        await using var dbContext = await _contextFactory.CreateDbContextAsync();

        var userPictureIds = await dbContext.Pictures
            .Where(p => p.UserId == userId &&
                        (p.ProcessingStatus == ProcessingStatus.Pending ||
                         p.ProcessingStatus == ProcessingStatus.Processing))
            .Select(p => p.Id)
            .ToListAsync();

        // Hash lookup instead of List.Contains for every status entry.
        var idSet = userPictureIds.ToHashSet();

        return _pictureStatus.Values
            .Where(s => idSet.Contains(s.PictureId))
            .OrderByDescending(s => s.CreatedAt)
            .ToList();
    }

    /// <summary>
    /// Returns the in-memory status entry for a picture, or <c>null</c> when none exists.
    /// </summary>
    public Task<PictureProcessingStatus?> GetPictureProcessingStatusAsync(int pictureId)
    {
        return Task.FromResult(_pictureStatus.GetValueOrDefault(pictureId));
    }

    /// <summary>
    /// Re-queues every picture whose database status is still pending or processing.
    /// Locally stored pictures whose file is missing are marked as failed instead.
    /// Errors are logged, never thrown.
    /// </summary>
    public async Task RestoreUnfinishedTasksAsync()
    {
        try
        {
            await using var dbContext = await _contextFactory.CreateDbContextAsync();

            var unfinishedPictures = await dbContext.Pictures
                .Where(p => p.ProcessingStatus == ProcessingStatus.Pending ||
                            p.ProcessingStatus == ProcessingStatus.Processing)
                .ToListAsync();

            if (unfinishedPictures.Count == 0)
            {
                _logger.LogInformation("没有需要恢复的图片处理任务");
                return;
            }

            _logger.LogInformation("正在恢复 {Count} 个未完成的图片处理任务", unfinishedPictures.Count);

            foreach (var picture in unfinishedPictures)
            {
                bool isLocal = picture.StorageType == StorageType.Local;

                // Only locally stored pictures have a file under the working directory.
                // For other storage types the processor downloads the file itself, so a
                // local existence check would wrongly mark every remote picture as failed.
                string originalFilePath = isLocal
                    ? Path.Combine(Directory.GetCurrentDirectory(), picture.Path.TrimStart('/'))
                    : picture.Path;

                if (!isLocal || File.Exists(originalFilePath))
                {
                    await QueuePictureProcessingTaskAsync(picture.Id, originalFilePath);
                    _logger.LogInformation("已恢复图片处理任务: ID={PictureId}, 路径={FilePath}",
                        picture.Id, originalFilePath);
                }
                else
                {
                    picture.ProcessingStatus = ProcessingStatus.Failed;
                    picture.ProcessingError = "系统重启后找不到原始图片文件";
                    _logger.LogWarning("无法恢复图片处理任务: ID={PictureId}, 找不到文件: {FilePath}",
                        picture.Id, originalFilePath);
                }
            }

            await dbContext.SaveChangesAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "恢复未完成的任务时发生错误");
        }
    }

    /// <summary>
    /// Drops finished worker tasks and starts new ones until the configured number of
    /// workers is running. Does nothing once the queue is disposed.
    /// </summary>
    private void StartProcessor()
    {
        lock (_processingLock)
        {
            if (_isDisposed)
            {
                return;
            }

            _processingTasks.RemoveAll(t => t.IsCompleted);

            while (_processingTasks.Count < _maxConcurrentTasks)
            {
                _processingTasks.Add(Task.Run(ProcessTasksAsync));
            }
        }
    }

    /// <summary>
    /// Worker loop: waits for queued tasks and processes them one at a time until the
    /// channel is completed and drained.
    /// </summary>
    private async Task ProcessTasksAsync()
    {
        while (await _queue.Reader.WaitToReadAsync())
        {
            await _signal.WaitAsync();
            try
            {
                // Another worker may have taken the item between the wait and the read.
                if (_queue.Reader.TryRead(out var task))
                {
                    await ProcessPictureAsync(task);
                }
            }
            catch (Exception ex)
            {
                // One failing item must not terminate the worker.
                _logger.LogError(ex, "后台图片处理任务发生未处理的异常");
            }
            finally
            {
                _signal.Release();
            }
        }
    }

    /// <summary>
    /// Runs the processing pipeline for one queued picture and records progress, the final
    /// result or the failure both in memory and in the database.
    /// </summary>
    private async Task ProcessPictureAsync(PictureProcessingTask task)
    {
        if (!_activeTasks.ContainsKey(task.Id) ||
            !_pictureStatus.TryGetValue(task.PictureId, out var status))
        {
            return;
        }

        // Files created or downloaded only for this run; removed in the finally block.
        var tempFiles = new List<string>();

        try
        {
            await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 0);

            // Disposed when the outer try block exits (previously this context leaked).
            await using var dbContext = await _contextFactory.CreateDbContextAsync();
            var picture = await dbContext.Pictures.FindAsync(task.PictureId);

            try
            {
                using var scope = _serviceProvider.CreateScope();
                var aiService = scope.ServiceProvider.GetRequiredService<IAiService>();
                var storageService = scope.ServiceProvider.GetRequiredService<IStorageService>();

                // Locate the picture record.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 10);

                if (picture == null)
                {
                    throw new InvalidOperationException($"找不到ID为{task.PictureId}的图片");
                }

                bool isLocalStorage = picture.StorageType == StorageType.Local;

                // Obtain a local copy of the image file.
                string localFilePath;
                if (isLocalStorage)
                {
                    localFilePath = Path.Combine(Directory.GetCurrentDirectory(), picture.Path.TrimStart('/'));
                }
                else
                {
                    await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 15);
                    localFilePath = await storageService.ExecuteAsync(picture.StorageType,
                        provider => provider.DownloadFileAsync(picture.Path));
                    if (!string.IsNullOrEmpty(localFilePath))
                    {
                        tempFiles.Add(localFilePath);
                    }
                }

                if (string.IsNullOrEmpty(localFilePath) || !File.Exists(localFilePath))
                {
                    throw new FileNotFoundException($"找不到图片文件: {localFilePath}");
                }

                // Ensure a thumbnail exists; thumbnailForAI is the image sent to the AI service.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 20);
                string thumbnailForAI;

                if (string.IsNullOrEmpty(picture.ThumbnailPath))
                {
                    var thumbnailPath = Path.Combine(
                        Path.GetDirectoryName(localFilePath)!,
                        Path.GetFileNameWithoutExtension(localFilePath) + "_thumb.webp");

                    if (!isLocalStorage)
                    {
                        // The generated file is only an upload source for remote storage.
                        tempFiles.Add(thumbnailPath);
                    }

                    await ImageHelper.CreateThumbnailAsync(localFilePath, thumbnailPath, 500);
                    thumbnailForAI = thumbnailPath;

                    await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 25);

                    if (isLocalStorage)
                    {
                        var relativeDirectory = Path.GetRelativePath("Uploads", Path.GetDirectoryName(thumbnailPath)!);
                        var relativeThumbnailPath = $"/Uploads/{relativeDirectory}/{Path.GetFileName(thumbnailPath)}";
                        picture.ThumbnailPath = relativeThumbnailPath.Replace('\\', '/');
                    }
                    else
                    {
                        await using var thumbnailFileStream =
                            new FileStream(thumbnailPath, FileMode.Open, FileAccess.Read);
                        var thumbnailFileName = Path.GetFileName(thumbnailPath);
                        var thumbnailContentType = "image/webp";

                        string thumbnailStoragePath = await storageService.ExecuteAsync(
                            picture.StorageType,
                            provider => provider.SaveAsync(thumbnailFileStream, thumbnailFileName, thumbnailContentType));

                        picture.ThumbnailPath = thumbnailStoragePath;
                    }
                }
                else if (isLocalStorage)
                {
                    thumbnailForAI = Path.Combine(Directory.GetCurrentDirectory(), picture.ThumbnailPath.TrimStart('/'));
                }
                else
                {
                    thumbnailForAI = await storageService.ExecuteAsync(picture.StorageType,
                        provider => provider.DownloadFileAsync(picture.ThumbnailPath));
                    if (!string.IsNullOrEmpty(thumbnailForAI))
                    {
                        tempFiles.Add(thumbnailForAI);
                    }
                }

                // EXIF data and capture time.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 30);
                var exifInfo = await ImageHelper.ExtractExifInfoAsync(localFilePath);
                picture.ExifInfo = exifInfo;
                picture.TakenAt = ImageHelper.ParseExifDateTime(exifInfo.DateTimeOriginal);

                // Persist thumbnail and EXIF now so they survive a failure in the AI steps.
                await dbContext.SaveChangesAsync();

                // AI title and description.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 50);
                string base64Image = await ImageHelper.ConvertImageToBase64(thumbnailForAI);
                var (title, description) = await aiService.AnalyzeImageAsync(base64Image);

                // The two literal strings below are treated as "no usable AI answer".
                string finalTitle = !string.IsNullOrWhiteSpace(title) && title != "AI生成的标题"
                    ? title
                    : Path.GetFileNameWithoutExtension(localFilePath);

                string finalDescription = !string.IsNullOrWhiteSpace(description) && description != "AI生成的描述"
                    ? description
                    : picture.Description;

                picture.Name = finalTitle;
                picture.Description = finalDescription;

                // Embedding vector, also pushed to the owner's vector collection.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 60);
                var combinedText = $"{finalTitle}. {finalDescription}";
                var embedding = await aiService.GetEmbeddingAsync(combinedText);
                picture.Embedding = embedding;

                if (picture.UserId.HasValue && embedding.Length > 0)
                {
                    var vectorDbService = scope.ServiceProvider.GetRequiredService<IVectorDbService>();
                    var pictureVector = new Models.Vector.PictureVector
                    {
                        Id = (ulong)picture.Id,
                        Name = picture.Name,
                        Embedding = embedding
                    };
                    await vectorDbService.AddPictureToUserCollectionAsync(picture.UserId.Value, pictureVector);
                }

                // Tags: offer the existing tag names to the AI, then attach the matches.
                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 70);
                var availableTagNames = await dbContext.Tags.Select(t => t.Name).ToListAsync();

                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 80);
                var matchedTagNames = await aiService.GenerateTagsFromImageAsync(base64Image, availableTagNames, true);

                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 90);
                var user = await dbContext.Users
                    .Include(u => u.Tags)
                    .FirstOrDefaultAsync(u => u.Id == picture.UserId);

                if (user != null && matchedTagNames.Any())
                {
                    user.Tags ??= new List<Tag>();
                    var tagEntities = new List<Tag>();
                    var seenNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

                    foreach (var rawName in matchedTagNames)
                    {
                        // Look up and create with the same trimmed name, and skip blanks and
                        // repeats so one tag is never attached to the picture twice.
                        var tagName = rawName?.Trim();
                        if (string.IsNullOrEmpty(tagName) || !seenNames.Add(tagName))
                        {
                            continue;
                        }

                        // ToLower() is kept on the column side because this predicate is
                        // translated by EF Core.
                        var loweredName = tagName.ToLower();
                        var existingTag = await dbContext.Tags
                            .Include(t => t.Users)
                            .FirstOrDefaultAsync(t => t.Name.ToLower() == loweredName);

                        if (existingTag != null)
                        {
                            tagEntities.Add(existingTag);
                            if (user.Tags.All(t => t.Id != existingTag.Id))
                            {
                                user.Tags.Add(existingTag);
                            }
                        }
                        else
                        {
                            var newTag = new Tag
                            {
                                Name = tagName,
                                Description = tagName
                            };
                            dbContext.Tags.Add(newTag);
                            await dbContext.SaveChangesAsync();

                            user.Tags.Add(newTag);
                            tagEntities.Add(newTag);
                        }
                    }

                    picture.Tags = tagEntities;
                }

                // Done.
                picture.ProcessingStatus = ProcessingStatus.Completed;
                picture.ProcessingProgress = 100;
                await dbContext.SaveChangesAsync();

                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Completed, 100);
                status.CompletedAt = DateTime.UtcNow;
            }
            catch (Exception ex)
            {
                // Log first so the root cause is recorded even if the status update fails.
                _logger.LogError(ex, "图片处理失败: 图片ID={PictureId}", task.PictureId);

                await UpdatePictureStatus(task.PictureId, ProcessingStatus.Failed, 0, ex.Message);

                // Keep whatever partial results are already on the tracked entity.
                if (picture != null)
                {
                    picture.ProcessingStatus = ProcessingStatus.Failed;
                    picture.ProcessingError = ex.Message;
                    try
                    {
                        await dbContext.SaveChangesAsync();
                    }
                    catch (Exception saveEx)
                    {
                        _logger.LogError(saveEx, "保存失败状态时出错");
                    }
                }
            }
        }
        catch (Exception setupEx)
        {
            // Reached when the database could not be used at all (before the picture was
            // loaded, or while recording the failure). Reflect it in memory at least.
            status.Status = ProcessingStatus.Failed;
            status.Error = setupEx.Message;
            _logger.LogError(setupEx, "图片处理失败: 图片ID={PictureId}", task.PictureId);
        }
        finally
        {
            foreach (var tempFile in tempFiles)
            {
                TryDeleteTempFile(tempFile);
            }

            _activeTasks.TryRemove(task.Id, out _);

            // Make sure workers keep running for the remaining queue.
            StartProcessor();
        }
    }

    /// <summary>Deletes a temporary file if it exists; failures are logged, not thrown.</summary>
    private void TryDeleteTempFile(string filePath)
    {
        try
        {
            if (File.Exists(filePath))
            {
                File.Delete(filePath);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "删除临时文件失败: {FilePath}", filePath);
        }
    }

    /// <summary>
    /// Writes status, progress and error to the in-memory entry (if any) and to the
    /// picture row in the database (if it exists), using a short-lived context.
    /// </summary>
    private async Task UpdatePictureStatus(int pictureId, ProcessingStatus status, int progress, string? error = null)
    {
        if (_pictureStatus.TryGetValue(pictureId, out var currentStatus))
        {
            currentStatus.Status = status;
            currentStatus.Progress = progress;
            currentStatus.Error = error;
        }

        await using var dbContext = await _contextFactory.CreateDbContextAsync();
        var picture = await dbContext.Pictures.FindAsync(pictureId);
        if (picture != null)
        {
            picture.ProcessingStatus = status;
            picture.ProcessingProgress = progress;
            picture.ProcessingError = error;
            await dbContext.SaveChangesAsync();
        }
    }

    /// <summary>
    /// Stops accepting work and waits up to five seconds for the workers to finish.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        if (!disposing)
        {
            return;
        }

        // Completing the writer lets the worker loops end once the queue is drained;
        // without it they would wait forever and the wait below could never succeed.
        _queue.Writer.TryComplete();

        Task[] workers;
        lock (_processingLock)
        {
            workers = _processingTasks.ToArray();
        }

        bool workersFinished;
        try
        {
            // Wait returns false on timeout; it does not throw.
            workersFinished = Task.WhenAll(workers).Wait(ShutdownTimeout);
            if (!workersFinished)
            {
                _logger.LogWarning("等待处理任务完成时超时");
            }
        }
        catch (Exception ex)
        {
            // Wait only throws once every worker has completed (some faulted or cancelled).
            workersFinished = true;
            _logger.LogWarning(ex, "等待处理任务完成时发生错误");
        }

        // Workers still running after the timeout keep using the semaphore, so it is only
        // disposed when nothing can touch it any more.
        if (workersFinished)
        {
            _signal.Dispose();
        }
    }
}

/// <summary>
/// A queued picture-processing work item.
/// </summary>
public class PictureProcessingTask
{
    /// <summary>Unique id of this task.</summary>
    public Guid Id { get; set; }

    /// <summary>Database id of the picture to process.</summary>
    public int PictureId { get; set; }

    /// <summary>File path supplied when the task was queued.</summary>
    public string OriginalFilePath { get; set; } = string.Empty;

    /// <summary>Owner of the picture, when known.</summary>
    public int? UserId { get; set; }

    /// <summary>UTC time at which the task was created.</summary>
    public DateTime CreatedAt { get; set; }
}