F# MailboxProcessor — несколько ожидающих продолжений чтения для почтового ящика

Я играю с написанием чего-то вроде действительно простой среды асинхронного тестирования. Но я думаю, что сталкиваюсь с каким-то ограничением или ошибкой. Извините, но я не смог воспроизвести это на меньшей кодовой базе.

Это базовая структура, которую я придумал:

module TestRunner
    open System

    type TestOptions = {
        Writer : ConsoleColor -> string -> unit}
    type TestResults = {
        Time : TimeSpan
        Failure : exn option
        }
    type Test = {
        Name : string
        Finished : IEvent<TestResults>
        SetFinished : TestResults -> unit
        TestFunc : TestOptions -> Async<TestResults> }

    let createTest name f =  
        let ev = new Event<TestResults>()
        {
            Name = name 
            Finished = ev.Publish
            SetFinished = (fun res -> ev.Trigger res)
            TestFunc = 
                (fun options -> async {
                    let watch = System.Diagnostics.Stopwatch.StartNew()
                    try
                        do! f options
                        watch.Stop()
                        return { Failure = None; Time = watch.Elapsed }
                    with exn ->
                        watch.Stop()
                        return { Failure = Some exn; Time = watch.Elapsed }
                    })}

    let simpleTest name f = 
        createTest name (fun options -> f options.Writer)

    /// Create a new Test and change the result
    let mapResult mapping test = 
        { test with
            TestFunc = 
                (fun options -> async {
                    let! result = test.TestFunc options
                    return mapping result})}

    let writeConsole color f = 
        let old = System.Console.ForegroundColor
        try
            System.Console.ForegroundColor <- color
            f()
        finally
            System.Console.ForegroundColor <- old

    let printColor color (text:String) = 
        writeConsole color (fun _ -> Console.WriteLine(text))


    type WriterMessage = 
        | NormalWrite of ConsoleColor * String
        | StartTask of AsyncReplyChannel<int> * String
        | WriteMessage of int * ConsoleColor * String
        | EndTask of int

    /// will handle printing jobs for two reasons
    /// 1. Nice output grouped by tests (StartTask,WriteMessage,EndTask)
    /// 2. Print Summary after all tests finished (NormalWrite)
    let writer = MailboxProcessor.Start (fun inbox -> 
        let currentTask = ref 0
        let newHandle (returnHandle:AsyncReplyChannel<int>) = 
            let handle = System.Threading.Interlocked.Increment currentTask
            returnHandle.Reply handle
            handle 

        // the tasks describe which tasks are currently waiting to be processed
        let rec loop tasks = async {
            let! newTasks =
                match tasks with
                /// We process the Task with the number t and the name name
                | (t, name) :: next -> 
                    inbox.Scan
                        (fun msg -> 
                            match msg with
                            | EndTask (endTask) -> 
                                // if the message is from the current task finish it
                                if t = endTask then
                                    Some (async { return next })
                                else None
                            | WriteMessage(writeTask, color, message) ->
                                if writeTask = t then 
                                    Some (async {
                                        printColor color (sprintf "Task %s: %s" name message)
                                        return tasks
                                    })
                                else None
                            | StartTask (returnHandle, name) -> 
                                // Start any tasks instantly and add them to the list (because otherwise they would just wait for the resonse)
                                Some (async { 
                                    let handle = newHandle returnHandle
                                    return (List.append tasks [handle, name]) })
                            | _ -> None)
                // No Current Tasks so just start ones or process the NormalWrite messages
                | [] ->
                    inbox.Scan     
                        (fun msg -> 
                            match msg with
                            | StartTask (returnHandle, name) -> 
                                Some (async { 
                                    let handle = newHandle returnHandle
                                    return [handle, name] })
                            | NormalWrite(color, message) ->
                                Some (async {
                                    printColor color message
                                    return []
                                })
                            | _ -> None)   

            return! loop newTasks 
        }
        loop [])

    /// Write a normal message via writer
    let writerWrite color (text:String) = 
        writer.Post(NormalWrite(color, text))

    /// A wrapper around the communication (to not miss EndTask for a StartTask)
    let createTestWriter name f = async {
        let! handle = writer.PostAndAsyncReply(fun reply -> StartTask(reply, name))
        try
            let writer color s = 
                writer.Post(WriteMessage(handle,color,s))
            return! f(writer)
        finally
            writer.Post (EndTask(handle))
        }
    /// Run the given test and print the results
    let testRun t = async {
        let! results = createTestWriter t.Name (fun writer -> async {
            writer ConsoleColor.Green (sprintf "started")
            let! results = t.TestFunc { Writer = writer }
            match results.Failure with
            | Some exn -> 
                writer ConsoleColor.Red (sprintf "failed with %O" exn)
            | None ->
                writer ConsoleColor.Green (sprintf "succeeded!")
            return results}) 
        t.SetFinished results
        }
    /// Start the given task with the given amount of workers
    let startParallelMailbox workerNum f = 
        MailboxProcessor.Start(fun inbox ->
            let workers = Array.init workerNum (fun _ -> MailboxProcessor.Start f)
            let rec loop currentNum = async {
                let! msg = inbox.Receive()
                workers.[currentNum].Post msg
                return! loop ((currentNum + 1) % workerNum)
            }
            loop 0 )
    /// Runs all posted Tasks
    let testRunner = 
        startParallelMailbox 10 (fun inbox ->
            let rec loop () = async {
                let! test = inbox.Receive()
                do! testRun test
                return! loop()
            }
            loop ())
    /// Start the given tests and print a sumary at the end
    let startTests tests = async {
        let! results =
            tests 
                |> Seq.map (fun t ->
                    let waiter = t.Finished |> Async.AwaitEvent
                    testRunner.Post t
                    waiter
                   )
                |> Async.Parallel
        let testTime = 
            results
                |> Seq.map (fun res -> res.Time)
                |> Seq.fold (fun state item -> state + item) TimeSpan.Zero
        let failed = 
            results
                |> Seq.map (fun res -> res.Failure) 
                |> Seq.filter (fun o -> o.IsSome)
                |> Seq.length
        let testCount = results.Length
        if failed > 0 then
            writerWrite ConsoleColor.DarkRed (sprintf "--- %d of %d TESTS FAILED (%A) ---" failed testCount testTime)
        else
            writerWrite ConsoleColor.DarkGray (sprintf "--- %d TESTS FINISHED SUCCESFULLY (%A) ---" testCount testTime)
        }

