diff --git a/config/config.exs b/config/config.exs index 00f9af797..7df0521f2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -850,17 +850,14 @@ config :pleroma, ConcurrentLimiter, [ {Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]}, - {Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, [max_running: 5, max_waiting: 5]} + {Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, [max_running: 5, max_waiting: 5]}, + {Pleroma.Search, [max_running: 20, max_waiting: 50]} ] config :pleroma, :search, provider: Pleroma.Search.Builtin -config :pleroma, :telemetry, - slow_queries_logging: [ - enabled: false, - min_duration: 500_000, - exclude_sources: [nil, "oban_jobs"] - ] +config :pleroma, Pleroma.Search, module: Pleroma.Activity.Search +config :pleroma, Pleroma.Search.Meilisearch, url: "http://127.0.0.1:7700/" # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. diff --git a/config/test.exs b/config/test.exs index a5bf3a4d1..445975205 100644 --- a/config/test.exs +++ b/config/test.exs @@ -134,6 +134,8 @@ ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock, logger: Pleroma.LoggerMock +config :pleroma, Pleroma.Search, module: Pleroma.Activity.Search + # Reduce recompilation time # https://dashbit.co/blog/speeding-up-re-compilation-of-elixir-projects config :phoenix, :plug_init_mode, :runtime diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex new file mode 100644 index 000000000..2af8e5853 --- /dev/null +++ b/lib/mix/tasks/pleroma/search/meilisearch.ex @@ -0,0 +1,38 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Mix.Tasks.Pleroma.Search.Meilisearch do + import Mix.Pleroma + + import Ecto.Query + + def run(["index"]) do + start_pleroma() + + endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) + + Pleroma.Repo.chunk_stream( + from(Pleroma.Object, + limit: 200, + where: fragment("data->>'type' = 'Note'") and fragment("LENGTH(data->>'source') > 0") + ), + 100, + :batches + ) + |> Stream.map(fn objects -> + Enum.map(objects, fn object -> + data = object.data + %{id: object.id, source: data["source"], ap: data["id"]} + end) + end) + |> Stream.each(fn activities -> + {:ok, _} = + Pleroma.HTTP.post( + "#{endpoint}/indexes/objects/documents", + Jason.encode!(activities) + ) + end) + |> Stream.run() + end +end diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 4106feef6..10b1b0120 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -368,6 +368,7 @@ def restrict_deactivated_users(query) do end defdelegate search(user, query, options \\ []), to: Pleroma.Activity.Search + def add_to_index(_activity), do: nil def direct_conversation_id(activity, for_user) do alias Pleroma.Conversation.Participation diff --git a/lib/pleroma/activity/search.ex b/lib/pleroma/activity/search.ex index 09671f621..7152b0e46 100644 --- a/lib/pleroma/activity/search.ex +++ b/lib/pleroma/activity/search.ex @@ -57,7 +57,7 @@ def maybe_restrict_blocked(query, %User{} = user) do def maybe_restrict_blocked(query, _), do: query - defp restrict_public(q) do + def restrict_public(q) do from([a, o] in q, where: fragment("?->>'type' = 'Create'", a.data), where: ^Pleroma.Constants.as_public() in a.recipients @@ -124,7 +124,7 @@ defp query_with(q, :rum, search_query, :websearch) do ) end - defp maybe_restrict_local(q, user) do + def maybe_restrict_local(q, user) do limit = Pleroma.Config.get([:instance, :limit_to_local_content], :unauthenticated) case {limit, user} do @@ -137,7 +137,7 @@ defp maybe_restrict_local(q, user) do defp restrict_local(q), do: where(q, local: true) - defp maybe_fetch(activities, user, search_query) do + def maybe_fetch(activities, user, search_query) do with true <- Regex.match?(~r/https?:/, search_query), {:ok, object} <- Fetcher.fetch_object_from_id(search_query), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index d37454d2c..be03cdffb 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -307,7 +307,11 @@ defp http_children(_, _), do: [] def limiters_setup do config = Config.get(ConcurrentLimiter, []) - [Pleroma.Web.RichMedia.Helpers, Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy] + [ + Pleroma.Web.RichMedia.Helpers, + Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, + Pleroma.Search + ] |> Enum.each(fn module -> mod_config = Keyword.get(config, module, []) diff --git a/lib/pleroma/search/meilisearch.ex b/lib/pleroma/search/meilisearch.ex new file mode 100644 index 000000000..92e0d3429 --- /dev/null +++ b/lib/pleroma/search/meilisearch.ex @@ -0,0 +1,60 @@ +defmodule Pleroma.Search.Meilisearch do + require Logger + + alias Pleroma.Activity + + import Pleroma.Activity.Search + import Ecto.Query + + def search(user, query, options \\ []) do + limit = Enum.min([Keyword.get(options, :limit), 40]) + offset = Keyword.get(options, :offset, 0) + author = Keyword.get(options, :author) + + endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) + + {:ok, result} = + Pleroma.HTTP.post( + "#{endpoint}/indexes/objects/search", + Jason.encode!(%{q: query, offset: offset, limit: limit}) + ) + + hits = Jason.decode!(result.body)["hits"] |> Enum.map(& &1["ap"]) + + try do + hits + |> Activity.create_by_object_ap_id() + |> Activity.with_preloaded_object() + |> Activity.with_preloaded_object() + |> Activity.restrict_deactivated_users() + |> maybe_restrict_local(user) + |> maybe_restrict_author(author) + |> maybe_restrict_blocked(user) + |> maybe_fetch(user, query) + |> order_by([activity], desc: activity.id) + |> Pleroma.Repo.all() + rescue + _ -> maybe_fetch([], user, query) + end + end + + def add_to_index(activity) do + object = activity.object + + if activity.data["type"] == "Create" and not is_nil(object) and object.data["type"] == "Note" do + data = object.data + + endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) + + {:ok, result} = + Pleroma.HTTP.post( + "#{endpoint}/indexes/objects/documents", + Jason.encode!([%{id: object.id, source: data["source"], ap: data["id"]}]) + ) + + if not Map.has_key?(Jason.decode!(result.body), "updateId") do + Logger.error("Failed to add activity #{activity.id} to index: #{result.body}") + end + end + end +end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 756096952..615bee428 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -140,6 +140,12 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end) end) + search_module = Pleroma.Config.get([Pleroma.Search, :module]) + + ConcurrentLimiter.limit(Pleroma.Search, fn -> + Task.start(fn -> search_module.add_to_index(activity) end) + end) + {:ok, activity} else %Activity{} = activity -> diff --git a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex index 86ad388fd..aaf52cdc5 100644 --- a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex @@ -5,6 +5,7 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do use Pleroma.Web, :controller + alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.ControllerHelper alias Pleroma.Web.MastodonAPI.AccountView @@ -64,6 +65,106 @@ defp search_options(params, user) do |> Enum.filter(&elem(&1, 1)) end + defp resource_search(_, "accounts", query, options) do + accounts = with_fallback(fn -> User.search(query, options) end) + + AccountView.render("index.json", + users: accounts, + for: options[:for_user], + embed_relationships: options[:embed_relationships] + ) + end + + defp resource_search(_, "statuses", query, options) do + search_module = Pleroma.Config.get([Pleroma.Search, :module], Pleroma.Activity) + + statuses = with_fallback(fn -> search_module.search(options[:for_user], query, options) end) + + StatusView.render("index.json", + activities: statuses, + for: options[:for_user], + as: :activity + ) + end + + defp resource_search(:v2, "hashtags", query, options) do + tags_path = Endpoint.url() <> "/tag/" + + query + |> prepare_tags(options) + |> Enum.map(fn tag -> + %{name: tag, url: tags_path <> tag} + end) + end + + defp resource_search(:v1, "hashtags", query, options) do + prepare_tags(query, options) + end + + defp prepare_tags(query, options) do + tags = + query + |> preprocess_uri_query() + |> String.split(~r/[^#\w]+/u, trim: true) + |> Enum.uniq_by(&String.downcase/1) + + explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end) + + tags = + if Enum.any?(explicit_tags) do + explicit_tags + else + tags + end + + tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end) + + tags = + if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do + add_joined_tag(tags) + else + tags + end + + Pleroma.Pagination.paginate(tags, options) + end + + defp add_joined_tag(tags) do + tags + |> Kernel.++([joined_tag(tags)]) + |> Enum.uniq_by(&String.downcase/1) + end + + # If `query` is a URI, returns last component of its path, otherwise returns `query` + defp preprocess_uri_query(query) do + if query =~ ~r/https?:\/\// do + query + |> String.trim_trailing("/") + |> URI.parse() + |> Map.get(:path) + |> String.split("/") + |> Enum.at(-1) + else + query + end + end + + defp joined_tag(tags) do + tags + |> Enum.map(fn tag -> String.capitalize(tag) end) + |> Enum.join() + end + + defp with_fallback(f, fallback \\ []) do + try do + f.() + rescue + error -> + Logger.error("#{__MODULE__} search error: #{inspect(error)}") + fallback + end + end + defp get_author(%{account_id: account_id}) when is_binary(account_id), do: User.get_cached_by_id(account_id)