diff --git a/lib/mix/tasks/pleroma/search.ex b/lib/mix/tasks/pleroma/search.ex index 25a277f88..2324561c1 100644 --- a/lib/mix/tasks/pleroma/search.ex +++ b/lib/mix/tasks/pleroma/search.ex @@ -8,32 +8,38 @@ defmodule Mix.Tasks.Pleroma.Search do import Ecto.Query alias Pleroma.Activity alias Pleroma.Pagination + alias Pleroma.User + alias Pleroma.Hashtag @shortdoc "Manages elasticsearch" - def run(["import_since", d | _rest]) do - start_pleroma() - {:ok, since, _} = DateTime.from_iso8601(d) - - from(a in Activity, where: not ilike(a.actor, "%/relay") and a.inserted_at > ^since) - |> Activity.with_preloaded_object() - |> Activity.with_preloaded_user_actor() - |> get_all - end - - def run(["import" | _rest]) do + def run(["import", "activities" | _rest]) do start_pleroma() from(a in Activity, where: not ilike(a.actor, "%/relay")) |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data)) |> Activity.with_preloaded_object() |> Activity.with_preloaded_user_actor() - |> get_all + |> get_all(:activities) end - defp get_all(query, max_id \\ nil) do - IO.puts(max_id) - params = %{limit: 2000} + def run(["import", "users" | _rest]) do + start_pleroma() + + from(u in User, where: u.nickname not in ["internal.fetch", "relay"]) + |> get_all(:users) + end + + def run(["import", "hashtags" | _rest]) do + start_pleroma() + + from(h in Hashtag) + |> Pleroma.Repo.all() + |> Pleroma.Elasticsearch.bulk_post(:hashtags) + end + + defp get_all(query, index, max_id \\ nil) do + params = %{limit: 1000} params = if max_id == nil do @@ -50,17 +56,9 @@ defp get_all(query, max_id \\ nil) do :ok else res - |> Enum.filter(fn x -> - t = - x.object - |> Map.get(:data, %{}) - |> Map.get("type", "") + |> Pleroma.Elasticsearch.bulk_post(index) - t == "Note" - end) - |> Pleroma.Elasticsearch.bulk_post(:activities) - - get_all(query, List.last(res).id) + get_all(query, index, List.last(res).id) end end end diff --git a/lib/pleroma/elasticsearch/document_mappings/note.ex b/lib/pleroma/elasticsearch/document_mappings/activity.ex similarity index 100% rename from lib/pleroma/elasticsearch/document_mappings/note.ex rename to lib/pleroma/elasticsearch/document_mappings/activity.ex diff --git a/lib/pleroma/elasticsearch/document_mappings/hashtag.ex b/lib/pleroma/elasticsearch/document_mappings/hashtag.ex new file mode 100644 index 000000000..1c47d1451 --- /dev/null +++ b/lib/pleroma/elasticsearch/document_mappings/hashtag.ex @@ -0,0 +1,10 @@ +defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do + def id(obj), do: obj.id + + def encode(hashtag) do + %{ + hashtag: hashtag.name, + timestamp: hashtag.inserted_at + } + end +end diff --git a/lib/pleroma/elasticsearch/document_mappings/user.ex b/lib/pleroma/elasticsearch/document_mappings/user.ex new file mode 100644 index 000000000..0e57438f2 --- /dev/null +++ b/lib/pleroma/elasticsearch/document_mappings/user.ex @@ -0,0 +1,13 @@ +defmodule Pleroma.Elasticsearch.DocumentMappings.User do + def id(obj), do: obj.id + + def encode(%{actor_type: "Person"} = user) do + %{ + timestamp: user.inserted_at, + instance: URI.parse(user.ap_id).host, + nickname: user.nickname, + bio: user.bio, + display_name: user.name + } + end +end diff --git a/lib/pleroma/elasticsearch/store.ex b/lib/pleroma/elasticsearch/store.ex index 68e14bb64..2d8aeabc2 100644 --- a/lib/pleroma/elasticsearch/store.ex +++ b/lib/pleroma/elasticsearch/store.ex @@ -1,26 +1,32 @@ defmodule Pleroma.Elasticsearch do alias Pleroma.Activity + alias Pleroma.User alias Pleroma.Elasticsearch.DocumentMappings alias Pleroma.Config + require Logger defp url do Config.get([:elasticsearch, :url]) end - def put_by_id(id) do + defp enabled? do + Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch + end + + def put_by_id(:activity, id) do id |> Activity.get_by_id_with_object() |> maybe_put_into_elasticsearch() end - def maybe_put_into_elasticsearch({:ok, activity}) do - maybe_put_into_elasticsearch(activity) + def maybe_put_into_elasticsearch({:ok, item}) do + maybe_put_into_elasticsearch(item) end def maybe_put_into_elasticsearch( %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity ) do - if Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch do + if enabled?() do actor = Pleroma.Activity.user_actor(activity) activity @@ -29,23 +35,50 @@ def maybe_put_into_elasticsearch( end end + def maybe_put_into_elasticsearch(%User{} = user) do + if enabled?() do + put(user) + end + end + def maybe_put_into_elasticsearch(_) do {:ok, :skipped} end def put(%Activity{} = activity) do - Elastix.Document.index( + {:ok, _} = Elastix.Document.index( url(), "activities", "activity", DocumentMappings.Activity.id(activity), DocumentMappings.Activity.encode(activity) ) + {:ok, _} = bulk_post( + activity.object.hashtags, :hashtags + ) + end + + def put(%User{} = user) do + {:ok, _ } = Elastix.Document.index( + url(), + "users", + "user", + DocumentMappings.User.id(user), + DocumentMappings.User.encode(user) + ) end def bulk_post(data, :activities) do d = data + |> Enum.filter(fn x -> + t = + x.object + |> Map.get(:data, %{}) + |> Map.get("type", "") + + t == "Note" + end) |> Enum.map(fn d -> [ %{index: %{_id: DocumentMappings.Activity.id(d)}}, @@ -54,7 +87,7 @@ def bulk_post(data, :activities) do end) |> List.flatten() - Elastix.Bulk.post( + {:ok, %{body: %{"errors" => false}}} = Elastix.Bulk.post( url(), d, index: "activities", @@ -62,12 +95,92 @@ def bulk_post(data, :activities) do ) end - def search_activities(q) do - Elastix.Search.search( + def bulk_post(data, :users) do + d = + data + |> Enum.map(fn d -> + [ + %{index: %{_id: DocumentMappings.User.id(d)}}, + DocumentMappings.User.encode(d) + ] + end) + |> List.flatten() + + Elastix.Bulk.post( url(), - "activities", - ["activity"], - q + d, + index: "users", + type: "user" ) end + + def bulk_post(data, :hashtags) do + d = + data + |> Enum.map(fn d -> + [ + %{index: %{_id: DocumentMappings.Hashtag.id(d)}}, + DocumentMappings.Hashtag.encode(d) + ] + end) + |> List.flatten() + + Elastix.Bulk.post( + url(), + d, + index: "hashtags", + type: "hashtag" + ) + end + + def search(:raw, index, type, q) do + with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do + results = + raw_results + |> Map.get(:body, %{}) + |> Map.get("hits", %{}) + |> Map.get("hits", []) + + {:ok, results} + else + {:error, e} -> + Logger.error(e) + {:error, e} + end + end + + def search(:activities, q) do + with {:ok, results} <- search(:raw, "activities", "activity", q) do + results + |> Enum.map(fn result -> result["_id"] end) + |> Pleroma.Activity.all_by_ids_with_object() + else + e -> + Logger.error(e) + [] + end + end + + def search(:users, q) do + with {:ok, results} <- search(:raw, "users", "user", q) do + results + |> Enum.map(fn result -> result["_id"] end) + |> Pleroma.User.get_all_by_ids() + else + e -> + Logger.error(e) + [] + end + end + + def search(:hashtags, q) do + with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do + results + |> Enum.map(fn result -> result["_source"]["hashtag"] end) + else + e -> + Logger.error(e) + [] + end + end end diff --git a/lib/pleroma/search/elasticsearch.ex b/lib/pleroma/search/elasticsearch.ex index 3815ef766..c897a2ace 100644 --- a/lib/pleroma/search/elasticsearch.ex +++ b/lib/pleroma/search/elasticsearch.ex @@ -2,50 +2,13 @@ defmodule Pleroma.Search.Elasticsearch do @behaviour Pleroma.Search alias Pleroma.Web.MastodonAPI.StatusView + alias Pleroma.Web.MastodonAPI.AccountView alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Search.Elasticsearch.Parsers + alias Pleroma.Web.Endpoint - defp to_es(term) when is_binary(term) do + defp es_query(:activity, query) do %{ - match: %{ - content: %{ - query: term, - operator: "AND" - } - } - } - end - - defp to_es({:quoted, term}), do: to_es(term) - - defp to_es({:filter, ["hashtag", query]}) do - %{ - term: %{ - hashtags: %{ - value: query - } - } - } - end - - defp to_es({:filter, [field, query]}) do - %{ - term: %{ - field => %{ - value: query - } - } - } - end - - defp parse(query) do - query - |> SearchParser.parse!() - |> Enum.map(&to_es/1) - end - - @impl Pleroma.Search - def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do - q = %{ size: 500, terminate_after: 500, timeout: "10s", @@ -54,34 +17,94 @@ def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) d ], query: %{ bool: %{ - must: parse(String.trim(query)) + must: Parsers.Activity.parse(query) } } } + end - out = Pleroma.Elasticsearch.search_activities(q) - - with {:ok, raw_results} <- out do - results = - raw_results - |> Map.get(:body, %{}) - |> Map.get("hits", %{}) - |> Map.get("hits", []) - |> Enum.map(fn result -> result["_id"] end) - |> Pleroma.Activity.all_by_ids_with_object() - |> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end) - |> Enum.reverse() - - %{ - "accounts" => [], - "hashtags" => [], - "statuses" => - StatusView.render("index.json", - activities: results, - for: user, - as: :activity - ) + defp es_query(:user, query) do + %{ + size: 50, + terminate_after: 50, + timeout: "10s", + sort: [ + %{"_timestamp" => "desc"} + ], + query: %{ + bool: %{ + must: Parsers.User.parse(query) + } } - end + } + end + + defp es_query(:hashtag, query) do + %{ + size: 50, + terminate_after: 50, + timeout: "10s", + query: %{ + bool: %{ + must: Parsers.Hashtag.parse(query) + } + } + } + end + + @impl Pleroma.Search + def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do + parsed_query = + query + |> String.trim() + |> SearchParser.parse!() + + activity_task = + Task.async(fn -> + q = es_query(:activity, parsed_query) + + Pleroma.Elasticsearch.search(:activities, q) + |> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end) + end) + + user_task = + Task.async(fn -> + q = es_query(:user, parsed_query) + + Pleroma.Elasticsearch.search(:users, q) + |> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end) + end) + + hashtag_task = + Task.async(fn -> + q = es_query(:hashtag, parsed_query) + + Pleroma.Elasticsearch.search(:hashtags, q) + end) + + activity_results = Task.await(activity_task) + user_results = Task.await(user_task) + hashtag_results = Task.await(hashtag_task) + + %{ + "accounts" => + AccountView.render("index.json", + users: user_results, + for: user + ), + "hashtags" => + Enum.map(hashtag_results, fn x -> + %{ + url: Endpoint.url() <> "/tag/" <> x, + name: x + } + end), + "statuses" => + StatusView.render("index.json", + activities: activity_results, + for: user, + as: :activity + ) + } end end diff --git a/lib/pleroma/search/elasticsearch/activity_parser.ex b/lib/pleroma/search/elasticsearch/activity_parser.ex new file mode 100644 index 000000000..0c124d537 --- /dev/null +++ b/lib/pleroma/search/elasticsearch/activity_parser.ex @@ -0,0 +1,38 @@ +defmodule Pleroma.Search.Elasticsearch.Parsers.Activity do + defp to_es(term) when is_binary(term) do + %{ + match: %{ + content: %{ + query: term, + operator: "AND" + } + } + } + end + + defp to_es({:quoted, term}), do: to_es(term) + + defp to_es({:filter, ["hashtag", query]}) do + %{ + term: %{ + hashtags: %{ + value: query + } + } + } + end + + defp to_es({:filter, [field, query]}) do + %{ + term: %{ + field => %{ + value: query + } + } + } + end + + def parse(q) do + Enum.map(q, &to_es/1) + end +end diff --git a/lib/pleroma/search/elasticsearch/hashtag_parser.ex b/lib/pleroma/search/elasticsearch/hashtag_parser.ex new file mode 100644 index 000000000..6e2801ed0 --- /dev/null +++ b/lib/pleroma/search/elasticsearch/hashtag_parser.ex @@ -0,0 +1,30 @@ +defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do + defp to_es(term) when is_binary(term) do + %{ + term: %{ + hashtag: %{ + value: String.downcase(term), + } + } + } + end + + defp to_es({:quoted, term}), do: to_es(term) + + defp to_es({:filter, ["hashtag", query]}) do + %{ + term: %{ + hashtag: %{ + value: String.downcase(query) + } + } + } + end + + defp to_es({:filter, _}), do: nil + + def parse(q) do + Enum.map(q, &to_es/1) + |> Enum.filter(fn x -> x != nil end) + end +end diff --git a/lib/pleroma/search/elasticsearch/user_paser.ex b/lib/pleroma/search/elasticsearch/user_paser.ex new file mode 100644 index 000000000..96bfdc7d2 --- /dev/null +++ b/lib/pleroma/search/elasticsearch/user_paser.ex @@ -0,0 +1,53 @@ +defmodule Pleroma.Search.Elasticsearch.Parsers.User do + defp to_es(term) when is_binary(term) do + %{ + bool: %{ + minimum_should_match: 1, + should: [ + %{ + match: %{ + bio: %{ + query: term, + operator: "AND" + } + } + }, + %{ + term: %{ + nickname: %{ + value: term + } + } + }, + %{ + match: %{ + display_name: %{ + query: term, + operator: "AND" + } + } + } + ] + } + } + end + + defp to_es({:quoted, term}), do: to_es(term) + + defp to_es({:filter, ["user", query]}) do + %{ + term: %{ + nickname: %{ + value: query + } + } + } + end + + defp to_es({:filter, _}), do: nil + + def parse(q) do + Enum.map(q, &to_es/1) + |> Enum.filter(fn x -> x != nil end) + end +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 8e40dfc0d..a2cf22e55 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1088,6 +1088,7 @@ def update_and_set_cache(struct, params) do def update_and_set_cache(changeset) do with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do + Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user) set_cache(user) end end diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 91e9c72e0..a93961922 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -538,7 +538,7 @@ defp add_notifications(meta, notifications) do @impl true def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do - Pleroma.Elasticsearch.put_by_id(activity.id) + Pleroma.Elasticsearch.put_by_id(:activity, activity.id) end def handle_after_transaction(%Pleroma.Activity{}) do diff --git a/priv/es-mappings/activity.json b/priv/es-mappings/activity.json new file mode 100644 index 000000000..e476fd59f --- /dev/null +++ b/priv/es-mappings/activity.json @@ -0,0 +1,21 @@ +{ + "properties": { + "_timestamp": { + "type": "date", + "index": true + }, + "instance": { + "type": "keyword" + }, + "content": { + "type": "text" + }, + "hashtags": { + "type": "keyword" + }, + "user": { + "type": "text" + } + } +} + diff --git a/priv/es-mappings/hashtag.json b/priv/es-mappings/hashtag.json new file mode 100644 index 000000000..3a0ade8f8 --- /dev/null +++ b/priv/es-mappings/hashtag.json @@ -0,0 +1,11 @@ +{ + "properties": { + "timestamp": { + "type": "date", + "index": true + }, + "hashtag": { + "type": "text" + } + } +} diff --git a/priv/es-mappings/user.json b/priv/es-mappings/user.json new file mode 100644 index 000000000..77cc66a4b --- /dev/null +++ b/priv/es-mappings/user.json @@ -0,0 +1,18 @@ +{ + "properties": { + "timestamp": { + "type": "date", + "index": true + }, + "instance": { + "type": "keyword" + }, + "nickname": { + "type": "text" + }, + "bio": { + "type": "text" + } + } +} +