0 Вопрос: Pymongo объемное время постоянно увеличивается

вопрос создан в Fri, Mar 29, 2019 12:00 AM

Я работаю над проектом, в котором мне нужно обработать огромное количество данных.

В рамках обработки я прохожу следующие шаги -

  1. Читать строку CSV
  2. Перевести строку CSV в словарь
  3. Bulk Upsert 1000 строк в MongoDB
  4. Итерация до конца файла CSV

Логика довольно проста и работает без проблем для миллионов записей.

Однако bulk_write Время постоянно увеличивается.

Первый вызов занимает 0,55 секунды и начинает увеличиваться для следующих пакетов.

Статистика

 введите описание изображения здесь

  

РЕДАКТИРОВАТЬ - Добавление журналов базы данных

Добавление журналов базы данных, чтобы доказать, что время базы данных постоянно увеличивается (показано в конце строки журнала)

2019-03-29T10:55:43.726+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 4439, w: 4439 } }, Database: { acquireCount: { w: 4438, W: 1 } }, Collection: { acquireCount: { w: 4438 } } } protocol:op_query 531ms

2019-03-29T10:55:45.214+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 12250, w: 12250 } }, Database: { acquireCount: { w: 12250 } }, Collection: { acquireCount: { w: 12250 } } } protocol:op_query 1040ms

2019-03-29T10:55:47.041+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 20059, w: 20059 } }, Database: { acquireCount: { w: 20059 } }, Collection: { acquireCount: { w: 20059 } } } protocol:op_query 1406ms

2019-03-29T10:55:49.390+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 27870, w: 27870 } }, Database: { acquireCount: { w: 27870 } }, Collection: { acquireCount: { w: 27870 } } } protocol:op_query 1733ms

2019-03-29T10:55:52.099+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 35679, w: 35679 } }, Database: { acquireCount: { w: 35679 } }, Collection: { acquireCount: { w: 35679 } } } protocol:op_query 2209ms

2019-03-29T10:55:55.046+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 43503, w: 43503 } }, Database: { acquireCount: { w: 43503 } }, Collection: { acquireCount: { w: 43503 } } } protocol:op_query 2453ms

2019-03-29T10:55:58.724+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 51325, w: 51325 } }, Database: { acquireCount: { w: 51325 } }, Collection: { acquireCount: { w: 51325 } } } protocol:op_query 3146ms

2019-03-29T10:56:02.565+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 59127, w: 59127 } }, Database: { acquireCount: { w: 59127 } }, Collection: { acquireCount: { w: 59127 } } } protocol:op_query 3413ms

2019-03-29T10:56:06.820+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 66941, w: 66941 } }, Database: { acquireCount: { w: 66941 } }, Collection: { acquireCount: { w: 66941 } } } protocol:op_query 3868ms

2019-03-29T10:56:11.890+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 74747, w: 74747 } }, Database: { acquireCount: { w: 74747 } }, Collection: { acquireCount: { w: 74747 } } } protocol:op_query 4612ms

2019-03-29T10:56:17.461+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 82567, w: 82567 } }, Database: { acquireCount: { w: 82567 } }, Collection: { acquireCount: { w: 82567 } } } protocol:op_query 5118ms

2019-03-29T10:56:23.417+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 90397, w: 90397 } }, Database: { acquireCount: { w: 90397 } }, Collection: { acquireCount: { w: 90397 } } } protocol:op_query 5502ms

2019-03-29T10:56:30.232+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 98209, w: 98209 } }, Database: { acquireCount: { w: 98209 } }, Collection: { acquireCount: { w: 98209 } } } protocol:op_query 6377ms

2019-03-29T10:56:37.326+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 106014, w: 106014 } }, Database: { acquireCount: { w: 106014 } }, Collection: { acquireCount: { w: 106014 } } } protocol:op_query 6606ms

2019-03-29T10:56:44.692+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 113818, w: 113818 } }, Database: { acquireCount: { w: 113818 } }, Collection: { acquireCount: { w: 113818 } } } protocol:op_query 6970ms

2019-03-29T10:56:53.036+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 121635, w: 121635 } }, Database: { acquireCount: { w: 121635 } }, Collection: { acquireCount: { w: 121635 } } } protocol:op_query 7678ms

