-
Notifications
You must be signed in to change notification settings - Fork 0
/
joblevel.go
260 lines (218 loc) · 6.9 KB
/
joblevel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package joblevel
import (
"encoding/csv"
"errors"
"io"
"math"
"time"
"github.com/cespare/xxhash/v2"
"github.com/gocarina/gocsv"
)
const (
	// dayDuration is the length of one day as a time.Duration.
	dayDuration = 24 * time.Hour

	// hashToDayDurationScaler maps an unsigned 64-bit hash onto a day.
	// It equals dayDuration / math.MaxUint64 (the largest uint64 hash), so
	// float64(hash) * hashToDayDurationScaler is approximately in
	// [0, dayDuration] nanoseconds for every possible hash.
	hashToDayDurationScaler = float64(dayDuration) / math.MaxUint64
)

// StartDurations is a series of start times across the day, each expressed
// as the duration past midnight UTC.
type StartDurations []time.Duration
// Job represents a task that needs to be scheduled during the day.
//
// The exported fields (ID, Frequency) are also what Jobs.CSV serializes via
// gocsv, so their names and order form that CSV's layout.
type Job struct {
	// ID is a unique identifier for the job that should rarely change:
	// start times are derived from a hash of it, so a new ID generally
	// moves the job's schedule.
	ID string
	// Frequency is how often the job should run; WithFrequency accepts it
	// as a string such as "5m" or "1h" and parses it into a time.Duration.
	// It should evenly divide the 24 hours in a day.
	Frequency time.Duration
	// starts holds the start times computed by ScheduleJob, as durations
	// past midnight UTC.
	starts StartDurations
}

