From bac70a2bc155e8c714b0ea3cf83acb9583b71ec0 Mon Sep 17 00:00:00 2001 From: Ekaterina Vaartis Date: Mon, 22 Nov 2021 21:39:54 +0300 Subject: [PATCH] Implement suggestions from the Meilisearch MR - Index unlisted posts - Move version check outside of the streaming and only do it once - Use a PUT request instead of checking manually if there is need to insert - Add error handling, sort of --- lib/mix/tasks/pleroma/search/meilisearch.ex | 84 +++++++++----------- lib/pleroma/search/meilisearch.ex | 85 ++++++++++++++------- 2 files changed, 93 insertions(+), 76 deletions(-) diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex index 62ace7e39..6730a99a9 100644 --- a/lib/mix/tasks/pleroma/search/meilisearch.ex +++ b/lib/mix/tasks/pleroma/search/meilisearch.ex @@ -3,38 +3,40 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Mix.Tasks.Pleroma.Search.Meilisearch do - require Logger require Pleroma.Constants import Mix.Pleroma import Ecto.Query - import Pleroma.Search.Meilisearch, only: [meili_post!: 2, meili_delete!: 1, meili_get!: 1] + import Pleroma.Search.Meilisearch, + only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1] - def run(["index" | args]) do + def run(["index"]) do start_pleroma() - is_reindex = "--reindex" in args + {:ok, _} = + meili_post( + "/indexes/objects/settings/ranking-rules", + [ + "desc(published)", + "words", + "exactness", + "proximity", + "wordsPosition", + "typo", + "attribute" + ] + ) - meili_post!( - "/indexes/objects/settings/ranking-rules", - [ - "desc(published)", - "words", - "exactness", - "proximity", - "wordsPosition", - "typo", - "attribute" - ] - ) + {:ok, _} = + meili_post( + "/indexes/objects/settings/searchable-attributes", + [ + "content" + ] + ) - meili_post!( - "/indexes/objects/settings/searchable-attributes", - [ - "content" - ] - ) + IO.puts("Created indices. Starting to insert posts.") chunk_size = 10_000 @@ -42,11 +44,11 @@ def run(["index" | args]) do fn -> query = from(Pleroma.Object, - # Only index public posts which are notes and have some text + # Only index public and unlisted posts which are notes and have some text where: fragment("data->>'type' = 'Note'") and - fragment("LENGTH(data->>'content') > 0") and - fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()), + (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or + fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())), order_by: [desc: fragment("data->'published'")] ) @@ -70,34 +72,18 @@ def run(["index" | args]) do {[objects], new_acc} end) |> Stream.each(fn objects -> - objects = - objects - |> Enum.filter(fn o -> - if is_reindex do - result = meili_get!("/indexes/objects/documents/#{o.id}") - - # With >= 0.24.0 the name for "errorCode" is just "code" - error_code_key = - if meili_get!("/version")["pkgVersion"] |> Version.match?(">= 0.24.0"), - do: "code", - else: "errorCode" - - # Filter out the already indexed documents. - # This is true when the document does not exist - result[error_code_key] == "document_not_found" - else - true - end - end) - result = - meili_post!( + meili_put( "/indexes/objects/documents", objects ) - if not Map.has_key?(result, "updateId") do - IO.puts("Failed to index: #{inspect(result)}") + with {:ok, res} <- result do + if not Map.has_key?(res, "updateId") do + IO.puts("\nFailed to index: #{inspect(result)}") + end + else + e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}") end end) |> Stream.run() @@ -137,7 +123,7 @@ def run(["show-private-key", master_key]) do def run(["stats"]) do start_pleroma() - result = meili_get!("/indexes/objects/stats") + {:ok, result} = meili_get("/indexes/objects/stats") IO.puts("Number of entries: #{result["numberOfDocuments"]}") IO.puts("Indexing? #{result["isIndexing"]}") end diff --git a/lib/pleroma/search/meilisearch.ex b/lib/pleroma/search/meilisearch.ex index fa9e27b03..21b44de86 100644 --- a/lib/pleroma/search/meilisearch.ex +++ b/lib/pleroma/search/meilisearch.ex @@ -14,29 +14,50 @@ defp meili_headers do if is_nil(private_key), do: [], else: [{"X-Meili-API-Key", private_key}] end - def meili_get!(path) do + def meili_get(path) do endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) - {:ok, result} = + result = Pleroma.HTTP.get( Path.join(endpoint, path), meili_headers() ) - Jason.decode!(result.body) + with {:ok, res} <- result do + {:ok, Jason.decode!(res.body)} + end end - def meili_post!(path, params) do + def meili_post(path, params) do endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) - {:ok, result} = + result = Pleroma.HTTP.post( Path.join(endpoint, path), Jason.encode!(params), meili_headers() ) - Jason.decode!(result.body) + with {:ok, res} <- result do + {:ok, Jason.decode!(res.body)} + end + end + + def meili_put(path, params) do + endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) + + result = + Pleroma.HTTP.request( + :put, + Path.join(endpoint, path), + Jason.encode!(params), + meili_headers(), + [] + ) + + with {:ok, res} <- result do + {:ok, Jason.decode!(res.body)} + end end def meili_delete!(path) do @@ -57,34 +78,40 @@ def search(user, query, options \\ []) do offset = Keyword.get(options, :offset, 0) author = Keyword.get(options, :author) - result = - meili_post!( + res = + meili_post( "/indexes/objects/search", %{q: query, offset: offset, limit: limit} ) - hits = result["hits"] |> Enum.map(& &1["ap"]) + with {:ok, result} <- res do + hits = result["hits"] |> Enum.map(& &1["ap"]) - try do - hits - |> Activity.create_by_object_ap_id() - |> Activity.with_preloaded_object() - |> Activity.with_preloaded_object() - |> Activity.restrict_deactivated_users() - |> maybe_restrict_local(user) - |> maybe_restrict_author(author) - |> maybe_restrict_blocked(user) - |> maybe_fetch(user, query) - |> order_by([object: obj], desc: obj.data["published"]) - |> Pleroma.Repo.all() - rescue - _ -> maybe_fetch([], user, query) + try do + hits + |> Activity.create_by_object_ap_id() + |> Activity.with_preloaded_object() + |> Activity.with_preloaded_object() + |> Activity.restrict_deactivated_users() + |> maybe_restrict_local(user) + |> maybe_restrict_author(author) + |> maybe_restrict_blocked(user) + |> maybe_fetch(user, query) + |> order_by([object: obj], desc: obj.data["published"]) + |> Pleroma.Repo.all() + rescue + _ -> maybe_fetch([], user, query) + end end end def object_to_search_data(object) do + # Only index public or unlisted Notes if not is_nil(object) and object.data["type"] == "Note" and - Pleroma.Constants.as_public() in object.data["to"] do + not is_nil(object.data["content"]) and + (Pleroma.Constants.as_public() in object.data["to"] or + Pleroma.Constants.as_public() in object.data["cc"]) and + String.length(object.data["content"]) > 1 do data = object.data content_str = @@ -117,13 +144,17 @@ def add_to_index(activity) do if activity.data["type"] == "Create" and maybe_search_data do result = - meili_post!( + meili_put( "/indexes/objects/documents", [maybe_search_data] ) - if not Map.has_key?(result, "updateId") do - Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}") + with {:ok, res} <- result, + true <- Map.has_key?(res, "updateId") do + # Do nothing + else + _ -> + Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}") end end end