В статье Обновление рыночных данных в реальном времени с помощью Elixir мы увидели, как создать клиент Coinbase WebSocket и получать сделки в реальном времени. В комментариях к статье читатель спросил, сколько сделок мы получаем в секунду. В целом скорость торгов зависит от товара и времени суток.

В этой статье мы начинаем видеть, как агрегировать эти сделки в режиме реального времени, используя поведение GenServer. Мы создадим процесс агрегации, который группирует и подсчитывает сделки.

Coinbase.Клиент

Давайте загрузим код, который мы написали в предыдущей статье. Вы можете найти его в этом репозитории GitHub, poeticoding/learning_coinbase.

$ git clone https://github.com/poeticoding/learning_coinbase

git clone [email protected]:poeticoding/learning_coinbase.git
Cloning into 'learning_coinbase'...
....

$ cd learning_coinbase && git checkout 45f34e5

Чтобы начать с того же кода, которым мы закончили в предыдущей статье, мы делаем проверку на выпуск части 1 (коммит 45f34e5).

Проверяя модуль Coinbase.Client (client.ex), мы видим, что каждый раз, когда мы получаем сообщение, мы декодируем его в карту, а затем печатаем его, только если "type" равно "match", что означает, что это сделка.

def handle_frame({:text, msg}, state) do
  handle_msg(Poison.decode!(msg), state)
end

def handle_msg(%{"type" => "match"} = trade, state) do
  IO.inspect(trade)
  {:ok, state}
end

Теперь вместо того, чтобы начинать писать наш код агрегации в модуле Coinbase.Client, лучше отделить код агрегации и создать для него новый модуль. Модуль агрегации, который мы собираемся реализовать, будет получать сделки от клиента, используя функцию new_trade(trade).

Агрегация — подсчет сделок

Мы можем получать несколько сделок в течение одной секунды. На изображении ниже мы видим, что сделки 1, 2 и 3 происходят в одну и ту же секунду, а сделки 4 и 5 — в другую секунду.

Глядя на пример сделки, мы видим, что у нас есть временная строка, которую мы можем использовать для группировки сделок.

%{
  "maker_order_id" => "ab678037-b931-4896-8310-8e2a91efc3aa",
  "price" => "3588.00000000",
  "product_id" => "BTC-USD",
  "sequence" => 7675147771,
  "side" => "sell",
  "size" => "0.05333699",
  "taker_order_id" => "df54a7ae-5fc5-4bc4-8db8-635e632c8597",
  "time" => "2018-12-28T00:02:23.809000Z",
  "trade_id" => 56786993,
  "type" => "match"
}

Чтобы преобразовать структуру "time" в структуру DateTime, мы используем функцию DateTime.from_iso8601/2.

DateTime.from_iso8601("2018-12-28T00:02:23.809000Z")
{:ok, #DateTime<2018-12-28 00:02:23.809000Z>, 0}

Структура DateTime вместе с Map удобна для группировки сделок, совершенных в одну и ту же секунду. С помощью DateTime мы можем преобразовать строку времени в кортеж даты и времени с точностью до секунд, опустив миллисекунды.

{:ok, dt, _} = DateTime.from_iso8601("2018-12-28T00:02:23.809000Z")
key = {dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second}
{2018, 12, 28, 0, 2, 23}

Затем этот кортеж будет использоваться в качестве ключа карты группировки, которую мы увидим позже.

Модуль агрегации

Приступим к реализации нашего модуля агрегации.

defmodule Coinbase.Aggregation do
  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, :ok, name: Coinbase.Aggregation)
  end

  def init(:ok), do: {:ok, %{}}

end

Мы реализуем поведение GenServer и для простоты запускаем процесс с именем Coinbase.Aggregation. Поскольку нам не нужно передавать какой-либо параметр в функцию init, мы передаем только :ok. Функция init(:ok) устанавливает состояние, которое представляет собой пустую карту.

Наша цель — подсчитать количество сделок в секунду, обновив счетчик для каждой полученной сделки, создав карту, подобную этой.

%{
  {2018, 12, 28, 0, 2, 23} => 2,
  {2018, 12, 28, 0, 2, 24} => 1,
  {2018, 12, 28, 0, 2, 25} => 5,
  ...
}

Давайте реализуем функцию handle_cast/2. Мы хотим асинхронно получать торговые сообщения от клиента, и нам не нужно возвращать клиенту какой-либо результат.

