Skip to content

Commit

Permalink
Initial code commit (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
thenorthnate authored Mar 1, 2024
1 parent 6a185ab commit 806052b
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: lint
on:
push:
tags:
- v*
branches:
- main
pull_request:
permissions:
contents: read
jobs:
golangci:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x]
name: golangci-lint
runs-on: ${{ matrix.os }}
steps:
- uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.54
25 changes: 25 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: test
on:
push:
tags:
- v*
branches:
- main
pull_request:
permissions:
contents: read
jobs:
test:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x]
name: go-test
runs-on: ${{ matrix.os }}
steps:
- uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- uses: actions/checkout@v3
- name: Run the tests
run: go test -race ./...
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
run:
timeout: 5m
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
# buzz

[![GoDoc][doc-img]][doc] [![Test][ci-img]][ci]

Robust workers for Go


[doc-img]: https://pkg.go.dev/badge/github.com/thenorthnate/buzz
[doc]: https://pkg.go.dev/github.com/thenorthnate/buzz
[ci-img]: https://github.com/thenorthnate/buzz/workflows/test/badge.svg
[ci]: https://github.com/thenorthnate/buzz/actions
33 changes: 33 additions & 0 deletions buzz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package buzz

import (
"context"

backoff "github.com/cenkalti/backoff/v4"
)

var (
DefaultBackoff = backoff.NewExponentialBackOff()
)

// Task represents the thing that you want accomplished.
type Task interface {
// Do should perform the desired work of the Worker. If the context is cancelled, it should
// return an error. If no error is returned, [Do] is called repeatedly in a loop.
Do(ctx context.Context) error
}

// CallChain represents a linked list that provides the mechanism through which middleware
// can be implemented.
type CallChain struct {
next *CallChain
exec MiddleFunc
}

// Next is used to allow the chain to proceed processing. When it returns, you can assume
// that all middleware as well as the task itself executed and returned.
func (chain *CallChain) Next(ctx context.Context) error {
return chain.exec(ctx, chain.next)
}

type MiddleFunc func(ctx context.Context, chain *CallChain) error
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/thenorthnate/buzz

go 1.22.0

require github.com/cenkalti/backoff/v4 v4.2.1

require github.com/thenorthnate/evs v0.1.0
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/thenorthnate/evs v0.1.0 h1:KvWzP/ZOwA7ClXbXgEUwp1HTmyxxigsndSMotQJjLjY=
github.com/thenorthnate/evs v0.1.0/go.mod h1:VUtM+gde4XxulEYrF1bndmIUq/X9RyxIh11qn0Ofv/A=
55 changes: 55 additions & 0 deletions hive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package buzz

import (
"slices"
"sync"
)

type Hive struct {
colony []*Worker
block sync.WaitGroup
notifyComplete chan *Worker
}

// New initializes a new [*Hive].
func New() *Hive {
hive := &Hive{
notifyComplete: make(chan *Worker, 1),
}
hive.startCleanupWorker()
return hive
}

func (hive *Hive) startCleanupWorker() {
go func() {
for completed := range hive.notifyComplete {
index := -1
for i, w := range hive.colony {
if completed == w {
index = i
break
}
}
if index >= 0 {
hive.colony = slices.Delete(hive.colony, index, index+1)
}
}
}()
}

func (hive *Hive) Submit(worker *Worker) {
hive.block.Add(1)
worker.notifyComplete = hive.notifyComplete
hive.colony = append(hive.colony, worker)
go worker.run(&hive.block)
}

// StopAll should only be used when you are completely done with the hive. Internal channels are
// closed and all workers are shutdown.
func (hive *Hive) StopAll() {
for i := range hive.colony {
hive.colony[i].Stop()
}
hive.block.Wait()
close(hive.notifyComplete)
}
37 changes: 37 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//go:build integration

package buzz_test

import (
"context"
"log"
"testing"
"time"

"github.com/thenorthnate/buzz"
)

