Создайте или опубликуйте сообщение о цене криптовалюты в формате JSON в службе Apache Kafka.
Я всегда нахожу, что при изучении языка программирования полезно просто начинать разработку приложения с нуля, шаг за шагом. Этот урок основан на предыдущем уроке Изучение Rust путем разработки приложения: Crypto Publisher CLI для публикации/производства цен на криптовалюту (например, цены биткойнов в долларах США или BTC-USD) для Apache Kafka.
Перед тем, как углубиться в статью, давайте сначала разберемся, предоставив ссылку на репозиторий GitHub, где доступны все исходные коды => https://github.com/sungkim11/crypto-publisher.
Кроме того, я хотел бы продемонстрировать, какое приложение вы будете разрабатывать с использованием Rust, как показано ниже.
Предпосылки
Апач Кафка
На вашем компьютере должен быть установлен Apache Kafka или у вас должен быть доступ к службе Apache Kafka. Я использовал APACHE KAFKA QUICKSTART для установки Apache Kafka в Windows 10/11 WSL (подсистема Windows для Linux).
Ржавые ящики
Прежде чем мы начнем разработку приложения, нам нужно добавить семь крейтов Rust:
Serde
: Это фреймворк для эффективной и универсальной сериализации и десериализации структур данных Rust. Их URL-адрес crates.io — https://crates.io/crates/serde. Serde используется для сопоставления данных JSON со структурами данных Rust или Struct. Он используется для десериализации или преобразования JSON в структуру и сериализации или преобразования структуры в JSON.Reqwest
: Это HTTP-клиент для Rust. Их URL-адрес crates.io — https://crates.io/crates/reqwest.Reqwest
используется для выполнения HTTPS-запросов к Coinbase REST API.Tokio
: Это платформа для написания асинхронных приложений. Ихcrates.io
URL — https://crates.io/crates/tokio. Tokio нужен для разработки асинхронного приложения.- Clap: это парсер аргументов командной строки для Rust. Их URL-адрес crates.io — https://crates.io/crates/clap. Clap необходим для разработки приложения с интерфейсом командной строки.
- (НОВИНКА)
rdkafka
: полностью асинхронная клиентская библиотека Apache Kafka с поддержкой Futures для Rust на основеlibrdkafka
. Их URL-адрес crates.io — https://crates.io/crates/rdkafka. - (НОВИНКА)
log
: библиотека Rust, обеспечивающая облегченный фасад ведения журнала. Их URL-адрес crates.io — https://crates.io/crates/log. Журнал используется для предоставления информационных сообщений. - (НОВОЕ)
serde_json
: Serde — это фреймворк для эффективной и общей сериализации и десериализации структур данных Rust. Их URL-адрес crates.io — https://crates.io/crates/serde_json. Serde_json используется для преобразования структуры обратно в JSON.
Наконец, Cargo.toml
изменено, чтобы включить семь ящиков следующим образом:
[package] name = "rust-struct" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] # CLI serde = { version = "1.0.136", features = ["derive"] } reqwest = { version = "0.11", features = ["json"] } tokio = { version = "1.17.0", features = ["full"] } clap = { version = "3.1.2", features = ["derive"] } # Apach Kafka rdkafka = { version = "0.25", features = ["cmake-build"] } log = "0.4.14" serde_json = "1.0"
Что мы собираемся строить?
В этой статье мы разработаем простое приложение, а затем создадим приложение, чтобы добавить дополнительные функции в последующих уроках:
- Создайте или опубликуйте сообщение о цене криптовалюты в Apache Kafka Service.
- Создайте или опубликуйте сообщение о цене криптовалюты в формате JSON в службе Apache Kafka.
1. Создайте или опубликуйте сообщение о цене криптовалюты в Apache Kafka Service.
Мы создадим новую асинхронную функцию для публикации или создания сообщений для службы Apache Kafka.
Публикация (сообщение) — во-первых, мы настроили параметры для клиента производителя Kafka, где я определил минимальное количество параметров, как показано ниже:
broker
message.timeout
- безопасность (нет безопасности)
let producer: &FutureProducer = &ClientConfig::new() .set("bootstrap.servers", broker) .set("message.timeout.ms", "5000") .set("security.protocol", "plaintext") .create() .expect("Failed to create producer");
Полный список свойств конфигурации доступен здесь =› https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
Во-вторых, мы определили как ключ, так и полезную нагрузку (например, сообщение), как показано ниже:
let payload = format!("message {}", pub_message); let key = format!("key {}", count);
Наконец, мы отправляем сообщение службе Apache Kafka, как показано ниже:
let status = producer.send( FutureRecord::to(topic) .payload(&payload) .key(&key) .headers(OwnedHeaders::new().add( &format!("header_key_{}", count), &format!("header_value_{}", count) )), Duration::from_secs(0) ).await;
Полный код для async fn publish()
приведен ниже:
Затем мы изменим конфигурацию хлопка, включив в нее:
- брокер, который определяет, где работают службы Apache Kafka.
- тема, определяющая категорию, в которой хранятся и публикуются записи.
Наконец, мы модифицируем pub fn crypto_publisher()
для вызова новой функции:
publish(broker, topic, &price_message, count);
Полный код под названием crypto_publisher_6.rs
доступен в репозитории GitHub (издатель https://github.com/sungkim11/crypto-p).
2. Создайте или опубликуйте сообщение о цене криптовалюты в формате JSON в службе Apache Kafka.
В предыдущем уроке мы отправили сообщение в виде одной строки строки с ценой криптовалюты, как показано ниже:
2022-03-28T02:22:11Z: BTC-USD SPOT Price: 46811.21 | BUY Price: 47048.86 | SELL Price: 46575.70 | Price Spread: 473.16016
В этом уроке мы преобразуем сообщение в формат JSON, используя функцию сериализации serde.
Во-первых, мы создаем структуру для хранения сообщения в формате JSON следующим образом:
#[derive(Serialize, Debug)] pub struct CryptoPriceData { pub data: CryptoPrice } #[derive(Serialize, Debug)] pub struct CryptoPrice { pub quote_time: String, pub currency: String, pub rate: String, pub spot_price: String, pub buy_price: String, pub sell_price: String, pub spread_price: String, }
Затем мы заполняем структуру данными о ценах на криптовалюту, как показано ниже:
let price_struct = CryptoPriceData { data: CryptoPrice { quote_time: quote_time.unwrap().to_string(), currency: currency.to_string(), rate: rates.to_string(), spot_price: spot_price.unwrap().to_string(), buy_price: buy_price.unwrap().to_string(), sell_price: sell_price.unwrap().to_string(), spread_price: spread_price.to_string(), } };
Наконец, мы конвертируем структуру в JSON, а затем вызываем async fn publish()
, как показано ниже:
let price_json = serde_json::to_string(&price_struct).unwrap(); publish(broker, topic, &price_json, count);
Полный код под названием crypto_publisher_7.rs
доступен в репозитории GitHub (https://github.com/sungkim11/crypto-p, издатель).
На следующем уроке мы создадим криптоподписчика, который будет использовать сообщение и сохранять его в формате Apache Parquet. Обычно мы используем Apache Spark для обработки этих потоков данных (то есть сообщений) от Apache Kafka, но я хочу продемонстрировать, что вы также можете использовать Rust для выполнения аналогичной задачи. Однако я рекомендую использовать Apache Spark.
Я надеюсь, что эта статья была полезной! Спасибо за прочтение.