Skip to content


Prepare for v1.1 release (#8)
Browse files Browse the repository at this point in the history
* Introduce configurable job timeouts.
* Fix a bug when job recovery ignores the maximum number of attempts.
  • Loading branch information
temochka authored Apr 28, 2019
1 parent 1814286 commit 68dd19e
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 14 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
dist: trusty
language: clojure
- mysql
- openjdk7
- oraclejdk7
- oraclejdk8
- oraclejdk9
- oraclejdk11
script: "lein test-all"
before_script: "echo 'CREATE DATABASE IF NOT EXISTS clj_mysql_queue;' | mysql -uroot"

2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject mysql-queue "1.0.0"
(defproject mysql-queue "1.1.0"
:description "A durable queue with scheduled job support that is backed by MySQL."
:url ""
:license {:name "Eclipse Public License"
Expand Down
43 changes: 31 additions & 12 deletions src/mysql_queue/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,21 @@
[clojure.core.async :as async :refer [chan >!! >! <! <!! go go-loop thread thread-call close! timeout alts! alts!!]]
[clojure.core.async.impl.protocols :as async-proto :refer [closed?]])
(:import (com.mysql.jdbc.exceptions.jdbc4 MySQLIntegrityConstraintViolationException)
(java.util Date)))
(java.util Date)
(java.util.concurrent TimeoutException)))

