Можно ли создать бесконечный поток из таблицы базы данных с помощью Akka Stream

Я играю с Akka Streams 2.4.2 и мне интересно, можно ли настроить поток, который использует таблицу базы данных в качестве источника, и всякий раз, когда в таблицу добавляется запись, эта запись материализуется и перемещается вниз по течению?

ОБНОВЛЕНИЕ: 23 февраля 2016 г.

Я реализовал решение от @PH88. Вот мое определение таблицы:

case class Record(id: Int, value: String)

class Records(tag: Tag) extends Table[Record](tag, "my_stream") {
  def id = column[Int]("id")
  def value = column[String]("value")
  def * = (id, value) <> (Record.tupled, Record.unapply)
}

Вот реализация:

 implicit val system = ActorSystem("Publisher")
 implicit val materializer = ActorMaterializer()
 val db = Database.forConfig("pg-postgres")

 try{
  val newRecStream = Source.unfold((0, List[Record]())) { n =>
    try {
      val q = for (r <- TableQuery[Records].filter(row => row.id > n._1)) yield (r)
      val r = Source.fromPublisher(db.stream(q.result)).collect {
        case rec => println(s"${rec.id}, ${rec.value}"); rec
      }.runFold((n._1, List[Record]())) {
        case ((id, xs), current) => (current.id, current :: xs)
      }

      val answer: (Int, List[Record]) = Await.result(r, 5.seconds)
      Option(answer, None)
    }
    catch { case e:Exception => println(e); Option(n, e) }
  }


   Await.ready(newRecStream.throttle(1, 1.second, 1, ThrottleMode.shaping).runForeach(_ => ()), Duration.Inf)
 }
 finally {
   system.shutdown
   db.close
 }

Но моя проблема в том, что когда я пытаюсь вызвать flatMapConcat, я получаю тип Serializable.

ОБНОВЛЕНИЕ: 24 февраля 2016 г.

Обновлено, чтобы попробовать db.run предложение от @PH88:

implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val disableAutoCommit = SimpleDBIO(_.connection.setAutoCommit(false))
val queryLimit = 1

try {
 val newRecStream = Source.unfoldAsync(0) { n =>
     val q = TableQuery[Records].filter(row => row.id > n).take(queryLimit)
     db.run(q.result).map { recs =>
       Some(recs.last.id, recs)
     }
   }
   .throttle(1, 1.second, 1, ThrottleMode.shaping)
   .flatMapConcat { recs =>
      Source.fromIterator(() => recs.iterator)
   }
   .runForeach { rec =>
       println(s"${rec.id}, ${rec.value}")
   }

   Await.ready(newRecStream, Duration.Inf)
 }
 catch
 {
   case ex: Throwable => println(ex)
 }
 finally {
   system.shutdown
   db.close
 }

Что работает (я изменил ограничение запроса на 1, так как в настоящее время у меня есть только пара элементов в моей таблице базы данных) - за исключением того, что когда он печатает последнюю строку в таблице, программа существует. Вот мой вывод журнала:

17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/xxxxxxx/dev/src/scratch/scala/fpp-in-scala/target/scala-2.11/classes/logback.xml]
17:09:28,062 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:09:28,064 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:09:28,079 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
17:09:28,102 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [application] to DEBUG
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
17:09:28,103 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:09:28,104 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@4278284b - Registering current configuration as safe fallback point
17:09:28.117 [main] INFO  com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
1, WASSSAAAAAAAP!
2, WHAAAAT?!?
3, booyah!
4, what!
5, This rocks!
6, Again!
7, Again!2
8, I love this!
9, Akka Streams rock
10, Tuning jdbc
17:09:39.000 [main] INFO  com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.

Process finished with exit code 0

Нашел недостающую часть - нужно заменить это:

Some(recs.last.id, recs)

с этим:

 val lastId = if(recs.isEmpty) n else recs.last.id
 Some(lastId, recs)

