138 lines
3.7 KiB
Elixir
138 lines
3.7 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Web.FedSockets.FedSocket do
|
|
@moduledoc """
|
|
The FedSocket module abstracts the actions to be taken taken on connections regardless of
|
|
whether the connection started as inbound or outbound.
|
|
|
|
|
|
Normally outside modules will have no need to call the FedSocket module directly.
|
|
"""
|
|
|
|
alias Pleroma.Object
|
|
alias Pleroma.Object.Containment
|
|
alias Pleroma.User
|
|
alias Pleroma.Web.ActivityPub.ObjectView
|
|
alias Pleroma.Web.ActivityPub.UserView
|
|
alias Pleroma.Web.ActivityPub.Visibility
|
|
alias Pleroma.Web.FedSockets.FetchRegistry
|
|
alias Pleroma.Web.FedSockets.IngesterWorker
|
|
alias Pleroma.Web.FedSockets.OutgoingHandler
|
|
alias Pleroma.Web.FedSockets.SocketInfo
|
|
|
|
require Logger
|
|
|
|
@shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
|
|
|
|
def connect_to_host(uri) do
|
|
case OutgoingHandler.start_link(uri) do
|
|
{:ok, pid} ->
|
|
{:ok, pid}
|
|
|
|
error ->
|
|
{:error, error}
|
|
end
|
|
end
|
|
|
|
def close(%SocketInfo{pid: socket_pid}),
|
|
do: Process.send(socket_pid, :close, [])
|
|
|
|
def publish(%SocketInfo{pid: socket_pid}, json) do
|
|
%{action: :publish, data: json}
|
|
|> Jason.encode!()
|
|
|> send_packet(socket_pid)
|
|
end
|
|
|
|
def fetch(%SocketInfo{pid: socket_pid}, id) do
|
|
fetch_uuid = FetchRegistry.register_fetch(id)
|
|
|
|
%{action: :fetch, data: id, uuid: fetch_uuid}
|
|
|> Jason.encode!()
|
|
|> send_packet(socket_pid)
|
|
|
|
wait_for_fetch_to_return(fetch_uuid, 0)
|
|
end
|
|
|
|
def receive_package(%SocketInfo{} = fed_socket, json) do
|
|
json
|
|
|> Jason.decode!()
|
|
|> process_package(fed_socket)
|
|
end
|
|
|
|
defp wait_for_fetch_to_return(uuid, cntr) do
|
|
case FetchRegistry.check_fetch(uuid) do
|
|
{:error, :waiting} ->
|
|
Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
|
|
wait_for_fetch_to_return(uuid, cntr + 1)
|
|
|
|
{:error, :missing} ->
|
|
Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
|
|
{:error, :timeout}
|
|
|
|
{:ok, _fr} ->
|
|
FetchRegistry.pop_fetch(uuid)
|
|
end
|
|
end
|
|
|
|
defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
|
|
if Containment.contain_origin(origin, data) do
|
|
IngesterWorker.enqueue("ingest", %{"object" => data})
|
|
end
|
|
|
|
{:reply, %{"action" => "publish_reply", "status" => "processed"}}
|
|
end
|
|
|
|
defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
|
|
FetchRegistry.register_fetch_received(uuid, data)
|
|
{:noreply, nil}
|
|
end
|
|
|
|
defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
|
|
{:ok, data} = render_fetched_data(ap_id, uuid)
|
|
{:reply, data}
|
|
end
|
|
|
|
defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
|
|
{:noreply, nil}
|
|
end
|
|
|
|
defp process_package(other, _fed_socket) do
|
|
Logger.warn("unknown json packages received #{inspect(other)}")
|
|
{:noreply, nil}
|
|
end
|
|
|
|
defp render_fetched_data(ap_id, uuid) do
|
|
{:ok,
|
|
%{
|
|
"action" => "fetch_reply",
|
|
"status" => "processed",
|
|
"uuid" => uuid,
|
|
"data" => represent_item(ap_id)
|
|
}}
|
|
end
|
|
|
|
defp represent_item(ap_id) do
|
|
case User.get_by_ap_id(ap_id) do
|
|
nil ->
|
|
object = Object.get_cached_by_ap_id(ap_id)
|
|
|
|
if Visibility.is_public?(object) do
|
|
Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
|
|
else
|
|
nil
|
|
end
|
|
|
|
user ->
|
|
Phoenix.View.render_to_string(UserView, "user.json", user: user)
|
|
end
|
|
end
|
|
|
|
defp send_packet(data, socket_pid) do
|
|
Process.send(socket_pid, {:send, data}, [])
|
|
end
|
|
|
|
def shake, do: @shake
|
|
end
|