Skip to content

Commit

Permalink
Merge pull request #176 from takonomura/notify-job
Browse files Browse the repository at this point in the history
Notify to starter when new job enqueued
  • Loading branch information
whywaita authored Sep 27, 2023
2 parents 5ef225d + c8a8b2c commit ed56517
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 12 deletions.
6 changes: 4 additions & 2 deletions cmd/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ type myShoes struct {

// newShoes create myshoes.
func newShoes() (*myShoes, error) {
ds, err := mysql.New(config.Config.MySQLDSN)
notifyEnqueueCh := make(chan struct{}, 1)

ds, err := mysql.New(config.Config.MySQLDSN, notifyEnqueueCh)
if err != nil {
return nil, fmt.Errorf("failed to mysql.New: %w", err)
}

unlimit := unlimited.Unlimited{}
s := starter.New(ds, unlimit, config.Config.RunnerVersion)
s := starter.New(ds, unlimit, config.Config.RunnerVersion, notifyEnqueueCh)

manager := runner.New(ds, config.Config.RunnerVersion)

Expand Down
2 changes: 1 addition & 1 deletion internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func IntegrationTestRunner(m *testing.M) int {
if err := pool.Retry(func() error {
var err error
dsn := fmt.Sprintf("root:%s@(localhost:%s)/mysql", mysqlRootPassword, resource.GetPort("3306/tcp"))
testDatastore, err = mysql.New(dsn)
testDatastore, err = mysql.New(dsn, make(chan<- struct{}))
if err != nil {
log.Fatalf("failed to create datastore instance: %s", err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/datastore/mysql/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ func (m *MySQL) EnqueueJob(ctx context.Context, job datastore.Job) error {
return fmt.Errorf("failed to execute INSERT query: %w", err)
}

select {
case m.notifyEnqueueCh <- struct{}{}:
// notified to starter
default:
// no capacity on channel, do not block
}

return nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/datastore/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
// MySQL is implement datastore in MySQL
type MySQL struct {
Conn *sqlx.DB

notifyEnqueueCh chan<- struct{}
}

// New create mysql connection
func New(dsn string) (*MySQL, error) {
func New(dsn string, notifyEnqueueCh chan<- struct{}) (*MySQL, error) {
u, err := getMySQLURL(dsn)
if err != nil {
return nil, fmt.Errorf("failed to get MySQL URL: %w", err)
Expand All @@ -26,7 +28,8 @@ func New(dsn string) (*MySQL, error) {
}

return &MySQL{
Conn: conn,
Conn: conn,
notifyEnqueueCh: notifyEnqueueCh,
}, nil
}

Expand Down
21 changes: 14 additions & 7 deletions pkg/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ var (

// Starter is dispatcher for running job
type Starter struct {
ds datastore.Datastore
safety safety.Safety
runnerVersion string
ds datastore.Datastore
safety safety.Safety
runnerVersion string
notifyEnqueueCh <-chan struct{}
}

// New create starter instance
func New(ds datastore.Datastore, s safety.Safety, runnerVersion string) *Starter {
func New(ds datastore.Datastore, s safety.Safety, runnerVersion string, notifyEnqueueCh <-chan struct{}) *Starter {
return &Starter{
ds: ds,
safety: s,
runnerVersion: runnerVersion,
ds: ds,
safety: s,
runnerVersion: runnerVersion,
notifyEnqueueCh: notifyEnqueueCh,
}
}

Expand Down Expand Up @@ -82,6 +84,11 @@ func (s *Starter) Loop(ctx context.Context) error {
if err := s.dispatcher(ctx, ch); err != nil {
logger.Logf(false, "failed to starter: %+v", err)
}
case <-s.notifyEnqueueCh:
ticker.Reset(10 * time.Second)
if err := s.dispatcher(ctx, ch); err != nil {
logger.Logf(false, "failed to starter: %+v", err)
}
case <-ctx.Done():
return nil
}
Expand Down

0 comments on commit ed56517

Please sign in to comment.