Помогите с алгоритмом объединения векторов

Мне нужен очень быстрый алгоритм для следующей задачи. Я уже реализовал несколько алгоритмов, которые его завершают, но все они слишком медленные для нужной мне производительности. Он должен быть достаточно быстрым, чтобы алгоритм мог запускаться не менее 100 000 раз в секунду на современном процессоре. Он будет реализован на C++.

Я работаю с промежутками/диапазонами, структурой, которая имеет начальную и конечную координаты на линии.

У меня есть два вектора (динамические массивы) диапазонов, и мне нужно их объединить. Один вектор — это src, а другой — dst. Векторы сортируются по начальным координатам отрезка, а отрезки не перекрываются в пределах одного вектора.

Промежутки в векторе src должны быть объединены с промежутками в векторе dst, чтобы результирующий вектор все еще был отсортирован и не имел перекрытий. Т.е. если во время слияния обнаруживаются перекрытия, два пролета объединяются в один. (Объединение двух пролетов — это всего лишь вопрос изменения координат в структуре.)

Теперь есть еще одна загвоздка: промежутки в векторе src должны быть «расширены» во время слияния. Это означает, что константа будет добавлена ​​к началу и другая (более крупная) константа к конечной координате каждого интервала в src. Это означает, что после расширения диапазонов src они могут перекрываться.


На данный момент я пришел к выводу, что это невозможно сделать полностью на месте, необходимо какое-то временное хранилище. Я думаю, что это должно быть выполнимо за линейное время по количеству суммированных элементов src и dst.

Любое временное хранилище, вероятно, может быть разделено между несколькими запусками алгоритма.

Два основных подхода, которые я пробовал, являются слишком медленными:

  1. Добавьте все элементы src к dst, расширяя каждый элемент перед его добавлением. Затем запустите сортировку на месте. Наконец, выполните итерацию по результирующему вектору, используя указатель «чтение» и «запись», при этом указатель чтения опережает указатель записи, объединяя диапазоны по мере их прохождения. Когда все элементы объединены (указатель чтения достигает конца), dst усекается.

  2. Создайте временный рабочий вектор. Выполните наивное слияние, как описано выше, многократно выбирая следующий элемент либо из src, либо из dst и объединяя его с рабочим вектором. Когда закончите, скопируйте рабочий вектор в dst, заменив его.

Проблема первого метода заключается в том, что сортировка выполняется за O((m+n)*log(m+n)) вместо O(m+n) и имеет некоторые накладные расходы. Это также означает, что вектор dst должен вырасти намного больше, чем ему действительно нужно.

Второй имеет основную проблему большого количества копий и снова выделения/освобождения памяти.

Структуры данных, используемые для хранения/управления диапазонами/векторами, могут быть изменены, если вы считаете, что это необходимо.

Обновление: забыл сказать, насколько велики наборы данных. Наиболее распространенными случаями являются от 4 до 30 элементов в любом векторе, и либо dst пуст, либо существует большое количество перекрытий между интервалами в src и dst.


person jfs    schedule 18.09.2008    source источник


Ответы (9)


Мы знаем, что время выполнения в абсолютном лучшем случае — O(m+n). Это связано с тем, что вам, по крайней мере, нужно просмотреть все данные, чтобы иметь возможность объединить списки. Учитывая это, ваш второй метод должен дать вам такой тип поведения.

Вы профилировали свой второй метод, чтобы выяснить, каковы узкие места? Вполне возможно, что в зависимости от объема данных, о которых вы говорите, на самом деле невозможно сделать то, что вы хотите, за указанное количество времени. Один из способов проверить это — сделать что-то простое, например, просуммировать все начальные и конечные значения интервалов в каждом векторе в цикле и засечь время. По сути, здесь вы выполняете минимальный объем работы для каждого элемента в векторах. Это даст вам базовый уровень для наилучшей производительности, которую вы можете ожидать.

Кроме того, вы можете избежать копирования векторов поэлементно, используя метод подкачки stl, и вы можете предварительно выделить временный вектор до определенного размера, чтобы избежать запуска расширения массива при объединении элементов.