Теперь исключение запускается только тогда, когда я использую определенный набор тестов, которые выполняют сканирование в Интернете (некоторые терпят неудачу, а некоторые нет, что нормально):

#r @"Yaaf.GameMediaManager.Primitives.dll";; // See below
open TestRunner

let testLink link =
    Yaaf.GameMediaManager.EslGrabber.getMatchMembers link
    |> Async.Ignore

let tests = [
    // Some working links (links that should work)
    yield! 
      [ //"TestMatch", "http://www.esl.eu/eu/wire/anti-cheat/css/anticheat_test/match/26077222/"
        "MatchwithCheater", "http://www.esl.eu/de/csgo/ui/versus/match/3035028"
        "DeletedAccount", "http://www.esl.eu/de/css/ui/versus/match/2852106" 
        "CS1.6", "http://www.esl.eu/de/cs/ui/versus/match/2997440" 
        "2on2Versus", "http://www.esl.eu/de/css/ui/versus/match/3012767" 
        "SC2cup1on1", "http://www.esl.eu/eu/sc2/go4sc2/cup230/match/26964055/"
        "CSGO2on2Cup", "http://www.esl.eu/de/csgo/cups/2on2/season_08/match/26854846/"
        "CSSAwpCup", "http://www.esl.eu/eu/css/cups/2on2/awp_cup_11/match/26811005/"
        ] |> Seq.map (fun (name, workingLink) -> simpleTest (sprintf "TestEslMatches_%s" name) (fun o -> testLink workingLink))
    ]

startTests tests |> Async.Start;; // this will produce the Exception now and then

https://github.com/matthid/Yaaf.GameMediaManager/blob/core/src/Yaaf.GameMediaManager.Primitives/EslGrabber.fs — это код, и вы можете скачать https://github.com/downloads/matthid/Yaaf.GameMediaManager/GameMediaManager.%200.9.3.1.wireplugin (в основном это переименованный zip-архив) и распакуйте его, чтобы получить двоичный файл Yaaf.GameMediaManager.Primitives.dll (вы можете вставить его в FSI вместо загрузки, когда захотите, но тогда вам нужно будет сослаться на HtmlAgilityPack)

Я могу воспроизвести это с помощью Microsoft (R) F# 2.0 Interactive, Build 4.0.40219.1. Проблема в том, что Exception не будет запускаться всегда (но очень часто), и трассировка стека мне ничего не говорит.

