Иногда в приложение нужно внедрить функцию отложенного или повторяющегося с некоторой периодичностью действия. Например, отправить пуш через 10 минут или каждый день очищать временную папку.

Чтобы решить такую задачу, можно использовать cron задачи — автоматический запуск скриптов на сервере в определенное время — или пакет node-schedule (библиотека для планирования задач типа cron на Node.js).

Но при применении этих решений возникнет проблема при скейлинге:

  • Если есть много серверов или запущенных сервисов, может быть не понятно, на каком именно запускать эту задачу
  • Выбранный сервер может упасть
  • Нода может удалиться из-за освобождения ресурсов

Одно из возможных решений этой проблемы — брокер сообщений RabbitMQ. Схема реализации следующая:

1. Создаем 2 обменника: обычный и delayed

 
 export const HELLO_EXCHANGE = Object.freeze({  
    name: 'hello', 
    type: 'direct', 
    options: { 
        durable: true, 
     }, 
   queues: {}, 
}); 
export const HELLO_DELAYED_EXCHANGE = Object.freeze({ 
    name: 'helloDelayed', 
    type: 'direct', 
    options: { 
        durable: true, 
    }, 
   queues: {}, 
}); 
 

2. Создаем в них очереди с одинаковыми типом связывания данных (binding), но разными именами (name)

Для HELLO_EXCHANGE:

 
 queues: {  
        WORLD: { 
            name: 'hello.world', // subscribe to this queue 
            binding: 'hello.world', 
            options: { 
                durable: true, 
            }, 
        }, 
    }, 
 

Для HELLO_DELAYED_EXCHANGE:

 
 queues: { 
        WORLD: { 
            name: 'helloDelayed.world', 
            binding: 'hello.world', 
            options: { 
                durable: true, 
                queueMode: 'lazy', // set the message to remain in the hard memory 
            }, 
        } 
 

Для очереди delayed-обменника указываем аргумент x-dead-letter-exchange с именем обычной очереди. Аргумент указывает брокеру перенести сообщение в этот обменник, если сообщение не обработается.

 
  arguments: { 
                    'x-dead-letter-exchange': HELLO_EXCHANGE.name, // указываем в какую очередь должно переместиться сообщение после своей смерти 
                } 
 

3. Паблишим сообщение в очередь delayed-обменника с указанием срока действия — expiration

 
 // services/base-service/src/broker/hello/publisher.ts 
export const publishHelloDelayedWorld = createPublisher({ 
    exchangeName: exchangeNameDelayed, 
    queue: WORLD_DELAYED, 
    expirationInMs: 30000, //указываем, через сколько сообщение должно умереть (30s)  
});
 

По истечении срока expiration сообщение автоматически перенесется в очередь обычного обменника.

Теперь осталось только создать подписчика (consumer) на очередь обычного обменника:

 
 // services/base-service/src/broker/hello/consumer.ts 
export const initHelloExchange = () => Promise.all([ 
    createConsumer( 
        { 
            queueName: HELLO_EXCHANGE.queues.WORLD.name, 
            prefetch: 50, 
            log: true, 
        }, 
        controller.consumeHelloWorld, 
    ), 
]); 
// services/base-service/src/broker/hello/controller.ts 
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => { 
    const result = await world({ name: payload.name }); 
    logger.info(result.message); 
    // await publishHelloDelayedWorld({ name: payload.name }); // если необходимо вновь обработать сообщение
};
 

Profit!

Если нужно выполнять действие периодично, в конце consumer снова паблишим сообщение в delayed-очередь.

 
    // await publishHelloDelayedWorld({ name: payload.name }); 
 

NOTE: RabbitMQ работает по принципу FIFO (first in, first out) — команды обрабатываются в том порядке, в котором были заданы. То есть, если запаблишим в одну очередь сначала сообщение с expiration в 1 день, а потом еще одно с expiration в 1 минуту, второе сообщение обработается только после того как обработается первое, и целевое действие, заданное второму сообщению, произойдет через минуту после обработки первого.

В конечном итоге получаем такую цепочку:

  1. Создаем обменники и очереди:
 
 // services/base-service/src/broker/const/exchanges.ts 
export const HELLO_EXCHANGE = Object.freeze({ 
    name: 'hello', 
    type: 'direct', 
    options: {
        durable: true, 
     }, 
     queues: {  
        WORLD: { 
            name: 'hello.world', // подписываемся на эту очередь 
            binding: 'hello.world', 
            options: { 
            durable: true, 
            }, 
        }, 
    }, 
}); 
export const HELLO_DELAYED_EXCHANGE = Object.freeze({ 
    name: 'helloDelayed', 
    type: 'direct', 
    options: { 
        durable: true, 
        queueMode: 'lazy', // указываем, что сообщение должно сохраниться в жесткой памяти 
    }, 
    queues: { 
        WORLD: { 
            name: 'helloDelayed.world', 
            binding: 'hello.world', 
            options: { 
                durable: true, 
                queueMode: 'lazy', // указываем чтобы сообщение хранилось в жесткой памяти                arguments: { 
                    'x-dead-letter-exchange': HELLO_EXCHANGE.name, //  указываем в какую очередь должно переместиться сообщение после своей смерти 
                }, 
            }, 
        }, 
    }, 
}); 
 
  1. Добавляем publisher, который будет отправлять сообщение в delayed очередь:
 
 // services/base-service/src/broker/hello/publisher.ts 
