SEGFAULT при использовании shared_ptr

Я пытаюсь реализовать набор Lazy Concurrent List-based Set на C++, используя shared_ptr. Я полагаю, что unreachable nodes будет автоматически освобожден последним shared_ptr. Насколько я понимаю, операции увеличения и уменьшения для shared_ptr's reference count являются атомарными. Это означает, что только последний shared_ptr со ссылкой на узел должен вызывать delete/free для этого узла. Я запустил программу для нескольких потоков, но моя программа аварийно завершает работу с ошибкой double free called или просто с ошибкой сегментации (SIGSEGV). Я не понимаю, как это возможно. Ниже приведен мой код для реализации с именами методов, обозначающими их предполагаемую операцию.

#include<thread>
#include<iostream>
#include<mutex>
#include<climits>

using namespace std;

class Thread
{
   public:
      std::thread t;  
};
int n=50,ki=100,kd=100,kc=100;`/*no of threads, no of inserts,deletes & searches*/`


class Node
{
public:
      int key;
      shared_ptr<Node> next;
      bool marked;
      std::mutex nodeLock;

      Node() {
         key=0;
         next = nullptr;
         marked = false;
      }

      Node(int k) {
         key = k;
         next = nullptr;
         marked = false;
      }

      void lock() {
         nodeLock.lock();
      }

      void unlock() {
         nodeLock.unlock();
      }

      ~Node()
      {
      }
};

class List {
   shared_ptr<Node> head;
   shared_ptr<Node> tail;

public:

   bool validate(shared_ptr<Node> pred, shared_ptr<Node> curr) {
      return !(pred->marked) && !(curr->marked) && ((pred->next) == curr);
   }

   List() {
      head=make_shared<Node>(INT_MIN);
      tail=make_shared<Node>(INT_MAX);
      head->next=tail;
   }

   bool add(int key)
   {
      while(true)
      {
         /*shared_ptr<Node> pred = head;
         shared_ptr<Node> curr = pred->next;*/
        auto pred = head;
        auto curr = pred->next;

         while (key>(curr->key))
         {
            pred = curr;
            curr = curr->next;
         }

         pred->lock();
         curr->lock();

         if (validate(pred,curr))
         {
            if (curr->key == key)
            {
               curr->unlock();
               pred->unlock();
               return false;
            }
            else
            {
                shared_ptr<Node> newNode(new Node(key));
               //auto newNode = make_shared<Node>(key);
                //shared_ptr<Node> newNode = make_shared<Node>(key);
                newNode->next = curr;
                pred->next = newNode;
                curr->unlock();
                pred->unlock();
                return true;
            }
         }
         curr->unlock();
         pred->unlock();
      }
   }

   bool remove(int key)
   {
      while(true)
      {
         /*shared_ptr<Node> pred = head;
         shared_ptr<Node> curr = pred->next;*/

        auto pred = head;
        auto curr = pred->next;

         while (key>(curr->key))
         {
            pred = curr;
            curr = curr->next;
         }

         pred->lock();
         curr->lock();

         if (validate(pred,curr))
         {
            if (curr->key != key)
            {
               curr->unlock();
               pred->unlock();
               return false;
            }
            else
            {
               curr->marked = true;
               pred->next = curr->next;
               curr->unlock();
               pred->unlock();
               return true;
            }
         }
         curr->unlock();
         pred->unlock();
      }
   }

   bool contains(int key) {
      //shared_ptr<Node> curr = head->next;
    auto curr = head->next;

      while (key>(curr->key)) {
         curr = curr->next;
      }
      return curr->key == key && !curr->marked;
   }
}list;

void test(int curr)
{
   bool test;
    int time;

    int val, choice;
    int total,k=0;
    total=ki+kd+kc;

    int i=0,d=0,c=0;

    while(k<total)
    {
        choice = (rand()%3)+1;

        if(choice==1)
        {
            if(i<ki)
            {
                val = (rand()%99)+1;
                test = list.add(val);
                i++;
                k++;
            }
        }
        else if(choice==2)
        {
            if(d<kd)
            {
                val = (rand()%99)+1;
                test = list.remove(val);
                d++;
                k++;
            }
        }
        else if(choice==3)
        {
            if(c<kc)
            {
                val = (rand()%99)+1;
                test = list.contains(val);
                c++;
                k++;
            }
        }
    }
}

int main()
{
   int i;

   vector<Thread>thr(n);

   for(i=0;i<n;i++)
   {
      thr[i].t = thread(test,i+1);
   }
   for(i=0;i<n;i++)
   {
      thr[i].t.join();
   }
   return 0;
}

Я не могу понять, что не так с приведенным выше кодом. Ошибки каждый раз разные, некоторые из них просто SEGFAULTS или

pure virtual method called
terminate called without an active exception
Aborted (core dumped)

Не могли бы вы указать, что я делаю неправильно в приведенном выше коде? И как исправить эту ошибку?
EDIT: Добавлен очень грубый test function, который случайным образом вызывает три list methods. Кроме того, количество потоков и количество каждой операции объявляются глобально. Грубое программирование, но оно воссоздает SEGFAULT.


