Создайте или опубликуйте сообщение о цене криптовалюты в формате JSON в службе Apache Kafka.

Я всегда нахожу, что при изучении языка программирования полезно просто начинать разработку приложения с нуля, шаг за шагом. Этот урок основан на предыдущем уроке Изучение Rust путем разработки приложения: Crypto Publisher CLI для публикации/производства цен на криптовалюту (например, цены биткойнов в долларах США или BTC-USD) для Apache Kafka.

Перед тем, как углубиться в статью, давайте сначала разберемся, предоставив ссылку на репозиторий GitHub, где доступны все исходные коды => https://github.com/sungkim11/crypto-publisher.

Кроме того, я хотел бы продемонстрировать, какое приложение вы будете разрабатывать с использованием Rust, как показано ниже.

Предпосылки

Апач Кафка

На вашем компьютере должен быть установлен Apache Kafka или у вас должен быть доступ к службе Apache Kafka. Я использовал APACHE KAFKA QUICKSTART для установки Apache Kafka в Windows 10/11 WSL (подсистема Windows для Linux).

Ржавые ящики

Прежде чем мы начнем разработку приложения, нам нужно добавить семь крейтов Rust:

  • Serde: Это фреймворк для эффективной и универсальной сериализации и десериализации структур данных Rust. Их URL-адрес crates.io — https://crates.io/crates/serde. Serde используется для сопоставления данных JSON со структурами данных Rust или Struct. Он используется для десериализации или преобразования JSON в структуру и сериализации или преобразования структуры в JSON.
  • Reqwest: Это HTTP-клиент для Rust. Их URL-адрес crates.io — https://crates.io/crates/reqwest. Reqwest используется для выполнения HTTPS-запросов к Coinbase REST API.
  • Tokio: Это платформа для написания асинхронных приложений. Их crates.io URL — https://crates.io/crates/tokio. Tokio нужен для разработки асинхронного приложения.
  • Clap: это парсер аргументов командной строки для Rust. Их URL-адрес crates.io — https://crates.io/crates/clap. Clap необходим для разработки приложения с интерфейсом командной строки.
  • (НОВИНКА) rdkafka: полностью асинхронная клиентская библиотека Apache Kafka с поддержкой Futures для Rust на основе librdkafka. Их URL-адрес crates.io — https://crates.io/crates/rdkafka.
  • (НОВИНКА) log: библиотека Rust, обеспечивающая облегченный фасад ведения журнала. Их URL-адрес crates.io — https://crates.io/crates/log. Журнал используется для предоставления информационных сообщений.
  • (НОВОЕ) serde_json: Serde — это фреймворк для эффективной и общей сериализации и десериализации структур данных Rust. Их URL-адрес crates.io — https://crates.io/crates/serde_json. Serde_json используется для преобразования структуры обратно в JSON.

Наконец, Cargo.toml изменено, чтобы включить семь ящиков следующим образом:

[package]
name = "rust-struct"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# CLI
serde = { version = "1.0.136", features = ["derive"] }
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.17.0", features = ["full"] }
clap = { version = "3.1.2", features = ["derive"] }
# Apach Kafka
rdkafka = { version = "0.25", features = ["cmake-build"] }
log = "0.4.14"
serde_json = "1.0"

Что мы собираемся строить?

В этой статье мы разработаем простое приложение, а затем создадим приложение, чтобы добавить дополнительные функции в последующих уроках:

  1. Создайте или опубликуйте сообщение о цене криптовалюты в Apache Kafka Service.
  2. Создайте или опубликуйте сообщение о цене криптовалюты в формате JSON в службе Apache Kafka.

1. Создайте или опубликуйте сообщение о цене криптовалюты в Apache Kafka Service.

Мы создадим новую асинхронную функцию для публикации или создания сообщений для службы Apache Kafka.

Публикация (сообщение) — во-первых, мы настроили параметры для клиента производителя Kafka, где я определил минимальное количество параметров, как показано ниже:

  • broker
  • message.timeout
  • безопасность (нет безопасности)
let producer: &FutureProducer = &ClientConfig::new()
     .set("bootstrap.servers", broker)
     .set("message.timeout.ms", "5000")
     .set("security.protocol", "plaintext")
     .create()
     .expect("Failed to create producer");

Полный список свойств конфигурации доступен здесь =› https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Во-вторых, мы определили как ключ, так и полезную нагрузку (например, сообщение), как показано ниже:

let payload = format!("message {}", pub_message);
let key = format!("key {}", count);

Наконец, мы отправляем сообщение службе Apache Kafka, как показано ниже:

let status = producer.send(
    FutureRecord::to(topic)
         .payload(&payload)
         .key(&key)
         .headers(OwnedHeaders::new().add(
              &format!("header_key_{}", count),
              &format!("header_value_{}", count)
         )),
    Duration::from_secs(0)
).await;

Полный код для async fn publish() приведен ниже:

Затем мы изменим конфигурацию хлопка, включив в нее:

  • брокер, который определяет, где работают службы Apache Kafka.
  • тема, определяющая категорию, в которой хранятся и публикуются записи.

Наконец, мы модифицируем pub fn crypto_publisher() для вызова новой функции:

publish(broker, topic, &price_message, count);

Полный код под названием crypto_publisher_6.rs доступен в репозитории GitHub (издатель https://github.com/sungkim11/crypto-p).

2. Создайте или опубликуйте сообщение о цене криптовалюты в формате JSON в службе Apache Kafka.

В предыдущем уроке мы отправили сообщение в виде одной строки строки с ценой криптовалюты, как показано ниже:

2022-03-28T02:22:11Z: BTC-USD SPOT Price: 46811.21 | BUY Price: 47048.86 | SELL Price: 46575.70 | Price Spread: 473.16016

В этом уроке мы преобразуем сообщение в формат JSON, используя функцию сериализации serde.

Во-первых, мы создаем структуру для хранения сообщения в формате JSON следующим образом:

#[derive(Serialize, Debug)]
pub struct CryptoPriceData {
   pub data: CryptoPrice
}
#[derive(Serialize, Debug)]
pub struct CryptoPrice {
   pub quote_time: String,
   pub currency: String,
   pub rate: String,
   pub spot_price: String, 
   pub buy_price: String, 
   pub sell_price: String,
   pub spread_price: String,
}

Затем мы заполняем структуру данными о ценах на криптовалюту, как показано ниже:

let price_struct = CryptoPriceData {
   data: CryptoPrice {
      quote_time: quote_time.unwrap().to_string(),
      currency: currency.to_string(),
      rate: rates.to_string(),
      spot_price: spot_price.unwrap().to_string(),
      buy_price: buy_price.unwrap().to_string(),
      sell_price: sell_price.unwrap().to_string(),
      spread_price: spread_price.to_string(),
   }
};

Наконец, мы конвертируем структуру в JSON, а затем вызываем async fn publish(), как показано ниже:

let price_json = serde_json::to_string(&price_struct).unwrap();
publish(broker, topic, &price_json, count);

Полный код под названием crypto_publisher_7.rs доступен в репозитории GitHub (https://github.com/sungkim11/crypto-p, издатель).

На следующем уроке мы создадим криптоподписчика, который будет использовать сообщение и сохранять его в формате Apache Parquet. Обычно мы используем Apache Spark для обработки этих потоков данных (то есть сообщений) от Apache Kafka, но я хочу продемонстрировать, что вы также можете использовать Rust для выполнения аналогичной задачи. Однако я рекомендую использовать Apache Spark.

Я надеюсь, что эта статья была полезной! Спасибо за прочтение.