Как уменьшить высокую загрузку ЦП с помощью Elasticsearch

Я использую Elasticsearch 6.8.10 с Spring Boot 2.2.7 и Spring Data Elasticsearch.

У меня есть потоки данных статистики и тенденций, которые хранятся в темах Kafka. Эти темы читаются с помощью Spring Kafka и сохраняются в MongoDB и Elasticsearch для анализа и составления отчетов. Проблема, с которой я сталкиваюсь, заключается в том, что когда очереди обрабатываются и данные записываются в Elasticsearch, потребление ЦП Elasticsearch постоянно составляет около 250%. Это приводит к спорадическим ошибкам тайм-аута в приложении. Я понимаю, что индексация — это интенсивная операция, но я пытаюсь понять, что я могу сделать, чтобы уменьшить использование ЦП.

Данные:

  • Приблизительная статистика элементов очереди (1,2 млн)
  • Размер документа статистики (220 байт)

Детали конфигурации виртуальной машины:

  • 4 ЦП, 16 ГБ памяти, 20 ГБ диск (SSD)
  • Запуск на виртуальной машине в Google Cloud Platform.
  • ВМ используется только для Elasticsearch

Детали конфигурации Docker Elasticsearch:

  • Я использую один узел (на данный момент)
version: '2.4'
services:

  elasticsearch:
    container_name: elasticsearch
    image: 'docker.elastic.co/elasticsearch/elasticsearch:6.8.10'
    ports:
      - '9200:9200'
      - '9300:9300'
    mem_limit: 16GB
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms8g -Xmx8g"      
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - 'esdata1:/usr/share/elasticsearch/data'
    restart: always

volumes:
  esdata1:
    driver: local

Пример документа Spring Stat:

  • Осколки = 1, реплики = 0
@Document(indexName = "stats_test", type = "stat", shards = 1, replicas = 0)
public class EsStat {

    @Id
    @Field(type = FieldType.Keyword)
    private String id;

    @Field(type = FieldType.Keyword)
    private String entityOrRelationshipId;

    @Field(type = FieldType.Keyword)
    private String articleId;

    @Field(type = FieldType.Keyword)
    private String status;

    @Field(type = FieldType.Keyword)
    private String type;

    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
    @Field(type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
    private ZonedDateTime date;

    @JsonProperty("type")
    @Field(type = FieldType.Keyword)
    private String dataSource;

    // getter and setters 
}

Репозиторий Stats Spring:

  • Индексация выполняется через репозиторий Spring Data Elasticsearch:
public interface StatElasticsearchRepository extends ElasticsearchRepository<EsStat, String> {
}

Отображение статистики:

{
  "stats": {
    "mappings": {
      "stat": {
        "properties": {
          "_class": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "articleId": {
            "type": "keyword"
          },
          "dataSource": {
            "type": "keyword"
          },
          "date": {
            "type": "date",
            "format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
          },
          "entityOrRelationshipId": {
            "type": "keyword"
          },
          "id": {
            "type": "keyword"
          },
          "status": {
            "type": "keyword"
          },
          "type": {
            "type": "keyword"
          }
        }
      }
    }
  }
}

Что я могу сделать, чтобы определить, почему загрузка ЦП так высока, и что я могу сделать, чтобы уменьшить ее?

Любые советы или рекомендации будут высоко оценены. Я рад добавить больше конфигурации/вывода, если это необходимо.


person Swordfish    schedule 25.06.2020    source источник
comment
у вас есть виртуальная машина с 16 ГБ только для ES, но ограничить ее с помощью: "ES_JAVA_OPTS=-Xms8g -Xmx8g" ? Как вы записываете свои записи в Elasticsearch с помощью Spring Data Elasticsearch? Используете ли вы репозитории или шаблон Elasticsearch, делаете ли вы одиночные или пакетные вставки?   -  person P.J.Meisch    schedule 26.06.2020
comment
Да, это основано на рекомендациях Elastic установить для кучи половину физической оперативной памяти (elastic.co/guide/en/elasticsearch/reference/current/). Я использую репозитории. На данный момент это одна вставка, так как каждое сообщение (документ) исходит от Кафки.   -  person Swordfish    schedule 26.06.2020
comment
репозиторий или шаблон? Вставка в репозиторий обновляет данные после каждой вставки.   -  person P.J.Meisch    schedule 26.06.2020
comment
Репозиторий (добавьте конфиг выше). Когда я смотрю на topThreads (elastic. co/guide/en/elasticsearch/reference/current/), я вижу обновление. Поскольку поступает много статистических данных / данных о тенденциях (я думаю, что опрос Kafka по умолчанию составляет 500 записей), может быть, обновление просто убивает его? Есть ли способ предотвратить обновление после вставки?   -  person Swordfish    schedule 26.06.2020
comment
@Swordfish да, вы можете отключить refresh_interval (по умолчанию 1 секунда), это один из самых полезных советов по улучшению производительности индексации, упомянутый в ссылках, которые я упомянул в своем ответе.   -  person user156327    schedule 26.06.2020
comment
@Swordfish прошло довольно много времени, было бы здорово, если бы вы могли продолжить и сообщить мне, если у вас есть дополнительные вопросы, также было бы неплохо, если бы вы могли проголосовать и принять мой ответ, если он вам помог.   -  person user156327    schedule 14.07.2020
comment
@Swordfish прошло довольно много времени, было бы здорово, если бы вы могли проголосовать и принять ответ, если он был вам полезен, и дайте мне знать, если вам нужна дополнительная информация.   -  person user156327    schedule 04.08.2020


Ответы (1)


Итак, давайте предположим, что это вызов обновления, который выполняется после каждого save(). Довольно легко добавить в ваш репозиторий метод, который выполняет сохранение без вызова обновления:

Сначала добавьте пользовательский интерфейс репозитория — здесь он у меня общий:

CustomRepository.java:

public interface CustomRepository<T> {
    T saveNoRefresh(T entity);
}

Реализация будет:

CustomRepositoryImpl.java:

public class CustomRepositoryImpl<T> implements CustomRepository<T> {

    private final ElasticsearchOperations operations;

    public CustomRepositoryImpl(ElasticsearchOperations operations) {
        this.operations = operations;
    }

    @Override
    public T saveNoRefresh(T entity) {
        IndexQuery query = new IndexQueryBuilder().withObject(entity).build();
        operations.index(query, operations.getIndexCoordinatesFor(entity.getClass()));
        return entity;
    }
}

Ваш репозиторий изменится на:

public interface StatElasticsearchRepository 
  extends ElasticsearchRepository<EsStat, String>,
           CustomRepository<EsStat> {
}

Вместо метода save(T) вызовите метод saveNoRefresh(T) вашего репозитория.

Еще лучше было бы, если бы вы могли собирать данные в пакеты определенного размера, а затем выполнять вызов saveAll().

person P.J.Meisch    schedule 26.06.2020
comment
ОК, спасибо PJ, я посмотрю, как это реализовать. Рекомендация по пакетной обработке — хорошая, я думаю, что смогу получить партию записей от Кафки. Может ли «saveAll()» также выполняться без обновления с использованием вышеуказанного механизма? - person Swordfish; 01.07.2020