Вы можете рассмотреть возможность использования 2 векторов в своей системе, и всякий раз, когда вам нужно выполнить слияние, вы объединяете их с неиспользуемым вектором, а затем меняете местами (это похоже на двойную буферизацию, используемую в графике). Таким образом, вам не нужно перераспределять векторы каждый раз, когда вы выполняете слияние.

Тем не менее, вам лучше сначала профилировать и выяснить, в чем ваше узкое место. Если выделения минимальны по сравнению с фактическим процессом слияния, вам нужно выяснить, как сделать это быстрее.

Некоторые возможные дополнительные ускорения могут быть получены за счет прямого доступа к необработанным данным векторов, что позволяет избежать проверок границ при каждом доступе к данным.

person Ben Childs    schedule 18.09.2008
comment
Спасибо, что напомнили мне о std::swap, это может на самом деле нарушить сделку. Я вернусь после тестирования этого;) - person jfs; 18.09.2008

Сортировка, которую вы упоминаете в подходе 1, может быть уменьшена до линейного времени (от логарифмически-линейного, как вы его описываете), потому что два входных списка уже отсортированы. Просто выполните шаг слияния сортировки слиянием. При соответствующем представлении входных векторов диапазона (например, односвязных списков) это можно сделать на месте.

http://en.wikipedia.org/wiki/Merge_sort

person Community    schedule 18.09.2008

Как насчет второго метода без повторного выделения — другими словами, выделить временный вектор один раз и больше никогда его не размещать? Или, если входные векторы достаточно малы (но не постоянного размера), просто используйте alloca вместо malloc.

Кроме того, с точки зрения скорости вы можете убедиться, что ваш код использует CMOV для сортировки, поскольку, если код действительно разветвляется для каждой отдельной итерации сортировки слиянием:

