import staking pools delegators

pull/2036/head
saneery 6 years ago
parent b6d30d4229
commit 2a25481663
  1. 54
      apps/explorer/lib/explorer/chain/import/runner/staking_pools.ex
  2. 91
      apps/explorer/lib/explorer/chain/import/runner/staking_pools_delegators.ex
  3. 3
      apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex
  4. 3
      apps/explorer/lib/explorer/chain/staking_pool.ex
  5. 47
      apps/explorer/lib/explorer/staking/pools_reader.ex
  6. 23
      apps/indexer/lib/indexer/fetcher/staking_pools.ex

@ -128,37 +128,35 @@ defmodule Explorer.Chain.Import.Runner.StakingPools do
end
defp calculate_stakes_ratio(repo, %{timeout: timeout}) do
try do
total_query =
from(
pool in StakingPool,
where: pool.is_active == true,
select: sum(pool.staked_amount)
)
total_query =
from(
pool in StakingPool,
where: pool.is_active == true,
select: sum(pool.staked_amount)
)
total = repo.one!(total_query)
total = repo.one!(total_query)
if total > Decimal.new(0) do
query =
from(
p in StakingPool,
where: p.is_active == true,
update: [
set: [
staked_ratio: p.staked_amount / ^total * 100,
likelihood: p.staked_amount / ^total * 100
]
if total > Decimal.new(0) do
query =
from(
p in StakingPool,
where: p.is_active == true,
update: [
set: [
staked_ratio: p.staked_amount / ^total * 100,
likelihood: p.staked_amount / ^total * 100
]
)
]
)
{count, _} = repo.update_all(query, [], timeout: timeout)
{:ok, count}
else
{:ok, 1}
end
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error}}
{count, _} = repo.update_all(query, [], timeout: timeout)
{:ok, count}
else
{:ok, 1}
end
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error}}
end
end

