From f5f267fa764f53ef617bc9504c7ecb68b5d3d7ab Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 14 Jan 2021 22:41:27 +0300 Subject: [PATCH] [#3213] Refactoring of HashtagsTableMigrator. --- .../migrators/hashtags_table_migrator.ex | 98 ++++++++++--------- .../hashtags_table_migrator/state.ex | 26 +++++ .../20190711042021_create_safe_jsonb_set.exs | 2 +- 3 files changed, 79 insertions(+), 47 deletions(-) create mode 100644 lib/pleroma/migrators/hashtags_table_migrator/state.ex diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index a7e3de542..9f1a00f9c 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -3,39 +3,13 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Migrators.HashtagsTableMigrator do - defmodule State do - use Agent - - @init_state %{} - - def start_link(_) do - Agent.start_link(fn -> @init_state end, name: __MODULE__) - end - - def get do - Agent.get(__MODULE__, & &1) - end - - def put(key, value) do - Agent.update(__MODULE__, fn state -> - Map.put(state, key, value) - end) - end - - def increment(key, increment \\ 1) do - Agent.update(__MODULE__, fn state -> - updated_value = (state[key] || 0) + increment - Map.put(state, key, updated_value) - end) - end - end - use GenServer require Logger import Ecto.Query + alias __MODULE__.State alias Pleroma.Config alias Pleroma.DataMigration alias Pleroma.Hashtag @@ -43,13 +17,23 @@ def increment(key, increment \\ 1) do alias Pleroma.Repo defdelegate state(), to: State, as: :get - defdelegate put_state(key, value), to: State, as: :put - defdelegate increment_state(key, increment), to: State, as: :increment + defdelegate put_stat(key, value), to: State, as: :put + defdelegate increment_stat(key, increment), to: State, as: :increment defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + @reg_name {:global, __MODULE__} + + def whereis, do: GenServer.whereis(@reg_name) + def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) + case whereis() do + nil -> + GenServer.start_link(__MODULE__, nil, name: @reg_name) + + pid -> + {:ok, pid} + end end @impl true @@ -61,21 +45,22 @@ def init(_) do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - put_state(:status, :init) + put_stat(:status, :init) dm = data_migration() + manual_migrations = Config.get([:instance, :manual_data_migrations], []) cond do Config.get(:env) == :test -> - put_state(:status, :noop) + put_stat(:status, :noop) is_nil(dm) -> - put_state(:status, :halt) - put_state(:message, "Data migration does not exist.") + put_stat(:status, :halt) + put_stat(:message, "Data migration does not exist.") - dm.state == :manual -> - put_state(:status, :noop) - put_state(:message, "Data migration is in manual execution state.") + dm.state == :manual or dm.name in manual_migrations -> + put_stat(:status, :noop) + put_stat(:message, "Data migration is in manual execution state.") dm.state == :complete -> handle_success() @@ -91,8 +76,12 @@ def handle_continue(:init_state, _state) do def handle_info(:migrate_hashtags, state) do data_migration = data_migration() - {:ok, data_migration} = DataMigration.update_state(data_migration, :running) - put_state(:status, :running) + persistent_data = Map.take(data_migration.data, ["max_processed_id"]) + + {:ok, data_migration} = + DataMigration.update(data_migration, %{state: :running, data: persistent_data}) + + put_stat(:status, :running) Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") @@ -137,10 +126,12 @@ def handle_info(:migrate_hashtags, state) do ) max_object_id = Enum.at(object_ids, -1) - _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}}) - increment_state(:processed_count, length(object_ids)) - increment_state(:failed_count, length(failed_ids)) + put_stat(:max_processed_id, max_object_id) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + + persist_stats(data_migration) # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) @@ -153,14 +144,15 @@ def handle_info(:migrate_hashtags, state) do "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", [data_migration.id] ) do - put_state(:status, :complete) _ = DataMigration.update_state(data_migration, :complete) handle_success() else _ -> - put_state(:status, :failed) - put_state(:message, "Please check data_migration_failed_ids records.") + _ = DataMigration.update_state(data_migration, :failed) + + put_stat(:status, :failed) + put_stat(:message, "Please check data_migration_failed_ids records.") end {:noreply, state} @@ -199,8 +191,13 @@ defp transfer_object_hashtags(object) do end) end + defp persist_stats(data_migration) do + runner_state = Map.drop(state(), [:status]) + _ = DataMigration.update(data_migration, %{data: runner_state}) + end + defp handle_success do - put_state(:status, :complete) + put_stat(:status, :complete) unless Config.improved_hashtag_timeline() do Config.put(Config.improved_hashtag_timeline_path(), true) @@ -208,4 +205,13 @@ defp handle_success do :ok end + + def force_continue do + send(whereis(), :migrate_hashtags) + end + + def force_restart do + {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) + force_continue() + end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex new file mode 100644 index 000000000..79926892c --- /dev/null +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -0,0 +1,26 @@ +defmodule Pleroma.Migrators.HashtagsTableMigrator.State do + use Agent + + @init_state %{} + + def start_link(_) do + Agent.start_link(fn -> @init_state end, name: __MODULE__) + end + + def get do + Agent.get(__MODULE__, & &1) + end + + def put(key, value) do + Agent.update(__MODULE__, fn state -> + Map.put(state, key, value) + end) + end + + def increment(key, increment \\ 1) do + Agent.update(__MODULE__, fn state -> + updated_value = (state[key] || 0) + increment + Map.put(state, key, updated_value) + end) + end +end diff --git a/priv/repo/migrations/20190711042021_create_safe_jsonb_set.exs b/priv/repo/migrations/20190711042021_create_safe_jsonb_set.exs index 43d616705..bfac09f9e 100644 --- a/priv/repo/migrations/20190711042021_create_safe_jsonb_set.exs +++ b/priv/repo/migrations/20190711042021_create_safe_jsonb_set.exs @@ -9,7 +9,7 @@ def change do begin result := jsonb_set(target, path, coalesce(new_value, 'null'::jsonb), create_missing); if result is NULL then - raise 'jsonb_set tried to wipe the object, please report this incindent to Pleroma bug tracker. https://git.pleroma.social/pleroma/pleroma/issues/new'; + raise 'jsonb_set tried to wipe the object, please report this incident to Pleroma bug tracker. https://git.pleroma.social/pleroma/pleroma/issues/new'; return target; else return result;