События используются для отслеживания изменений в блокчейне и запуска пользовательских действий для определенных типов событий (например, когда фиксируется новый пакет).

Вот как вы можете реализовать прослушиватель событий для обработки событий блокчейна Hyperledger Sawtooth. Сюда входит подписка на определенные события, анализ сообщения о событии и вызов обработчиков событий. Реализация написана на Rust, но может быть использована в качестве шаблона для простой реализации на других языках.

Для тех, кто не знаком с этой технологией:

Hyperledger Sawtooth — это корпоративное решение для создания, развертывания и запуска распределенных реестров (также называемых блокчейнами).[1]

Одной из его функций является генерация событий при изменении состояния блокчейна, а также генерация пользовательских событий с использованием процессоров транзакций. Эта статья предназначена для людей, немного знакомых с архитектурой Sawtooth и основными понятиями. Я сосредоточусь исключительно на событиях.

Отправляя новый пакет в блокчейн, мы не фиксируем его немедленно. Чтобы узнать, когда это произошло, мы могли бы запросить конечную точку REST GET /batch_statuses, ожидая статуса «COMMITTED».

Но мы могли бы реализовать это и по-другому, имея более надежное решение. Вместо активного опроса мы можем прослушивать изменения состояния, просыпаясь только тогда, когда это необходимо. Как обрабатывать определенное событие, полностью зависит от пользователя. Для каждого конкретного типа события может быть запущено несколько действий.

События генерируются валидатором (1), который является центральным компонентом архитектуры Sawtooth. Пользовательские события могут быть указаны зарегистрированными обработчиками транзакций (2).

Слушатели (3) подключаются к узлу валидатора и подписываются на определенные типы событий.

Генерация событий

При каждом изменении состояния блокчейна по умолчанию генерируются два события: sawtooth/block-commit и sawtooth/state-delta. Они называются основными пилообразными событиями.

Пример содержания:

event_type: "sawtooth/block-commit"
attributes {key: "block_id" value: "adbfb73946ba8ed03e2e5156da83c1c39be82b01b7ec0cd096975311d1cce90c1e66e74e4aca125f90d9ef10d4ce406b0b378318fe3419965a9012987f461634"} 
attributes {key: "block_num" value: "9"} 
attributes {key: "state_root_hash" value: "3cba7bae9bac9faac7e4ed71d5c2168afc8a0e8b14f62474a743568d1406c238"} 
attributes {key: "previous_block_id" value: "2a56bd2ea2f18e3599dda3d85017051557c9cc827b2295ea8e6cc416a75fb4501cadfe3eafbefbd9a43387c6a400e583f80d468a42a7a6d05d6e72e9a9cca199"}

Как видите, sawtooth/block-commit содержит данные, связанные с блокчейном, такие как идентификатор предыдущего и текущего блока.

event_type: "sawtooth/state-delta" 
attributes {key: "address" value: "ddb0ea57ac26c4f83050b26172078765826a024bd4a13fbf4aad0ecaeaa479bb0c9590"} 
data: "\nW\nFddb0ea57ac26c4f83050b26172078765826a024bd4a13fbf4aad0ecaeaa479bb0c9590\022\013\n\ttest_user\030\001"

Пока sawtooth/state-delta содержит информацию об изменении состояния.

Если этого недостаточно для вашего варианта использования, вы можете создавать пользовательские события в обработчике транзакций. Это легко сделать с помощью Sawtooth rust_sdk [2].

    // event type is prefixed with the transaction family (TF) name by convention
    let event_type = "my_tf/something_wonderful_happened".to_string();
    // attributes: key-value pairs
    let attribute1 = (
        ("attribute name".to_string()),
        ("attribute value".to_string()),
    );
    let attribute2 = (
        ("favourite_city".to_string()), 
        ("Huanta".to_string()));
    let attributes = vec![attribute1, attribute2];
    // event data can be any piece of information serialized to bytes
    let event_data = "custom data attached to this event".as_bytes();

    // adding the event
    context
        .add_event(event_type.to_string(), attributes, event_data)
        .expect("Failed to add event");

