Как написать транзакционную многопоточную службу WCF, использующую MSMQ

У меня есть служба WCF, которая отправляет сообщения в частную нетранзакционную очередь MSMQ. У меня есть еще одна служба WCF (многопоточная), которая обрабатывает сообщения MSMQ и вставляет их в базу данных.

Моя проблема связана с последовательностью. Я хочу, чтобы сообщения были в определенном порядке. Например, MSG-A необходимо перейти в базу данных перед вставкой MSG-B. Так что мое текущее решение для этого очень грубое и дорогое с точки зрения базы данных.

Я читаю сообщение, если его MSG-B и в базе данных нет MSG-A, я возвращаю его в очередь сообщений и продолжаю делать это до тех пор, пока MSG-A не будет вставлен в базу данных. Но это очень дорогая операция, так как она включает сканирование таблицы (SELECT stmt).

Сообщения всегда размещаются в очереди последовательно.

Если не считать того, что моя служба обработки очередей WCF является однопоточной (установив для атрибута поведения службы InstanceContextMode значение Single), может ли кто-нибудь предложить лучшее решение?

Спасибо

Дэн


person Community    schedule 30.05.2009    source источник
comment
Я немного сбит с толку, потому что в заголовке говорится, что очередь является транзакционной, а в описании проблемы говорится, что очередь не является транзакционной. Если это транзакция, вы, вероятно, можете сделать что-то умное, просто отслеживая соответствие A и B в памяти и позволяя транзакции откатить их обратно в очередь, если что-то выйдет из строя или истечет время ожидания.   -  person Bruce    schedule 30.05.2009


Ответы (2)


Вместо того, чтобы сразу отправлять сообщения в БД после их извлечения из очереди, держите список ожидающих сообщений в памяти. Когда вы получаете A или B, проверьте, есть ли соответствующий вариант в списке. Если это так, отправьте их оба (в правильном порядке) в базу данных и удалите соответствующий из списка. В противном случае просто добавьте новое сообщение в этот список.

Если проверка совпадения слишком дорогая задача для сериализации - я предполагаю, что вы используете многопоточность по какой-то причине - вы можете использовать другой поток для обработки списка. Существующие несколько потоков читают, немедленно отправляют большинство сообщений в БД, но откладывают As и B в список (потокобезопасный). Фоновый поток просматривает этот список, находя совпадающие A и B, и когда он находит их, он отправляет их в правильном порядке (и удаляет их из списка).

Суть в том, что, поскольку вы удаляете элементы из очереди с несколькими потоками, вам придется где-то сериализоваться, чтобы обеспечить порядок. Хитрость заключается в том, чтобы свести к минимуму количество раз и продолжительность времени, которое вы проводите взаперти в последовательном коде.

Также может быть что-то, что вы могли бы сделать на уровне базы данных, с помощью триггеров или чего-то еще, чтобы изменить порядок записей при обнаружении этой ситуации. Боюсь, я недостаточно знаю программирование БД, чтобы помочь.

ОБНОВЛЕНИЕ: Предполагая, что сообщения содержат некоторый идентификатор, который позволяет связать сообщение «A» с правильным связанным сообщением «B», следующий код гарантирует, что A будет в базе данных перед B. Обратите внимание, что он не гарантирует, что они являются смежными записи в базе данных — между A и B могут быть другие сообщения. Кроме того, если по какой-то причине вы получаете A или B, но никогда не получаете совпадающее сообщение другого типа, этот код приведет к утечке памяти, поскольку он зависает на несовпадающем сообщении. навсегда.

(Вы можете выделить эти два «заблокированных» блока в одну подпрограмму, но я оставлю это так для ясности в отношении A и B.)

static private object dictionaryLock = new object();
static private Dictionary<int, MyMessage> receivedA = 
    new Dictionary<int, MyMessage>();
static private Dictionary<int, MyMessage> receivedB = 
    new Dictionary<int, MyMessage>();

public void MessageHandler(MyMessage message)
{
    MyMessage matchingMessage = null;
    if (IsA(message))
    {
        InsertIntoDB(message);
        lock (dictionaryLock)
        {
            if (receivedB.TryGetValue(message.id, out matchingMessage))
            {
                receivedB.Remove(message.id);
            }
            else
            {
                receivedA.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(matchingMessage);
        }
    }
    else if (IsB(message))
    {
        lock (dictionaryLock)
        {
            if (receivedA.TryGetValue(message.id, out matchingMessage))
            {
                receivedA.Remove(message.id);
            }
            else
            {
                receivedB.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(message);
        }
    }
    else
    {
        // not A or B, do whatever
    }
}
person Bruce    schedule 30.05.2009
comment
Спасибо за ответ Брюс. Очередь, которая у меня сейчас есть, не является транзакционной. Извините за путаницу. Я хочу искать в БД, так как это может оказаться очень дорогим. Однако я собираюсь попытаться сериализовать его в памяти. Но, как вы указали, это будет сложно, поскольку несколько потоков попытаются получить доступ к данным, извлеченным из очереди. Я думал о статическом словаре, где ключ может быть уникальным идентификатором, который я использую для идентификации сообщений. Видите ли вы какую-либо озабоченность по поводу Словаря? Есть ли лучший способ управлять этим списком? - person ; 01.06.2009
comment
Ознакомьтесь с потокобезопасными структурами в 4.0 Framework. Если вам неудобно выпускать бета-версию, это, по крайней мере, даст вам точку в правильном направлении. - person Adam Fyles; 01.06.2009
comment
Статический словарь не является потокобезопасным. Я обновлю свой ответ некоторым образцом кода. - person Bruce; 02.06.2009
comment
Прежде чем я попытаюсь написать код, не могли бы вы прояснить один момент? Вы думаете, что сообщение содержит уникальный идентификатор, и что A и B имеют одинаковый идентификатор? Итак, если вы получаете сообщения A1, A2, A3 и B1, B2, B3, вы знаете, что A1 идет с B1 и т. д.? Я надеюсь, что это так, потому что с несколькими потоками, получающими сообщения одновременно, я не знаю, как еще вы узнаете, какой A идет с каким B. - person Bruce; 02.06.2009

Если вы являетесь единственным клиентом этих очередей, вы можете очень просто добавить метку времени в качестве заголовка сообщения (см. IDesign образец) и также сохраните поле «Отправлено» (что-то вроде сообщения Outlook) в базе данных. Вы можете обрабатывать их в том порядке, в котором они были отправлены (в основном вы перемещаете логику сортировки во время потребления).

Надеюсь, это поможет, Адриан

person NotADba    schedule 09.07.2009