Очередь сообщений в Node.js с AWS SQS

Введение С увеличением сложности современных программных систем возникла необходимость разбивать системы, которые переросли свой первоначальный размер. Это усложнение систем усложнило их обслуживание, обновление и модернизацию. Это проложило путь для микросервисов, которые позволили разделить массивные монолитные системы на более мелкие сервисы, которые слабо связаны, но взаимодействуют для обеспечения полной функциональности исходного монолитного решения. Слабая муфта обеспечивает

Вступление

С увеличением сложности современных программных систем возникла необходимость разбивать системы, которые переросли свой первоначальный размер. Это усложнение систем усложнило их обслуживание, обновление и модернизацию.

Это проложило путь для микросервисов, которые позволили разделить массивные монолитные системы на более мелкие сервисы, которые слабо связаны, но взаимодействуют для обеспечения полной функциональности исходного монолитного решения. Слабое соединение обеспечивает маневренность и упрощает процесс обслуживания и добавления новых функций без необходимости модификации всей системы.

Именно в этих микросервисных архитектурах используются системы очередей для облегчения взаимодействия между отдельными сервисами, составляющими всю установку.

В этом посте мы погрузимся в системы очередей, в частности, в Simple Queue Service от Amazon, и продемонстрируем, как мы можем использовать его функции в среде микросервисов.

Что такое очередь сообщений?

До появления Интернета и электронной почты люди на больших расстояниях общались в основном посредством обмена письмами. Письма содержали сообщения, которые должны были быть переданы, и были отправлены на местное почтовое отделение, откуда они будут отправлены на адрес получателя.

Это могло отличаться от региона к региону, но идея была той же. Люди доверяли посредникам доставлять им сообщения, пока они продолжали жить своей жизнью.

Когда система разбита на более мелкие компоненты или службы, которые, как ожидается, будут работать вместе, им потребуется обмениваться информацией и передавать информацию от одной службы к другой, в зависимости от функциональности отдельных служб.

Очередь сообщений облегчает этот процесс, выступая в роли «почтовой службы» для микросервисов. Сообщения помещаются в очередь, и целевые службы принимают адресованные им сообщения и действуют в соответствии с ними. Сообщения могут содержать что угодно - например, инструкции о том, какие действия следует предпринять, данные, которые необходимо выполнить или сохранить, или асинхронные задания, которые необходимо выполнить.

Очередь сообщений - это механизм, который позволяет компонентам системы передавать информацию и обмениваться ею в асинхронном режиме. Это означает, что слабосвязанные системы не должны ждать немедленной обратной связи по отправляемым ими сообщениям, и они могут быть освобождены для продолжения обработки других запросов. Когда приходит время и требуется ответ, служба может искать ответ в очереди сообщений.

Вот несколько примеров популярных очередей сообщений или брокеров:

  • Amazon Simple Queue Service - которому посвящена эта статья
  • RabbitMQ - с открытым исходным кодом, обеспечивающий возможности асинхронного обмена сообщениями.
  • Apache Kafka - это распределенная потоковая платформа, поддерживающая режим взаимодействия pub / sub.
  • Другие включают Apache RocketMQ , NSQ и HornetQ.

Примеры использования очереди сообщений

Очереди сообщений не нужны для каждой системы, но есть определенные сценарии, в которых они стоят усилий и ресурсов, необходимых для их настройки и обслуживания. При правильном использовании очереди сообщений имеют несколько преимуществ.

Во-первых, очереди сообщений поддерживают разделение больших систем, обеспечивая механизм связи в слабосвязанной системе.

Избыточность поддерживается за счет использования очередей сообщений путем поддержания состояния в случае сбоя службы. Когда отказавший или неисправный сервис возобновляет операции, все операции, которые он должен был обрабатывать, все еще находятся в очереди, и он может забрать их и продолжить транзакции, которые в противном случае могли быть потеряны.

Очередь сообщений облегчает группировку таких операций, как отправка электронных писем или вставка записей в базу данных. Пакетные инструкции могут быть сохранены в очереди и все обработаны одновременно по порядку, вместо того, чтобы обрабатываться одна за другой, что может быть неэффективным.

