11(ns cronut
22 (:refer-clojure :exclude [proxy])
3- (:require [clojure.tools.logging :as log])
4- (:import (java.util TimeZone)
5- (org.quartz CronScheduleBuilder DisallowConcurrentExecution Job JobBuilder JobDetail JobExecutionException Scheduler SimpleScheduleBuilder TriggerBuilder)
6- (org.quartz.impl StdSchedulerFactory)
7- (org.quartz.spi JobFactory TriggerFiredBundle)))
8-
9- (defn base-trigger-builder
10- " Provide a base trigger-builder from configuration"
11- [{:keys [identity description start end priority]}]
12- (cond-> (TriggerBuilder/newTrigger )
13- (seq identity) (.withIdentity (first identity) (second identity))
14- description (.withDescription description)
15- start (.startAt start)
16- (nil? start) (.startNow )
17- end (.endAt end)
18- priority (.withPriority (int priority))))
19-
20- (defn simple-schedule
21- " Provide a simple schedule from configuration"
22- [{:keys [interval time-unit repeat misfire]}]
23- (let [schedule (SimpleScheduleBuilder/simpleSchedule )]
24- (case time-unit
25- :millis (.withIntervalInMilliseconds schedule interval)
26- :seconds (.withIntervalInSeconds schedule interval)
27- :minutes (.withIntervalInMinutes schedule interval)
28- :hours (.withIntervalInHours schedule interval)
29- nil (when interval (.withIntervalInMilliseconds schedule interval)))
30- (case misfire
31- :fire-now (.withMisfireHandlingInstructionFireNow schedule)
32- :ignore (.withMisfireHandlingInstructionIgnoreMisfires schedule)
33- :next-existing (.withMisfireHandlingInstructionNextWithExistingCount schedule)
34- :next-remaining (.withMisfireHandlingInstructionNextWithRemainingCount schedule)
35- :now-existing (.withMisfireHandlingInstructionNowWithExistingCount schedule)
36- :now-remaining (.withMisfireHandlingInstructionNowWithRemainingCount schedule)
37- nil nil )
38- (cond
39- (number? repeat) (.withRepeatCount schedule repeat)
40- (= :forever repeat) (.repeatForever schedule))
41- schedule))
42-
43- (defn cron-schedule
44- " Provide a cron schedule from configuration"
45- [{:keys [cron time-zone misfire]}]
46- (let [schedule (CronScheduleBuilder/cronSchedule ^String cron)]
47- (case misfire
48- :ignore (.withMisfireHandlingInstructionIgnoreMisfires schedule)
49- :do-nothing (.withMisfireHandlingInstructionDoNothing schedule)
50- :fire-and-proceed (.withMisfireHandlingInstructionFireAndProceed schedule)
51- nil nil )
52- (when time-zone
53- (.inTimeZone schedule (TimeZone/getTimeZone ^String time-zone)))
54- schedule))
55-
56- (defmulti trigger-builder :type )
57-
58- (defmethod trigger-builder :simple
59- [config]
60- (.withSchedule ^TriggerBuilder (base-trigger-builder config)
61- (simple-schedule config)))
62-
63- (defmethod trigger-builder :cron
64- [config]
65- (.withSchedule ^TriggerBuilder (base-trigger-builder config)
66- (cron-schedule config)))
67-
68- (defrecord ^{DisallowConcurrentExecution true } SerialProxyJob [proxied-job]
69- Job
70- (execute [_ job-context]
71- (try
72- (.execute ^Job proxied-job job-context)
73- (catch JobExecutionException ex
74- (throw ex))
75- (catch Exception ex
76- (throw (JobExecutionException. ^Exception ex))))))
77-
78- (defrecord ProxyJob [proxied-job]
79- Job
80- (execute [_ job-context]
81- (try
82- (.execute ^Job proxied-job job-context)
83- (catch JobExecutionException ex
84- (throw ex))
85- (catch Exception ex
86- (throw (JobExecutionException. ^Exception ex))))))
87-
88- (defn job-factory
89- [scheduled opts]
90- (reify JobFactory
91- (newJob [_ bundle _]
92- (let [job-detail (.getJobDetail ^TriggerFiredBundle bundle)
93- job-key (.getKey job-detail)]
94- (if (:disallowConcurrentExecution? opts)
95- (->SerialProxyJob (get scheduled job-key))
96- (->ProxyJob (get scheduled job-key)))))))
97-
98- (defn proxy
99- [job opts]
100- (let [{:keys [identity description recover? durable?]} job]
101- (.build (cond-> (.ofType (JobBuilder/newJob ) (if (:disallowConcurrentExecution? opts) SerialProxyJob ProxyJob))
102- (seq identity) (.withIdentity (first identity) (second identity))
103- description (.withDescription description)
104- (boolean? recover?) (.requestRecovery recover?)
105- (boolean? durable?) (.storeDurably durable?)))))
106-
107- (defn activate
108- [^Scheduler scheduler schedule global-opts]
3+ (:require [clojure.tools.logging :as log]
4+ [cronut.job :as job])
5+ (:import (org.quartz JobDetail JobKey Scheduler Trigger TriggerBuilder TriggerKey)
6+ (org.quartz.impl StdSchedulerFactory)))
7+
8+ (defn concurrent-execution-disallowed?
9+ [^Scheduler scheduler]
10+ (= " true" (get (.getContext scheduler) " concurrentExecutionDisallowed?" )))
11+
12+ (defn get-detail
13+ [^Scheduler scheduler ^JobKey key]
14+ (.getJobDetail scheduler key))
15+
16+ (defn ^Trigger schedule-job
17+ [^Scheduler scheduler ^TriggerBuilder trigger job]
18+ (let [detail ^JobDetail (job/detail job (concurrent-execution-disallowed? scheduler))]
19+ (if-let [^JobDetail previously-scheduled (get-detail scheduler (.getKey detail))]
20+ (let [built (.build (.forJob trigger previously-scheduled))]
21+ (log/info " scheduling new trigger for existing job" built previously-scheduled)
22+ (.scheduleJob scheduler built)
23+ built)
24+ (let [built (.build trigger)]
25+ (log/info " scheduling new job" built detail)
26+ (.scheduleJob scheduler detail built)
27+ built))))
28+
29+ (defn schedule-jobs
30+ [^Scheduler scheduler jobs]
31+ (log/infof " scheduling [%s] jobs" (count jobs))
32+ (loop [schedule jobs
33+ triggers []]
34+ (if-let [{:keys [^TriggerBuilder trigger job]} (first schedule)]
35+ (recur (rest schedule) (conj triggers (schedule-job scheduler trigger job)))
36+ triggers)))
37+
38+ (defn scheduler
39+ [{:keys [update-check? concurrent-execution-disallowed?]}]
40+ (log/infof " initializing scheduler" )
41+ (when-not update-check?
42+ (System/setProperty " org.terracotta.quartz.skipUpdateCheck" " true" )
43+ (log/infof " with quartz update check disabled" ))
44+ (let [scheduler (StdSchedulerFactory/getDefaultScheduler )]
45+ (if concurrent-execution-disallowed?
46+ (do
47+ (log/infof " with global concurrent execution disallowed" )
48+ (.put (.getContext scheduler) " concurrentExecutionDisallowed?" " true" ))
49+ (.put (.getContext scheduler) " concurrentExecutionDisallowed?" " false" ))
50+ (.setJobFactory scheduler (job/factory ))
51+ scheduler))
52+
53+ (defn pause-job
54+ ([^Scheduler scheduler group name]
55+ (.pauseJob scheduler (JobKey. name group)))
56+ ([^Scheduler scheduler ^Trigger trigger]
57+ (.pauseJob scheduler (.getJobKey trigger))))
58+
59+ (defn resume-job
60+ ([^Scheduler scheduler group name]
61+ (.resumeJob scheduler (JobKey. name group)))
62+ ([^Scheduler scheduler ^Trigger trigger]
63+ (.resumeJob scheduler (.getJobKey trigger))))
64+
65+ (defn pause-trigger
66+ ([^Scheduler scheduler group name]
67+ (.pauseTrigger scheduler (TriggerKey. name group)))
68+ ([^Scheduler scheduler ^Trigger trigger]
69+ (.pauseTrigger scheduler (.getKey trigger))))
70+
71+ (defn resume-trigger
72+ ([^Scheduler scheduler group name]
73+ (.resumeTrigger scheduler (TriggerKey. name group)))
74+ ([^Scheduler scheduler ^Trigger trigger]
75+ (.resumeTrigger scheduler (.getKey trigger))))
76+
77+ (defn delete-job
78+ ([^Scheduler scheduler group name]
79+ (.deleteJob scheduler (JobKey. name group)))
80+ ([^Scheduler scheduler ^Trigger trigger]
81+ (.deleteJob scheduler (.getJobKey trigger))))
82+
83+ (defn unschedule-job
84+ ([^Scheduler scheduler group name]
85+ (.unscheduleJob scheduler (TriggerKey. name group)))
86+ ([^Scheduler scheduler ^Trigger trigger]
87+ (.unscheduleJob scheduler (.getKey trigger))))
88+
89+ (defn pause-all
90+ [^Scheduler scheduler]
91+ (.pauseAll scheduler))
92+
93+ (defn resume-all
94+ [^Scheduler scheduler]
95+ (.resumeAll scheduler))
96+
97+ (defn clear
98+ [^Scheduler scheduler]
10999 (.clear scheduler)
110- (loop [schedule schedule
111- scheduled {}
112- proxies {}]
113- (if-let [{:keys [job ^TriggerBuilder trigger]} (first schedule)]
114- (if-let [^JobDetail previously-scheduled (get proxies job)]
115- (let [built (.build (.forJob trigger previously-scheduled))]
116- (log/info " scheduling new trigger for existing job" built previously-scheduled)
117- (.scheduleJob scheduler built)
118- (recur (rest schedule) scheduled proxies))
119- (let [proxy-detail ^JobDetail (proxy job global-opts)
120- job-key (.getKey proxy-detail)
121- built (.build trigger)]
122- (log/info " scheduling new job" built proxy-detail)
123- (.scheduleJob scheduler proxy-detail built)
124- (recur (rest schedule) (assoc scheduled job-key job) (assoc proxies job proxy-detail))))
125- (.setJobFactory scheduler (job-factory scheduled global-opts))))
126- (.start scheduler)
127100 scheduler )
128101
129- (defn initialize
130- [config]
131- (let [{:keys [schedule update-check?]} config
132- opts (dissoc config :schedule :update-check? )]
133- (log/infof " initializing schedule of [%s] jobs" (count schedule))
134- (when-not update-check?
135- (System/setProperty " org.terracotta.quartz.skipUpdateCheck" " true" )
136- (log/infof " with quartz update check disabled" ))
137- (log/infof " with cronut opts %s" opts)
138- (activate (StdSchedulerFactory/getDefaultScheduler ) schedule config)))
139-
140- (defn shortcut-interval
141- " Trigger immediately, at an interval-ms, run forever (well that's optimistic but you get the idea)"
142- [interval-ms]
143- (trigger-builder {:type :simple
144- :interval interval-ms
145- :time-unit :millis
146- :repeat :forever }))
147-
148- (defn shortcut-cron
149- [cron]
150- (trigger-builder {:type :cron
151- :cron cron}))
102+ (defn start
103+ [^Scheduler scheduler]
104+ (.start scheduler)
105+ scheduler )
152106
153107(defn shutdown
154108 [scheduler]
155- (.shutdown ^Scheduler scheduler))
109+ (.shutdown ^Scheduler scheduler)
110+ scheduler )
0 commit comments