[#3213] Added query options support for Repo.chunk_stream/4.

Used infinite timeout in transfer_hashtags select query.
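
For a sense of the new call shape, a minimal usage sketch: stream a table in batches of 100 with the per-query timeout disabled. The source query and the consumer are illustrative assumptions; only chunk_stream/4 and the forwarded timeout: :infinity option come from this commit.

    import Ecto.Query

    # Illustrative caller: the options keyword is forwarded to Repo.all/2,
    # so any Ecto shared option (here :timeout) applies to each page fetch.
    from(o in Pleroma.Object)
    |> Pleroma.Repo.chunk_stream(100, :batches, timeout: :infinity)
    |> Stream.each(fn batch -> IO.puts("fetched #{length(batch)} rows") end)
    |> Stream.run()
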
Ivan Tashkinov, 2020-12-31 09:36:26 +03:00
parent 8d1a0c1afd
commit 367f0c31c3
2 changed files with 8 additions and 9 deletions

@@ -149,9 +149,9 @@ def run(["transfer_hashtags"]) do
         tag: fragment("(?)->>'tag'", object.data)
       }
     )
-    |> Pleroma.Repo.chunk_stream(100, :batches)
+    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
     |> Stream.each(fn objects ->
-      Logger.info("Processing #{length(objects)} objects...")
+      Logger.info("Processing #{length(objects)} objects starting from id #{hd(objects).id}...")
       Enum.map(
         objects,
@@ -165,10 +165,9 @@ def run(["transfer_hashtags"]) do
           with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
             for hashtag_record <- hashtag_records do
               with {:ok, _} <-
-                     Ecto.Adapters.SQL.query(
-                       Repo,
-                       "insert into hashtags_objects(hashtag_id, object_id) values " <>
-                         "(#{hashtag_record.id}, #{object.id});"
+                     Repo.query(
+                       "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
+                       [hashtag_record.id, object.id]
                     ) do
                 :noop
               else
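
Aside: replacing Ecto.Adapters.SQL.query(Repo, ...) with interpolated values by Repo.query/2 with $1/$2 placeholders sends the ids as bind parameters instead of splicing them into the SQL text, so Postgres can prepare the statement once and no quoting or injection concerns arise. A standalone sketch of the same pattern, with made-up ids:

    # Hypothetical values; Repo.query/2 is the repo convenience wrapper
    # around Ecto.Adapters.SQL.query/4 and returns {:ok, %Postgrex.Result{}}.
    {:ok, result} =
      Pleroma.Repo.query(
        "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
        [1, 42]
      )

    1 = result.num_rows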

@@ -63,8 +63,8 @@ def get_assoc(resource, association) do
       iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
   """
   @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
-  def chunk_stream(query, chunk_size, returns_as \\ :one) do
-    # We don't actually need start and end funcitons of resource streaming,
+  def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
+    # We don't actually need start and end functions of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
     # have individual records be the elements of the stream, instead of
     # lists of records
@@ -76,7 +76,7 @@ def chunk_stream(query, chunk_size, returns_as \\ :one) do
         |> order_by(asc: :id)
         |> where([r], r.id > ^last_id)
         |> limit(^chunk_size)
-        |> all()
+        |> all(query_options)
         |> case do
           [] ->
             {:halt, last_id}
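
Since the hunks only show the inner pipeline, a simplified sketch of the surrounding loop the comments refer to: Stream.resource/3 driving keyset pagination, shown here in :batches mode. The scaffolding below is an assumption for illustration, not the verbatim implementation.

    import Ecto.Query

    # Each page is fetched with all(query_options) and emitted as one list
    # element; the accumulator is the last seen id (keyset pagination).
    Stream.resource(
      # start_fun: begin before the first id
      fn -> 0 end,
      # next_fun: fetch the next chunk strictly after last_id
      fn last_id ->
        query
        |> order_by(asc: :id)
        |> where([r], r.id > ^last_id)
        |> limit(^chunk_size)
        |> Pleroma.Repo.all(query_options)
        |> case do
          [] -> {:halt, last_id}
          records -> {[records], List.last(records).id}
        end
      end,
      # after_fun: nothing to clean up
      fn _last_id -> :ok end
    )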