Иногда в приложение нужно внедрить функцию отложенного или повторяющегося с некоторой периодичностью действия. Например, отправить пуш через 10 минут или каждый день очищать временную папку.
Чтобы решить такую задачу, можно использовать cron задачи — автоматический запуск скриптов на сервере в определенное время — или пакет node-schedule (библиотека для планирования задач типа cron на Node.js).
Но при применении этих решений возникнет проблема при скейлинге:
- Если есть много серверов или запущенных сервисов, может быть не понятно, на каком именно запускать эту задачу
- Выбранный сервер может упасть
- Нода может удалиться из-за освобождения ресурсов
Одно из возможных решений этой проблемы — брокер сообщений RabbitMQ. Схема реализации следующая:
1. Создаем 2 обменника: обычный и delayed
export const HELLO_EXCHANGE = Object.freeze({
name: 'hello',
type: 'direct',
options: {
durable: true,
},
queues: {},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
name: 'helloDelayed',
type: 'direct',
options: {
durable: true,
},
queues: {},
});
2. Создаем в них очереди с одинаковыми типом связывания данных (binding), но разными именами (name)
Для HELLO_EXCHANGE:
queues: {
WORLD: {
name: 'hello.world', // subscribe to this queue
binding: 'hello.world',
options: {
durable: true,
},
},
},
Для HELLO_DELAYED_EXCHANGE:
queues: {
WORLD: {
name: 'helloDelayed.world',
binding: 'hello.world',
options: {
durable: true,
queueMode: 'lazy', // set the message to remain in the hard memory
},
}
Для очереди delayed-обменника указываем аргумент x-dead-letter-exchange с именем обычной очереди. Аргумент указывает брокеру перенести сообщение в этот обменник, если сообщение не обработается.
arguments: {
'x-dead-letter-exchange': HELLO_EXCHANGE.name, // указываем в какую очередь должно переместиться сообщение после своей смерти
}
3. Паблишим сообщение в очередь delayed-обменника с указанием срока действия — expiration
// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
exchangeName: exchangeNameDelayed,
queue: WORLD_DELAYED,
expirationInMs: 30000, //указываем, через сколько сообщение должно умереть (30s)
});
По истечении срока expiration сообщение автоматически перенесется в очередь обычного обменника.
Теперь осталось только создать подписчика (consumer) на очередь обычного обменника:
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_EXCHANGE.queues.WORLD.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
// await publishHelloDelayedWorld({ name: payload.name }); // если необходимо вновь обработать сообщение
};
Profit!
Если нужно выполнять действие периодично, в конце consumer снова паблишим сообщение в delayed-очередь.
// await publishHelloDelayedWorld({ name: payload.name });
NOTE: RabbitMQ работает по принципу FIFO (first in, first out) — команды обрабатываются в том порядке, в котором были заданы. То есть, если запаблишим в одну очередь сначала сообщение с expiration в 1 день, а потом еще одно с expiration в 1 минуту, второе сообщение обработается только после того как обработается первое, и целевое действие, заданное второму сообщению, произойдет через минуту после обработки первого.
В конечном итоге получаем такую цепочку:
- Создаем обменники и очереди:
// services/base-service/src/broker/const/exchanges.ts
export const HELLO_EXCHANGE = Object.freeze({
name: 'hello',
type: 'direct',
options: {
durable: true,
},
queues: {
WORLD: {
name: 'hello.world', // подписываемся на эту очередь
binding: 'hello.world',
options: {
durable: true,
},
},
},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
name: 'helloDelayed',
type: 'direct',
options: {
durable: true,
queueMode: 'lazy', // указываем, что сообщение должно сохраниться в жесткой памяти
},
queues: {
WORLD: {
name: 'helloDelayed.world',
binding: 'hello.world',
options: {
durable: true,
queueMode: 'lazy', // указываем чтобы сообщение хранилось в жесткой памяти arguments: {
'x-dead-letter-exchange': HELLO_EXCHANGE.name, // указываем в какую очередь должно переместиться сообщение после своей смерти
},
},
},
},
});
- Добавляем publisher, который будет отправлять сообщение в delayed очередь:
// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
exchangeName: exchangeNameDelayed,
queue: WORLD_DELAYED,
expirationInMs: 30000, // set when the message dies (in 30s)
});
- Добавляем consumer для очереди из обычного обменника:
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_EXCHANGE.queues.WORLD.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
// await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
Profit!
Есть плагин, который будет делать эту работу за нас — в нем реализация проще. Нужно только создать один обменник, одну очередь, один паблишер и один консьюмер.
При паблише плагин самостоятельно обработает сообщение и по истечении времени доставит сообщение в нужную очередь.
При реализации через плагин сообщения обрабатываются в порядке истечения задержки — delay. То есть, если запаблишим сначала сообщение с delay в 1 день, потом еще одно с delay 1 в минуту, второе сообщение обработается раньше, чем первое.
// services/base-service/src/broker/const/exchanges.ts
export const HELLO_PLUGIN_DELAYED_EXCHANGE = Object.freeze({
name: 'helloPluginDelayed',
type: 'x-delayed-message', // specify the delayed queue
options: {
durable: true,
arguments: {
'x-delayed-type': 'direct', // set the recipient },
},
queues: {
WORLD_PLUGIN_DELAYED: {
name: 'helloPluginDelayed.world', // subscribe to the queue
binding: 'helloPluginDelayed.world',
options: {
durable: true,
},
},
},
});
Добавляем publisher, который будет отправлять сообщение в delayed-очередь:
export const publishHelloPluginDelayedWorld = createPublisher({
exchangeName: exchangeNamePluginDelayed,
queue: WORLD_PLUGIN_DELAYED,
delayInMs: 60000, // specify when the message should die (60s)
});
Добавляем consumer для очереди:
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_PLUGIN_DELAYED_EXCHANGE.queues.WORLD_PLUGIN_DELAYED.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
};
Посмотреть реализацию можно в репозитории.
Мы уже использовали это во многих наших проектах. Например, в Janson Media internet TV. Это прокат фильмов, только в их цифровом варианте.
В этом проекте мы использовали RabbitMQ для реализации 3 основных функциональностей сервиса: отправка SMS и имейлов юзерам с напоминанием о том, что срок аренды истекает; отправка сообщений о совершении оплаты в сокет и выдача соответствующего уведомления пользователю; отправка загруженных видео на дальнейшую обработку.
Комментарии