Системы очередей также могут быть полезны для обеспечения согласованности операций, гарантируя, что они выполняются в том порядке, в котором они были получены. Это особенно важно, когда определенные компоненты или службы системы были реплицированы для того, чтобы справиться с возросшей нагрузкой. Таким образом, система будет хорошо масштабироваться, чтобы справиться с нагрузкой, а также гарантировать, что обрабатываемые транзакции согласованы и упорядочены, поскольку все реплицированные службы будут получать свои инструкции из очереди сообщений, которая будет действовать как единственный источник истины.

Amazon Simple Queue Service - SQS

Как и большинство других предложений Amazon Web Services, Simple Queue Service (SQS) - это решение для очередей сообщений, которое распространяется и полностью управляется Amazon, подобно бессерверным вычислениям через Chalice .

SQS позволяет нам отправлять и получать сообщения или инструкции между программными компонентами, что позволяет нам внедрять и масштабировать микросервисы в наших системах без хлопот по настройке и обслуживанию системы очередей.

Как и другие сервисы AWS, SQS динамически масштабируется в зависимости от спроса, обеспечивая при этом безопасность данных, передаваемых посредством (необязательного) шифрования сообщений.

Демонстрационный проект

Чтобы изучить Amazon Simple Queue Service , мы создадим в Node.js независимую систему, каждый компонент которой будет взаимодействовать с другими, отправляя и получая сообщения из SQS.

Поскольку мы небольшая организация, у которой нет пропускной способности для обработки заказов по мере их поступления, у нас будет одна служба для приема заказов пользователей, а другая - доставлять все заказы, отправленные в этот день, в наш почтовый ящик в определенное время. день для пакетной обработки. Все заказы будут храниться в очереди, пока они не будут собраны нашей второй службой и доставлены в наш почтовый ящик.

Наши микросервисы будут состоять из простых API-интерфейсов Node.js, один из которых получает информацию о заказе от пользователей, а другой отправляет пользователям электронные письма с подтверждением.

Асинхронная отправка подтверждений по электронной почте через очередь обмена сообщениями позволит нашей службе заказов продолжать получать заказы, несмотря на нагрузку, поскольку ей не нужно беспокоиться об отправке электронных писем.

Кроме того, в случае отказа почтовой службы после восстановления она продолжит отправку писем из очереди, поэтому нам не придется беспокоиться о потерянных заказах.

Веб-сервисы Amazon

Для этого проекта нам понадобится активная и действующая учетная запись AWS, которую мы можем зарегистрировать на домашней странице AWS . AWS требует, чтобы мы предоставляли не только некоторые личные данные, но и свои платежные данные. Чтобы избежать выставления счетов за этот демонстрационный проект, мы будем использовать уровень бесплатного пользования AWS для тестирования и разработки.

Нам также потребуется установить инструмент AWS CLI, чтобы взаимодействовать с нашими ресурсами AWS с наших компьютеров. Инструкции по установке инструмента AWS CLI на нескольких платформах можно найти здесь .

Установив инструмент AWS CLI, мы можем перейти к консоли AWS и под раскрывающимся списком профиля есть раздел « Мои учетные данные безопасности ». Здесь мы сможем создать учетные данные, которые будут использоваться при взаимодействии с консолью AWS.

Эти учетные данные также будут использоваться инструментом Amazon CLI, который мы настроим, запустив:

 $ aws configure 

Нам будет предложено ввести Access Key ID , Secret Access Key , а также регионы и форматы вывода по умолчанию. Последние два являются необязательными, но нам понадобятся ключ доступа и секрет, которые мы получили на панели инструментов консоли AWS.

Когда наша учетная запись AWS запущена и настроен интерфейс командной строки AWS, мы можем настроить нашу AWS Simple Queue Service, перейдя на домашнюю страницу SQS .

nodeshop.fifo экрана, после указания имени нашей очереди как nodehop.fifo нам представлены два варианта очереди:

инициализацияsqs{.ezlazyload}