System.Exception: multiple waiting reader continuations for mailbox
   bei <StartupCode$FSharp-Core>[email protected](AsyncParams`1 _arg11)
   bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
   bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   bei <StartupCode$FSharp-Core>.$Control.finishTask@1280[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 firstExn, T[] results, TrampolineHolder trampolineHolder, Int32 remaining)
   bei <StartupCode$FSharp-Core>.$Control.recordFailure@1302[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 count, FSharpRef`1 firstExn, T[] results, LinkedSubSource innerCTS, TrampolineHolder trampolineHolder, FSharpChoice`2 exn)
   bei <StartupCode$FSharp-Core>[email protected](Exception exn)
   bei Microsoft.FSharp.Control.AsyncBuilderImpl.protectedPrimitive@690.Invoke(AsyncParams`1 args)
   bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
   bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   bei <StartupCode$FSharp-Core>[email protected](Object state)
   bei System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   bei System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   bei System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   bei System.Threading.ThreadPoolWorkQueue.Dispatch()
   bei System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

Поскольку это будет запущено в рабочем потоке, который я не могу контролировать, это приведет к сбою приложения (не FSI, но здесь также будет отображаться исключение).

Я нашел http://cs.hubfs.net/topic/Some/2/59152 и http://cs.hubfs.net/topic/None/59146, но Я не использую StartChild и не думаю, что каким-то образом вызываю получение из нескольких потоков одновременно?

Что-то не так с моим кодом или это действительно ошибка? Как я могу обойти это, если это возможно?

Я заметил, что в FSI все тесты будут выполняться, как и ожидалось, когда исключение игнорируется. Как я могу сделать то же самое?

РЕДАКТИРОВАТЬ: я заметил, что после того, как я исправил неудачные модульные тесты, он будет работать правильно. Однако я все еще не могу воспроизвести это с меньшей кодовой базой. Например, с моими неудачными тестами.

Спасибо, Матхид


person matthid    schedule 28.09.2012    source источник
comment
Агенты, агенты... везде! Это какой-то непостижимый код. Возможно, поставьте точку останова на каждый вызов Receive/Scan и посмотрите, не прерывается ли он каждый раз в одном и том же потоке.   -  person Daniel    schedule 28.09.2012
comment
Думаю, он должен разбиться на разные темы. Насколько я понимаю, ограничение заключается в том, что только один поток может вызывать Receive (его можно вызывать в разных потоках, но не одновременно?). В этом весь смысл асинхронности? Я отредактирую код и добавлю несколько комментариев.   -  person matthid    schedule 28.09.2012
comment
Я использую написанный мной код, реализующий альтернативу MailboxProcessor. Я думаю, это должно сработать для вас. Можете ли вы проверить это (самому лень это делать)? gist.github.com/3827112   -  person t0yv0    schedule 03.10.2012
comment
В нем нет реализации Scan? Итак, я должен реализовать свой собственный (что я мог бы сделать и на MailboxProcessor) или сделать то, что предложил 7sharp9?   -  person matthid    schedule 04.10.2012
comment
@reddragon, нет. Я не знаю, как это эффективно реализовать. Да, я бы попытался сделать то, что предложил 7sharp9, это также может немного упростить код. Могут ли работать несколько каналов? Я также не буду использовать события ..   -  person t0yv0    schedule 04.10.2012
comment
например, что-то вроде gist.github.com/3834976   -  person t0yv0    schedule 04.10.2012


Ответы (4)


Я чувствую, что ограничение будет внутри самого MailboxProcessor, а не в асинхронном режиме.

Честно говоря, я бы перестраховывался с функциями сканирования. Я написал сообщение в блоге об опасностях их использования.

Можно ли обрабатывать задачи стандартным механизмом приема, а не функциями сканирования?

В качестве примечания: внутри async есть трамплин, который используется для повторного использования одного и того же потока заданное количество раз, чтобы избежать ненужного использования пула потоков (я думаю, что это установлено на 300), поэтому при отладке вы можете увидеть это поведение.

Я бы подошел к этой проблеме немного по-другому, разложив отдельные компоненты на этапы конвейера, а не на вложенные асинхронные блоки. Я бы создал компонент супервизора и компонент маршрутизации.

Супервизор следил за первоначальными тестами и отправлял сообщения компоненту маршрутизации, который циклически распределял запросы к другим агентам. Когда задачи будут выполнены, они могут отправить обратно руководителю.