Вызов recs.last.id выдавал java.lang.UnsupportedOperationException: empty.last, когда набор результатов был пуст.


person Mark J Miller    schedule 19.02.2016    source источник
comment
Самое близкое, что я смог найти, это slick.typesafe.com/doc/3.1. 1/dbio.html#потоковая передача . Но это похоже на потоковую передачу результатов из статического ResultSet, который не будет предоставлять обновления в реальном времени...   -  person Ramón J Romero y Vigil    schedule 20.02.2016
comment
@Mark, вы можете упростить свой код, используя db.run. Смотрите мой обновленный ответ ниже   -  person PH88    schedule 24.02.2016


Ответы (2)


В общем, база данных SQL является «пассивной» конструкцией и не вносит активно изменений, подобных тому, что вы описали. Вы можете только «имитировать» «push» с периодическим опросом, например:

val newRecStream = Source

  // Query for table changes
  .unfold(initState) { lastState =>
    // query for new data since lastState and save the current state into newState...
    Some((newState, newRecords))
  }

  // Throttle to limit the poll frequency
  .throttle(...)  

  // breaks down into individual records...
  .flatMapConcat { newRecords =>
    Source.unfold(newRecords) { pendingRecords =>
      if (records is empty) {
        None
      } else {
        // take one record from pendingRecords and save to newRec.  Save the rest into remainingRecords.
        Some(remainingRecords, newRec)
      }
    }
  }

Обновлено: 24 февраля 2016 г.

Пример псевдокода, основанный на обновлении вопроса от 23.02.2016:

implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val queryLimit = 10
try {
  val completion = Source
    .unfoldAsync(0) { lastRowId =>
      val q = TableQuery[Records].filter(row => row.id > lastRowId).take(queryLimit)
      db.run(q.result).map { recs =>
        Some(recs.last.id, recs)
      }
    }
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .flatMapConcat { recs =>
      Source.fromIterator(() => recs.iterator)
    }
    .runForeach { rec =>
      println(s"${rec.id}, ${rec.value}")
    }

  // Block forever
  Await.ready(completion, Duration.Inf)

} catch {
  case ex: Throwable => println(ex)
} finally {
  system.shutdown
  db.close
}

Он будет многократно выполнять запрос в unfoldAsync к базе данных, извлекая не более 10 (queryLimit) записей за раз и отправляя записи вниз по течению (-> throttle -> flatMapConcat -> runForeach). Await в конце фактически заблокирует навсегда.

Обновлено: 25 февраля 2016 г.

Исполняемый код «доказательства концепции»:

