Исключение базы данных в Slick 3.0 при пакетной вставке

При вставке тысяч записей за пять секунд с помощью пакетной вставки в slick 3 я получаю

org.postgresql.util.PSQLException: FATAL: sorry, too many clients already

Мой уровень доступа к данным выглядит так:

val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url, user=user, password=password, driver= jdbcDriver)



 override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
    val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover{ case ex:Throwable=> RelationalRepositoryUtility.handleBatchOperationErrors(ex)}
//db.close()
        res
      }

  override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = {
    query ++= (rowList)
  }

закрытие соединения в пакете вставки не имеет никакого эффекта... оно все еще дает ту же ошибку.

Я вызываю пакетную вставку из своего кода следующим образом:

val temp1 = list1.flatMap { li =>
        Future.sequence(li.map { trip =>
            val data = for {
              tripData <- TripDataRepository.insertQuery( trip.tripData)
              subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
            } yield ((tripData, subTripData))
            val res=db.run(data.transactionally)
          res
//db.close()
        })
      }

если я закрою соединение после моей работы здесь, как вы можете видеть в прокомментированном коде, я получаю сообщение об ошибке:

java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]

После вызова метода без Future.sequence следующим образом:

 val temp1 =list.map { trip =>
          val data = for {
            tripData <- TripDataRepository.insertQuery( trip.tripData)
            subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
          } yield ((tripData, subTripData))
          val res=db.run(data.transactionally)
          res
      }

У меня все еще есть слишком много ошибок клиентов ...


person Archana    schedule 11.06.2015    source источник
comment
извините, слишком много клиентов означает, что вы открываете очень много соединений, но никогда их не закрываете.   -  person a_horse_with_no_name    schedule 11.06.2015
comment
Можете ли вы опубликовать, как вы вызываете insertBatch и немного более неприятный код? Как предложил @a_horse_with_no_name, ошибка означает, что вы открываете слишком много соединений.   -  person Biswanath    schedule 11.06.2015
comment
li.map { trip => val data = for { tripData ‹- TripDataRepository.insertQuery( trip.tripData)// (TripDataRepository.query возвращает TripDataRepository.query.map(obj =› obj) += trip.tripData) subTripData ‹- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id)) } yield ((tripData, subTripData)) val res=db.run(data.transactionally)   -  person Archana    schedule 11.06.2015
comment
вот как я вызываю метод пакетной вставки... даже если я попытаюсь закрыть соединение в пакетной вставке после выполнения своей работы... это все та же ошибка   -  person Archana    schedule 11.06.2015
comment
Можете ли вы проверить, сколько подключений разрешает ваша база данных postgres? Пожалуйста, используйте эту команду на postgres show max_connections   -  person Biswanath    schedule 11.06.2015
comment
максимальное количество подключений 100   -  person Archana    schedule 11.06.2015
comment
Еще один запрос, можете ли вы изменить форму future.sequence на плоскую карту. Просто запускать действия последовательно, а не параллельно из future.sequence?   -  person Biswanath    schedule 11.06.2015
comment
Мне нужно использовать параллельные выполнения, поэтому я использую фьючерсы, я не могу использовать плоскую карту. В slick 3 ... все вернется в будущее   -  person Archana    schedule 11.06.2015
comment
Я считаю, что future.sequence выполняет ваши запросы параллельно, что истощает соединение. Хотел поработать без него, чтобы убедиться, что так и есть. Это больше похоже на то, что если вы знаете, где ошибка, вы можете двигаться вперед, чтобы исправить ее.   -  person Biswanath    schedule 11.06.2015
comment
может быть, это так... я проверю его выполнение без фьючерсов   -  person Archana    schedule 11.06.2015
comment
Давайте продолжим обсуждение в чате.   -  person Archana    schedule 11.06.2015


Ответы (1)


Корень этой проблемы в том, что вы одновременно запускаете неограниченный список Future, каждый из которых подключается к базе данных — по одному на запись в list.

Эту проблему можно решить, запуская ваши вставки последовательно, заставляя каждую партию вставок зависеть от предыдущей:

// Empty Future for the results. Replace Unit with the correct type - whatever
// "res" is below.
val emptyFuture = Future.successful(Seq.empty[Unit])
// This will only insert one at a time. You could use list.sliding to batch the
// inserts if that was important.
val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) =>
  previousFuture flatMap { previous =>
    // Inner code copied from your example.
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    val res = db.run(data.transactionally)
    previous :+ res
  }
}
person jkinkead    schedule 16.10.2015