Run the transaction importer inline with the block importer

pull/2/head
Doc Ritezel 7 years ago
parent 8ae9881509
commit e9341c0b51
  1. 2
      lib/explorer/block_transaction.ex
  2. 2
      lib/explorer/from_address.ex
  3. 17
      lib/explorer/importers/block_importer.ex
  4. 76
      lib/explorer/importers/transaction_importer.ex
  5. 2
      lib/explorer/to_address.ex
  6. 4
      lib/explorer/workers/import_block.ex
  7. 4
      lib/explorer/workers/import_transaction.ex
  8. 46
      test/explorer/importers/block_importer_test.exs
  9. 40
      test/explorer/importers/transaction_importer_test.exs
  10. 30
      test/support/fixture/vcr_cassettes/block_importer_import_1_pending.json

@ -22,5 +22,7 @@ defmodule Explorer.BlockTransaction do
|> validate_required(@required_attrs)
|> cast_assoc(:block)
|> cast_assoc(:transaction)
|> unique_constraint(:transaction_id,
name: :block_transactions_transaction_id_index)
end
end

@ -17,5 +17,7 @@ defmodule Explorer.FromAddress do
def changeset(%FromAddress{} = to_address, attrs \\ %{}) do
to_address
|> cast(attrs, [:transaction_id, :address_id])
|> unique_constraint(:transaction_id,
name: :from_addresses_transaction_id_index)
end
end

@ -8,14 +8,21 @@ defmodule Explorer.BlockImporter do
alias Explorer.Repo.NewRelic, as: Repo
alias Explorer.Workers.ImportTransaction
@dialyzer {:nowarn_function, import: 1}
def import("pending") do
raw_block = download_block("pending")
Enum.map(raw_block["transactions"], &ImportTransaction.perform_later/1)
end
@dialyzer {:nowarn_function, import: 1}
def import(block_number) do
raw_block = download_block(block_number)
changes = extract_block(raw_block)
block = changes.hash |> find()
changes.hash |> find() |> Block.changeset(changes) |> Repo.insert_or_update!
if is_nil(block.id), do: block |> Block.changeset(changes) |> Repo.insert
import_transactions(raw_block["transactions"])
Enum.map(raw_block["transactions"], &ImportTransaction.perform/1)
end
def find(hash) do
@ -47,12 +54,6 @@ defmodule Explorer.BlockImporter do
}
end
def import_transactions(transactions) do
Enum.map(transactions, fn (transaction) ->
ImportTransaction.perform_later(transaction)
end)
end
def decode_integer_field(hex) do
{"0x", base_16} = String.split_at(hex, 2)
String.to_integer(base_16, 16)

@ -22,13 +22,18 @@ defmodule Explorer.TransactionImporter do
def persist_transaction(raw_transaction) do
changes = extract_attrs(raw_transaction)
found_transaction = changes.hash |> find()
changes.hash
|> find()
|> Transaction.changeset(changes)
|> Repo.insert_or_update!
transaction = case is_nil(found_transaction.id) do
true ->
found_transaction |> Transaction.changeset(changes) |> Repo.insert!
false -> found_transaction
end
to_address = raw_transaction["to"] || raw_transaction["creates"]
transaction
|> create_from_address(raw_transaction["from"])
|> create_to_address(raw_transaction["to"] || raw_transaction["creates"])
|> create_to_address(to_address)
|> create_block_transaction(raw_transaction["blockHash"])
end
@ -61,19 +66,24 @@ defmodule Explorer.TransactionImporter do
}
end
def create_block_transaction(transaction, block_hash) do
block = Repo.get_by(Block, hash: block_hash)
def create_block_transaction(transaction, hash) do
query = from t in Block,
where: fragment("lower(?)", t.hash) == ^String.downcase(hash),
limit: 1
block = query |> Repo.one()
if block do
block_transaction =
Repo.get_by(BlockTransaction, transaction_id: transaction.id) ||
%BlockTransaction{}
changes = %{block_id: block.id, transaction_id: transaction.id}
block_transaction
|>BlockTransaction.changeset(changes)
|> Repo.insert_or_update!
case Repo.get_by(BlockTransaction, transaction_id: transaction.id) do
nil ->
%BlockTransaction{}
|> BlockTransaction.changeset(changes)
|> Repo.insert
block_transaction ->
block_transaction
|> BlockTransaction.changeset(%{block_id: block.id})
|> Repo.update
end
end
transaction
@ -81,24 +91,36 @@ defmodule Explorer.TransactionImporter do
def create_from_address(transaction, hash) do
address = Address.find_or_create_by_hash(hash)
changes = %{transaction_id: transaction.id, address_id: address.id}
from_address = Repo.get_by(FromAddress, changes) || %FromAddress{}
from_address
|> FromAddress.changeset(changes)
|> Repo.insert_or_update!
changes = %{address_id: address.id, transaction_id: transaction.id}
case Repo.get_by(FromAddress, transaction_id: transaction.id) do
nil ->
%FromAddress{}
|> FromAddress.changeset(changes)
|> Repo.insert
from_address ->
from_address
|> FromAddress.changeset(%{address_id: address.id})
|> Repo.update
end
transaction
end
def create_to_address(transaction, hash) do
address = Address.find_or_create_by_hash(hash)
changes = %{transaction_id: transaction.id, address_id: address.id}
to_address = Repo.get_by(ToAddress, changes) || %ToAddress{}
to_address
|> ToAddress.changeset(changes)
|> Repo.insert_or_update!
changes = %{address_id: address.id, transaction_id: transaction.id}
case Repo.get_by(ToAddress, transaction_id: transaction.id) do
nil ->
%ToAddress{}
|> ToAddress.changeset(changes)
|> Repo.insert
to_address ->
to_address
|> ToAddress.changeset(%{address_id: address.id})
|> Repo.update
end
transaction
end