import akka.actor.ActorSystem
import akka.stream.{ThrottleMode, ActorMaterializer}
import akka.stream.scaladsl.Source
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object Infinite extends App{
  implicit val system = ActorSystem("Publisher")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()
  case class Record(id: Int, value: String)
  try {
    val completion = Source
      .unfoldAsync(0) { lastRowId =>
        Future {
          val recs = (lastRowId to lastRowId + 10).map(i => Record(i, s"rec#$i"))
          Some(recs.last.id, recs)
        }
      }
      .throttle(1, 1.second, 1, ThrottleMode.Shaping)
      .flatMapConcat { recs =>
        Source.fromIterator(() => recs.iterator)
      }
      .runForeach { rec =>
        println(rec)
      }

    Await.ready(completion, Duration.Inf)

  } catch {
    case ex: Throwable => println(ex)
  } finally {
    system.shutdown
  }
}
person PH88    schedule 22.02.2016
comment
Пытаюсь следовать вашему псевдокоду, но я все еще новичок. Я застреваю, когда вызываю concatFlatMap, потому что переданный аргумент имеет тип Serializeable. Это потому что я звоню Await? Что мне делать вместо этого? - person Mark J Miller; 23.02.2016
comment
Разобрался... добавил Await.ready(newRecStream.throttle(.... Спасибо за Ваш ответ. - person Mark J Miller; 23.02.2016
comment
Мне нравится упрощенный синтаксис, но запрос больше не бесконечен — он завершается в конце потока. Я хочу, чтобы он продолжал слушать вечно, как только появится новая пластинка, он должен взять ее и выпустить. - person Mark J Miller; 24.02.2016
comment
Марк, нет, это бесконечно. Он будет неоднократно выполнять запрос в unfoldAsync к БД, извлекая не более 10 (queryLimit) записей за раз и отправляя записи вниз по течению (-› дроссель - > flatMapConcat - > runForeach). Последний Await... фактически заблокируется навсегда. - person PH88; 24.02.2016
comment
Я просто добавляю еще один фрагмент кода (настоящий исполняемый код :-)), чтобы проиллюстрировать концепцию. Можешь попробовать запустить и посмотреть. - person PH88; 24.02.2016
comment
Да, ваше последнее обновление от 25.02.2016 работает бесконечно. Однако если вы измените реализацию для использования запроса к базе данных, процесс завершится, как только запрос вернет пустой результат. Я предполагаю, что что-то в реализации базы данных вызывает результат Done и закрывает поток. Я все еще копаюсь в том, почему. - person Mark J Miller; 25.02.2016
comment
Оказывается, recs.last.id выбрасывал java.lang.UnsupportedOperationException: empty.last, поэтому он завершался. Однажды я добавил проверку для isEmpty, которая это исправила. Я обновил OP, чтобы продемонстрировать исправление. - person Mark J Miller; 25.02.2016

Вот рабочий код бесконечной потоковой передачи базы данных. Это было протестировано с миллионами записей, вставленных в базу данных postgresql во время работы потокового приложения -

package infinite.streams.db

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.slf4j.LoggerFactory
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor}

case class Record(id: Int, value: String) {
  val content = s"<ROW><ID>$id</ID><VALUE>$value</VALUE></ROW>"
}

object InfiniteStreamingApp extends App {

  println("Starting app...")

  implicit val system: ActorSystem = ActorSystem("Publisher")
  implicit val ec: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  println("Initializing database configuration...")
  val databaseConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig[JdbcProfile]("postgres3")
  implicit val session: SlickSession = SlickSession.forConfig(databaseConfig)

  import databaseConfig.profile.api._

  class Records(tag: Tag) extends Table[Record](tag, "test2") {
    def id = column[Int]("c1")

    def value = column[String]("c2")

    def * = (id, value) <> (Record.tupled, Record.unapply)
  }

  val db = databaseConfig.db

  println("Prime for streaming...")

  val logic: Flow[(Int, String), (Int, String), NotUsed] = Flow[(Int, String)].map {
    case (id, value) => (id, value.toUpperCase)
  }

  val fetchSize = 5
  try {
    val done = Source
      .unfoldAsync(0) {
      lastId =>
        println(s"Fetching next: $fetchSize records with id > $lastId")
        val query = TableQuery[Records].filter(_.id > lastId).take(fetchSize)
        db.run(query.result.withPinnedSession)
          .map {
            recs => Some(recs.last.id, recs)
          }
    }
      .throttle(5, 1.second, 1, ThrottleMode.shaping)
      .flatMapConcat {
        recs => Source.fromIterator(() => recs.iterator)
      }
      .map(x => (x.id, x.content))
      .via(logic)
      .log("*******Post Transformation******")
//      .runWith(Sink.foreach(r => println("SINK: " + r._2)))
// Use runForeach or runWith(Sink)
      .runForeach(rec => println("REC: " + rec))

    println("Waiting for result....")
    Await.ready(done, Duration.Inf)

  } catch {
    case ex: Throwable => println(ex.getMessage)
  } finally {
    println("Streaming end successfully")
    db.close()
    system.terminate()
  }

}



application.conf

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}
# Load using SlickSession.forConfig("slick-postgres")
postgres3 {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost/testdb"
      user = "postgres"
      password = "postgres"
    }
    numThreads = 2
  }
}
person ASD    schedule 30.07.2019