Потоки в Node.js имеют репутацию трудных для работы и еще более трудных для понимания.

По словам Доминика Тарра: «Потоки - лучшая и наиболее неправильно понимаемая идея Node». Даже Дэн Абрамов, создатель Redux и член основной команды React.js, боится потоков Node.

Эта статья поможет вам понять потоки и то, как с ними работать. Так что не бойтесь. Мы можем в этом разобраться!

Что такое потоки?

Потоки - одна из фундаментальных концепций, лежащих в основе приложений Node.js. Они представляют собой метод обработки данных и используются для последовательного чтения или записи ввода в вывод.

Потоки - это способ эффективно обрабатывать чтение / запись файлов, сетевое взаимодействие или любой вид сквозного обмена информацией.

Что делает потоки уникальными, так это то, что вместо программы, считывающей файл в память все сразу, как в традиционном способе, потоки читают фрагменты данных по частям, обрабатывая их содержимое, не сохраняя все в памяти. .

Это делает потоки действительно мощными при работе с большими объемами данных, например, размер файла может быть больше, чем ваше свободное пространство в памяти, что делает невозможным чтение всего файла в память для обработки Это. Здесь на помощь приходят ручьи!

Использование потоков для обработки небольших фрагментов данных позволяет читать файлы большего размера.

Возьмем, к примеру, «потоковые» сервисы, такие как YouTube или Netflix: эти сервисы не заставляют вас загружать видео и аудио сразу. Вместо этого ваш браузер получает видео в виде непрерывного потока фрагментов, позволяя получателям начать просмотр и / или прослушивание практически сразу.

Однако потоки предназначены не только для работы с медиа или большими данными. Они также дают нам возможность «компоновать» в нашем коде. Проектирование с учетом возможности комбинирования означает, что несколько компонентов могут быть объединены определенным образом для получения одного и того же результата. В Node.js можно составлять мощные фрагменты кода, передавая данные в другие более мелкие фрагменты кода и из них, используя потоки.

Почему стримы

Потоки в основном предоставляют два основных преимущества по сравнению с другими методами обработки данных:

  1. Эффективность памяти: вам не нужно загружать большие объемы данных в память, прежде чем вы сможете их обработать.
  2. Эффективность времени: требуется значительно меньше времени, чтобы начать обработку данных, как только они у вас есть, вместо того, чтобы ждать с обработкой, пока вся полезная нагрузка не будет передана.

В Node.js есть 4 типа потоков:

  1. Возможность записи: потоки, в которые мы можем записывать данные. Например, fs.createWriteStream() позволяет записывать данные в файл с помощью потоков.
  2. Читаемые: потоки, из которых можно читать данные. Например: fs.createReadStream() позволяет нам читать содержимое файла.
  3. Дуплекс: потоки, которые доступны для чтения и записи. Например, net.Socket
  4. Преобразование: потоки, которые могут изменять или преобразовывать данные во время записи и чтения. Например, в случае сжатия файла вы можете записывать сжатые данные и читать распакованные данные в файл и из файла.

Если вы уже работали с Node.js, возможно, вы встречали потоки. Например, на HTTP-сервере на основе Node.js request - это читаемый поток, а response - доступный для записи поток. Возможно, вы использовали модуль fs, который позволяет работать как с читаемыми, так и с доступными для записи файловыми потоками. Всякий раз, когда вы используете Express, вы используете потоки для взаимодействия с клиентом, также потоки используются в каждом драйвере подключения к базе данных, с которым вы можете работать, потому что сокеты TCP, стек TLS и другие соединения основаны на Node.js потоки.

Практический пример

Как создать читаемый поток

Сначала нам нужен поток Readable, и мы его инициализируем.

const Stream = require('stream')
const readableStream = new Stream.Readable()

Теперь, когда поток инициализирован, мы можем отправлять в него данные:

readableStream.push('ping!')
readableStream.push('pong!')

асинхронный итератор

Настоятельно рекомендуется использовать асинхронный итератор при работе с потоками. Согласно Dr. Аксель Раушмайер , Асинхронная итерация - это протокол для асинхронного извлечения содержимого контейнера данных (то есть текущая задача может быть приостановлена ​​перед извлечением элемента). Также важно отметить, что реализация потокового асинхронного итератора использует внутри читаемое событие.

Вы можете использовать асинхронный итератор при чтении из читаемых потоков:

Также можно собрать содержимое читаемого потока в строку:

