Синхронизация карты Hazelcast

Я пытаюсь реализовать распределенный кеш с помощью Hazelcast в своем приложении. Я использую IMap Hazelcast. У меня проблема в том, что каждый раз, когда я получаю значение с карты и обновляю значение, мне нужно снова делать put(key, value). Если у моего объекта значения 10 свойств, и мне нужно обновить все 10, мне нужно вызвать put(key, value) 10 раз. Что-то вроде -

IMap<Integer, Employee> mapEmployees = hz.getMap("employees");
Employee emp1 = mapEmployees.get(100);
emp1.setAge(30);
mapEmployees.put(100, emp1);
emp1.setSex(“F”);
mapEmployees.put(100, emp1);
emp1.setSalary(5000);
mapEmployees.put(100, emp1);

Если я этого не сделаю, какой-то другой узел, работающий с тем же объектом Employee, обновит его, и в конечном итоге объект employee не будет синхронизирован. Есть ли какое-нибудь решение, чтобы избежать явного многократного вызова put? В ConcurrentHashMap мне не нужно этого делать, потому что, если я изменяю объект, карта также обновляется.


person Ananth    schedule 19.06.2013    source источник
comment
Hazelcast дает вам клон объекта (поскольку он хранится в кластере как двоичный / сериализованный). Чтобы сделать обновления видимыми для других узлов / потоков, вы должны вернуть их. Кроме того, если ваш класс Employee не является потокобезопасным, JDK ConcurrentHashMap не может гарантировать видимость обновлений для других потоков (иногда даже если вы вернете его на карту). Так что в любом случае вам следует использовать механизм синхронизации.   -  person mdogan    schedule 19.06.2013


Ответы (4)


Начиная с версии 3.3 вы захотите использовать EntryProcessor:

Что вы действительно хотите здесь сделать, так это создать EntryProcessor<Integer, Employee> и вызвать его с помощью mapEmployees.executeOnKey( 100, new EmployeeUpdateEntryProcessor( new ObjectContainingUpdatedFields( 30, "F", 5000 ) );

Таким образом, Hazelcast обрабатывает блокировку карты на ключе для этого объекта Employee и позволяет вам запускать любой код в методе process() EntryProcessor атомарно, включая обновление значений на карте.

Таким образом, вы должны реализовать EntryProcessor с помощью специального конструктора, который принимает объект, содержащий все свойства, которые вы хотите обновить, а затем в process() вы создаете последний Employee объект, который окажется на карте, и выполните entry.setValue(). Не забудьте создать новый StreamSerializer для EmployeeUpdateEntryProcessor, который может сериализовать Employee объекты, чтобы вы не застряли с сериализацией java.io.

Источник: http://docs.hazelcast.org/docs/3.5/manual/html/entryprocessor.html

person Drew    schedule 26.10.2015

Возможно, вам нужна транзакция. Или вы можете взглянуть на распределенную блокировку.

Обратите внимание, что в вашем решении, если этот код выполняется двумя потоками, изменения, сделанные одним из них, будут перезаписаны.

person Piotr Gwiazda    schedule 19.06.2013

Это может вас заинтересовать.

Вы можете сделать что-то подобное для своего класса Employee (упрощенный код только с одной переменной экземпляра):

public final class Employee
    implements Frozen<Builder>
{
    private final int salary;

    private Employee(Builder builder)
    {
        salary = builder.salary;
    }

    public static Builder newBuilder()
    {
        return new Builder();
    }

    @Override
    public Builder thaw()
    {
        return new Builder(this);
    }

    public static final class Builder
        implements Thawed<Employee>
    {
        private int salary;

        private Builder()
        {
        }

        private Builder(Employee employee)
        {
            salary = employee.salary;
        }

        public Builder withSalary(int salary)
        {
            this.salary = salary;
            return this;
        }

        @Override
        public Employee freeze()
        {
            return new Employee(this);
        }
    }
}

Таким образом, чтобы изменить кеш, вы должны:

Employee victim = map.get(100);
map.put(100, victim.thaw().withSalary(whatever).freeze());

Это полностью атомарная операция.

person fge    schedule 19.06.2013
comment
Спасибо за ответ. Я попробую это сделать. - person Ananth; 19.06.2013
comment
Это не атомарная операция, поскольку у вас есть чтение, изменение и запись. Если вы действительно хотите иметь атомарную операцию, не прибегая к блокировке или транзакции, вы можете использовать следующий подход: 'for (;;) {Person oldPerson = map.get (foo); Человек newPerson = новый Человек (oldPerson); newPerson.incAge (); если (map.replace (foo, oldPerson, newPerson)) перерыв; } 'извините за дерьмовый макет .. - person pveentjer; 27.09.2013

Если есть вероятность, что другой узел может обновить данные, с которыми работает ваш узел, то использование put () перезапишет изменения, сделанные другим узлом. Обычно это нежелательное поведение, поскольку оно приводит к потере данных и несогласованному состоянию данных.

Взгляните на IMap.replace () и другие ConcurrentMap. Если replace() не удалось, значит, вы столкнулись с коллизией изменений. В этом случае вам следует попробовать еще раз:

  1. перечитать запись из hazelcast
  2. обновить поля
  3. сохранить в hazelcast с заменой

После нескольких неудачных попыток вы можете выбросить StorageException на верхний уровень.

person Sneg    schedule 15.06.2016