Наборы потоков: есть ли способ подсчитать записи в теме Kafka с помощью наборов потоков?

Я использую StreamSets в качестве инструмента для загрузки записей из базы данных Oracle в темы Kafka. Теперь я хочу использовать его через сам StreamSets, а также хотел подсчитать количество записей в темах Kafka.

Как мне это сделать. Пожалуйста, помогите


person Ankita    schedule 12.05.2019    source источник


Ответы (1)


Вы можете использовать history REST API StreamSets Data Collector для извлечения данных с количеством записей для каждого этапа. Например, вот счетчики последнего запуска данного конвейера. Я использую отличный инструмент jq для анализа JSON в командной строке.

$ curl -s -u admin:admin -H 'X-Requested-By:sdc' http://localhost:18630/rest/v1/pipeline/RedshiftStreamingwithKinesisFirehose537add73-bb16-4358-a26a-a51576dea32b/history | jq -r .[0].metrics | jq .counters
{
  "pipeline.batchCount.counter": {
    "count": 1029
  },
  "pipeline.batchErrorMessages.counter": {
    "count": 0
  },
  "pipeline.batchErrorRecords.counter": {
    "count": 0
  },
  "pipeline.batchInputRecords.counter": {
    "count": 648226
  },
  "pipeline.batchOutputRecords.counter": {
    "count": 648226
  },
  "stage.ExpressionEvaluator_01.errorRecords.counter": {
    "count": 0
  },
  "stage.ExpressionEvaluator_01.inputRecords.counter": {
    "count": 648226
  },
  "stage.ExpressionEvaluator_01.outputRecords.counter": {
    "count": 648226
  },
  "stage.ExpressionEvaluator_01.stageErrors.counter": {
    "count": 0
  },
  "stage.ExpressionEvaluator_01:ExpressionEvaluator_01OutputLane15561338960790.outputRecords.counter": {
    "count": 648226
  },
  "stage.FieldOrder_01.errorRecords.counter": {
    "count": 0
  },
  "stage.FieldOrder_01.inputRecords.counter": {
    "count": 648226
  },
  "stage.FieldOrder_01.outputRecords.counter": {
    "count": 648226
  },
  "stage.FieldOrder_01.stageErrors.counter": {
    "count": 0
  },
  "stage.FieldOrder_01:FieldOrder_01OutputLane15561351879260.outputRecords.counter": {
    "count": 648226
  },
  "stage.FieldTypeConverter_01.errorRecords.counter": {
    "count": 0
  },
  "stage.FieldTypeConverter_01.inputRecords.counter": {
    "count": 648226
  },
  "stage.FieldTypeConverter_01.outputRecords.counter": {
    "count": 648226
  },
  "stage.FieldTypeConverter_01.stageErrors.counter": {
    "count": 0
  },
  "stage.FieldTypeConverter_01:FieldTypeConverter_01OutputLane15560499048280.outputRecords.counter": {
    "count": 648226
  },
  "stage.KinesisFirehose_01.errorRecords.counter": {
    "count": 0
  },
  "stage.KinesisFirehose_01.inputRecords.counter": {
    "count": 648226
  },
  "stage.KinesisFirehose_01.outputRecords.counter": {
    "count": 648226
  },
  "stage.KinesisFirehose_01.stageErrors.counter": {
    "count": 0
  },
  "stage.MySQLBinaryLog_01.errorRecords.counter": {
    "count": 0
  },
  "stage.MySQLBinaryLog_01.inputRecords.counter": {
    "count": 0
  },
  "stage.MySQLBinaryLog_01.outputRecords.counter": {
    "count": 648226
  },
  "stage.MySQLBinaryLog_01.stageErrors.counter": {
    "count": 0
  },
  "stage.MySQLBinaryLog_01:MySQLBinaryLog_01OutputLane15561313696850.outputRecords.counter": {
    "count": 648226
  },
  "stage.StreamSelector_01.errorRecords.counter": {
    "count": 0
  },
  "stage.StreamSelector_01.inputRecords.counter": {
    "count": 648226
  },
  "stage.StreamSelector_01.outputRecords.counter": {
    "count": 648226
  },
  "stage.StreamSelector_01.stageErrors.counter": {
    "count": 0
  },
  "stage.StreamSelector_01:StreamSelector_01OutputLane1556133811620.outputRecords.counter": {
    "count": 0
  },
  "stage.StreamSelector_01:StreamSelector_01OutputLane1556133816638.outputRecords.counter": {
    "count": 648226
  },
  "stage.Trash_01.errorRecords.counter": {
    "count": 0
  },
  "stage.Trash_01.inputRecords.counter": {
    "count": 0
  },
  "stage.Trash_01.outputRecords.counter": {
    "count": 0
  },
  "stage.Trash_01.stageErrors.counter": {
    "count": 0
  }
}
person metadaddy    schedule 14.05.2019
comment
Спасибо за ответ. Поправьте меня, если я ошибаюсь. Таким образом, приведенная выше информация — это та же информация, что и в предварительном просмотре наборов потоков. Верно? - person Ankita; 14.05.2019
comment
Это информация из панели истории — наборы потоков. com/documentation/datacollector/latest/help/ - person metadaddy; 14.05.2019
comment
Можете ли вы помочь мне с некоторыми другими способами? - person Ankita; 16.05.2019
comment
Что вы ищете? Чем он должен отличаться от вышеперечисленного? Изучили ли вы доступные вызовы REST - Help / RESTful API - person metadaddy; 21.05.2019
comment
Нет. Полностью не изучил. Я искал что-то, что мы можем добавить в качестве процессоров в конвейере? Любой способ или любая идея. - person Ankita; 21.05.2019
comment
Процессор здесь не при чем. REST API предоставляет всю статистику, отображаемую в пользовательском интерфейсе. - person metadaddy; 23.05.2019
comment
В порядке. Если нет способов с процессорами, то я принимаю и ваш ответ. Спасибо :) - person Ankita; 23.05.2019