Обратите внимание, что в этом случае нам пришлось использовать асинхронную функцию, потому что мы хотели вернуть обещание.

Важно помнить, что нельзя смешивать асинхронные функции с EventEmitter, потому что в настоящее время нет способа поймать отклонение, когда оно генерируется в обработчике событий, что затрудняет отслеживание ошибок и утечек памяти. Лучшая текущая практика - всегда заключать содержимое асинхронной функции в блок try / catch и обрабатывать ошибки, но это чревато ошибками. Этот запрос на вытягивание направлен на решение этой проблемы, как только он попадает в ядро ​​Node.

Чтобы узнать больше о потоках Node.js с помощью асинхронной итерации, ознакомьтесь с этой замечательной статьей.

Readable.from (): создание читаемых потоков из итераций

stream.Readable.from(iterable, [options]) это служебный метод для создания читаемых потоков из итераторов, в котором хранятся данные, содержащиеся в итерируемом. Итерируемый может быть синхронным итерируемым или асинхронным итеративным. Параметры параметров являются необязательными и могут, среди прочего, использоваться для указания кодировки текста.

Два режима чтения

Согласно Streams API, читаемые потоки эффективно работают в одном из двух режимов: текущий и приостановленный. Доступный для чтения поток может находиться в объектном режиме или нет, независимо от того, в текущем ли он режиме или в приостановленном.

  • В потоковом режиме данные автоматически считываются из базовой системы и предоставляются приложению как можно быстрее с использованием событий через интерфейс EventEmitter.
  • В приостановленном режиме метод stream.read() должен вызываться явно для чтения фрагментов данных из потока.

В потоковом режиме, чтобы читать данные из потока, можно прослушать событие data и прикрепить обратный вызов. Когда часть данных доступна, читаемый поток генерирует событие данных, и ваш обратный вызов выполняется. Взгляните на следующий фрагмент:

Вызов функции fs.createReadStream() дает читаемый поток. Изначально поток находится в статическом состоянии. Как только вы прослушиваете событие данных и присоединяете обратный вызов, он начинает течь. После этого фрагменты данных считываются и передаются вашему обратному вызову. Разработчик потока решает, как часто генерируется событие данных. Например, HTTP-запрос может генерировать событие данных после чтения каждых нескольких килобайт данных. Когда вы читаете данные из файла, вы можете решить генерировать событие данных после чтения строки.

Когда данных для чтения больше нет (достигнут конец), поток генерирует событие завершения. В приведенном выше фрагменте мы прослушиваем это событие, чтобы получить уведомление, когда будет достигнут конец.

Кроме того, в случае ошибки поток выдаст сообщение об ошибке.

В режиме паузы вам просто нужно несколько раз вызывать read () в экземпляре потока до тех пор, пока не будет прочитан каждый фрагмент данных, как в следующем примере:

Функция read () считывает некоторые данные из внутреннего буфера и возвращает их. Когда читать нечего, возвращается ноль. Итак, в цикле while мы проверяем значение null и завершаем цикл. Обратите внимание, что читаемое событие генерируется, когда часть данных может быть прочитана из потока.

Все Readable потоки начинаются в режиме паузы, но их можно переключить в текущий режим одним из следующих способов:

  • Добавление обработчика события data.
  • Вызов метода stream.resume().
  • Вызов метода stream.pipe() для отправки данных в Writable.

Readable может вернуться в режим паузы, используя одно из следующих действий:

  • Если адресатов канала нет, вызовите метод stream.pause().
  • Если есть пункты назначения каналов, удалив все пункты назначения каналов. Несколько пунктов назначения канала можно удалить, вызвав метод stream.unpipe().

Важно помнить, что Readable не будет генерировать данные, пока не будет предоставлен механизм для использования или игнорирования этих данных. Если механизм потребления отключен или убран, Readable будет пытаться прекратить генерирование данных. Добавление обработчика событий readable автоматически заставляет поток перестать течь, а данные потребляются через readable.read(). Если удалить «читаемый» обработчик событий, поток снова начнет течь, если есть обработчик событий «данные».

Как создать доступный для записи поток

Чтобы записать данные в поток с возможностью записи, вам необходимо вызвать write() в экземпляре потока. Как в следующем примере:

Приведенный выше код прост. Он просто считывает фрагменты данных из входного потока и записывает их в место назначения, используя write(). Эта функция возвращает логическое значение, указывающее, была ли операция успешной. Если это так, значит, запись прошла успешно, и вы можете продолжить запись данных. Если возвращается false, это означает, что что-то пошло не так, и в данный момент вы не можете ничего писать. Доступный для записи поток сообщит вам, когда вы сможете начать запись дополнительных данных, испуская событие слива.

