feat(background): add visual recognition task and refactor picture processing

This commit is contained in:
ShiYu
2025-06-08 15:40:08 +08:00
parent 7ad8b6c826
commit 39c40d2746
17 changed files with 473 additions and 269 deletions

View File

@@ -1,4 +1,3 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using Foxel.Models.DataBase;
@@ -25,19 +24,17 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
IConfigService configuration,
ILogger<BackgroundTaskQueue> logger)
{
_serviceProvider = serviceProvider; // Keep IServiceProvider to resolve processors
_serviceProvider = serviceProvider;
_contextFactory = contextFactory;
_logger = logger;
_processingTasks = new List<Task>();
_maxConcurrentTasks = configuration.GetValueAsync("BackgroundTasks:MaxConcurrentTasks", 10).Result; // 保持原有逻辑
_signal = new SemaphoreSlim(_maxConcurrentTasks);
var options = new BoundedChannelOptions(10000) // 保持原有逻辑
var options = new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Guid>(options);
// 启动处理器,确保在服务启动时就开始处理队列
StartProcessor();
}
@@ -51,7 +48,7 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
throw new KeyNotFoundException($"找不到 PictureId: {pictureId} 的图片");
}
var payload = new PictureProcessingPayload
var payload = new Processors.PictureProcessingPayload
{
PictureId = pictureId,
OriginalFilePath = originalFilePath,
@@ -74,7 +71,39 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
await _queue.Writer.WriteAsync(backgroundTask.Id);
_logger.LogInformation("图片处理任务已加入队列: TaskId={TaskId}, PictureId={PictureId}", backgroundTask.Id, pictureId);
StartProcessor();
StartProcessor(); // Ensure processor is running or starts for new items
return backgroundTask.Id;
}
public async Task<Guid> QueueVisualRecognitionTaskAsync(VisualRecognitionPayload payload)
{
await using var dbContext = await _contextFactory.CreateDbContextAsync();
// Optionally, validate picture existence again, though PictureTaskProcessor should ensure it.
var picture = await dbContext.Pictures.FindAsync(payload.PictureId);
if (picture == null)
{
_logger.LogError("无法为不存在的图片 PictureId: {PictureId} 创建视觉识别任务", payload.PictureId);
throw new KeyNotFoundException($"尝试为 PictureId: {payload.PictureId} 创建视觉识别任务时找不到图片");
}
var backgroundTask = new BackgroundTask
{
Type = TaskType.VisualRecognition, // New TaskType
Payload = JsonSerializer.Serialize(payload),
UserId = payload.UserIdForPicture, // Comes from the payload
RelatedEntityId = payload.PictureId,
Status = TaskExecutionStatus.Pending,
CreatedAt = DateTime.UtcNow
};
dbContext.BackgroundTasks.Add(backgroundTask);
await dbContext.SaveChangesAsync();
await _queue.Writer.WriteAsync(backgroundTask.Id);
_logger.LogInformation("视觉识别任务已加入队列: TaskId={TaskId}, PictureId={PictureId}", backgroundTask.Id, payload.PictureId);
StartProcessor(); // Ensure processor is running or starts for new items
return backgroundTask.Id;
}
@@ -96,13 +125,29 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
var picture = await dbContext.Pictures.FindAsync(task.RelatedEntityId.Value);
if (picture != null)
{
taskName = picture.Name;
taskName = $"图片处理: {picture.Name}";
}
else
{
taskName = "图片处理 (图片信息丢失)";
}
}
else if (task.Type == TaskType.VisualRecognition && task.RelatedEntityId.HasValue) // Added for VisualRecognition
{
var picture = await dbContext.Pictures.FindAsync(task.RelatedEntityId.Value);
if (picture != null)
{
taskName = $"视觉识别: {picture.Name}";
}
else
{
taskName = "视觉识别 (图片信息丢失)";
}
}
else
{
taskName = $"任务: {task.Id} ({task.Type})"; // Generic name
}
statusList.Add(new TaskDetailsDto
{
@@ -156,30 +201,28 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
{
await using var dbContext = await _contextFactory.CreateDbContextAsync();
var unfinishedTasks = await dbContext.BackgroundTasks
.Where(bt => bt.Type == TaskType.PictureProcessing &&
.Where(bt => (bt.Type == TaskType.PictureProcessing || bt.Type == TaskType.VisualRecognition) && // Added VisualRecognition
(bt.Status == TaskExecutionStatus.Pending || bt.Status == TaskExecutionStatus.Processing))
.ToListAsync();
if (unfinishedTasks.Any())
{
_logger.LogInformation("正在恢复 {Count} 个未完成的图片处理任务", unfinishedTasks.Count);
_logger.LogInformation("正在恢复 {Count} 个未完成的任务", unfinishedTasks.Count);
foreach (var task in unfinishedTasks)
{
// 确保任务状态在数据库中被重置为 Pending以防上次运行时停在 Processing 状态
if (task.Status == TaskExecutionStatus.Processing)
{
task.Status = TaskExecutionStatus.Pending;
task.StartedAt = null; // 重置开始时间
// 保留 Progress 和 ErrorMessage 以供参考
task.StartedAt = null;
}
await _queue.Writer.WriteAsync(task.Id);
_logger.LogInformation("已恢复图片处理任务到队列: TaskId={TaskId}, RelatedEntityId={RelatedEntityId}", task.Id, task.RelatedEntityId);
_logger.LogInformation("已恢复任务到队列: TaskId={TaskId}, Type={TaskType}, RelatedEntityId={RelatedEntityId}", task.Id, task.Type, task.RelatedEntityId);
}
await dbContext.SaveChangesAsync(); // 保存状态更改
await dbContext.SaveChangesAsync();
}
else
{
_logger.LogInformation("没有需要恢复的图片处理任务");
_logger.LogInformation("没有需要恢复的任务");
}
}
catch (Exception ex)
@@ -243,7 +286,9 @@ public sealed class BackgroundTaskQueue : IBackgroundTaskQueue, IDisposable
case TaskType.PictureProcessing:
processor = scope.ServiceProvider.GetRequiredService<PictureTaskProcessor>();
break;
// Future task types can be added here
case TaskType.VisualRecognition: // Added case for VisualRecognition
processor = scope.ServiceProvider.GetRequiredService<VisualRecognitionTaskProcessor>();
break;
default:
_logger.LogError("未找到任务类型 {TaskType} 的处理器: TaskId={TaskId}", taskToCheck.Type, taskToCheck.Id);
await MarkTaskAsFailedByQueue(taskToCheck.Id, $"未找到任务类型 {taskToCheck.Type} 的处理器。");