У нас есть возможность выбрать между стандартной очередью или очередью FIFO . Стандартная очередь не поддерживает порядок получаемых сообщений и лучше подходит для проектов, в которых пропускная способность важнее порядка событий.

Очередь FIFO, с другой стороны, поддерживает порядок сообщений в том виде, в котором они были получены, и они также извлекаются в том же порядке « первым пришел - первым обслужен» .

Учитывая, что мы будем создавать мини-платформу для покупок, важно поддерживать порядок запросов, поскольку мы надеемся продавать товары людям в порядке их покупок. После того, как мы выбрали тип очереди, которая нам нужна, мы можем изменить некоторые дополнительные настройки нашей очереди:

расширенная конфигурация sqsfifo{.ezlazyload}

Мы можем настроить время до автоматического удаления сообщения из очереди и размер сообщения, среди других параметров. А пока мы настроим нашу очередь FIFO, используя значения по умолчанию. Теперь, когда наша очередь готова, мы можем создать наши API-интерфейсы Node.js, которые будут читать и записывать в нашу очередь Amazon SQS FIFO.

Настройка Node.js и NPM

Инструкцию по настройке Node.js на нескольких платформах можно найти здесь, на официальном сайте Node.js. Node Package Manager (NPM) поставляется с Node.js, и мы можем проверить нашу установку следующим образом:

 # Node.js version 
 $ node -v 
 v12.12.0 
 
 # NPM version 
 $ npm -v 
 6.11.3 

Следующий шаг - настроить наши папки следующим образом:

 # create folder and move into it 
 $ mkdir nodeshop_apis && cd $_ 
 
 # create the orders and emails services folders 
 $ mkdir orderssvc emailssvc 

Настройка API узла

Сначала мы создадим orders поскольку она принимает заказы от пользователей и отправляет информацию в нашу очередь. Наша emails затем прочитает из очереди и отправит электронные письма.

Мы инициализируем проект Node.js и установим платформу Express.js , которую мы будем использовать для создания нашего минималистичного API. Мы также установим промежуточное программное обеспечение body-parser, которое будет обрабатывать данные нашего запроса за нас, а также проверять их.

Для этого в нашей корневой папке:

 # initialize node project 
 $ npm init 
 
 # install express and body-parser 
 $ npm install express body-parser --save 

После установки Express и body-parser они будут автоматически добавлены в раздел зависимостей нашего файла package.json благодаря опции --save

Поскольку у нас будет несколько служб, которые будут работать одновременно, мы также установим пакет npm-run-all , чтобы помочь нам запускать все наши службы одновременно и не запускать команды в нескольких окнах терминала:

 $ npm install npm-run-all --save 

npm-run-all , давайте теперь настроим scripts в нашем package.json чтобы включить команды для запуска наших служб и одну команду для их запуска:

 { 
 // Truncated for brevity... 
 "scripts": { 
 "start-orders-svc": "node ./orderssvc/index.js 8081", 
 "start-emails-svc": "node ./emailssvc/index.js", 
 "start": "npm-run-all -p -r start-orders-svc" 
 }, 
 // ... 
 } 

Мы добавим команды start-orders-svc и start-emails-svc для запуска наших orders и emails соответственно. Затем мы настроим команду start для выполнения их обоих с помощью npm-run-all .

При такой настройке запустить все наши службы будет так же просто, как выполнить следующую команду:

 $ npm start 

Мы можем создать наш API orders index.js следующим образом:

 const express = require('express'); 
 const bodyParser = require('body-parser'); 
 
 const port = process.argv.slice(2)[0]; 
 const app = express(); 
 
 app.use(bodyParser.json()); 
 
 app.get('/index', (req, res) => { 
 res.send("Welcome to NodeShop Orders.") 
 }); 
 
 console.log(`Orders service listening on port ${port}`); 
 app.listen(port); 

После добавления необходимых библиотек в наше app Express конечная точка «/ index» ответит простым отправлением приветственного сообщения. Наконец, API будет прослушивать порт, который мы укажем при запуске.

Мы запустим приложение, запустив команду npm start и будем взаимодействовать с нашими API с помощью приложения Postman :

почтальон получитьзапрос{.ezlazyload}

