| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- package task
- import (
- "context"
- "v2ray.com/core/common/signal/semaphore"
- )
// Task is a unit of work that reports failure through its returned error.
type Task func() error
// executionContext holds the settings that ExecutionOption values write into
// and that run/executeTask later read.
type executionContext struct {
	ctx       context.Context // optional; executeTask substitutes context.Background() when nil
	tasks     []Task          // tasks handed to executeTask
	onSuccess Task            // optional; run calls it when executeTask returns nil
	onFailure Task            // optional; run calls it when executeTask returns an error
}
- func (c *executionContext) executeTask() error {
- if len(c.tasks) == 0 {
- return nil
- }
- // Reuse current goroutine if we only have one task to run.
- if len(c.tasks) == 1 && c.ctx == nil {
- return c.tasks[0]()
- }
- ctx := context.Background()
- if c.ctx != nil {
- ctx = c.ctx
- }
- return executeParallel(ctx, c.tasks)
- }
- func (c *executionContext) run() error {
- err := c.executeTask()
- if err == nil && c.onSuccess != nil {
- return c.onSuccess()
- }
- if err != nil && c.onFailure != nil {
- return c.onFailure()
- }
- return err
- }
// ExecutionOption configures an executionContext; Run applies the options it
// is given in order before building its Task.
type ExecutionOption func(*executionContext)
- func WithContext(ctx context.Context) ExecutionOption {
- return func(c *executionContext) {
- c.ctx = ctx
- }
- }
- func Parallel(tasks ...Task) ExecutionOption {
- return func(c *executionContext) {
- c.tasks = append(c.tasks, tasks...)
- }
- }
- func Sequential(tasks ...Task) ExecutionOption {
- return func(c *executionContext) {
- if len(tasks) == 0 {
- return
- }
- if len(tasks) == 1 {
- c.tasks = append(c.tasks, tasks[0])
- return
- }
- c.tasks = append(c.tasks, func() error {
- return execute(tasks...)
- })
- }
- }
- func OnSuccess(task Task) ExecutionOption {
- return func(c *executionContext) {
- c.onSuccess = task
- }
- }
- func OnFailure(task Task) ExecutionOption {
- return func(c *executionContext) {
- c.onFailure = task
- }
- }
- func Single(task Task, opts ...ExecutionOption) Task {
- return Run(append([]ExecutionOption{Sequential(task)}, opts...)...)
- }
- func Run(opts ...ExecutionOption) Task {
- var c executionContext
- for _, opt := range opts {
- opt(&c)
- }
- return func() error {
- return c.run()
- }
- }
- // execute runs a list of tasks sequentially, returns the first error encountered or nil if all tasks pass.
- func execute(tasks ...Task) error {
- for _, task := range tasks {
- if err := task(); err != nil {
- return err
- }
- }
- return nil
- }
- // executeParallel executes a list of tasks asynchronously, returns the first error encountered or nil if all tasks pass.
- func executeParallel(ctx context.Context, tasks []Task) error {
- n := len(tasks)
- s := semaphore.New(n)
- done := make(chan error, 1)
- for _, task := range tasks {
- <-s.Wait()
- go func(f Task) {
- err := f()
- if err == nil {
- s.Signal()
- return
- }
- select {
- case done <- err:
- default:
- }
- }(task)
- }
- for i := 0; i < n; i++ {
- select {
- case err := <-done:
- return err
- case <-ctx.Done():
- return ctx.Err()
- case <-s.Wait():
- }
- }
- return nil
- }
|