diff --git a/app/chewy/public_statuses_index.rb b/app/chewy/public_statuses_index.rb new file mode 100644 index 00000000000000..ff7b770138d432 --- /dev/null +++ b/app/chewy/public_statuses_index.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +class PublicStatusesIndex < Chewy::Index + include FormattingHelper + + settings index: { refresh_interval: '30s' }, analysis: { + filter: { + english_stop: { + type: 'stop', + stopwords: '_english_', + }, + english_stemmer: { + type: 'stemmer', + language: 'english', + }, + english_possessive_stemmer: { + type: 'stemmer', + language: 'possessive_english', + }, + }, + analyzer: { + content: { + tokenizer: 'uax_url_email', + filter: %w( + english_possessive_stemmer + lowercase + asciifolding + cjk_width + english_stop + english_stemmer + ), + }, + }, + } + + # We do not use delete_if option here because it would call a method that we + # expect to be called with crutches without crutches, causing n+1 queries + index_scope ::Status.unscoped + .kept + .without_reblogs + .includes(:media_attachments, :preloadable_poll) + .joins(:account) + .where(accounts: { discoverable: true }) + .where(visibility: :public) + + crutch :mentions do |collection| + data = ::Mention.where(status_id: collection.map(&:id)).where(account: Account.local, silent: false).pluck(:status_id, :account_id) + data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } + end + + crutch :favourites do |collection| + data = ::Favourite.where(status_id: collection.map(&:id)).where(account: Account.local).pluck(:status_id, :account_id) + data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } + end + + crutch :reblogs do |collection| + data = ::Status.where(reblog_of_id: collection.map(&:id)).where(account: Account.local).pluck(:reblog_of_id, :account_id) + data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } + end + + crutch :bookmarks do |collection| + data = ::Bookmark.where(status_id: collection.map(&:id)).where(account: Account.local).pluck(:status_id, :account_id) + data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } + end + + crutch :votes do |collection| + data = ::PollVote.joins(:poll).where(poll: { status_id: collection.map(&:id) }).where(account: Account.local).pluck(:status_id, :account_id) + data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } + end + + root date_detection: false do + field(:id, type: 'long') + field(:account_id, type: 'long') + + field(:text, type: 'text', value: ->(status) { status.searchable_text }) do + field(:stemmed, type: 'text', analyzer: 'content') + end + end +end diff --git a/app/chewy/statuses_index.rb b/app/chewy/statuses_index.rb index 6dd4fb18b024dc..1b2781100971a5 100644 --- a/app/chewy/statuses_index.rb +++ b/app/chewy/statuses_index.rb @@ -63,13 +63,13 @@ class StatusesIndex < Chewy::Index end root date_detection: false do - field :id, type: 'long' - field :account_id, type: 'long' + field(:id, type: 'long') + field(:account_id, type: 'long') - field :text, type: 'text', value: ->(status) { status.searchable_text } do - field :stemmed, type: 'text', analyzer: 'content' + field(:text, type: 'text', value: ->(status) { status.searchable_text }) do + field(:stemmed, type: 'text', analyzer: 'content') end - field :searchable_by, type: 'long', value: ->(status, crutches) { status.searchable_by(crutches) } + field(:searchable_by, type: 'long', value: ->(status, crutches) { status.searchable_by(crutches) }) end end diff --git a/app/lib/importer/public_statuses_index_importer.rb b/app/lib/importer/public_statuses_index_importer.rb new file mode 100644 index 00000000000000..b9628d7c58a919 --- /dev/null +++ b/app/lib/importer/public_statuses_index_importer.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +class Importer::PublicStatusesIndexImporter < Importer::BaseImporter + def import! + # Similar to the StatusesIndexImporter, we will process different scopes + # to import data into the PublicStatusesIndex. + scopes.each do |scope| + scope.find_in_batches(batch_size: @batch_size) do |batch| + in_work_unit(batch.map(&:status_id)) do |status_ids| + bulk = ActiveRecord::Base.connection_pool.with_connection do + status_data = Status.includes(:media_attachments, :preloadable_poll) + .joins(:account) + .where(accounts: { discoverable: true }) + .where(id: status_ids) + Chewy::Index::Import::BulkBuilder.new(index, to_index: status_data).bulk_body + end + + indexed = 0 + deleted = 0 + + bulk.map! do |entry| + if entry[:index] + indexed += 1 + else + deleted += 1 + end + entry + end + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + end + + wait! + end + + private + + def index + PublicStatusesIndex + end + + def scopes + [ + local_statuses_scope, + local_mentions_scope, + local_favourites_scope, + local_votes_scope, + local_bookmarks_scope, + ] + end + + def local_mentions_scope + Mention.where(account: Account.local, silent: false) + .joins(status: :account) + .where(accounts: { discoverable: true }) + .where(statuses: { visibility: :public }) + .select('mentions.id, statuses.id AS status_id') + end + + def local_favourites_scope + Favourite.where(account: Account.local) + .joins(status: :account) + .where(accounts: { discoverable: true }) + .where(statuses: { visibility: :public }) + .select('favourites.id, statuses.id AS status_id') + end + + def local_bookmarks_scope + Bookmark.joins(status: :account) + .where(accounts: { discoverable: true }) + .where(statuses: { visibility: :public }) + .select('bookmarks.id, statuses.id AS status_id') + end + + def local_votes_scope + local_account_ids = Account.where(discoverable: true).pluck(:id) + + Poll.joins(:votes) + .where(poll_votes: { account_id: local_account_ids }) + .where(status_id: Status.where(visibility: :public)) + end + + def local_statuses_scope + Status.local + .select('"statuses"."id", COALESCE("statuses"."reblog_of_id", "statuses"."id") AS status_id') + .joins(:account) + .where(accounts: { discoverable: true }) + .where(visibility: :public) + end +end diff --git a/app/lib/vacuum/statuses_vacuum.rb b/app/lib/vacuum/statuses_vacuum.rb index 28c087b1c2e8eb..ad1de07380ceb9 100644 --- a/app/lib/vacuum/statuses_vacuum.rb +++ b/app/lib/vacuum/statuses_vacuum.rb @@ -20,7 +20,10 @@ def vacuum_statuses! statuses.direct_visibility .includes(mentions: :account) .find_each(&:unlink_from_conversations!) - remove_from_search_index(statuses.ids) if Chewy.enabled? + if Chewy.enabled? + remove_from_index(statuses.ids, 'chewy:queue:StatusesIndex') + remove_from_index(statuses.ids, 'chewy:queue:PublicStatusesIndex') + end # Foreign keys take care of most associated records for us. # Media attachments will be orphaned. @@ -38,7 +41,7 @@ def retention_period_as_id Mastodon::Snowflake.id_at(@retention_period.ago, with_random: false) end - def remove_from_search_index(status_ids) - with_redis { |redis| redis.sadd('chewy:queue:StatusesIndex', status_ids) } + def remove_from_index(status_ids, index) + with_redis { |redis| redis.sadd(index, status_ids) } end end diff --git a/app/models/account.rb b/app/models/account.rb index 1edc15972d5eaf..ed74a1e4f35d69 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -79,6 +79,7 @@ class Account < ApplicationRecord include DomainMaterializable include AccountMerging include AccountSearch + include AccountStatusesSearch enum protocol: { ostatus: 0, activitypub: 1 } enum suspension_origin: { local: 0, remote: 1 }, _prefix: true @@ -131,6 +132,7 @@ class Account < ApplicationRecord scope :not_domain_blocked_by_account, ->(account) { where(arel_table[:domain].eq(nil).or(arel_table[:domain].not_in(account.excluded_from_timeline_domains))) } after_update_commit :trigger_update_webhooks + after_update :enqueue_update_public_statuses_index, if: :saved_change_to_discoverable? and Chewy.enabled? delegate :email, :unconfirmed_email, @@ -168,6 +170,10 @@ def bot? %w(Application Service).include? actor_type end + def undiscoverable? + !discoverable? + end + def instance_actor? id == -99 end @@ -451,6 +457,7 @@ def emojis before_validation :prepare_username, on: :create before_create :generate_keys before_destroy :clean_feed_manager + after_commit :enqueue_remove_from_public_statuses_index, on: :destroy, if: -> { Chewy.enabled? && discoverable? } def ensure_keys! return unless local? && private_key.blank? && public_key.blank? diff --git a/app/models/concerns/account_statuses_search.rb b/app/models/concerns/account_statuses_search.rb new file mode 100644 index 00000000000000..4d88f9954acfa6 --- /dev/null +++ b/app/models/concerns/account_statuses_search.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module AccountStatusesSearch + extend ActiveSupport::Concern + + def enqueue_update_public_statuses_index + if discoverable? + enqueue_add_to_public_statuses_index + else + enqueue_remove_from_public_statuses_index + end + end + + def enqueue_add_to_public_statuses_index + return unless Chewy.enabled? + + AddToPublicStatusesIndexWorker.perform_async(id) + end + + def enqueue_remove_from_public_statuses_index + return unless Chewy.enabled? + + RemoveFromPublicStatusesIndexWorker.perform_async(id) + end + + def add_to_public_statuses_index! + return unless Chewy.enabled? + + batch_size = 1000 + offset = 0 + + loop do + batch = Status.where(account_id: id).offset(offset).limit(batch_size) + + break if batch.empty? + + Chewy.strategy(:sidekiq) do + PublicStatusesIndex.import(query: batch) + end + + offset += batch_size + end + end + + def remove_from_public_statuses_index! + return unless Chewy.enabled? + + PublicStatusesIndex.filter(term: { account_id: id }).delete_all + end +end diff --git a/app/models/status.rb b/app/models/status.rb index ff85ff2388f88c..e68d38f77a5b81 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -47,6 +47,7 @@ class Status < ApplicationRecord attr_accessor :override_timestamps update_index('statuses', :proper) + update_index('publicStatuses', :proper) enum visibility: { public: 0, unlisted: 1, private: 2, direct: 3, limited: 4 }, _suffix: :visibility diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index f5cb339cdf747f..9098e10535bc19 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -35,7 +35,7 @@ def call(statuses, **options) # Since we skipped all callbacks, we also need to manually # deindex the statuses - Chewy.strategy.current.update(StatusesIndex, statuses_and_reblogs) if Chewy.enabled? + Chewy::Index.update([StatusesIndex, PublicStatusesIndex], statuses_and_reblogs) if Chewy.enabled? return if options[:skip_side_effects] diff --git a/app/services/delete_account_service.rb b/app/services/delete_account_service.rb index 190a72e5c5f178..6ef6773d0a98a0 100644 --- a/app/services/delete_account_service.rb +++ b/app/services/delete_account_service.rb @@ -187,7 +187,7 @@ def purge_favourites! @account.favourites.in_batches do |favourites| ids = favourites.pluck(:status_id) StatusStat.where(status_id: ids).update_all('favourites_count = GREATEST(0, favourites_count - 1)') - Chewy.strategy.current.update(StatusesIndex, ids) if Chewy.enabled? + Chewy::Index.update([StatusesIndex, PublicStatusesIndex], ids) if Chewy.enabled? Rails.cache.delete_multi(ids.map { |id| "statuses/#{id}" }) favourites.delete_all end @@ -195,7 +195,7 @@ def purge_favourites! def purge_bookmarks! @account.bookmarks.in_batches do |bookmarks| - Chewy.strategy.current.update(StatusesIndex, bookmarks.pluck(:status_id)) if Chewy.enabled? + Chewy::Index.update([StatusesIndex, PublicStatusesIndex], bookmarks.pluck(:status_id)) if Chewy.enabled? bookmarks.delete_all end end diff --git a/app/services/search_service.rb b/app/services/search_service.rb index 30937471bd5e56..4e1e7ea26e1c1f 100644 --- a/app/services/search_service.rb +++ b/app/services/search_service.rb @@ -39,25 +39,15 @@ def perform_accounts_search! end def perform_statuses_search! - definition = parsed_query.apply(StatusesIndex.filter(term: { searchable_by: @account.id })) - - definition = definition.filter(term: { account_id: @options[:account_id] }) if @options[:account_id].present? - - if @options[:min_id].present? || @options[:max_id].present? - range = {} - range[:gt] = @options[:min_id].to_i if @options[:min_id].present? - range[:lt] = @options[:max_id].to_i if @options[:max_id].present? - definition = definition.filter(range: { id: range }) - end - - results = definition.limit(@limit).offset(@offset).objects.compact - account_ids = results.map(&:account_id) - account_domains = results.map(&:account_domain) - preloaded_relations = @account.relations_map(account_ids, account_domains) - - results.reject { |status| StatusFilter.new(status, @account, preloaded_relations).filtered? } - rescue Faraday::ConnectionFailed, Parslet::ParseFailed - [] + StatusesSearchService.new.call( + @query, + @account, + limit: @limit, + offset: @offset, + account_id: @options[:account_id], + min_id: @options[:min_id], + max_id: @options[:max_id] + ) end def perform_hashtags_search! @@ -114,8 +104,4 @@ def hashtag_search? def statuses_search? @options[:type].blank? || @options[:type] == 'statuses' end - - def parsed_query - SearchQueryTransformer.new.apply(SearchQueryParser.new.parse(@query)) - end end diff --git a/app/services/statuses_search_service.rb b/app/services/statuses_search_service.rb new file mode 100644 index 00000000000000..7686da51b5edcf --- /dev/null +++ b/app/services/statuses_search_service.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +class StatusesSearchService < BaseService + def call(query, account = nil, options = {}) + @query = query&.strip + @account = account + @options = options + @limit = options[:limit].to_i + @offset = options[:offset].to_i + + status_search_results + end + + private + + def status_search_results + definition = parsed_query.apply( + StatusesIndex.filter( + bool: { + should: [ + publicly_searchable, + non_publicly_searchable, + ], + minimum_should_match: 1, + } + ) + ) + + definition = definition.filter(term: { account_id: @options[:account_id] }) if @options[:account_id].present? + + if @options[:min_id].present? || @options[:max_id].present? + range = {} + range[:gt] = @options[:min_id].to_i if @options[:min_id].present? + range[:lt] = @options[:max_id].to_i if @options[:max_id].present? + definition = definition.filter(range: { id: range }) + end + + definition.instance_variable_get(:@parameters)[:indices].value[:indices] << PublicStatusesIndex + + results = definition.limit(@limit).offset(@offset).objects.compact + account_ids = results.map(&:account_id) + account_domains = results.map(&:account_domain) + preloaded_relations = @account.relations_map(account_ids, account_domains) + + results.reject { |status| StatusFilter.new(status, @account, preloaded_relations).filtered? } + rescue Faraday::ConnectionFailed, Parslet::ParseFailed + [] + end + + def publicly_searchable + { + bool: { + must_not: { + exists: { + field: 'searchable_by', + }, + }, + }, + } + end + + def non_publicly_searchable + { + bool: { + must: [ + { + exists: { + field: 'searchable_by', + }, + }, + { + term: { searchable_by: @account.id }, + }, + ], + }, + } + end + + def parsed_query + SearchQueryTransformer.new.apply(SearchQueryParser.new.parse(@query)) + end +end diff --git a/app/workers/add_to_public_statuses_index_worker.rb b/app/workers/add_to_public_statuses_index_worker.rb new file mode 100644 index 00000000000000..04205125f403dd --- /dev/null +++ b/app/workers/add_to_public_statuses_index_worker.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class AddToPublicStatusesIndexWorker + include Sidekiq::Worker + + def perform(account_id) + account = Account.find(account_id) + return unless account&.discoverable? + + account.add_to_public_statuses_index! + end +end diff --git a/app/workers/remove_from_public_statuses_index_worker.rb b/app/workers/remove_from_public_statuses_index_worker.rb new file mode 100644 index 00000000000000..61a3578a69b4bd --- /dev/null +++ b/app/workers/remove_from_public_statuses_index_worker.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class RemoveFromPublicStatusesIndexWorker + include Sidekiq::Worker + + def perform(account_id) + account = Account.find(account_id) + return unless account&.undiscoverable? + + account.remove_from_public_statuses_index! + end +end diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb index d622f5586e4666..e98a6f0d0d0fc0 100644 --- a/app/workers/scheduler/indexing_scheduler.rb +++ b/app/workers/scheduler/indexing_scheduler.rb @@ -25,6 +25,6 @@ def perform end def indexes - [AccountsIndex, TagsIndex, StatusesIndex] + [AccountsIndex, TagsIndex, PublicStatusesIndex, StatusesIndex] end end diff --git a/lib/mastodon/cli/search.rb b/lib/mastodon/cli/search.rb index 33bcad5feabe50..692933f748a2b7 100644 --- a/lib/mastodon/cli/search.rb +++ b/lib/mastodon/cli/search.rb @@ -10,6 +10,7 @@ class Search < Base InstancesIndex, AccountsIndex, TagsIndex, + PublicStatusesIndex, StatusesIndex, ].freeze diff --git a/spec/chewy/public_statuses_index_spec.rb b/spec/chewy/public_statuses_index_spec.rb new file mode 100644 index 00000000000000..2f93d0ff025a59 --- /dev/null +++ b/spec/chewy/public_statuses_index_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe PublicStatusesIndex do + describe 'Searching the index' do + before do + mock_elasticsearch_response(described_class, raw_response) + end + + it 'returns results from a query' do + results = described_class.query(match: { name: 'status' }) + + expect(results).to eq [] + end + end + + def raw_response + { + took: 3, + hits: { + hits: [ + { + _id: '0', + _score: 1.6375021, + }, + ], + }, + } + end +end diff --git a/spec/lib/importer/public_statuses_index_importer_spec.rb b/spec/lib/importer/public_statuses_index_importer_spec.rb new file mode 100644 index 00000000000000..9bf4bc2843dcd6 --- /dev/null +++ b/spec/lib/importer/public_statuses_index_importer_spec.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe Importer::PublicStatusesIndexImporter do + describe 'import!' do + let(:pool) { Concurrent::FixedThreadPool.new(5) } + let(:importer) { described_class.new(batch_size: 123, executor: pool) } + + before { Fabricate(:status) } + + it 'indexes relevant statuses' do + expect { importer.import! }.to update_index(PublicStatusesIndex) + end + end +end diff --git a/spec/models/concerns/account_statuses_search_spec.rb b/spec/models/concerns/account_statuses_search_spec.rb new file mode 100644 index 00000000000000..541754bd970983 --- /dev/null +++ b/spec/models/concerns/account_statuses_search_spec.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe AccountStatusesSearch do + let(:account) { Fabricate(:account, discoverable: discoverable) } + + before do + allow(Chewy).to receive(:enabled?).and_return(true) + end + + describe '#enqueue_update_public_statuses_index' do + before do + allow(account).to receive(:enqueue_add_to_public_statuses_index) + allow(account).to receive(:enqueue_remove_from_public_statuses_index) + end + + context 'when account is discoverable' do + let(:discoverable) { true } + + it 'enqueues add_to_public_statuses_index and not to remove_from_public_statuses_index' do + account.enqueue_update_public_statuses_index + expect(account).to have_received(:enqueue_add_to_public_statuses_index) + expect(account).to_not have_received(:enqueue_remove_from_public_statuses_index) + end + end + + context 'when account is not discoverable' do + let(:discoverable) { false } + + it 'enqueues remove_from_public_statuses_index and not to add_to_public_statuses_index' do + account.enqueue_update_public_statuses_index + expect(account).to have_received(:enqueue_remove_from_public_statuses_index) + expect(account).to_not have_received(:enqueue_add_to_public_statuses_index) + end + end + end + + describe '#enqueue_add_to_public_statuses_index' do + let(:discoverable) { nil } + let(:worker) { AddToPublicStatusesIndexWorker } + + before do + allow(worker).to receive(:perform_async) + end + + it 'enqueues AddToPublicStatusesIndexWorker' do + account.enqueue_add_to_public_statuses_index + expect(worker).to have_received(:perform_async).with(account.id) + end + end + + describe '#enqueue_remove_from_public_statuses_index' do + let(:discoverable) { nil } + let(:worker) { RemoveFromPublicStatusesIndexWorker } + + before do + allow(worker).to receive(:perform_async) + end + + it 'enqueues RemoveFromPublicStatusesIndexWorker' do + account.enqueue_remove_from_public_statuses_index + expect(worker).to have_received(:perform_async).with(account.id) + end + end +end diff --git a/spec/workers/add_to_public_statuses_index_worker_spec.rb b/spec/workers/add_to_public_statuses_index_worker_spec.rb new file mode 100644 index 00000000000000..2cc901e4e32e8c --- /dev/null +++ b/spec/workers/add_to_public_statuses_index_worker_spec.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe AddToPublicStatusesIndexWorker do + describe '#perform' do + let(:account) { Fabricate(:account, discoverable: discoverable) } + let(:account_id) { account.id } + + before do + allow(Account).to receive(:find).with(account_id).and_return(account) + allow(account).to receive(:add_to_public_statuses_index!) + end + + context 'when account is discoverable' do + let(:discoverable) { true } + + it 'adds the account to the public statuses index' do + subject.perform(account_id) + expect(account).to have_received(:add_to_public_statuses_index!) + end + end + + context 'when account is undiscoverable' do + let(:discoverable) { false } + + it 'does not add the account to public statuses index' do + subject.perform(account_id) + expect(account).to_not have_received(:add_to_public_statuses_index!) + end + end + + context 'when account does not exist' do + let(:account_id) { 999 } + let(:discoverable) { nil } + + it 'does not raise an error' do + expect { subject.perform(account_id) }.to_not raise_error + end + end + end +end diff --git a/spec/workers/remove_from_public_statuses_index_worker_spec.rb b/spec/workers/remove_from_public_statuses_index_worker_spec.rb new file mode 100644 index 00000000000000..88c810aac7dec0 --- /dev/null +++ b/spec/workers/remove_from_public_statuses_index_worker_spec.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe RemoveFromPublicStatusesIndexWorker do + describe '#perform' do + let(:account) { Fabricate(:account, discoverable: discoverable) } + let(:account_id) { account.id } + + before do + allow(Account).to receive(:find).with(account_id).and_return(account) + allow(account).to receive(:remove_from_public_statuses_index!) + end + + context 'when account is undiscoverable' do + let(:discoverable) { false } + + it 'removes the account from public statuses index' do + subject.perform(account_id) + expect(account).to have_received(:remove_from_public_statuses_index!) + end + end + + context 'when account is discoverable' do + let(:discoverable) { true } + + it 'does not remove the account from public statuses index' do + subject.perform(account_id) + expect(account).to_not have_received(:remove_from_public_statuses_index!) + end + end + + context 'when account does not exist' do + let(:account_id) { 999 } + let(:discoverable) { nil } + + it 'does not raise an error' do + expect { subject.perform(account_id) }.to_not raise_error + end + end + end +end