parent
ff81c21d44
commit
bf45faa513
@ -0,0 +1,117 @@ |
||||
defmodule Explorer.Market.History.Cataloger do |
||||
@moduledoc """ |
||||
Fetches the daily market history. |
||||
|
||||
Market grabs the last 365 day's worth of market history for the configured |
||||
coin in the explorer. Once that data is fectched, current day's values are |
||||
checked every 60 minutes. Additionally, failed requests to the history |
||||
source will follow exponential backoff `100ms * 2^(n+1)` where `n` is the |
||||
number of failed requests. |
||||
|
||||
## Configuration |
||||
|
||||
The following example shows the configurable values in a sample config. |
||||
|
||||
config :explorer, Explorer.Market.History.Cataloger, |
||||
# fetch interval in milliseconds |
||||
history_fetch_interval: :timer.minutes(60), |
||||
# Base backoff in milliseconds for failed requets to history API |
||||
base_backoff: 100 |
||||
|
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Market |
||||
|
||||
@typep milliseconds :: non_neg_integer() |
||||
|
||||
## GenServer callbacks |
||||
|
||||
@impl GenServer |
||||
def init(:ok) do |
||||
send(self(), {:fetch_history, 365}) |
||||
|
||||
{:ok, %{}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info({:fetch_history, day_count}, state) do |
||||
fetch_history(day_count) |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl GenServer |
||||
# Record fetch successful. |
||||
def handle_info({_ref, {_, _, {:ok, records}}}, state) do |
||||
Market.bulk_insert_history(records) |
||||
|
||||
# Schedule next check for history |
||||
fetch_after = config_or_default(:history_fetch_interval, :timer.minutes(60)) |
||||
Process.send_after(self(), {:fetch_history, 1}, fetch_after) |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
# Failed to get records. Try again. |
||||
@impl GenServer |
||||
def handle_info({_ref, {day_count, failed_attempts, :error}}, state) do |
||||
Logger.warn(fn -> "Failed to fetch market history. Trying again." end) |
||||
|
||||
fetch_history(day_count, failed_attempts + 1) |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
# Callback that a monitored process has shutdown. |
||||
@impl GenServer |
||||
def handle_info({:DOWN, _, :process, _, _}, state) do |
||||
{:noreply, state} |
||||
end |
||||
|
||||
@doc """ |
||||
Starts a process to continually fetch market history. |
||||
""" |
||||
@spec start_link(term()) :: GenServer.on_start() |
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
## Private Functions |
||||
|
||||
@spec base_backoff :: milliseconds() |
||||
defp base_backoff do |
||||
config_or_default(:base_backoff, 100) |
||||
end |
||||
|
||||
@spec config_or_default(atom(), term()) :: term() |
||||
defp config_or_default(key, default) do |
||||
Application.get_env(:explorer, __MODULE__, [])[key] || default |
||||
end |
||||
|
||||
@spec source() :: module() |
||||
defp source do |
||||
config_or_default(:source, Explorer.Market.History.Source.CryptoCompare) |
||||
end |
||||
|
||||
@spec fetch_history(non_neg_integer(), non_neg_integer()) :: Task.t() |
||||
defp fetch_history(day_count, failed_attempts \\ 0) do |
||||
Task.Supervisor.async_nolink(Explorer.MarketTaskSupervisor, fn -> |
||||
Process.sleep(delay(failed_attempts)) |
||||
{day_count, failed_attempts, source().fetch_history(day_count)} |
||||
end) |
||||
end |
||||
|
||||
@spec delay(non_neg_integer()) :: milliseconds() |
||||
defp delay(0), do: 0 |
||||
defp delay(1), do: base_backoff() |
||||
|
||||
defp delay(failed_attempts) do |
||||
# Simulates 2^n |
||||
multiplier = Enum.reduce(2..failed_attempts, 1, fn _, acc -> 2 * acc end) |
||||
multiplier * base_backoff() |
||||
end |
||||
end |
@ -0,0 +1,19 @@ |
||||
defmodule Explorer.Market.History.Source do |
||||
@moduledoc """ |
||||
Interface for a source that allows for fetching of market history. |
||||
""" |
||||
|
||||
@typedoc """ |
||||
Record of market values for a specific date. |
||||
""" |
||||
@type record :: %{ |
||||
closing_price: Decimal.t(), |
||||
date: Date.t(), |
||||
opening_price: Decimal.t() |
||||
} |
||||
|
||||
@doc """ |
||||
Fetch history for a specified amount of days in the past. |
||||
""" |
||||
@callback fetch_history(previous_days :: non_neg_integer()) :: {:ok, [record()]} | :error |
||||
end |
@ -0,0 +1,75 @@ |
||||
defmodule Explorer.Market.History.Source.CryptoCompare do |
||||
@moduledoc """ |
||||
Adapter for fetching market history from https://cryptocompare.com. |
||||
|
||||
The history is fetched for the configured coin. You can specify a |
||||
different coin by changing the targeted coin. |
||||
|
||||
# In config.exs |
||||
config :explorer, coin: "POA" |
||||
|
||||
""" |
||||
|
||||
alias Explorer.Market.History.Source |
||||
alias HTTPoison.Response |
||||
|
||||
@behaviour Source |
||||
|
||||
@typep unix_timestamp :: non_neg_integer() |
||||
|
||||
@impl Source |
||||
def fetch_history(previous_days) do |
||||
url = history_url(previous_days) |
||||
headers = [{"Content-Type", "application/json"}] |
||||
|
||||
case HTTPoison.get(url, headers) do |
||||
{:ok, %Response{body: body, status_code: 200}} -> |
||||
{:ok, format_data(body)} |
||||
|
||||
_ -> |
||||
:error |
||||
end |
||||
end |
||||
|
||||
@spec base_url :: String.t() |
||||
defp base_url do |
||||
configured_url = Application.get_env(:explorer, __MODULE__, [])[:base_url] |
||||
configured_url || "https://min-api.cryptocompare.com" |
||||
end |
||||
|
||||
@spec configured_coin :: String.t() |
||||
defp configured_coin do |
||||
Application.get_env(:explorer, :coin) |
||||
end |
||||
|
||||
@spec date(unix_timestamp()) :: Date.t() |
||||
defp date(unix_timestamp) do |
||||
unix_timestamp |
||||
|> DateTime.from_unix!() |
||||
|> DateTime.to_date() |
||||
end |
||||
|
||||
@spec format_data(String.t()) :: [Source.record()] |
||||
defp format_data(data) do |
||||
json = Jason.decode!(data) |
||||
|
||||
for item <- json["Data"] do |
||||
%{ |
||||
closing_price: Decimal.new(item["close"]), |
||||
date: date(item["time"]), |
||||
opening_price: Decimal.new(item["open"]) |
||||
} |
||||
end |
||||
end |
||||
|
||||
@spec history_url(non_neg_integer()) :: String.t() |
||||
defp history_url(previous_days) do |
||||
query_params = %{ |
||||
"fsym" => configured_coin(), |
||||
"limit" => previous_days, |
||||
"tsym" => "USD" |
||||
} |
||||
|
||||
"#{base_url()}/data/histoday?#{URI.encode_query(query_params)}" |
||||
end |
||||
end |
@ -0,0 +1,34 @@ |
||||
defmodule Explorer.Market do |
||||
@moduledoc """ |
||||
Context for data related to the cryptocurrency market. |
||||
""" |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Market.MarketHistory |
||||
alias Explorer.Repo |
||||
|
||||
@doc """ |
||||
Retrieves the history for the recent specified amount of days. |
||||
|
||||
Today's date is include as part of the day count |
||||
""" |
||||
@spec fetch_recent_history(non_neg_integer()) :: [MarketHistory.t()] |
||||
def fetch_recent_history(days) when days >= 1 do |
||||
day_diff = days * -1 |
||||
|
||||
query = |
||||
from( |
||||
mh in MarketHistory, |
||||
where: mh.date > date_add(^Date.utc_today(), ^day_diff, "day"), |
||||
order_by: [desc: mh.date] |
||||
) |
||||
|
||||
Repo.all(query) |
||||
end |
||||
|
||||
@doc false |
||||
def bulk_insert_history(records) do |
||||
Repo.insert_all(MarketHistory, records, on_conflict: :replace_all, conflict_target: [:date]) |
||||
end |
||||
end |
@ -0,0 +1,25 @@ |
||||
defmodule Explorer.Market.MarketHistory do |
||||
@moduledoc """ |
||||
Represents market history of configured coin to USD. |
||||
""" |
||||
use Ecto.Schema |
||||
|
||||
schema "market_history" do |
||||
field(:closing_price, :decimal) |
||||
field(:date, :date) |
||||
field(:opening_price, :decimal) |
||||
end |
||||
|
||||
@typedoc """ |
||||
The recorded values of the configured coin to USD for a single day. |
||||
|
||||
* `:closing_price` - Closing price in USD. |
||||
* `:date` - The date in UTC. |
||||
* `:opening_price` - Opening price in USD. |
||||
""" |
||||
@type t :: %__MODULE__{ |
||||
closing_price: Decimal.t(), |
||||
date: Date.t(), |
||||
opening_price: Decimal.t() |
||||
} |
||||
end |
@ -0,0 +1,13 @@ |
||||
defmodule Explorer.Repo.Migrations.CreateMarketHistory do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
create table(:market_history) do |
||||
add :date, :date |
||||
add :closing_price, :decimal |
||||
add :opening_price, :decimal |
||||
end |
||||
|
||||
create unique_index(:market_history, :date) |
||||
end |
||||
end |
@ -0,0 +1,62 @@ |
||||
defmodule Explorer.Market.History.CatalogerTest do |
||||
use Explorer.DataCase, async: false |
||||
|
||||
import Mox |
||||
|
||||
alias Explorer.Market.MarketHistory |
||||
alias Explorer.Market.History.Cataloger |
||||
alias Explorer.Market.History.Source.TestSource |
||||
alias Explorer.Repo |
||||
|
||||
setup do |
||||
Application.put_env(:explorer, Cataloger, source: TestSource) |
||||
:ok |
||||
end |
||||
|
||||
test "init" do |
||||
assert {:ok, %{}} == Cataloger.init(:ok) |
||||
assert_received {:fetch_history, 365} |
||||
end |
||||
|
||||
test "handle_info with `{:fetch_history, days}`" do |
||||
records = [%{date: ~D[2018-04-01], closing_price: Decimal.new(10), opening_price: Decimal.new(5)}] |
||||
expect(TestSource, :fetch_history, fn 1 -> {:ok, records} end) |
||||
set_mox_global() |
||||
state = %{} |
||||
|
||||
assert {:noreply, state} == Cataloger.handle_info({:fetch_history, 1}, state) |
||||
assert_receive {_ref, {1, 0, {:ok, ^records}}} |
||||
end |
||||
|
||||
test "handle_info with successful task" do |
||||
Application.put_env(:explorer, Cataloger, history_fetch_interval: 1) |
||||
record = %{date: ~D[2018-04-01], closing_price: Decimal.new(10), opening_price: Decimal.new(5)} |
||||
state = %{} |
||||
|
||||
assert {:noreply, state} == Cataloger.handle_info({nil, {1, 0, {:ok, [record]}}}, state) |
||||
assert_receive {:fetch_history, 1} |
||||
assert Repo.get_by(MarketHistory, date: record.date) |
||||
end |
||||
|
||||
@tag capture_log: true |
||||
test "handle_info with failed task" do |
||||
state = %{} |
||||
test_pid = self() |
||||
expect(TestSource, :fetch_history, fn 10 -> send(test_pid, :retry) end) |
||||
set_mox_global() |
||||
|
||||
assert {:noreply, state} == Cataloger.handle_info({nil, {10, 0, :error}}, state) |
||||
# Back off check |
||||
refute_receive :retry, 100 |
||||
assert_receive :retry, 300 |
||||
end |
||||
|
||||
test "handle info for DOWN message" do |
||||
assert {:noreply, %{}} == Cataloger.handle_info({:DOWN, nil, :process, nil, nil}, %{}) |
||||
end |
||||
|
||||
@tag capture_log: true |
||||
test "start_link" do |
||||
assert {:ok, _} = Cataloger.start_link([]) |
||||
end |
||||
end |
@ -0,0 +1,90 @@ |
||||
defmodule Explorer.Market.History.Source.CryptoCompareTest do |
||||
use ExUnit.Case, async: false |
||||
|
||||
alias Explorer.Market.History.Source.CryptoCompare |
||||
alias Plug.Conn |
||||
|
||||
@json """ |
||||
{ |
||||
"Response": "Success", |
||||
"Type": 100, |
||||
"Aggregated": false, |
||||
"Data": [ |
||||
{ |
||||
"time": 1524528000, |
||||
"close": 9655.77, |
||||
"high": 9741.91, |
||||
"low": 8957.68, |
||||
"open": 8967.86, |
||||
"volumefrom": 136352.05, |
||||
"volumeto": 1276464750.74 |
||||
}, |
||||
{ |
||||
"time": 1524614400, |
||||
"close": 8873.62, |
||||
"high": 9765.23, |
||||
"low": 8757.06, |
||||
"open": 9657.69, |
||||
"volumefrom": 192797.41, |
||||
"volumeto": 1779806222.98 |
||||
}, |
||||
{ |
||||
"time": 1524700800, |
||||
"close": 8804.32, |
||||
"high": 8965.84, |
||||
"low": 8669.38, |
||||
"open": 8873.57, |
||||
"volumefrom": 74704.5, |
||||
"volumeto": 661168891 |
||||
} |
||||
], |
||||
"TimeTo": 1524700800, |
||||
"TimeFrom": 1523836800, |
||||
"FirstValueInArray": true, |
||||
"ConversionType": { |
||||
"type": "direct", |
||||
"conversionSymbol": "" |
||||
} |
||||
} |
||||
""" |
||||
|
||||
describe "fetch_history/1" do |
||||
setup do |
||||
bypass = Bypass.open() |
||||
Application.put_env(:explorer, CryptoCompare, base_url: "http://localhost:#{bypass.port}") |
||||
|
||||
{:ok, bypass: bypass} |
||||
end |
||||
|
||||
test "with successful request", %{bypass: bypass} do |
||||
Bypass.expect(bypass, fn conn -> Conn.resp(conn, 200, @json) end) |
||||
|
||||
expected = [ |
||||
%{ |
||||
closing_price: Decimal.new(9655.77), |
||||
date: ~D[2018-04-24], |
||||
opening_price: Decimal.new(8967.86) |
||||
}, |
||||
%{ |
||||
closing_price: Decimal.new(8873.62), |
||||
date: ~D[2018-04-25], |
||||
opening_price: Decimal.new(9657.69) |
||||
}, |
||||
%{ |
||||
closing_price: Decimal.new(8804.32), |
||||
date: ~D[2018-04-26], |
||||
opening_price: Decimal.new(8873.57) |
||||
} |
||||
] |
||||
|
||||
assert {:ok, expected} == CryptoCompare.fetch_history(3) |
||||
end |
||||
|
||||
test "with errored request", %{bypass: bypass} do |
||||
error_text = ~S({"error": "server error"}) |
||||
Bypass.expect(bypass, fn conn -> Conn.resp(conn, 500, error_text) end) |
||||
|
||||
assert :error == CryptoCompare.fetch_history(3) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,87 @@ |
||||
defmodule Explorer.MarketTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Explorer.Market |
||||
alias Explorer.Market.MarketHistory |
||||
alias Explorer.Repo |
||||
|
||||
test "fetch_recent_history/1" do |
||||
today = Date.utc_today() |
||||
|
||||
records = |
||||
for i <- 0..5 do |
||||
%{ |
||||
date: Timex.shift(today, days: i * -1), |
||||
closing_price: Decimal.new(1), |
||||
opening_price: Decimal.new(1) |
||||
} |
||||
end |
||||
|
||||
Market.bulk_insert_history(records) |
||||
|
||||
history = Market.fetch_recent_history(1) |
||||
assert length(history) == 1 |
||||
assert Enum.at(history, 0).date == Enum.at(records, 0).date |
||||
|
||||
more_history = Market.fetch_recent_history(5) |
||||
assert length(more_history) == 5 |
||||
|
||||
for {history_record, index} <- Enum.with_index(more_history) do |
||||
assert history_record.date == Enum.at(records, index).date |
||||
end |
||||
end |
||||
|
||||
describe "bulk_insert_history/1" do |
||||
test "inserts records" do |
||||
comparable_values = %{ |
||||
~D[2018-04-01] => %{ |
||||
date: ~D[2018-04-01], |
||||
closing_price: Decimal.new(1), |
||||
opening_price: Decimal.new(1) |
||||
}, |
||||
~D[2018-04-02] => %{ |
||||
date: ~D[2018-04-02], |
||||
closing_price: Decimal.new(1), |
||||
opening_price: Decimal.new(1) |
||||
}, |
||||
~D[2018-04-03] => %{ |
||||
date: ~D[2018-04-03], |
||||
closing_price: Decimal.new(1), |
||||
opening_price: Decimal.new(1) |
||||
} |
||||
} |
||||
|
||||
insertable_records = Map.values(comparable_values) |
||||
Market.bulk_insert_history(insertable_records) |
||||
history = Repo.all(MarketHistory) |
||||
|
||||
missing_records = |
||||
Enum.reduce(history, comparable_values, fn record, acc -> |
||||
initial_record = Map.get(acc, record.date) |
||||
assert record.date == initial_record.date |
||||
assert record.closing_price == initial_record.closing_price |
||||
assert record.opening_price == initial_record.opening_price |
||||
Map.delete(acc, record.date) |
||||
end) |
||||
|
||||
assert missing_records == %{} |
||||
end |
||||
|
||||
test "overrides existing records on date conflict" do |
||||
date = ~D[2018-04-01] |
||||
Repo.insert(%MarketHistory{date: date}) |
||||
|
||||
new_record = %{ |
||||
date: date, |
||||
closing_price: Decimal.new(1), |
||||
opening_price: Decimal.new(1) |
||||
} |
||||
|
||||
Market.bulk_insert_history([new_record]) |
||||
|
||||
fetched_record = Repo.get_by(MarketHistory, date: date) |
||||
assert fetched_record.closing_price == new_record.closing_price |
||||
assert fetched_record.opening_price == new_record.opening_price |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue