From 6c29571219dec5c42e27fb76e50894f108d44381 Mon Sep 17 00:00:00 2001 From: Vadim Date: Tue, 26 Nov 2019 17:21:26 +0300 Subject: [PATCH] Fix snapshotting --- .../lib/explorer/staking/contract_reader.ex | 14 +- .../lib/explorer/staking/contract_state.ex | 64 +++---- .../explorer/staking/stake_snapshotting.ex | 160 ++++++++++++------ 3 files changed, 143 insertions(+), 95 deletions(-) diff --git a/apps/explorer/lib/explorer/staking/contract_reader.ex b/apps/explorer/lib/explorer/staking/contract_reader.ex index 88f241846f..1ec4d440ef 100644 --- a/apps/explorer/lib/explorer/staking/contract_reader.ex +++ b/apps/explorer/lib/explorer/staking/contract_reader.ex @@ -20,8 +20,6 @@ defmodule Explorer.Staking.ContractReader do pools_likelihood: {:staking, "getPoolsLikelihood", []}, validators: {:validator_set, "getValidators", []}, unremovable_validator: {:validator_set, "unremovableValidator", []}, - # pending_validators: {:validator_set, "getPendingValidators", []}, - # be_finalized_validators: {:validator_set, "validatorsToBeFinalized", []}, validator_set_apply_block: {:validator_set, "validatorSetApplyBlock", []} ] end @@ -50,13 +48,13 @@ defmodule Explorer.Staking.ContractReader do ] end - def delegator_requests(pool_staking_address, delegator_address) do + def staker_requests(pool_staking_address, staker_address) do [ - stake_amount: {:staking, "stakeAmount", [pool_staking_address, delegator_address]}, - ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, delegator_address]}, - max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, delegator_address]}, - max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, delegator_address]}, - ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, delegator_address]} + max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, staker_address]}, + max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, staker_address]}, + ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, staker_address]}, + ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, staker_address]}, + stake_amount: {:staking, "stakeAmount", [pool_staking_address, staker_address]} ] end diff --git a/apps/explorer/lib/explorer/staking/contract_state.ex b/apps/explorer/lib/explorer/staking/contract_state.ex index 59c5237bfb..ec2c06d0db 100644 --- a/apps/explorer/lib/explorer/staking/contract_state.ex +++ b/apps/explorer/lib/explorer/staking/contract_state.ex @@ -120,6 +120,7 @@ defmodule Explorer.Staking.ContractState do end defp fetch_state(contracts, abi, block_number) do + # read general info from the contracts (including pool list and validator list) global_responses = ContractReader.perform_requests(ContractReader.global_requests(), contracts, abi) token = get_token(global_responses.token_contract_address) @@ -138,12 +139,14 @@ defmodule Explorer.Staking.ContractState do |> Map.to_list() |> Enum.concat(token: token) + # save the general info in ETS (excluding pool list and validator list) :ets.insert(@table_name, settings) pools = global_responses.active_pools ++ global_responses.inactive_pools is_validator = Enum.into(global_responses.validators, %{}, &{hash_to_string(&1), true}) unremovable_validator = global_responses.unremovable_validator + # read info about each pool from the contracts (including delegator list) pool_staking_responses = pools |> Enum.map(&ContractReader.pool_staking_requests/1) @@ -154,20 +157,23 @@ defmodule Explorer.Staking.ContractState do |> Enum.map(&ContractReader.pool_mining_requests(pool_staking_responses[&1].mining_address_hash)) |> ContractReader.perform_grouped_requests(pools, contracts, abi) - delegators = - Enum.flat_map(pool_staking_responses, fn {pool_address, responses} -> - [{pool_address, pool_address, true}] ++ - Enum.map(responses.active_delegators, &{pool_address, &1, true}) ++ - Enum.map(responses.inactive_delegators, &{pool_address, &1, false}) + # form a flat list of all stakers in the form {pool_staking_address, staker_address, is_active} + stakers = + Enum.flat_map(pool_staking_responses, fn {pool_staking_address, resp} -> + [{pool_staking_address, pool_staking_address, true}] ++ + Enum.map(resp.active_delegators, &{pool_staking_address, &1, true}) ++ + Enum.map(resp.inactive_delegators, &{pool_staking_address, &1, false}) end) - delegator_responses = - delegators - |> Enum.map(fn {pool_address, delegator_address, _} -> - ContractReader.delegator_requests(pool_address, delegator_address) + # get amounts for each of the stakers + staker_responses = + stakers + |> Enum.map(fn {pool_staking_address, staker_address, _} -> + ContractReader.staker_requests(pool_staking_address, staker_address) end) - |> ContractReader.perform_grouped_requests(delegators, contracts, abi) + |> ContractReader.perform_grouped_requests(stakers, contracts, abi) + # calculate total amount staked into all active pools staked_total = Enum.sum(for {_, pool} <- pool_staking_responses, pool.is_active, do: pool.total_staked_amount) [likelihood_values, total_likelihood] = global_responses.pools_likelihood @@ -177,7 +183,7 @@ defmodule Explorer.Staking.ContractState do |> Enum.zip(likelihood_values) |> Enum.into(%{}) - pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _response} -> key end) + pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _} -> key end) pool_reward_responses = pool_staking_responses @@ -191,10 +197,10 @@ defmodule Explorer.Staking.ContractState do end) |> ContractReader.perform_grouped_requests(pool_staking_keys, contracts, abi) - delegator_keys = Enum.map(delegator_responses, fn {key, _response} -> key end) + delegator_keys = Enum.map(staker_responses, fn {key, _} -> key end) delegator_reward_responses = - delegator_responses + staker_responses |> Enum.map(fn {{pool_address, _, _}, response} -> staking_response = pool_staking_responses[pool_address] @@ -231,26 +237,26 @@ defmodule Explorer.Staking.ContractState do } |> Map.merge( Map.take(staking_response, [ - :mining_address_hash, :is_active, - :total_staked_amount, - :self_staked_amount + :mining_address_hash, + :self_staked_amount, + :total_staked_amount ]) ) |> Map.merge( Map.take(mining_response, [ - :was_validator_count, - :is_banned, :are_delegators_banned, - :banned_until, :banned_delegators_until, - :was_banned_count + :banned_until, + :is_banned, + :was_banned_count, + :was_validator_count ]) ) end) delegator_entries = - Enum.map(delegator_responses, fn {{pool_address, delegator_address, is_active}, response} -> + Enum.map(staker_responses, fn {{pool_address, delegator_address, is_active}, response} -> delegator_reward_response = delegator_reward_responses[{pool_address, delegator_address, is_active}] Map.merge(response, %{ @@ -269,16 +275,12 @@ defmodule Explorer.Staking.ContractState do }) if global_responses.epoch_start_block == block_number + 1 do - with( - true <- :ets.insert(@table_name, is_snapshotted: false), - {:ok, _} <- - StakeSnapshotting.start_snapshotting( - %{contracts: contracts, abi: abi, global_responses: global_responses}, - block_number - ) - ) do - :ets.insert(@table_name, is_snapshotted: true) - end + spawn(StakeSnapshotting, :do_snapshotting, [ + %{contracts: contracts, abi: abi, epoch_number: global_responses.epoch_number, ets_table_name: @table_name}, + pool_staking_responses, + Map.new(Enum.map(staker_responses, fn {key, resp} -> {pool_staking_address, staker_address, _} = key; {{pool_staking_address, staker_address}, resp} end)), + block_number # the last block of the finished staking epoch + ]) end Publisher.broadcast(:staking_update) diff --git a/apps/explorer/lib/explorer/staking/stake_snapshotting.ex b/apps/explorer/lib/explorer/staking/stake_snapshotting.ex index 4b50f1fc73..75845b8dbb 100644 --- a/apps/explorer/lib/explorer/staking/stake_snapshotting.ex +++ b/apps/explorer/lib/explorer/staking/stake_snapshotting.ex @@ -14,7 +14,15 @@ defmodule Explorer.Staking.StakeSnapshotting do alias Explorer.SmartContract.Reader alias Explorer.Staking.ContractReader - def start_snapshotting(%{contracts: contracts, abi: abi, global_responses: global_responses}, block_number) do + def do_snapshotting( + %{contracts: contracts, abi: abi, epoch_number: epoch_number, ets_table_name: ets_table_name}, + cached_pool_staking_responses, + cached_staker_responses, + block_number + ) do + :ets.insert(ets_table_name, is_snapshotted: false) + + # get the list of pending validators %{ "getPendingValidators" => {:ok, [pending_validators]}, "validatorsToBeFinalized" => {:ok, [to_be_finalized_validators]} @@ -26,6 +34,7 @@ defmodule Explorer.Staking.StakeSnapshotting do pool_mining_addresses = Enum.uniq(pending_validators ++ to_be_finalized_validators) + # get staking addresses for the pending validators pool_staking_addresses = pool_mining_addresses |> Enum.map(&staking_by_mining_requests/1) @@ -33,54 +42,84 @@ defmodule Explorer.Staking.StakeSnapshotting do |> Enum.flat_map(fn {_, value} -> value end) |> Enum.map(fn {_key, staking_address_hash} -> decode_data(staking_address_hash) end) + # get snapshotted amounts and other pool info for each pending validator. + # use `cached_pool_staking_responses` when possible pool_staking_responses = pool_staking_addresses - |> Enum.map(fn address_hashe -> pool_staking_requests(address_hashe, block_number) end) - |> ContractReader.perform_grouped_requests(pool_staking_addresses, contracts, abi) + |> Enum.map(fn address_hash -> + case Map.fetch(cached_pool_staking_responses, address_hash) do + {:ok, resp} -> + Map.merge(resp, ContractReader.perform_requests(snapshotted_amounts_requests(address_hash, block_number), contracts, abi)) + :error -> + ContractReader.perform_requests(pool_staking_requests(address_hash, block_number), contracts, abi) + end + end) + |> Enum.zip(pool_staking_addresses) + |> Map.new(fn {key, val} -> {val, key} end) pool_mining_responses = pool_staking_addresses |> Enum.map(&ContractReader.pool_mining_requests(pool_staking_responses[&1].mining_address_hash)) |> ContractReader.perform_grouped_requests(pool_staking_addresses, contracts, abi) - delegators = - Enum.flat_map(pool_staking_responses, fn {pool_address, responses} -> - [{pool_address, pool_address, true}] ++ - Enum.map(responses.active_delegators, &{pool_address, &1, true}) ++ - Enum.map(responses.inactive_delegators, &{pool_address, &1, false}) + # form a flat list of all stakers in the form {pool_staking_address, staker_address} + stakers = + Enum.flat_map(pool_staking_responses, fn {pool_staking_address, resp} -> + [{pool_staking_address, pool_staking_address}] ++ + Enum.map(resp.active_delegators, &{pool_staking_address, &1}) ++ + Enum.map(resp.inactive_delegators, &{pool_staking_address, &1}) end) - delegator_responses = - delegators - |> Enum.map(fn {pool_address, delegator_address, _} -> - delegator_requests(pool_address, delegator_address, block_number) + # get amounts for each of the stakers + # use `cached_staker_responses` when possible + staker_responses = + stakers + |> Enum.map(fn {pool_staking_address, staker_address} = key -> + case Map.fetch(cached_staker_responses, key) do + {:ok, resp} -> + Map.merge( + resp, + ContractReader.perform_requests( + snapshotted_staker_amount_request(pool_staking_address, staker_address, block_number), + contracts, + abi + ) + ) + :error -> + ContractReader.perform_requests( + staker_requests(pool_staking_address, staker_address, block_number), + contracts, + abi + ) + end end) - |> ContractReader.perform_grouped_requests(delegators, contracts, abi) + |> Enum.zip(stakers) + |> Map.new(fn {key, val} -> {val, key} end) - pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _response} -> key end) + pool_staking_keys = Enum.map(pool_staking_responses, fn {key, _} -> key end) pool_reward_responses = pool_staking_responses - |> Enum.map(fn {_address, response} -> + |> Enum.map(fn {_address, resp} -> ContractReader.validator_reward_requests([ - global_responses.epoch_number, - response.snapshotted_self_staked_amount, - response.snapshotted_total_staked_amount, + epoch_number, + resp.snapshotted_self_staked_amount, + resp.snapshotted_total_staked_amount, 1000_000 ]) end) |> ContractReader.perform_grouped_requests(pool_staking_keys, contracts, abi) - delegator_keys = Enum.map(delegator_responses, fn {key, _response} -> key end) + delegator_keys = Enum.map(staker_responses, fn {key, _} -> key end) delegator_reward_responses = - delegator_responses - |> Enum.map(fn {{pool_address, _delegator_address, _}, response} -> + staker_responses + |> Enum.map(fn {{pool_address, _delegator_address}, response} -> staking_response = pool_staking_responses[pool_address] ContractReader.delegator_reward_requests([ - global_responses.epoch_number, - response.stake_amount, + epoch_number, + response.snapshotted_stake_amount, staking_response.snapshotted_self_staked_amount, staking_response.snapshotted_total_staked_amount, 1000_000 @@ -101,39 +140,47 @@ defmodule Explorer.Staking.StakeSnapshotting do } |> Map.merge( Map.take(staking_response, [ - :snapshotted_total_staked_amount, - :snapshotted_self_staked_amount, - :total_staked_amount, + :mining_address_hash, :self_staked_amount, - :mining_address_hash + :snapshotted_self_staked_amount, + :snapshotted_total_staked_amount, + :total_staked_amount ]) ) |> Map.merge( Map.take(mining_response, [ - :was_validator_count, + :banned_until, :was_banned_count, - :banned_until + :was_validator_count ]) ) end) delegator_entries = - Enum.map(delegator_responses, fn {{pool_address, delegator_address, is_active}, response} -> - delegator_reward_response = delegator_reward_responses[{pool_address, delegator_address, is_active}] + Enum.map(staker_responses, fn {{pool_staking_address, staker_address}, response} -> + delegator_reward_response = delegator_reward_responses[{pool_staking_address, staker_address}] + # %{ + # address_hash: staker_address, + # staking_address_hash: pool_staking_address, + # snapshotted_stake_amount: response.snapshotted_stake_amount, + # snapshotted_reward_ratio: Float.floor(delegator_reward_response.delegator_share / 10_000, 2) + # } Map.merge(response, %{ - address_hash: delegator_address, - staking_address_hash: pool_address, - is_active: is_active, + address_hash: staker_address, + staking_address_hash: pool_staking_address, snapshotted_reward_ratio: Float.floor(delegator_reward_response.delegator_share / 10_000, 2) }) end) - Chain.import(%{ + case Chain.import(%{ staking_pools: %{params: pool_entries, on_conflict: staking_pool_on_conflict()}, - staking_pools_delegators: %{params: delegator_entries, on_conflict: staking_pools_delegator_on_conflict()}, + staking_pools_delegators: %{params: delegator_entries, on_conflict: staking_pools_delegators_update()}, timeout: :infinity - }) + }) do + {:ok, _} -> :ets.insert(ets_table_name, is_snapshotted: true) + _ -> Logger.error("Cannot finish snapshotting started at block #{block_number}") + end end def staking_by_mining_requests(mining_address) do @@ -144,24 +191,34 @@ defmodule Explorer.Staking.StakeSnapshotting do defp pool_staking_requests(staking_address, block_number) do [ - snapshotted_total_staked_amount: {:staking, "stakeAmountTotal", [staking_address], block_number - 1}, - snapshotted_self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address], block_number - 1}, total_staked_amount: {:staking, "stakeAmountTotal", [staking_address]}, self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address]}, mining_address_hash: {:validator_set, "miningByStakingAddress", [staking_address]}, active_delegators: {:staking, "poolDelegators", [staking_address]}, inactive_delegators: {:staking, "poolDelegatorsInactive", [staking_address]} + ] ++ snapshotted_amounts_requests(staking_address, block_number) + end + + defp snapshotted_amounts_requests(staking_address, block_number) do + [ + snapshotted_total_staked_amount: {:staking, "stakeAmountTotal", [staking_address], block_number}, + snapshotted_self_staked_amount: {:staking, "stakeAmount", [staking_address, staking_address], block_number} ] end - defp delegator_requests(pool_staking_address, delegator_address, block_number) do + defp staker_requests(pool_staking_address, staker_address, block_number) do + [ + max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, staker_address]}, + max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, staker_address]}, + ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, staker_address]}, + ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, staker_address]}, + stake_amount: {:staking, "stakeAmount", [pool_staking_address, staker_address]} + ] ++ snapshotted_staker_amount_request(pool_staking_address, staker_address, block_number) + end + + defp snapshotted_staker_amount_request(pool_staking_address, staker_address, block_number) do [ - stake_amount: {:staking, "stakeAmount", [pool_staking_address, delegator_address]}, - snapshotted_stake_amount: {:staking, "stakeAmount", [pool_staking_address, delegator_address], block_number - 1}, - ordered_withdraw: {:staking, "orderedWithdrawAmount", [pool_staking_address, delegator_address]}, - max_withdraw_allowed: {:staking, "maxWithdrawAllowed", [pool_staking_address, delegator_address]}, - max_ordered_withdraw_allowed: {:staking, "maxWithdrawOrderAllowed", [pool_staking_address, delegator_address]}, - ordered_withdraw_epoch: {:staking, "orderWithdrawEpoch", [pool_staking_address, delegator_address]} + snapshotted_stake_amount: {:staking, "stakeAmount", [pool_staking_address, staker_address], block_number} ] end @@ -199,22 +256,13 @@ defmodule Explorer.Staking.StakeSnapshotting do ) end - defp staking_pools_delegator_on_conflict do + defp staking_pools_delegators_update do from( delegator in StakingPoolsDelegator, update: [ set: [ - stake_amount: fragment("EXCLUDED.stake_amount"), snapshotted_stake_amount: fragment("EXCLUDED.snapshotted_stake_amount"), - ordered_withdraw: fragment("EXCLUDED.ordered_withdraw"), - max_withdraw_allowed: fragment("EXCLUDED.max_withdraw_allowed"), - max_ordered_withdraw_allowed: fragment("EXCLUDED.max_ordered_withdraw_allowed"), - ordered_withdraw_epoch: fragment("EXCLUDED.ordered_withdraw_epoch"), - reward_ratio: delegator.reward_ratio, snapshotted_reward_ratio: fragment("EXCLUDED.snapshotted_reward_ratio"), - is_active: delegator.is_active, - is_deleted: delegator.is_deleted, - inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", delegator.inserted_at), updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", delegator.updated_at) ] ]