Fix snapshotting

staking
Vadim 5 years ago committed by Victor Baranov
parent 1859186550
commit 6c29571219
  1. 14
      apps/explorer/lib/explorer/staking/contract_reader.ex
  2. 64
      apps/explorer/lib/explorer/staking/contract_state.ex
  3. 160
      apps/explorer/lib/explorer/staking/stake_snapshotting.ex

@ -20,8 +20,6 @@ defmodule Explorer.Staking.ContractReader do
pools_likelihood: {:staking, "getPoolsLikelihood", []}, pools_likelihood: {:staking, "getPoolsLikelihood", []},
validators: {:validator_set, "getValidators", []}, validators: {:validator_set, "getValidators", []},
unremovable_validator: {:validator_set, "unremovableValidator", []}, unremovable_validator: {:validator_set, "unremovableValidator", []},
# pending_validators: {:validator_set, "getPendingValidators", []},
# be_finalized_validators: {:validator_set, "validatorsToBeFinalized", []},
validator_set_apply_block: {:validator_set, "validatorSetApplyBlock", []} validator_set_apply_block: {:validator_set, "validatorSetApplyBlock", []}
] ]
end end
@ -50,13 +48,13 @@ defmodule Explorer.Staking.ContractReader do
] ]
end end
def delegator_requests(pool_staking_address, delegator_address) do def staker_requests(pool_staking_address, staker_address) do
[ [
stake_amount: {:staking, "stakeAmount", [pool_staking_address, delegator_address]}, max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, staker_address]},
ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, delegator_address]}, max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, staker_address]},
max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, delegator_address]}, ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, staker_address]},
max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, delegator_address]}, ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, staker_address]},
ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, delegator_address]} stake_amount: {:staking, "stakeAmount", [pool_staking_address, staker_address]}
] ]
end end