Я понимаю, что это на самом деле не помогает решить проблему в текущем коде, но я думаю, что вам все равно придется разложить проблему, чтобы отлаживать асинхронные части системы.

person 7sharp9    schedule 03.10.2012
comment
Вы указываете в своем блоге на проблему с производительностью и на то, что тайм-аут сбрасывается (или я что-то пропустил?). Ни один из них не должен сильно повредить в моем сценарии. Я попробую ваше предложение с руководителем, но мне показалось, что Сканирование было правильным решением в моем сценарии! - person matthid; 03.10.2012
comment
Мне просто не нравится реализация Scan, я бы предпочел перепроектировать альтернативу, чем доверять ей. Я думаю, вам было бы полезно разбить проблему дальше, в любом случае это облегчит отладку. - person 7sharp9; 08.10.2012
comment
Насколько я понимаю, Scan - это в основном способ актеров Erlang. У меня есть собственная реализация асинхронного компоновщика, которая позволяет мне очень легко отслеживать вещи, однако это, похоже, выходит из-под моего контроля. Я также не думаю, что этот код настолько непостижим, как все говорят (если вы думаете об этом с более высокого уровня). К сожалению, сейчас у меня недостаточно времени, чтобы внести много изменений, но я обязательно посмотрю на это. - person matthid; 09.10.2012
comment
У меня есть сообщение в блоге, наполовину написанное о различиях между агентами Erlang/F#. Выборочный прием — это механизм, используемый в Erlang. При выборочном приеме несопоставленные сообщения остаются в стороне до успешного приема, после чего несопоставленные сообщения затем перемещаются обратно в основной буфер в том же порядке. - person 7sharp9; 10.10.2012

Я действительно считаю, что в реализации 2.0 Scan/TryScan/Receive была ошибка, которая могла ложно вызвать

multiple waiting reader continuations for mailbox 

исключение; Я думаю, что эта ошибка исправлена ​​в реализации 3.0. Я не внимательно просмотрел ваш код, чтобы убедиться, что вы пытаетесь получить только одно сообщение за раз в своей реализации, поэтому также возможно, что это может быть ошибка в вашем коде. Если вы можете попробовать это против F # 3.0, было бы здорово узнать, исчезнет ли это.

person Brian    schedule 28.09.2012
comment
Спасибо за Ваш ответ. Я протестировал его с выпуском F# 3.0 (Microsoft (R) F# Interactive, версия 11.0.50727.1), и ситуация на самом деле еще хуже. Исключение срабатывает чаще. Однако я все равно должен оставаться на CLR2 (.net3.5). - person matthid; 28.09.2012

К сожалению, я никогда не мог воспроизвести это на меньшей базе кода, и теперь я бы использовал NUnit с поддержкой асинхронного тестирования вместо своей собственной реализации. С тех пор я использовал агенты (MailboxProcessor) и асинхронные процессы в различных проектах и ​​больше никогда с этим не сталкивался...

person matthid    schedule 01.01.2015

Некоторые примечания на случай, если кому-то мой опыт будет полезен (потребовалось много времени на отладку нескольких процессов, чтобы найти проблему):

Выполнение и пропускная способность начали забиваться всего 50 агентами/почтовыми ящиками. Иногда при небольшой нагрузке это работало для первого раунда сообщений, но что-то столь же значительное, как вызов библиотеки журналов, вызывало более длительную задержку.

Отладка с использованием окна Threads/Parallel Stacks в VS IDE, среда выполнения ожидает результатов вызова FSharpAsync.RunSynchronously -> CancellationTokenOps.RunSynchronally с помощью Trampoline.ExecuteAction.

Я подозреваю, что базовый ThreadPool тормозит запуск (после первого раза кажется, что он работает нормально). Это очень большая задержка. Я использую агенты для сериализации в определенных очередях второстепенных вычислений, позволяя основному диспетчерскому агенту оставаться на связи, поэтому задержка происходит где-то в CLR.

Я обнаружил, что запуск MailboxProcessor Receive с тайм-аутом в рамках попытки с останавливает задержку, но это необходимо обернуть в асинхронный блок, чтобы остальная часть программы не замедлялась, какой бы короткой ни была задержка. Несмотря на небольшую возню, я очень доволен F# MailboxProcessor для реализации модели акторов.

person Anthony    schedule 19.09.2013