2019-03-29T10:57:01.974+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 129463, w: 129463 } }, Database: { acquireCount: { w: 129463 } }, Collection: { acquireCount: { w: 129463 } } } protocol:op_query 8385ms

2019-03-29T10:57:10.969+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 137277, w: 137277 } }, Database: { acquireCount: { w: 137277 } }, Collection: { acquireCount: { w: 137277 } } } protocol:op_query 8517ms

2019-03-29T10:57:20.760+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 145089, w: 145089 } }, Database: { acquireCount: { w: 145089 } }, Collection: { acquireCount: { w: 145089 } } } protocol:op_query 9189ms

2019-03-29T10:57:30.568+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 152891, w: 152891 } }, Database: { acquireCount: { w: 152891 } }, Collection: { acquireCount: { w: 152891 } } } protocol:op_query 9294ms

код

# save records in database
def insert_car_records(db, records, header):
    try:
        if db and header and records:
            a = datetime.datetime.now()
            coll = db.get_collection("car_records")
            db_cars = convert_csv_row(header, records)
            rq = []
            for car in db_cars:
                obj = { "$set": car }
                rq.append(UpdateOne({"uid": car["uid"]}, obj, upsert=True))
            bulkop = coll.bulk_write(rq, ordered=False)
            b = datetime.datetime.now()
            delta = b - a
            print("nInserted count - {}".format(bulkop.bulk_api_result["nInserted"]))
            print("nMatched count - {}".format(bulkop.bulk_api_result["nMatched"]))
            print("nModified count - {}".format(bulkop.bulk_api_result["nModified"]))
            print("nRemoved count - {}".format(bulkop.bulk_api_result["nRemoved"]))
            print("nUpserted count - {}".format(bulkop.bulk_api_result["nUpserted"]))
            print("time is seconds - {}".format(delta.total_seconds()))
    except BulkWriteError as bwe:
        print(bwe.details)
        raise

Как показано, вышеупомянутая функция вызывается после обработки 1000 строк CSV.

Код для генерации 1000 строк очень прост:

def init():
    quotechar = '"'
    paginated_rows = []
    page_size = 1000
    for line in request.iter_lines(decode_unicode=True):
        if len(paginated_rows) == page_size:
            insert_car_records(database, paginated_rows)
            paginated_rows = []
        #  prepare new row
        paginated_rows.append(generate_row(line, quotechar))

Вы можете игнорировать синтаксические ошибки в коде, так как я изменил его, прежде чем вставлять его здесь

Я знаком с mongo и понимаю, как массовые операции могут повысить производительность.

Тем не менее, я новичок в Python, и я думаю, что мне здесь чего-то не хватает.

Спасибо.

    
0
  1. Это было бы ожидаемо. Это просто характер "upserts", потому что вы сопоставляете документ, чтобы либо обновить его, либо вставить, если совпадения не найдено. Было бы хуже без индекса для поля, используемого в предикате, но также был бы эффект даже с индексом, так как сам индекс должен быть добавлен к с каждым upsert . Одни только это факты, которые могут привести к постепенному увеличению продолжительности.
    2019-03-29 11: 10: 37Z
  2. Спасибо @NeilLunn за быстрый ответ. Я понимаю эти проблемы, но я не ожидал, что обработка будет такой медленной. Я работал над множеством приложений NodeJ с миллионами строк CSV, обрабатываемых в течение нескольких минут. Я использовал точно такой же подход, но разница в производительности огромна. Я имею в виду 10 секунд для обновления 1000 строк в 20-й итерации, просто не имеет никакого смысла, не так ли?
    2019-03-29 11: 14: 33Z
  3. Но NodeJS "асинхронный", тогда как ваш код Python - нет. В этом и заключается «яблоки и апельсины». Вы можете написать «асинхронный» код на python, просто вы этого не сделали.
    2019-03-29 11: 16: 18Z
  4. Ну, это один из способов решения этой проблемы. Я это попробую. Однако, если мы посмотрим на журналы базы данных, сама база данных занимает дополнительное время, и скорость ее увеличения огромна. Даже если я делаю синхронные операции, я не ожидал, что MongoDb будет работать медленнее. В любом случае, я попробую решения indexing и async и вернусь с результатами. Благодаря
    2019-03-29 11: 21: 00Z
0 ответов                              0                         
источник размещен Вот