From 32d4630c9ca1c89e655b37b3b939d728f569dc9f Mon Sep 17 00:00:00 2001 From: William Pitcock Date: Mon, 13 May 2019 01:58:30 +0000 Subject: [PATCH 1/4] user: move initial post fetching to job queue --- lib/pleroma/user.ex | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 474de9ba5..a79da4dd8 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -552,8 +552,7 @@ def get_or_fetch_by_nickname(nickname) do with [_nick, _domain] <- String.split(nickname, "@"), {:ok, user} <- fetch_by_nickname(nickname) do if Pleroma.Config.get([:fetch_initial_posts, :enabled]) do - # TODO turn into job - {:ok, _} = Task.start(__MODULE__, :fetch_initial_posts, [user]) + fetch_initial_posts(user) end {:ok, user} @@ -564,15 +563,8 @@ def get_or_fetch_by_nickname(nickname) do end @doc "Fetch some posts when the user has just been federated with" - def fetch_initial_posts(user) do - pages = Pleroma.Config.get!([:fetch_initial_posts, :pages]) - - Enum.each( - # Insert all the posts in reverse order, so they're in the right order on the timeline - Enum.reverse(Utils.fetch_ordered_collection(user.info.source_data["outbox"], pages)), - &Pleroma.Web.Federator.incoming_ap_doc/1 - ) - end + def fetch_initial_posts(user), + do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() def get_followers_query(%User{} = user, nil) do @@ -1077,6 +1069,19 @@ def perform(:delete, %User{} = user) do delete_user_activities(user) end + @spec perform(atom(), User.t()) :: {:ok, User.t()} + def perform(:fetch_initial_posts, %User{} = user) do + pages = Pleroma.Config.get!([:fetch_initial_posts, :pages]) + + Enum.each( + # Insert all the posts in reverse order, so they're in the right order on the timeline + Enum.reverse(Utils.fetch_ordered_collection(user.info.source_data["outbox"], pages)), + &Pleroma.Web.Federator.incoming_ap_doc/1 + ) + + {:ok, user} + end + def delete_user_activities(%User{ap_id: ap_id} = user) do stream = ap_id @@ -1130,8 +1135,8 @@ def get_or_fetch_by_ap_id(ap_id) do resp = fetch_by_ap_id(ap_id) if should_fetch_initial do - with {:ok, %User{} = user} = resp do - {:ok, _} = Task.start(__MODULE__, :fetch_initial_posts, [user]) + with {:ok, %User{} = user} <- resp do + fetch_initial_posts(user) end end From 57d11ac9dbe4f3befd288cb0f59f368968474f93 Mon Sep 17 00:00:00 2001 From: William Pitcock Date: Mon, 13 May 2019 02:02:00 +0000 Subject: [PATCH 2/4] activitypub: move post rich media fetching to job queue --- lib/pleroma/web/activity_pub/activity_pub.ex | 4 +--- lib/pleroma/web/rich_media/helpers.ex | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 11777c220..d7c0ab4d3 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -133,9 +133,7 @@ def insert(map, local \\ true, fake \\ false) when is_map(map) do activity end - Task.start(fn -> - Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) - end) + PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) Notification.create_notifications(activity) diff --git a/lib/pleroma/web/rich_media/helpers.ex b/lib/pleroma/web/rich_media/helpers.ex index f67aaf58b..0162a5be9 100644 --- a/lib/pleroma/web/rich_media/helpers.ex +++ b/lib/pleroma/web/rich_media/helpers.ex @@ -34,4 +34,6 @@ def fetch_data_for_activity(%Activity{data: %{"type" => "Create"}} = activity) d end def fetch_data_for_activity(_), do: %{} + + def perform(:fetch, %Activity{} = activity), do: fetch_data_for_activity(activity) end From 69a9e0563cc441a772c4884d747bb755ddf58c45 Mon Sep 17 00:00:00 2001 From: William Pitcock Date: Mon, 13 May 2019 02:09:28 +0000 Subject: [PATCH 3/4] user: migrate follow/blocks import to job queue --- lib/pleroma/user.ex | 73 ++++++++++--------- .../controllers/util_controller.ex | 14 +++- 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index a79da4dd8..c94660de4 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -413,24 +413,6 @@ def following?(%User{} = follower, %User{} = followed) do Enum.member?(follower.following, followed.follower_address) end - def follow_import(%User{} = follower, followed_identifiers) - when is_list(followed_identifiers) do - Enum.map( - followed_identifiers, - fn followed_identifier -> - with {:ok, %User{} = followed} <- get_or_fetch(followed_identifier), - {:ok, follower} <- maybe_direct_follow(follower, followed), - {:ok, _} <- ActivityPub.follow(follower, followed) do - followed - else - err -> - Logger.debug("follow_import failed for #{followed_identifier} with: #{inspect(err)}") - err - end - end - ) - end - def locked?(%User{} = user) do user.info.locked || false end @@ -844,23 +826,6 @@ defp trigram_search_subquery(term) do ) end - def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do - Enum.map( - blocked_identifiers, - fn blocked_identifier -> - with {:ok, %User{} = blocked} <- get_or_fetch(blocked_identifier), - {:ok, blocker} <- block(blocker, blocked), - {:ok, _} <- ActivityPub.block(blocker, blocked) do - blocked - else - err -> - Logger.debug("blocks_import failed for #{blocked_identifier} with: #{inspect(err)}") - err - end - end - ) - end - def mute(muter, %User{ap_id: ap_id}) do info_cng = muter.info @@ -1082,6 +1047,44 @@ def perform(:fetch_initial_posts, %User{} = user) do {:ok, user} end + @spec perform(atom(), User.t(), list()) :: list() | {:error, any()} + def perform(:blocks_import, %User{} = blocker, blocked_identifiers) + when is_list(blocked_identifiers) do + Enum.map( + blocked_identifiers, + fn blocked_identifier -> + with {:ok, %User{} = blocked} <- get_or_fetch(blocked_identifier), + {:ok, blocker} <- block(blocker, blocked), + {:ok, _} <- ActivityPub.block(blocker, blocked) do + blocked + else + err -> + Logger.debug("blocks_import failed for #{blocked_identifier} with: #{inspect(err)}") + err + end + end + ) + end + + @spec perform(atom(), User.t(), list()) :: list() | {:error, any()} + def perform(:follow_import, %User{} = follower, followed_identifiers) + when is_list(followed_identifiers) do + Enum.map( + followed_identifiers, + fn followed_identifier -> + with {:ok, %User{} = followed} <- get_or_fetch(followed_identifier), + {:ok, follower} <- maybe_direct_follow(follower, followed), + {:ok, _} <- ActivityPub.follow(follower, followed) do + followed + else + err -> + Logger.debug("follow_import failed for #{followed_identifier} with: #{inspect(err)}") + err + end + end + ) + end + def delete_user_activities(%User{ap_id: ap_id} = user) do stream = ap_id diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index c03f8ab3a..143960206 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -310,7 +310,12 @@ def follow_import(%{assigns: %{user: follower}} = conn, %{"list" => list}) do String.split(line, ",") |> List.first() end) |> List.delete("Account address"), - {:ok, _} = Task.start(fn -> User.follow_import(follower, followed_identifiers) end) do + :ok <- + PleromaJobQueue.enqueue(:background, User, [ + :follow_import, + follower, + followed_identifiers + ]) do json(conn, "job started") end end @@ -321,7 +326,12 @@ def blocks_import(conn, %{"list" => %Plug.Upload{} = listfile}) do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do with blocked_identifiers <- String.split(list), - {:ok, _} = Task.start(fn -> User.blocks_import(blocker, blocked_identifiers) end) do + :ok <- + PleromaJobQueue.enqueue(:background, User, [ + :blocks_import, + blocker, + blocked_identifiers + ]) do json(conn, "job started") end end From ac3a3abf6bfae5a6217e0a212abd6be0b4a17309 Mon Sep 17 00:00:00 2001 From: William Pitcock Date: Tue, 14 May 2019 15:07:38 +0000 Subject: [PATCH 4/4] clean up follow/block imports a little --- lib/pleroma/user.ex | 16 +++++++++++ .../controllers/util_controller.ex | 28 +++++++++---------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index c94660de4..417d57d72 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1085,6 +1085,22 @@ def perform(:follow_import, %User{} = follower, followed_identifiers) ) end + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), + do: + PleromaJobQueue.enqueue(:background, __MODULE__, [ + :blocks_import, + blocker, + blocked_identifiers + ]) + + def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), + do: + PleromaJobQueue.enqueue(:background, __MODULE__, [ + :follow_import, + follower, + followed_identifiers + ]) + def delete_user_activities(%User{ap_id: ap_id} = user) do stream = ap_id diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index 143960206..deaacd946 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -309,13 +309,13 @@ def follow_import(%{assigns: %{user: follower}} = conn, %{"list" => list}) do Enum.map(lines, fn line -> String.split(line, ",") |> List.first() end) - |> List.delete("Account address"), - :ok <- - PleromaJobQueue.enqueue(:background, User, [ - :follow_import, - follower, - followed_identifiers - ]) do + |> List.delete("Account address") do + PleromaJobQueue.enqueue(:background, User, [ + :follow_import, + follower, + followed_identifiers + ]) + json(conn, "job started") end end @@ -325,13 +325,13 @@ def blocks_import(conn, %{"list" => %Plug.Upload{} = listfile}) do end def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do - with blocked_identifiers <- String.split(list), - :ok <- - PleromaJobQueue.enqueue(:background, User, [ - :blocks_import, - blocker, - blocked_identifiers - ]) do + with blocked_identifiers <- String.split(list) do + PleromaJobQueue.enqueue(:background, User, [ + :blocks_import, + blocker, + blocked_identifiers + ]) + json(conn, "job started") end end