# File lib/delayed/worker.rb, line 32
# Selects the job class that workers operate on.
#
# backend - either a job class, or a Symbol naming a backend. For a Symbol the
#           files "delayed/serialization/<name>" and "delayed/backend/<name>"
#           are required and Delayed::Backend::<Classified name>::Job is
#           looked up by name.
#
# The resolved class is stored in @@backend and published as Delayed::Job.
def self.backend=(backend)
  job_class =
    if backend.is_a?(Symbol)
      require "delayed/serialization/#{backend}"
      require "delayed/backend/#{backend}"
      "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
    else
      backend
    end

  @@backend = job_class
  # NOTE(review): silence_warnings presumably hides the "already initialized
  # constant" warning when Delayed::Job is being replaced - confirm.
  silence_warnings { ::Delayed.const_set(:Job, job_class) }
end
# File lib/delayed/worker.rb, line 42
# Falls back to the :active_record backend when ActiveRecord is defined and
# no backend has been chosen yet. Does nothing (and returns nil) when
# ActiveRecord is not defined.
def self.guess_backend
  return unless defined?(ActiveRecord)

  self.backend ||= :active_record
end
# File lib/delayed/worker.rb, line 46
# Builds a worker.
#
# options - Hash of settings:
#           :quiet        - stored in @quiet; defaults to true when absent.
#           :min_priority - when present, assigned to the class-level setting.
#           :max_priority - when present, assigned to the class-level setting.
#           :sleep_delay  - when present, assigned to the class-level setting.
#
# Priority and sleep settings are written on the worker class, not on the
# instance, so they affect every worker of that class.
def initialize(options={})
  @quiet = options.fetch(:quiet, true)

  worker_class = self.class
  if options.has_key?(:min_priority)
    worker_class.min_priority = options[:min_priority]
  end
  if options.has_key?(:max_priority)
    worker_class.max_priority = options[:max_priority]
  end
  if options.has_key?(:sleep_delay)
    worker_class.sleep_delay = options[:sleep_delay]
  end
end
# File lib/delayed/worker.rb, line 147
# Handles a job that will not be retried.
#
# Invokes the job's :failure hook, prints a deprecation warning when the job
# still responds to on_permanent_failure, and then either destroys the job or
# stamps it with failed_at, depending on the class-level destroy_failed_jobs
# setting.
#
# Returns the result of job.destroy or job.update_attributes.
def failed(job)
  job.hook(:failure)

  warn "[DEPRECATION] The #on_permanent_failure hook has been renamed to #failure." if job.respond_to?(:on_permanent_failure)

  if self.class.destroy_failed_jobs
    job.destroy
  else
    job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end
# File lib/delayed/worker.rb, line 161
# Returns the attempt limit for +job+: the job's own max_attempts when it is
# set (neither nil nor false), otherwise the class-level max_attempts.
def max_attempts(job)
  job_limit = job.max_attempts
  return job_limit if job_limit

  self.class.max_attempts
end
Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 57
# Returns the worker's name.
#
# An explicitly assigned @name (anything other than nil) wins. Otherwise the
# name is built from @name_prefix, the host name and the process id; if
# building that string raises a StandardError (for example the host name
# lookup fails), the name falls back to the prefix plus the process id.
def name
  if @name.nil?
    begin
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
    rescue StandardError
      "#{@name_prefix}pid:#{Process.pid}"
    end
  else
    @name
  end
end
Sets the name of the worker. Setting the name to nil will reset the default worker name.
# File lib/delayed/worker.rb, line 64
# Stores +val+ in @name as the worker's name.
def name=(val)
  @name = val