// Jobs is a collection of Job values.
type Jobs []Job
// getFirstStart returns the job's first start of the day as a duration past
// midnight UTC.
//
// The xxhash of the ID is mapped proportionally onto one Frequency period, so
// the first start falls within one period of midnight and different IDs tend
// to land at different offsets within that period. The floating-point
// arithmetic and its operation order are unchanged from earlier versions, so
// existing schedules do not move.
func (j *Job) getFirstStart() time.Duration {
	hash := xxhash.Sum64([]byte(j.ID))
	// Fraction of a day covered by one period of the job.
	dayToFrequencyScaler := float64(j.Frequency) / float64(dayDuration)
	first := time.Duration(float64(hash) * hashToDayDurationScaler * dayToFrequencyScaler)
	// Hashes within float64 rounding of math.MaxUint64 convert to 2^64 and can
	// scale to exactly Frequency (or a hair above). Such a first start would
	// push the day's last start to 24h or later, outside the [0, 24h) offsets
	// StartsBetween compares against, so wrap it back into [0, Frequency).
	if j.Frequency > 0 && first >= j.Frequency {
		first %= j.Frequency
	}
	return first
}
// RunsPerDay calculates how many times the job runs per day: the number of
// whole Frequency periods that fit into 24 hours (0 if Frequency exceeds a day).
//
// A zero or negative Frequency yields 0. Otherwise a zero Frequency would
// panic with an integer divide-by-zero, and a negative one would produce a
// meaningless negative count.
func (j *Job) RunsPerDay() int {
	if j.Frequency <= 0 {
		return 0
	}
	return int(dayDuration / j.Frequency)
}
// ScheduleJob computes the job's start times for one day. The first start is
// derived from the hash of the ID (see getFirstStart). Each later start
// follows the previous one by Frequency, for RunsPerDay starts in total.
//
// When RunsPerDay is not positive (for example, Frequency is longer than a
// day or negative), the schedule is set to empty. Previously, indexing the
// zero-length slice panicked. An empty schedule never matches StartsBetween.
func (j *Job) ScheduleJob() {
	runs := j.RunsPerDay()
	if runs <= 0 {
		j.starts = StartDurations{}
		return
	}
	j.starts = make(StartDurations, runs)
	j.starts[0] = j.getFirstStart()
	for i := 1; i < runs; i++ {
		j.starts[i] = j.starts[i-1] + j.Frequency
	}
}
// ScheduleJobs schedules every job in the collection in place by calling
// ScheduleJob on each element of the slice.
func (jobs Jobs) ScheduleJobs() {
	for idx := 0; idx < len(jobs); idx++ {
		job := &jobs[idx]
		job.ScheduleJob()
	}
}
// StartsBetween reports whether the job has a start time in the half-open
// interval [fromTime, toTime).
//
// Start times repeat daily, so each endpoint is reduced to its offset past
// midnight UTC. An interval that crosses midnight is handled by the
// wrap-around logic in startsBetween. An interval of 24 hours or more covers
// every daily start, so it matches whenever the job has any start at all.
//
// It returns an error if fromTime does not precede toTime.
func (j *Job) StartsBetween(fromTime, toTime time.Time) (bool, error) {
	if !fromTime.Before(toTime) {
		return false, errors.New("fromTime must precede toTime")
	}
	// A full day (or more) reduces both endpoints to the same offset, which
	// startsBetween treats as an empty range. Every start lies inside it.
	if toTime.Sub(fromTime) >= dayDuration {
		return len(j.starts) > 0, nil
	}
	// Offset past midnight UTC for each endpoint. Truncate works on the
	// absolute time since the zero time (a UTC midnight), so the result does
	// not depend on the times' locations.
	from := fromTime.Sub(fromTime.Truncate(dayDuration))
	to := toTime.Sub(toTime.Truncate(dayDuration))
	return len(j.starts.startsBetween(from, to, true)) > 0, nil
}
// startsBetween returns the start times whose offset lies in [from, to).
// When from > to, the range is taken to wrap past midnight, so a start
// matches if it is >= from or < to. When from == to, nothing matches.
// With firstOnly set, scanning stops at the first match, so at most one
// element is returned. The result is never nil.
func (starts StartDurations) startsBetween(from, to time.Duration, firstOnly bool) StartDurations {
	matched := make([]time.Duration, 0)
	wraps := from > to
	for idx := range starts {
		start := starts[idx]
		var inRange bool
		if wraps {
			inRange = start >= from || start < to
		} else {
			inRange = start >= from && start < to
		}
		if !inRange {
			continue
		}
		matched = append(matched, start)
		if firstOnly {
			return matched
		}
	}
	return matched
}
// StartingBetween returns the jobs that have a start time in
// [fromTime, toTime), preserving their order. The result is never nil.
func (jobs Jobs) StartingBetween(fromTime, toTime time.Time) Jobs {
	matches := make(Jobs, 0)
	for idx := range jobs {
		// StartsBetween errors only for an invalid range (fromTime not before
		// toTime), and then reports false, so such a call yields no jobs.
		if ok, _ := jobs[idx].StartsBetween(fromTime, toTime); ok {
			matches = append(matches, jobs[idx])
		}
	}
	return matches
}
// DurationContaining returns the window of length d that contains t. The
// window starts at t truncated to a multiple of d (measured from the zero
// time) and ends d later. For d <= 0, the window starts at t itself.
func DurationContaining(d time.Duration, t time.Time) (fromTime, toTime time.Time) {
	start := t.Truncate(d)
	return start, start.Add(d)
}
// StartingDuringDuration returns the jobs that start within the window of
// length d that contains t, as computed by DurationContaining. For example,
// t = 12:07pm with d = 1h selects jobs starting from noon up to (but not
// including) 1pm.
func (jobs Jobs) StartingDuringDuration(t time.Time, d time.Duration) Jobs {
	return jobs.StartingBetween(DurationContaining(d, t))
}
// IDs returns the ID of every job, in the same order as jobs. The result is
// never nil, even for an empty collection.
func (jobs Jobs) IDs() []string {
	ids := make([]string, len(jobs))
	for i := range jobs {
		ids[i] = jobs[i].ID
	}
	return ids
}
// Deduplicate returns values with repeated strings removed. It keeps the
// first occurrence of each string and preserves input order. The result is
// never nil.
func Deduplicate(values []string) []string {
	seen := make(map[string]struct{}, len(values))
	kept := make([]string, 0)
	for _, v := range values {
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		kept = append(kept, v)
	}
	return kept
}
// AllStarts returns all start durations for a given set of Jobs
// func (jobs Jobs) AllStarts() StartDurations {
// starts := make(StartDurations, 0)
// for i := range jobs {
// starts = append(starts, jobs[i].starts...)
// }
// return starts
// }
// ScheduledStartRecords returns one record per start time of every job, in
// job order. Each record is the job ID followed by the start offset past
// midnight, formatted with time.Duration.String (for example "1h30m0s").
// The result is never nil.
func (jobs Jobs) ScheduledStartRecords() [][]string {
	records := make([][]string, 0)
	for i := range jobs {
		job := &jobs[i]
		for _, start := range job.starts {
			// Fraction-of-day alternative:
			// strconv.FormatFloat(float64(start)/float64(dayDuration), 'f', -1, 64)
			records = append(records, []string{job.ID, start.String()})
		}
	}
	return records
}
// NewJob returns a pointer to a new Job with the given ID. Frequency is left
// at zero and must be set (for example with WithFrequency) before scheduling.
func NewJob(id string) *Job {
	job := new(Job)
	job.ID = id
	return job
}
// WithFrequency returns a copy of j with Frequency set from frequency, which
// may be a time.Duration or a string accepted by time.ParseDuration (such as
// "5m"). The receiver is a value, so the original Job is not modified.
//
// It panics if a string fails to parse or if frequency has any other type.
func (j Job) WithFrequency(frequency interface{}) Job {
	if d, isDuration := frequency.(time.Duration); isDuration {
		j.Frequency = d
		return j
	}
	text, isString := frequency.(string)
	if !isString {
		panic(errors.New("unknown frequency type"))
	}
	parsed, err := time.ParseDuration(text)
	if err != nil {
		panic(err)
	}
	j.Frequency = parsed
	return j
}
// NewJobsFromCSV reads Jobs from CSV data in r, decoded by gocsv into rows
// with string ID and Frequency columns. Each Frequency is parsed through
// WithFrequency, so it must be a duration string such as "1h". The result is
// never nil.
//
// It panics if the CSV cannot be decoded or a frequency fails to parse.
func NewJobsFromCSV(r io.Reader) Jobs {
	type csvJob struct {
		ID        string
		Frequency string
	}
	rows := make([]csvJob, 0)
	if err := gocsv.UnmarshalCSV(gocsv.DefaultCSVReader(r), &rows); err != nil {
		panic(err)
	}
	loaded := make(Jobs, 0, len(rows))
	for _, row := range rows {
		loaded = append(loaded, NewJob(row.ID).WithFrequency(row.Frequency))
	}
	return loaded
}
// CSV writes jobs to w in the CSV form produced by gocsv.MarshalString. It
// returns the first marshalling or write error; nothing is written if
// marshalling fails.
func (jobs Jobs) CSV(w io.Writer) error {
	encoded, err := gocsv.MarshalString(jobs)
	if err == nil {
		_, err = io.WriteString(w, encoded)
	}
	return err
}
// ScheduleCSV writes the schedule to w as CSV. It writes a header row
// "ID,StartDurationAfterMidnightUTC", then one row per job start as
// produced by ScheduledStartRecords.
func (jobs Jobs) ScheduleCSV(w io.Writer) error {
	out := csv.NewWriter(w)
	if err := out.Write([]string{"ID", "StartDurationAfterMidnightUTC"}); err != nil {
		return err
	}
	// WriteAll flushes the buffered output (header included) and reports any
	// write error, so no separate Flush is needed afterwards.
	return out.WriteAll(jobs.ScheduledStartRecords())
}