Merge branch 'develop' into issue/733
This commit is contained in:
commit
7dd51652f9
4
.gitignore
vendored
4
.gitignore
vendored
@ -43,3 +43,7 @@ docs/generated_config.md
|
||||
# Code test coverage
|
||||
/cover
|
||||
/Elixir.*.coverdata
|
||||
|
||||
.idea
|
||||
pleroma.iml
|
||||
|
||||
|
@ -276,7 +276,7 @@
|
||||
max_account_fields: 10,
|
||||
max_remote_account_fields: 20,
|
||||
account_field_name_length: 512,
|
||||
account_field_value_length: 512,
|
||||
account_field_value_length: 2048,
|
||||
external_user_synchronization: true
|
||||
|
||||
config :pleroma, :markup,
|
||||
@ -331,6 +331,10 @@
|
||||
follow_handshake_timeout: 500,
|
||||
sign_object_fetches: true
|
||||
|
||||
config :pleroma, :streamer,
|
||||
workers: 3,
|
||||
overflow_workers: 2
|
||||
|
||||
config :pleroma, :user, deny_follow_blocked: true
|
||||
|
||||
config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default
|
||||
|
@ -878,9 +878,9 @@
|
||||
%{
|
||||
key: :account_field_value_length,
|
||||
type: :integer,
|
||||
description: "An account field value maximum length (default: 512)",
|
||||
description: "An account field value maximum length (default: 2048)",
|
||||
suggestions: [
|
||||
512
|
||||
2048
|
||||
]
|
||||
},
|
||||
%{
|
||||
|
@ -135,7 +135,7 @@ config :pleroma, Pleroma.Emails.Mailer,
|
||||
* `max_account_fields`: The maximum number of custom fields in the user profile (default: `10`)
|
||||
* `max_remote_account_fields`: The maximum number of custom fields in the remote user profile (default: `20`)
|
||||
* `account_field_name_length`: An account field name maximum length (default: `512`)
|
||||
* `account_field_value_length`: An account field value maximum length (default: `512`)
|
||||
* `account_field_value_length`: An account field value maximum length (default: `2048`)
|
||||
* `external_user_synchronization`: Enabling following/followers counters synchronization for external users.
|
||||
|
||||
|
||||
|
63
lib/pleroma/activity/ir/topics.ex
Normal file
63
lib/pleroma/activity/ir/topics.ex
Normal file
@ -0,0 +1,63 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Activity.Ir.Topics do
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Web.ActivityPub.Visibility
|
||||
|
||||
def get_activity_topics(activity) do
|
||||
activity
|
||||
|> Object.normalize()
|
||||
|> generate_topics(activity)
|
||||
|> List.flatten()
|
||||
end
|
||||
|
||||
defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
|
||||
[]
|
||||
end
|
||||
|
||||
defp generate_topics(object, activity) do
|
||||
["user", "list"] ++ visibility_tags(object, activity)
|
||||
end
|
||||
|
||||
defp visibility_tags(object, activity) do
|
||||
case Visibility.get_visibility(activity) do
|
||||
"public" ->
|
||||
if activity.local do
|
||||
["public", "public:local"]
|
||||
else
|
||||
["public"]
|
||||
end
|
||||
|> item_creation_tags(object, activity)
|
||||
|
||||
"direct" ->
|
||||
["direct"]
|
||||
|
||||
_ ->
|
||||
[]
|
||||
end
|
||||
end
|
||||
|
||||
defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
|
||||
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
|
||||
end
|
||||
|
||||
defp item_creation_tags(tags, _, _) do
|
||||
tags
|
||||
end
|
||||
|
||||
defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
|
||||
tags
|
||||
|> Enum.filter(&is_bitstring(&1))
|
||||
|> Enum.map(fn tag -> "hashtag:" <> tag end)
|
||||
end
|
||||
|
||||
defp hashtags_to_topics(_), do: []
|
||||
|
||||
defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
|
||||
|
||||
defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
|
||||
|
||||
defp attachment_topics(_object, _act), do: ["public:media"]
|
||||
end
|
@ -43,23 +43,9 @@ def start(_type, _args) do
|
||||
hackney_pool_children() ++
|
||||
[
|
||||
Pleroma.Stats,
|
||||
{Oban, Pleroma.Config.get(Oban)},
|
||||
%{
|
||||
id: :web_push_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
|
||||
restart: :temporary
|
||||
},
|
||||
%{
|
||||
id: :federator_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
|
||||
restart: :temporary
|
||||
},
|
||||
%{
|
||||
id: :internal_fetch_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]},
|
||||
restart: :temporary
|
||||
}
|
||||
{Oban, Pleroma.Config.get(Oban)}
|
||||
] ++
|
||||
task_children(@env) ++
|
||||
oauth_cleanup_child(oauth_cleanup_enabled?()) ++
|
||||
streamer_child(@env) ++
|
||||
chat_child(@env, chat_enabled?()) ++
|
||||
@ -141,7 +127,7 @@ defp oauth_cleanup_enabled?,
|
||||
defp streamer_child(:test), do: []
|
||||
|
||||
defp streamer_child(_) do
|
||||
[Pleroma.Web.Streamer]
|
||||
[Pleroma.Web.Streamer.supervisor()]
|
||||
end
|
||||
|
||||
defp oauth_cleanup_child(true),
|
||||
@ -163,4 +149,39 @@ defp hackney_pool_children do
|
||||
:hackney_pool.child_spec(pool, options)
|
||||
end
|
||||
end
|
||||
|
||||
defp task_children(:test) do
|
||||
[
|
||||
%{
|
||||
id: :web_push_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
|
||||
restart: :temporary
|
||||
},
|
||||
%{
|
||||
id: :federator_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
|
||||
restart: :temporary
|
||||
}
|
||||
]
|
||||
end
|
||||
|
||||
defp task_children(_) do
|
||||
[
|
||||
%{
|
||||
id: :web_push_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
|
||||
restart: :temporary
|
||||
},
|
||||
%{
|
||||
id: :federator_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
|
||||
restart: :temporary
|
||||
},
|
||||
%{
|
||||
id: :internal_fetch_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]},
|
||||
restart: :temporary
|
||||
}
|
||||
]
|
||||
end
|
||||
end
|
||||
|
51
lib/pleroma/delivery.ex
Normal file
51
lib/pleroma/delivery.ex
Normal file
@ -0,0 +1,51 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Delivery do
|
||||
use Ecto.Schema
|
||||
|
||||
alias Pleroma.Delivery
|
||||
alias Pleroma.FlakeId
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.User
|
||||
alias Pleroma.User
|
||||
|
||||
import Ecto.Changeset
|
||||
import Ecto.Query
|
||||
|
||||
schema "deliveries" do
|
||||
belongs_to(:user, User, type: FlakeId)
|
||||
belongs_to(:object, Object)
|
||||
end
|
||||
|
||||
def changeset(delivery, params \\ %{}) do
|
||||
delivery
|
||||
|> cast(params, [:user_id, :object_id])
|
||||
|> validate_required([:user_id, :object_id])
|
||||
|> foreign_key_constraint(:object_id)
|
||||
|> foreign_key_constraint(:user_id)
|
||||
|> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index)
|
||||
end
|
||||
|
||||
def create(object_id, user_id) do
|
||||
%Delivery{}
|
||||
|> changeset(%{user_id: user_id, object_id: object_id})
|
||||
|> Repo.insert(on_conflict: :nothing)
|
||||
end
|
||||
|
||||
def get(object_id, user_id) do
|
||||
from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
# A hack because user delete activities have a fake id for whatever reason
|
||||
# TODO: Get rid of this
|
||||
def delete_all_by_object_id("pleroma:fake_object_id"), do: {0, []}
|
||||
|
||||
def delete_all_by_object_id(object_id) do
|
||||
from(d in Delivery, where: d.object_id == ^object_id)
|
||||
|> Repo.delete_all()
|
||||
end
|
||||
end
|
@ -14,7 +14,7 @@ defmodule Pleroma.FlakeId do
|
||||
|
||||
@type t :: binary
|
||||
|
||||
@behaviour Ecto.Type
|
||||
use Ecto.Type
|
||||
use GenServer
|
||||
require Logger
|
||||
alias __MODULE__
|
||||
|
@ -210,8 +210,10 @@ def create_notification(%Activity{} = activity, %User{} = user) do
|
||||
unless skip?(activity, user) do
|
||||
notification = %Notification{user_id: user.id, activity: activity}
|
||||
{:ok, notification} = Repo.insert(notification)
|
||||
Streamer.stream("user", notification)
|
||||
Streamer.stream("user:notification", notification)
|
||||
|
||||
["user", "user:notification"]
|
||||
|> Streamer.stream(notification)
|
||||
|
||||
Push.send(notification)
|
||||
notification
|
||||
end
|
||||
|
@ -20,6 +20,7 @@ defmodule Pleroma.Plugs.Cache do
|
||||
|
||||
- `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`.
|
||||
- `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`.
|
||||
- `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second.
|
||||
|
||||
Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct:
|
||||
|
||||
@ -56,6 +57,11 @@ def call(%{method: "GET"} = conn, opts) do
|
||||
{:ok, nil} ->
|
||||
cache_resp(conn, opts)
|
||||
|
||||
{:ok, {content_type, body, tracking_fun_data}} ->
|
||||
conn = opts.tracking_fun.(conn, tracking_fun_data)
|
||||
|
||||
send_cached(conn, {content_type, body})
|
||||
|
||||
{:ok, record} ->
|
||||
send_cached(conn, record)
|
||||
|
||||
@ -88,9 +94,17 @@ defp cache_resp(conn, opts) do
|
||||
ttl = Map.get(conn.assigns, :cache_ttl, opts.ttl)
|
||||
key = cache_key(conn, opts)
|
||||
content_type = content_type(conn)
|
||||
record = {content_type, body}
|
||||
|
||||
Cachex.put(:web_resp_cache, key, record, ttl: ttl)
|
||||
conn =
|
||||
unless opts[:tracking_fun] do
|
||||
Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl)
|
||||
conn
|
||||
else
|
||||
tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil)
|
||||
Cachex.put(:web_resp_cache, key, {content_type, body, tracking_fun_data}, ttl: ttl)
|
||||
|
||||
opts.tracking_fun.(conn, tracking_fun_data)
|
||||
end
|
||||
|
||||
put_resp_header(conn, "x-cache", "MISS from Pleroma")
|
||||
|
||||
|
@ -15,7 +15,8 @@ def call(%{assigns: %{valid_signature: true}} = conn, _opts) do
|
||||
end
|
||||
|
||||
def call(conn, _opts) do
|
||||
[signature | _] = get_req_header(conn, "signature")
|
||||
headers = get_req_header(conn, "signature")
|
||||
signature = Enum.at(headers, 0)
|
||||
|
||||
if signature do
|
||||
# set (request-target) header to the appropriate value
|
||||
|
@ -11,6 +11,7 @@ defmodule Pleroma.User do
|
||||
alias Comeonin.Pbkdf2
|
||||
alias Ecto.Multi
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Delivery
|
||||
alias Pleroma.Keys
|
||||
alias Pleroma.Notification
|
||||
alias Pleroma.Object
|
||||
@ -62,6 +63,7 @@ defmodule Pleroma.User do
|
||||
field(:last_digest_emailed_at, :naive_datetime)
|
||||
has_many(:notifications, Notification)
|
||||
has_many(:registrations, Registration)
|
||||
has_many(:deliveries, Delivery)
|
||||
embeds_one(:info, User.Info)
|
||||
|
||||
timestamps()
|
||||
@ -148,6 +150,7 @@ def get_cached_follow_state(user, target) do
|
||||
Cachex.fetch!(:user_cache, key, fn _ -> {:commit, follow_state(user, target)} end)
|
||||
end
|
||||
|
||||
@spec set_follow_state_cache(String.t(), String.t(), String.t()) :: {:ok | :error, boolean()}
|
||||
def set_follow_state_cache(user_ap_id, target_ap_id, state) do
|
||||
Cachex.put(
|
||||
:user_cache,
|
||||
@ -1640,6 +1643,18 @@ def is_internal_user?(%User{nickname: nil}), do: true
|
||||
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
|
||||
def is_internal_user?(_), do: false
|
||||
|
||||
# A hack because user delete activities have a fake id for whatever reason
|
||||
# TODO: Get rid of this
|
||||
def get_delivered_users_by_object_id("pleroma:fake_object_id"), do: []
|
||||
|
||||
def get_delivered_users_by_object_id(object_id) do
|
||||
from(u in User,
|
||||
inner_join: delivery in assoc(u, :deliveries),
|
||||
where: delivery.object_id == ^object_id
|
||||
)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def change_email(user, email) do
|
||||
user
|
||||
|> cast(%{email: email}, [:email])
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Activity.Ir.Topics
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Conversation
|
||||
alias Pleroma.Notification
|
||||
@ -16,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.MRF
|
||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
alias Pleroma.Web.Streamer
|
||||
alias Pleroma.Web.WebFinger
|
||||
alias Pleroma.Workers.BackgroundWorker
|
||||
|
||||
@ -187,9 +189,7 @@ def stream_out_participations(participations) do
|
||||
participations
|
||||
|> Repo.preload(:user)
|
||||
|
||||
Enum.each(participations, fn participation ->
|
||||
Pleroma.Web.Streamer.stream("participation", participation)
|
||||
end)
|
||||
Streamer.stream("participation", participations)
|
||||
end
|
||||
|
||||
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
|
||||
@ -208,41 +208,15 @@ def stream_out_participations(%Object{data: %{"context" => context}}, user) do
|
||||
|
||||
def stream_out_participations(_, _), do: :noop
|
||||
|
||||
def stream_out(activity) do
|
||||
if activity.data["type"] in ["Create", "Announce", "Delete"] do
|
||||
object = Object.normalize(activity)
|
||||
# Do not stream out poll replies
|
||||
unless object.data["type"] == "Answer" do
|
||||
Pleroma.Web.Streamer.stream("user", activity)
|
||||
Pleroma.Web.Streamer.stream("list", activity)
|
||||
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
|
||||
when data_type in ["Create", "Announce", "Delete"] do
|
||||
activity
|
||||
|> Topics.get_activity_topics()
|
||||
|> Streamer.stream(activity)
|
||||
end
|
||||
|
||||
if get_visibility(activity) == "public" do
|
||||
Pleroma.Web.Streamer.stream("public", activity)
|
||||
|
||||
if activity.local do
|
||||
Pleroma.Web.Streamer.stream("public:local", activity)
|
||||
end
|
||||
|
||||
if activity.data["type"] in ["Create"] do
|
||||
object.data
|
||||
|> Map.get("tag", [])
|
||||
|> Enum.filter(fn tag -> is_bitstring(tag) end)
|
||||
|> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
|
||||
|
||||
if object.data["attachment"] != [] do
|
||||
Pleroma.Web.Streamer.stream("public:media", activity)
|
||||
|
||||
if activity.local do
|
||||
Pleroma.Web.Streamer.stream("public:local:media", activity)
|
||||
end
|
||||
end
|
||||
end
|
||||
else
|
||||
if get_visibility(activity) == "direct",
|
||||
do: Pleroma.Web.Streamer.stream("direct", activity)
|
||||
end
|
||||
end
|
||||
end
|
||||
def stream_out(_activity) do
|
||||
:noop
|
||||
end
|
||||
|
||||
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
|
||||
@ -436,6 +410,7 @@ def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ tru
|
||||
end
|
||||
end
|
||||
|
||||
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
|
||||
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
|
||||
outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
|
||||
unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])
|
||||
@ -464,10 +439,11 @@ def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
|
||||
end
|
||||
end
|
||||
|
||||
@spec flag(map()) :: {:ok, Activity.t()} | any
|
||||
def flag(
|
||||
%{
|
||||
actor: actor,
|
||||
context: context,
|
||||
context: _context,
|
||||
account: account,
|
||||
statuses: statuses,
|
||||
content: content
|
||||
@ -479,14 +455,6 @@ def flag(
|
||||
|
||||
additional = params[:additional] || %{}
|
||||
|
||||
params = %{
|
||||
actor: actor,
|
||||
context: context,
|
||||
account: account,
|
||||
statuses: statuses,
|
||||
content: content
|
||||
}
|
||||
|
||||
additional =
|
||||
if forward do
|
||||
Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
|
||||
|
@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
|
||||
use Pleroma.Web, :controller
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Delivery
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Object.Fetcher
|
||||
alias Pleroma.User
|
||||
@ -23,7 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
|
||||
|
||||
action_fallback(:errors)
|
||||
|
||||
plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object])
|
||||
plug(
|
||||
Pleroma.Plugs.Cache,
|
||||
[query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2]
|
||||
when action in [:activity, :object]
|
||||
)
|
||||
|
||||
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
|
||||
plug(:set_requester_reachable when action in [:inbox])
|
||||
plug(:relay_active? when action in [:relay])
|
||||
@ -54,6 +60,7 @@ def object(conn, %{"uuid" => uuid}) do
|
||||
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
|
||||
{_, true} <- {:public?, Visibility.is_public?(object)} do
|
||||
conn
|
||||
|> assign(:tracking_fun_data, object.id)
|
||||
|> set_cache_ttl_for(object)
|
||||
|> put_resp_content_type("application/activity+json")
|
||||
|> put_view(ObjectView)
|
||||
@ -64,6 +71,16 @@ def object(conn, %{"uuid" => uuid}) do
|
||||
end
|
||||
end
|
||||
|
||||
def track_object_fetch(conn, nil), do: conn
|
||||
|
||||
def track_object_fetch(conn, object_id) do
|
||||
with %{assigns: %{user: %User{id: user_id}}} <- conn do
|
||||
Delivery.create(object_id, user_id)
|
||||
end
|
||||
|
||||
conn
|
||||
end
|
||||
|
||||
def object_likes(conn, %{"uuid" => uuid, "page" => page}) do
|
||||
with ap_id <- o_status_url(conn, :object, uuid),
|
||||
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
|
||||
@ -99,6 +116,7 @@ def activity(conn, %{"uuid" => uuid}) do
|
||||
%Activity{} = activity <- Activity.normalize(ap_id),
|
||||
{_, true} <- {:public?, Visibility.is_public?(activity)} do
|
||||
conn
|
||||
|> maybe_set_tracking_data(activity)
|
||||
|> set_cache_ttl_for(activity)
|
||||
|> put_resp_content_type("application/activity+json")
|
||||
|> put_view(ObjectView)
|
||||
@ -109,6 +127,13 @@ def activity(conn, %{"uuid" => uuid}) do
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do
|
||||
object_id = Object.normalize(activity).id
|
||||
assign(conn, :tracking_fun_data, object_id)
|
||||
end
|
||||
|
||||
defp maybe_set_tracking_data(conn, _activity), do: conn
|
||||
|
||||
defp set_cache_ttl_for(conn, %Activity{object: object}) do
|
||||
set_cache_ttl_for(conn, object)
|
||||
end
|
||||
|
@ -5,8 +5,10 @@
|
||||
defmodule Pleroma.Web.ActivityPub.Publisher do
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Delivery
|
||||
alias Pleroma.HTTP
|
||||
alias Pleroma.Instances
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.Relay
|
||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
@ -116,7 +118,18 @@ defp recipients(actor, activity) do
|
||||
{:ok, []}
|
||||
end
|
||||
|
||||
Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers
|
||||
fetchers =
|
||||
with %Activity{data: %{"type" => "Delete"}} <- activity,
|
||||
%Object{id: object_id} <- Object.normalize(activity),
|
||||
fetchers <- User.get_delivered_users_by_object_id(object_id),
|
||||
_ <- Delivery.delete_all_by_object_id(object_id) do
|
||||
fetchers
|
||||
else
|
||||
_ ->
|
||||
[]
|
||||
end
|
||||
|
||||
Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers
|
||||
end
|
||||
|
||||
defp get_cc_ap_ids(ap_id, recipients) do
|
||||
|
@ -1064,7 +1064,7 @@ def upgrade_user_from_ap_id(ap_id) do
|
||||
|
||||
defp upgrade_user(user, data) do
|
||||
user
|
||||
|> User.upgrade_changeset(data)
|
||||
|> User.upgrade_changeset(data, true)
|
||||
|> User.update_and_set_cache()
|
||||
end
|
||||
|
||||
|
@ -33,50 +33,40 @@ def normalize_params(params) do
|
||||
Map.put(params, "actor", get_ap_id(params["actor"]))
|
||||
end
|
||||
|
||||
def determine_explicit_mentions(%{"tag" => tag} = _object) when is_list(tag) do
|
||||
tag
|
||||
|> Enum.filter(fn x -> is_map(x) end)
|
||||
|> Enum.filter(fn x -> x["type"] == "Mention" end)
|
||||
|> Enum.map(fn x -> x["href"] end)
|
||||
@spec determine_explicit_mentions(map()) :: map()
|
||||
def determine_explicit_mentions(%{"tag" => tag} = _) when is_list(tag) do
|
||||
Enum.flat_map(tag, fn
|
||||
%{"type" => "Mention", "href" => href} -> [href]
|
||||
_ -> []
|
||||
end)
|
||||
end
|
||||
|
||||
def determine_explicit_mentions(%{"tag" => tag} = object) when is_map(tag) do
|
||||
Map.put(object, "tag", [tag])
|
||||
object
|
||||
|> Map.put("tag", [tag])
|
||||
|> determine_explicit_mentions()
|
||||
end
|
||||
|
||||
def determine_explicit_mentions(_), do: []
|
||||
|
||||
@spec recipient_in_collection(any(), any()) :: boolean()
|
||||
defp recipient_in_collection(ap_id, coll) when is_binary(coll), do: ap_id == coll
|
||||
defp recipient_in_collection(ap_id, coll) when is_list(coll), do: ap_id in coll
|
||||
defp recipient_in_collection(_, _), do: false
|
||||
|
||||
@spec recipient_in_message(User.t(), User.t(), map()) :: boolean()
|
||||
def recipient_in_message(%User{ap_id: ap_id} = recipient, %User{} = actor, params) do
|
||||
addresses = [params["to"], params["cc"], params["bto"], params["bcc"]]
|
||||
|
||||
cond do
|
||||
recipient_in_collection(ap_id, params["to"]) ->
|
||||
true
|
||||
|
||||
recipient_in_collection(ap_id, params["cc"]) ->
|
||||
true
|
||||
|
||||
recipient_in_collection(ap_id, params["bto"]) ->
|
||||
true
|
||||
|
||||
recipient_in_collection(ap_id, params["bcc"]) ->
|
||||
true
|
||||
|
||||
Enum.any?(addresses, &recipient_in_collection(ap_id, &1)) -> true
|
||||
# if the message is unaddressed at all, then assume it is directly addressed
|
||||
# to the recipient
|
||||
!params["to"] && !params["cc"] && !params["bto"] && !params["bcc"] ->
|
||||
true
|
||||
|
||||
Enum.all?(addresses, &is_nil(&1)) -> true
|
||||
# if the message is sent from somebody the user is following, then assume it
|
||||
# is addressed to the recipient
|
||||
User.following?(recipient, actor) ->
|
||||
true
|
||||
|
||||
true ->
|
||||
false
|
||||
User.following?(recipient, actor) -> true
|
||||
true -> false
|
||||
end
|
||||
end
|
||||
|
||||
@ -179,53 +169,58 @@ def maybe_federate(_), do: :ok
|
||||
Adds an id and a published data if they aren't there,
|
||||
also adds it to an included object
|
||||
"""
|
||||
def lazy_put_activity_defaults(map, fake? \\ false) do
|
||||
map =
|
||||
if not fake? do
|
||||
%{data: %{"id" => context}, id: context_id} = create_context(map["context"])
|
||||
@spec lazy_put_activity_defaults(map(), boolean) :: map()
|
||||
def lazy_put_activity_defaults(map, fake? \\ false)
|
||||
|
||||
map
|
||||
|> Map.put_new_lazy("id", &generate_activity_id/0)
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", context)
|
||||
|> Map.put_new("context_id", context_id)
|
||||
else
|
||||
map
|
||||
|> Map.put_new("id", "pleroma:fakeid")
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", "pleroma:fakecontext")
|
||||
|> Map.put_new("context_id", -1)
|
||||
end
|
||||
def lazy_put_activity_defaults(map, true) do
|
||||
map
|
||||
|> Map.put_new("id", "pleroma:fakeid")
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", "pleroma:fakecontext")
|
||||
|> Map.put_new("context_id", -1)
|
||||
|> lazy_put_object_defaults(true)
|
||||
end
|
||||
|
||||
if is_map(map["object"]) do
|
||||
object = lazy_put_object_defaults(map["object"], map, fake?)
|
||||
%{map | "object" => object}
|
||||
else
|
||||
def lazy_put_activity_defaults(map, _fake?) do
|
||||
%{data: %{"id" => context}, id: context_id} = create_context(map["context"])
|
||||
|
||||
map
|
||||
|> Map.put_new_lazy("id", &generate_activity_id/0)
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", context)
|
||||
|> Map.put_new("context_id", context_id)
|
||||
|> lazy_put_object_defaults(false)
|
||||
end
|
||||
|
||||
# Adds an id and published date if they aren't there.
|
||||
#
|
||||
@spec lazy_put_object_defaults(map(), boolean()) :: map()
|
||||
defp lazy_put_object_defaults(%{"object" => map} = activity, true)
|
||||
when is_map(map) do
|
||||
object =
|
||||
map
|
||||
end
|
||||
|> Map.put_new("id", "pleroma:fake_object_id")
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", activity["context"])
|
||||
|> Map.put_new("context_id", activity["context_id"])
|
||||
|> Map.put_new("fake", true)
|
||||
|
||||
%{activity | "object" => object}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Adds an id and published date if they aren't there.
|
||||
"""
|
||||
def lazy_put_object_defaults(map, activity \\ %{}, fake?)
|
||||
defp lazy_put_object_defaults(%{"object" => map} = activity, _)
|
||||
when is_map(map) do
|
||||
object =
|
||||
map
|
||||
|> Map.put_new_lazy("id", &generate_object_id/0)
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", activity["context"])
|
||||
|> Map.put_new("context_id", activity["context_id"])
|
||||
|
||||
def lazy_put_object_defaults(map, activity, true = _fake?) do
|
||||
map
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("id", "pleroma:fake_object_id")
|
||||
|> Map.put_new("context", activity["context"])
|
||||
|> Map.put_new("fake", true)
|
||||
|> Map.put_new("context_id", activity["context_id"])
|
||||
%{activity | "object" => object}
|
||||
end
|
||||
|
||||
def lazy_put_object_defaults(map, activity, _fake?) do
|
||||
map
|
||||
|> Map.put_new_lazy("id", &generate_object_id/0)
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("context", activity["context"])
|
||||
|> Map.put_new("context_id", activity["context_id"])
|
||||
end
|
||||
defp lazy_put_object_defaults(activity, _), do: activity
|
||||
|
||||
@doc """
|
||||
Inserts a full object if it is contained in an activity.
|
||||
@ -345,24 +340,24 @@ defp fetch_likes(object) do
|
||||
@doc """
|
||||
Updates a follow activity's state (for locked accounts).
|
||||
"""
|
||||
@spec update_follow_state_for_all(Activity.t(), String.t()) :: {:ok, Activity} | {:error, any()}
|
||||
def update_follow_state_for_all(
|
||||
%Activity{data: %{"actor" => actor, "object" => object}} = activity,
|
||||
state
|
||||
) do
|
||||
try do
|
||||
Ecto.Adapters.SQL.query!(
|
||||
Repo,
|
||||
"UPDATE activities SET data = jsonb_set(data, '{state}', $1) WHERE data->>'type' = 'Follow' AND data->>'actor' = $2 AND data->>'object' = $3 AND data->>'state' = 'pending'",
|
||||
[state, actor, object]
|
||||
)
|
||||
"Follow"
|
||||
|> Activity.Queries.by_type()
|
||||
|> Activity.Queries.by_actor(actor)
|
||||
|> Activity.Queries.by_object_id(object)
|
||||
|> where(fragment("data->>'state' = 'pending'"))
|
||||
|> update(set: [data: fragment("jsonb_set(data, '{state}', ?)", ^state)])
|
||||
|> Repo.update_all([])
|
||||
|
||||
User.set_follow_state_cache(actor, object, state)
|
||||
activity = Activity.get_by_id(activity.id)
|
||||
{:ok, activity}
|
||||
rescue
|
||||
e ->
|
||||
{:error, e}
|
||||
end
|
||||
User.set_follow_state_cache(actor, object, state)
|
||||
|
||||
activity = Activity.get_by_id(activity.id)
|
||||
|
||||
{:ok, activity}
|
||||
end
|
||||
|
||||
def update_follow_state(
|
||||
@ -413,6 +408,7 @@ def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
|
||||
@doc """
|
||||
Retruns an existing announce activity if the notice has already been announced
|
||||
"""
|
||||
@spec get_existing_announce(String.t(), map()) :: Activity.t() | nil
|
||||
def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do
|
||||
"Announce"
|
||||
|> Activity.Queries.by_type()
|
||||
@ -495,33 +491,35 @@ def make_unlike_data(
|
||||
|> maybe_put("id", activity_id)
|
||||
end
|
||||
|
||||
@spec add_announce_to_object(Activity.t(), Object.t()) ::
|
||||
{:ok, Object.t()} | {:error, Ecto.Changeset.t()}
|
||||
def add_announce_to_object(
|
||||
%Activity{
|
||||
data: %{"actor" => actor, "cc" => [Pleroma.Constants.as_public()]}
|
||||
},
|
||||
%Activity{data: %{"actor" => actor, "cc" => [Pleroma.Constants.as_public()]}},
|
||||
object
|
||||
) do
|
||||
announcements =
|
||||
if is_list(object.data["announcements"]) do
|
||||
Enum.uniq([actor | object.data["announcements"]])
|
||||
else
|
||||
[actor]
|
||||
end
|
||||
announcements = take_announcements(object)
|
||||
|
||||
update_element_in_object("announcement", announcements, object)
|
||||
with announcements <- Enum.uniq([actor | announcements]) do
|
||||
update_element_in_object("announcement", announcements, object)
|
||||
end
|
||||
end
|
||||
|
||||
def add_announce_to_object(_, object), do: {:ok, object}
|
||||
|
||||
@spec remove_announce_from_object(Activity.t(), Object.t()) ::
|
||||
{:ok, Object.t()} | {:error, Ecto.Changeset.t()}
|
||||
def remove_announce_from_object(%Activity{data: %{"actor" => actor}}, object) do
|
||||
announcements =
|
||||
if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
|
||||
|
||||
with announcements <- announcements |> List.delete(actor) do
|
||||
with announcements <- List.delete(take_announcements(object), actor) do
|
||||
update_element_in_object("announcement", announcements, object)
|
||||
end
|
||||
end
|
||||
|
||||
defp take_announcements(%{data: %{"announcements" => announcements}} = _)
|
||||
when is_list(announcements),
|
||||
do: announcements
|
||||
|
||||
defp take_announcements(_), do: []
|
||||
|
||||
#### Unfollow-related helpers
|
||||
|
||||
def make_unfollow_data(follower, followed, follow_activity, activity_id) do
|
||||
@ -535,6 +533,7 @@ def make_unfollow_data(follower, followed, follow_activity, activity_id) do
|
||||
end
|
||||
|
||||
#### Block-related helpers
|
||||
@spec fetch_latest_block(User.t(), User.t()) :: Activity.t() | nil
|
||||
def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
|
||||
"Block"
|
||||
|> Activity.Queries.by_type()
|
||||
@ -583,28 +582,32 @@ def make_create_data(params, additional) do
|
||||
end
|
||||
|
||||
#### Flag-related helpers
|
||||
|
||||
def make_flag_data(params, additional) do
|
||||
status_ap_ids =
|
||||
Enum.map(params.statuses || [], fn
|
||||
%Activity{} = act -> act.data["id"]
|
||||
act when is_map(act) -> act["id"]
|
||||
act when is_binary(act) -> act
|
||||
end)
|
||||
|
||||
object = [params.account.ap_id] ++ status_ap_ids
|
||||
|
||||
@spec make_flag_data(map(), map()) :: map()
|
||||
def make_flag_data(%{actor: actor, context: context, content: content} = params, additional) do
|
||||
%{
|
||||
"type" => "Flag",
|
||||
"actor" => params.actor.ap_id,
|
||||
"content" => params.content,
|
||||
"object" => object,
|
||||
"context" => params.context,
|
||||
"actor" => actor.ap_id,
|
||||
"content" => content,
|
||||
"object" => build_flag_object(params),
|
||||
"context" => context,
|
||||
"state" => "open"
|
||||
}
|
||||
|> Map.merge(additional)
|
||||
end
|
||||
|
||||
def make_flag_data(_, _), do: %{}
|
||||
|
||||
defp build_flag_object(%{account: account, statuses: statuses} = _) do
|
||||
[account.ap_id] ++
|
||||
Enum.map(statuses || [], fn
|
||||
%Activity{} = act -> act.data["id"]
|
||||
act when is_map(act) -> act["id"]
|
||||
act when is_binary(act) -> act
|
||||
end)
|
||||
end
|
||||
|
||||
defp build_flag_object(_), do: []
|
||||
|
||||
@doc """
|
||||
Fetches the OrderedCollection/OrderedCollectionPage from `from`, limiting the amount of pages fetched after
|
||||
the first one to `pages_left` pages.
|
||||
|
@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.OAuth.Token
|
||||
alias Pleroma.Web.Streamer
|
||||
|
||||
@behaviour :cowboy_websocket
|
||||
|
||||
@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
||||
]
|
||||
@anonymous_streams ["public", "public:local", "hashtag"]
|
||||
|
||||
# Handled by periodic keepalive in Pleroma.Web.Streamer.
|
||||
# Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
|
||||
@timeout :infinity
|
||||
|
||||
def init(%{qs: qs} = req, state) do
|
||||
@ -65,7 +66,7 @@ def websocket_info(:subscribe, state) do
|
||||
}, topic #{state.topic}"
|
||||
)
|
||||
|
||||
Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
|
||||
Streamer.add_socket(state.topic, streamer_socket(state))
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@ -80,7 +81,7 @@ def terminate(reason, _req, state) do
|
||||
}, topic #{state.topic || "?"}: #{inspect(reason)}"
|
||||
)
|
||||
|
||||
Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
|
||||
Streamer.remove_socket(state.topic, streamer_socket(state))
|
||||
:ok
|
||||
end
|
||||
|
||||
|
@ -81,6 +81,7 @@ defp parse_url(url) do
|
||||
{:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options)
|
||||
|
||||
html
|
||||
|> parse_html
|
||||
|> maybe_parse()
|
||||
|> Map.put(:url, url)
|
||||
|> clean_parsed_data()
|
||||
@ -91,6 +92,8 @@ defp parse_url(url) do
|
||||
end
|
||||
end
|
||||
|
||||
defp parse_html(html), do: Floki.parse(html)
|
||||
|
||||
defp maybe_parse(html) do
|
||||
Enum.reduce_while(parsers(), %{}, fn parser, acc ->
|
||||
case parser.parse(html, acc) do
|
||||
@ -100,7 +103,8 @@ defp maybe_parse(html) do
|
||||
end)
|
||||
end
|
||||
|
||||
defp check_parsed_data(%{title: title} = data) when is_binary(title) and byte_size(title) > 0 do
|
||||
defp check_parsed_data(%{title: title} = data)
|
||||
when is_binary(title) and byte_size(title) > 0 do
|
||||
{:ok, data}
|
||||
end
|
||||
|
||||
|
@ -135,6 +135,7 @@ defmodule Pleroma.Web.Router do
|
||||
|
||||
pipeline :http_signature do
|
||||
plug(Pleroma.Web.Plugs.HTTPSignaturePlug)
|
||||
plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug)
|
||||
end
|
||||
|
||||
scope "/api/pleroma", Pleroma.Web.TwitterAPI do
|
||||
@ -514,6 +515,7 @@ defmodule Pleroma.Web.Router do
|
||||
|
||||
scope "/", Pleroma.Web do
|
||||
pipe_through(:ostatus)
|
||||
pipe_through(:http_signature)
|
||||
|
||||
get("/objects/:uuid", OStatus.OStatusController, :object)
|
||||
get("/activities/:uuid", OStatus.OStatusController, :activity)
|
||||
|
@ -1,318 +0,0 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.Streamer do
|
||||
use GenServer
|
||||
require Logger
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Conversation.Participation
|
||||
alias Pleroma.Notification
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||
alias Pleroma.Web.ActivityPub.Visibility
|
||||
alias Pleroma.Web.CommonAPI
|
||||
alias Pleroma.Web.MastodonAPI.NotificationView
|
||||
|
||||
@keepalive_interval :timer.seconds(30)
|
||||
|
||||
def start_link(_) do
|
||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||
end
|
||||
|
||||
def add_socket(topic, socket) do
|
||||
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
|
||||
end
|
||||
|
||||
def remove_socket(topic, socket) do
|
||||
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
|
||||
end
|
||||
|
||||
def stream(topic, item) do
|
||||
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
|
||||
end
|
||||
|
||||
def init(args) do
|
||||
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
|
||||
|
||||
{:ok, args}
|
||||
end
|
||||
|
||||
def handle_info(%{action: :ping}, topics) do
|
||||
topics
|
||||
|> Map.values()
|
||||
|> List.flatten()
|
||||
|> Enum.each(fn socket ->
|
||||
Logger.debug("Sending keepalive ping")
|
||||
send(socket.transport_pid, {:text, ""})
|
||||
end)
|
||||
|
||||
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
|
||||
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
|
||||
recipient_topics =
|
||||
User.get_recipients_from_activity(item)
|
||||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
|
||||
|
||||
Enum.each(recipient_topics || [], fn user_topic ->
|
||||
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
|
||||
push_to_socket(topics, user_topic, item)
|
||||
end)
|
||||
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
|
||||
user_topic = "direct:#{participation.user_id}"
|
||||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
|
||||
|
||||
push_to_socket(topics, user_topic, participation)
|
||||
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
|
||||
# filter the recipient list if the activity is not public, see #270.
|
||||
recipient_lists =
|
||||
case Visibility.is_public?(item) do
|
||||
true ->
|
||||
Pleroma.List.get_lists_from_activity(item)
|
||||
|
||||
_ ->
|
||||
Pleroma.List.get_lists_from_activity(item)
|
||||
|> Enum.filter(fn list ->
|
||||
owner = User.get_cached_by_id(list.user_id)
|
||||
|
||||
Visibility.visible_for_user?(item, owner)
|
||||
end)
|
||||
end
|
||||
|
||||
recipient_topics =
|
||||
recipient_lists
|
||||
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
|
||||
|
||||
Enum.each(recipient_topics || [], fn list_topic ->
|
||||
Logger.debug("Trying to push message to #{list_topic}\n\n")
|
||||
push_to_socket(topics, list_topic, item)
|
||||
end)
|
||||
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(
|
||||
%{action: :stream, topic: topic, item: %Notification{} = item},
|
||||
topics
|
||||
)
|
||||
when topic in ["user", "user:notification"] do
|
||||
topics
|
||||
|> Map.get("#{topic}:#{item.user_id}", [])
|
||||
|> Enum.each(fn socket ->
|
||||
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
|
||||
true <- should_send?(user, item) do
|
||||
send(
|
||||
socket.transport_pid,
|
||||
{:text, represent_notification(socket.assigns[:user], item)}
|
||||
)
|
||||
end
|
||||
end)
|
||||
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
|
||||
Logger.debug("Trying to push to users")
|
||||
|
||||
recipient_topics =
|
||||
User.get_recipients_from_activity(item)
|
||||
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
|
||||
|
||||
Enum.each(recipient_topics, fn topic ->
|
||||
push_to_socket(topics, topic, item)
|
||||
end)
|
||||
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
|
||||
Logger.debug("Trying to push to #{topic}")
|
||||
Logger.debug("Pushing item to #{topic}")
|
||||
push_to_socket(topics, topic, item)
|
||||
{:noreply, topics}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
|
||||
topic = internal_topic(topic, socket)
|
||||
sockets_for_topic = sockets[topic] || []
|
||||
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
|
||||
sockets = Map.put(sockets, topic, sockets_for_topic)
|
||||
Logger.debug("Got new conn for #{topic}")
|
||||
{:noreply, sockets}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
|
||||
topic = internal_topic(topic, socket)
|
||||
sockets_for_topic = sockets[topic] || []
|
||||
sockets_for_topic = List.delete(sockets_for_topic, socket)
|
||||
sockets = Map.put(sockets, topic, sockets_for_topic)
|
||||
Logger.debug("Removed conn for #{topic}")
|
||||
{:noreply, sockets}
|
||||
end
|
||||
|
||||
def handle_cast(m, state) do
|
||||
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp represent_update(%Activity{} = activity, %User{} = user) do
|
||||
%{
|
||||
event: "update",
|
||||
payload:
|
||||
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||
"status.json",
|
||||
activity: activity,
|
||||
for: user
|
||||
)
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
defp represent_update(%Activity{} = activity) do
|
||||
%{
|
||||
event: "update",
|
||||
payload:
|
||||
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||
"status.json",
|
||||
activity: activity
|
||||
)
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
def represent_conversation(%Participation{} = participation) do
|
||||
%{
|
||||
event: "conversation",
|
||||
payload:
|
||||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
|
||||
participation: participation,
|
||||
for: participation.user
|
||||
})
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
@spec represent_notification(User.t(), Notification.t()) :: binary()
|
||||
defp represent_notification(%User{} = user, %Notification{} = notify) do
|
||||
%{
|
||||
event: "notification",
|
||||
payload:
|
||||
NotificationView.render(
|
||||
"show.json",
|
||||
%{notification: notify, for: user}
|
||||
)
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
defp should_send?(%User{} = user, %Activity{} = item) do
|
||||
blocks = user.info.blocks || []
|
||||
mutes = user.info.mutes || []
|
||||
reblog_mutes = user.info.muted_reblogs || []
|
||||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
|
||||
|
||||
with parent when not is_nil(parent) <- Object.normalize(item),
|
||||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
|
||||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
|
||||
%{host: item_host} <- URI.parse(item.actor),
|
||||
%{host: parent_host} <- URI.parse(parent.data["actor"]),
|
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
|
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
|
||||
true <- thread_containment(item, user),
|
||||
false <- CommonAPI.thread_muted?(user, item) do
|
||||
true
|
||||
else
|
||||
_ -> false
|
||||
end
|
||||
end
|
||||
|
||||
defp should_send?(%User{} = user, %Notification{activity: activity}) do
|
||||
should_send?(user, activity)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
|
||||
Enum.each(topics[topic] || [], fn socket ->
|
||||
# Get the current user so we have up-to-date blocks etc.
|
||||
if socket.assigns[:user] do
|
||||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
|
||||
|
||||
if should_send?(user, item) do
|
||||
send(socket.transport_pid, {:text, represent_update(item, user)})
|
||||
end
|
||||
else
|
||||
send(socket.transport_pid, {:text, represent_update(item)})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Participation{} = participation) do
|
||||
Enum.each(topics[topic] || [], fn socket ->
|
||||
send(socket.transport_pid, {:text, represent_conversation(participation)})
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Activity{
|
||||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
|
||||
}) do
|
||||
Enum.each(topics[topic] || [], fn socket ->
|
||||
send(
|
||||
socket.transport_pid,
|
||||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
|
||||
)
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
|
||||
|
||||
def push_to_socket(topics, topic, item) do
|
||||
Enum.each(topics[topic] || [], fn socket ->
|
||||
# Get the current user so we have up-to-date blocks etc.
|
||||
if socket.assigns[:user] do
|
||||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
|
||||
blocks = user.info.blocks || []
|
||||
mutes = user.info.mutes || []
|
||||
|
||||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
|
||||
true <- thread_containment(item, user) do
|
||||
send(socket.transport_pid, {:text, represent_update(item, user)})
|
||||
end
|
||||
else
|
||||
send(socket.transport_pid, {:text, represent_update(item)})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
|
||||
"#{topic}:#{socket.assigns[:user].id}"
|
||||
end
|
||||
|
||||
defp internal_topic(topic, _), do: topic
|
||||
|
||||
@spec thread_containment(Activity.t(), User.t()) :: boolean()
|
||||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
|
||||
|
||||
defp thread_containment(activity, user) do
|
||||
if Config.get([:instance, :skip_thread_containment]) do
|
||||
true
|
||||
else
|
||||
ActivityPub.contain_activity(activity, user)
|
||||
end
|
||||
end
|
||||
end
|
33
lib/pleroma/web/streamer/ping.ex
Normal file
33
lib/pleroma/web/streamer/ping.ex
Normal file
@ -0,0 +1,33 @@
|
||||
defmodule Pleroma.Web.Streamer.Ping do
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Web.Streamer.State
|
||||
alias Pleroma.Web.Streamer.StreamerSocket
|
||||
|
||||
@keepalive_interval :timer.seconds(30)
|
||||
|
||||
def start_link(opts) do
|
||||
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
|
||||
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
|
||||
end
|
||||
|
||||
def init(%{ping_interval: ping_interval} = args) do
|
||||
Process.send_after(self(), :ping, ping_interval)
|
||||
{:ok, args}
|
||||
end
|
||||
|
||||
def handle_info(:ping, %{ping_interval: ping_interval} = state) do
|
||||
State.get_sockets()
|
||||
|> Map.values()
|
||||
|> List.flatten()
|
||||
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||
Logger.debug("Sending keepalive ping")
|
||||
send(transport_pid, {:text, ""})
|
||||
end)
|
||||
|
||||
Process.send_after(self(), :ping, ping_interval)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
78
lib/pleroma/web/streamer/state.ex
Normal file
78
lib/pleroma/web/streamer/state.ex
Normal file
@ -0,0 +1,78 @@
|
||||
defmodule Pleroma.Web.Streamer.State do
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Web.Streamer.StreamerSocket
|
||||
|
||||
@env Mix.env()
|
||||
|
||||
def start_link(_) do
|
||||
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
|
||||
end
|
||||
|
||||
def add_socket(topic, socket) do
|
||||
GenServer.call(__MODULE__, {:add, topic, socket})
|
||||
end
|
||||
|
||||
def remove_socket(topic, socket) do
|
||||
do_remove_socket(@env, topic, socket)
|
||||
end
|
||||
|
||||
def get_sockets do
|
||||
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
|
||||
stream_sockets
|
||||
end
|
||||
|
||||
def init(init_arg) do
|
||||
{:ok, init_arg}
|
||||
end
|
||||
|
||||
def handle_call(:get_state, _from, state) do
|
||||
{:reply, state, state}
|
||||
end
|
||||
|
||||
def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
|
||||
internal_topic = internal_topic(topic, socket)
|
||||
stream_socket = StreamerSocket.from_socket(socket)
|
||||
|
||||
sockets_for_topic =
|
||||
sockets
|
||||
|> Map.get(internal_topic, [])
|
||||
|> List.insert_at(0, stream_socket)
|
||||
|> Enum.uniq()
|
||||
|
||||
state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
|
||||
Logger.debug("Got new conn for #{topic}")
|
||||
{:reply, state, state}
|
||||
end
|
||||
|
||||
def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
|
||||
internal_topic = internal_topic(topic, socket)
|
||||
stream_socket = StreamerSocket.from_socket(socket)
|
||||
|
||||
sockets_for_topic =
|
||||
sockets
|
||||
|> Map.get(internal_topic, [])
|
||||
|> List.delete(stream_socket)
|
||||
|
||||
state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
|
||||
{:reply, state, state}
|
||||
end
|
||||
|
||||
defp do_remove_socket(:test, _, _) do
|
||||
:ok
|
||||
end
|
||||
|
||||
defp do_remove_socket(_env, topic, socket) do
|
||||
GenServer.call(__MODULE__, {:remove, topic, socket})
|
||||
end
|
||||
|
||||
defp internal_topic(topic, socket)
|
||||
when topic in ~w[user user:notification direct] do
|
||||
"#{topic}:#{socket.assigns[:user].id}"
|
||||
end
|
||||
|
||||
defp internal_topic(topic, _) do
|
||||
topic
|
||||
end
|
||||
end
|
55
lib/pleroma/web/streamer/streamer.ex
Normal file
55
lib/pleroma/web/streamer/streamer.ex
Normal file
@ -0,0 +1,55 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.Streamer do
|
||||
alias Pleroma.Web.Streamer.State
|
||||
alias Pleroma.Web.Streamer.Worker
|
||||
|
||||
@timeout 60_000
|
||||
@mix_env Mix.env()
|
||||
|
||||
def add_socket(topic, socket) do
|
||||
State.add_socket(topic, socket)
|
||||
end
|
||||
|
||||
def remove_socket(topic, socket) do
|
||||
State.remove_socket(topic, socket)
|
||||
end
|
||||
|
||||
def get_sockets do
|
||||
State.get_sockets()
|
||||
end
|
||||
|
||||
def stream(topics, items) do
|
||||
if should_send?() do
|
||||
Task.async(fn ->
|
||||
:poolboy.transaction(
|
||||
:streamer_worker,
|
||||
&Worker.stream(&1, topics, items),
|
||||
@timeout
|
||||
)
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
def supervisor, do: Pleroma.Web.Streamer.Supervisor
|
||||
|
||||
defp should_send? do
|
||||
handle_should_send(@mix_env)
|
||||
end
|
||||
|
||||
defp handle_should_send(:test) do
|
||||
case Process.whereis(:streamer_worker) do
|
||||
nil ->
|
||||
false
|
||||
|
||||
pid ->
|
||||
Process.alive?(pid)
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_should_send(_) do
|
||||
true
|
||||
end
|
||||
end
|
31
lib/pleroma/web/streamer/streamer_socket.ex
Normal file
31
lib/pleroma/web/streamer/streamer_socket.ex
Normal file
@ -0,0 +1,31 @@
|
||||
defmodule Pleroma.Web.Streamer.StreamerSocket do
|
||||
defstruct transport_pid: nil, user: nil
|
||||
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.Streamer.StreamerSocket
|
||||
|
||||
def from_socket(%{
|
||||
transport_pid: transport_pid,
|
||||
assigns: %{user: nil}
|
||||
}) do
|
||||
%StreamerSocket{
|
||||
transport_pid: transport_pid
|
||||
}
|
||||
end
|
||||
|
||||
def from_socket(%{
|
||||
transport_pid: transport_pid,
|
||||
assigns: %{user: %User{} = user}
|
||||
}) do
|
||||
%StreamerSocket{
|
||||
transport_pid: transport_pid,
|
||||
user: user
|
||||
}
|
||||
end
|
||||
|
||||
def from_socket(%{transport_pid: transport_pid}) do
|
||||
%StreamerSocket{
|
||||
transport_pid: transport_pid
|
||||
}
|
||||
end
|
||||
end
|
33
lib/pleroma/web/streamer/supervisor.ex
Normal file
33
lib/pleroma/web/streamer/supervisor.ex
Normal file
@ -0,0 +1,33 @@
|
||||
defmodule Pleroma.Web.Streamer.Supervisor do
|
||||
use Supervisor
|
||||
|
||||
def start_link(opts) do
|
||||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
def init(args) do
|
||||
children = [
|
||||
{Pleroma.Web.Streamer.State, args},
|
||||
{Pleroma.Web.Streamer.Ping, args},
|
||||
:poolboy.child_spec(:streamer_worker, poolboy_config())
|
||||
]
|
||||
|
||||
opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
|
||||
Supervisor.init(children, opts)
|
||||
end
|
||||
|
||||
defp poolboy_config do
|
||||
opts =
|
||||
Pleroma.Config.get(:streamer,
|
||||
workers: 3,
|
||||
overflow_workers: 2
|
||||
)
|
||||
|
||||
[
|
||||
{:name, {:local, :streamer_worker}},
|
||||
{:worker_module, Pleroma.Web.Streamer.Worker},
|
||||
{:size, opts[:workers]},
|
||||
{:max_overflow, opts[:overflow_workers]}
|
||||
]
|
||||
end
|
||||
end
|
220
lib/pleroma/web/streamer/worker.ex
Normal file
220
lib/pleroma/web/streamer/worker.ex
Normal file
@ -0,0 +1,220 @@
|
||||
defmodule Pleroma.Web.Streamer.Worker do
|
||||
use GenServer
|
||||
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Conversation.Participation
|
||||
alias Pleroma.Notification
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||
alias Pleroma.Web.ActivityPub.Visibility
|
||||
alias Pleroma.Web.CommonAPI
|
||||
alias Pleroma.Web.Streamer.State
|
||||
alias Pleroma.Web.Streamer.StreamerSocket
|
||||
alias Pleroma.Web.StreamerView
|
||||
|
||||
def start_link(_) do
|
||||
GenServer.start_link(__MODULE__, %{}, [])
|
||||
end
|
||||
|
||||
def init(init_arg) do
|
||||
{:ok, init_arg}
|
||||
end
|
||||
|
||||
def stream(pid, topics, items) do
|
||||
GenServer.call(pid, {:stream, topics, items})
|
||||
end
|
||||
|
||||
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
|
||||
Enum.each(topics, fn t ->
|
||||
do_stream(%{topic: t, item: item})
|
||||
end)
|
||||
|
||||
{:reply, state, state}
|
||||
end
|
||||
|
||||
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
|
||||
Enum.each(items, fn i ->
|
||||
do_stream(%{topic: topic, item: i})
|
||||
end)
|
||||
|
||||
{:reply, state, state}
|
||||
end
|
||||
|
||||
def handle_call({:stream, topic, item}, _from, state) do
|
||||
do_stream(%{topic: topic, item: item})
|
||||
|
||||
{:reply, state, state}
|
||||
end
|
||||
|
||||
defp do_stream(%{topic: "direct", item: item}) do
|
||||
recipient_topics =
|
||||
User.get_recipients_from_activity(item)
|
||||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
|
||||
|
||||
Enum.each(recipient_topics, fn user_topic ->
|
||||
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
|
||||
push_to_socket(State.get_sockets(), user_topic, item)
|
||||
end)
|
||||
end
|
||||
|
||||
defp do_stream(%{topic: "participation", item: participation}) do
|
||||
user_topic = "direct:#{participation.user_id}"
|
||||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
|
||||
|
||||
push_to_socket(State.get_sockets(), user_topic, participation)
|
||||
end
|
||||
|
||||
defp do_stream(%{topic: "list", item: item}) do
|
||||
# filter the recipient list if the activity is not public, see #270.
|
||||
recipient_lists =
|
||||
case Visibility.is_public?(item) do
|
||||
true ->
|
||||
Pleroma.List.get_lists_from_activity(item)
|
||||
|
||||
_ ->
|
||||
Pleroma.List.get_lists_from_activity(item)
|
||||
|> Enum.filter(fn list ->
|
||||
owner = User.get_cached_by_id(list.user_id)
|
||||
|
||||
Visibility.visible_for_user?(item, owner)
|
||||
end)
|
||||
end
|
||||
|
||||
recipient_topics =
|
||||
recipient_lists
|
||||
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
|
||||
|
||||
Enum.each(recipient_topics, fn list_topic ->
|
||||
Logger.debug("Trying to push message to #{list_topic}\n\n")
|
||||
push_to_socket(State.get_sockets(), list_topic, item)
|
||||
end)
|
||||
end
|
||||
|
||||
defp do_stream(%{topic: topic, item: %Notification{} = item})
|
||||
when topic in ["user", "user:notification"] do
|
||||
State.get_sockets()
|
||||
|> Map.get("#{topic}:#{item.user_id}", [])
|
||||
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
|
||||
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
|
||||
true <- should_send?(user, item) do
|
||||
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp do_stream(%{topic: "user", item: item}) do
|
||||
Logger.debug("Trying to push to users")
|
||||
|
||||
recipient_topics =
|
||||
User.get_recipients_from_activity(item)
|
||||
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
|
||||
|
||||
Enum.each(recipient_topics, fn topic ->
|
||||
push_to_socket(State.get_sockets(), topic, item)
|
||||
end)
|
||||
end
|
||||
|
||||
defp do_stream(%{topic: topic, item: item}) do
|
||||
Logger.debug("Trying to push to #{topic}")
|
||||
Logger.debug("Pushing item to #{topic}")
|
||||
push_to_socket(State.get_sockets(), topic, item)
|
||||
end
|
||||
|
||||
defp should_send?(%User{} = user, %Activity{} = item) do
|
||||
blocks = user.info.blocks || []
|
||||
mutes = user.info.mutes || []
|
||||
reblog_mutes = user.info.muted_reblogs || []
|
||||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
|
||||
|
||||
with parent when not is_nil(parent) <- Object.normalize(item),
|
||||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
|
||||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
|
||||
%{host: item_host} <- URI.parse(item.actor),
|
||||
%{host: parent_host} <- URI.parse(parent.data["actor"]),
|
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
|
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
|
||||
true <- thread_containment(item, user),
|
||||
false <- CommonAPI.thread_muted?(user, item) do
|
||||
true
|
||||
else
|
||||
_ -> false
|
||||
end
|
||||
end
|
||||
|
||||
defp should_send?(%User{} = user, %Notification{activity: activity}) do
|
||||
should_send?(user, activity)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
|
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{
|
||||
transport_pid: transport_pid,
|
||||
user: socket_user
|
||||
} ->
|
||||
# Get the current user so we have up-to-date blocks etc.
|
||||
if socket_user do
|
||||
user = User.get_cached_by_ap_id(socket_user.ap_id)
|
||||
|
||||
if should_send?(user, item) do
|
||||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
|
||||
end
|
||||
else
|
||||
send(transport_pid, {:text, StreamerView.render("update.json", item)})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Participation{} = participation) do
|
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Activity{
|
||||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
|
||||
}) do
|
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||
send(
|
||||
transport_pid,
|
||||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
|
||||
)
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
|
||||
|
||||
def push_to_socket(topics, topic, item) do
|
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{
|
||||
transport_pid: transport_pid,
|
||||
user: socket_user
|
||||
} ->
|
||||
# Get the current user so we have up-to-date blocks etc.
|
||||
if socket_user do
|
||||
user = User.get_cached_by_ap_id(socket_user.ap_id)
|
||||
blocks = user.info.blocks || []
|
||||
mutes = user.info.mutes || []
|
||||
|
||||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
|
||||
true <- thread_containment(item, user) do
|
||||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
|
||||
end
|
||||
else
|
||||
send(transport_pid, {:text, StreamerView.render("update.json", item)})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
@spec thread_containment(Activity.t(), User.t()) :: boolean()
|
||||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
|
||||
|
||||
defp thread_containment(activity, user) do
|
||||
if Config.get([:instance, :skip_thread_containment]) do
|
||||
true
|
||||
else
|
||||
ActivityPub.contain_activity(activity, user)
|
||||
end
|
||||
end
|
||||
end
|
66
lib/pleroma/web/views/streamer_view.ex
Normal file
66
lib/pleroma/web/views/streamer_view.ex
Normal file
@ -0,0 +1,66 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.StreamerView do
|
||||
use Pleroma.Web, :view
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Conversation.Participation
|
||||
alias Pleroma.Notification
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.MastodonAPI.NotificationView
|
||||
|
||||
def render("update.json", %Activity{} = activity, %User{} = user) do
|
||||
%{
|
||||
event: "update",
|
||||
payload:
|
||||
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||
"status.json",
|
||||
activity: activity,
|
||||
for: user
|
||||
)
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
def render("notification.json", %User{} = user, %Notification{} = notify) do
|
||||
%{
|
||||
event: "notification",
|
||||
payload:
|
||||
NotificationView.render(
|
||||
"show.json",
|
||||
%{notification: notify, for: user}
|
||||
)
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
def render("update.json", %Activity{} = activity) do
|
||||
%{
|
||||
event: "update",
|
||||
payload:
|
||||
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||
"status.json",
|
||||
activity: activity
|
||||
)
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
|
||||
def render("conversation.json", %Participation{} = participation) do
|
||||
%{
|
||||
event: "conversation",
|
||||
payload:
|
||||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
|
||||
participation: participation,
|
||||
for: participation.user
|
||||
})
|
||||
|> Jason.encode!()
|
||||
}
|
||||
|> Jason.encode!()
|
||||
end
|
||||
end
|
@ -10,7 +10,11 @@ defmodule Pleroma.Workers.WebPusherWorker do
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
|
||||
notification = Repo.get(Notification, notification_id)
|
||||
notification =
|
||||
Notification
|
||||
|> Repo.get(notification_id)
|
||||
|> Repo.preload([:activity])
|
||||
|
||||
Pleroma.Web.Push.Impl.perform(notification)
|
||||
end
|
||||
end
|
||||
|
5
mix.exs
5
mix.exs
@ -101,7 +101,7 @@ defp deps do
|
||||
{:phoenix_ecto, "~> 4.0"},
|
||||
{:ecto_sql, "~> 3.1"},
|
||||
{:postgrex, ">= 0.13.5"},
|
||||
{:oban, "~> 0.7"},
|
||||
{:oban, "~> 0.8.1"},
|
||||
{:quantum, "~> 2.3"},
|
||||
{:gettext, "~> 0.15"},
|
||||
{:comeonin, "~> 4.1.1"},
|
||||
@ -133,7 +133,7 @@ defp deps do
|
||||
{:phoenix_swoosh, "~> 0.2"},
|
||||
{:gen_smtp, "~> 0.13"},
|
||||
{:websocket_client, git: "https://github.com/jeremyong/websocket_client.git", only: :test},
|
||||
{:floki, "~> 0.20.0"},
|
||||
{:floki, "~> 0.23.0"},
|
||||
{:ex_syslogger, github: "slashmili/ex_syslogger", tag: "1.4.0"},
|
||||
{:timex, "~> 3.5"},
|
||||
{:ueberauth, "~> 0.4"},
|
||||
@ -144,6 +144,7 @@ defp deps do
|
||||
git: "https://git.pleroma.social/pleroma/http_signatures.git",
|
||||
ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},
|
||||
{:telemetry, "~> 0.3"},
|
||||
{:poolboy, "~> 1.5"},
|
||||
{:prometheus_ex, "~> 3.0"},
|
||||
{:prometheus_plugs, "~> 1.1"},
|
||||
{:prometheus_phoenix, "~> 1.3"},
|
||||
|
11
mix.lock
11
mix.lock
@ -21,8 +21,8 @@
|
||||
"decimal": {:hex, :decimal, "1.8.0", "ca462e0d885f09a1c5a342dbd7c1dcf27ea63548c65a65e67334f4b61803822e", [:mix], [], "hexpm"},
|
||||
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
|
||||
"earmark": {:hex, :earmark, "1.3.6", "ce1d0675e10a5bb46b007549362bd3f5f08908843957687d8484fe7f37466b19", [:mix], [], "hexpm"},
|
||||
"ecto": {:hex, :ecto, "3.1.4", "69d852da7a9f04ede725855a35ede48d158ca11a404fe94f8b2fb3b2162cd3c9", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
|
||||
"ecto_sql": {:hex, :ecto_sql, "3.1.3", "2c536139190492d9de33c5fefac7323c5eaaa82e1b9bf93482a14649042f7cd9", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:myxql, "~> 0.2.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"ecto": {:hex, :ecto, "3.2.0", "940e2598813f205223d60c78d66e514afe1db5167ed8075510a59e496619cfb5", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
|
||||
"ecto_sql": {:hex, :ecto_sql, "3.2.0", "751cea597e8deb616084894dd75cbabfdbe7255ff01e8c058ca13f0353a3921b", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.2.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.2.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"esshd": {:hex, :esshd, "0.1.0", "6f93a2062adb43637edad0ea7357db2702a4b80dd9683482fe00f5134e97f4c1", [:mix], [], "hexpm"},
|
||||
"eternal": {:hex, :eternal, "1.2.0", "e2a6b6ce3b8c248f7dc31451aefca57e3bdf0e48d73ae5043229380a67614c41", [:mix], [], "hexpm"},
|
||||
"ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"},
|
||||
@ -34,7 +34,7 @@
|
||||
"ex_rated": {:hex, :ex_rated, "1.3.3", "30ecbdabe91f7eaa9d37fa4e81c85ba420f371babeb9d1910adbcd79ec798d27", [:mix], [{:ex2ms, "~> 1.5", [hex: :ex2ms, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"ex_syslogger": {:git, "https://github.com/slashmili/ex_syslogger.git", "f3963399047af17e038897c69e20d552e6899e1d", [tag: "1.4.0"]},
|
||||
"excoveralls": {:hex, :excoveralls, "0.11.1", "dd677fbdd49114fdbdbf445540ec735808250d56b011077798316505064edb2c", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"floki": {:hex, :floki, "0.20.4", "be42ac911fece24b4c72f3b5846774b6e61b83fe685c2fc9d62093277fb3bc86", [:mix], [{:html_entities, "~> 0.4.0", [hex: :html_entities, repo: "hexpm", optional: false]}, {:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"floki": {:hex, :floki, "0.23.0", "956ab6dba828c96e732454809fb0bd8d43ce0979b75f34de6322e73d4c917829", [:mix], [{:html_entities, "~> 0.4.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"gen_smtp": {:hex, :gen_smtp, "0.14.0", "39846a03522456077c6429b4badfd1d55e5e7d0fdfb65e935b7c5e38549d9202", [:rebar3], [], "hexpm"},
|
||||
"gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},
|
||||
"gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"},
|
||||
@ -60,7 +60,7 @@
|
||||
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
|
||||
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
|
||||
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
|
||||
"oban": {:hex, :oban, "0.7.1", "171bdd1b69c1a4a839f8c768f5e962fc22d1de1513d459fb6b8e0cbd34817a9a", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"oban": {:hex, :oban, "0.8.1", "4bbf62eb1829f856d69aeb5069ac7036afe07db8221a17de2a9169cc7a58a318", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
|
||||
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
|
||||
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
@ -73,7 +73,8 @@
|
||||
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
|
||||
"plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
|
||||
"postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
|
||||
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},
|
||||
"postgrex": {:hex, :postgrex, "0.15.1", "23ce3417de70f4c0e9e7419ad85bdabcc6860a6925fe2c6f3b1b5b1e8e47bf2f", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
|
||||
"prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},
|
||||
"prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"prometheus_ex": {:hex, :prometheus_ex, "3.0.5", "fa58cfd983487fc5ead331e9a3e0aa622c67232b3ec71710ced122c4c453a02f", [:mix], [{:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
|
12
priv/repo/migrations/20190912065617_create_deliveries.exs
Normal file
12
priv/repo/migrations/20190912065617_create_deliveries.exs
Normal file
@ -0,0 +1,12 @@
|
||||
defmodule Pleroma.Repo.Migrations.CreateDeliveries do
|
||||
use Ecto.Migration
|
||||
|
||||
def change do
|
||||
create_if_not_exists table(:deliveries) do
|
||||
add(:object_id, references(:objects, type: :id), null: false)
|
||||
add(:user_id, references(:users, type: :uuid, on_delete: :delete_all), null: false)
|
||||
end
|
||||
create_if_not_exists index(:deliveries, :object_id, name: :deliveries_object_id)
|
||||
create_if_not_exists(unique_index(:deliveries, [:user_id, :object_id]))
|
||||
end
|
||||
end
|
11
priv/repo/migrations/20190917100019_update_oban.exs
Normal file
11
priv/repo/migrations/20190917100019_update_oban.exs
Normal file
@ -0,0 +1,11 @@
|
||||
defmodule Pleroma.Repo.Migrations.UpdateOban do
|
||||
use Ecto.Migration
|
||||
|
||||
def up do
|
||||
Oban.Migrations.up(version: 4)
|
||||
end
|
||||
|
||||
def down do
|
||||
Oban.Migrations.down(version: 2)
|
||||
end
|
||||
end
|
141
test/activity/ir/topics_test.exs
Normal file
141
test/activity/ir/topics_test.exs
Normal file
@ -0,0 +1,141 @@
|
||||
defmodule Pleroma.Activity.Ir.TopicsTest do
|
||||
use Pleroma.DataCase
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Activity.Ir.Topics
|
||||
alias Pleroma.Object
|
||||
|
||||
require Pleroma.Constants
|
||||
|
||||
describe "poll answer" do
|
||||
test "produce no topics" do
|
||||
activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
|
||||
|
||||
assert [] == Topics.get_activity_topics(activity)
|
||||
end
|
||||
end
|
||||
|
||||
describe "non poll answer" do
|
||||
test "always add user and list topics" do
|
||||
activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}}
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "user")
|
||||
assert Enum.member?(topics, "list")
|
||||
end
|
||||
end
|
||||
|
||||
describe "public visibility" do
|
||||
setup do
|
||||
activity = %Activity{
|
||||
object: %Object{data: %{"type" => "Note"}},
|
||||
data: %{"to" => [Pleroma.Constants.as_public()]}
|
||||
}
|
||||
|
||||
{:ok, activity: activity}
|
||||
end
|
||||
|
||||
test "produces public topic", %{activity: activity} do
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "public")
|
||||
end
|
||||
|
||||
test "local action produces public:local topic", %{activity: activity} do
|
||||
activity = %{activity | local: true}
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "public:local")
|
||||
end
|
||||
|
||||
test "non-local action does not produce public:local topic", %{activity: activity} do
|
||||
activity = %{activity | local: false}
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
refute Enum.member?(topics, "public:local")
|
||||
end
|
||||
end
|
||||
|
||||
describe "public visibility create events" do
|
||||
setup do
|
||||
activity = %Activity{
|
||||
object: %Object{data: %{"type" => "Create", "attachment" => []}},
|
||||
data: %{"to" => [Pleroma.Constants.as_public()]}
|
||||
}
|
||||
|
||||
{:ok, activity: activity}
|
||||
end
|
||||
|
||||
test "with no attachments doesn't produce public:media topics", %{activity: activity} do
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
refute Enum.member?(topics, "public:media")
|
||||
refute Enum.member?(topics, "public:local:media")
|
||||
end
|
||||
|
||||
test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
|
||||
tagged_data = Map.put(data, "tag", ["foo", "bar"])
|
||||
activity = %{activity | object: %{object | data: tagged_data}}
|
||||
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "hashtag:foo")
|
||||
assert Enum.member?(topics, "hashtag:bar")
|
||||
end
|
||||
|
||||
test "only converts strinngs to hash tags", %{
|
||||
activity: %{object: %{data: data} = object} = activity
|
||||
} do
|
||||
tagged_data = Map.put(data, "tag", [2])
|
||||
activity = %{activity | object: %{object | data: tagged_data}}
|
||||
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
refute Enum.member?(topics, "hashtag:2")
|
||||
end
|
||||
end
|
||||
|
||||
describe "public visibility create events with attachments" do
|
||||
setup do
|
||||
activity = %Activity{
|
||||
object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}},
|
||||
data: %{"to" => [Pleroma.Constants.as_public()]}
|
||||
}
|
||||
|
||||
{:ok, activity: activity}
|
||||
end
|
||||
|
||||
test "produce public:media topics", %{activity: activity} do
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "public:media")
|
||||
end
|
||||
|
||||
test "local produces public:local:media topics", %{activity: activity} do
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "public:local:media")
|
||||
end
|
||||
|
||||
test "non-local doesn't produce public:local:media topics", %{activity: activity} do
|
||||
activity = %{activity | local: false}
|
||||
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
refute Enum.member?(topics, "public:local:media")
|
||||
end
|
||||
end
|
||||
|
||||
describe "non-public visibility" do
|
||||
test "produces direct topic" do
|
||||
activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}}
|
||||
topics = Topics.get_activity_topics(activity)
|
||||
|
||||
assert Enum.member?(topics, "direct")
|
||||
refute Enum.member?(topics, "public")
|
||||
refute Enum.member?(topics, "public:local")
|
||||
refute Enum.member?(topics, "public:media")
|
||||
refute Enum.member?(topics, "public:local:media")
|
||||
end
|
||||
end
|
||||
end
|
@ -11,7 +11,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|
||||
alias Pleroma.Integration.WebsocketClient
|
||||
alias Pleroma.Web.CommonAPI
|
||||
alias Pleroma.Web.OAuth
|
||||
alias Pleroma.Web.Streamer
|
||||
|
||||
@path Pleroma.Web.Endpoint.url()
|
||||
|> URI.parse()
|
||||
@ -19,14 +18,9 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|
||||
|> Map.put(:path, "/api/v1/streaming")
|
||||
|> URI.to_string()
|
||||
|
||||
setup do
|
||||
GenServer.start(Streamer, %{}, name: Streamer)
|
||||
|
||||
on_exit(fn ->
|
||||
if pid = Process.whereis(Streamer) do
|
||||
Process.exit(pid, :kill)
|
||||
end
|
||||
end)
|
||||
setup_all do
|
||||
start_supervised(Pleroma.Web.Streamer.supervisor())
|
||||
:ok
|
||||
end
|
||||
|
||||
def start_socket(qs \\ nil, headers \\ []) do
|
||||
@ -43,6 +37,7 @@ test "refuses invalid requests" do
|
||||
capture_log(fn ->
|
||||
assert {:error, {400, _}} = start_socket()
|
||||
assert {:error, {404, _}} = start_socket("?stream=ncjdk")
|
||||
Process.sleep(30)
|
||||
end)
|
||||
end
|
||||
|
||||
@ -50,6 +45,7 @@ test "requires authentication and a valid token for protected streams" do
|
||||
capture_log(fn ->
|
||||
assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa")
|
||||
assert {:error, {403, _}} = start_socket("?stream=user")
|
||||
Process.sleep(30)
|
||||
end)
|
||||
end
|
||||
|
||||
@ -108,6 +104,7 @@ test "accepts the 'user' stream", %{token: token} = _state do
|
||||
|
||||
assert capture_log(fn ->
|
||||
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user")
|
||||
Process.sleep(30)
|
||||
end) =~ ":badarg"
|
||||
end
|
||||
|
||||
@ -116,6 +113,7 @@ test "accepts the 'user:notification' stream", %{token: token} = _state do
|
||||
|
||||
assert capture_log(fn ->
|
||||
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification")
|
||||
Process.sleep(30)
|
||||
end) =~ ":badarg"
|
||||
end
|
||||
|
||||
@ -125,6 +123,8 @@ test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
|
||||
assert capture_log(fn ->
|
||||
assert {:error, {403, "Forbidden"}} =
|
||||
start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])
|
||||
|
||||
Process.sleep(30)
|
||||
end) =~ ":badarg"
|
||||
end
|
||||
end
|
||||
|
@ -69,16 +69,7 @@ test "does not create a notification for subscribed users if status is a reply"
|
||||
end
|
||||
|
||||
describe "create_notification" do
|
||||
setup do
|
||||
GenServer.start(Streamer, %{}, name: Streamer)
|
||||
|
||||
on_exit(fn ->
|
||||
if pid = Process.whereis(Streamer) do
|
||||
Process.exit(pid, :kill)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
@tag needs_streamer: true
|
||||
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
|
||||
user = insert(:user)
|
||||
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
|
||||
|
@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do
|
||||
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
|
||||
end
|
||||
|
||||
if tags[:needs_streamer] do
|
||||
start_supervised(Pleroma.Web.Streamer.supervisor())
|
||||
end
|
||||
|
||||
{:ok, conn: Phoenix.ConnTest.build_conn()}
|
||||
end
|
||||
end
|
||||
|
@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do
|
||||
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
|
||||
end
|
||||
|
||||
if tags[:needs_streamer] do
|
||||
start_supervised(Pleroma.Web.Streamer.supervisor())
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
|
@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
|
||||
|
||||
import Pleroma.Factory
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Delivery
|
||||
alias Pleroma.Instances
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Tests.ObanHelpers
|
||||
@ -893,4 +894,86 @@ test "it works for more than 10 users", %{conn: conn} do
|
||||
assert result["totalItems"] == 15
|
||||
end
|
||||
end
|
||||
|
||||
describe "delivery tracking" do
|
||||
test "it tracks a signed object fetch", %{conn: conn} do
|
||||
user = insert(:user, local: false)
|
||||
activity = insert(:note_activity)
|
||||
object = Object.normalize(activity)
|
||||
|
||||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
|
||||
|
||||
conn
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, user)
|
||||
|> get(object_path)
|
||||
|> json_response(200)
|
||||
|
||||
assert Delivery.get(object.id, user.id)
|
||||
end
|
||||
|
||||
test "it tracks a signed activity fetch", %{conn: conn} do
|
||||
user = insert(:user, local: false)
|
||||
activity = insert(:note_activity)
|
||||
object = Object.normalize(activity)
|
||||
|
||||
activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
|
||||
|
||||
conn
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, user)
|
||||
|> get(activity_path)
|
||||
|> json_response(200)
|
||||
|
||||
assert Delivery.get(object.id, user.id)
|
||||
end
|
||||
|
||||
test "it tracks a signed object fetch when the json is cached", %{conn: conn} do
|
||||
user = insert(:user, local: false)
|
||||
other_user = insert(:user, local: false)
|
||||
activity = insert(:note_activity)
|
||||
object = Object.normalize(activity)
|
||||
|
||||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
|
||||
|
||||
conn
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, user)
|
||||
|> get(object_path)
|
||||
|> json_response(200)
|
||||
|
||||
build_conn()
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, other_user)
|
||||
|> get(object_path)
|
||||
|> json_response(200)
|
||||
|
||||
assert Delivery.get(object.id, user.id)
|
||||
assert Delivery.get(object.id, other_user.id)
|
||||
end
|
||||
|
||||
test "it tracks a signed activity fetch when the json is cached", %{conn: conn} do
|
||||
user = insert(:user, local: false)
|
||||
other_user = insert(:user, local: false)
|
||||
activity = insert(:note_activity)
|
||||
object = Object.normalize(activity)
|
||||
|
||||
activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
|
||||
|
||||
conn
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, user)
|
||||
|> get(activity_path)
|
||||
|> json_response(200)
|
||||
|
||||
build_conn()
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, other_user)
|
||||
|> get(activity_path)
|
||||
|> json_response(200)
|
||||
|
||||
assert Delivery.get(object.id, user.id)
|
||||
assert Delivery.get(object.id, other_user.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -38,9 +38,7 @@ test "it streams them out" do
|
||||
stream: fn _, _ -> nil end do
|
||||
ActivityPub.stream_out_participations(conversation.participations)
|
||||
|
||||
Enum.each(participations, fn participation ->
|
||||
assert called(Pleroma.Web.Streamer.stream("participation", participation))
|
||||
end)
|
||||
assert called(Pleroma.Web.Streamer.stream("participation", participations))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -3,7 +3,7 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.ActivityPub.PublisherTest do
|
||||
use Pleroma.DataCase
|
||||
use Pleroma.Web.ConnCase
|
||||
|
||||
import ExUnit.CaptureLog
|
||||
import Pleroma.Factory
|
||||
@ -12,7 +12,9 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Instances
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Web.ActivityPub.Publisher
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
@as_public "https://www.w3.org/ns/activitystreams#Public"
|
||||
|
||||
@ -268,5 +270,69 @@ test "it returns inbox for messages involving single recipients in total" do
|
||||
})
|
||||
)
|
||||
end
|
||||
|
||||
test_with_mock "publishes a delete activity to peers who signed fetch requests to the create acitvity/object.",
|
||||
Pleroma.Web.Federator.Publisher,
|
||||
[:passthrough],
|
||||
[] do
|
||||
fetcher =
|
||||
insert(:user,
|
||||
local: false,
|
||||
info: %{
|
||||
ap_enabled: true,
|
||||
source_data: %{"inbox" => "https://domain.com/users/nick1/inbox"}
|
||||
}
|
||||
)
|
||||
|
||||
another_fetcher =
|
||||
insert(:user,
|
||||
local: false,
|
||||
info: %{
|
||||
ap_enabled: true,
|
||||
source_data: %{"inbox" => "https://domain2.com/users/nick1/inbox"}
|
||||
}
|
||||
)
|
||||
|
||||
actor = insert(:user)
|
||||
|
||||
note_activity = insert(:note_activity, user: actor)
|
||||
object = Object.normalize(note_activity)
|
||||
|
||||
activity_path = String.trim_leading(note_activity.data["id"], Pleroma.Web.Endpoint.url())
|
||||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
|
||||
|
||||
build_conn()
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, fetcher)
|
||||
|> get(object_path)
|
||||
|> json_response(200)
|
||||
|
||||
build_conn()
|
||||
|> put_req_header("accept", "application/activity+json")
|
||||
|> assign(:user, another_fetcher)
|
||||
|> get(activity_path)
|
||||
|> json_response(200)
|
||||
|
||||
{:ok, delete} = CommonAPI.delete(note_activity.id, actor)
|
||||
|
||||
res = Publisher.publish(actor, delete)
|
||||
assert res == :ok
|
||||
|
||||
assert called(
|
||||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
actor_id: actor.id,
|
||||
id: delete.data["id"]
|
||||
})
|
||||
)
|
||||
|
||||
assert called(
|
||||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
||||
inbox: "https://domain2.com/users/nick1/inbox",
|
||||
actor_id: actor.id,
|
||||
id: delete.data["id"]
|
||||
})
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -87,6 +87,18 @@ test "works with an object that has only IR tags" do
|
||||
|
||||
assert Utils.determine_explicit_mentions(object) == []
|
||||
end
|
||||
|
||||
test "works with an object has tags as map" do
|
||||
object = %{
|
||||
"tag" => %{
|
||||
"type" => "Mention",
|
||||
"href" => "https://example.com/~alyssa",
|
||||
"name" => "Alyssa P. Hacker"
|
||||
}
|
||||
}
|
||||
|
||||
assert Utils.determine_explicit_mentions(object) == ["https://example.com/~alyssa"]
|
||||
end
|
||||
end
|
||||
|
||||
describe "make_unlike_data/3" do
|
||||
@ -300,8 +312,8 @@ test "updates the state of all Follow activities with the same actor and object"
|
||||
{:ok, follow_activity_two} =
|
||||
Utils.update_follow_state_for_all(follow_activity_two, "accept")
|
||||
|
||||
assert Repo.get(Activity, follow_activity.id).data["state"] == "accept"
|
||||
assert Repo.get(Activity, follow_activity_two.id).data["state"] == "accept"
|
||||
assert refresh_record(follow_activity).data["state"] == "accept"
|
||||
assert refresh_record(follow_activity_two).data["state"] == "accept"
|
||||
end
|
||||
end
|
||||
|
||||
@ -323,8 +335,8 @@ test "updates the state of the given follow activity" do
|
||||
|
||||
{:ok, follow_activity_two} = Utils.update_follow_state(follow_activity_two, "reject")
|
||||
|
||||
assert Repo.get(Activity, follow_activity.id).data["state"] == "pending"
|
||||
assert Repo.get(Activity, follow_activity_two.id).data["state"] == "reject"
|
||||
assert refresh_record(follow_activity).data["state"] == "pending"
|
||||
assert refresh_record(follow_activity_two).data["state"] == "reject"
|
||||
end
|
||||
end
|
||||
|
||||
@ -401,4 +413,216 @@ test "fetches existing like" do
|
||||
assert ^like_activity = Utils.get_existing_like(user.ap_id, object)
|
||||
end
|
||||
end
|
||||
|
||||
describe "get_get_existing_announce/2" do
|
||||
test "returns nil if announce not found" do
|
||||
actor = insert(:user)
|
||||
refute Utils.get_existing_announce(actor.ap_id, %{data: %{"id" => "test"}})
|
||||
end
|
||||
|
||||
test "fetches existing announce" do
|
||||
note_activity = insert(:note_activity)
|
||||
assert object = Object.normalize(note_activity)
|
||||
actor = insert(:user)
|
||||
|
||||
{:ok, announce, _object} = ActivityPub.announce(actor, object)
|
||||
assert Utils.get_existing_announce(actor.ap_id, object) == announce
|
||||
end
|
||||
end
|
||||
|
||||
describe "fetch_latest_block/2" do
|
||||
test "fetches last block activities" do
|
||||
user1 = insert(:user)
|
||||
user2 = insert(:user)
|
||||
|
||||
assert {:ok, %Activity{} = _} = ActivityPub.block(user1, user2)
|
||||
assert {:ok, %Activity{} = _} = ActivityPub.block(user1, user2)
|
||||
assert {:ok, %Activity{} = activity} = ActivityPub.block(user1, user2)
|
||||
|
||||
assert Utils.fetch_latest_block(user1, user2) == activity
|
||||
end
|
||||
end
|
||||
|
||||
describe "recipient_in_message/3" do
|
||||
test "returns true when recipient in `to`" do
|
||||
recipient = insert(:user)
|
||||
actor = insert(:user)
|
||||
assert Utils.recipient_in_message(recipient, actor, %{"to" => recipient.ap_id})
|
||||
|
||||
assert Utils.recipient_in_message(
|
||||
recipient,
|
||||
actor,
|
||||
%{"to" => [recipient.ap_id], "cc" => ""}
|
||||
)
|
||||
end
|
||||
|
||||
test "returns true when recipient in `cc`" do
|
||||
recipient = insert(:user)
|
||||
actor = insert(:user)
|
||||
assert Utils.recipient_in_message(recipient, actor, %{"cc" => recipient.ap_id})
|
||||
|
||||
assert Utils.recipient_in_message(
|
||||
recipient,
|
||||
actor,
|
||||
%{"cc" => [recipient.ap_id], "to" => ""}
|
||||
)
|
||||
end
|
||||
|
||||
test "returns true when recipient in `bto`" do
|
||||
recipient = insert(:user)
|
||||
actor = insert(:user)
|
||||
assert Utils.recipient_in_message(recipient, actor, %{"bto" => recipient.ap_id})
|
||||
|
||||
assert Utils.recipient_in_message(
|
||||
recipient,
|
||||
actor,
|
||||
%{"bcc" => "", "bto" => [recipient.ap_id]}
|
||||
)
|
||||
end
|
||||
|
||||
test "returns true when recipient in `bcc`" do
|
||||
recipient = insert(:user)
|
||||
actor = insert(:user)
|
||||
assert Utils.recipient_in_message(recipient, actor, %{"bcc" => recipient.ap_id})
|
||||
|
||||
assert Utils.recipient_in_message(
|
||||
recipient,
|
||||
actor,
|
||||
%{"bto" => "", "bcc" => [recipient.ap_id]}
|
||||
)
|
||||
end
|
||||
|
||||
test "returns true when message without addresses fields" do
|
||||
recipient = insert(:user)
|
||||
actor = insert(:user)
|
||||
assert Utils.recipient_in_message(recipient, actor, %{"bccc" => recipient.ap_id})
|
||||
|
||||
assert Utils.recipient_in_message(
|
||||
recipient,
|
||||
actor,
|
||||
%{"btod" => "", "bccc" => [recipient.ap_id]}
|
||||
)
|
||||
end
|
||||
|
||||
test "returns false" do
|
||||
recipient = insert(:user)
|
||||
actor = insert(:user)
|
||||
refute Utils.recipient_in_message(recipient, actor, %{"to" => "ap_id"})
|
||||
end
|
||||
end
|
||||
|
||||
describe "lazy_put_activity_defaults/2" do
|
||||
test "returns map with id and published data" do
|
||||
note_activity = insert(:note_activity)
|
||||
object = Object.normalize(note_activity)
|
||||
res = Utils.lazy_put_activity_defaults(%{"context" => object.data["id"]})
|
||||
assert res["context"] == object.data["id"]
|
||||
assert res["context_id"] == object.id
|
||||
assert res["id"]
|
||||
assert res["published"]
|
||||
end
|
||||
|
||||
test "returns map with fake id and published data" do
|
||||
assert %{
|
||||
"context" => "pleroma:fakecontext",
|
||||
"context_id" => -1,
|
||||
"id" => "pleroma:fakeid",
|
||||
"published" => _
|
||||
} = Utils.lazy_put_activity_defaults(%{}, true)
|
||||
end
|
||||
|
||||
test "returns activity data with object" do
|
||||
note_activity = insert(:note_activity)
|
||||
object = Object.normalize(note_activity)
|
||||
|
||||
res =
|
||||
Utils.lazy_put_activity_defaults(%{
|
||||
"context" => object.data["id"],
|
||||
"object" => %{}
|
||||
})
|
||||
|
||||
assert res["context"] == object.data["id"]
|
||||
assert res["context_id"] == object.id
|
||||
assert res["id"]
|
||||
assert res["published"]
|
||||
assert res["object"]["id"]
|
||||
assert res["object"]["published"]
|
||||
assert res["object"]["context"] == object.data["id"]
|
||||
assert res["object"]["context_id"] == object.id
|
||||
end
|
||||
end
|
||||
|
||||
describe "make_flag_data" do
|
||||
test "returns empty map when params is invalid" do
|
||||
assert Utils.make_flag_data(%{}, %{}) == %{}
|
||||
end
|
||||
|
||||
test "returns map with Flag object" do
|
||||
reporter = insert(:user)
|
||||
target_account = insert(:user)
|
||||
{:ok, activity} = CommonAPI.post(target_account, %{"status" => "foobar"})
|
||||
context = Utils.generate_context_id()
|
||||
content = "foobar"
|
||||
|
||||
target_ap_id = target_account.ap_id
|
||||
activity_ap_id = activity.data["id"]
|
||||
|
||||
res =
|
||||
Utils.make_flag_data(
|
||||
%{
|
||||
actor: reporter,
|
||||
context: context,
|
||||
account: target_account,
|
||||
statuses: [%{"id" => activity.data["id"]}],
|
||||
content: content
|
||||
},
|
||||
%{}
|
||||
)
|
||||
|
||||
assert %{
|
||||
"type" => "Flag",
|
||||
"content" => ^content,
|
||||
"context" => ^context,
|
||||
"object" => [^target_ap_id, ^activity_ap_id],
|
||||
"state" => "open"
|
||||
} = res
|
||||
end
|
||||
end
|
||||
|
||||
describe "add_announce_to_object/2" do
|
||||
test "adds actor to announcement" do
|
||||
user = insert(:user)
|
||||
object = insert(:note)
|
||||
|
||||
activity =
|
||||
insert(:note_activity,
|
||||
data: %{
|
||||
"actor" => user.ap_id,
|
||||
"cc" => [Pleroma.Constants.as_public()]
|
||||
}
|
||||
)
|
||||
|
||||
assert {:ok, updated_object} = Utils.add_announce_to_object(activity, object)
|
||||
assert updated_object.data["announcements"] == [user.ap_id]
|
||||
assert updated_object.data["announcement_count"] == 1
|
||||
end
|
||||
end
|
||||
|
||||
describe "remove_announce_from_object/2" do
|
||||
test "removes actor from announcements" do
|
||||
user = insert(:user)
|
||||
user2 = insert(:user)
|
||||
|
||||
object =
|
||||
insert(:note,
|
||||
data: %{"announcements" => [user.ap_id, user2.ap_id], "announcement_count" => 2}
|
||||
)
|
||||
|
||||
activity = insert(:note_activity, data: %{"actor" => user.ap_id})
|
||||
|
||||
assert {:ok, updated_object} = Utils.remove_announce_from_object(activity, object)
|
||||
assert updated_object.data["announcements"] == [user2.ap_id]
|
||||
assert updated_object.data["announcement_count"] == 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -752,7 +752,7 @@ test "get statuses by IDs", %{conn: conn} do
|
||||
query_string = "ids[]=#{id1}&ids[]=#{id2}"
|
||||
conn = get(conn, "/api/v1/statuses/?#{query_string}")
|
||||
|
||||
assert [%{"id" => ^id1}, %{"id" => ^id2}] = json_response(conn, :ok)
|
||||
assert [%{"id" => ^id1}, %{"id" => ^id2}] = Enum.sort_by(json_response(conn, :ok), & &1["id"])
|
||||
end
|
||||
|
||||
describe "deleting a status" do
|
||||
|
36
test/web/streamer/ping_test.exs
Normal file
36
test/web/streamer/ping_test.exs
Normal file
@ -0,0 +1,36 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.PingTest do
|
||||
use Pleroma.DataCase
|
||||
|
||||
import Pleroma.Factory
|
||||
alias Pleroma.Web.Streamer
|
||||
|
||||
setup do
|
||||
start_supervised({Streamer.supervisor(), [ping_interval: 30]})
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
describe "sockets" do
|
||||
setup do
|
||||
user = insert(:user)
|
||||
{:ok, %{user: user}}
|
||||
end
|
||||
|
||||
test "it sends pings", %{user: user} do
|
||||
task =
|
||||
Task.async(fn ->
|
||||
assert_receive {:text, received_event}, 40
|
||||
assert_receive {:text, received_event}, 40
|
||||
assert_receive {:text, received_event}, 40
|
||||
end)
|
||||
|
||||
Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
end
|
||||
end
|
54
test/web/streamer/state_test.exs
Normal file
54
test/web/streamer/state_test.exs
Normal file
@ -0,0 +1,54 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.StateTest do
|
||||
use Pleroma.DataCase
|
||||
|
||||
import Pleroma.Factory
|
||||
alias Pleroma.Web.Streamer
|
||||
alias Pleroma.Web.Streamer.StreamerSocket
|
||||
|
||||
@moduletag needs_streamer: true
|
||||
|
||||
describe "sockets" do
|
||||
setup do
|
||||
user = insert(:user)
|
||||
user2 = insert(:user)
|
||||
{:ok, %{user: user, user2: user2}}
|
||||
end
|
||||
|
||||
test "it can add a socket", %{user: user} do
|
||||
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
|
||||
|
||||
assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
|
||||
end
|
||||
|
||||
test "it can add multiple sockets per user", %{user: user} do
|
||||
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
|
||||
Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
|
||||
|
||||
assert(
|
||||
%{
|
||||
"public" => [
|
||||
%StreamerSocket{transport_pid: 2},
|
||||
%StreamerSocket{transport_pid: 1}
|
||||
]
|
||||
} = Streamer.get_sockets()
|
||||
)
|
||||
end
|
||||
|
||||
test "it will not add a duplicate socket", %{user: user} do
|
||||
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
|
||||
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
|
||||
|
||||
assert(
|
||||
%{
|
||||
"activity" => [
|
||||
%StreamerSocket{transport_pid: 1}
|
||||
]
|
||||
} = Streamer.get_sockets()
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
@ -5,24 +5,20 @@
|
||||
defmodule Pleroma.Web.StreamerTest do
|
||||
use Pleroma.DataCase
|
||||
|
||||
import Pleroma.Factory
|
||||
|
||||
alias Pleroma.List
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
alias Pleroma.Web.Streamer
|
||||
import Pleroma.Factory
|
||||
alias Pleroma.Web.Streamer.StreamerSocket
|
||||
alias Pleroma.Web.Streamer.Worker
|
||||
|
||||
@moduletag needs_streamer: true
|
||||
clear_config_all([:instance, :skip_thread_containment])
|
||||
|
||||
describe "user streams" do
|
||||
setup do
|
||||
GenServer.start(Streamer, %{}, name: Streamer)
|
||||
|
||||
on_exit(fn ->
|
||||
if pid = Process.whereis(Streamer) do
|
||||
Process.exit(pid, :kill)
|
||||
end
|
||||
end)
|
||||
|
||||
user = insert(:user)
|
||||
notify = insert(:notification, user: user, activity: build(:note_activity))
|
||||
{:ok, %{user: user, notify: notify}}
|
||||
@ -125,11 +121,9 @@ test "it sends to public" do
|
||||
assert_receive {:text, _}, 4_000
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user
|
||||
}
|
||||
user: user
|
||||
}
|
||||
|
||||
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
|
||||
@ -138,7 +132,7 @@ test "it sends to public" do
|
||||
"public" => [fake_socket]
|
||||
}
|
||||
|
||||
Streamer.push_to_socket(topics, "public", activity)
|
||||
Worker.push_to_socket(topics, "public", activity)
|
||||
|
||||
Task.await(task)
|
||||
|
||||
@ -155,11 +149,9 @@ test "it sends to public" do
|
||||
assert received_event == expected_event
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user
|
||||
}
|
||||
user: user
|
||||
}
|
||||
|
||||
{:ok, activity} = CommonAPI.delete(activity.id, other_user)
|
||||
@ -168,7 +160,7 @@ test "it sends to public" do
|
||||
"public" => [fake_socket]
|
||||
}
|
||||
|
||||
Streamer.push_to_socket(topics, "public", activity)
|
||||
Worker.push_to_socket(topics, "public", activity)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -189,9 +181,9 @@ test "it doesn't send to user if recipients invalid and thread containment is en
|
||||
)
|
||||
|
||||
task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
|
||||
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
|
||||
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
|
||||
topics = %{"public" => [fake_socket]}
|
||||
Streamer.push_to_socket(topics, "public", activity)
|
||||
Worker.push_to_socket(topics, "public", activity)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -211,9 +203,9 @@ test "it sends message if recipients invalid and thread containment is disabled"
|
||||
)
|
||||
|
||||
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
|
||||
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
|
||||
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
|
||||
topics = %{"public" => [fake_socket]}
|
||||
Streamer.push_to_socket(topics, "public", activity)
|
||||
Worker.push_to_socket(topics, "public", activity)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -233,9 +225,9 @@ test "it sends message if recipients invalid and thread containment is enabled b
|
||||
)
|
||||
|
||||
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
|
||||
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
|
||||
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
|
||||
topics = %{"public" => [fake_socket]}
|
||||
Streamer.push_to_socket(topics, "public", activity)
|
||||
Worker.push_to_socket(topics, "public", activity)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -251,11 +243,9 @@ test "it doesn't send to blocked users" do
|
||||
refute_receive {:text, _}, 1_000
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user
|
||||
}
|
||||
user: user
|
||||
}
|
||||
|
||||
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
|
||||
@ -264,7 +254,7 @@ test "it doesn't send to blocked users" do
|
||||
"public" => [fake_socket]
|
||||
}
|
||||
|
||||
Streamer.push_to_socket(topics, "public", activity)
|
||||
Worker.push_to_socket(topics, "public", activity)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -284,11 +274,9 @@ test "it doesn't send unwanted DMs to list" do
|
||||
refute_receive {:text, _}, 1_000
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user_a
|
||||
}
|
||||
user: user_a
|
||||
}
|
||||
|
||||
{:ok, activity} =
|
||||
@ -301,7 +289,7 @@ test "it doesn't send unwanted DMs to list" do
|
||||
"list:#{list.id}" => [fake_socket]
|
||||
}
|
||||
|
||||
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
|
||||
Worker.handle_call({:stream, "list", activity}, self(), topics)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -318,11 +306,9 @@ test "it doesn't send unwanted private posts to list" do
|
||||
refute_receive {:text, _}, 1_000
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user_a
|
||||
}
|
||||
user: user_a
|
||||
}
|
||||
|
||||
{:ok, activity} =
|
||||
@ -335,12 +321,12 @@ test "it doesn't send unwanted private posts to list" do
|
||||
"list:#{list.id}" => [fake_socket]
|
||||
}
|
||||
|
||||
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
|
||||
Worker.handle_call({:stream, "list", activity}, self(), topics)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
|
||||
test "it send wanted private posts to list" do
|
||||
test "it sends wanted private posts to list" do
|
||||
user_a = insert(:user)
|
||||
user_b = insert(:user)
|
||||
|
||||
@ -354,11 +340,9 @@ test "it send wanted private posts to list" do
|
||||
assert_receive {:text, _}, 1_000
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user_a
|
||||
}
|
||||
user: user_a
|
||||
}
|
||||
|
||||
{:ok, activity} =
|
||||
@ -367,11 +351,12 @@ test "it send wanted private posts to list" do
|
||||
"visibility" => "private"
|
||||
})
|
||||
|
||||
topics = %{
|
||||
"list:#{list.id}" => [fake_socket]
|
||||
}
|
||||
Streamer.add_socket(
|
||||
"list:#{list.id}",
|
||||
fake_socket
|
||||
)
|
||||
|
||||
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
|
||||
Worker.handle_call({:stream, "list", activity}, self(), %{})
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -387,11 +372,9 @@ test "it doesn't send muted reblogs" do
|
||||
refute_receive {:text, _}, 1_000
|
||||
end)
|
||||
|
||||
fake_socket = %{
|
||||
fake_socket = %StreamerSocket{
|
||||
transport_pid: task.pid,
|
||||
assigns: %{
|
||||
user: user1
|
||||
}
|
||||
user: user1
|
||||
}
|
||||
|
||||
{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
|
||||
@ -401,7 +384,7 @@ test "it doesn't send muted reblogs" do
|
||||
"public" => [fake_socket]
|
||||
}
|
||||
|
||||
Streamer.push_to_socket(topics, "public", announce_activity)
|
||||
Worker.push_to_socket(topics, "public", announce_activity)
|
||||
|
||||
Task.await(task)
|
||||
end
|
||||
@ -417,6 +400,8 @@ test "it doesn't send posts from muted threads" do
|
||||
|
||||
task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
|
||||
|
||||
Process.sleep(4000)
|
||||
|
||||
Streamer.add_socket(
|
||||
"user",
|
||||
%{transport_pid: task.pid, assigns: %{user: user2}}
|
||||
@ -428,14 +413,6 @@ test "it doesn't send posts from muted threads" do
|
||||
|
||||
describe "direct streams" do
|
||||
setup do
|
||||
GenServer.start(Streamer, %{}, name: Streamer)
|
||||
|
||||
on_exit(fn ->
|
||||
if pid = Process.whereis(Streamer) do
|
||||
Process.exit(pid, :kill)
|
||||
end
|
||||
end)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
@ -480,6 +457,8 @@ test "it doesn't send conversation update to the 'direct' streamj when the last
|
||||
refute_receive {:text, _}, 4_000
|
||||
end)
|
||||
|
||||
Process.sleep(1000)
|
||||
|
||||
Streamer.add_socket(
|
||||
"direct",
|
||||
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||
@ -521,6 +500,8 @@ test "it sends conversation update to the 'direct' stream when a message is dele
|
||||
assert last_status["id"] == to_string(create_activity.id)
|
||||
end)
|
||||
|
||||
Process.sleep(1000)
|
||||
|
||||
Streamer.add_socket(
|
||||
"direct",
|
||||
%{transport_pid: task.pid, assigns: %{user: user}}
|
Loading…
Reference in New Issue
Block a user