commit
7669662c14
@ -1,92 +1,125 @@ |
||||
DO $$ |
||||
DECLARE |
||||
row_count integer := 1; |
||||
batch_size integer := 50000; -- HOW MANY ITEMS WILL BE UPDATED AT TIME |
||||
iterator integer := batch_size; |
||||
affected integer; |
||||
total_count integer := 0; |
||||
completed_count integer := 0; |
||||
remaining_count integer := 0; |
||||
batch_size integer := 50000; -- HOW MANY ITEMS WILL BE UPDATED AT TIME |
||||
iterator integer := batch_size; |
||||
updated_count integer; |
||||
deleted_count integer; |
||||
start_time TIMESTAMP WITHOUT TIME ZONE := clock_timestamp(); |
||||
end_time TIMESTAMP WITHOUT TIME ZONE; |
||||
elapsed_time INTERVAL; |
||||
temp_start_time TIMESTAMP WITHOUT TIME ZONE; |
||||
temp_end_time TIMESTAMP WITHOUT TIME ZONE; |
||||
temp_elapsed_time INTERVAL; |
||||
update_start_time TIMESTAMP WITHOUT TIME ZONE; |
||||
update_end_time TIMESTAMP WITHOUT TIME ZONE; |
||||
update_elapsed_time INTERVAL; |
||||
per_row INTERVAL; |
||||
BEGIN |
||||
DROP TABLE IF EXISTS address_token_temp; |
||||
CREATE TEMP TABLE address_token_temp |
||||
RAISE NOTICE 'Started at %', start_time; |
||||
|
||||
temp_start_time := clock_timestamp(); |
||||
|
||||
DROP TABLE IF EXISTS correct_address_current_token_block_numbers; |
||||
CREATE TEMP TABLE correct_address_current_token_block_numbers |
||||
( |
||||
address_hash bytea NOT NULL, |
||||
token_contract_address_hash bytea NOT NULL, |
||||
address_hash bytea NOT NULL, |
||||
token_contract_address_hash bytea NOT NULL, |
||||
block_number bigint NOT NULL, |
||||
row_number integer |
||||
); |
||||
INSERT INTO address_token_temp |
||||
SELECT DISTINCT ON (address_hash, token_contract_address_hash) address_hash, |
||||
token_contract_address_hash, |
||||
ROW_NUMBER() OVER () |
||||
INSERT INTO correct_address_current_token_block_numbers |
||||
SELECT address_token_balances.address_hash, |
||||
address_token_balances.token_contract_address_hash, |
||||
MAX(address_token_balances.block_number), |
||||
ROW_NUMBER() OVER () |
||||
FROM address_token_balances |
||||
WHERE value IS NOT NULL |
||||
ORDER BY address_hash, token_contract_address_hash; |
||||
INNER JOIN address_current_token_balances |
||||
ON address_current_token_balances.address_hash = |
||||
address_token_balances.address_hash AND |
||||
address_current_token_balances.token_contract_address_hash = |
||||
address_token_balances.token_contract_address_hash |
||||
GROUP BY address_token_balances.address_hash, |
||||
address_token_balances.token_contract_address_hash, |
||||
address_current_token_balances.block_number |
||||
HAVING MAX(address_token_balances.block_number) != address_current_token_balances.block_number; |
||||
|
||||
temp_end_time := clock_timestamp(); |
||||
temp_elapsed_time := temp_end_time - temp_start_time; |
||||
total_count := (SELECT COUNT(*) FROM correct_address_current_token_block_numbers); |
||||
|
||||
RAISE NOTICE 'correct_address_current_token_block_numbers TEMP table filled in %', temp_elapsed_time; |
||||
|
||||
row_count := (SELECT COUNT(*) FROM address_token_temp); |
||||
RAISE NOTICE '% items to be updated', row_count; |
||||
remaining_count := total_count; |
||||
|
||||
RAISE NOTICE '% address_current_token_balances to be updated', remaining_count; |
||||
|
||||
update_start_time := clock_timestamp(); |
||||
|
||||
-- ITERATES THROUGH THE ITEMS UNTIL THE TEMP TABLE IS EMPTY |
||||
WHILE row_count > 0 |
||||
WHILE remaining_count > 0 |
||||
LOOP |
||||
UPDATE address_current_token_balances |
||||
SET block_number = new_address_current_token_balances.block_number, |
||||
value = new_address_current_token_balances.value, |
||||
inserted_at = new_address_current_token_balances.inserted_at, |
||||
updated_at = new_address_current_token_balances.updated_at |
||||
FROM ( |
||||
SELECT address_token_blocks.address_hash, |
||||
address_token_blocks.token_contract_address_hash, |
||||
address_token_blocks.block_number, |
||||
address_token_balances.value, |
||||
MIN(address_token_balances.inserted_at) OVER w AS inserted_at, |
||||
MAX(address_token_balances.updated_at) OVER w AS updated_at |
||||
FROM ( |
||||
SELECT address_token_batch.address_hash, |
||||
address_token_batch.token_contract_address_hash, |
||||
MAX(address_token_balances.block_number) AS block_number |
||||
FROM ( |
||||
SELECT address_hash, |
||||
token_contract_address_hash |
||||
FROM address_token_temp |
||||
WHERE address_token_temp.row_number <= iterator |
||||
) AS address_token_batch |
||||
INNER JOIN address_token_balances |
||||
ON address_token_balances.address_hash = address_token_batch.address_hash AND |
||||
address_token_balances.token_contract_address_hash = |
||||
address_token_batch.token_contract_address_hash |
||||
GROUP BY address_token_batch.address_hash, |
||||
address_token_batch.token_contract_address_hash |
||||
) AS address_token_blocks |
||||
INNER JOIN address_token_balances |
||||
ON address_token_balances.address_hash = address_token_blocks.address_hash AND |
||||
address_token_balances.token_contract_address_hash = |
||||
address_token_blocks.token_contract_address_hash AND |
||||
address_token_balances.block_number = address_token_blocks.block_number |
||||
WINDOW w AS (PARTITION BY address_token_balances.address_hash, address_token_balances.token_contract_address_hash) |
||||
) AS new_address_current_token_balances |
||||
WHERE new_address_current_token_balances.address_hash = address_current_token_balances.address_hash |
||||
SET block_number = correct_address_current_token_block_numbers.block_number, |
||||
value = address_token_balances.value, |
||||
updated_at = NOW() |
||||
FROM correct_address_current_token_block_numbers, |
||||
address_token_balances |
||||
WHERE correct_address_current_token_block_numbers.row_number <= iterator |
||||
AND |
||||
correct_address_current_token_block_numbers.address_hash = address_current_token_balances.address_hash |
||||
AND |
||||
correct_address_current_token_block_numbers.token_contract_address_hash = |
||||
address_current_token_balances.token_contract_address_hash |
||||
AND |
||||
address_current_token_balances.block_number < correct_address_current_token_block_numbers.block_number |
||||
AND |
||||
new_address_current_token_balances.token_contract_address_hash = |
||||
address_token_balances.address_hash = address_current_token_balances.address_hash |
||||
AND |
||||
address_token_balances.token_contract_address_hash = |
||||
address_current_token_balances.token_contract_address_hash |
||||
AND |
||||
(new_address_current_token_balances.block_number != address_current_token_balances.block_number OR |
||||
new_address_current_token_balances.value != address_current_token_balances.value); |
||||
address_token_balances.block_number = correct_address_current_token_block_numbers.block_number; |
||||
|
||||
GET DIAGNOSTICS affected = ROW_COUNT; |
||||
RAISE NOTICE '-> % address current token balances updated.', affected; |
||||
GET DIAGNOSTICS updated_count = ROW_COUNT; |
||||
RAISE NOTICE '-> % address current token balances updated.', updated_count; |
||||
|
||||
DELETE |
||||
FROM address_token_temp |
||||
WHERE address_token_temp.row_number <= iterator; |
||||
FROM correct_address_current_token_block_numbers |
||||
WHERE correct_address_current_token_block_numbers.row_number <= iterator; |
||||
|
||||
GET DIAGNOSTICS affected = ROW_COUNT; |
||||
RAISE NOTICE '-> % address tokens removed from queue.', affected; |
||||
GET DIAGNOSTICS deleted_count = ROW_COUNT; |
||||
RAISE NOTICE '-> % address tokens block numbers removed from queue.', deleted_count; |
||||
|
||||
-- COMMITS THE BATCH UPDATES |
||||
CHECKPOINT; |
||||
|
||||
-- UPDATES THE COUNTER SO IT DOESN'T TURN INTO AN INFINITE LOOP |
||||
row_count := (SELECT COUNT(*) FROM address_token_temp); |
||||
remaining_count := remaining_count - deleted_count; |
||||
iterator := iterator + batch_size; |
||||
RAISE NOTICE '-> % counter', row_count; |
||||
RAISE NOTICE '-> % remaining', remaining_count; |
||||
RAISE NOTICE '-> % next batch', iterator; |
||||
update_elapsed_time := clock_timestamp() - update_start_time; |
||||
completed_count := total_count - remaining_count; |
||||
per_row := update_elapsed_time / completed_count; |
||||
RAISE NOTICE '-> Estimated time until completion: %s', per_row * remaining_count; |
||||
END LOOP; |
||||
|
||||
end_time := clock_timestamp(); |
||||
update_end_time := end_time; |
||||
update_elapsed_time = update_end_time - update_start_time; |
||||
|
||||
IF total_count > 0 THEN |
||||
per_row := update_elapsed_time / total_count; |
||||
ELSE |
||||
per_row := 0; |
||||
END IF; |
||||
|
||||
RAISE NOTICE 'address_current_token_balances updated in % (% per row)', update_elapsed_time, per_row; |
||||
|
||||
elapsed_time := end_time - start_time; |
||||
|
||||
RAISE NOTICE 'Ended at %s', end_time; |
||||
RAISE NOTICE 'Elapsed time: %', elapsed_time; |
||||
END $$; |
||||
|
Loading…
Reference in new issue