From 2a25481663fcff78e881c17399f99df84c73f04d Mon Sep 17 00:00:00 2001 From: saneery Date: Mon, 27 May 2019 10:56:59 +0300 Subject: [PATCH] import staking pools delegators --- .../chain/import/runner/staking_pools.ex | 54 ++++++----- .../import/runner/staking_pools_delegators.ex | 91 +++++++++++++++++++ .../chain/import/stage/address_referencing.ex | 3 +- .../lib/explorer/chain/staking_pool.ex | 3 + .../lib/explorer/staking/pools_reader.ex | 47 ++++++++-- .../lib/indexer/fetcher/staking_pools.ex | 23 ++++- 6 files changed, 180 insertions(+), 41 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/import/runner/staking_pools_delegators.ex diff --git a/apps/explorer/lib/explorer/chain/import/runner/staking_pools.ex b/apps/explorer/lib/explorer/chain/import/runner/staking_pools.ex index d30ca934fa..d7655674d8 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/staking_pools.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/staking_pools.ex @@ -128,37 +128,35 @@ defmodule Explorer.Chain.Import.Runner.StakingPools do end defp calculate_stakes_ratio(repo, %{timeout: timeout}) do - try do - total_query = - from( - pool in StakingPool, - where: pool.is_active == true, - select: sum(pool.staked_amount) - ) + total_query = + from( + pool in StakingPool, + where: pool.is_active == true, + select: sum(pool.staked_amount) + ) + + total = repo.one!(total_query) - total = repo.one!(total_query) - - if total > Decimal.new(0) do - query = - from( - p in StakingPool, - where: p.is_active == true, - update: [ - set: [ - staked_ratio: p.staked_amount / ^total * 100, - likelihood: p.staked_amount / ^total * 100 - ] + if total > Decimal.new(0) do + query = + from( + p in StakingPool, + where: p.is_active == true, + update: [ + set: [ + staked_ratio: p.staked_amount / ^total * 100, + likelihood: p.staked_amount / ^total * 100 ] - ) + ] + ) - {count, _} = repo.update_all(query, [], timeout: timeout) - {:ok, count} - else - {:ok, 1} - end - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error}} + {count, _} = repo.update_all(query, [], timeout: timeout) + {:ok, count} + else + {:ok, 1} end + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error}} end end diff --git a/apps/explorer/lib/explorer/chain/import/runner/staking_pools_delegators.ex b/apps/explorer/lib/explorer/chain/import/runner/staking_pools_delegators.ex new file mode 100644 index 0000000000..5d44d68a92 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/runner/staking_pools_delegators.ex @@ -0,0 +1,91 @@ +defmodule Explorer.Chain.Import.Runner.StakingPoolsDelegators do + @moduledoc """ + Bulk imports staking pools to StakingPoolsDelegators tabe. + """ + + require Ecto.Query + + alias Ecto.{Changeset, Multi, Repo} + alias Explorer.Chain.{Import, StakingPoolsDelegators} + + import Ecto.Query, only: [from: 2] + + @behaviour Import.Runner + + # milliseconds + @timeout 60_000 + + @type imported :: [StakingPoolsDelegators.t()] + + @impl Import.Runner + def ecto_schema_module, do: StakingPoolsDelegators + + @impl Import.Runner + def option_key, do: :staking_pools_delegators + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } + end + + @impl Import.Runner + def run(multi, changes_list, %{timestamps: timestamps} = options) do + insert_options = + options + |> Map.get(option_key(), %{}) + |> Map.take(~w(on_conflict timeout)a) + |> Map.put_new(:timeout, @timeout) + |> Map.put(:timestamps, timestamps) + + multi + |> Multi.run(:insert_staking_pools_delegators, fn repo, _ -> + insert(repo, changes_list, insert_options) + end) + end + + @impl Import.Runner + def timeout, do: @timeout + + @spec insert(Repo.t(), [map()], %{ + optional(:on_conflict) => Import.Runner.on_conflict(), + required(:timeout) => timeout, + required(:timestamps) => Import.timestamps() + }) :: + {:ok, [StakingPoolsDelegators.t()]} + | {:error, [Changeset.t()]} + defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + + {:ok, _} = + Import.insert_changes_list( + repo, + changes_list, + conflict_target: [:pool_address_hash, :delegator_address_hash], + on_conflict: on_conflict, + for: StakingPoolsDelegators, + returning: [:pool_address_hash, :delegator_address_hash], + timeout: timeout, + timestamps: timestamps + ) + end + + defp default_on_conflict do + from( + name in StakingPoolsDelegators, + update: [ + set: [ + stake_amount: fragment("EXCLUDED.stake_amount"), + ordered_withdraw: fragment("EXCLUDED.ordered_withdraw"), + max_withdraw_allowed: fragment("EXCLUDED.max_withdraw_allowed"), + max_ordered_withdraw_allowed: fragment("EXCLUDED.max_ordered_withdraw_allowed"), + ordered_withdraw_epoch: fragment("EXCLUDED.ordered_withdraw_epoch"), + inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", name.inserted_at), + updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", name.updated_at) + ] + ] + ) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex index 0e42bdc1f9..b5037f3b04 100644 --- a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex +++ b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex @@ -25,7 +25,8 @@ defmodule Explorer.Chain.Import.Stage.AddressReferencing do Runner.TokenTransfers, Runner.Address.CurrentTokenBalances, Runner.Address.TokenBalances, - Runner.StakingPools + Runner.StakingPools, + Runner.StakingPoolsDelegators ] @impl Stage diff --git a/apps/explorer/lib/explorer/chain/staking_pool.ex b/apps/explorer/lib/explorer/chain/staking_pool.ex index 297ee8d06d..31c5b1cc26 100644 --- a/apps/explorer/lib/explorer/chain/staking_pool.ex +++ b/apps/explorer/lib/explorer/chain/staking_pool.ex @@ -9,6 +9,7 @@ defmodule Explorer.Chain.StakingPool do alias Explorer.Chain.{ Address, Hash, + StakingPoolsDelegators, Wei } @@ -53,6 +54,7 @@ defmodule Explorer.Chain.StakingPool do field(:was_banned_count, :integer) field(:was_validator_count, :integer) field(:is_deleted, :boolean, default: false) + has_many(:delegators, StakingPoolsDelegators) belongs_to( :staking_address, @@ -77,6 +79,7 @@ defmodule Explorer.Chain.StakingPool do def changeset(staking_pool, attrs) do staking_pool |> cast(attrs, @attrs) + |> cast_assoc(:delegators) |> validate_required(@req_attrs) |> validate_staked_amount() |> unique_constraint(:staking_address_hash) diff --git a/apps/explorer/lib/explorer/staking/pools_reader.ex b/apps/explorer/lib/explorer/staking/pools_reader.ex index 1c88e09493..0c04c8e570 100644 --- a/apps/explorer/lib/explorer/staking/pools_reader.ex +++ b/apps/explorer/lib/explorer/staking/pools_reader.ex @@ -24,10 +24,11 @@ defmodule Explorer.Staking.PoolsReader do @spec pool_data(String.t()) :: {:ok, map()} | :error def pool_data(staking_address) do with {:ok, [mining_address]} <- call_validators_method("miningByStakingAddress", [staking_address]), - data = fetch_data(staking_address, mining_address), + data = fetch_pool_data(staking_address, mining_address), {:ok, [is_active]} <- data["isPoolActive"], {:ok, [delegator_addresses]} <- data["poolDelegators"], delegators_count = Enum.count(delegator_addresses), + delegators = delegators_data(delegator_addresses, staking_address), {:ok, [staked_amount]} <- data["stakeAmountTotalMinusOrderedWithdraw"], {:ok, [self_staked_amount]} <- data["stakeAmountMinusOrderedWithdraw"], {:ok, [is_validator]} <- data["isValidator"], @@ -48,7 +49,8 @@ defmodule Explorer.Staking.PoolsReader do was_validator_count: was_validator_count, is_banned: is_banned, banned_until: banned_until, - was_banned_count: was_banned_count + was_banned_count: was_banned_count, + delegators: delegators } } else @@ -57,6 +59,35 @@ defmodule Explorer.Staking.PoolsReader do end end + defp delegators_data(delegators, pool_address) do + Enum.map(delegators, fn address -> + data = + call_methods([ + {:staking, "stakeAmount", [pool_address, address]}, + {:staking, "orderedWithdrawAmount", [pool_address, address]}, + {:staking, "maxWithdrawAllowed", [pool_address, address]}, + {:staking, "maxWithdrawOrderAllowed", [pool_address, address]}, + {:staking, "orderWithdrawEpoch", [pool_address, address]} + ]) + + {:ok, [stake_amount]} = data["stakeAmount"] + {:ok, [ordered_withdraw]} = data["orderedWithdrawAmount"] + {:ok, [max_withdraw_allowed]} = data["maxWithdrawAllowed"] + {:ok, [max_ordered_withdraw_allowed]} = data["maxWithdrawOrderAllowed"] + {:ok, [ordered_withdraw_epoch]} = data["orderWithdrawEpoch"] + + %{ + delegator_address_hash: address, + pool_address_hash: pool_address, + stake_amount: stake_amount, + ordered_withdraw: ordered_withdraw, + max_withdraw_allowed: max_withdraw_allowed, + max_ordered_withdraw_allowed: max_ordered_withdraw_allowed, + ordered_withdraw_epoch: ordered_withdraw_epoch + } + end) + end + defp call_staking_method(method, params) do %{^method => resp} = Reader.query_contract(config(:staking_contract_address), abi("staking.json"), %{ @@ -75,10 +106,8 @@ defmodule Explorer.Staking.PoolsReader do resp end - defp fetch_data(staking_address, mining_address) do - contract_abi = abi("staking.json") ++ abi("validators.json") - - methods = [ + defp fetch_pool_data(staking_address, mining_address) do + call_methods([ {:staking, "isPoolActive", [staking_address]}, {:staking, "poolDelegators", [staking_address]}, {:staking, "stakeAmountTotalMinusOrderedWithdraw", [staking_address]}, @@ -88,7 +117,11 @@ defmodule Explorer.Staking.PoolsReader do {:validators, "isValidatorBanned", [mining_address]}, {:validators, "bannedUntil", [mining_address]}, {:validators, "banCounter", [mining_address]} - ] + ]) + end + + defp call_methods(methods) do + contract_abi = abi("staking.json") ++ abi("validators.json") methods |> Enum.map(&format_request/1) diff --git a/apps/indexer/lib/indexer/fetcher/staking_pools.ex b/apps/indexer/lib/indexer/fetcher/staking_pools.ex index 1632229e08..e5fb5485a6 100644 --- a/apps/indexer/lib/indexer/fetcher/staking_pools.ex +++ b/apps/indexer/lib/indexer/fetcher/staking_pools.ex @@ -94,21 +94,22 @@ defmodule Indexer.Fetcher.StakingPools do defp import_pools(pools) do {failed, success} = Enum.reduce(pools, {[], []}, fn - %{error: _error, staking_address_hash: address}, {failed, success} -> - {[address | failed], success} + %{error: _error} = pool, {failed, success} -> + {[pool | failed], success} - %{staking_address_hash: address} = pool, {failed, success} -> + pool, {failed, success} -> changeset = StakingPool.changeset(%StakingPool{}, pool) if changeset.valid? do {failed, [changeset.changes | success]} else - {[address | failed], success} + {[pool | failed], success} end end) import_params = %{ - staking_pools: %{params: success}, + staking_pools: %{params: remove_assoc(success)}, + staking_pools_delegators: %{params: delegators_list(success)}, timeout: :infinity } @@ -124,4 +125,16 @@ defmodule Indexer.Fetcher.StakingPools do failed end + + defp delegators_list(pools) do + Enum.reduce(pools, [], fn pool, acc -> + pool.delegators + |> Enum.map(pool.delegators, &Map.get(&1, :changes)) + |> Enum.concat(acc) + end) + end + + defp remove_assoc(pools) do + Enum.map(pools, &Map.delete(&1, :delegators)) + end end