# line 15 - Coinbase.Aggregation - lib/coinbase/aggregation.ex
def handle_cast({:new_trade, %{"time" => time}}, %{}=counts) do
  {:ok, dt, _} = DateTime.from_iso8601(time)
  key = {dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second}
  updated_counts = Map.update(counts, key, 1, fn v-> v+1 end)
  {:noreply, updated_counts}
end
  • строка 16: мы реализуем функцию handle_cast/2, шаблон которой соответствует сообщению {:new_trade, %{"time" => time}}. Таким образом мы извлекаем значение time, готовое для использования внутри функции. counts — это наше состояние, в котором мы суммируем количество сделок.
  • строка 17: мы преобразуем строку времени в структуру DateTime dt, которую мы используем для создания в строке 18 ключа, представляющего дату и время (с точностью до секунд) этой сделки.
  • строка 19: затем мы обновляем счетчик внутри карты для этого конкретного ключа. Используемая нами функция Map.update(map, key, initial, fun) принимает параметр initial, который является начальным значением, если ключ отсутствует на карте. Когда на карте уже есть ключ со счетчиком для этой конкретной секунды, функция update/4 обновляет значение, увеличивая его на 1.
  • строка 20: затем мы обновляем состояние, возвращая карту updated_counts.

Интерфейс

Coinbase.Client может напрямую использовать функцию GenServer.cast/2 для отправки сообщений процессу Coinbase.Aggregation. В общем, гораздо лучше написать публичный интерфейс в модуле Coinbase.Aggregation, созданный функциями, которые имеют дело с обменом сообщениями.

# Coinbase.Aggregation - lib/coinbase/aggregation.ex
def new_trade(pid, trade),
  do: GenServer.cast(pid, {:new_trade, trade})

def new_trade(trade),
  do: new_trade(Coinbase.Aggregation, trade)

new_trade(pid, trade) отправляет сообщение {:new_trade, trade} процессу pid. Поскольку мы запускаем процесс с фиксированным именем Coinbase.Aggregation, мы также добавляем функцию new_trade/1, которая использует имя процесса как pid.

Модуль Coinbase.Client теперь может использовать функцию Coinbase.Aggregation.new_trade/1 для отправки сделки в процесс агрегации.

# Coinbase.Client - lib/coinbase/client.ex
def handle_msg(%{"type" => "match"} = trade, state) do
  Coinbase.Aggregation.new_trade(trade)
  {:ok, state}
end

Полный код агрегации и клиента вы найдете здесь: aggregation.ex и client.ex

Надзор

В предыдущей статье мы уже настроили простой супервизор, который запускает и контролирует Coinbase.Client. Теперь нам нужно запустить процесс Coinbase.Aggregation до запуска клиента. Для этого нам просто нужно добавить наш модуль в качестве первого элемента списка children.

# Coinbase.Application - lib/coinbase/application.ex
def start(_type, _args) do
  children = [
    {Coinbase.Aggregation, []},
    {Coinbase.Client, ["BTC-USD"]}
  ]

  opts = [strategy: :one_for_one, name: Coinbase.Supervisor]
  Supervisor.start_link(children, opts)
end

Полный код: application.ex

Проверка состояния агрегации

Пришло время запустить наше приложение на iex.

$ iex -S mix
...
connected!
iex(1)>

Запускается интерактивная оболочка, и единственное сообщение, которое мы видим, — «подключено!». Чтобы понять, что происходит внутри процесса агрегации, мы можем использовать функцию Эрланга :sys.get_state/1, которая возвращает состояние процесса.

iex> :sys.get_state(Coinbase.Aggregation)
%{
  {2018, 12, 28, 12, 1, 30} => 1,
  {2018, 12, 28, 12, 1, 31} => 1,
  {2018, 12, 28, 12, 1, 32} => 5,
  {2018, 12, 28, 12, 1, 33} => 2,
  {2018, 12, 28, 12, 1, 38} => 12
}

Здорово! Это то, чего мы хотели добиться: карта с количеством сделок за каждую секунду. Давайте посмотрим, сможем ли мы увеличить количество сделок, добавив некоторые другие продукты.

# Coinbase.Application - lib/coinbase/application.ex
def start(_type, _args) do
  children = [
    {Coinbase.Aggregation, []},
    {Coinbase.Client, ["BTC-USD", "BTC-EUR", "BTC-GBP",
          "ETH-USD", "ETH-EUR", "ETH-GBP",
          "LTC-USD", "LTC-EUR", "LTC-GBP",
          "BTC-USD"]}
  ]
  ...

