Как прослушать изменения в коллекции MongoDB?

120

Я создаю своего рода систему очереди заданий с MongoDB как хранилище данных. Как я могу "прослушать" вставки в коллекцию MongoDB до того, как вы разложите рабочих для обработки задания? Нужно ли мне опроса каждые несколько секунд, чтобы увидеть, есть ли какие-либо изменения с последнего времени, или есть способ, которым мой script может ждать появления вставок? Это проект PHP, над которым я работаю, но не стесняйтесь отвечать на Ruby или язык агностик.

  • 0
    Изменение потока было добавлено в MongoDB 3.6 для решения вашего сценария. docs.mongodb.com/manual/changeStreams Также, если вы используете MongoDB Atlas, вы можете использовать триггеры Stitch, которые позволяют вам выполнять функции в ответ на вставку / обновление / удаление / и т.д. docs.mongodb.com/stitch/triggers/overview Больше нет необходимости разбирать оплог.
Теги:

7 ответов

78
Лучший ответ

MongoDB имеет то, что называется capped collections и tailable cursors, который позволяет MongoDB передавать данные слушателям.

A capped collection - это, по сути, набор, который является фиксированным размером и допускает только вставки. Вот как это будет выглядеть:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable cursors (оригинальный пост Джонатана Х. Ваге)

рубин

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (Роберт Стюарт)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Дополнительные ресурсы:

Ruby/ Node.js Учебное пособие, в котором вы можете создать приложение, которое прослушивает вставки в коллекции, собранной MongoDB.

Более подробно статья о хвостовых курсорах.

PHP, Ruby, Python и Perl примеры использования хвостовых курсоров.

  • 63
    спать 1? действительно? для производственного кода? как это не опрос?
  • 2
    @ rbp ха-ха, я никогда не говорил, что это производственный код, но ты прав, спать на секунду не очень хорошая практика. Уверен, я получил этот пример откуда-то еще. Не уверен, как реорганизовать это все же.
Показать ещё 8 комментариев
91

То, о чем вы думаете, очень похоже на триггеры. У MongoDB нет поддержки триггеров, однако некоторые люди "сворачивали", используя некоторые трюки. Ключ здесь - это oplog.

Когда вы запускаете MongoDB в наборе реплик, все действия MongoDB записываются в журнал операций (известный как oplog). Oplog - это в основном простой список изменений, внесенных в данные. Replicas Устанавливает функцию, слушая изменения в этом всплеске, а затем применяя изменения локально.

Звучит ли это знакомо?

Я не могу подробно описать весь процесс здесь, это несколько страниц документации, но необходимые инструменты доступны.

Сначала некоторые рецензии на oplog  - Краткое описание  - Макет local коллекции (который содержит oplog)

Вы также захотите использовать tailable cursors. Они предоставят вам возможность прослушать изменения вместо опроса для них. Обратите внимание, что репликация использует хвостовые курсоры, поэтому это поддерживаемая функция.

  • 1
    хм ... не совсем то, что я имел в виду. На данный момент я работаю только с одним экземпляром (без рабов). Так может быть, более простое решение?
  • 15
    Вы можете запустить сервер с параметром --replSet и он создаст / заполнит oplog . Даже без вторичного. Это, безусловно, единственный способ «прослушать» изменения в БД.
Показать ещё 3 комментария
5

Так как MongoDB 3.6 появится новый API уведомлений, который вы можете использовать для этого. См. это сообщение в блоге для примера. Пример из него:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
  • 3
    Как убить производительность на вашем сервере.
  • 4
    Зачем? Можете ли вы уточнить? Это стандартный способ сейчас?
Показать ещё 3 комментария
3

В качестве альтернативы вы можете использовать стандартный метод Mongo FindAndUpdate, а в обратном вызове запускать событие EventEmitter (в Node), когда выполняется обратный вызов.

Любые другие части приложения или архитектуры, прослушивающие это событие, будут уведомлены об обновлении и любые соответствующие данные, отправленные туда. Это действительно простой способ получения уведомлений от Mongo.

  • 0
    это очень неэффективно ... вы блокируете БД для каждого FindAndUpdate!
  • 1
    Я полагаю, что Алекс отвечал на немного другой (не специально адресованный вставки), но связанный вопрос о том, как отключить какое-то уведомление для клиентов, когда состояние задания в очереди изменяется, которое, как мы предполагаем, должно произойти, когда задания появляются. , успешно завершить или не выполнить. Когда клиенты подключены к узлу с помощью веб-сокетов, все они могут получать уведомления об изменениях с помощью широковещательного события обратного вызова FIndAndUpdate, которое может вызываться при получении сообщений об изменении состояния. Я бы сказал, что это не неэффективно, так как обновления должны быть сделаны.
1

Многие из этих ответов будут давать вам новые записи, а не обновления и/или крайне неэффективны

Единственный надежный, эффективный способ сделать это - создать хвостовой курсор в локальной коллекции db: oplog.rs, чтобы получить ВСЕ изменения в MongoDB и сделать с ними то, что вы сделаете. (MongoDB даже делает это внутренне более или менее для поддержки репликации!)

Объяснение того, что содержит oplog: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Пример библиотеки Node.js, которая предоставляет API вокруг того, что можно сделать с помощью oplog: https://github.com/cayasso/mongo-oplog

1

Существует рабочий пример java, который можно найти здесь.

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

Ключ QUERY OPTIONS, указанный здесь.

Также вы можете изменить запрос поиска, если вам не нужно каждый раз загружать все данные.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
-2

Собственно, вместо просмотра вывода, почему вы не замечаете, когда что-то новое вставлено с помощью промежуточного продукта, которое было предоставлено схема mongoose

Вы можете поймать событие вставки нового документа и сделать что-то после этой вставки

  • 0
    Виноват. Простите, сэр.

Ещё вопросы

Сообщество Overcoder
Наверх
Меню