Rechunk batch JSONRPC request when 413 is returned

When status 413 Request Entity Too Large is returned from JSONRPC, we
can split the batch in half and try again.  Continuing to break up the
chunks of the batch until we suceed or hit 1 request in the chunk, in
which case we still need to error.
pull/316/head
Luke Imhoff 7 years ago
parent ab3dff3c45
commit e44a617a55
  1. 77
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  2. 48
      apps/ethereum_jsonrpc/test/etheream_jsonrpc_test.exs
  3. 2
      coveralls.json

@ -192,13 +192,16 @@ defmodule EthereumJSONRPC do
* Handled response * Handled response
* `{:error, reason}` if POST failes * `{:error, reason}` if POST failes
""" """
def json_rpc(payload, url) when is_list(payload) do
chunked_json_rpc(url, [payload], config(:http), [])
end
def json_rpc(payload, url) do def json_rpc(payload, url) do
json = encode_json(payload) json = encode_json(payload)
headers = [{"Content-Type", "application/json"}]
case HTTPoison.post(url, json, headers, config(:http)) do case post(url, json, config(:http)) do
{:ok, %HTTPoison.Response{body: body, status_code: code}} -> {:ok, %HTTPoison.Response{body: body, status_code: code}} ->
body |> decode_json(payload, url) |> handle_response(code) body |> decode_json(code, json, url) |> handle_response(code)
{:error, %HTTPoison.Error{reason: reason}} -> {:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason} {:error, reason}
@ -251,6 +254,50 @@ defmodule EthereumJSONRPC do
|> Timex.from_unix() |> Timex.from_unix()
end end
defp chunked_json_rpc(_url, [], _options, decoded_response_bodies) when is_list(decoded_response_bodies) do
list =
decoded_response_bodies
|> Enum.reverse()
|> List.flatten()
{:ok, list}
end
defp chunked_json_rpc(url, [batch | tail] = chunks, options, decoded_response_bodies)
when is_list(batch) and is_list(tail) and is_list(decoded_response_bodies) do
json = encode_json(batch)
case post(url, json, options) do
{:ok, %HTTPoison.Response{status_code: 413} = response} ->
rechunk_json_rpc(url, chunks, options, response, decoded_response_bodies)
{:ok, %HTTPoison.Response{body: body, status_code: status_code}} ->
decoded_body = decode_json(body, status_code, json, url)
chunked_json_rpc(url, tail, options, [decoded_body | decoded_response_bodies])
{:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason}
end
end
defp rechunk_json_rpc(url, [batch | tail], options, response, decoded_response_bodies) do
case length(batch) do
# it can't be made any smaller
1 ->
Logger.error(fn ->
"413 Request Entity Too Large returned from single request batch. Cannot shrink batch further."
end)
{:error, response}
batch_size ->
split_size = div(batch_size, 2)
{first_chunk, second_chunk} = Enum.split(batch, split_size)
new_chunks = [first_chunk, second_chunk | tail]
chunked_json_rpc(url, new_chunks, options, decoded_response_bodies)
end
end
defp get_balance_requests(id_to_params) when is_map(id_to_params) do defp get_balance_requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} -> Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} ->
get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data})
@ -365,20 +412,23 @@ defmodule EthereumJSONRPC do
defp encode_json(data), do: Jason.encode_to_iodata!(data) defp encode_json(data), do: Jason.encode_to_iodata!(data)
defp decode_json(body, posted_payload, url) do defp decode_json(response_body, response_status_code, request_body, request_url) do
Jason.decode!(body) Jason.decode!(response_body)
rescue rescue
Jason.DecodeError -> Jason.DecodeError ->
Logger.error(""" Logger.error(fn ->
failed to decode json payload: """
failed to decode json payload:
url: #{inspect(url)} request url: #{inspect(request_url)}
body: #{inspect(body)} request body: #{inspect(request_body)}
posted payload: #{inspect(posted_payload)} response status code: #{inspect(response_status_code)}
""") response body: #{inspect(response_body)}
"""
end)
raise("bad jason") raise("bad jason")
end end
@ -413,7 +463,6 @@ defmodule EthereumJSONRPC do
defp handle_response(resp, 200) do defp handle_response(resp, 200) do
case resp do case resp do
[%{} | _] = batch_resp -> {:ok, batch_resp}
%{"error" => error} -> {:error, error} %{"error" => error} -> {:error, error}
%{"result" => result} -> {:ok, result} %{"result" => result} -> {:ok, result}
end end
@ -422,4 +471,8 @@ defmodule EthereumJSONRPC do
defp handle_response(resp, _status) do defp handle_response(resp, _status) do
{:error, resp} {:error, resp}
end end
defp post(url, json, options) do
HTTPoison.post(url, json, [{"Content-Type", "application/json"}], options)
end
end end

@ -79,4 +79,52 @@ defmodule EthereumJSONRPCTest do
]} ]}
end end
end end
describe "json_rpc/2" do
# regression test for https://github.com/poanetwork/poa-explorer/issues/254
test "transparently splits batch payloads that would trigger a 413 Request Entity Too Large" do
block_numbers = 0..13000
payload =
block_numbers
|> Stream.with_index()
|> Enum.map(&get_block_by_number_request/1)
assert_payload_too_large(payload)
url = EthereumJSONRPC.config(:url)
assert {:ok, responses} = EthereumJSONRPC.json_rpc(payload, url)
assert Enum.count(responses) == Enum.count(block_numbers)
block_number_set = MapSet.new(block_numbers)
response_block_number_set =
Enum.into(responses, MapSet.new(), fn %{"result" => %{"number" => quantity}} ->
EthereumJSONRPC.quantity_to_integer(quantity)
end)
assert MapSet.equal?(response_block_number_set, block_number_set)
end
end
defp assert_payload_too_large(payload) do
json = Jason.encode_to_iodata!(payload)
headers = [{"Content-Type", "application/json"}]
url = EthereumJSONRPC.config(:url)
assert {:ok, %HTTPoison.Response{body: body, status_code: 413}} =
HTTPoison.post(url, json, headers, EthereumJSONRPC.config(:http))
assert body =~ "413 Request Entity Too Large"
end
defp get_block_by_number_request({block_number, id}) do
%{
"id" => id,
"jsonrpc" => "2.0",
"method" => "eth_getBlockByNumber",
"params" => [EthereumJSONRPC.integer_to_quantity(block_number), true]
}
end
end end

@ -1,7 +1,7 @@
{ {
"coverage_options": { "coverage_options": {
"treat_no_relevant_lines_as_covered": true, "treat_no_relevant_lines_as_covered": true,
"minimum_coverage": 93.9 "minimum_coverage": 93.7
}, },
"terminal_options": { "terminal_options": {
"file_column_width": 120 "file_column_width": 120

Loading…
Cancel
Save