person Dee Jay    schedule 10.02.2018    source источник
comment
Что касается использования shared_ptr для этого, круто. Я приветствую эксперименты.   -  person user4581301    schedule 11.02.2018
comment
@ user4581301, должен ли я открыть еще один вопрос с полным кодом, включая main? Или просто написать в комментариях?   -  person Dee Jay    schedule 11.02.2018
comment
@DeeJay Отредактируйте свой текущий пост с помощью функции main. Нет необходимости начинать еще один вопрос. Также не размещайте код в комментариях.   -  person PaulMcKenzie    schedule 11.02.2018
comment
@ user4581301, Нет, но это основная идея этого списка. Как только вы заблокируете pred & curr, будет вызвана функция проверки для обнаружения любых конфликтов синхронизации. validate проверяет, не был ли отмечен pred & curr и pred все еще указывает на curr. Даже если pred или curr будут удалены, они будут отмечены первыми (метод удаления). Отсюда и название списка Lazy Synchronization.   -  person Dee Jay    schedule 11.02.2018
comment
@ user4581301 добавил необходимые правки кода.   -  person Dee Jay    schedule 11.02.2018
comment
@PaulMcKenzie добавил необходимые правки кода.   -  person Dee Jay    schedule 11.02.2018


Ответы (1)


Проблема в том, что вы не используете атомарное хранилище и операции загрузки для ваших shared_ptrs.

Это правда, что счетчик ссылок в блоке управления (на который имеет указатель каждый shared_ptr, участвующий во владении конкретным общим объектом) shared_ptr является атомарным, однако элементы данных самого shared_ptr не являются.

Таким образом, безопасно иметь несколько потоков, каждый со своим собственным shared_ptr для общего объекта, но это не спасает, когда несколько потоков обращаются к одному и тому же shared_ptr, как только хотя бы один из них использует неконстантную функцию-член, которая что вы делаете при переназначении указателя next.

Иллюстрация проблемы

Давайте посмотрим на (упрощенный и улучшенный) конструктор копирования реализации shared_ptr в libstdc++:

shared_ptr(const shared_ptr& rhs)
 : m_ptr(rhs.m_ptr),
   m_refcount(rhs.m_refcount) 
{ }

Здесь m_ptr — это просто необработанный указатель на общий объект, а m_refcount — это класс, который выполняет подсчет ссылок, а также обрабатывает возможное удаление объекта, на который указывает m_ptr.

Только один пример того, что может пойти не так: предположим, что в данный момент поток пытается выяснить, содержится ли конкретный ключ в списке. Он начинается с инициализации копирования auto curr = head->next в List::contains. Сразу после того, как ему удалось инициализировать curr.m_ptr, планировщик ОС решает, что этот поток должен быть приостановлен и запланирован в другом потоке.

Этот другой поток удаляет преемника head. Поскольку счетчик ссылок head->next по-прежнему равен 1 (в конце концов, счетчик ссылок head->next еще не был изменен потоком 1), когда второй поток завершает удаление узла, он удаляется.

Затем некоторое время спустя первый поток продолжается. Он завершает инициализацию curr, но поскольку m_ptr уже был инициализирован до того, как поток 2 начал удаление, он по-прежнему указывает на теперь удаленный узел. При попытке сравнения key > curr->key поток 1 получит доступ к недопустимой памяти.

Использование std::atomic_load и std::atomic_store для предотвращения проблемы

std::atomic_load и std::atomic_store предотвращают возникновение проблемы, блокируя мьютекс перед вызовом конструктора копирования/оператора присваивания копирования shared_ptr, который передается указателем. Если все операции чтения и записи в shared_ptrs, которые совместно используются несколькими потоками, осуществляются через std::atomic_load/std::atomic_store соответственно. никогда не может случиться так, что один поток изменил только m_ptr, но не счетчик ссылок, в то время, когда другой поток начинает читать или изменять тот же самый shared_ptr.

С необходимыми атомарными загрузками и сохранениями функции-члены List должны выглядеть следующим образом:

bool validate(Node const& pred, Node const& curr) {
   return !(pred.marked) && !(curr.marked) && 
          (std::atomic_load(&pred.next).get() == &curr);
}

bool add(int key) {
    while (true) {
        auto pred = std::atomic_load(&head);
        auto curr = std::atomic_load(&pred->next);

        while (key > (curr->key)) {
            pred = std::move(curr);
            curr = std::atomic_load(&pred->next);
        }

        std::scoped_lock lock{pred->nodeLock, curr->nodeLock};
        if (validate(*pred, *curr)) {
            if (curr->key == key) {
                return false;
            } else {
                auto new_node = std::make_shared<Node>(key);

                new_node->next = std::move(curr);
                std::atomic_store(&pred->next, std::move(new_node));
                return true;
            }
        }
    }
}

