Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions app/models/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,22 @@ class Job < ::ActiveRecord::Base

# high-level queue states (live => erroring => failed)
scope :live, -> { where(failed_at: nil) }
scope :erroring, -> { where(arel_table[:attempts].gt(0)).merge(unscoped.live) }
scope :erroring, -> { where(erroring_clause).merge(unscoped.live) }
scope :failed, -> { where.not(failed_at: nil) }

# live queue states (future vs pending)
scope :future, ->(as_of = db_time_now) { merge(unscoped.live).where(arel_table[:run_at].gt(as_of)) }
scope :pending, ->(as_of = db_time_now) { merge(unscoped.live).where(arel_table[:run_at].lteq(as_of)) }
scope :future, ->(as_of = db_time_now) { merge(unscoped.live).where(future_clause(as_of)) }
scope :pending, ->(as_of = db_time_now) { merge(unscoped.live).where(pending_clause(as_of)) }

# pending queue states (claimed vs claimable)
scope :claimed, ->(as_of = db_time_now) {
where(arel_table[:locked_at].gteq(db_time_now - lock_timeout))
.merge(unscoped.pending(as_of))
where(claimed_clause(as_of)).merge(unscoped.pending(as_of))
}
scope :claimed_by, ->(worker, as_of = db_time_now) {
where(locked_by: worker.name)
.claimed(as_of)
where(locked_by: worker.name).claimed(as_of)
}
scope :claimable, ->(as_of = db_time_now) {
where(locked_at: nil)
.or(where(arel_table[:locked_at].lt(db_time_now - lock_timeout)))
.merge(unscoped.pending(as_of))
where(claimable_clause(as_of)).merge(unscoped.pending(as_of))
}
scope :claimable_by, ->(worker, as_of = db_time_now) {
claimable(as_of)
Expand All @@ -40,6 +36,26 @@ class Job < ::ActiveRecord::Base
.by_priority
}

def self.erroring_clause
arel_table[:attempts].gt(0)
end

def self.future_clause(as_of = db_time_now)
arel_table[:run_at].gt(as_of)
end

def self.pending_clause(as_of = db_time_now)
arel_table[:run_at].lteq(as_of)
end

def self.claimed_clause(as_of = db_time_now)
arel_table[:locked_at].gteq(as_of - lock_timeout)
end

def self.claimable_clause(as_of = db_time_now)
arel_table[:locked_at].eq(nil).or arel_table[:locked_at].lt(as_of - lock_timeout)
end

before_save :set_default_run_at, :set_name

REENQUEUE_BUFFER = 30.seconds
Expand Down
67 changes: 48 additions & 19 deletions lib/delayed/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,50 +95,58 @@ def default_tags
}
end

def grouped_count(scope)
Delayed::Job.from(scope.select('priority, queue, COUNT(*) AS count'))
.group(priority_case_statement, :queue).sum(:count)
def grouped_count(scope, **kwargs)
selects = kwargs.map { |k, v| "#{v} AS #{k}" }.join(', ')
counts = kwargs.keys.map { |k| "SUM(#{k}) AS #{k}" }.join(', ')

Delayed::Job.from(scope.select("priority, queue, #{selects}")
.group(:priority, :queue))
.group(priority_case_statement, :queue).select(
counts,
"#{priority_case_statement} AS priority",
'queue AS queue',
).group_by { |j| [j.priority.to_i, j.queue] }
.transform_values(&:first)
end

def grouped_min(scope, column)
Delayed::Job.from(scope.select("priority, queue, MIN(#{column}) AS #{column}"))
.group(priority_case_statement, :queue)
.select(<<~SQL.squish)
(#{priority_case_statement}) AS priority,
queue,
MIN(#{column}) AS #{column},
#{self.class.sql_now_in_utc} AS db_now_utc
SQL
.group_by { |j| [j.priority.to_i, j.queue] }
.select(
"MIN(#{column}) AS #{column}",
"#{priority_case_statement} AS priority",
'queue AS queue',
"#{self.class.sql_now_in_utc} AS db_now_utc",
).group_by { |j| [j.priority.to_i, j.queue] }
.transform_values(&:first)
end

def count_grouped
if Job.connection.supports_partial_index?
failed_count_grouped.merge(live_count_grouped) { |_, l, f| l + f }
else
grouped_count(jobs)
grouped_count(jobs, count: 'COUNT(*)').transform_values(&:count)
end
end

def live_count_grouped
grouped_count(jobs.live)
live_counts.transform_values(&:count)
end

def future_count_grouped
grouped_count(jobs.future)
live_counts.transform_values(&:future_count)
end

def locked_count_grouped
@memo[:locked_count_grouped] ||= grouped_count(jobs.claimed)
def erroring_count_grouped
live_counts.transform_values(&:erroring_count)
end

def erroring_count_grouped
grouped_count(jobs.erroring)
def locked_count_grouped
@memo[:locked_count_grouped] ||= pending_counts.transform_values(&:claimed_count)
end

def failed_count_grouped
@memo[:failed_count_grouped] ||= grouped_count(jobs.failed)
@memo[:failed_count_grouped] ||= grouped_count(jobs.failed, count: 'COUNT(*)').transform_values(&:count)
end

def max_lock_age_grouped
Expand All @@ -158,7 +166,7 @@ def alert_age_percent_grouped
end

def workable_count_grouped
grouped_count(jobs.claimable)
pending_counts.transform_values(&:claimable_count)
end

alias working_count_grouped locked_count_grouped
Expand All @@ -179,6 +187,23 @@ def oldest_run_at_query
@memo[:oldest_run_at_query] ||= grouped_min(jobs.claimable, :run_at)
end

def live_counts
@memo[:live_counts] ||= grouped_count(
jobs.live,
count: 'COUNT(*)',
future_count: "SUM(#{case_when(Job.future_clause.to_sql)})",
erroring_count: "SUM(#{case_when(Job.erroring_clause.to_sql)})",
)
end

def pending_counts
@memo[:pending_counts] ||= grouped_count(
jobs.pending,
claimed_count: "SUM(#{case_when(Job.claimed_clause.to_sql)})",
claimable_count: "SUM(#{case_when(Job.claimable_clause.to_sql)})",
)
end

def db_now(record)
self.class.parse_utc_time(record.db_now_utc)
end
Expand All @@ -187,6 +212,10 @@ def time_ago(now, value)
[now - (value || now), 0].max
end

def case_when(condition)
"CASE WHEN #{condition} THEN 1 ELSE 0 END"
end

def priority_case_statement
[
'CASE',
Expand Down
Loading