Merge pull request #3230 from NobodysNightmare/feature/sql_journal_aggregation
Journal aggregation (SQL based approach)pull/3276/head
commit
542335195c
@ -0,0 +1,222 @@ |
||||
#-- encoding: UTF-8 |
||||
#-- copyright |
||||
# OpenProject is a project management system. |
||||
# Copyright (C) 2012-2015 the OpenProject Foundation (OPF) |
||||
# |
||||
# This program is free software; you can redistribute it and/or |
||||
# modify it under the terms of the GNU General Public License version 3. |
||||
# |
||||
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows: |
||||
# Copyright (C) 2006-2013 Jean-Philippe Lang |
||||
# Copyright (C) 2010-2013 the ChiliProject Team |
||||
# |
||||
# This program is free software; you can redistribute it and/or |
||||
# modify it under the terms of the GNU General Public License |
||||
# as published by the Free Software Foundation; either version 2 |
||||
# of the License, or (at your option) any later version. |
||||
# |
||||
# This program is distributed in the hope that it will be useful, |
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
# GNU General Public License for more details. |
||||
# |
||||
# You should have received a copy of the GNU General Public License |
||||
# along with this program; if not, write to the Free Software |
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
||||
# |
||||
# See doc/COPYRIGHT.rdoc for more details. |
||||
#++ |
||||
|
||||
# Similar to regular Journals, but under the following circumstances journals are aggregated: |
||||
# * they are in temporal proximity |
||||
# * they belong to the same resource |
||||
# * they were created by the same user (i.e. the same user edited the journable) |
||||
# * no other user has an own journal on the same object between the aggregated ones |
||||
# When a user commented (added a note) twice within a short time, the second comment will |
||||
# "open" a new aggregation, since we do not want to merge comments in any way. |
||||
# The term "aggregation" means the following when applied to our journaling: |
||||
# * ignore/hide old journal rows (since every journal row contains a full copy of the journaled |
||||
# object, dropping intermediate rows will just increase the diff of the following journal) |
||||
# * in case an older row had notes, take the notes from the older row, since they shall not |
||||
# be dropped |
||||
class Journal::AggregatedJournal |
||||
class << self |
||||
def aggregated_journals(journable: nil) |
||||
query_aggregated_journals(journable: journable).map { |journal| |
||||
Journal::AggregatedJournal.new(journal) |
||||
} |
||||
end |
||||
|
||||
def query_aggregated_journals(journable: nil) |
||||
# Using the roughly aggregated groups from :sql_rough_group we need to merge journals |
||||
# where an entry with empty notes follows an entry containing notes, so that the notes |
||||
# from the main entry are taken, while the remaining information is taken from the |
||||
# more recent entry. We therefore join the rough groups with itself |
||||
# _wherever a merge would be valid_. |
||||
# Since the results are already pre-merged, this can only happen if Our first entry (master) |
||||
# had a comment and its successor (addition) had no comment, but can be merged. |
||||
# This alone would, however, leave the addition in the result set, leaving a "no change" |
||||
# journal entry back. By an additional self-join towards the predecessor, we can make sure |
||||
# that our own row (master) would not already have been merged by its predecessor. If it is |
||||
# (that means if we can find a valid predecessor), we drop our current row, because it will |
||||
# already be present (in a merged form) in the row of our predecessor. |
||||
Journal.from("(#{sql_rough_group(journable, 1)}) #{table_name}") |
||||
.joins("LEFT OUTER JOIN (#{sql_rough_group(journable, 2)}) addition |
||||
ON #{sql_on_groups_belong_condition(table_name, 'addition')}") |
||||
.joins("LEFT OUTER JOIN (#{sql_rough_group(journable, 3)}) predecessor |
||||
ON #{sql_on_groups_belong_condition('predecessor', table_name)}") |
||||
.where('predecessor.id IS NULL') |
||||
.select("#{table_name}.journable_id, |
||||
#{table_name}.journable_type, |
||||
#{table_name}.user_id, |
||||
#{table_name}.notes, |
||||
#{table_name}.id \"notes_id\", |
||||
#{table_name}.activity_type, |
||||
COALESCE(addition.created_at, #{table_name}.created_at) \"created_at\", |
||||
COALESCE(addition.id, #{table_name}.id) \"id\", |
||||
COALESCE(addition.version, #{table_name}.version) \"version\"") |
||||
end |
||||
|
||||
private |
||||
|
||||
# Provides a full SQL statement that returns journals that are aggregated on a basic level: |
||||
# * a row is dropped as soon as its successor is eligible to be merged with it |
||||
# * rows with a comment are never dropped (we _might_ need the comment later) |
||||
# Thereby the result already has aggregation performed, but will still have too many rows: |
||||
# Changes without notes after changes containing notes (even if both were performed by |
||||
# the same user). Those need to be filtered out later. |
||||
# To be able to self-join results of this statement, we add an additional column called |
||||
# "group_number" to the result. This allows to compare a group resulting from this query with |
||||
# its predecessor and successor. |
||||
def sql_rough_group(journable, uid) |
||||
sql = "SELECT predecessor.*, #{sql_group_counter(uid)} AS group_number |
||||
FROM #{sql_rough_group_from_clause(uid)} |
||||
LEFT OUTER JOIN journals successor |
||||
ON predecessor.version + 1 = successor.version AND |
||||
predecessor.journable_type = successor.journable_type AND |
||||
predecessor.journable_id = successor.journable_id |
||||
WHERE (predecessor.user_id != successor.user_id OR |
||||
(predecessor.notes != '' AND predecessor.notes IS NOT NULL) OR |
||||
#{sql_beyond_aggregation_time?('predecessor', 'successor')} OR |
||||
successor.id IS NULL)" |
||||
|
||||
if journable |
||||
sql += " AND predecessor.journable_type = '#{journable.class.name}' AND |
||||
predecessor.journable_id = #{journable.id}" |
||||
end |
||||
|
||||
sql |
||||
end |
||||
|
||||
# The "group_number" required in :sql_rough_group has to be generated differently depending on |
||||
# the DBMS used. This method returns the appropriate statement to be used inside a SELECT to |
||||
# obtain the current group number. |
||||
# The :uid parameter allows to define non-conflicting variable names (for MySQL). |
||||
def sql_group_counter(uid) |
||||
if OpenProject::Database.mysql? |
||||
group_counter = mysql_group_count_variable(uid) |
||||
"(#{group_counter} := #{group_counter} + 1)" |
||||
else |
||||
'row_number() OVER ()' |
||||
end |
||||
end |
||||
|
||||
# MySQL requires some initialization to be performed before being able to count the groups. |
||||
# This method allows to inject further FROM sources to achieve that in a single SQL statement. |
||||
# Sadly MySQL requires the whole statement to be wrapped in parenthesis, while PostgreSQL |
||||
# prohibits that. |
||||
def sql_rough_group_from_clause(uid) |
||||
if OpenProject::Database.mysql? |
||||
"(journals predecessor, (SELECT #{mysql_group_count_variable(uid)}:=0) number_initializer)" |
||||
else |
||||
'journals predecessor' |
||||
end |
||||
end |
||||
|
||||
def mysql_group_count_variable(uid) |
||||
"@aggregated_journal_row_counter_#{uid}" |
||||
end |
||||
|
||||
# Similar to the WHERE statement used in :sql_rough_group. However, this condition will |
||||
# match (return true) for all pairs where a merge/aggregation IS possible. |
||||
def sql_on_groups_belong_condition(predecessor, successor) |
||||
"#{predecessor}.group_number + 1 = #{successor}.group_number AND |
||||
(NOT #{sql_beyond_aggregation_time?(predecessor, successor)} AND |
||||
#{predecessor}.user_id = #{successor}.user_id AND |
||||
#{successor}.journable_type = #{predecessor}.journable_type AND |
||||
#{successor}.journable_id = #{predecessor}.journable_id AND |
||||
NOT ((#{predecessor}.notes != '' AND #{predecessor}.notes IS NOT NULL) AND |
||||
(#{successor}.notes != '' AND #{successor}.notes IS NOT NULL)))" |
||||
end |
||||
|
||||
# Returns a SQL condition that will determine whether two entries are too far apart (temporal) |
||||
# to be considered for aggregation. This takes the current instance settings for temporal |
||||
# proximity into account. |
||||
def sql_beyond_aggregation_time?(predecessor, successor) |
||||
aggregation_time_seconds = Setting.journal_aggregation_time_minutes.to_i * 60 |
||||
|
||||
if OpenProject::Database.mysql? |
||||
difference = "TIMESTAMPDIFF(second, #{predecessor}.created_at, #{successor}.created_at)" |
||||
threshold = aggregation_time_seconds |
||||
else |
||||
difference = "(#{successor}.created_at - #{predecessor}.created_at)" |
||||
threshold = "interval '#{aggregation_time_seconds} second'" |
||||
end |
||||
|
||||
"(#{difference} > #{threshold})" |
||||
end |
||||
|
||||
def table_name |
||||
Journal.table_name |
||||
end |
||||
end |
||||
|
||||
delegate :journable_type, |
||||
:journable_id, |
||||
:journable, |
||||
:user_id, |
||||
:user, |
||||
:notes, |
||||
:activity_type, |
||||
:created_at, |
||||
:id, |
||||
:version, |
||||
:attributes, |
||||
to: :journal |
||||
|
||||
def initialize(journal) |
||||
@journal = journal |
||||
end |
||||
|
||||
def user |
||||
@user ||= User.find(user_id) |
||||
end |
||||
|
||||
def predecessor |
||||
unless defined? @predecessor |
||||
raw_journal = self.class.query_aggregated_journals(journable: journable) |
||||
.where("#{Journal.table_name}.version < ?", version) |
||||
.order("#{Journal.table_name}.version DESC") |
||||
.first |
||||
|
||||
@predecessor = raw_journal ? Journal::AggregatedJournal.new(raw_journal) : nil |
||||
end |
||||
|
||||
@predecessor |
||||
end |
||||
|
||||
def initial? |
||||
predecessor.nil? |
||||
end |
||||
|
||||
# ARs automagic addition of dynamic columns (those not present in the physical table) seems |
||||
# not to work with PostgreSQL and simply return a string for unknown columns. |
||||
# Thus we need to ensure manually that this column is correctly casted. |
||||
def notes_id |
||||
ActiveRecord::ConnectionAdapters::Column.value_to_integer(journal.notes_id) |
||||
end |
||||
|
||||
private |
||||
|
||||
attr_reader :journal |
||||
end |
@ -0,0 +1,219 @@ |
||||
#-- encoding: UTF-8 |
||||
#-- copyright |
||||
# OpenProject is a project management system. |
||||
# Copyright (C) 2012-2015 the OpenProject Foundation (OPF) |
||||
# |
||||
# This program is free software; you can redistribute it and/or |
||||
# modify it under the terms of the GNU General Public License version 3. |
||||
# |
||||
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows: |
||||
# Copyright (C) 2006-2013 Jean-Philippe Lang |
||||
# Copyright (C) 2010-2013 the ChiliProject Team |
||||
# |
||||
# This program is free software; you can redistribute it and/or |
||||
# modify it under the terms of the GNU General Public License |
||||
# as published by the Free Software Foundation; either version 2 |
||||
# of the License, or (at your option) any later version. |
||||
# |
||||
# This program is distributed in the hope that it will be useful, |
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
# GNU General Public License for more details. |
||||
# |
||||
# You should have received a copy of the GNU General Public License |
||||
# along with this program; if not, write to the Free Software |
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
||||
# |
||||
# See doc/COPYRIGHT.rdoc for more details. |
||||
#++require 'rspec' |
||||
|
||||
require 'spec_helper' |
||||
|
||||
RSpec::Matchers.define :be_equivalent_to_journal do |expected| |
||||
ignored_attributes = [:notes_id] |
||||
|
||||
match do |actual| |
||||
expected_attributes = get_normalized_attributes expected |
||||
actual_attributes = get_normalized_attributes actual |
||||
|
||||
expected_attributes.except(*ignored_attributes) == actual_attributes.except(*ignored_attributes) |
||||
end |
||||
|
||||
failure_message do |actual| |
||||
expected_attributes = get_normalized_attributes expected |
||||
actual_attributes = actual.attributes.symbolize_keys |
||||
["expected attributes: #{display_sorted_hash(expected_attributes.except(*ignored_attributes))}", |
||||
"actual attributes: #{display_sorted_hash(actual_attributes.except(*ignored_attributes))}"] |
||||
.join($/) |
||||
end |
||||
|
||||
def get_normalized_attributes(journal) |
||||
result = journal.attributes.symbolize_keys |
||||
if result[:created_at] |
||||
# µs are not stored in all DBMS |
||||
result[:created_at] = result[:created_at].change(usec: 0) |
||||
end |
||||
|
||||
result |
||||
end |
||||
|
||||
def display_sorted_hash(hash) |
||||
'{ ' + hash.sort.map { |k, v| "#{k.inspect}=>#{v.inspect}" }.join(', ') + ' }' |
||||
end |
||||
end |
||||
|
||||
describe Journal::AggregatedJournal, type: :model do |
||||
let(:work_package) { |
||||
FactoryGirl.build(:work_package) |
||||
} |
||||
let(:user1) { FactoryGirl.create(:user) } |
||||
let(:user2) { FactoryGirl.create(:user) } |
||||
let(:initial_author) { user1 } |
||||
|
||||
subject { described_class.aggregated_journals.sort_by &:id } |
||||
|
||||
before do |
||||
allow(User).to receive(:current).and_return(initial_author) |
||||
work_package.save! |
||||
end |
||||
|
||||
it 'returns the one and only journal' do |
||||
expect(subject.count).to eql 1 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.first |
||||
end |
||||
|
||||
it 'also indicates its ID via notes_id' do |
||||
expect(subject.first.notes_id).to eql work_package.journals.first.id |
||||
end |
||||
|
||||
it 'is the initial journal' do |
||||
expect(subject.first.initial?).to be_truthy |
||||
end |
||||
|
||||
context 'WP updated immediately after uncommented change' do |
||||
let(:notes) { nil } |
||||
|
||||
before do |
||||
changes = { status: FactoryGirl.build(:status) } |
||||
changes[:notes] = notes if notes |
||||
|
||||
expect(work_package.update_by!(new_author, changes)).to be_truthy |
||||
end |
||||
|
||||
context 'by author of last change' do |
||||
let(:new_author) { initial_author } |
||||
|
||||
it 'returns a single aggregated journal' do |
||||
expect(subject.count).to eql 1 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.second |
||||
end |
||||
|
||||
it 'is the initial journal' do |
||||
expect(subject.first.initial?).to be_truthy |
||||
end |
||||
|
||||
context 'with a comment' do |
||||
let(:notes) { 'This is why I changed it.' } |
||||
|
||||
it 'returns a single aggregated journal' do |
||||
expect(subject.count).to eql 1 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.second |
||||
end |
||||
|
||||
context 'adding a second comment' do |
||||
let(:notes) { 'Another comment, unrelated to the first one.' } |
||||
|
||||
before do |
||||
expect(work_package.update_by!(new_author, notes: notes)).to be_truthy |
||||
end |
||||
|
||||
it 'returns two journals' do |
||||
expect(subject.count).to eql 2 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.second |
||||
expect(subject.second).to be_equivalent_to_journal work_package.journals.last |
||||
end |
||||
|
||||
it 'has one initial journal and one non-initial journal' do |
||||
expect(subject.first.initial?).to be_truthy |
||||
expect(subject.second.initial?).to be_falsey |
||||
end |
||||
end |
||||
|
||||
context 'adding another change without comment' do |
||||
before do |
||||
work_package.reload # need to update the lock_version, avoiding StaleObjectError |
||||
expect(work_package.update_by!(new_author, subject: 'foo')).to be_truthy |
||||
end |
||||
|
||||
it 'returns a single journal' do |
||||
expect(subject.count).to eql 1 |
||||
end |
||||
|
||||
it 'combines the notes of the earlier journal with attributes of the later journal' do |
||||
expected_journal = work_package.journals.last |
||||
expected_journal.notes = work_package.journals.second.notes |
||||
|
||||
expect(subject.first).to be_equivalent_to_journal expected_journal |
||||
end |
||||
|
||||
it 'indicates the ID of the earlier journal via notes_id' do |
||||
expect(subject.first.notes_id).to eql work_package.journals.second.id |
||||
end |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'by a different author' do |
||||
let(:new_author) { user2 } |
||||
|
||||
it 'returns both journals' do |
||||
expect(subject.count).to eql 2 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.first |
||||
expect(subject.second).to be_equivalent_to_journal work_package.journals.second |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'WP updated after aggregation timeout expired' do |
||||
let(:delay) { (Setting.journal_aggregation_time_minutes.to_i + 1).minutes } |
||||
|
||||
before do |
||||
work_package.status = FactoryGirl.build(:status) |
||||
work_package.save! |
||||
work_package.journals.second.created_at += delay |
||||
work_package.journals.second.save! |
||||
end |
||||
|
||||
it 'returns both journals' do |
||||
expect(subject.count).to eql 2 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.first |
||||
expect(subject.second).to be_equivalent_to_journal work_package.journals.second |
||||
end |
||||
end |
||||
|
||||
context 'different WP updated immediately after change' do |
||||
let(:other_wp) { FactoryGirl.build(:work_package) } |
||||
before do |
||||
other_wp.save! |
||||
end |
||||
|
||||
it 'returns both journals' do |
||||
expect(subject.count).to eql 2 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.first |
||||
expect(subject.second).to be_equivalent_to_journal other_wp.journals.first |
||||
end |
||||
end |
||||
|
||||
context 'passing a journable as parameter' do |
||||
subject { described_class.aggregated_journals(journable: work_package).sort_by &:id } |
||||
let(:other_wp) { FactoryGirl.build(:work_package) } |
||||
before do |
||||
other_wp.save! |
||||
end |
||||
|
||||
it 'only returns the journal for the requested work package' do |
||||
expect(subject.count).to eq 1 |
||||
expect(subject.first).to be_equivalent_to_journal work_package.journals.first |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue