From 79dde5804427ab24c5bc9aa1b5a44d2259cecc6d Mon Sep 17 00:00:00 2001 From: Alex S Date: Wed, 4 Sep 2019 20:18:11 +0300 Subject: [PATCH] one more temp commit --- lib/load_testing/fetcher.ex | 120 +++++++++--------- lib/load_testing/generator.ex | 127 +++++++++++++++---- lib/mix/tasks/pleroma/load_testing.ex | 46 ++++--- lib/pleroma/web/activity_pub/activity_pub.ex | 5 + 4 files changed, 202 insertions(+), 96 deletions(-) diff --git a/lib/load_testing/fetcher.ex b/lib/load_testing/fetcher.ex index 70c0fcd0c..ed744db9b 100644 --- a/lib/load_testing/fetcher.ex +++ b/lib/load_testing/fetcher.ex @@ -4,33 +4,18 @@ defmodule Pleroma.LoadTesting.Fetcher do def fetch_user(user) do IO.puts("=================================") - {time, _value} = :timer.tc(fn -> Repo.get_by(User, id: user.id) end) - - IO.puts("Query user by id: #{to_sec(time)} sec.") - - {time, _value} = - :timer.tc(fn -> - Repo.get_by(User, ap_id: user.ap_id) - end) - - IO.puts("Query user by ap_id: #{to_sec(time)} sec.") - - {time, _value} = - :timer.tc(fn -> - Repo.get_by(User, email: user.email) - end) - - IO.puts("Query user by email: #{to_sec(time)} sec.") - - {time, _value} = :timer.tc(fn -> Repo.get_by(User, nickname: user.nickname) end) - - IO.puts("Query user by nickname: #{to_sec(time)} sec.") + Benchee.run(%{ + "By id" => fn -> Repo.get_by(User, id: user.id) end, + "By ap_id" => fn -> Repo.get_by(User, ap_id: user.ap_id) end, + "By email" => fn -> Repo.get_by(User, email: user.email) end, + "By nickname" => fn -> Repo.get_by(User, nickname: user.nickname) end + }) end def query_timelines(user) do IO.puts("\n=================================") - params = %{ + home_timeline_params = %{ "count" => 20, "with_muted" => true, "type" => ["Create", "Announce"], @@ -39,14 +24,7 @@ def query_timelines(user) do "user" => user } - {time, _} = - :timer.tc(fn -> - ActivityPub.ActivityPub.fetch_activities([user.ap_id | user.following], params) - end) - - IO.puts("Query user home timeline: #{to_sec(time)} sec.") - - params = %{ + mastodon_public_timeline_params = %{ "count" => 20, "local_only" => true, "only_media" => "false", @@ -56,14 +34,7 @@ def query_timelines(user) do "muting_user" => user } - {time, _} = - :timer.tc(fn -> - ActivityPub.ActivityPub.fetch_public_activities(params) - end) - - IO.puts("Query user mastodon public timeline: #{to_sec(time)} sec.") - - params = %{ + mastodon_federated_timeline_params = %{ "count" => 20, "only_media" => "false", "type" => ["Create", "Announce"], @@ -72,45 +43,76 @@ def query_timelines(user) do "muting_user" => user } - {time, _} = - :timer.tc(fn -> - ActivityPub.ActivityPub.fetch_public_activities(params) - end) - - IO.puts("Query user mastodon federated public timeline: #{to_sec(time)} sec.") + Benchee.run(%{ + "User home timeline" => fn -> + Pleroma.Web.ActivityPub.ActivityPub.fetch_activities( + [user.ap_id | user.following], + home_timeline_params + ) + end, + "User mastodon public timeline" => fn -> + ActivityPub.ActivityPub.fetch_public_activities(mastodon_public_timeline_params) + end, + "User mastodon federated public timeline" => fn -> + ActivityPub.ActivityPub.fetch_public_activities(mastodon_federated_timeline_params) + end + }) end def query_notifications(user) do IO.puts("\n=================================") - params = %{"count" => "20", "with_muted" => "false"} + without_muted_params = %{"count" => "20", "with_muted" => "false"} + with_muted_params = %{"count" => "20", "with_muted" => "true"} - {time, _} = - :timer.tc(fn -> Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, params) end) + Benchee.run(%{ + "Notifications without muted" => fn -> + Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, without_muted_params) + end, + "Notifications with muted" => fn -> + Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, with_muted_params) + end + }) + end - IO.puts("Query user notifications with out muted: #{to_sec(time)} sec.") + def query_dms(user) do + IO.puts("\n=================================") - params = %{"count" => "20", "with_muted" => "true"} + params = %{ + "count" => "20", + "with_muted" => "true", + "type" => "Create", + "blocking_user" => user, + "user" => user, + visibility: "direct" + } - {time, _} = - :timer.tc(fn -> Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, params) end) - - IO.puts("Query user notifications with muted: #{to_sec(time)} sec.") + Benchee.run(%{ + "Direct messages with muted" => fn -> + Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params) + |> Pleroma.Pagination.fetch_paginated(params) + end, + "Direct messages without muted" => fn -> + Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params) + |> Pleroma.Pagination.fetch_paginated(Map.put(params, "with_muted", false)) + end + }) end def query_long_thread(user, activity) do IO.puts("\n=================================") - {time, replies} = - :timer.tc(fn -> + Benchee.run(%{ + "Fetch main post" => fn -> Activity.get_by_id_with_object(activity.id) end, + "Fetch context of main post" => fn -> Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_for_context( activity.data["context"], %{ "blocking_user" => user, - "user" => user + "user" => user, + "exclude_id" => activity.id } ) - end) - - IO.puts("Query long thread with #{length(replies)} replies: #{to_sec(time)} sec.") + end + }) end end diff --git a/lib/load_testing/generator.ex b/lib/load_testing/generator.ex index a9016b9e8..7f50ee68e 100644 --- a/lib/load_testing/generator.ex +++ b/lib/load_testing/generator.ex @@ -8,25 +8,15 @@ def generate_users(opts) do end defp do_generate_users(opts) do - min = Keyword.get(opts, :users_min, 1) max = Keyword.get(opts, :users_max) - query = - "INSERT INTO \"users\" (\"ap_id\",\"bio\",\"email\",\"follower_address\",\"following\",\"following_address\",\"info\", - \"local\",\"name\",\"nickname\",\"password_hash\",\"tags\",\"id\",\"inserted_at\",\"updated_at\") VALUES \n" - - users = - Task.async_stream( - min..max, - &generate_user_data(&1), - max_concurrency: 10, - timeout: 30_000 - ) - |> Enum.reduce("", fn {:ok, data}, acc -> acc <> data <> ", \n" end) - - query = query <> String.replace_trailing(users, ", \n", ";") - - Ecto.Adapters.SQL.query!(Repo, query) + Task.async_stream( + 1..max, + &generate_user_data(&1), + max_concurrency: 10, + timeout: 30_000 + ) + |> Enum.to_list() end defp generate_user_data(i) do @@ -47,11 +37,7 @@ defp generate_user_data(i) do following: [User.ap_id(user)] } - "('#{user.ap_id}', '#{user.bio}', '#{user.email}', '#{user.follower_address}', '{#{ - user.following - }}', '#{user.following_address}', '#{Jason.encode!(user.info)}', '#{user.local}', '#{ - user.name - }', '#{user.nickname}', '#{user.password_hash}', '{#{user.tags}}', uuid_generate_v4(), NOW(), NOW())" + Pleroma.Repo.insert!(user) end def generate_activities(users, opts) do @@ -80,4 +66,101 @@ defp do_generate_activity(users, opts) do Pleroma.Web.CommonAPI.post(Enum.random(users), %{"status" => status}) end + + def generate_dms(user, users, opts) do + IO.puts("Starting generating #{opts[:dms_max]} DMs") + {time, _} = :timer.tc(fn -> do_generate_dms(user, users, opts) end) + IO.puts("Inserting dms take #{to_sec(time)} sec.\n") + end + + defp do_generate_dms(user, users, opts) do + Task.async_stream( + 1..opts[:dms_max], + fn _ -> + do_generate_dm(user, users) + end, + max_concurrency: 10, + timeout: 30_000 + ) + |> Stream.run() + end + + defp do_generate_dm(user, users) do + post = %{ + "status" => "@#{user.nickname} some direct message", + "visibility" => "direct" + } + + Pleroma.Web.CommonAPI.post(Enum.random(users), post) + end + + def generate_long_thread(user, users, opts) do + IO.puts("Starting generating long thread with #{opts[:long_thread_length]} replies") + {time, activity} = :timer.tc(fn -> do_generate_long_thread(user, users, opts) end) + IO.puts("Inserting long thread replies take #{to_sec(time)} sec.\n") + {:ok, activity} + end + + defp do_generate_long_thread(user, users, opts) do + {:ok, %{id: id} = activity} = + Pleroma.Web.CommonAPI.post(user, %{"status" => "Start of long thread"}) + + Task.async_stream( + 1..opts[:long_thread_length], + fn _ -> do_generate_thread(users, id) end, + max_concurrency: 10, + timeout: 30_000 + ) + |> Stream.run() + + activity + end + + defp do_generate_thread(users, activity_id) do + Pleroma.Web.CommonAPI.post(Enum.random(users), %{ + "status" => "reply to main post", + "in_reply_to_status_id" => activity_id + }) + end + + def generate_private_thread(users, opts) do + IO.puts("Starting generating long thread with #{opts[:non_visible_posts_max]} replies") + {time, _} = :timer.tc(fn -> do_generate_non_visible_posts(users, opts) end) + IO.puts("Inserting long thread replies take #{to_sec(time)} sec.\n") + end + + defp do_generate_non_visible_posts(users, opts) do + [user1, user2] = Enum.take(users, 2) + {:ok, user1} = Pleroma.User.follow(user1, user2) + {:ok, user2} = Pleroma.User.follow(user2, user1) + + {:ok, activity} = + Pleroma.Web.CommonAPI.post(user1, %{ + "status" => "Some private post", + "visibility" => "private" + }) + + {:ok, activity_public} = + Pleroma.Web.CommonAPI.post(user2, %{ + "status" => "Some public reply", + "in_reply_to_status_id" => activity.id + }) + + Task.async_stream( + 1..opts[:non_visible_posts_max], + fn _ -> do_generate_non_visible_post(users, activity_public) end, + max_concurrency: 10, + timeout: 30_000 + ) + end + + defp do_generate_non_visible_post(users, activity) do + visibility = Enum.random(["private", "public"]) + + Pleroma.Web.CommonAPI.post(Enum.random(users), %{ + "visibility" => visibility, + "status" => "Some #{visibility} reply", + "in_reply_to_status_id" => activity.id + }) + end end diff --git a/lib/mix/tasks/pleroma/load_testing.ex b/lib/mix/tasks/pleroma/load_testing.ex index 9ed30db7e..d17cf0b3e 100644 --- a/lib/mix/tasks/pleroma/load_testing.ex +++ b/lib/mix/tasks/pleroma/load_testing.ex @@ -21,31 +21,38 @@ defmodule Mix.Tasks.Pleroma.LoadTesting do - `--activities NUMBER` - number of activities to generate (default: 20000) """ - @aliases [u: :users, a: :activities, d: :delete] - @switches [users: :integer, activities: :integer, delete: :boolean] + @aliases [u: :users, a: :activities] + @switches [ + users: :integer, + activities: :integer, + dms: :integer, + thread_length: :integer, + non_visible_posts: :integer + ] @users_default 20_000 @activities_default 50_000 + @dms_default 50_000 + @thread_length_default 2_000 + @non_visible_posts_default 2_000 def run(args) do - {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases) start_pleroma() + {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases) - current_max = Keyword.get(opts, :users, @users_default) + users_max = Keyword.get(opts, :users, @users_default) activities_max = Keyword.get(opts, :activities, @activities_default) + dms_max = Keyword.get(opts, :dms, @dms_default) + long_thread_length = Keyword.get(opts, :thread_length, @thread_length_default) + non_visible_posts = Keyword.get(opts, :non_visible_posts, @non_visible_posts_default) - {users_min, users_max} = - if opts[:delete] do - clean_tables() - {1, current_max} - else - current_count = Repo.aggregate(from(u in User), :count, :id) + 1 - {current_count, current_max + current_count} - end + clean_tables() opts = - Keyword.put(opts, :users_min, users_min) - |> Keyword.put(:users_max, users_max) + Keyword.put(opts, :users_max, users_max) |> Keyword.put(:activities_max, activities_max) + |> Keyword.put(:dms_max, dms_max) + |> Keyword.put(:long_thread_length, long_thread_length) + |> Keyword.put(:non_visible_posts_max, non_visible_posts) generate_users(opts) @@ -76,6 +83,12 @@ def run(args) do generate_activities(users, Keyword.put(opts, :mention, user)) + generate_dms(user, users, opts) + + {:ok, activity} = generate_long_thread(user, users, opts) + + generate_private_thread(users, opts) + # generate_replies(user, users, activities) # activity = Enum.random(activities) @@ -86,9 +99,12 @@ def run(args) do IO.puts("Objects in DB: #{Repo.aggregate(from(o in Object), :count, :id)}") IO.puts("Notifications in DB: #{Repo.aggregate(from(n in Notification), :count, :id)}") + fetch_user(user) query_timelines(user) query_notifications(user) - # query_long_thread(user, activity) + query_dms(user) + query_long_thread(user, activity) + query_timelines(user) end defp clean_tables do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index eeb826814..f2b322314 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -249,6 +249,7 @@ def create(%{to: to, actor: actor, context: context, object: object} = params, f # only accept false as false value local = !(params[:local] == false) published = params[:published] + quick_insert? = Pleroma.Config.get([:env]) == :benchmark with create_data <- make_create_data( @@ -259,12 +260,16 @@ def create(%{to: to, actor: actor, context: context, object: object} = params, f {:fake, false, activity} <- {:fake, fake, activity}, _ <- increase_replies_count_if_reply(create_data), _ <- increase_poll_votes_if_vote(create_data), + {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity}, # Changing note count prior to enqueuing federation task in order to avoid # race conditions on updating user.info {:ok, _actor} <- increase_note_count_if_public(actor, activity), :ok <- maybe_federate(activity) do {:ok, activity} else + {:quick_insert, true, activity} -> + {:ok, activity} + {:fake, true, activity} -> {:ok, activity}