Выход из цикла с транзакциями ACID с помощью Knex SQL Query Builder и Bluebird в Node.js

Мы используем Knex SQL Query Builder для выполнения транзакции ACID в Node, и мы испытываем странное поведение при использовании цикла с Knex. Приведенный ниже код принимает массив таблиц и затем условно выполняет вставку или обновление. Первая таблица - это transactionHeader, и она обрабатывается первой. Затем строки в таблице transactionDetail обрабатываются в рамках всей транзакции. Новые ключи (rowids) накапливаются в массиве rowids.

ПРОБЛЕМА. Основная проблема заключается в том, что кажется невозможным выйти из цикла в processTransactionDetail (), если Knex вернул ошибку. Ни throw, ни return не выйдут из цикла или функции. Это означает, что в случае ошибки при обработке transactionDetail он продолжит обработку оставшихся строк перед выходом.

    let rowids: any[] = [];

    knex.transaction(function(trx) {

        // Process transactionHeader
        if (transactionHeader.rowid) {

            // Update transactionHeader
            trx('transaction')
                .transacting(trx)
                .update(transactionHeader)
                .where('rowid', transactionHeader.rowid)
                .then(function(transactionrowid) {
                    rowids.push({ table: 'transaction', rowid: transactionHeader.rowid });
                    // Update transactionDetail rows.
                    processTransactionDetail(transactionrowid, trx);
                })
                .catch(trx.rollback);

        } else {

            // Insert transactionHeader
            trx('transaction')
                .transacting(trx)
                .insert(transactionHeader, 'rowid')
                .then(function(transactionrowid) {
                    rowids.push({ table: 'transaction', rowid: transactionrowid });
                    // Insert transactionDetail rows.
                    processTransactionDetail(transactionrowid, trx);
                })
                .catch(trx.rollback);
        }

    }).then(function(inserts) {
        console.log('success!', rowids)
        callback(null, { success: true }, { data: rowids })
        return;
    }).catch(function(error) {
        console.error('error', error);
        callback(null, {
            success: false,
            message: error.message
        }, { data: rowids })
        return;
    });

    /*
    * Process transactionDetail rows.
    */
    function processTransactionDetail(transactionHeaderRowID: number, trx) {

        var promise: any;
        let table: TABLE;
        let rowid: number;

        for (let i = 1; i < tablesToProcess.length; i++) {

            table = tablesToProcess[i];
            rowid = table.data[0].rowid;

            // Update
            if (rowid) {

                for (let row = 0; row < table.data.length; row++) {

                    promise = knex(table.name)
                        .transacting(trx)
                        .update(table.data[row])
                        .where('rowid', rowid)
                        .then(function(rowid) {                                
                            rowids.push({ table: table.name, rowid: rowid });
                        .catch(function(error) {                                
                             // --------------------------------
                             // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
                             // --------------------------------
                             throw 'error';
                             return;
                             // --------------------------------
                        })
                 } 

            // Insert
            } else {

                for (let row = 0; row < table.data.length; row++) {

                    promise = knex(table.name)
                        .transacting(trx)
                        .insert(table.data[row])
                        .then(function(rowid) {
                            rowids.push({ table: table.name, rowid: rowid });
                        })
                        .catch(function(error) {
                             // --------------------------------
                             // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
                             // --------------------------------
                             throw 'error';
                             return;
                             // --------------------------------
                        });
                }
            }
        }

        promise.then(function(x) {
            promise.then(trx.commit);
        });
    }

ОБНОВЛЕНО: Это правильная структура? Не уверен, что обработчики ошибок внизу действительно нужны.

    knex.transaction(function(trx) {

        // Update Row
        function updateRow(table, rowid, row) {
            return knex(table.name)
                .transacting(trx)
                .update(row)
                .where('rowid', rowid)
                .then(function(rowid) {

                    rowids.push({
                        table: table.name,
                        rowid: rowid
                    });
                });
        }

        // Insert Row
        function insertRow(table, rowid, row) {
            return knex(table.name)
                .transacting(trx)
                .insert(row)
                .then(function(rowid) {

                    rowids.push({
                        table: table.name,
                        rowid: rowid
                    });
                });
        }

        // Process Tables
        Promise.mapSeries(tablesToProcess, function(table) {

            let rowid = table.data[0].rowid;

            // choose the right function to apply to the rows
            var fn = rowid ? updateRow : insertRow;

            // fn needs table and rowid
            fn = fn.bind(this, table, rowid);

            // Process Rows
            return Promise.mapSeries(table.data, fn)
                .then(function(result) {

                    // result is an array with all the knex promises result
                    return result;

                }).catch(function(err) {
                    console.log('an error happened');
                    //trx.rollback();  // QUESTION: IS THIS NEEDED?
                    throw err;     // IS THERE A WAY TO
                });

        }).then(function(result) {
            console.log('success', result);
            trx.commit();
            // callback(null, { success: true }, { data: rowids })
            // return;
        }).catch(function(error) {
            console.log('error', error);
            trx.rollback();
            callback(null, { success: false, message: error.message }, { data: rowids })
        });

    }).then(function(inserts) {
        console.log('success!', rowids)
        callback(null, { success: true }, { data: rowids })
    }).catch(function(error) {
        console.log('error', error);
        callback(null, { success: false, message: error.message }, { data: rowids })
    });

person A2MetalCore    schedule 28.01.2016    source источник


Ответы (1)


Вы имеете дело с обещаниями, поэтому вы должны использовать способ поддержки их цикла. Например, bluebird mapSeries():

var Promise = require('bluebird');

function updateRow(table, rowid, row) {
  return knex(table.name)
    .transacting(trx)
    .update(table.data[row])
    .where('rowid', rowid)
    .then(function(rowid) {
      rowids.push({
        table: table.name,
        rowid: rowid
      });
    });
}

function insertRow(table, rowid, row) {
  return knex(table.name)
    .transacting(trx)
    .insert(table.data[row])
    .then(function(rowid) {
      rowids.push({
        table: table.name,
        rowid: rowid
      });
    });
}

// if there is an error, the iteration will stop immediately
Promise.mapSeries(tablesToProcess, function(table) {
  rowid = table.data[0].rowid;

  // choose the right function to apply to the rows
  var fn = rowid ? updateRow : insertRow;
  // fn need table and rowid
  fn = fn.bind(this, table, rowid);

  // call fn for each row
  // if there is an error, the iteration will stop immediately
  return Promise.mapSeries(table.data, fn)
    .then(function(result) {
      // result is an array with all the knex promises result
      return result;
    }).catch(function(err) {
      console.log('an error happened');
      trx.rollback();
      throw err;
    });
}).then(function(result) {
  console.log('all is good');
  // you can safely commit here
  trx.commit();
}).catch(function(err) {
  console.log('an error happened');
  trx.rollback();
});

Обновить

По поводу ваших вопросов:

knex.transaction(function (trx) {
    // Update Row
    function updateRow(table, rowid, row) {
        return knex(table.name)
            .transacting(trx)
            .update(row)
            .where('rowid', rowid)
            .then(function (rowid) {
                rowids.push({
                    table: table.name,
                    rowid: rowid
                });
            });
    }

    // Insert Row
    function insertRow(table, rowid, row) {
        return knex(table.name)
            .transacting(trx)
            .insert(row)
            .then(function (rowid) {
                rowids.push({
                    table: table.name,
                    rowid: rowid
                });
            });
    }

    // you need to return here so the 'then(function (inserts)' and the catch work
    // Process Tables
    return Promise.mapSeries(tablesToProcess, function (table) {
        let rowid = table.data[0].rowid;

        // choose the right function to apply to the rows
        var fn = rowid ? updateRow : insertRow;

        // fn needs table and rowid
        fn = fn.bind(this, table, rowid);

        // Process Rows
        // if you don't do anything special in the then and the catch, you can remove them
        return Promise.mapSeries(table.data, fn)
            .then(function (result) {
                // result is an array with all the knex promises result
                return result;
            }).catch(function (err) {
                // this catch is not necessary,
                // you can remove it you don't need to do something here
                console.log('an error happened');
                //trx.rollback();  // QUESTION: IS THIS NEEDED? << // no, my mistake, the rollback is done on the other catch
                throw err;     // IS THERE A WAY TO
            });
    }).then(function (result) {
        console.log('success', result);
        trx.commit();
        return result;
        // callback(null, { success: true }, { data: rowids })
        // return;
    }).catch(function (error) {
        console.log('error', error);
        trx.rollback();
        throw error; // always rethrow error when you chain, if you don't, it's like the promise is resolved (ok) 
        // you already do this below
        // callback(null, { success: false, message: error.message }, { data: rowids });
    });
}).then(function (inserts) {
    console.log('success!', rowids)
    callback(null, { success: true }, { data: rowids })
}).catch(function (error) {
    console.log('error', error);
    callback(null, { success: false, message: error.message }, { data: rowids })
});

2 тогда и 2 внизу могут быть объединены. Кроме того, почему есть обратный вызов? Лучше не смешивать обещания и обратные вызовы, если вы не можете этого избежать.

person Shanoor    schedule 28.01.2016
comment
Это было очень полезно! Я обновил приведенный выше пример, добавив несколько вопросов. - person A2MetalCore; 28.01.2016
comment
Одна из моих проблем заключается в том, что эта конечная точка работает нормально в первый раз, но не может быть вызвана после этого. Не уверен, имеет ли какое-то отношение к Bluebird или нет. - person A2MetalCore; 28.01.2016
comment
@ ASA2 Что касается зависания, я думаю, это произошло из-за того, что в коде не хватает нескольких возвратов, поэтому некоторые обещания просто выполняются, а не ждут. О обещаниях есть хорошая статья: pouchdb. ru / 2015/05/18 / we-have-a-problem-with-promises.html Это очень полезно, чтобы избежать типичных ошибок. - person Shanoor; 29.01.2016