Add basic queue prioritization.

This commit is contained in:
Lain Iwakura 2017-12-05 18:21:30 +01:00
parent e7c2472abd
commit 66c3813ea6
2 changed files with 35 additions and 6 deletions

View File

@ -15,8 +15,8 @@ def start_link do
enqueue(:refresh_subscriptions, nil) enqueue(:refresh_subscriptions, nil)
end) end)
GenServer.start_link(__MODULE__, %{ GenServer.start_link(__MODULE__, %{
in: {:sets.new(), :queue.new()}, in: {:sets.new(), [],
out: {:sets.new(), :queue.new()} out: {:sets.new(), []}
}, name: __MODULE__) }, name: __MODULE__)
end end
@ -88,8 +88,8 @@ def enqueue(type, payload) do
end end
def maybe_start_job(running_jobs, queue) do def maybe_start_job(running_jobs, queue) do
if (:sets.size(running_jobs) < @max_jobs) && !:queue.is_empty(queue) do if (:sets.size(running_jobs) < @max_jobs) && queue != [] do
{{:value, {type, payload}}, queue} = :queue.out(queue) {{:value, {type, payload}}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> handle(type, payload) end) {:ok, pid} = Task.start(fn -> handle(type, payload) end)
mref = Process.monitor(pid) mref = Process.monitor(pid)
{:sets.add_element(mref, running_jobs), queue} {:sets.add_element(mref, running_jobs), queue}
@ -100,14 +100,14 @@ def maybe_start_job(running_jobs, queue) do
def handle_cast({:enqueue, type, payload}, state) when type in [:incoming_doc] do def handle_cast({:enqueue, type, payload}, state) when type in [:incoming_doc] do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_queue = :queue.in({type, payload}, i_queue) i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end end
def handle_cast({:enqueue, type, payload}, state) do def handle_cast({:enqueue, type, payload}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
o_queue = :queue.in({type, payload}, o_queue) o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end end
@ -126,4 +126,13 @@ def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end end
def enqueue_sorted(queue, element, priority) do
[%{item: element, priority: priority} | queue]
|> Enum.sort_by(fn (%{priority: priority}) -> priority end)
end
def queue_pop([%{item: element} | queue]) do
{element, queue}
end
end end

View File

@ -0,0 +1,20 @@
defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Web.Federator
use Pleroma.DataCase
test "enqueues an element according to priority" do
queue = [%{item: 1, priority: 2}]
new_queue = Federator.enqueue_sorted(queue, 2, 1)
assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
new_queue = Federator.enqueue_sorted(queue, 2, 3)
assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}]
end
test "pop first item" do
queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
assert {2, [%{item: 1, priority: 2}]} = Federator.queue_pop(queue)
end
end