86 lines
2.4 KiB
Go
86 lines
2.4 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/charmbracelet/log"
|
|
"github.com/krau/SaveAny-Bot/config"
|
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
|
"github.com/krau/SaveAny-Bot/pkg/queue"
|
|
)
|
|
|
|
// queueInstance is the package-level task queue used by Run, AddTask,
// CancelTask and GetLength. Within this file it is only assigned by Run,
// so it is nil until Run has been called.
var queueInstance *queue.TaskQueue[Exectable]
|
|
|
|
type Exectable interface {
|
|
Type() tasktype.TaskType
|
|
TaskID() string
|
|
Execute(ctx context.Context) error
|
|
}
|
|
|
|
func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan struct{}) {
|
|
logger := log.FromContext(ctx)
|
|
execHooks := config.C().Hook.Exec
|
|
for {
|
|
semaphore <- struct{}{}
|
|
qtask, err := qe.Get()
|
|
if err != nil {
|
|
logger.Error("Failed to get task from queue:", err)
|
|
break // queue closed and empty
|
|
}
|
|
task := qtask.Data
|
|
logger.Infof("Processing task: %s", task.TaskID())
|
|
if err := ExecCommandString(qtask.Context(), execHooks.TaskBeforeStart); err != nil {
|
|
logger.Errorf("Failed to execute before start hook for task %s: %v", task.TaskID(), err)
|
|
}
|
|
if err := task.Execute(qtask.Context()); err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
logger.Infof("Task %s was canceled", task.TaskID())
|
|
if err := ExecCommandString(ctx, execHooks.TaskCancel); err != nil {
|
|
logger.Errorf("Failed to execute cancel hook for task %s: %v", task.TaskID(), err)
|
|
}
|
|
} else {
|
|
logger.Errorf("Failed to execute task %s: %v", task.TaskID(), err)
|
|
if err := ExecCommandString(ctx, execHooks.TaskFail); err != nil {
|
|
logger.Errorf("Failed to execute fail hook for task %s: %v", task.TaskID(), err)
|
|
}
|
|
}
|
|
} else {
|
|
logger.Infof("Task %s completed successfully", task.TaskID())
|
|
if err := ExecCommandString(ctx, execHooks.TaskSuccess); err != nil {
|
|
logger.Errorf("Failed to execute success hook for task %s: %v", task.TaskID(), err)
|
|
}
|
|
}
|
|
qe.Done(qtask.ID)
|
|
<-semaphore
|
|
}
|
|
}
|
|
|
|
func Run(ctx context.Context) {
|
|
log.FromContext(ctx).Info("Start processing tasks...")
|
|
semaphore := make(chan struct{}, config.C().Workers)
|
|
if queueInstance == nil {
|
|
queueInstance = queue.NewTaskQueue[Exectable]()
|
|
}
|
|
for range config.C().Workers {
|
|
go worker(ctx, queueInstance, semaphore)
|
|
}
|
|
|
|
}
|
|
|
|
func AddTask(ctx context.Context, task Exectable) error {
|
|
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task))
|
|
}
|
|
|
|
func CancelTask(ctx context.Context, id string) error {
|
|
err := queueInstance.CancelTask(id)
|
|
return err
|
|
}
|
|
|
|
func GetLength(ctx context.Context) int {
|
|
if queueInstance == nil {
|
|
return 0
|
|
}
|
|
return queueInstance.ActiveLength()
|
|
}
|