From 938823c73040f6b55896581daf5baf732f859f02 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 16 Feb 2021 23:14:15 +0300 Subject: [PATCH] [#3213] HashtagsTableMigrator state management refactoring & improvements (proper stats serialization etc.). --- lib/pleroma/data_migration.ex | 15 ++-- .../migrators/hashtags_table_migrator.ex | 87 ++++++++----------- .../hashtags_table_migrator/state.ex | 87 ++++++++++++++++--- 3 files changed, 122 insertions(+), 67 deletions(-) diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex index 64fa155ff..1377af16e 100644 --- a/lib/pleroma/data_migration.ex +++ b/lib/pleroma/data_migration.ex @@ -10,6 +10,7 @@ defmodule Pleroma.DataMigration do alias Pleroma.Repo import Ecto.Changeset + import Ecto.Query schema "data_migrations" do field(:name, :string) @@ -28,14 +29,12 @@ def changeset(data_migration, params \\ %{}) do |> unique_constraint(:name) end - def update(data_migration, params \\ %{}) do - data_migration - |> changeset(params) - |> Repo.update() - end - - def update_state(data_migration, new_state) do - update(data_migration, %{state: new_state}) + def update_one_by_id(id, params \\ %{}) do + with {1, _} <- + from(dm in DataMigration, where: dm.id == ^id) + |> Repo.update_all(set: params) do + :ok + end end def get_by_name(name) do diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 432c3401a..a226d9d29 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -11,16 +11,16 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do alias __MODULE__.State alias Pleroma.Config - alias Pleroma.DataMigration alias Pleroma.Hashtag alias Pleroma.Object alias Pleroma.Repo - defdelegate state(), to: State, as: :get - defdelegate put_stat(key, value), to: State, as: :put - defdelegate increment_stat(key, increment), to: State, as: :increment + defdelegate data_migration(), to: State - defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + defdelegate state(), to: State + defdelegate get_stat(key, value), to: State, as: :get_data_key + defdelegate put_stat(key, value), to: State, as: :put_data_key + defdelegate increment_stat(key, increment), to: State, as: :increment_data_key @reg_name {:global, __MODULE__} @@ -45,7 +45,7 @@ def init(_) do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - update_status(:init) + update_status(:pending) data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) @@ -55,13 +55,13 @@ def handle_continue(:init_state, _state) do update_status(:noop) is_nil(data_migration) -> - update_status(:halt, "Data migration does not exist.") + update_status(:failed, "Data migration does not exist.") data_migration.state == :manual or data_migration.name in manual_migrations -> - update_status(:noop, "Data migration is in manual execution state.") + update_status(:manual, "Data migration is in manual execution state.") data_migration.state == :complete -> - handle_success(data_migration) + on_complete(data_migration) true -> send(self(), :migrate_hashtags) @@ -72,20 +72,15 @@ def handle_continue(:init_state, _state) do @impl true def handle_info(:migrate_hashtags, state) do - State.clear() + State.reinit() update_status(:running) put_stat(:started_at, NaiveDateTime.utc_now()) - data_migration = data_migration() - persistent_data = Map.take(data_migration.data, ["max_processed_id"]) + %{id: data_migration_id} = data_migration() + max_processed_id = get_stat(:max_processed_id, 0) - {:ok, data_migration} = - DataMigration.update(data_migration, %{state: :running, data: persistent_data}) - - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") - - max_processed_id = data_migration.data["max_processed_id"] || 0 + Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") query() |> where([object], object.id > ^max_processed_id) @@ -104,7 +99,7 @@ def handle_info(:migrate_hashtags, state) do Repo.query( "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> "VALUES ($1, $2) ON CONFLICT DO NOTHING;", - [data_migration.id, failed_id] + [data_migration_id, failed_id] ) end @@ -112,7 +107,7 @@ def handle_info(:migrate_hashtags, state) do Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = ANY($2)", - [data_migration.id, object_ids -- failed_ids] + [data_migration_id, object_ids -- failed_ids] ) max_object_id = Enum.at(object_ids, -1) @@ -120,14 +115,8 @@ def handle_info(:migrate_hashtags, state) do put_stat(:max_processed_id, max_object_id) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) - - put_stat( - :records_per_second, - state()[:processed_count] / - Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1]) - ) - - persist_stats(data_migration) + put_stat(:records_per_second, records_per_second()) + _ = State.persist_to_db() # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) @@ -135,22 +124,25 @@ def handle_info(:migrate_hashtags, state) do end) |> Stream.run() - with 0 <- failures_count(data_migration.id) do + with 0 <- failures_count(data_migration_id) do _ = delete_non_create_activities_hashtags() - - {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) - - handle_success(data_migration) + set_complete() else _ -> - _ = DataMigration.update_state(data_migration, :failed) - update_status(:failed, "Please check data_migration_failed_ids records.") end {:noreply, state} end + defp records_per_second do + get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) + end + + defp running_time do + NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now())) + end + @hashtags_objects_cleanup_query """ DELETE FROM hashtags_objects WHERE object_id IN (SELECT DISTINCT objects.id FROM objects @@ -169,6 +161,10 @@ def handle_info(:migrate_hashtags, state) do WHERE hashtags_objects.hashtag_id IS NULL); """ + @doc """ + Deletes `hashtags_objects` for legacy objects not asoociated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ def delete_non_create_activities_hashtags do {:ok, %{num_rows: hashtags_objects_count}} = Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) @@ -256,14 +252,7 @@ def count(force \\ false, timeout \\ :infinity) do end end - defp persist_stats(data_migration) do - runner_state = Map.drop(state(), [:status]) - _ = DataMigration.update(data_migration, %{data: runner_state}) - end - - defp handle_success(data_migration) do - update_status(:complete) - + defp on_complete(data_migration) do cond do data_migration.feature_lock -> :noop @@ -321,18 +310,18 @@ def force_continue do end def force_restart do - {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) + :ok = State.reset() force_continue() end - def force_complete do - {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) - - handle_success(data_migration) + def set_complete do + update_status(:complete) + _ = State.persist_to_db() + on_complete(data_migration()) end defp update_status(status, message \\ nil) do - put_stat(:status, status) + put_stat(:state, status) put_stat(:message, message) end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index 901563426..ed9848824 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -5,31 +5,98 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do use Agent - @init_state %{} + alias Pleroma.DataMigration + + defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + @reg_name {:global, __MODULE__} def start_link(_) do - Agent.start_link(fn -> @init_state end, name: @reg_name) + Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) end - def clear do - Agent.update(@reg_name, fn _state -> @init_state end) + defp load_state_from_db do + data_migration = data_migration() + + data = + if data_migration do + Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) + else + %{} + end + + %{ + data_migration_id: data_migration && data_migration.id, + data: data + } end - def get do + def persist_to_db do + %{data_migration_id: data_migration_id, data: data} = state() + + if data_migration_id do + DataMigration.update_one_by_id(data_migration_id, data: data) + else + {:error, :nil_data_migration_id} + end + end + + def reset do + %{data_migration_id: data_migration_id} = state() + + with false <- is_nil(data_migration_id), + :ok <- + DataMigration.update_one_by_id(data_migration_id, + state: :pending, + data: %{} + ) do + reinit() + else + true -> {:error, :nil_data_migration_id} + e -> e + end + end + + def reinit do + Agent.update(@reg_name, fn _state -> load_state_from_db() end) + end + + def state do Agent.get(@reg_name, & &1) end - def put(key, value) do + def get_data_key(key, default \\ nil) do + get_in(state(), [:data, key]) || default + end + + def put_data_key(key, value) do + _ = persist_non_data_change(key, value) + Agent.update(@reg_name, fn state -> - Map.put(state, key, value) + put_in(state, [:data, key], value) end) end - def increment(key, increment \\ 1) do + def increment_data_key(key, increment \\ 1) do Agent.update(@reg_name, fn state -> - updated_value = (state[key] || 0) + increment - Map.put(state, key, updated_value) + initial_value = get_in(state, [:data, key]) || 0 + updated_value = initial_value + increment + put_in(state, [:data, key], updated_value) end) end + + defp persist_non_data_change(:state, value) do + with true <- get_data_key(:state) != value, + true <- value in Pleroma.DataMigration.State.__valid_values__(), + %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do + DataMigration.update_one_by_id(data_migration_id, state: value) + else + false -> :ok + _ -> {:error, :nil_data_migration_id} + end + end + + defp persist_non_data_change(_, _) do + nil + end end