if(src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

Предсказание ветвления будет неудачным в 50% случаев, что сильно скажется на производительности. Условное перемещение, вероятно, будет работать намного лучше, поэтому убедитесь, что компилятор делает это, а если нет, попытайтесь уговорить его сделать это.

person Dark Shikari    schedule 18.09.2008

я не думаю, что строго линейное решение возможно, потому что расширение диапазонов векторов src может в худшем случае привести к их перекрытию (в зависимости от величины константы, которую вы добавляете)

проблема может быть в реализации, а не в алгоритме; я бы предложил профилировать код для ваших предыдущих решений, чтобы увидеть, где тратится время

рассуждение:

для действительно «современного» процессора, такого как Intel Core 2 Extreme QX9770, работающего на частоте 3,2 ГГц, можно ожидать около 59 455 MIPS.

для 100 000 векторов вам пришлось бы обрабатывать каждый вектор в 594 550 инструкциях. Это МНОГО инструкций.

ссылка: Википедия MIPS

кроме того, обратите внимание, что добавление константы к векторным диапазонам src не отменяет их сортировку, поэтому вы можете нормализовать векторные диапазоны src независимо, а затем объединить их с векторными диапазонами dst; это должно уменьшить рабочую нагрузку вашего исходного алгоритма

person Steven A. Lowe    schedule 18.09.2008
comment
100k на самом деле может быть недостатком, я действительно не рассчитывал число. Кроме того, когда я говорил о современных процессорах, я на самом деле имел в виду что-то до 5 лет, Athlon XP 3000+ не является нереалистичной целью. - person jfs; 18.09.2008

1 правильно - полная сортировка медленнее, чем слияние двух отсортированных списков.

Итак, вы смотрите на настройку 2 (или что-то совершенно новое).

Если вы измените структуры данных на двусвязные списки, то вы сможете объединить их в постоянном рабочем пространстве.

Используйте распределитель кучи фиксированного размера для узлов списка, чтобы уменьшить использование памяти для каждого узла и повысить вероятность того, что узлы будут расположены близко друг к другу в памяти, что уменьшит количество пропущенных страниц.

Вы можете найти код в Интернете или в своей любимой книге алгоритмов, чтобы оптимизировать слияние связанных списков. Вы захотите настроить это, чтобы объединить диапазоны одновременно со слиянием списка.

Чтобы оптимизировать слияние, сначала обратите внимание, что для каждой серии значений, исходящих с одной стороны, без прихода другой стороны, вы можете вставить всю серию в список dst за один раз, вместо того, чтобы вставлять каждый узел по очереди. И вы можете сэкономить одну запись на вставку по сравнению с обычной операцией со списком, оставив конец «висячим», зная, что вы исправите его позже. И при условии, что вы не выполняете удаления где-либо еще в своем приложении, список может быть односвязным, что означает одну запись на узел.

Что касается времени выполнения 10 микросекунд - это зависит от n и m...

person Steve Jessop    schedule 18.09.2008

Я всегда буду сортировать свой вектор пролетов. Это делает реализацию алгоритмов НАМНОГО проще — и их можно сделать за линейное время.

ОК, поэтому я бы отсортировал интервалы на основе:

  • минимум размаха в порядке возрастания
  • затем охват максимума в порядке убывания

Для этого вам нужно создать функцию.

Затем я бы использовал std::set_union для объединения векторов (вы можете объединить более одного, прежде чем продолжить).

Затем для каждого последовательного набора интервалов с одинаковыми минимумами вы сохраняете первый и удаляете остальные (они являются поддиапазонами первого интервала).

Затем вам нужно объединить ваши промежутки. Это должно быть довольно выполнимо сейчас и осуществимо в линейное время.

ОК, вот в чем хитрость. Не пытайтесь сделать это на месте. Используйте один или несколько временных векторов (и зарезервируйте достаточно места заранее). Затем, в конце, вызовите std::vector::swap, чтобы поместить результаты в выбранный вами входной вектор.

Надеюсь, этого достаточно, чтобы вы начали.

person Kevin    schedule 18.09.2008

Какова ваша целевая система? Он многоядерный? Если это так, вы можете рассмотреть многопоточность этого алгоритма

person basszero    schedule 18.09.2008
comment
Моя целевая система — это настольная система последних 5 лет или около того, я ничего не могу сказать о поддержке SMP или наборах инструкций SIMD. (Ну, я могу предположить MMX на x86, но это все.) - person jfs; 19.09.2008

Я написал новый контейнерный класс именно для этого алгоритма, заточенный под нужды. Это также дало мне возможность настроить другой код вокруг моей программы, которая в то же время немного увеличила скорость.

Это значительно быстрее, чем старая реализация с использованием векторов STL, но в остальном это было то же самое. Но, несмотря на то, что он быстрее, он все еще недостаточно быстр... к сожалению.

Профилирование больше не показывает, что является реальным узким местом. Профилировщик MSVC, кажется, иногда возлагает «вину» на неправильные вызовы (предположительно, идентичные запуски назначают сильно разное время выполнения), и большинство вызовов объединяются в одну большую щель.

Глядя на дизассемблирование сгенерированного кода, видно, что в сгенерированном коде очень много скачков, я думаю, что это может быть основной причиной медлительности сейчас.

class SpanBuffer {
private:
    int *data;
    size_t allocated_size;
    size_t count;

    inline void EnsureSpace()
    {
        if (count == allocated_size)
            Reserve(count*2);
    }

public:
    struct Span {
        int start, end;
    };

public:
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    inline void AddIntersection(int x)
    {
        EnsureSpace();
        data[count++] = x;
    }

    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        EnsureSpace();
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    inline void Clear()
    {
        count = 0;
    }

    inline size_t GetCount() const
    {
        return count;
    }

    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            AddSpan(s, e);
        }
        else
        {
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    ShapeWidener(int _xofs) : xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;
    if (src.GetCount() + dst.GetCount() == 0) return;

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}
person Community    schedule 18.09.2008
comment
Попробуйте использовать deque вместо вектора. Deque выделяет меньше памяти за счет отсутствия непрерывной памяти. - person moswald; 18.09.2008

Если ваша последняя реализация по-прежнему недостаточно быстра, вам, возможно, придется рассмотреть альтернативные подходы.

Для чего вы используете выходные данные этой функции?

person tfinniga    schedule 18.09.2008
comment
Ну, вы упускаете одну вещь, здесь не одна «дельта», а две. Слева и справа от диапазона добавляются разные значения, в частности, справа добавляется большее значение, чем слева. - person jfs; 18.09.2008