Я новичок в HBase, и у меня возникла проблема, на которую я не могу найти ответ в Google.
Я пытаюсь массово вставить данные из Hive в HBase, используя подход с солевой таблицей, как описано в: https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase.-using-spark/
Единственный поворот заключается в том, что мне нужно вставить данные с несколькими столбцами. Таблица в Hive имеет следующие столбцы: Код, Описание, Total_emp, зарплата
Я пытаюсь вставить точно такие же столбцы в HBase. Таблица HBase выглядит следующим образом:
'test2', {TABLE_ATTRIBUTES => {METADATA => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy'}}, {NAME => 'epsg_3857', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '1000000', METADATA => {'NEW_VERSION_BEHAVIOR' => 'false'}}
Однако я продолжаю получать эту ошибку при вставке столбца зарплаты в HFile:
java.io.IOException: Added a key not lexically larger than previous. Current cell = 0:0:11-1011/epsg_3857:Salary/1557231349613/Put/vlen=6/seqid=0, lastCell = 0:0:11-1011/epsg_3857:Total/1557231349613/Put/vlen=6/seqid=0
Я могу создать HFile, если удалю столбец зарплаты или перенесу столбец зарплаты в новое семейство столбцов. Однако этого не должно быть, поскольку я читал, что одно семейство столбцов может вместить множество столбцов.
Я попытался увеличить размер блока по умолчанию до 1 МБ, и все та же проблема.
Ниже мой тестовый код:
import Salter.Salts._
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
object SaltedKeyExample2 extends App {
System.setProperty("HADOOP_USER_NAME", "cloudera")
val hive_session = SparkSession
.builder()
.appName("Salted Key Example 2")
.master("local[*]")
.config("spark.submit.deployMode", "client")
.config("spark.yarn.jars", "hdfs://192.168.30.12:8020/user/cloudera/jars/*.jar")
//.config("hive.metastore.uris", "thrift://192.168.30.12:9083")
.enableHiveSupport()
.getOrCreate()
import hive_session.sql
val df_07 = sql("SELECT * from sample_07")
val df_08 = sql("SELECT * from sample_08")
df_07.filter(df_07("salary") > 150000).show()
val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"), df_07.col("description"))
//val sourceRDD = df_09.rdd
val sourceRDD = df_07.filter(df_07("salary") > 150000).rdd
df_09.show()
val spp = new SaltPrefixPartitioner(modulus = 2)
val saltedRDD = sourceRDD.flatMap(r => {Seq((salt(r.getString(0), 2), (r.getString(0), r.get(1), r.get(2), r.get(3))))})
saltedRDD.foreach(x => println(x))
val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(spp)
partitionedRDD.foreach(x => println(x))
val cells = saltedRDD.sortByKey(true).flatMap(r => {
val salted_keys = salt(r._1, 2)
val codes = r._2._1.toString()
val descriptions = r._2._2.toString()
val total = r._2._3.toString()
val salary = r._2._4.toString()
val colFamily = "epsg_3857"
val colFamily2 = "epsg_3857_2"
val colNameCodes = "Code"
val colNameDesc = "Description"
val colNameTotal = "Total"
val colNameSalary = "Salary"
Seq((new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameCodes.getBytes(), codes.getBytes())),
(new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameDesc.getBytes(), descriptions.getBytes())),
(new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameTotal.getBytes(), total.getBytes())),
(new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameSalary.getBytes(), salary.getBytes())))
})
cells.foreach(x => println(x))
// setup the HBase configuration
val baseConf = HBaseConfiguration.create(hive_session.sparkContext.hadoopConfiguration)
// NOTE: job creates a copy of the conf
val job = Job.getInstance(baseConf, "test2")
val connection = ConnectionFactory.createConnection(baseConf)
val table = connection.getTable(TableName.valueOf("test2"))
val regionLoc = connection.getRegionLocator(table.getName)
cells.foreach(x => println(x))
// Major gotcha(!) - see comments that follow
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLoc)
val conf = job.getConfiguration // important(!)
cells.foreach(x => println(x))
// write HFiles onto HDFS
cells.saveAsNewAPIHadoopFile(
"/tmp/test/hfiles",
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf)
println("hello")
}
Я ожидаю вставить более 3 столбцов в семейство столбцов в HBase, но на самом деле я не могу в данный момент. Ценим любую помощь в решении этой проблемы. Спасибо.