bool remove(int key) {
    while (true) {
        auto pred = std::atomic_load(&head);
        auto curr = std::atomic_load(&pred->next);

        while (key > (curr->key)) {
            pred = std::move(curr);
            curr = std::atomic_load(&pred->next);
        }

        std::scoped_lock lock{pred->nodeLock, curr->nodeLock};
        if (validate(*pred, *curr)) {
            if (curr->key != key) {
                return false;
            } else {
                curr->marked = true;
                std::atomic_store(&pred->next, std::atomic_load(&curr->next));
                return true;
            }
        }
    }
}

bool contains(int key) {
    auto curr = std::atomic_load(&head->next);

    while (key > (curr->key)) {
        curr = std::atomic_load(&curr->next);
    }
    return curr->key == key && !curr->marked;
}

Кроме того, вы также должны сделать Node::marked std::atomic_bool.

person Corristo    schedule 11.02.2018
comment
Хорошо... Я рассуждал так: в тот момент (моментально) поток обнаруживает объект (используя shared_ptr ), счетчик ссылок будет увеличиваться атомарно. Я предположил, что оба вышеупомянутых шага, то есть чтение следующего указателя и приращение ref_count, будут одним атомарным шагом. Таким образом, ни один поток не сможет освободить узел, если другой поток сначала получит ссылку (прочитает следующий указатель). - person Dee Jay; 11.02.2018
comment
@DeeJay Проблема в том, что для создания копии shared_ptr всегда нужно делать как минимум две вещи: увеличивать счетчик ссылок в блоке управления, а затем копировать указатель в блок управления. Предположим, что сначала изменяется счетчик ссылок. Тогда у вас может быть следующий сценарий: один поток собирается переназначить next ptr. Он уже уменьшил счетчик ссылок старого объекта и заметил, что счетчик ссылок равен нулю. Перед фактическим удалением объекта этот поток запланирован. - person Corristo; 11.02.2018
comment
Появляется другой поток и теперь копирует этот файл shared_ptr. Итак, он продолжает и увеличивает счетчик ссылок и копирует указатель на блок управления. Затем первый поток возобновляет работу и удаляет указатель и, возможно, также блок управления и заменяет его новым указателем и блоком управления. В любом случае копия shared_ptr второго потока теперь указывает на уже освобожденную область памяти. - person Corristo; 11.02.2018
comment
Итак, как использование atomics решает эту проблему? Похоже, мне нужно более подробно прочитать о atomic и shared_ptr. - person Dee Jay; 11.02.2018
comment
@DeeJay std::atomic_load и std::atomic_store блокируют мьютекс, так что во время выполнения atomic_load не может быть одновременного atomic_store к (или другому std::atomic_load от) того же shared_ptr. - person Corristo; 11.02.2018
comment
не могли бы вы уточнить это утверждение? счетчик ссылок в управляющем блоке shared_ptr является атомарным, однако члены данных самого shared_ptr не являются атомарными - person Dee Jay; 11.02.2018
comment
@DeeJay Я расширил свой ответ, чтобы показать проблему на примере выполнения двух потоков. - person Corristo; 11.02.2018
comment
Требуется ли для этого поддержка С++ 17? Потому что я получаю много ошибок из этого кода. no matching function for call to ‘atomic_load(const std::shared_ptr<Node>*) - person Dee Jay; 12.02.2018
comment
@DeeJay: единственные функции C++17, которые я использовал, — это std::scoped_lock (в качестве замены ручных вызовов Node::lock и Node::unlock), а также вывод аргументов шаблона конструктора для std::scoped_locks. Но std::atomic_load и std::atomic_store — это функции C++11. - person Corristo; 12.02.2018
comment
Хорошо, но atomic_load и atomic_store не работают с shared_ptr. Я использую g++ 4.9.4 и C++11 , но получаю следующую ошибку: no matching function for call to ‘atomic_load(const std::shared_ptr<Node>*) - person Dee Jay; 12.02.2018
comment
Давайте продолжим обсуждение в чате. - person Dee Jay; 12.02.2018
comment
@DeeJay Очевидно, libstdc++ не полностью реализовывал STL C++11 до GCC 5.1. Но с этим я смог заставить его работать после замены каждого std::scoped_lock на два std::lock_guard. Вы можете увидеть результат здесь. Однако обратите внимание, что мне пришлось уменьшить количество потоков до 20, иначе у wandbox закончатся ресурсы. Конечно, вы можете легко скопировать/вставить код и протестировать его самостоятельно с исходным количеством потоков. - person Corristo; 12.02.2018
comment
Хорошо. Код сработал у меня после того, как я удалил scoped_lock и сделал блокировку и разблокировку вручную. Спасибо! :) - person Dee Jay; 13.02.2018
comment
Умные указатели @Corristo не имеют ничего общего со стандартной библиотекой шаблонов. - person curiousguy; 01.07.2018
comment
@curiousguy Что ты имеешь в виду? Я нигде не говорил об общих умных указателях, ни в своем ответе, ни в комментариях. - person Corristo; 01.07.2018
comment
@Corristo Отвечая на ваш комментарий Очевидно, libstdc++ не полностью реализовывал STL C++11 до GCC 5.1... - person curiousguy; 01.07.2018