Таким образом, мы получим сделки для всех этих продуктов. Мы будем считать все сделки, не обращая внимания на то, к какому продукту они относятся.

iex> :sys.get_state Coinbase.Aggregation
%{
  {2018, 12, 28, 12, 16, 45} => 2,
  {2018, 12, 28, 12, 16, 47} => 1,
  {2018, 12, 28, 12, 16, 48} => 5,
  {2018, 12, 28, 12, 16, 49} => 7,
  {2018, 12, 28, 12, 16, 50} => 1,
  {2018, 12, 28, 12, 16, 51} => 1,
  {2018, 12, 28, 12, 16, 52} => 1,
  {2018, 12, 28, 12, 16, 53} => 1,
  {2018, 12, 28, 12, 16, 54} => 2,
  {2018, 12, 28, 12, 16, 55} => 13,
  {2018, 12, 28, 12, 16, 56} => 2,
  {2018, 12, 28, 12, 16, 57} => 3
}

Еще не так много сделок. Как мы можем увеличить количество сообщений?

Если мы хотим резко увеличить количество сообщений, которые мы получаем от Coinbase, мы также можем подписаться на канал level2. Этот канал отправляет нам сообщение о любых изменениях в книге заказов. Книга заказов — это место, где люди (или боты!) размещают свои заказы купить или продать.

Чтобы это заработало, нам нужно немного реорганизовать наш код.
Сначала мы добавляем канал level2 в сообщение о подписке в модуле Coinbase.Client.

# Coinbase.Client - lib/coinbase/client.ex
defp subscription_frame(products) do
  subscription_json = %{
    type: "subscribe",
    product_ids: products,
    channels: ["matches", "level2"]
  }
  |> Poison.encode!()

  {:text, subscription_json}
end

Наш handle_msg/2 принимает только сделки с условием "type" => "match". Изменяем это условие и просто проверяем, что в сообщении есть ключ "time", который нужен для счетной части.

# Coinbase.Client - lib/coinbase/client.ex
def handle_msg(%{"time" => _} = msg, state) do
  Coinbase.Aggregation.new_message(msg)
  {:ok, state}
end

Мы также изменили название функции Coinbase.Aggregation с new_trade(trade) на что-то более общее, например new_message(msg).

Вы можете увидеть рефакторинг кода клиентского модуля по этой ссылке: client.ex

Теперь давайте рефакторим модуль Coinbase.Aggregation.

# Coinbase.Aggregation - lib/coinbase/aggregation.ex
def new_message(msg),
  do: new_message(Coinbase.Aggregation, msg)

def new_message(pid, msg),
  do: GenServer.cast(pid, {:new_message, msg})

def handle_cast({:new_message, %{"time" => time}}, %{}=counts) do
...

Мы видим, что рефакторинг действительно минимален. Мы только что переименовали функции интерфейса и изменили сообщение с {:new_trade, trade} на {:new_message, msg}.

Рефакторинг кода агрегации по этой ссылке: aggregation.ex

Давайте теперь посмотрим, увеличилось ли количество сообщений, которые мы получаем от Coinbase.

$ iex -S mix
connected!
iex> :sys.get_state Coinbase.Aggregation
%{
  {2018, 12, 28, 14, 13, 51} => 198,
  {2018, 12, 28, 14, 13, 52} => 218,
  {2018, 12, 28, 14, 13, 53} => 222,
  ...
}

Идеально! Увеличение весьма заметно.

Если вы хотите получать еще больше сообщений, попробуйте полный канал

Репозиторий GitHub

В этом коммите вы найдете полный рабочий код реализации этой статьи.

Подведение итогов

В этой статье мы начали использовать данные в реальном времени, поступающие от Coinbase, с простым процессом агрегирования, который подсчитывает сделки в одну и ту же секунду. Мы использовали карту, чтобы сгруппировать сделки, совершенные за одну секунду, и увидели результат, отображающий состояние процесса агрегирования. Это дает нам представление о частоте сделок/сообщений, которые мы получаем от сервера, но, очевидно, далеко от идеала.

Использование карты упрощает группировку, но чтобы получить среднюю скорость, мы должны учитывать пустые секунды (секунды без сделок) и вычислять среднее количество последних 10–20 секунд. Ключи на карте не упорядочены, и если мы хотим увидеть только последние 10–20 секунд, нам нужно отсортировать эти ключи, что может быть вычислительно затратной операцией, особенно когда у нас тысячи ключей. В дальнейших статьях мы увидим, как считать сделки, используя Lists и Queues.

Первоначально опубликовано на www.poeticoding.com.