Фоном для этого вопроса по существу является статья, написанная Sachini Jayasekara @ WSO2, называется Использование различных структур отчетности с WSO2 Business Activity Monitor . Я делаю примерно то же самое, но использую REST API для определения потока данных и вызываю REST WS API для передачи данных в BAM. Затем используйте запросы HIVE, чтобы получить данные. Однако, кажется, я что-то упустил, так как данные атрибута не отображаются. Отсюда запрос.
В настоящее время используется REST API, который вызывается через демон на основе Perl. Это вызывает REST API, используя следующее определение потоков и полезную нагрузку:
{
"name":"currentcostRealtime2.stream",
"version": "1.0.6",
"nickName": "Currentcost Realtime",
"description": "This is the Currentcost realtime stream",
"payloadData":[
{
"name":"sensor",
"type":"INT"
},
{
"name":"temp",
"type":"FLOAT"
},
{
"name":"timestamp",
"type":"STRING"
},
{
"name":"watt",
"type":"INT"
}
]
}
.. и определение полезной нагрузки ..
[
{
"payloadData" : [SENSOR, TEMP, "TIMESTAMP", WATT] ,
}
]
Я должен отметить, что полезная нагрузка заменяется строкой до ее фиксации; например фактическая полезная нагрузка, которая зафиксирована, выглядит так:
[
{
"payloadData" : [1, 18.7, "2014-06-15 16:15:56", 1] ,
}
]
Запросы выполняются без видимых проблем, но теперь у меня возникла проблема с запросом HIVE в BAM, который выводит записи, но не значения. Например. теперь пытаюсь выполнить следующий запрос HIVE:
CREATE TABLE IF NOT EXISTS CurrentCostDataTemp ( sensor INT, temp FLOAT, ts TIMESTAMP, watt INT )
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160",
"cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "admin",
"cassandra.ks.password" = "admin",
"cassandra.cf.name" = "currentcostRealtime2_stream",
"cassandra.columns.mapping" = "payload_sensor, payload_temp, payload_timestamp, payload_watt" );
select * from CurrentCostDataTemp;
.. но это дает только следующее (см. конкретное изображение ниже) - например. что НЕТ отображаемых данных уровня атрибута. Однако очевидно, что есть записи EVENT_KS, учитывая, что он выводит 4 строки.. поэтому вопрос в том, как я могу ссылаться на данные для извлечения значений, или здесь происходит что-то еще, о чем я не знаю?:
key sensor temp ts watt
1402816273765::192.168.1.106::9443::52
1402815283659::192.168.1.106::9443::51
1402815238323::192.168.1.106::9443::49
1402815280532::192.168.1.106::9443::50
Убедились, что данные находятся в Cassandra, проверив с помощью Cqlsh - см. здесь:
cqlsh:EVENT_KS> select * from "currentcostRealtime_stream";
key | Description | Name | Nick_Name | StreamId | Timestamp | Version | meta_ipAdd | payload_sensor | payload_temp | payload_timestamp | payload_watt
----------------------------------------+-----------------------------------------+----------------------------+----------------------+----------------------------------+---------------+---------+------------+----------------+--------------+---------------------+--------------
1402815283659::192.168.1.106::9443::51 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815283659 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 14:54:43 | 1
1402815238323::192.168.1.106::9443::49 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815238323 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 14:53:58 | 1
1402815280532::192.168.1.106::9443::50 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402815280532 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 14:54:40 | 1
1402816273765::192.168.1.106::9443::52 | This is the Currentcost realtime stream | currentcostRealtime.stream | Currentcost Realtime | currentcostRealtime.stream:1.0.5 | 1402816273765 | 1.0.5 | null | 1 | 18.7 | 2014-06-15 15:11:13 | 1
(4 rows)
cqlsh:EVENT_KS>
Скорее всего, это незначительная проблема, которую я наблюдал, но было бы здорово, если бы кто-то еще увидел это и мог ответить.
При внешнем добавлении определения удаленной таблицы в базу данных MySQL таблицы и все остальное создаются, но похоже, что проблема заключается в получении данных атрибутов в самой таблице EVENT_KS, а также в создании и доступе к ним через сценарий HIVE.
Заранее спасибо!
/Йорген
[ОБНОВЛЕНИЕ – Четверг, 19 – РЕШЕНО] Удалось решить этот вопрос с помощью нескольких подсказок. Следующий код теперь работает нормально, и это здорово.. Большое спасибо за время, чтобы ответить от вас, ребята..
drop table CurrentCostDataTemp10;
drop table CurrentCostDataTemp_Summary10;
CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT )
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160",
"cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "<USER>",
"cassandra.ks.password" = "<PASSWORD>",
"cassandra.cf.name" = "currentcostsimple5_stream",
"cassandra.columns.mapping" = ":key, payload_sensor, Timestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt" );
CREATE EXTERNAL TABLE IF NOT EXISTS CurrentCostDataTemp_Summary10 ( messageRowID STRING, payload_sensor INT, messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql STRING, payload_watt INT )
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
'mapred.jdbc.url' = 'jdbc:mysql://127.0.0.1:8889/currentcost' ,
'mapred.jdbc.username' = '<USER>',
'mapred.jdbc.password' = '<PASSWORD>',
'hive.jdbc.update.on.duplicate'= 'true',
'hive.jdbc.primary.key.fields' = 'messageRowID',
'hive.jdbc.table.create.query' = 'CREATE TABLE CurrentCostDataTemp1 ( messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, payload_sensor TINYINT(4), messageTimestamp BIGINT, payload_temp FLOAT, payload_timestamp BIGINT, payload_timestampmysql DATETIME, payload_watt INT ) ');
insert overwrite table CurrentCostDataTemp_Summary10 select messageRowID, payload_sensor, messageTimestamp, payload_temp, payload_timestamp, payload_timestampmysql, payload_watt FROM CurrentCostDataTemp10;
Использование различных структур отчетности с WSO2 Монитор деловой активности. Сачини Джаясекара