(defmacro with-timeout
"A macro that executes the body in a future with a specified
timeout. Returns nil if times out. Throws an exception if
body returns nil."
[timeout & body]
`(let [f# (future ~@body)
ret# (deref f# ~timeout ~::timed-out)]
(if (= ret# ~::timed-out)
(future-cancel f#)
(throw (TimeoutException. (str "Execution took more than " ~timeout "ms. Terminating."))))

(def ultimate-job-states #{:canceled :failed :done})
(def max-retries 5)
Expand Down Expand Up @@ -42,7 +56,7 @@

(defprotocol Executable
(finished? [job])
(execute [job db-conn log-fn err-fn]))
(execute [job db-conn log-fn err-fn timeout]))

(defprotocol Fertile
(beget [parent] [parent status] [parent status parameters]))
Expand Down Expand Up @@ -103,7 +117,9 @@
(job-summary-string this))
(beget [this]
(->Job user-fn nil scheduled-job-id id name status parameters (inc attempt)))
(if (< attempt max-retries)
(->Job user-fn nil scheduled-job-id id name status parameters (inc attempt))
(->Job user-fn nil scheduled-job-id id name :failed parameters attempt)))
(beget [this _] (beget this))
(beget [this _ _] (beget this)))

Expand Down Expand Up @@ -156,13 +172,13 @@
(finished? [job]
(ultimate-job-states (:status job)))
(execute [{:as job job-fn :user-fn :keys [status parameters attempt]} db-conn log-fn err-fn]
(execute [{:as job job-fn :user-fn :keys [status parameters attempt]} db-conn log-fn err-fn timeout]
(profile-block [m]
(if (finished? job)
(cleanup job db-conn)
(log-fn :info job "Executing job " job)
(let [[status params] (-> (meter m :user (job-fn status parameters))
(let [[status params] (-> (meter m :user (with-timeout timeout (job-fn status parameters)))
job-result-or-nil (or [:done nil]))]
(-> job (beget status params) (persist db-conn)))
(catch Exception e
Expand All @@ -172,14 +188,14 @@
(-> job (beget :failed) (persist db-conn))))))))
(finished? [job] false)
(execute [job db-conn log-fn err-fn]
(execute [job db-conn log-fn err-fn timeout]
(profile-block [_]
(log-fn :info job "Recovering job " job)
(-> job beget (persist db-conn))))
(finished? [job]
(throw (UnsupportedOperationException. "finished? is not implemented for ScheduledJob.")))
(execute [job db-conn log-fn _err-fn]
(execute [job db-conn log-fn err-fn timeout]
(profile-block [_]
(log-fn :info job "Executing job " job)
(-> job beget (persist db-conn)))))
Expand Down Expand Up @@ -246,7 +262,7 @@

(defn- consumer-thread
"Consumer loop. Automatically quits if the listen-chan is closed. Runs in a go-thread."
[id listen-chan status-chan db-conn log-fn err-fn]
[id listen-chan status-chan db-conn log-fn err-fn job-timeout]
(while-let [job (<! listen-chan)]
Expand All @@ -257,7 +273,7 @@
(if current-job
(>! status-chan {:id id :state :running-job :job current-job})
(let [[next-job dmetrics] (<! (thread (execute current-job db-conn log-fn err-fn)))]
(let [[next-job dmetrics] (<! (thread (execute current-job db-conn log-fn err-fn job-timeout)))]
(recur next-job current-job (merge-with + metrics dmetrics))))
(>! status-chan {:id id :state :finished-job :job last-job :metrics metrics})
Expand Down Expand Up @@ -449,6 +465,7 @@
* buffer-size - maximum number of jobs allowed into internal queue. Determines
when the publisher will block. Default 10.
* job-timeout-mins - the number of minutes after which the job times out. Default 20.
* prefetch - the number of jobs a publisher fetches from the database at once.
Default 10.
* num-stats-jobs - the number of jobs to keep in memory for statistical purpose.
Expand All @@ -463,14 +480,15 @@
sleep before querying the database for stuck jobs. Default 0 seconds.
* max-recovery-sleep-interval - the maximum time in seconds the recovery thread will
sleep before qerying the database for stuck jobs. Default 10 seconds.
* recovery-threshold-mins - the number of seconds after which a job is considered
stuck and will be picked up by the recovery thread.
* recovery-threshold-mins - the number of minutes after which a job is considered
stuck and will be picked up by the recovery thread. Default 20.
* log-fn - user-provided logging function of 3 arguments: level (keyword), job (record), message (msg).
* err-fn - user-provided error function of one argument: error (Exception)."
&{:keys [buffer-size
Expand All @@ -481,6 +499,7 @@
:or {buffer-size 10
job-timeout-mins 20
prefetch 10
num-stats-jobs 50
num-consumer-threads 2
Expand Down Expand Up @@ -519,7 +538,7 @@
[in-ch out-chs sieve] (deduplicate queue-chan num-consumer-threads)
consumer-threads (->> out-chs
#(consumer-thread %1 %2 consumer-status-channel db-conn log-fn err-fn))
#(consumer-thread %1 %2 consumer-status-channel db-conn log-fn err-fn (* 60 1000 job-timeout-mins)))
(into [])
handler (partial publisher-error-handler log-fn err-fn)
Expand Down
41 changes: 41 additions & 0 deletions test/mysql_queue/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
[db-conn job-name]
(sql/delete! db-conn :scheduled_jobs ["name = ?" job-name]))

(defn count-jobs
(sql/query db-conn ["SELECT COUNT(*) AS c FROM jobs"] {:result-set-fn (comp :c first)}))

(defn queue-size
(sql/query db-conn ["SELECT COUNT(*) AS c FROM scheduled_jobs"] {:result-set-fn (comp :c first)}))

(defn setup-db
(initialize! db-conn)
Expand Down Expand Up @@ -202,6 +210,39 @@
(is (= num-jobs (count @check-ins))
"The number of executed jobs doesn't match the number of jobs queued."))))

(deftest stuck-job-max-attempts-test
(let [jobs {:test-foo #(throw (Exception. "This job should not have been executed, because it reached max attempts."))}
scheduled-id (schedule-job db-conn :test-foo :begin {} (java.util.Date. 0))]
(queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {}) 5 (java.util.Date. 0))
(is (= 1 (count (queries/select-n-stuck-jobs db-conn ultimate-job-states ["test-foo"] [0] 5 5))))
(is (= 1 (count-jobs db-conn)))
(with-worker [wrk (worker db-conn
:num-consumer-threads 1
:max-scheduler-sleep-interval 0.5
:max-recovery-sleep-interval 0.5)]
(Thread/sleep 1000)
(is (zero? (count (queries/select-n-stuck-jobs db-conn ultimate-job-states ["test-foo"] [0] 5 5))))
(is (zero? (count-jobs db-conn))))))

(deftest job-timeout-test
(let [attempt (atom 0)
jobs {:test-foo (fn [_ _]
(swap! attempt inc)
(when (= 1 @attempt)
(Thread/sleep 10000))
[:done {}])}]
(schedule-job db-conn :test-foo :begin {} (java.util.Date.))
(with-worker [wrk (worker db-conn
:num-consumer-threads 1
; run scheduler every 0.5s
:max-scheduler-sleep-interval 0.5
; terminate jobs that take over 1 second
:job-timeout-mins (/ 1 60))]
(Thread/sleep 3000)
(is (= 2 @attempt)))))

(deftest graceful-shutdown-test
(let [num-jobs 2
expected-set (->> num-jobs range (map inc) (into #{}))
Expand Down

0 comments on commit 68dd19e

Please sign in to comment.