Как использовать Tinkerpop с актерами

Я хотел бы знать, могу ли я использовать tinkerpop в Akka Futures, до сих пор, когда я фиксирую свои изменения в графике, они не сохраняются. Я понимаю, что tinkerpop — это локальная библиотека потока, что означает, что мне нужно будет снова установить свой поток в будущем ODatabaseRecordThreadLocal.INSTANCE.set(thread)

Я безуспешно пробовал следующий метод:

def test[T](graphChanges: => T): T = {
    val thread = ODatabaseRecordThreadLocal.INSTANCE.get
    try graphChanges finally {
      ODatabaseRecordThreadLocal.INSTANCE.set(thread)
      GraphPool.get("partitioned").commit
    }
}

// collect tinkerpop frames
test {
  future {
  // add changes to my tinkerpop frames
  }
}

Я хотел бы иметь поток Tinkerpop для play.mvc.Http.Context

Вот пример проекта, которого я хочу достичь: https://github.com/D-Roch/tinkerpop-play


person Roch    schedule 02.01.2014    source источник


Ответы (2)


Эта проблема

Проблема в том, что Tinkerpop работает локально. Таким образом, ваши изменения фиксируются только в текущем потоке. При создании фьючерсов Scala вы позволяете среде выбирать, в каком потоке будет выполняться фьючерс. А среда не знает лучше, поэтому выбирает не тот поток.

Проблема аналогична для фьючерсов Akka.

В какой нити работает будущее?

Создавая будущее, вы создаете его с двумя параметрами:

  1. Блок, который должен быть выполнен
  2. Контекст выполнения, который должен выполнять блок.

Второй параметр обычно задается как неявный параметр. Но вы можете переопределить значение по умолчанию.

Решение

При создании фьючерсов, связанных с Tinkerpop, используйте контекст выполнения, который запускает каждый блок в одном потоке.

Пример:

import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors

implicit val ec=ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

future { 
    println(Thread.currentThread); 
    future {
        println(Thread.currentThread)
    }  
}

Этот код дважды выводит один и тот же идентификатор потока на консоль (протестировано с Java 7 и Scala 2.10.2.

Внимание! Использование такого небольшого пула потоков может легко привести к взаимоблокировкам или голоданию. Используйте его только для взаимодействия с Tinkerpop.

Вы также можете предоставить специальный метод tinkerpopFuture, который принимает блок в качестве параметра и возвращает будущее, которое будет выполняться в потоке tinkerpop. Или вы можете создать специальный актор, который инкапсулирует все взаимодействия tinkerpop (и запускает их в специальном контексте выполнения tinkerpop).

Литература

person stefan.schwetschke    schedule 08.01.2014
comment
Спасибо за ваш вклад, это имеет смысл, но я действительно не знаю, как действовать, должен ли я переопределить контекст exec после этого будущего: github.com/D-Roch/tinkerpop-play/blob/master/app/models/ - person Roch; 08.01.2014
comment
Вы используете метод scala.concurrent.future, который принимает неявный второй параметр типа ExecutionContext. В вашем случае он возьмет ExecutionContext по умолчанию из import ExecutionContext.Implicits.global. Вы можете переопределить это, предоставив неявный val tinkerpopCtx : ExecutionContext = ... с вашим контекстом выполнения одного потока. Дополнительные сведения о неявных параметрах см. на странице scala-lang.org/old/node/114 - person stefan.schwetschke; 08.01.2014
comment
Спасибо, я понимаю, как работают неявные переменные, но что мне передать вместо этого, чтобы переопределить ExecutionContext? - person Roch; 09.01.2014
comment
Вы просто используете ExecutionContext, который будет запускать все в одном потоке. Вы можете легко создать его, используя java.util.concurrent.ExecutorService, например. ExecutionContext.fromExecutorService(ExecutorService.newSingleThreadExecutor). Я должен попробовать правильный синтаксис, а затем добавлю пример в ответ выше. - person stefan.schwetschke; 09.01.2014
comment
Выглядит неплохо! Что касается одной неудачной вставки, я думаю, вам нужно добавить больше журналов. Основываясь на предоставленных данных, я понятия не имею, почему это не удалось. - person stefan.schwetschke; 09.01.2014

Это не похоже на что-то конкретное для Tinkerpop, это похоже на распространенную ошибку, связанную с использованием фьючерсов. Просто рассмотрите этот фрагмент:

try graphChanges finally { ... }

Само по себе это выглядит прекрасно, но я также вижу, что graphChanges здесь создает будущее. Так...

  • graphChanges инициирует будущее, мгновенно возвращаясь
  • блок try завершается, и выполняется блок finally
  • В какой-то момент непосредственно перед этим, или после, или, может быть, параллельно, но почти наверняка в другом потоке выполняется Future

Я бы посоветовал переместить асинхронную логику внутрь test, чтобы вы могли быть уверены в правильной привязке потоков и убедиться, что любые вызовы правильно помечены как blocking. Как это:

def test[T](graphChanges: => T): Future[T] = future {
  blocking {
    val tlocal = ODatabaseRecordThreadLocal.INSTANCE
    val dbrecord = tlocal.get

    try graphChanges finally {
      tlocal.set(dbrecord)
      GraphPool.get("partitioned").commit
    }
  }
}

// collect tinkerpop frames
test {
  // add changes to my tinkerpop frames
}
person Kevin Wright    schedule 02.01.2014