Позже мы реализуем emails На данный момент наша orders настроена, и теперь мы можем реализовать нашу бизнес-логику.

Реализация: Служба заказов

Чтобы реализовать нашу бизнес-логику, мы начнем со orders которая будет получать наши заказы и записывать их в нашу очередь Amazon SQS.

Мы добьемся этого, представив новый маршрут и контроллер для обработки ввода заказа от конечного пользователя и отправки данных заказа в нашу очередь Amazon SQS.

Перед внедрением контроллера нам потребуется установить Amazon SDK для Node.js:

 $ npm install aws-sdk --save 

Наша новая конечная точка «/ order» получит полезную нагрузку, содержащую данные о заказе, и отправит ее в нашу очередь SQS с помощью AWS SDK:

 // ./orderssvc/index.js 
 
 // 
 // Code removed for brevity... 
 // 
 
 // Import the AWS SDK 
 const AWS = require('aws-sdk'); 
 
 // Configure the region 
 AWS.config.update({region: 'us-east-1'}); 
 
 // Create an SQS service object 
 const sqs = new AWS.SQS({apiVersion: '2012-11-05'}); 
 const queueUrl = "SQS_QUEUE_URL"; 
 
 // the new endpoint 
 app.post('/order', (req, res) => { 
 
 let orderData = { 
 'userEmail': req.body['userEmail'], 
 'itemName': req.body['itemName'], 
 'itemPrice': req.body['itemPrice'], 
 'itemsQuantity': req.body['itemsQuantity'] 
 } 
 
 let sqsOrderData = { 
 MessageAttributes: { 
 "userEmail": { 
 DataType: "String", 
 StringValue: orderData.userEmail 
 }, 
 "itemName": { 
 DataType: "String", 
 StringValue: orderData.itemName 
 }, 
 "itemPrice": { 
 DataType: "Number", 
 StringValue: orderData.itemPrice 
 }, 
 "itemsQuantity": { 
 DataType: "Number", 
 StringValue: orderData.itemsQuantity 
 } 
 }, 
 MessageBody: JSON.stringify(orderData), 
 MessageDeduplicationId: req.body['userEmail'], 
 MessageGroupId: "UserOrders", 
 QueueUrl: queueUrl 
 }; 
 
 // Send the order data to the SQS queue 
 let sendSqsMessage = sqs.sendMessage(sqsOrderData).promise(); 
 
 sendSqsMessage.then((data) => { 
 console.log(`OrdersSvc | SUCCESS: ${data.MessageId}`); 
 res.send("Thank you for your order. Check you inbox for the confirmation email."); 
 }).catch((err) => { 
 console.log(`OrdersSvc | ERROR: ${err}`); 
 
 // Send email to emails API 
 res.send("We ran into an error. Please try again."); 
 }); 
 }); 

sqsOrderData объект полезной нагрузки, определяющий данные, которые мы отправляем в очередь, в нашем случае мы определяем его как sqsOrderData.

Затем мы передаем этот объект функции sendMessage() , которая отправит наше сообщение в очередь, используя учетные данные, которые мы использовали для настройки интерфейса командной строки AWS. Наконец, мы ждем ответа и уведомляем пользователя, что его заказ был успешно получен и что он должен проверить подтверждение по электронной почте.

Чтобы протестировать orders , мы запускаем команду npm start и отправляем на localhost:8081/order :

 { 
 "itemName": "Phone case", 
 "itemPrice": "10", 
 "userEmail": " [email protected] ", 
 "itemsQuantity": "2" 
 } 

Это отправит наш заказ в orders , откуда сообщение будет отправлено в нашу очередь SQS. Мы можем просмотреть порядок в очереди SQS через консоль AWS, как показано:

sqs просмотретьсообщения{.ezlazyload}

Наша orders смогла получить заказ пользователя и успешно отправить данные в нашу очередь в Simple Queue Service .

Реализация: Служба электронной почты

Наша orders готова и уже принимает заказы от пользователей. Служба emails будет отвечать за чтение сообщений, хранящихся в очереди, и отправку пользователям электронных писем с подтверждением. Эта служба не уведомляется о размещении заказов и поэтому должна постоянно проверять очередь на наличие новых заказов.