@ -120,6 +120,7 @@ defmodule Explorer.Staking.ContractState do
end end
defp fetch_state(contracts, abi, block_number) do defp fetch_state(contracts, abi, block_number) do
# read general info from the contracts (including pool list and validator list)
global_responses = ContractReader.perform_requests(ContractReader.global_requests(), contracts, abi) global_responses = ContractReader.perform_requests(ContractReader.global_requests(), contracts, abi)
token = get_token(global_responses.token_contract_address) token = get_token(global_responses.token_contract_address)
@ -138,12 +139,14 @@ defmodule Explorer.Staking.ContractState do
|> Map.to_list() |> Map.to_list()
|> Enum.concat(token: token) |> Enum.concat(token: token)
# save the general info in ETS (excluding pool list and validator list)
:ets.insert(@table_name, settings) :ets.insert(@table_name, settings)
pools = global_responses.active_pools ++ global_responses.inactive_pools pools = global_responses.active_pools ++ global_responses.inactive_pools
is_validator = Enum.into(global_responses.validators, %{}, &{hash_to_string(&1), true}) is_validator = Enum.into(global_responses.validators, %{}, &{hash_to_string(&1), true})
unremovable_validator = global_responses.unremovable_validator unremovable_validator = global_responses.unremovable_validator
# read info about each pool from the contracts (including delegator list)
pool_staking_responses = pool_staking_responses =
pools pools
|> Enum.map(&ContractReader.pool_staking_requests/1) |> Enum.map(&ContractReader.pool_staking_requests/1)
@ -154,20 +157,23 @@ defmodule Explorer.Staking.ContractState do
|> Enum.map(&ContractReader.pool_mining_requests(pool_staking_responses[&1].mining_address_hash)) |> Enum.map(&ContractReader.pool_mining_requests(pool_staking_responses[&1].mining_address_hash))
|> ContractReader.perform_grouped_requests(pools, contracts, abi) |> ContractReader.perform_grouped_requests(pools, contracts, abi)
delegators = # form a flat list of all stakers in the form {pool_staking_address, staker_address, is_active}
Enum.flat_map(pool_staking_responses, fn {pool_address, responses} -> stakers =
[{pool_address, pool_address, true}] ++ Enum.flat_map(pool_staking_responses, fn {pool_staking_address, resp} ->
Enum.map(responses.active_delegators, &{pool_address, &1, true}) ++ [{pool_staking_address, pool_staking_address, true}] ++
Enum.map(responses.inactive_delegators, &{pool_address, &1, false}) Enum.map(resp.active_delegators, &{pool_staking_address, &1, true}) ++
Enum.map(resp.inactive_delegators, &{pool_staking_address, &1, false})
end) end)
delegator_responses = # get amounts for each of the stakers
delegators staker_responses =
|> Enum.map(fn {pool_address, delegator_address, _} -> stakers
ContractReader.delegator_requests(pool_address, delegator_address) |> Enum.map(fn {pool_staking_address, staker_address, _} ->
ContractReader.staker_requests(pool_staking_address, staker_address)
end) end)
|> ContractReader.perform_grouped_requests(delegators, contracts, abi) |> ContractReader.perform_grouped_requests(stakers, contracts, abi)
# calculate total amount staked into all active pools
staked_total = Enum.sum(for {_, pool} <- pool_staking_responses, pool.is_active, do: pool.total_staked_amount) staked_total = Enum.sum(for {_, pool} <- pool_staking_responses, pool.is_active, do: pool.total_staked_amount)
[likelihood_values, total_likelihood] = global_responses.pools_likelihood [likelihood_values, total_likelihood] = global_responses.pools_likelihood
@ -177,7 +183,7 @@ defmodule Explorer.Staking.ContractState do
|> Enum.zip(likelihood_values) |> Enum.zip(likelihood_values)
|> Enum.into(%{}) |> Enum.into(%{})
pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _response} -> key end) pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _} -> key end)
pool_reward_responses = pool_reward_responses =
pool_staking_responses pool_staking_responses
@ -191,10 +197,10 @@ defmodule Explorer.Staking.ContractState do
end) end)
|> ContractReader.perform_grouped_requests(pool_staking_keys, contracts, abi) |> ContractReader.perform_grouped_requests(pool_staking_keys, contracts, abi)
delegator_keys = Enum.map(delegator_responses, fn {key, _response} -> key end) delegator_keys = Enum.map(staker_responses, fn {key, _} -> key end)
delegator_reward_responses = delegator_reward_responses =
delegator_responses staker_responses
|> Enum.map(fn {{pool_address, _, _}, response} -> |> Enum.map(fn {{pool_address, _, _}, response} ->
staking_response = pool_staking_responses[pool_address] staking_response = pool_staking_responses[pool_address]
@ -231,26 +237,26 @@ defmodule Explorer.Staking.ContractState do
} }
|> Map.merge( |> Map.merge(
Map.take(staking_response, [ Map.take(staking_response, [
:mining_address_hash,
:is_active, :is_active,
:total_staked_amount, :mining_address_hash,
:self_staked_amount :self_staked_amount,
:total_staked_amount
]) ])
) )
|> Map.merge( |> Map.merge(
Map.take(mining_response, [ Map.take(mining_response, [
:was_validator_count,
:is_banned,
:are_delegators_banned, :are_delegators_banned,
:banned_until,
:banned_delegators_until, :banned_delegators_until,
:was_banned_count :banned_until,
:is_banned,
:was_banned_count,
:was_validator_count
]) ])
) )
end) end)
delegator_entries = delegator_entries =
Enum.map(delegator_responses, fn {{pool_address, delegator_address, is_active}, response} -> Enum.map(staker_responses, fn {{pool_address, delegator_address, is_active}, response} ->
delegator_reward_response = delegator_reward_responses[{pool_address, delegator_address, is_active}] delegator_reward_response = delegator_reward_responses[{pool_address, delegator_address, is_active}]
Map.merge(response, %{ Map.merge(response, %{
@ -269,16 +275,12 @@ defmodule Explorer.Staking.ContractState do
}) })
if global_responses.epoch_start_block == block_number + 1 do if global_responses.epoch_start_block == block_number + 1 do
with( spawn(StakeSnapshotting, :do_snapshotting, [
true <- :ets.insert(@table_name, is_snapshotted: false), %{contracts: contracts, abi: abi, epoch_number: global_responses.epoch_number, ets_table_name: @table_name},
{:ok, _} <- pool_staking_responses,
StakeSnapshotting.start_snapshotting( Map.new(Enum.map(staker_responses, fn {key, resp} -> {pool_staking_address, staker_address, _} = key; {{pool_staking_address, staker_address}, resp} end)),
%{contracts: contracts, abi: abi, global_responses: global_responses}, block_number # the last block of the finished staking epoch
block_number ])
)
) do
:ets.insert(@table_name, is_snapshotted: true)
end
end end
Publisher.broadcast(:staking_update) Publisher.broadcast(:staking_update)

@ -14,7 +14,15 @@ defmodule Explorer.Staking.StakeSnapshotting do
alias Explorer.SmartContract.Reader alias Explorer.SmartContract.Reader
alias Explorer.Staking.ContractReader alias Explorer.Staking.ContractReader
def start_snapshotting(%{contracts: contracts, abi: abi, global_responses: global_responses}, block_number) do def do_snapshotting(
%{contracts: contracts, abi: abi, epoch_number: epoch_number, ets_table_name: ets_table_name},
cached_pool_staking_responses,
cached_staker_responses,
block_number
) do
:ets.insert(ets_table_name, is_snapshotted: false)
# get the list of pending validators
%{ %{
"getPendingValidators" => {:ok, [pending_validators]}, "getPendingValidators" => {:ok, [pending_validators]},
"validatorsToBeFinalized" => {:ok, [to_be_finalized_validators]} "validatorsToBeFinalized" => {:ok, [to_be_finalized_validators]}
@ -26,6 +34,7 @@ defmodule Explorer.Staking.StakeSnapshotting do
pool_mining_addresses = Enum.uniq(pending_validators ++ to_be_finalized_validators) pool_mining_addresses = Enum.uniq(pending_validators ++ to_be_finalized_validators)
# get staking addresses for the pending validators
pool_staking_addresses = pool_staking_addresses =
pool_mining_addresses pool_mining_addresses
|> Enum.map(&staking_by_mining_requests/1) |> Enum.map(&staking_by_mining_requests/1)
@ -33,54 +42,84 @@ defmodule Explorer.Staking.StakeSnapshotting do
|> Enum.flat_map(fn {_, value} -> value end) |> Enum.flat_map(fn {_, value} -> value end)
|> Enum.map(fn {_key, staking_address_hash} -> decode_data(staking_address_hash) end) |> Enum.map(fn {_key, staking_address_hash} -> decode_data(staking_address_hash) end)
# get snapshotted amounts and other pool info for each pending validator.
# use `cached_pool_staking_responses` when possible
pool_staking_responses = pool_staking_responses =
pool_staking_addresses pool_staking_addresses
|> Enum.map(fn address_hashe -> pool_staking_requests(address_hashe, block_number) end) |> Enum.map(fn address_hash ->
|> ContractReader.perform_grouped_requests(pool_staking_addresses, contracts, abi) case Map.fetch(cached_pool_staking_responses, address_hash) do
{:ok, resp} ->
Map.merge(resp, ContractReader.perform_requests(snapshotted_amounts_requests(address_hash, block_number), contracts, abi))
:error ->
ContractReader.perform_requests(pool_staking_requests(address_hash, block_number), contracts, abi)
end
end)
|> Enum.zip(pool_staking_addresses)
|> Map.new(fn {key, val} -> {val, key} end)
pool_mining_responses = pool_mining_responses =
pool_staking_addresses pool_staking_addresses
|> Enum.map(&ContractReader.pool_mining_requests(pool_staking_responses[&1].mining_address_hash)) |> Enum.map(&ContractReader.pool_mining_requests(pool_staking_responses[&1].mining_address_hash))
|> ContractReader.perform_grouped_requests(pool_staking_addresses, contracts, abi) |> ContractReader.perform_grouped_requests(pool_staking_addresses, contracts, abi)
delegators = # form a flat list of all stakers in the form {pool_staking_address, staker_address}
Enum.flat_map(pool_staking_responses, fn {pool_address, responses} -> stakers =
[{pool_address, pool_address, true}] ++ Enum.flat_map(pool_staking_responses, fn {pool_staking_address, resp} ->
Enum.map(responses.active_delegators, &{pool_address, &1, true}) ++ [{pool_staking_address, pool_staking_address}] ++
Enum.map(responses.inactive_delegators, &{pool_address, &1, false}) Enum.map(resp.active_delegators, &{pool_staking_address, &1}) ++
Enum.map(resp.inactive_delegators, &{pool_staking_address, &1})
end) end)
delegator_responses = # get amounts for each of the stakers
delegators # use `cached_staker_responses` when possible
|> Enum.map(fn {pool_address, delegator_address, _} -> staker_responses =
delegator_requests(pool_address, delegator_address, block_number) stakers
|> Enum.map(fn {pool_staking_address, staker_address} = key ->
case Map.fetch(cached_staker_responses, key) do
{:ok, resp} ->
Map.merge(
resp,
ContractReader.perform_requests(
snapshotted_staker_amount_request(pool_staking_address, staker_address, block_number),
contracts,
abi
)
)
:error ->
ContractReader.perform_requests(
staker_requests(pool_staking_address, staker_address, block_number),
contracts,
abi
)
end
end) end)
|> ContractReader.perform_grouped_requests(delegators, contracts, abi) |> Enum.zip(stakers)
|> Map.new(fn {key, val} -> {val, key} end)
pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _response} -> key end) pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _} -> key end)
pool_reward_responses = pool_reward_responses =
pool_staking_responses pool_staking_responses
|> Enum.map(fn {_address, response} -> |> Enum.map(fn {_address, resp} ->
ContractReader.validator_reward_requests([ ContractReader.validator_reward_requests([
global_responses.epoch_number, epoch_number,
response.snapshotted_self_staked_amount, resp.snapshotted_self_staked_amount,
response.snapshotted_total_staked_amount, resp.snapshotted_total_staked_amount,
1000_000 1000_000
]) ])
end) end)
|> ContractReader.perform_grouped_requests(pool_staking_keys, contracts, abi) |> ContractReader.perform_grouped_requests(pool_staking_keys, contracts, abi)
delegator_keys = Enum.map(delegator_responses, fn {key, _response} -> key end) delegator_keys = Enum.map(staker_responses, fn {key, _} -> key end)
delegator_reward_responses = delegator_reward_responses =
delegator_responses staker_responses
|> Enum.map(fn {{pool_address, _delegator_address, _}, response} -> |> Enum.map(fn {{pool_address, _delegator_address}, response} ->
staking_response = pool_staking_responses[pool_address] staking_response = pool_staking_responses[pool_address]
ContractReader.delegator_reward_requests([ ContractReader.delegator_reward_requests([
global_responses.epoch_number, epoch_number,
response.stake_amount, response.snapshotted_stake_amount,
staking_response.snapshotted_self_staked_amount, staking_response.snapshotted_self_staked_amount,
staking_response.snapshotted_total_staked_amount, staking_response.snapshotted_total_staked_amount,
1000_000 1000_000
@ -101,39 +140,47 @@ defmodule Explorer.Staking.StakeSnapshotting do
} }
|> Map.merge( |> Map.merge(
Map.take(staking_response, [ Map.take(staking_response, [
:snapshotted_total_staked_amount, :mining_address_hash,
:snapshotted_self_staked_amount,
:total_staked_amount,
:self_staked_amount, :self_staked_amount,
:mining_address_hash :snapshotted_self_staked_amount,
:snapshotted_total_staked_amount,
:total_staked_amount
]) ])
) )
|> Map.merge( |> Map.merge(
Map.take(mining_response, [ Map.take(mining_response, [
:was_validator_count, :banned_until,
:was_banned_count, :was_banned_count,
:banned_until :was_validator_count
]) ])
) )
end) end)
delegator_entries = delegator_entries =
Enum.map(delegator_responses, fn {{pool_address, delegator_address, is_active}, response} -> Enum.map(staker_responses, fn {{pool_staking_address, staker_address}, response} ->
delegator_reward_response = delegator_reward_responses[{pool_address, delegator_address, is_active}] delegator_reward_response = delegator_reward_responses[{pool_staking_address, staker_address}]
# %{
# address_hash: staker_address,
# staking_address_hash: pool_staking_address,
# snapshotted_stake_amount: response.snapshotted_stake_amount,
# snapshotted_reward_ratio: Float.floor(delegator_reward_response.delegator_share / 10_000, 2)
# }
Map.merge(response, %{ Map.merge(response, %{
address_hash: delegator_address, address_hash: staker_address,
staking_address_hash: pool_address, staking_address_hash: pool_staking_address,
is_active: is_active,
snapshotted_reward_ratio: Float.floor(delegator_reward_response.delegator_share / 10_000, 2) snapshotted_reward_ratio: Float.floor(delegator_reward_response.delegator_share / 10_000, 2)
}) })
end) end)
Chain.import(%{ case Chain.import(%{
staking_pools: %{params: pool_entries, on_conflict: staking_pool_on_conflict()}, staking_pools: %{params: pool_entries, on_conflict: staking_pool_on_conflict()},
staking_pools_delegators: %{params: delegator_entries, on_conflict: staking_pools_delegator_on_conflict()}, staking_pools_delegators: %{params: delegator_entries, on_conflict: staking_pools_delegators_update()},
timeout: :infinity timeout: :infinity
}) }) do
{:ok, _} -> :ets.insert(ets_table_name, is_snapshotted: true)
_ -> Logger.error("Cannot finish snapshotting started at block #{block_number}")
end
end end
def staking_by_mining_requests(mining_address) do def staking_by_mining_requests(mining_address) do
@ -144,24 +191,34 @@ defmodule Explorer.Staking.StakeSnapshotting do
defp pool_staking_requests(staking_address, block_number) do defp pool_staking_requests(staking_address, block_number) do
[ [
snapshotted_total_staked_amount: {:staking, "stakeAmountTotal", [staking_address], block_number - 1},
snapshotted_self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address], block_number - 1},
total_staked_amount: {:staking, "stakeAmountTotal", [staking_address]}, total_staked_amount: {:staking, "stakeAmountTotal", [staking_address]},
self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address]}, self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address]},
mining_address_hash: {:validator_set, "miningByStakingAddress", [staking_address]}, mining_address_hash: {:validator_set, "miningByStakingAddress", [staking_address]},
active_delegators: {:staking, "poolDelegators", [staking_address]}, active_delegators: {:staking, "poolDelegators", [staking_address]},
inactive_delegators: {:staking, "poolDelegatorsInactive", [staking_address]} inactive_delegators: {:staking, "poolDelegatorsInactive", [staking_address]}
] ++ snapshotted_amounts_requests(staking_address, block_number)
end
defp snapshotted_amounts_requests(staking_address, block_number) do
[
snapshotted_total_staked_amount: {:staking, "stakeAmountTotal", [staking_address], block_number},
snapshotted_self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address], block_number}
] ]
end end
defp delegator_requests(pool_staking_address, delegator_address, block_number) do defp staker_requests(pool_staking_address, staker_address, block_number) do
[
max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, staker_address]},
max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, staker_address]},
ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, staker_address]},
ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, staker_address]},
stake_amount: {:staking, "stakeAmount", [pool_staking_address, staker_address]}
] ++ snapshotted_staker_amount_request(pool_staking_address, staker_address, block_number)
end
defp snapshotted_staker_amount_request(pool_staking_address, staker_address, block_number) do
[ [
stake_amount: {:staking, "stakeAmount", [pool_staking_address, delegator_address]}, snapshotted_stake_amount: {:staking, "stakeAmount", [pool_staking_address, staker_address], block_number}
snapshotted_stake_amount: {:staking, "stakeAmount", [pool_staking_address, delegator_address], block_number - 1},
ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, delegator_address]},
max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, delegator_address]},
max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, delegator_address]},
ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, delegator_address]}
] ]
end end
@ -199,22 +256,13 @@ defmodule Explorer.Staking.StakeSnapshotting do
) )
end end
defp staking_pools_delegator_on_conflict do defp staking_pools_delegators_update do
from( from(
delegator in StakingPoolsDelegator, delegator in StakingPoolsDelegator,
update: [ update: [
set: [ set: [
stake_amount: fragment("EXCLUDED.stake_amount"),
snapshotted_stake_amount: fragment("EXCLUDED.snapshotted_stake_amount"), snapshotted_stake_amount: fragment("EXCLUDED.snapshotted_stake_amount"),
ordered_withdraw: fragment("EXCLUDED.ordered_withdraw"),
max_withdraw_allowed: fragment("EXCLUDED.max_withdraw_allowed"),
max_ordered_withdraw_allowed: fragment("EXCLUDED.max_ordered_withdraw_allowed"),
ordered_withdraw_epoch: fragment("EXCLUDED.ordered_withdraw_epoch"),
reward_ratio: delegator.reward_ratio,
snapshotted_reward_ratio: fragment("EXCLUDED.snapshotted_reward_ratio"), snapshotted_reward_ratio: fragment("EXCLUDED.snapshotted_reward_ratio"),
is_active: delegator.is_active,
is_deleted: delegator.is_deleted,
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", delegator.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", delegator.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", delegator.updated_at)
] ]
] ]

Loading…
Cancel
Save