mirror of
https://github.com/DrizzleTime/Foxel.git
synced 2026-05-20 07:40:57 +08:00
feat(task-processor): implement PictureTaskProcessor for background image processing tasks
This commit is contained in:
@@ -1,21 +1,16 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Channels;
|
||||
using Foxel.Models.DataBase;
|
||||
using Foxel.Services.AI;
|
||||
using Foxel.Services.Attributes;
|
||||
using Foxel.Services.Configuration;
|
||||
using Foxel.Services.Storage;
|
||||
using Foxel.Services.VectorDB;
|
||||
using Foxel.Utils;
|
||||
using Foxel.Services.Background.Processors;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace Foxel.Services.Background;
|
||||
|
||||
public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
|
||||
{
|
||||
private readonly Channel<PictureProcessingTask> _queue;
|
||||
private readonly ConcurrentDictionary<Guid, PictureProcessingTask> _activeTasks;
|
||||
private readonly ConcurrentDictionary<int, PictureProcessingStatus> _pictureStatus;
|
||||
private readonly Channel<Guid> _queue;
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly IDbContextFactory<MyDbContext> _contextFactory;
|
||||
private readonly List<Task> _processingTasks;
|
||||
@@ -30,83 +25,129 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
|
||||
IConfigService configuration,
|
||||
ILogger<BackgroundTaskQueue> logger)
|
||||
{
|
||||
_serviceProvider = serviceProvider;
|
||||
_serviceProvider = serviceProvider; // Keep IServiceProvider to resolve processors
|
||||
_contextFactory = contextFactory;
|
||||
_logger = logger;
|
||||
_activeTasks = new ConcurrentDictionary<Guid, PictureProcessingTask>();
|
||||
_pictureStatus = new ConcurrentDictionary<int, PictureProcessingStatus>();
|
||||
_processingTasks = new List<Task>();
|
||||
_maxConcurrentTasks = configuration.GetValueAsync("BackgroundTasks:MaxConcurrentTasks", 10).Result;
|
||||
_maxConcurrentTasks = configuration.GetValueAsync("BackgroundTasks:MaxConcurrentTasks", 10).Result; // 保持原有逻辑
|
||||
_signal = new SemaphoreSlim(_maxConcurrentTasks);
|
||||
var options = new BoundedChannelOptions(10000)
|
||||
var options = new BoundedChannelOptions(10000) // 保持原有逻辑
|
||||
{
|
||||
FullMode = BoundedChannelFullMode.Wait
|
||||
};
|
||||
_queue = Channel.CreateBounded<PictureProcessingTask>(options);
|
||||
_queue = Channel.CreateBounded<Guid>(options);
|
||||
|
||||
// 启动处理器,确保在服务启动时就开始处理队列
|
||||
StartProcessor();
|
||||
}
|
||||
|
||||
public async Task<Guid> QueuePictureProcessingTaskAsync(int pictureId, string originalFilePath)
|
||||
{
|
||||
var task = new PictureProcessingTask
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
PictureId = pictureId,
|
||||
OriginalFilePath = originalFilePath,
|
||||
CreatedAt = DateTime.UtcNow
|
||||
};
|
||||
|
||||
// 更新状态字典
|
||||
var status = new PictureProcessingStatus
|
||||
{
|
||||
TaskId = task.Id,
|
||||
PictureId = pictureId,
|
||||
Status = ProcessingStatus.Pending,
|
||||
Progress = 0,
|
||||
CreatedAt = DateTime.UtcNow
|
||||
};
|
||||
|
||||
// 将用户ID添加到任务状态中,这样可以按用户过滤任务
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
await using var dbContext = await _contextFactory.CreateDbContextAsync();
|
||||
var picture = await dbContext.Pictures
|
||||
.Include(p => p.User)
|
||||
.FirstOrDefaultAsync(p => p.Id == pictureId);
|
||||
|
||||
if (picture != null)
|
||||
var picture = await dbContext.Pictures.FindAsync(pictureId);
|
||||
if (picture == null)
|
||||
{
|
||||
status.PictureName = picture.Name;
|
||||
task.UserId = picture.UserId;
|
||||
_logger.LogError("无法为不存在的图片 PictureId: {PictureId} 创建处理任务", pictureId);
|
||||
throw new KeyNotFoundException($"找不到 PictureId: {pictureId} 的图片");
|
||||
}
|
||||
|
||||
_pictureStatus[pictureId] = status;
|
||||
_activeTasks[task.Id] = task;
|
||||
await _queue.Writer.WriteAsync(task);
|
||||
var payload = new PictureProcessingPayload
|
||||
{
|
||||
PictureId = pictureId,
|
||||
OriginalFilePath = originalFilePath,
|
||||
UserIdForPicture = picture.UserId
|
||||
};
|
||||
|
||||
var backgroundTask = new BackgroundTask
|
||||
{
|
||||
Type = TaskType.PictureProcessing,
|
||||
Payload = JsonSerializer.Serialize(payload),
|
||||
UserId = picture.UserId,
|
||||
RelatedEntityId = pictureId,
|
||||
Status = TaskExecutionStatus.Pending,
|
||||
CreatedAt = DateTime.UtcNow
|
||||
};
|
||||
|
||||
dbContext.BackgroundTasks.Add(backgroundTask);
|
||||
await dbContext.SaveChangesAsync();
|
||||
|
||||
await _queue.Writer.WriteAsync(backgroundTask.Id);
|
||||
_logger.LogInformation("图片处理任务已加入队列: TaskId={TaskId}, PictureId={PictureId}", backgroundTask.Id, pictureId);
|
||||
|
||||
// 启动处理器,如果没有正在运行
|
||||
StartProcessor();
|
||||
|
||||
return task.Id;
|
||||
return backgroundTask.Id;
|
||||
}
|
||||
|
||||
public async Task<List<PictureProcessingStatus>> GetUserTasksStatusAsync(int userId)
|
||||
public async Task<List<TaskDetailsDto>> GetUserTasksStatusAsync(int userId)
|
||||
{
|
||||
await using var dbContext = await _contextFactory.CreateDbContextAsync();
|
||||
var userPictureIds = await dbContext.Pictures
|
||||
.Where(p => p.UserId == userId &&
|
||||
(p.ProcessingStatus == ProcessingStatus.Pending ||
|
||||
p.ProcessingStatus == ProcessingStatus.Processing))
|
||||
.Select(p => p.Id)
|
||||
var tasks = await dbContext.BackgroundTasks
|
||||
.Where(bt => bt.UserId == userId)
|
||||
.OrderByDescending(bt => bt.CreatedAt)
|
||||
.ToListAsync();
|
||||
|
||||
return _pictureStatus.Values
|
||||
.Where(s => userPictureIds.Contains(s.PictureId))
|
||||
.OrderByDescending(s => s.CreatedAt)
|
||||
.ToList();
|
||||
var statusList = new List<TaskDetailsDto>();
|
||||
foreach (var task in tasks)
|
||||
{
|
||||
string taskName = $"任务: {task.Id}";
|
||||
if (task.Type == TaskType.PictureProcessing && task.RelatedEntityId.HasValue)
|
||||
{
|
||||
var picture = await dbContext.Pictures.FindAsync(task.RelatedEntityId.Value);
|
||||
if (picture != null)
|
||||
{
|
||||
taskName = picture.Name;
|
||||
}
|
||||
else
|
||||
{
|
||||
taskName = "图片处理 (图片信息丢失)";
|
||||
}
|
||||
}
|
||||
|
||||
statusList.Add(new TaskDetailsDto
|
||||
{
|
||||
TaskId = task.Id,
|
||||
TaskName = taskName,
|
||||
TaskType = task.Type,
|
||||
Status = task.Status,
|
||||
Progress = task.Progress,
|
||||
Error = task.ErrorMessage,
|
||||
CreatedAt = task.CreatedAt,
|
||||
CompletedAt = task.CompletedAt,
|
||||
RelatedEntityId = task.RelatedEntityId
|
||||
});
|
||||
}
|
||||
return statusList;
|
||||
}
|
||||
|
||||
public Task<PictureProcessingStatus?> GetPictureProcessingStatusAsync(int pictureId)
|
||||
public async Task<TaskDetailsDto?> GetPictureProcessingStatusAsync(int pictureId)
|
||||
{
|
||||
return Task.FromResult(_pictureStatus.GetValueOrDefault(pictureId));
|
||||
await using var dbContext = await _contextFactory.CreateDbContextAsync();
|
||||
var task = await dbContext.BackgroundTasks
|
||||
.FirstOrDefaultAsync(bt => bt.RelatedEntityId == pictureId && bt.Type == TaskType.PictureProcessing);
|
||||
|
||||
if (task == null)
|
||||
return null;
|
||||
|
||||
var pictureName = "未知图片";
|
||||
var picture = await dbContext.Pictures.FindAsync(pictureId);
|
||||
if (picture != null)
|
||||
{
|
||||
pictureName = picture.Name;
|
||||
}
|
||||
|
||||
return new TaskDetailsDto
|
||||
{
|
||||
TaskId = task.Id,
|
||||
TaskName = pictureName, // Picture name as task name
|
||||
TaskType = task.Type,
|
||||
Status = task.Status,
|
||||
Progress = task.Progress,
|
||||
Error = task.ErrorMessage,
|
||||
CreatedAt = task.CreatedAt,
|
||||
CompletedAt = task.CompletedAt,
|
||||
RelatedEntityId = task.RelatedEntityId
|
||||
};
|
||||
}
|
||||
|
||||
public async Task RestoreUnfinishedTasksAsync()
|
||||
@@ -114,38 +155,27 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
|
||||
try
|
||||
{
|
||||
await using var dbContext = await _contextFactory.CreateDbContextAsync();
|
||||
|
||||
// 获取所有未完成的图片处理任务
|
||||
var unfinishedPictures = await dbContext.Pictures
|
||||
.Where(p => p.ProcessingStatus == ProcessingStatus.Pending ||
|
||||
p.ProcessingStatus == ProcessingStatus.Processing)
|
||||
var unfinishedTasks = await dbContext.BackgroundTasks
|
||||
.Where(bt => bt.Type == TaskType.PictureProcessing &&
|
||||
(bt.Status == TaskExecutionStatus.Pending || bt.Status == TaskExecutionStatus.Processing))
|
||||
.ToListAsync();
|
||||
|
||||
if (unfinishedPictures.Any())
|
||||
if (unfinishedTasks.Any())
|
||||
{
|
||||
_logger.LogInformation("正在恢复 {Count} 个未完成的图片处理任务", unfinishedPictures.Count);
|
||||
|
||||
foreach (var picture in unfinishedPictures)
|
||||
_logger.LogInformation("正在恢复 {Count} 个未完成的图片处理任务", unfinishedTasks.Count);
|
||||
foreach (var task in unfinishedTasks)
|
||||
{
|
||||
// 构建原始文件路径
|
||||
string relativePath = picture.Path.TrimStart('/');
|
||||
string originalFilePath = Path.Combine(Directory.GetCurrentDirectory(), relativePath);
|
||||
if (File.Exists(originalFilePath))
|
||||
// 确保任务状态在数据库中被重置为 Pending,以防上次运行时停在 Processing 状态
|
||||
if (task.Status == TaskExecutionStatus.Processing)
|
||||
{
|
||||
// 重新加入队列
|
||||
await QueuePictureProcessingTaskAsync(picture.Id, originalFilePath);
|
||||
_logger.LogInformation("已恢复图片处理任务: ID={PictureId}, 路径={FilePath}", picture.Id, originalFilePath);
|
||||
}
|
||||
else
|
||||
{
|
||||
// 如果文件不存在,则标记为失败
|
||||
picture.ProcessingStatus = ProcessingStatus.Failed;
|
||||
picture.ProcessingError = "系统重启后找不到原始图片文件";
|
||||
_logger.LogWarning("无法恢复图片处理任务: ID={PictureId}, 找不到文件: {FilePath}", picture.Id, originalFilePath);
|
||||
task.Status = TaskExecutionStatus.Pending;
|
||||
task.StartedAt = null; // 重置开始时间
|
||||
// 保留 Progress 和 ErrorMessage 以供参考
|
||||
}
|
||||
await _queue.Writer.WriteAsync(task.Id);
|
||||
_logger.LogInformation("已恢复图片处理任务到队列: TaskId={TaskId}, RelatedEntityId={RelatedEntityId}", task.Id, task.RelatedEntityId);
|
||||
}
|
||||
|
||||
await dbContext.SaveChangesAsync();
|
||||
await dbContext.SaveChangesAsync(); // 保存状态更改
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -160,342 +190,140 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
|
||||
|
||||
private void StartProcessor()
|
||||
{
|
||||
// 添加新的处理任务,如果当前任务数量小于最大并发数
|
||||
while (_processingTasks.Count(t => !t.IsCompleted) < _maxConcurrentTasks)
|
||||
lock (_processingTasks) // 确保线程安全地访问 _processingTasks
|
||||
{
|
||||
_processingTasks.Add(Task.Run(ProcessTasksAsync));
|
||||
}
|
||||
// 清理已完成的任务
|
||||
_processingTasks.RemoveAll(t => t.IsCompleted);
|
||||
|
||||
// 清理已完成的任务
|
||||
_processingTasks.RemoveAll(t => t.IsCompleted);
|
||||
// 添加新的处理任务,如果当前任务数量小于最大并发数
|
||||
while (_processingTasks.Count < _maxConcurrentTasks && _queue.Reader.Count > 0)
|
||||
{
|
||||
_processingTasks.Add(Task.Run(ProcessTasksAsync));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessTasksAsync()
|
||||
{
|
||||
while (await _queue.Reader.WaitToReadAsync())
|
||||
{
|
||||
await _signal.WaitAsync();
|
||||
|
||||
try
|
||||
if (_queue.Reader.TryRead(out var taskId))
|
||||
{
|
||||
if (_queue.Reader.TryRead(out var task))
|
||||
await _signal.WaitAsync();
|
||||
try
|
||||
{
|
||||
await ProcessPictureAsync(task);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_signal.Release();
|
||||
}
|
||||
}
|
||||
}
|
||||
await using var checkDbContext = await _contextFactory.CreateDbContextAsync();
|
||||
var taskToCheck = await checkDbContext.BackgroundTasks.FindAsync(taskId);
|
||||
|
||||
private async Task ProcessPictureAsync(PictureProcessingTask task)
|
||||
{
|
||||
if (!_activeTasks.TryGetValue(task.Id, out _) || !_pictureStatus.TryGetValue(task.PictureId, out var status))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新状态为处理中
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 0);
|
||||
|
||||
string localFilePath = "";
|
||||
bool isTempFile = false;
|
||||
var dbContext = await _contextFactory.CreateDbContextAsync();
|
||||
var picture = await dbContext.Pictures.FindAsync(task.PictureId);
|
||||
|
||||
try
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var aiService = scope.ServiceProvider.GetRequiredService<IAiService>();
|
||||
var storageService = scope.ServiceProvider.GetRequiredService<IStorageService>();
|
||||
|
||||
// 1. 获取图片信息
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 10);
|
||||
|
||||
if (picture == null)
|
||||
{
|
||||
throw new Exception($"找不到ID为{task.PictureId}的图片");
|
||||
}
|
||||
|
||||
// 处理文件获取逻辑
|
||||
if (picture.StorageType == StorageType.Local)
|
||||
{
|
||||
// 本地存储,直接使用文件路径
|
||||
localFilePath = Path.Combine(Directory.GetCurrentDirectory(), picture.Path.TrimStart('/'));
|
||||
}
|
||||
else
|
||||
{
|
||||
// 非本地存储需要先下载文件
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 15);
|
||||
localFilePath = await storageService.ExecuteAsync(picture.StorageType,
|
||||
provider => provider.DownloadFileAsync(picture.Path));
|
||||
isTempFile = true;
|
||||
}
|
||||
|
||||
if (string.IsNullOrEmpty(localFilePath) || !File.Exists(localFilePath))
|
||||
{
|
||||
throw new Exception($"找不到图片文件: {localFilePath}");
|
||||
}
|
||||
|
||||
// 检查并生成缩略图(如果不存在)
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 20);
|
||||
|
||||
string thumbnailForAI = localFilePath; // 用于AI分析的缩略图路径
|
||||
|
||||
if (string.IsNullOrEmpty(picture.ThumbnailPath))
|
||||
{
|
||||
// 如果缩略图不存在,生成缩略图
|
||||
var thumbnailPath = Path.Combine(
|
||||
Path.GetDirectoryName(localFilePath)!,
|
||||
Path.GetFileNameWithoutExtension(Path.GetFileName(localFilePath)) + "_thumb.webp");
|
||||
|
||||
await ImageHelper.CreateThumbnailAsync(localFilePath, thumbnailPath, 500);
|
||||
thumbnailForAI = thumbnailPath;
|
||||
|
||||
// 更新缩略图路径到数据库
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 25);
|
||||
|
||||
if (picture.StorageType == StorageType.Local)
|
||||
{
|
||||
// 本地存储缩略图
|
||||
var relativeThumbnailPath =
|
||||
$"/Uploads/{Path.GetRelativePath("Uploads", Path.GetDirectoryName(thumbnailPath)!)}/{Path.GetFileName(thumbnailPath)}";
|
||||
picture.ThumbnailPath = relativeThumbnailPath.Replace('\\', '/');
|
||||
}
|
||||
else
|
||||
{
|
||||
// 上传缩略图并获取存储路径或元数据
|
||||
await using var thumbnailFileStream = new FileStream(thumbnailPath, FileMode.Open, FileAccess.Read);
|
||||
var thumbnailFileName = Path.GetFileName(thumbnailPath);
|
||||
var thumbnailContentType = "image/webp";
|
||||
|
||||
string thumbnailStoragePath = await storageService.ExecuteAsync(
|
||||
picture.StorageType,
|
||||
provider => provider.SaveAsync(thumbnailFileStream, thumbnailFileName, thumbnailContentType));
|
||||
|
||||
// 将路径或元数据存储到ThumbnailPath
|
||||
picture.ThumbnailPath = thumbnailStoragePath;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// 如果缩略图已存在,下载用于AI分析
|
||||
if (picture.StorageType != StorageType.Local)
|
||||
{
|
||||
thumbnailForAI = await storageService.ExecuteAsync(picture.StorageType,
|
||||
provider => provider.DownloadFileAsync(picture.ThumbnailPath));
|
||||
}
|
||||
else
|
||||
{
|
||||
thumbnailForAI = Path.Combine(Directory.GetCurrentDirectory(), picture.ThumbnailPath.TrimStart('/'));
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 提取EXIF信息
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 30);
|
||||
var exifInfo = await ImageHelper.ExtractExifInfoAsync(localFilePath);
|
||||
picture.ExifInfo = exifInfo;
|
||||
|
||||
// 4. 从EXIF中提取拍摄时间并确保是UTC格式
|
||||
picture.TakenAt = ImageHelper.ParseExifDateTime(exifInfo.DateTimeOriginal);
|
||||
|
||||
// 保存缩略图和EXIF信息的更改,确保这些基本信息即使在后续步骤失败时也能保存
|
||||
await dbContext.SaveChangesAsync();
|
||||
|
||||
// 5. 将缩略图转换为Base64并调用AI分析
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 50);
|
||||
string base64Image = await ImageHelper.ConvertImageToBase64(thumbnailForAI);
|
||||
var (title, description) = await aiService.AnalyzeImageAsync(base64Image);
|
||||
|
||||
// 6. 确定最终标题和描述
|
||||
string finalTitle = !string.IsNullOrWhiteSpace(title) && title != "AI生成的标题"
|
||||
? title
|
||||
: Path.GetFileNameWithoutExtension(localFilePath);
|
||||
|
||||
string finalDescription = !string.IsNullOrWhiteSpace(description) && description != "AI生成的描述"
|
||||
? description
|
||||
: picture.Description;
|
||||
|
||||
picture.Name = finalTitle;
|
||||
picture.Description = finalDescription;
|
||||
|
||||
// 7. 生成嵌入向量
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 60);
|
||||
var combinedText = $"{finalTitle}. {finalDescription}";
|
||||
var embedding = await aiService.GetEmbeddingAsync(combinedText);
|
||||
picture.Embedding = embedding;
|
||||
if (picture.UserId.HasValue && embedding.Length > 0)
|
||||
{
|
||||
var vectorDbService = scope.ServiceProvider.GetRequiredService<IVectorDbService>();
|
||||
var pictureVector = new Models.Vector.PictureVector
|
||||
{
|
||||
Id = (ulong)picture.Id,
|
||||
Name = picture.Name,
|
||||
Embedding = embedding
|
||||
};
|
||||
await vectorDbService.AddPictureToUserCollectionAsync(picture.UserId.Value, pictureVector);
|
||||
}
|
||||
|
||||
// 8. 获取所有可用标签名称
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 70);
|
||||
var availableTagNames = await dbContext.Tags.Select(t => t.Name).ToListAsync();
|
||||
|
||||
// 9. 获取匹配的标签名称 - 从图片生成标签
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 80);
|
||||
var matchedTagNames = await aiService.GenerateTagsFromImageAsync(base64Image, availableTagNames, true);
|
||||
|
||||
// 10. 处理标签
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Processing, 90);
|
||||
var user = await dbContext.Users
|
||||
.Include(u => u.Tags)
|
||||
.FirstOrDefaultAsync(u => u.Id == picture.UserId);
|
||||
|
||||
if (user != null && matchedTagNames.Any())
|
||||
{
|
||||
var tagEntities = new List<Tag>();
|
||||
foreach (var tagName in matchedTagNames)
|
||||
{
|
||||
var existingTag = await dbContext.Tags
|
||||
.Include(t => t.Users)
|
||||
.FirstOrDefaultAsync(t => t.Name.ToLower() == tagName.ToLower());
|
||||
|
||||
if (existingTag != null)
|
||||
if (taskToCheck == null)
|
||||
{
|
||||
tagEntities.Add(existingTag);
|
||||
user.Tags ??= new List<Tag>();
|
||||
if (user.Tags.All(t => t.Id != existingTag.Id))
|
||||
_logger.LogWarning("任务 TaskId={TaskId} 在开始处理前未找到,可能已被删除。", taskId);
|
||||
continue; // Skip this task
|
||||
}
|
||||
|
||||
if (taskToCheck.Status != TaskExecutionStatus.Pending && taskToCheck.Status != TaskExecutionStatus.Processing)
|
||||
{
|
||||
_logger.LogInformation("任务 TaskId={TaskId} 状态为 {Status},跳过处理。", taskId, taskToCheck.Status);
|
||||
continue; // Skip this task, already completed or failed by another process
|
||||
}
|
||||
|
||||
taskToCheck.Status = TaskExecutionStatus.Processing;
|
||||
taskToCheck.StartedAt = DateTime.UtcNow;
|
||||
await checkDbContext.SaveChangesAsync();
|
||||
|
||||
_logger.LogInformation("开始处理任务: TaskId={TaskId}, Type={TaskType}", taskToCheck.Id, taskToCheck.Type);
|
||||
|
||||
try
|
||||
{
|
||||
ITaskProcessor processor;
|
||||
// Processors are typically scoped, so we create a scope here.
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
switch (taskToCheck.Type)
|
||||
{
|
||||
user.Tags.Add(existingTag);
|
||||
case TaskType.PictureProcessing:
|
||||
processor = scope.ServiceProvider.GetRequiredService<PictureTaskProcessor>();
|
||||
break;
|
||||
// Future task types can be added here
|
||||
default:
|
||||
_logger.LogError("未找到任务类型 {TaskType} 的处理器: TaskId={TaskId}", taskToCheck.Type, taskToCheck.Id);
|
||||
await MarkTaskAsFailedByQueue(taskToCheck.Id, $"未找到任务类型 {taskToCheck.Type} 的处理器。");
|
||||
continue; // Continue to next task in queue
|
||||
}
|
||||
await processor.ProcessAsync(taskToCheck); // Processor handles its own final status update
|
||||
}
|
||||
else
|
||||
catch (Exception procEx)
|
||||
{
|
||||
var newTag = new Tag { Name = tagName.Trim(), Description = tagName.Trim() };
|
||||
dbContext.Tags.Add(newTag);
|
||||
await dbContext.SaveChangesAsync();
|
||||
user.Tags ??= new List<Tag>();
|
||||
user.Tags.Add(newTag);
|
||||
tagEntities.Add(newTag);
|
||||
_logger.LogError(procEx, "处理器执行任务 TaskId={TaskId} 时发生错误。", taskToCheck.Id);
|
||||
await MarkTaskAsFailedByQueue(taskToCheck.Id, $"处理器执行时发生错误: {procEx.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
picture.Tags = tagEntities;
|
||||
}
|
||||
|
||||
// 11. 更新图片处理状态为完成
|
||||
picture.ProcessingStatus = ProcessingStatus.Completed;
|
||||
picture.ProcessingProgress = 100;
|
||||
await dbContext.SaveChangesAsync();
|
||||
|
||||
// 更新任务状态
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Completed, 100);
|
||||
status.CompletedAt = DateTime.UtcNow;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 更新状态为失败
|
||||
await UpdatePictureStatus(task.PictureId, ProcessingStatus.Failed, 0, ex.Message);
|
||||
|
||||
// 确保图片对象存在且已有的处理结果被保存
|
||||
if (picture != null)
|
||||
{
|
||||
picture.ProcessingStatus = ProcessingStatus.Failed;
|
||||
picture.ProcessingError = ex.Message;
|
||||
|
||||
try
|
||||
{
|
||||
await dbContext.SaveChangesAsync();
|
||||
}
|
||||
catch (Exception saveEx)
|
||||
{
|
||||
_logger.LogError(saveEx, "保存失败状态时出错");
|
||||
}
|
||||
}
|
||||
|
||||
// 记录错误日志
|
||||
_logger.LogError(ex, "图片处理失败: 图片ID={PictureId}", task.PictureId);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// 如果是临时文件,处理完后删除
|
||||
if (isTempFile && File.Exists(localFilePath))
|
||||
{
|
||||
try
|
||||
{
|
||||
File.Delete(localFilePath);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "删除临时文件失败: {FilePath}", localFilePath);
|
||||
_logger.LogError(ex, "处理任务 TaskId={TaskId} 时发生未捕获的异常。", taskId);
|
||||
await MarkTaskAsFailedByQueue(taskId, $"处理过程中发生未捕获的异常: {ex.Message}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_signal.Release();
|
||||
StartProcessor();
|
||||
}
|
||||
}
|
||||
|
||||
// 清理活动任务
|
||||
_activeTasks.TryRemove(task.Id, out _);
|
||||
|
||||
// 继续处理队列中的下一个任务
|
||||
StartProcessor();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UpdatePictureStatus(int pictureId, ProcessingStatus status, int progress, string? error = null)
|
||||
private async Task MarkTaskAsFailedByQueue(Guid taskId, string errorMessage)
|
||||
{
|
||||
if (_pictureStatus.TryGetValue(pictureId, out var currentStatus))
|
||||
{
|
||||
currentStatus.Status = status;
|
||||
currentStatus.Progress = progress;
|
||||
currentStatus.Error = error;
|
||||
}
|
||||
|
||||
// 更新数据库中的状态
|
||||
await using var dbContext = await _contextFactory.CreateDbContextAsync();
|
||||
var picture = await dbContext.Pictures.FindAsync(pictureId);
|
||||
if (picture != null)
|
||||
var task = await dbContext.BackgroundTasks.FindAsync(taskId);
|
||||
if (task != null)
|
||||
{
|
||||
picture.ProcessingStatus = status;
|
||||
picture.ProcessingProgress = progress;
|
||||
picture.ProcessingError = error;
|
||||
task.Status = TaskExecutionStatus.Failed;
|
||||
task.ErrorMessage = errorMessage;
|
||||
task.Progress = task.Progress; // Keep existing progress or reset to 0
|
||||
task.CompletedAt = DateTime.UtcNow;
|
||||
if (!task.StartedAt.HasValue) // Ensure StartedAt is set if not already
|
||||
{
|
||||
task.StartedAt = task.CreatedAt;
|
||||
}
|
||||
await dbContext.SaveChangesAsync();
|
||||
_logger.LogWarning("任务由队列标记为失败: TaskId={TaskId}, Error='{Error}'", taskId, errorMessage);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning("尝试由队列标记为失败,但未找到任务: TaskId={TaskId}", taskId);
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
private void Dispose(bool disposing)
|
||||
{
|
||||
if (_isDisposed) return;
|
||||
|
||||
if (disposing)
|
||||
_queue.Writer.TryComplete(); // 尝试完成队列写入
|
||||
|
||||
// 等待所有处理任务完成,设置超时
|
||||
var allProcessingTasksDone = Task.WhenAll(_processingTasks);
|
||||
try
|
||||
{
|
||||
_signal.Dispose();
|
||||
try
|
||||
if (!allProcessingTasksDone.Wait(TimeSpan.FromSeconds(10))) // 例如,等待10秒
|
||||
{
|
||||
Task.WhenAll(_processingTasks).Wait(5000);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "等待处理任务完成时超时");
|
||||
_logger.LogWarning("并非所有后台任务都在 Dispose 超时内完成。");
|
||||
}
|
||||
}
|
||||
catch (AggregateException ae)
|
||||
{
|
||||
ae.Handle(ex =>
|
||||
{
|
||||
_logger.LogError(ex, "后台任务在 Dispose 期间抛出异常。");
|
||||
return true; // 标记为已处理
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "等待处理任务完成时发生错误。");
|
||||
}
|
||||
|
||||
_signal.Dispose();
|
||||
_isDisposed = true;
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 图片处理任务
|
||||
/// </summary>
|
||||
public class PictureProcessingTask
|
||||
{
|
||||
public Guid Id { get; set; }
|
||||
public int PictureId { get; set; }
|
||||
public string OriginalFilePath { get; set; } = string.Empty;
|
||||
public int? UserId { get; set; }
|
||||
public DateTime CreatedAt { get; set; }
|
||||
}
|
||||
Reference in New Issue
Block a user