Чтобы наша emails постоянно проверяла наличие новых заказов, мы будем использовать sqs-consumer которая будет постоянно и периодически проверять наличие новых заказов и рассылать электронные письма пользователям. sqs-consumer также удалит сообщения из очереди после успешного чтения их из очереди.

Мы начнем с установки sqs-consumer , выполнив следующую команду:

 $ npm install sqs-consumer --save 

Теперь мы можем реализовать emails следующим образом:

 const AWS = require('aws-sdk'); 
 const { Consumer } = require('sqs-consumer'); 
 
 // Configure the region 
 AWS.config.update({region: 'us-east-1'}); 
 
 const queueUrl = "SQS_QUEUE_URL"; 
 
 // Configure Nodemailer to user Gmail 
 let transport = nodemailer.createTransport({ 
 host: 'smtp.googlemail.com', 
 port: 587, 
 auth: { 
 user: 'Email address', 
 pass: 'Password' 
 } 
 }); 
 
 function sendMail(message) { 
 let sqsMessage = JSON.parse(message.Body); 
 const emailMessage = { 
 from: 'sender_email_adress', // Sender address 
 to: sqsMessage.userEmail, // Recipient address 
 subject: 'Order Received | NodeShop', // Subject line 
 html: `<p>Hi ${sqsMessage.userEmail}.</p. <p>Your order of ${sqsMessage.itemsQuantity} ${sqsMessage.itemName} has been received and is being processed.</p> <p> Thank you for shopping with us! </p>` // Plain text body 
 }; 
 
 transport.sendMail(emailMessage, (err, info) => { 
 if (err) { 
 console.log(`EmailsSvc | ERROR: ${err}`) 
 } else { 
 console.log(`EmailsSvc | INFO: ${info}`); 
 } 
 }); 
 } 
 
 // Create our consumer 
 const app = Consumer.create({ 
 queueUrl: queueUrl, 
 handleMessage: async (message) => { 
 sendMail(message); 
 }, 
 sqs: new AWS.SQS() 
 }); 
 
 app.on('error', (err) => { 
 console.error(err.message); 
 }); 
 
 app.on('processing_error', (err) => { 
 console.error(err.message); 
 }); 
 
 console.log('Emails service is running'); 
 app.start(); 

Мы создадим новое приложение sqs-consumer с помощью функции Consumer.create() и предоставим URL-адрес запроса и функцию для обработки сообщений, полученных из очереди SQS.

В нашем случае функция sendMail() берет сообщение, полученное из очереди, извлекает детали заказа пользователя, а затем отправляет электронное письмо пользователю с помощью Nodemailer . Прочтите нашу статью об отправке писем в Node.js , если вы хотите узнать больше.

Наша emails готова. Чтобы интегрировать его в наш сценарий выполнения, мы просто изменим параметр scripts в нашем package.json :

 { 
 // Truncated for brevity... 
 "scripts": { 
 "start-orders-svc": "node ./orderssvc/index.js 8081", 
 "start-emails-svc": "node ./emailssvc/index.js", 
 // Update this line 
 "start": "npm-run-all -p -r start-orders-svc start-emails-svc" 
 }, 
 // ... 
 } 

Когда мы отправляем новый заказ через orders , мы получаем следующее электронное письмо в нашем почтовом ящике:

почтовый ящикsqs{.ezlazyload}

Заключение

В этом посте мы использовали Node.js и Express для создания API, предназначенного для приема заказов пользователей и публикации деталей заказа в нашей очереди SQS на AWS. Затем мы создали другую службу для получения сообщений, размещенных в очереди, и отправки электронных писем с подтверждением пользователям, разместившим заказы.

Мы отделили логику упорядочивания от логики управления электронной почтой и объединили две службы с помощью системы очереди сообщений. Таким образом, наша orders может обрабатывать размещение заказов, в то время как emails отправляет электронные письма пользователям.

Исходный код этого проекта доступен здесь, на GitHub ./

comments powered by Disqus