[#534] Optimized bulk publish ops to filter on reachability early. Instance refactoring.

This commit is contained in:
Ivan Tashkinov 2019-01-24 19:15:23 +03:00
parent 8654a591f0
commit 3e9399ec0b
5 changed files with 69 additions and 28 deletions

View File

@ -3,10 +3,19 @@ defmodule Pleroma.Instances do
@adapter Pleroma.Instances.Instance @adapter Pleroma.Instances.Instance
defdelegate filter_reachable(urls), to: @adapter
defdelegate reachable?(url), to: @adapter defdelegate reachable?(url), to: @adapter
defdelegate set_reachable(url), to: @adapter defdelegate set_reachable(url), to: @adapter
defdelegate set_unreachable(url, unreachable_since \\ nil), to: @adapter defdelegate set_unreachable(url, unreachable_since \\ nil), to: @adapter
def reachability_time_threshold, def reachability_time_threshold,
do: NaiveDateTime.add(NaiveDateTime.utc_now(), -30 * 24 * 3600, :second) do: NaiveDateTime.add(NaiveDateTime.utc_now(), -30 * 24 * 3600, :second)
def host(url_or_host) when is_binary(url_or_host) do
if url_or_host =~ ~r/^http/i do
URI.parse(url_or_host).host
else
url_or_host
end
end
end end

View File

@ -18,12 +18,35 @@ defmodule Pleroma.Instances.Instance do
timestamps() timestamps()
end end
def update_changeset(struct, params \\ %{}) do defdelegate host(url), to: Instances
def changeset(struct, params \\ %{}) do
struct struct
|> cast(params, [:host, :unreachable_since, :reachability_checked_at]) |> cast(params, [:host, :unreachable_since, :reachability_checked_at])
|> validate_required([:host])
|> unique_constraint(:host) |> unique_constraint(:host)
end end
def filter_reachable([]), do: []
def filter_reachable(urls) when is_list(urls) do
hosts =
urls
|> Enum.map(&(&1 && host(&1)))
|> Enum.filter(&(to_string(&1) != ""))
unreachable_hosts =
Repo.all(
from(i in Instance,
where:
i.host in ^hosts and i.unreachable_since <= ^Instances.reachability_time_threshold(),
select: i.host
)
)
Enum.filter(urls, &(&1 && host(&1) not in unreachable_hosts))
end
def reachable?(url) when is_binary(url) do def reachable?(url) when is_binary(url) do
!Repo.one( !Repo.one(
from(i in Instance, from(i in Instance,
@ -37,13 +60,13 @@ def reachable?(url) when is_binary(url) do
def reachable?(_), do: true def reachable?(_), do: true
def set_reachable(url) when is_binary(url) do def set_reachable(url) when is_binary(url) do
Repo.update_all( with host <- host(url),
from(i in Instance, where: i.host == ^host(url)), %Instance{} = existing_record <- Repo.get_by(Instance, %{host: host}) do
set: [ {:ok, _instance} =
unreachable_since: nil, existing_record
reachability_checked_at: DateTime.utc_now() |> changeset(%{unreachable_since: nil, reachability_checked_at: DateTime.utc_now()})
] |> Repo.update()
) end
end end
def set_reachable(_), do: {0, :noop} def set_reachable(_), do: {0, :noop}
@ -67,19 +90,17 @@ def set_unreachable(url, unreachable_since) when is_binary(url) do
do: Map.delete(changes, :unreachable_since), do: Map.delete(changes, :unreachable_since),
else: changes else: changes
{:ok, _instance} = Repo.update(update_changeset(existing_record, update_changes)) {:ok, _instance} =
existing_record
|> changeset(update_changes)
|> Repo.update()
else else
{:ok, _instance} = Repo.insert(update_changeset(%Instance{}, Map.put(changes, :host, host))) {:ok, _instance} =
%Instance{}
|> changeset(Map.put(changes, :host, host))
|> Repo.insert()
end end
end end
def set_unreachable(_, _), do: {0, :noop} def set_unreachable(_, _), do: {0, :noop}
defp host(url_or_host) do
if url_or_host =~ ~r/^http/i do
URI.parse(url_or_host).host
else
url_or_host
end
end
end end

View File

@ -689,7 +689,7 @@ def should_federate?(inbox, public) do
end end
def publish(actor, activity) do def publish(actor, activity) do
followers = remote_followers =
if actor.follower_address in activity.recipients do if actor.follower_address in activity.recipients do
{:ok, followers} = User.get_followers(actor) {:ok, followers} = User.get_followers(actor)
followers |> Enum.filter(&(!&1.local)) followers |> Enum.filter(&(!&1.local))
@ -700,13 +700,14 @@ def publish(actor, activity) do
public = is_public?(activity) public = is_public?(activity)
remote_inboxes = remote_inboxes =
(Pleroma.Web.Salmon.remote_users(activity) ++ followers) (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end) |> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %{info: %{source_data: data}} -> |> Enum.map(fn %{info: %{source_data: data}} ->
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
end) end)
|> Enum.uniq() |> Enum.uniq()
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end) |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|> Instances.filter_reachable()
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data) json = Jason.encode!(data)

View File

@ -221,7 +221,13 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity
{:ok, private, _} = keys_from_pem(keys) {:ok, private, _} = keys_from_pem(keys)
{:ok, feed} = encode(private, feed) {:ok, feed} = encode(private, feed)
remote_users(activity) remote_users = remote_users(activity)
salmon_urls = Enum.map(remote_users, & &1.info.salmon)
reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
remote_users
|> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
|> Enum.each(fn remote_user -> |> Enum.each(fn remote_user ->
Task.start(fn -> Task.start(fn ->
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)

View File

@ -54,7 +54,12 @@ def verify(subscription, getter \\ &@httpoison.get/3) do
] ]
def publish(topic, user, %{data: %{"type" => type}} = activity) def publish(topic, user, %{data: %{"type" => type}} = activity)
when type in @supported_activities do when type in @supported_activities do
# TODO: Only send to still valid subscriptions. response =
user
|> FeedRepresenter.to_simple_form([activity], [user])
|> :xmerl.export_simple(:xmerl_xml)
|> to_string
query = query =
from( from(
sub in WebsubServerSubscription, sub in WebsubServerSubscription,
@ -64,13 +69,12 @@ def publish(topic, user, %{data: %{"type" => type}} = activity)
subscriptions = Repo.all(query) subscriptions = Repo.all(query)
Enum.each(subscriptions, fn sub -> callbacks = Enum.map(subscriptions, & &1.callback)
response = reachable_callbacks = Instances.filter_reachable(callbacks)
user
|> FeedRepresenter.to_simple_form([activity], [user])
|> :xmerl.export_simple(:xmerl_xml)
|> to_string
subscriptions
|> Enum.filter(&(&1.callback in reachable_callbacks))
|> Enum.each(fn sub ->
data = %{ data = %{
xml: response, xml: response,
topic: topic, topic: topic,