Skip to content

Commit

Permalink
Improved testing (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
thenorthnate authored Mar 3, 2024
1 parent 583c09c commit 8d210e9
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 87 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

Robust workers for Go

## Overview
The concept defined in this package is intended to be quite configurable. Using middleware
can be a very powerful design. You may want to define middleware to recover from panics,
inject content into the context (like a logger, etc), perform setup or teardown steps, or
any number of other things that you can think of!


[doc-img]: https://pkg.go.dev/badge/github.com/thenorthnate/buzz
[doc]: https://pkg.go.dev/github.com/thenorthnate/buzz
Expand Down
1 change: 1 addition & 0 deletions buzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ func (chain *CallChain) Next(ctx context.Context) error {
return chain.exec(ctx, chain.next)
}

// MiddleFunc defines the type of any middleware that can be used in the hive.
type MiddleFunc func(ctx context.Context, chain *CallChain) error
33 changes: 33 additions & 0 deletions external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package buzz_test

import (
"context"
"log"

"github.com/thenorthnate/buzz"
)

type logTask struct{}

func (t *logTask) Do(ctx context.Context) error {
log.Println("message here")
return nil
}

func Example() {
// This defines some middleware that logs before and after the task runs
logger := func(ctx context.Context, chain *buzz.CallChain) error {
// This happens before the task runs
log.Println("Starting!")
// This call runs the rest of the middleware and the task
err := chain.Next(ctx)
// This runs after the task has completed
log.Printf("Finished with err=[%v]\n", err)
return err
}
hive := buzz.New()
worker := buzz.NewWorker(&logTask{}).Use(logger)
hive.Submit(worker)
// Some time later... during shutdown
hive.StopAll()
}
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module github.com/thenorthnate/buzz

go 1.22.0

require github.com/thenorthnate/evs v0.1.0
2 changes: 0 additions & 2 deletions go.sum

This file was deleted.

2 changes: 1 addition & 1 deletion hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (hive *Hive) startCleanupWorker() {
func (hive *Hive) removeDoneWorkers() {
finished := []int{}
for i := range hive.colony {
if hive.colony[i].done {
if hive.colony[i].done.Load() {
finished = append(finished, i)
}
}
Expand Down
52 changes: 49 additions & 3 deletions hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package buzz

import (
"context"
"log"
"testing"
"time"
)

func logMiddleware(ctx context.Context, chain *CallChain) error {
log.Println("task starting")
err := chain.Next(ctx)
log.Println("task complete")
return err
}

func TestHive_RemoveDoneWorkers(t *testing.T) {
hive := New()
worker := NewWorker(&mockTask{})
worker.done = true
worker.done.Store(true)
hive.colony = append(hive.colony, worker)
hive.removeDoneWorkers()
if len(hive.colony) != 0 {
Expand All @@ -19,15 +28,14 @@ func TestHive_RemoveDoneWorkers(t *testing.T) {
func TestHive_Middleware(t *testing.T) {
waiter := make(chan struct{}, 1)
hive := New()
hive.Use(RecoveryMiddleware)
worker := NewWorker(&mockTask{dofunc: func(ctx context.Context) error {
select {
case waiter <- struct{}{}:
default:
}
<-ctx.Done()
return nil
}})
}}).Use(logMiddleware)
hive.Submit(worker)
<-waiter
if len(worker.middleware) != 1 {
Expand All @@ -38,3 +46,41 @@ func TestHive_Middleware(t *testing.T) {
t.Fatalf("the hive is supposed to be empty but it still has %v workers in it", len(hive.colony))
}
}

func TestHive_MultipleWorkers(t *testing.T) {
waiter1 := make(chan struct{}, 1)
waiter2 := make(chan struct{}, 1)
hive := New()
hive.Use(logMiddleware)
worker1 := NewWorker(&mockTask{dofunc: func(ctx context.Context) error {
select {
case waiter1 <- struct{}{}:
default:
}
<-ctx.Done()
return nil
}})
hive.Submit(worker1)
worker2 := NewWorker(&mockTask{dofunc: func(ctx context.Context) error {
select {
case waiter2 <- struct{}{}:
default:
}
<-ctx.Done()
return nil
}}).Tick(time.Microsecond)
hive.Submit(worker2)

<-waiter1
if len(worker1.middleware) != 1 {
t.Fatalf("worker1 was supposed to have 1 middlefunc but had %v", len(worker1.middleware))
}
<-waiter2
if len(worker2.middleware) != 1 {
t.Fatalf("worker2 was supposed to have 1 middlefunc but had %v", len(worker2.middleware))
}
hive.StopAll()
if len(hive.colony) > 0 {
t.Fatalf("the hive is supposed to be empty but it still has %v workers in it", len(hive.colony))
}
}
37 changes: 0 additions & 37 deletions integration_test.go

This file was deleted.

16 changes: 0 additions & 16 deletions middleware.go

This file was deleted.

24 changes: 0 additions & 24 deletions middleware_test.go

This file was deleted.

6 changes: 4 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package buzz
import (
"context"
"sync"
"sync/atomic"
"time"
)

// Worker wraps your task with additional context to provide a robust operational environment.
type Worker struct {
task Task
middleware []MiddleFunc
cancel context.CancelFunc
tick time.Duration
tickChan <-chan time.Time
notifyComplete chan struct{}
done bool
done atomic.Bool
}

// NewWorker wraps the task and returns a worker that can be submitted to the hive.
Expand Down Expand Up @@ -98,6 +100,6 @@ func (w *Worker) Stop() {
if w.cancel != nil {
w.cancel()
}
w.done = true
w.done.Store(true)
w.notifyComplete <- struct{}{}
}

0 comments on commit 8d210e9

Please sign in to comment.