@ -17,5 +17,7 @@ defmodule Explorer.ToAddress do
def changeset(%ToAddress{} = to_address, attrs \\ %{}) do
to_address
|> cast(attrs, [:transaction_id, :address_id])
|> unique_constraint(:transaction_id,
name: :to_addresses_transaction_id_index)
end
end

@ -4,9 +4,7 @@ defmodule Explorer.Workers.ImportBlock do
@moduledoc "Imports blocks by web3 conventions."
@dialyzer {:nowarn_function, perform: 1}
def perform(number) do
BlockImporter.import("#{number}")
end
def perform(number), do: BlockImporter.import("#{number}")
@dialyzer {:nowarn_function, perform: 0}
def perform, do: perform("latest")

@ -4,9 +4,7 @@ defmodule Explorer.Workers.ImportTransaction do
alias Explorer.TransactionImporter
@dialyzer {:nowarn_function, perform: 1}
def perform(hash) do
TransactionImporter.import(hash)
end
def perform(hash), do: TransactionImporter.import(hash)
def perform_later(hash) do
Exq.enqueue(Exq.Enqueuer, "transactions", __MODULE__, [hash])

@ -4,14 +4,14 @@ defmodule Explorer.BlockImporterTest do
import Mock
alias Explorer.Block
alias Explorer.BlockImporter
alias Explorer.Transaction
alias Explorer.BlockImporter
alias Explorer.Workers.ImportTransaction
describe "import/1" do
test "imports and saves a block to the database" do
use_cassette "block_importer_import_1_saves_the_block" do
with_mock ImportTransaction, [perform_later: fn(block) -> block end] do
with_mock ImportTransaction, [perform: fn(_) -> {:ok} end] do
BlockImporter.import("0xc4f0d")
block = Block |> order_by(desc: :inserted_at) |> Repo.one()
@ -20,20 +20,39 @@ defmodule Explorer.BlockImporterTest do
end
end
test "when a block with the same hash is imported it updates the block" do
test "when a block with the same hash is imported it does not update the block" do
use_cassette "block_importer_import_1_duplicate_block" do
with_mock ImportTransaction, [perform_later: fn(hash) -> insert(:transaction, hash: hash) end] do
with_mock ImportTransaction, [perform: fn(hash) -> insert(:transaction, hash: hash) end] do
insert(:block, hash: "0x16cb43ccfb7875c14eb3f03bdc098e4af053160544270594fa429d256cbca64e", gas_limit: 5)
BlockImporter.import("0xc4f0d")
block = Repo.get_by(Block, hash: "0x16cb43ccfb7875c14eb3f03bdc098e4af053160544270594fa429d256cbca64e")
assert block.gas_limit == 8000000
assert block.gas_limit == 5
assert Block |> Repo.all |> Enum.count == 1
end
end
end
end
describe "import/1 pending" do
test "does not create a block" do
use_cassette "block_importer_import_1_pending" do
with_mock ImportTransaction, [perform_later: fn(_) -> {:ok} end] do
BlockImporter.import("pending")
assert Block |> Repo.all |> Enum.count == 0
end
end
end
test "when a block with the same hash is imported does not create a block" do
use_cassette "block_importer_import_1_pending" do
with_mock ImportTransaction, [perform_later: fn(_) -> insert(:transaction) end] do
BlockImporter.import("pending")
assert Transaction |> Repo.all |> Enum.count == 21
end
end
end
end
describe "find/1" do
test "returns an empty block when there is no block with the given hash" do
assert BlockImporter.find("0xC001") == %Block{}
@ -87,21 +106,6 @@ defmodule Explorer.BlockImporterTest do
end
end
describe "import_transactions/1" do
test "it enqueues workers that download each transaction" do
with_mock ImportTransaction, [perform_later: fn(hash) -> insert(:transaction, hash: hash) end] do
queue = BlockImporter.import_transactions([
"0x004bda8224214d277fc41be030fc55109afc662c5a87236de8990c8589f1c7b6",
"0x31d59a001b870543c2b34618aecfa8846f7c3e50e9e267119670b311c086db99",
])
last_transaction = Transaction |> order_by(desc: :inserted_at) |> limit(1) |> Repo.one
assert last_transaction.hash == "0x004bda8224214d277fc41be030fc55109afc662c5a87236de8990c8589f1c7b6"
assert queue |> Enum.count == 2
end
end
end
describe "decode_integer_field/1" do
test "returns the integer value of a hex value" do
assert(BlockImporter.decode_integer_field("0x7f2fb") == 520955)