Контекст исходит из функции apply, определяемой в черте TransactionHandler.

fn apply(
        &self,
        request: &TpProcessRequest,
        context: &mut dyn TransactionContext,
    ) -> Result<(), ApplyError> {

При получении слушателем это событие выглядит так:

event_type: "my_tf/something_wonderful_happened" 
attributes {key: "attribute name" value: "attribute value"} 
attributes {key: "favourite_city" value: "Huanta"} 
data: "custom data attached to this event"

Прослушивание событий

Теперь давайте перейдем к прослушивателю, который представляет собой пользовательское приложение, имеющее доступ к сети валидатора.

Во-первых, нам нужно создать сокет, подключенный к валидатору.

    // context of the listening sockets
    let ctx = zmq::Context::new();

    // create a socket and connect to validator
    let socket = ctx.socket(zmq::DEALER).unwrap();
    socket.connect(DEFAULT_VALIDATOR_ENDPOINT).unwrap();

Подписка на мероприятие

Прежде чем мы начнем слушать, мы хотим подписаться на определенные типы событий. Чтобы получить еще более конкретный результат, вы можете использовать EventFilter , но в этом примере мы его пропустим.

// define event subscriptions
let subscription_block_commit = sawtooth_sdk::messages::events::EventSubscription {
    event_type: "sawtooth/block-commit".to_string(),
    ..Default::default()
};
let subscription_state_delta = sawtooth_sdk::messages::events::EventSubscription {
    event_type: "sawtooth/state-delta".to_string(),
    ..Default::default()
};

Имея подписки на события, мы можем подготовить ClientEventsSubscribeRequest.

// prepare event subscribe request
let event_subscribe_req = sawtooth_sdk::messages::client_event::ClientEventsSubscribeRequest {
    subscriptions: protobuf::RepeatedField::from_vec(vec![
        subscription_block_commit,
        subscription_state_delta,
    ]),
    ..Default::default()
};

Запрос нужно завернуть в validator::Message, иначе валидатор его не поймет.

let message = sawtooth_sdk::messages::validator::Message {
    correlation_id: correlation_id.clone(),
    message_type:
        sawtooth_sdk::messages::validator::Message_MessageType::CLIENT_EVENTS_SUBSCRIBE_REQUEST,
    content: event_subscribe_req.write_to_bytes().unwrap(),
    ..Default::default()
};

Идентификатор корреляции генерируется нами и должен быть уникальным, мы будем использовать его позже для проверки ответа на подписку.

Сообщение готово, давайте отправим его через сокет. Сразу после этого мы должны начать слушать ответ, чтобы подтвердить, что подписка прошла успешно.

// send the message over the socket
socket
    .send(
        message.write_to_bytes().unwrap(),
        0, /* flags, none specified */
    )
    .unwrap();

// and receive a response
let mut resp = zmq::Message::new();
socket
    .recv(&mut resp, 0 /* flags, none specified */)
    .unwrap();

Звонок на recv заблокирован. Когда сообщение получено, мы можем проверить статус подписки.

// parse the response into a validator message
let validator_resp =
    sawtooth_sdk::messages::validator::Message::parse_from_bytes(&resp.to_vec()).unwrap();

// and verify that the subscription succeeded
if validator_resp.correlation_id != correlation_id {
    panic!("Wrong correlation_id")
}

if validator_resp.message_type
    != sawtooth_sdk::messages::validator::Message_MessageType::CLIENT_EVENTS_SUBSCRIBE_RESPONSE
{
    panic!("Unexpected message type");
}


let subscription_resp =
    sawtooth_sdk::messages::client_event::ClientEventsSubscribeResponse::parse_from_bytes(
        &validator_resp.content,
    )
    .unwrap();

if subscription_resp.status
    != sawtooth_sdk::messages::client_event::ClientEventsSubscribeResponse_Status::OK
{
    panic!("Failed to subscribe");
}

Петля прослушивания

Как только подписка будет подтверждена, мы можем перейти к сердцевине нашего приложения — циклу прослушивания. Здесь программа заблокирует ожидание сообщений о событиях и срабатывание обработчиков событий.

// start listening
loop {
    let mut message = zmq::Message::new();
    socket.recv(&mut message, 0 /* flags */).unwrap();

    // todo: parse the message
    // todo: call a handler
}

Расшифровка сообщения о событии

Одно сообщение о событии фактически состоит из списка событий, произошедших при определенном изменении состояния. Чтобы распаковать сообщение нам нужно пройти 3 слоя: zmq::Message =› validator::Message =› events::EventList

// wrapped in a validator message
let validator_msg =
    sawtooth_sdk::messages::validator::Message::parse_from_bytes(&message_raw).unwrap();

// check if the message type is CLIENT_EVENTS
if validator_msg.message_type
    != sawtooth_sdk::messages::validator::Message_MessageType::CLIENT_EVENTS
{
    // hint: such situation should not cause program panic,
    // handle errors more gracefully in a production code
    panic!("Invalid message type");
}

// validator message holds a list of events, we need to unpack them
let event_list =
    sawtooth_sdk::messages::events::EventList::parse_from_bytes(validator_msg.get_content())
        .unwrap();

event_list

Вызов обработчиков событий

Наконец, имея список событий, мы можем вызывать собственные обработчики, созданные специально для определенного типа события.

fn handler_block_commit(event: sawtooth_sdk::messages::events::Event) {
    let attributes = event.get_attributes();

    for attribute in attributes {
        if attribute.key == "state_root_hash" {
            println!(
                "Handling sawtooth/block-commit event, state root hash: {}",
                attribute.value
            )
        }
    }
}

fn handler_state_delta(event: sawtooth_sdk::messages::events::Event) {
    println!(
        "Handling sawtooth/state-delta event, data: {}",
        String::from_utf8(event.data).unwrap()
    );
}

Чтобы выполнить их, мы пройдемся по списку событий, проверим тип события и наличие выделенного обработчика.

Окончательный код выглядит так:

  // start listening
  loop {
      let mut message = zmq::Message::new();
      socket.recv(&mut message, 0 /* flags */).unwrap();

      let event_list: sawtooth_sdk::messages::events::EventList = parse_event_message(&message);

      for event in event_list.events {
          if event.event_type == "sawtooth/block-commit" {
              handler_block_commit(event);
          } else if event.event_type == "sawtooth/block-commit" {
              handler_state_delta(event);
          } else {
              println!("No handler for this event type: {}", event.event_type);
          }
      }
  }

При запуске прослушивателя и активации любого состояния блокчейна мы получаем вывод:

Handling sawtooth/block-commit event, state root hash: 263ac5ef355d519dc2cd103866ca1965b6d901011bd6d1c062b305f14aeced56
Handling sawtooth/state-delta event, data: WFddb0ea57ac2665ac5c9aa183721e613b082699513c1ad8e039434771ec0ded96f8b4e7

ДЕЛАТЬ

  1. Обрабатывайте ошибки, избегая паники в вашем коде (например, вызванной «unwrap»). Кроме того, когда выполняется цикл прослушивания, в идеале вы не хотите прерываться на каждой ошибке (встречающейся при синтаксическом анализе сообщения или выполнении обработчика), а только на существенных, таких как ошибки сокета.
  2. Выполняйте обработчики в отдельных потоках, чтобы продолжать слушать другие события во время обработки.

Грузовые зависимости, используемые для этой программы:

[dependencies]
protobuf = "2.23"
sawtooth-sdk = "0.5.2"
zmq = "0.9.0"

Полный исходный код можно найти здесь: https://gist.github.com/anjankow/c2aad2d2afc5e1da34af365ff709a2de

Дальнейшее чтение:

Английский не является моим родным языком, простите мои ошибки.

Вы нашли вводящую в заблуждение информацию, что-то не сработало для вас? Дайте мне знать.