@ -0,0 +1,91 @@
defmodule Explorer.Chain.Import.Runner.StakingPoolsDelegators do
@moduledoc """
Bulk imports staking pools to StakingPoolsDelegators tabe.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Import, StakingPoolsDelegators}
import Ecto.Query, only: [from: 2]
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [StakingPoolsDelegators.t()]
@impl Import.Runner
def ecto_schema_module, do: StakingPoolsDelegators
@impl Import.Runner
def option_key, do: :staking_pools_delegators
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
multi
|> Multi.run(:insert_staking_pools_delegators, fn repo, _ ->
insert(repo, changes_list, insert_options)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{
optional(:on_conflict) => Import.Runner.on_conflict(),
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) ::
{:ok, [StakingPoolsDelegators.t()]}
| {:error, [Changeset.t()]}
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
{:ok, _} =
Import.insert_changes_list(
repo,
changes_list,
conflict_target: [:pool_address_hash, :delegator_address_hash],
on_conflict: on_conflict,
for: StakingPoolsDelegators,
returning: [:pool_address_hash, :delegator_address_hash],
timeout: timeout,
timestamps: timestamps
)
end
defp default_on_conflict do
from(
name in StakingPoolsDelegators,
update: [
set: [
stake_amount: fragment("EXCLUDED.stake_amount"),
ordered_withdraw: fragment("EXCLUDED.ordered_withdraw"),
max_withdraw_allowed: fragment("EXCLUDED.max_withdraw_allowed"),
max_ordered_withdraw_allowed: fragment("EXCLUDED.max_ordered_withdraw_allowed"),
ordered_withdraw_epoch: fragment("EXCLUDED.ordered_withdraw_epoch"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", name.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", name.updated_at)
]
]
)
end
end

@ -25,7 +25,8 @@ defmodule Explorer.Chain.Import.Stage.AddressReferencing do
Runner.TokenTransfers,
Runner.Address.CurrentTokenBalances,
Runner.Address.TokenBalances,
Runner.StakingPools
Runner.StakingPools,
Runner.StakingPoolsDelegators
]
@impl Stage

@ -9,6 +9,7 @@ defmodule Explorer.Chain.StakingPool do
alias Explorer.Chain.{
Address,
Hash,
StakingPoolsDelegators,
Wei
}
@ -53,6 +54,7 @@ defmodule Explorer.Chain.StakingPool do
field(:was_banned_count, :integer)
field(:was_validator_count, :integer)
field(:is_deleted, :boolean, default: false)
has_many(:delegators, StakingPoolsDelegators)
belongs_to(
:staking_address,
@ -77,6 +79,7 @@ defmodule Explorer.Chain.StakingPool do
def changeset(staking_pool, attrs) do
staking_pool
|> cast(attrs, @attrs)
|> cast_assoc(:delegators)
|> validate_required(@req_attrs)
|> validate_staked_amount()
|> unique_constraint(:staking_address_hash)

@ -24,10 +24,11 @@ defmodule Explorer.Staking.PoolsReader do
@spec pool_data(String.t()) :: {:ok, map()} | :error
def pool_data(staking_address) do
with {:ok, [mining_address]} <- call_validators_method("miningByStakingAddress", [staking_address]),
data = fetch_data(staking_address, mining_address),
data = fetch_pool_data(staking_address, mining_address),
{:ok, [is_active]} <- data["isPoolActive"],
{:ok, [delegator_addresses]} <- data["poolDelegators"],
delegators_count = Enum.count(delegator_addresses),
delegators = delegators_data(delegator_addresses, staking_address),
{:ok, [staked_amount]} <- data["stakeAmountTotalMinusOrderedWithdraw"],
{:ok, [self_staked_amount]} <- data["stakeAmountMinusOrderedWithdraw"],
{:ok, [is_validator]} <- data["isValidator"],
@ -48,7 +49,8 @@ defmodule Explorer.Staking.PoolsReader do
was_validator_count: was_validator_count,
is_banned: is_banned,
banned_until: banned_until,
was_banned_count: was_banned_count
was_banned_count: was_banned_count,
delegators: delegators
}
}
else
@ -57,6 +59,35 @@ defmodule Explorer.Staking.PoolsReader do
end
end
defp delegators_data(delegators, pool_address) do
Enum.map(delegators, fn address ->
data =
call_methods([
{:staking, "stakeAmount", [pool_address, address]},
{:staking, "orderedWithdrawAmount", [pool_address, address]},
{:staking, "maxWithdrawAllowed", [pool_address, address]},
{:staking, "maxWithdrawOrderAllowed", [pool_address, address]},
{:staking, "orderWithdrawEpoch", [pool_address, address]}
])
{:ok, [stake_amount]} = data["stakeAmount"]
{:ok, [ordered_withdraw]} = data["orderedWithdrawAmount"]
{:ok, [max_withdraw_allowed]} = data["maxWithdrawAllowed"]
{:ok, [max_ordered_withdraw_allowed]} = data["maxWithdrawOrderAllowed"]
{:ok, [ordered_withdraw_epoch]} = data["orderWithdrawEpoch"]
%{
delegator_address_hash: address,
pool_address_hash: pool_address,
stake_amount: stake_amount,
ordered_withdraw: ordered_withdraw,
max_withdraw_allowed: max_withdraw_allowed,
max_ordered_withdraw_allowed: max_ordered_withdraw_allowed,
ordered_withdraw_epoch: ordered_withdraw_epoch
}
end)
end
defp call_staking_method(method, params) do
%{^method => resp} =
Reader.query_contract(config(:staking_contract_address), abi("staking.json"), %{
@ -75,10 +106,8 @@ defmodule Explorer.Staking.PoolsReader do
resp
end
defp fetch_data(staking_address, mining_address) do
contract_abi = abi("staking.json") ++ abi("validators.json")
methods = [
defp fetch_pool_data(staking_address, mining_address) do
call_methods([
{:staking, "isPoolActive", [staking_address]},
{:staking, "poolDelegators", [staking_address]},
{:staking, "stakeAmountTotalMinusOrderedWithdraw", [staking_address]},
@ -88,7 +117,11 @@ defmodule Explorer.Staking.PoolsReader do
{:validators, "isValidatorBanned", [mining_address]},
{:validators, "bannedUntil", [mining_address]},
{:validators, "banCounter", [mining_address]}
]
])
end
defp call_methods(methods) do
contract_abi = abi("staking.json") ++ abi("validators.json")
methods
|> Enum.map(&format_request/1)

@ -94,21 +94,22 @@ defmodule Indexer.Fetcher.StakingPools do
defp import_pools(pools) do
{failed, success} =
Enum.reduce(pools, {[], []}, fn
%{error: _error, staking_address_hash: address}, {failed, success} ->
{[address | failed], success}
%{error: _error} = pool, {failed, success} ->
{[pool | failed], success}
%{staking_address_hash: address} = pool, {failed, success} ->
pool, {failed, success} ->
changeset = StakingPool.changeset(%StakingPool{}, pool)
if changeset.valid? do
{failed, [changeset.changes | success]}
else
{[address | failed], success}
{[pool | failed], success}
end
end)
import_params = %{
staking_pools: %{params: success},
staking_pools: %{params: remove_assoc(success)},
staking_pools_delegators: %{params: delegators_list(success)},
timeout: :infinity
}
@ -124,4 +125,16 @@ defmodule Indexer.Fetcher.StakingPools do
failed
end
defp delegators_list(pools) do
Enum.reduce(pools, [], fn pool, acc ->
pool.delegators
|> Enum.map(pool.delegators, &Map.get(&1, :changes))
|> Enum.concat(acc)
end)
end
defp remove_assoc(pools) do
Enum.map(pools, &Map.delete(&1, :delegators))
end
end

Loading…
Cancel
Save