Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating default middleware on the hive #8

Merged
merged 1 commit into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions buzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@ package buzz

import (
"context"

backoff "github.com/cenkalti/backoff/v4"
)

var (
DefaultBackoff = backoff.NewExponentialBackOff()
)

// Task represents the thing that you want accomplished.
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@ module github.com/thenorthnate/buzz

go 1.22.0

require github.com/cenkalti/backoff/v4 v4.2.1

require github.com/thenorthnate/evs v0.1.0
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/thenorthnate/evs v0.1.0 h1:KvWzP/ZOwA7ClXbXgEUwp1HTmyxxigsndSMotQJjLjY=
github.com/thenorthnate/evs v0.1.0/go.mod h1:VUtM+gde4XxulEYrF1bndmIUq/X9RyxIh11qn0Ofv/A=
49 changes: 36 additions & 13 deletions hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,63 @@ import (
"sync"
)

// Hive contains all the workers and synchronizes a graceful shutdown of all the workers.
type Hive struct {
colony []*Worker
block sync.WaitGroup
notifyComplete chan *Worker
notifyComplete chan struct{}
middleware []MiddleFunc
closed chan struct{}
}

// New initializes a new [*Hive].
func New() *Hive {
hive := &Hive{
notifyComplete: make(chan *Worker, 1),
notifyComplete: make(chan struct{}, 1),
closed: make(chan struct{}, 1),
}
hive.startCleanupWorker()
return hive
}

func (hive *Hive) startCleanupWorker() {
go func() {
for completed := range hive.notifyComplete {
index := -1
for i, w := range hive.colony {
if completed == w {
index = i
break
}
}
if index >= 0 {
hive.colony = slices.Delete(hive.colony, index, index+1)
}
for range hive.notifyComplete {
hive.removeDoneWorkers()
}
hive.removeDoneWorkers()
hive.closed <- struct{}{}
}()
}

func (hive *Hive) removeDoneWorkers() {
finished := []int{}
for i := range hive.colony {
if hive.colony[i].done {
finished = append(finished, i)
}
}
for i := len(finished) - 1; i >= 0; i-- {
hive.colony = slices.Delete(hive.colony, i, i+1)
}
}

// Use adds the given MiddleFunc's to the hive as default functions. They will get added
// to each worker that is added to the hive. They are placed as the earliest middleware
// in the chain, in the same order they are added here. So, if you add A, B, C to the hive,
// and add a worker that already has D, and E middleware, you will end up with a middleware
// chain on that worker equivalent to A, B, C, D, E. From that point, it's important to note
// that any closures that are added as middleware to the hive may behave in unexpected ways
// since each worker will get the same closure (unless that is your intent!).
func (hive *Hive) Use(middleFunc ...MiddleFunc) {
hive.middleware = append(hive.middleware, middleFunc...)
}

// Submit starts the worker running and adds it to the hive.
func (hive *Hive) Submit(worker *Worker) {
hive.block.Add(1)
worker.notifyComplete = hive.notifyComplete
worker.middleware = append(hive.middleware, worker.middleware...)
hive.colony = append(hive.colony, worker)
go worker.run(&hive.block)
}
Expand All @@ -52,4 +74,5 @@ func (hive *Hive) StopAll() {
}
hive.block.Wait()
close(hive.notifyComplete)
<-hive.closed
}
40 changes: 40 additions & 0 deletions hive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package buzz

import (
"context"
"testing"
)

func TestHive_RemoveDoneWorkers(t *testing.T) {
hive := New()
worker := NewWorker(&mockTask{})
worker.done = true
hive.colony = append(hive.colony, worker)
hive.removeDoneWorkers()
if len(hive.colony) != 0 {
t.Fatalf("hive is supposed to be empty but has %v workers in it still", len(hive.colony))
}
}

func TestHive_Middleware(t *testing.T) {
waiter := make(chan struct{}, 1)
hive := New()
hive.Use(RecoveryMiddleware)
worker := NewWorker(&mockTask{dofunc: func(ctx context.Context) error {
select {
case waiter <- struct{}{}:
default:
}
<-ctx.Done()
return nil
}})
hive.Submit(worker)
<-waiter
if len(worker.middleware) != 1 {
t.Fatalf("worker was supposed to have 1 middlefunc but had %v", len(worker.middleware))
}
hive.StopAll()
if len(hive.colony) > 0 {
t.Fatalf("the hive is supposed to be empty but it still has %v workers in it", len(hive.colony))
}
}
13 changes: 9 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ type Worker struct {
cancel context.CancelFunc
tick time.Duration
tickChan <-chan time.Time
notifyComplete chan *Worker
notifyComplete chan struct{}
done bool
}

// NewWorker wraps the task and returns a worker that can be submitted to the hive.
func NewWorker(task Task) *Worker {
tickChan := make(chan time.Time)
close(tickChan)
Expand All @@ -25,13 +27,15 @@ func NewWorker(task Task) *Worker {
}
}

// Use adds the given middleware functions to the Bee.
// Use adds the given middleware functions to the Worker.
func (w *Worker) Use(middleware ...MiddleFunc) *Worker {
w.middleware = append(w.middleware, middleware...)
return w
}

// Use adds the given middleware functions to the Bee.
// Tick provides a mechanism through which you can schedule your task to get run on a
// regular interval. By default the tick time is zero meaning that the task is called
// repeatedly as fast as the computer executes it.
func (w *Worker) Tick(tick time.Duration) *Worker {
w.tick = tick
return w
Expand Down Expand Up @@ -94,5 +98,6 @@ func (w *Worker) Stop() {
if w.cancel != nil {
w.cancel()
}
w.notifyComplete <- w
w.done = true
w.notifyComplete <- struct{}{}
}
12 changes: 7 additions & 5 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ func (task *mockTask) Do(ctx context.Context) error {

func TestWorker(t *testing.T) {
waiter := make(chan struct{}, 1)
counter := 0
task := &mockTask{dofunc: func(ctx context.Context) error {
counter++
if counter == 2 {
waiter <- struct{}{}
select {
case waiter <- struct{}{}:
default:
}
<-ctx.Done()
return nil
}}
worker := NewWorker(task)

hive := New()
hive.Submit(worker)
<-waiter
hive.StopAll()
if len(hive.colony) > 0 {
t.Fatalf("the hive is supposed to be empty but it still has %v workers in it", len(hive.colony))
}
}

func TestWorkerAssembleCallChain(t *testing.T) {
Expand Down
Loading