Ввод потока WSO2BAM REST в BAM/Cassandra; не можете получить данные EVENT_KS с помощью запроса куста?

Фоном для этого вопроса по существу является статья, написанная Sachini Jayasekara @ WSO2, называется Использование различных структур отчетности с WSO2 Business Activity Monitor . Я делаю примерно то же самое, но использую REST API для определения потока данных и вызываю REST WS API для передачи данных в BAM. Затем используйте запросы HIVE, чтобы получить данные. Однако, кажется, я что-то упустил, так как данные атрибута не отображаются. Отсюда запрос.

В настоящее время используется REST API, который вызывается через демон на основе Perl. Это вызывает REST API, используя следующее определение потоков и полезную нагрузку:

{
  "name":"currentcostRealtime2.stream",
  "version": "1.0.6",
  "nickName": "Currentcost Realtime",
  "description": "This is the Currentcost realtime stream",
  "payloadData":[
    {
      "name":"sensor",
      "type":"INT"
    },
    {
      "name":"temp",
      "type":"FLOAT"
    },
    {
      "name":"timestamp",
      "type":"STRING"
    },
    {
      "name":"watt",
      "type":"INT"
    }
  ]
}

.. и определение полезной нагрузки ..

[
 {
   "payloadData" : [SENSOR, TEMP, "TIMESTAMP", WATT] ,
 }
]

Я должен отметить, что полезная нагрузка заменяется строкой до ее фиксации; например фактическая полезная нагрузка, которая зафиксирована, выглядит так:

[
 {
   "payloadData" : [1, 18.7, "2014-06-15 16:15:56", 1] ,
 }
]

Запросы выполняются без видимых проблем, но теперь у меня возникла проблема с запросом HIVE в BAM, который выводит записи, но не значения. Например. теперь пытаюсь выполнить следующий запрос HIVE:

CREATE TABLE IF NOT EXISTS CurrentCostDataTemp ( sensor INT, temp FLOAT, ts TIMESTAMP, watt INT ) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "currentcostRealtime2_stream",
    "cassandra.columns.mapping" = "payload_sensor, payload_temp, payload_timestamp, payload_watt" );

select * from CurrentCostDataTemp;                                  

.. но это дает только следующее (см. конкретное изображение ниже) - например. что НЕТ отображаемых данных уровня атрибута. Однако очевидно, что есть записи EVENT_KS, учитывая, что он выводит 4 строки.. поэтому вопрос в том, как я могу ссылаться на данные для извлечения значений, или здесь происходит что-то еще, о чем я не знаю?:

key sensor  temp    ts  watt
1402816273765::192.168.1.106::9443::52              
1402815283659::192.168.1.106::9443::51              
1402815238323::192.168.1.106::9443::49              
1402815280532::192.168.1.106::9443::50              

Убедились, что данные находятся в Cassandra, проверив с помощью Cqlsh - см. здесь:

cqlsh:EVENT_KS> select * from "currentcostRealtime_stream";

 key                                    | Description                             | Name                       | Nick_Name            | StreamId                         | Timestamp     | Version | meta_ipAdd | payload_sensor | payload_temp | payload_timestamp   | payload_watt
----------------------------------------+-----------------------------------------+----------------------------+----------------------+----------------------------------+---------------+---------+------------+----------------+--------------+---------------------+--------------
 1402815283659::192.168.1.106::9443::51 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815283659 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 14:54:43 |            1
 1402815238323::192.168.1.106::9443::49 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815238323 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 14:53:58 |            1
 1402815280532::192.168.1.106::9443::50 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815280532 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 14:54:40 |            1
 1402816273765::192.168.1.106::9443::52 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402816273765 |   1.0.5 |       null |              1 |         18.7 | 2014-06-15 15:11:13 |            1

(4 rows)

cqlsh:EVENT_KS>

Скорее всего, это незначительная проблема, которую я наблюдал, но было бы здорово, если бы кто-то еще увидел это и мог ответить.

При внешнем добавлении определения удаленной таблицы в базу данных MySQL таблицы и все остальное создаются, но похоже, что проблема заключается в получении данных атрибутов в самой таблице EVENT_KS, а также в создании и доступе к ним через сценарий HIVE.

Заранее спасибо!

/Йорген

[ОБНОВЛЕНИЕ – Четверг, 19 – РЕШЕНО] Удалось решить этот вопрос с помощью нескольких подсказок. Следующий код теперь работает нормально, и это здорово.. Большое спасибо за время, чтобы ответить от вас, ребята..

drop table CurrentCostDataTemp10;
drop table CurrentCostDataTemp_Summary10;

CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT ) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
  "cassandra.port" = "9160",
  "cassandra.ks.name" = "EVENT_KS",
  "cassandra.ks.username" = "<USER>",
  "cassandra.ks.password" = "<PASSWORD>",
  "cassandra.cf.name" = "currentcostsimple5_stream",
  "cassandra.columns.mapping" = ":key, payload_sensor, Timestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt" );

CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp_Summary10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT ) 
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
  'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
  'mapred.jdbc.url' = 'jdbc:mysql://127.0.0.1:8889/currentcost' ,
  'mapred.jdbc.username' = '<USER>',
  'mapred.jdbc.password' = '<PASSWORD>',
  'hive.jdbc.update.on.duplicate'= 'true',
  'hive.jdbc.primary.key.fields' = 'messageRowID',
  'hive.jdbc.table.create.query' = 'CREATE TABLE CurrentCostDataTemp1 ( messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, payload_sensor TINYINT(4), messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql DATETIME, payload_watt INT ) ');

insert overwrite table CurrentCostDataTemp_Summary10 select messageRowID, payload_sensor, messageTimestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt FROM CurrentCostDataTemp10;

Использование различных структур отчетности с WSO2 Монитор деловой активности. Сачини Джаясекара


person VK3FNOR    schedule 17.06.2014    source источник


Ответы (2)


Я изменил ваш запрос следующим образом. Пожалуйста, попробуйте с этим.

CREATE external TABLE IF NOT EXISTS CurrentCostDataTemp ( key string, sensor INT, temp FLOAT, ts TIMESTAMP, watt INT ) 
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "currentcostRealtime2_stream",
    "cassandra.columns.mapping" = ":key,payload_sensor, payload_temp, payload_timestamp, payload_watt" );

select * from CurrentCostDataTemp;  
person Dunith Dhanushka    schedule 19.06.2014
comment
Отлично — начали тестирование, основываясь на том, что вы заметили, и в итоге получилось то, что сработало (см. ниже). Это дает правильные входные данные в MySQL, поэтому приближаемся - person VK3FNOR; 19.06.2014

Попробуйте изменить 1-ю строку скрипта следующим образом.

СОЗДАЙТЕ EXTERNAL ТАБЛИЦУ, ЕСЛИ НЕ СУЩЕСТВУЕТ CurrentCostDataTemp (key STRING, датчик INT, temp FLOAT, ts TIMESTAMP, ватт INT)

(Удалите часть key STRING, если она дает ошибки.)

Примечание. Возможно, вам придется запустить DROP TABLE CurrentCostDataTemp перед запуском выше, если он уже создан, когда вы запускали его раньше.

person Bee    schedule 17.06.2014
comment
Понятно! Оба ответа дали мне несколько советов, чтобы попытаться обновить подход e2e, и он заработал.. включил решение выше. Куча спасибо! - person VK3FNOR; 19.06.2014