export const publishHelloDelayedWorld = createPublisher({ 
    exchangeName: exchangeNameDelayed, 
    queue: WORLD_DELAYED, 
    expirationInMs: 30000, // set when the message dies (in 30s) 
});
 
  1. Добавляем consumer для очереди из обычного обменника:
 
 // services/base-service/src/broker/hello/consumer.ts 
export const initHelloExchange = () => Promise.all([ 
    createConsumer( 
        { 
            queueName: HELLO_EXCHANGE.queues.WORLD.name, 
            prefetch: 50, 
            log: true, 
        }, 
        controller.consumeHelloWorld, 
    ), 
]); 
// services/base-service/src/broker/hello/controller.ts 
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => { 
    const result = await world({ name: payload.name }); 
    logger.info(result.message); 
    // await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again 
};
 

Profit!

Есть плагин, который будет делать эту работу за нас — в нем реализация проще. Нужно только создать один обменник, одну очередь, один паблишер и один консьюмер.

При паблише плагин самостоятельно обработает сообщение и по истечении времени доставит сообщение в нужную очередь.

При реализации через плагин сообщения обрабатываются в порядке истечения задержки — delay. То есть, если запаблишим сначала сообщение с delay в 1 день, потом еще одно с delay 1 в минуту, второе сообщение обработается раньше, чем первое.

 
 // services/base-service/src/broker/const/exchanges.ts 
export const HELLO_PLUGIN_DELAYED_EXCHANGE = Object.freeze({ 
    name: 'helloPluginDelayed', 
    type: 'x-delayed-message', // specify the delayed queue 
    options: { 
        durable: true, 
        arguments: { 
            'x-delayed-type': 'direct', // set the recipient         }, 
    }, 
    queues: { 
        WORLD_PLUGIN_DELAYED: { 
            name: 'helloPluginDelayed.world', // subscribe to the queue 
            binding: 'helloPluginDelayed.world', 
            options: { 
                durable: true, 
            }, 
        }, 
    }, 
}); 
 

Добавляем publisher, который будет отправлять сообщение в delayed-очередь:

 
 export const publishHelloPluginDelayedWorld = createPublisher({ 
    exchangeName: exchangeNamePluginDelayed, 
    queue: WORLD_PLUGIN_DELAYED, 
    delayInMs: 60000,  // specify when the message should die (60s) 
}); 
 

Добавляем consumer для очереди:

 
 // services/base-service/src/broker/hello/consumer.ts 
export const initHelloExchange = () => Promise.all([ 
    createConsumer( 
        { 
            queueName: HELLO_PLUGIN_DELAYED_EXCHANGE.queues.WORLD_PLUGIN_DELAYED.name, 
            prefetch: 50, 
            log: true, 
        }, 
        controller.consumeHelloWorld, 
    ), 
]); 
// services/base-service/src/broker/hello/controller.ts 
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => { 
    const result = await world({ name: payload.name }); 
    logger.info(result.message); 
}; 
 

Посмотреть реализацию можно в репозитории.

Мы уже использовали это во многих наших проектах. Например, в Janson Media internet TV. Это прокат фильмов, только в их цифровом варианте.

В этом проекте мы использовали RabbitMQ для реализации 3 основных функциональностей сервиса: отправка SMS и имейлов юзерам с напоминанием о том, что срок аренды истекает; отправка сообщений о совершении оплаты в сокет и выдача соответствующего уведомления пользователю; отправка загруженных видео на дальнейшую обработку.

  • Разработка