type logTask struct{}

func (t *logTask) Do(ctx context.Context) error {
log.Println("message here")
return nil
}

func logMiddleware(ctx context.Context, chain *buzz.CallChain) error {
log.Println("task starting")
err := chain.Next(ctx)
log.Println("task complete")
return err
}

func TestIntegrationWorker(t *testing.T) {
hive := buzz.New()
worker := buzz.
NewWorker(&logTask{}).
Tick(time.Second).
Use(buzz.RecoveryMiddleware, logMiddleware)
hive.Submit(worker)
time.Sleep(5 * time.Second)
hive.StopAll()
}
16 changes: 16 additions & 0 deletions middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package buzz

import (
"context"

"github.com/thenorthnate/evs"
)

func RecoveryMiddleware(ctx context.Context, chain *CallChain) (err error) {
defer func() {
if r := recover(); r != nil {
err = evs.Newf("worker panic'd: %v", r).Err()
}
}()
return chain.Next(ctx)
}
25 changes: 25 additions & 0 deletions middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package buzz

import (
"context"
"errors"
"testing"
)

func TestRecoveryMiddleware(t *testing.T) {
bee := NewWorker(&mockTask{
dofunc: func(ctx context.Context) error {
return errors.New("darn")
},
}).Use(RecoveryMiddleware)
chain := bee.assembleCallChain()
if chain.exec == nil {
t.Fatal("exec was supposed to be defined but was nil instead")
}
if chain.next == nil {
t.Fatal("chain.next was not supposed to be nil")
}
if err := bee.runChainOnce(context.Background(), chain); err == nil {
t.Fatal("runChainOnce was supposed to return an error but did not")
}
}
100 changes: 100 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package buzz

import (
"context"
"sync"
"time"
)

type Worker struct {
task Task
middleware []MiddleFunc
cancel context.CancelFunc
tick time.Duration
tickChan <-chan time.Time
notifyComplete chan *Worker
}

func NewWorker(task Task) *Worker {
tickChan := make(chan time.Time)
close(tickChan)
return &Worker{
task: task,
middleware: make([]MiddleFunc, 0),
tickChan: tickChan,
}
}

// Use adds the given middleware functions to the Bee.
func (w *Worker) Use(middleware ...MiddleFunc) *Worker {
w.middleware = append(w.middleware, middleware...)
return w
}

// Use adds the given middleware functions to the Bee.
func (w *Worker) Tick(tick time.Duration) *Worker {
w.tick = tick
return w
}

func (w *Worker) assembleCallChain() *CallChain {
root := &CallChain{}
node := root
for _, mfunc := range w.middleware {
node.exec = mfunc
node.next = &CallChain{}
node = node.next
}
node.exec = w.workTillError
return root
}

func (w *Worker) run(block *sync.WaitGroup) {
defer block.Done()
ctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
if w.tick > 0 {
ticker := time.NewTicker(w.tick)
defer ticker.Stop()
w.tickChan = ticker.C
}
callChain := w.assembleCallChain()
for {
// execute chain of middleware funcs where each func is passed the next func
select {
case <-ctx.Done():
return
default:
if err := w.runChainOnce(ctx, callChain); err != nil {
return
}
}
}
}

func (w *Worker) runChainOnce(ctx context.Context, callChain *CallChain) error {
return callChain.Next(ctx)
}

// workTillError should be the final "middleware" called in the call chain. The next call chain
// link will be nil and should not be used hence the underscore.
func (w *Worker) workTillError(ctx context.Context, _ *CallChain) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-w.tickChan:
if err := w.task.Do(ctx); err != nil {
return err
}
}
}
}

// Stop issues a command to the hive to stop this worker from running and remove it.
func (w *Worker) Stop() {
if w.cancel != nil {
w.cancel()
}
w.notifyComplete <- w
}
Loading

0 comments on commit 806052b

Please sign in to comment.