@ -51,17 +51,17 @@ defmodule Explorer.TransactionImporterTest do
end
end
test "when it has previously been saved it updates the transaction" do
test "when the transaction has previously been saved does not update it" do
use_cassette "transaction_importer_updates_the_association" do
insert(:transaction, hash: "0x170baac4eca26076953370dd603c68eab340c0135b19b585010d3158a5dbbf23", gas: 5)
TransactionImporter.import("0x170baac4eca26076953370dd603c68eab340c0135b19b585010d3158a5dbbf23")
transaction = Transaction |> order_by(desc: :inserted_at) |> Repo.one
assert transaction.gas == Decimal.new(231855)
assert transaction.gas == Decimal.new(5)
end
end
test "when it has a block hash that's saved in the database it saves the association" do
test "binds an association to an existing block" do
use_cassette "transaction_importer_saves_the_association" do
block = insert(:block, hash: "0xfce13392435a8e7dab44c07d482212efb9dc39a9bea1915a9ead308b55a617f9")
TransactionImporter.import("0x64d851139325479c3bb7ccc6e6ab4cde5bc927dce6810190fe5d770a4c1ac333")
@ -82,7 +82,7 @@ defmodule Explorer.TransactionImporterTest do
end
end
test "it creates a from address" do
test "creates a from address" do
use_cassette "transaction_importer_creates_a_from_address" do
TransactionImporter.import("0xc445f5410912458c480d992dd93355ae3dad64d9f65db25a3cf43a9c609a2e0d")
@ -94,7 +94,33 @@ defmodule Explorer.TransactionImporterTest do
end
end
test "it creates a to address" do
test "binds an existing from address" do
insert(:address, hash: "0xa5b4b372112ab8dbbb48c8d0edd89227e24ec785")
use_cassette "transaction_importer_creates_a_from_address" do
TransactionImporter.import("0xc445f5410912458c480d992dd93355ae3dad64d9f65db25a3cf43a9c609a2e0d")
transaction = Transaction |> Repo.get_by(hash: "0xc445f5410912458c480d992dd93355ae3dad64d9f65db25a3cf43a9c609a2e0d")
address = Address |> Repo.get_by(hash: "0xa5b4b372112ab8dbbb48c8d0edd89227e24ec785")
from_address = FromAddress |> Repo.get_by(transaction_id: transaction.id, address_id: address.id)
assert from_address
end
end
test "creates a to address" do
use_cassette "transaction_importer_creates_a_to_address" do
TransactionImporter.import("0xdc533d4227734a7cacd75a069e8dc57ac571b865ed97bae5ea4cb74b54145f4c")
transaction = Transaction |> Repo.get_by(hash: "0xdc533d4227734a7cacd75a069e8dc57ac571b865ed97bae5ea4cb74b54145f4c")
address = Address |> Repo.get_by(hash: "0x24e5b8528fe83257d5fe3497ef616026713347f8")
to_address = ToAddress |> Repo.get_by(transaction_id: transaction.id, address_id: address.id)
assert(to_address)
end
end
test "binds an existing to address" do
insert(:address, hash: "0x24e5b8528fe83257d5fe3497ef616026713347f8")
use_cassette "transaction_importer_creates_a_to_address" do
TransactionImporter.import("0xdc533d4227734a7cacd75a069e8dc57ac571b865ed97bae5ea4cb74b54145f4c")
@ -106,7 +132,7 @@ defmodule Explorer.TransactionImporterTest do
end
end
test "it creates a to address using creates when to is nil" do
test "creates a to address using creates when to is nil" do
use_cassette "transaction_importer_creates_a_to_address_from_creates" do
TransactionImporter.import("0xdc533d4227734a7cacd75a069e8dc57ac571b865ed97bae5ea4cb74b54145f4c")
transaction = Transaction |> Repo.get_by(hash: "0xdc533d4227734a7cacd75a069e8dc57ac571b865ed97bae5ea4cb74b54145f4c")
@ -117,7 +143,7 @@ defmodule Explorer.TransactionImporterTest do
end
end
test "it's able to takes a map of transaction attributes" do
test "processes a map of transaction attributes" do
insert(:block, hash: "0xtakis")
TransactionImporter.import(Map.merge(@raw_transaction, %{"hash" => "0xmunchos", "blockHash" => "0xtakis"}))
last_transaction = Transaction |> order_by(desc: :inserted_at) |> limit(1) |> Repo.one()

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save