OpenProject is the leading open source project management software.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
openproject/db/migrate/20210615150558_aggregate_jo...

392 lines
14 KiB

require_relative './20200924085508_cleanup_orphaned_journal_data'
class AggregateJournals < ActiveRecord::Migration[6.1]
def up
[Attachment,
Changeset,
Message,
News,
WikiContent,
WorkPackage,
Budget,
TimeEntry,
Document,
Meeting,
MeetingContent].each do |klass|
say_with_time "Aggregating journals for #{klass}" do
aggregate_journals(klass)
end
end
# Now cleanup all orphaned attachable/customizable journals
CleanupOrphanedJournalData.up
end
# The change is irreversible (aggregated journals cannot be broken down) but down will not cause database inconsistencies.
def aggregate_journals(klass)
klass.in_batches(of: ENV["OPENPROJECT_MIGRATION_AGGREGATE_JOURNALS_BATCH_SIZE"]&.to_i || 1000) do |instances|
# Instantiating is faster than calculating the aggregated journals multiple times.
aggregated_journals = aggregated_journals_of(klass, instances).to_a
aggregated_journals
.reject { |journal| journal.notes_id == journal.id }
.each do |mismatched_journal|
update_journal_notes(mismatched_journal)
end
remove_unnecessary_journals(klass, instances, aggregated_journals)
end
end
def aggregated_journals_of(klass, instances)
aggregated_journal(sql: Journal.where(journable_type: klass.name,
journable_id: instances.pluck(:id))
.to_sql)
end
def update_journal_notes(mismatched_journal)
sql = <<~SQL
UPDATE journals
SET notes = :notes
WHERE id = :id
SQL
suppress_messages do
execute ::OpenProject::SqlSanitization.sanitize(sql, notes: mismatched_journal.notes, id: mismatched_journal.id)
end
end
def remove_unnecessary_journals(klass, instances, aggregated_journals)
# Only delete the journals (without callbacks as it is faster).
# We remove the then orphaned attachable/customizable journals later.
Journal
.where(journable_type: klass.name, journable_id: instances.pluck(:id))
.where.not(id: aggregated_journals.map(&:id))
.delete_all
end
def aggregated_journal(journable: nil, sql: nil, until_version: nil)
journals_preselection = raw_journals_subselect(journable, sql, until_version)
# We wrap the sql with a subselect so that outside of this class,
# The fields native to journals (e.g. id, version) can be referenced, without
# having to also use a CASE/COALESCE statement.
Journal
.from(select_sql(journals_preselection))
.select("DISTINCT *")
end
private
# The sql used to query the database for the aggregated journals.
# The query makes use of 4 parts (which are selected from/joined):
# * The minimum journal version that starts an aggregation group.
# * The maximum journal version that ends an aggregation group.
# * Journals with notes that are within the bounds of minimum version/maximum version.
# * Journals with notes that are within the bounds of the before mentioned journal notes and the maximum journal version.
#
# The maximum version are those journals, whose successor:
# * Where created by a different user
# * Where created after the configured aggregation period had expired (always relative to the journal under consideration).
#
# The minimum version then is the maximum version of the group before - 1.
#
# e.g. a group of 10 sequential journals might break into the following groups
#
# Version 10 (User A, 6 minutes after 9)
# Version 9 (User A, 2 minutes after 8)
# Version 8 (User A, 4 minutes after 7)
# Version 7 (User A, 1 minute after 6)
# Version 6 (User A, 3 minutes after 5)
# Version 5 (User A, 1 minute after 4)
# Version 4 (User B, 1 minute after 3)
# Version 3 (User B, 4 minutes after 2)
# Version 2 (User A, 1 minute after 1)
# Version 1 (User A)
#
# would have the following maximum journals if the aggregation period where 5 minutes:
#
# Version 10 (User A, 6 minutes after 9)
# Version 9 (User A, 2 minutes after 8)
# Version 4 (User B, 1 minute after 3)
# Version 2 (User A, 1 minute after 1)
#
# The last journal (one without a successor) of a journable will obviously also always be a maximum journal.
#
# If the aggregation period where to be expanded to 7 minutes, the maximum journals would be slightly different:
#
# Version 10 (User A, 6 minutes after 9)
# Version 4 (User B, 1 minute after 3)
# Version 2 (User A, 1 minute after 1)
#
# As we do not store the aggregated journals, and rather calculate them on reading, the aggregated journals might be tuned
# by a user.
#
# The minimum version in the example with the 5 minute aggregation period would then be calculated from the maximum version:
#
# Version 10
# Version 5
# Version 3
# Version 1
#
# The first version will always be included.
#
# Without a journal with notes (the user commented on the journable) in between, the maximum journal is returned
# as the representation of every aggregation group. This is possible as the journals (together with their data and their
# customizable_journals/attachable_journals) represent the complete state of the journable at the given time.
#
# e.g. a group of 5 sequential journals without notes, belonging to the same user and created within the configured
# time difference between one journal and its successor
#
# Version 9
# Version 8
# Version 7
# Version 6
# Version 5
#
# would only return the last journal, Version 9.
#
# In case the group has one journal with notes in it, the last journal is also returned. But as we also want the note
# to be returned, we return the note as if it would belong to the maximum journal version. This explicitly means
# that all journals of the same group that are after the notes journal are also returned.
#
# e.g. a group of 5 sequential journals with only one note, belonging to the same user and created within the configured
# time difference between one journal and its successor
#
# Version 9
# Version 8
# Version 7
# Version 6 (note)
# Version 5
#
# would only return the last journal, Version 9, but would also return the note and the id of the journal the note
# belongs to natively.
#
# But as we do not want to aggregate notes, the behaviour above can no longer work if there is more than one note in the
# same group. In such a case, a group is cut into subsets. The journals returned will then only contain all the changes
# up until a journal with notes. The only exception to this is the last journal note which might also contain changes
# after it up to and including the maximum journal version of the group.
# e.g. a group of 5 sequential journals with only one note, belonging to the same user and created within the configured
# time difference between one journal and its successor
#
# Version 9
# Version 8 (note)
# Version 7
# Version 6 (note)
# Version 5
#
# would return the last journal, Version 9, but with the note of Version 8 and also a reference in the form of
# note_id pointing to Version 8. It would also return Version 6, with its note and a reference in the form of note_id
# this time pointing to the native journal, Version 6.
#
# The journals that are considered for aggregation can also be reduced by providing a subselect. Doing so, one
# can e.g. consider only the journals created after a certain time.
def select_sql(journals_preselection)
<<~SQL
(#{Journal
.from(start_group_journals_select(journals_preselection))
.joins(end_group_journals_join(journals_preselection))
.joins(notes_in_group_join(journals_preselection))
.joins(additional_notes_in_group_join(journals_preselection))
.select(projection_list).to_sql}) journals
SQL
end
def user_or_time_group_breaking_journals_subselect(journals_preselection)
<<~SQL
SELECT
predecessor.*,
row_number() OVER (ORDER BY predecessor.journable_type, predecessor.journable_id, predecessor.version ASC) #{group_number_alias}
FROM #{journals_preselection} predecessor
LEFT OUTER JOIN #{journals_preselection} successor
ON predecessor.version + 1 = successor.version
AND predecessor.journable_type = successor.journable_type
AND predecessor.journable_id = successor.journable_id
WHERE (predecessor.user_id != successor.user_id
OR #{beyond_aggregation_time_condition})
OR successor.id IS NULL
SQL
end
def notes_journals_subselect(journals_preselection)
<<~SQL
(SELECT
notes_journals.*
FROM #{journals_preselection} notes_journals
WHERE notes_journals.notes != '' AND notes_journals.notes IS NOT NULL)
SQL
end
def start_group_journals_select(journals_preselection)
"(#{user_or_time_group_breaking_journals_subselect(journals_preselection)}) #{start_group_journals_alias}"
end
def end_group_journals_join(journals_preselection)
group_journals_join_condition = <<~SQL
#{start_group_journals_alias}.#{group_number_alias} = #{end_group_journals_alias}.#{group_number_alias} - 1
AND #{start_group_journals_alias}.journable_type = #{end_group_journals_alias}.journable_type
AND #{start_group_journals_alias}.journable_id = #{end_group_journals_alias}.journable_id
SQL
end_group_journals = <<~SQL
RIGHT OUTER JOIN
(#{user_or_time_group_breaking_journals_subselect(journals_preselection)}) #{end_group_journals_alias}
ON #{group_journals_join_condition}
SQL
Arel.sql(end_group_journals)
end
def notes_in_group_join(journals_preselection)
# As we right join on the minimum journal version, the minimum might be empty. We thus have to coalesce in such
# case as <= will not interpret NULL as 0.
# This also works if we do not fetch the whole set of journals starting from the first journal but rather
# start somewhere within the set. This might take place e.g. when fetching only the journals that are
# created after a certain point in time which is done when displaying of the last month in the activity module.
breaking_journals_notes_join_condition = <<~SQL
COALESCE(#{start_group_journals_alias}.version, 0) + 1 <= #{notes_in_group_alias}.version
AND #{end_group_journals_alias}.version >= #{notes_in_group_alias}.version
AND #{end_group_journals_alias}.journable_type = #{notes_in_group_alias}.journable_type
AND #{end_group_journals_alias}.journable_id = #{notes_in_group_alias}.journable_id
SQL
breaking_journals_notes = <<~SQL
LEFT OUTER JOIN
#{notes_journals_subselect(journals_preselection)} #{notes_in_group_alias}
ON #{breaking_journals_notes_join_condition}
SQL
Arel.sql(breaking_journals_notes)
end
def additional_notes_in_group_join(journals_preselection)
successor_journals_notes_join_condition = <<~SQL
#{notes_in_group_alias}.version < successor_notes.version
AND #{end_group_journals_alias}.version >= successor_notes.version
AND #{end_group_journals_alias}.journable_type = successor_notes.journable_type
AND #{end_group_journals_alias}.journable_id = successor_notes.journable_id
SQL
successor_journals_notes = <<~SQL
LEFT OUTER JOIN
#{notes_journals_subselect(journals_preselection)} successor_notes
ON #{successor_journals_notes_join_condition}
SQL
Arel.sql(successor_journals_notes)
end
def projection_list
projections = <<~SQL
#{end_group_journals_alias}.journable_type,
#{end_group_journals_alias}.journable_id,
#{end_group_journals_alias}.user_id,
#{end_group_journals_alias}.activity_type,
#{notes_projection} notes,
#{notes_id_projection} notes_id,
#{notes_in_group_alias}.version notes_version,
#{version_projection} AS version,
#{created_at_projection} created_at,
#{updated_at_projection} updated_at,
#{id_projection} id
SQL
Arel.sql(projections)
end
def id_projection
attribute_projection(:id)
end
def version_projection
attribute_projection(:version)
end
def created_at_projection
attribute_projection(:created_at)
end
def updated_at_projection
attribute_projection(:updated_at)
end
def attribute_projection(attribute)
<<~SQL
CASE
WHEN successor_notes.version IS NOT NULL THEN #{notes_in_group_alias}.#{attribute}
ELSE #{end_group_journals_alias}.#{attribute} END
SQL
end
def notes_id_projection
<<~SQL
COALESCE(#{notes_in_group_alias}.id, #{end_group_journals_alias}.id)
SQL
end
def notes_projection
<<~SQL
COALESCE(#{notes_in_group_alias}.notes, '')
SQL
end
def raw_journals_subselect(journable, sql, until_version)
if sql
raise 'until_version used together with sql' if until_version
"(#{sql})"
elsif journable
limit = until_version ? "AND journals.version <= #{until_version}" : ''
<<~SQL
(
SELECT * from journals
WHERE journals.journable_id = #{journable.id}
AND journals.journable_type = '#{journable.class.name}'
#{limit}
)
SQL
else
where = until_version ? "WHERE journals.version <= #{until_version}" : ''
<<~SQL
(SELECT * from journals #{where})
SQL
end
end
# Returns a SQL condition that will determine whether two entries are too far apart (temporal)
# to be considered for aggregation. This takes the current instance settings for temporal
# proximity into account.
def beyond_aggregation_time_condition
aggregation_time_seconds = Setting.journal_aggregation_time_minutes.to_i.minutes
if aggregation_time_seconds == 0
# if aggregation is disabled, we consider everything to be beyond aggregation time
# even if creation dates are exactly equal
return '(true = true)'
end
difference = "(successor.created_at - predecessor.created_at)"
threshold = "interval '#{aggregation_time_seconds} second'"
"(#{difference} > #{threshold})"
end
def start_group_journals_alias
"start_groups_journals"
end
def end_group_journals_alias
"end_groups_journals"
end
def group_number_alias
"group_number"
end
def notes_in_group_alias
"notes_in_group_journals"
end
end