Вызов метода writable.end() сигнализирует о том, что в Writable больше не будут записываться данные. Если предоставляется, необязательная функция обратного вызова присоединяется как прослушиватель для события «finish».

Используя доступный для записи поток, вы можете читать данные из читаемого потока:

Вы также можете использовать асинхронные итераторы для записи в доступный для записи поток, что рекомендуется

Версия stream.finished () по умолчанию основана на обратном вызове, но может быть преобразована в версию на основе Promise с помощью util.promisify () (строка A).

В этом примере используются следующие два шаблона:

Запись в доступный для записи поток при обработке обратного давления (строка B):

if (!writable.write(chunk)) {
  await once(writable, 'drain');

Закрытие доступного для записи потока и ожидание завершения записи (строка C):

writable.end();
await finished(writable);

трубопровод()

Конвейер - это механизм, в котором мы обеспечиваем вывод одного потока в качестве ввода для другого потока. Обычно он используется для получения данных из одного потока и передачи вывода этого потока в другой поток. Нет ограничений на операции с трубопроводом. Другими словами, конвейерная обработка данных используется для обработки потоковых данных в несколько этапов.

В Node 10.x был представлен stream.pipeline(). Это модульный метод для передачи ошибок пересылки потоков между потоками и правильной очистки, а также для обеспечения обратного вызова после завершения конвейера.

Вот пример использования конвейера:

pipeline следует использовать вместо pipe, поскольку труба небезопасна.

Модуль Stream

Модуль потока Node.js обеспечивает основу, на которой построены все API-интерфейсы потоковой передачи.

Модуль Stream - это собственный модуль, который по умолчанию поставляется в Node.js. Stream - это экземпляр класса EventEmitter, который асинхронно обрабатывает события в Node. Из-за этого потоки по своей сути основаны на событиях.

Чтобы получить доступ к модулю потока:

const stream = require('stream');

Модуль stream полезен для создания новых типов экземпляров потоков. Обычно нет необходимости использовать модуль stream для использования потоков.

API-интерфейсы узлов на основе потоков

Благодаря своим преимуществам многие базовые модули Node.js предоставляют встроенные возможности обработки потоков, в частности:

  • net.Socket - это основной API узла, на котором основан поток, который лежит в основе большинства следующих API.
  • process.stdin возвращает поток, подключенный к стандартному вводу
  • process.stdout возвращает поток, подключенный к stdout
  • process.stderr возвращает поток, подключенный к stderr
  • fs.createReadStream() создает читаемый поток в файл
  • fs.createWriteStream() создает доступный для записи поток в файл
  • net.connect() инициирует потоковое соединение
  • http.request() возвращает экземпляр класса http.ClientRequest, который является доступным для записи потоком.
  • zlib.createGzip() сжимать данные с помощью gzip (алгоритм сжатия) в поток
  • zlib.createGunzip() распаковать поток gzip.
  • zlib.createDeflate() сжатие данных с помощью deflate (алгоритм сжатия) в поток
  • zlib.createInflate() распаковать поток спуска

Памятка по потокам:

Вот некоторые важные события, связанные с доступными для записи потоками:

  • error - испускается, чтобы указать, что произошла ошибка при записи / конвейере.
  • pipeline - Когда доступный для чтения поток передается по конвейеру в доступный для записи поток, это событие генерируется доступным для записи потоком.
  • unpipe - генерируется, когда вы вызываете unpipe в читаемом потоке и останавливаете его от передачи в целевой поток.

Заключение

Это было все об основах стримов. Потоки, конвейеры и цепочки - это основные и самые мощные функции Node.js. Потоки действительно могут помочь вам написать аккуратный и производительный код для выполнения ввода-вывода.

Также стоит обратить внимание на стратегическую инициативу Node.js под названием BOB, направленную на улучшение интерфейсов потоковых данных Node.js как внутри ядра Node.js, так и, надеюсь, в качестве будущих общедоступных API.

использованная литература

Особая благодарность Маттео Колине и Джереми Сенкпилу за ваш отзыв!

Stream API

Node.js Streams: все, что вам нужно знать

Node.js Streams

Основы потоковой передачи Node.js

Шпаргалка по потокам Node.js

Node.js - Streams

Более простые потоки Node.js за счет асинхронной итерации

Вы, наверное, используете стримы