end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 135
# Records one more failed attempt on +job+ and decides what happens next.
#
# job  - the job that just failed.
# time - optional run time for the retry; when omitted the job's own
#        reschedule_at value is used.
#
# While the new attempt count is below max_attempts(job) the job gets a new
# run_at, is unlocked and saved. Otherwise the removal is logged and the job
# is handed to failed.
def reschedule(job, time = nil)
  attempt_count = job.attempts + 1
  job.attempts = attempt_count

  unless attempt_count < max_attempts(job)
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end
# File lib/delayed/worker.rb, line 118
# Runs +job+ under the class-level max_run_time timeout and destroys it when
# it completes.
#
# Returns true when the job ran and was destroyed, and false when it raised
# and was passed to handle_failed_job. When the job raises
# DeserializationError it is passed straight to failed and the return value is
# whatever failed returns.
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  # The job name is passed as a format argument rather than spliced into the
  # format string, so a "%" inside the name cannot break the message.
  say format('%s completed after %.4f', job.name, runtime)
  return true # did work
rescue DeserializationError => error
  # Double-quoted "\n" joins the backtrace with real newlines; Array() guards
  # against an exception whose backtrace is nil.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  # NOTE(review): this branch returns failed's result rather than false, so
  # callers that compare against true/false see neither - confirm intent.
  failed(job)
rescue Exception => error
  # NOTE(review): rescuing Exception is kept from the original so every job
  # failure is recorded; it also catches signals and exits raised while the
  # job runs - confirm whether that is wanted.
  handle_failed_job(job, error)
  return false # work failed
end
# File lib/delayed/worker.rb, line 155
# Emits a worker message.
#
# text  - the message; it is prefixed with "[Worker(<name>)] ".
# level - Logger severity used for the log entry (Logger::INFO by default).
#
# The prefixed message is printed to standard output unless @quiet is set,
# and, when a logger is available, added to it with a timestamp in front.
def say(text, level = Logger::INFO)
  line = "[Worker(#{name})] #{text}"
  puts line unless @quiet
  return unless logger

  logger.add(level, "#{Time.now.strftime('%FT%T%z')}: #{line}")
end
# File lib/delayed/worker.rb, line 68
# Main worker loop.
#
# Installs TERM and INT handlers that announce the exit and set the global
# $exit flag, then repeatedly calls work_off. When a batch processed no jobs
# the worker sleeps for the class-level sleep_delay; otherwise it reports the
# throughput and failure count. The loop ends once $exit is set.
#
# On the way out (including when an exception escapes) the locks held under
# this worker's name are cleared.
def start
  say "Starting job worker"

  %w[TERM INT].each do |signal|
    trap(signal) { say 'Exiting...'; $exit = true }
  end

  loop do
    stats = nil
    elapsed = Benchmark.realtime { stats = work_off }
    processed = stats.sum

    # Checked before sleeping or reporting so a signal received during the
    # batch stops the worker straight away.
    break if $exit

    if processed.zero?
      sleep(self.class.sleep_delay)
    else
      say "#{processed} jobs processed at %.4f j/s, %d failed ..." % [processed / elapsed, stats.last]
    end

    # Checked again because a signal may have arrived while sleeping.
    break if $exit
  end
ensure
  Delayed::Job.clear_locks!(name)
end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 100
# Runs up to +num+ jobs (100 by default) and tallies the outcomes.
#
# Each call to reserve_and_run_one_job counts as a success when it returns
# true and as a failure when it returns false; any other result stops the
# batch. The batch also stops as soon as the global $exit flag is set.
#
# Returns a two-element array: [successes, failures].
def work_off(num = 100)
  counts = [0, 0] # [successes, failures]

  num.times do
    case reserve_and_run_one_job
    when true  then counts[0] += 1
    when false then counts[1] += 1
    else break # nothing could be run, so stop asking
    end

    break if $exit # stop promptly when shutting down
  end

  counts
end
# File lib/delayed/worker.rb, line 167
# Records why +job+ failed, logs the failure at ERROR level and hands the job
# to reschedule.
#
# job   - the job that raised.
# error - the exception it raised.
#
# last_error is set to the exception message followed by its backtrace, one
# frame per line. Returns the result of reschedule.
def handle_failed_job(job, error)
  # Double-quoted "\n" joins the frames with real newlines; Array() guards
  # against an exception whose backtrace is nil.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  reschedule(job)
end
Generated with the Darkfish Rdoc Generator 2.