данные kafka в elaticsearch

Я пытаюсь отправить данные от производителя apache kafka в эластичный поиск с помощью kafka connect elasticsearch

Получение следующей ошибки в elasticsearch:

[2017-12-21T11:00:54,979][DEBUG][o.e.a.b.TransportShardBulkAction] 
[pageviews7][0] failed to execute bulk item (index) BulkShardRequest 
[[pageviews7][0]] containing [index {[pageviews7][kafkaconnect]
[pageviews7+0+0], source[{"key1":"value1"}]}]
org.elasticsearch.index.mapper.MapperParsingException: failed to find 
type parsed [string] for [key1]

Ниже приведен мой файл свойств kafka-connect-elasticsearch:

name=elasticsearch-sink
connector.class=
io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=pageviews7
key.ignore=true
connection.url=http://localhost:9200
type.name=kafkaconnect
schemas.enable=false
schema.ignore=true

person Sandesh    schedule 21.12.2017    source источник


Ответы (3)


Попробуйте соединители stream-reactor Kafka для Elastic-Search. Один использует протокол TCP, а другой - протокол HTTP, и они прошли боевые испытания в производственной среде, погружая 5+ миллиардов событий в день в индексы ES, и они довольно просты в настройке и хорошо документированы.

https://github.com/landoop/stream-reactor

person Antonios Chalkiopoulos    schedule 22.12.2017

Это похоже на проблему с индексированием документа в Elasticsearch:

  1. Попробуйте опубликовать документ в ES вручную
  2. Вы должны получить ту же ошибку. Это означает, что у вас возникли проблемы с отображением индекса.
  3. Если у индекса есть проблема с отображением, как я подозреваю, вы можете либо записать в другой индекс, используя свойство topic.index.map, либо удалить текущий индекс (только если вы не заботятся о данных).

Пример преобразования вручную в ES:

curl -XPOST <ES-url>/pageviews7/test_id -d '{"key1":"value1"}'

Удалить текущий Index:

curl -XDELETE <ES-url>/pageviews7
person Ehud Lev    schedule 21.12.2017

Какую версию Elasticsearch вы используете? Вы можете разрешить настройку schema.ignore=true в конфигурации коннектора Kafka Connect для обхода этой ошибки.

person Robin Moffatt    schedule 06.02.2018