Невозможно вставить более 3 столбцов в семейство столбцов

Я новичок в HBase, и у меня возникла проблема, на которую я не могу найти ответ в Google.

Я пытаюсь массово вставить данные из Hive в HBase, используя подход с солевой таблицей, как описано в: https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase.-using-spark/

Единственный поворот заключается в том, что мне нужно вставить данные с несколькими столбцами. Таблица в Hive имеет следующие столбцы: Код, Описание, Total_emp, зарплата

Я пытаюсь вставить точно такие же столбцы в HBase. Таблица HBase выглядит следующим образом:

'test2', {TABLE_ATTRIBUTES => {METADATA => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy'}}, {NAME => 'epsg_3857', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '1000000', METADATA => {'NEW_VERSION_BEHAVIOR' => 'false'}}

Однако я продолжаю получать эту ошибку при вставке столбца зарплаты в HFile:

java.io.IOException: Added a key not lexically larger than previous. Current cell = 0:0:11-1011/epsg_3857:Salary/1557231349613/Put/vlen=6/seqid=0, lastCell = 0:0:11-1011/epsg_3857:Total/1557231349613/Put/vlen=6/seqid=0

Я могу создать HFile, если удалю столбец зарплаты или перенесу столбец зарплаты в новое семейство столбцов. Однако этого не должно быть, поскольку я читал, что одно семейство столбцов может вместить множество столбцов.

Я попытался увеличить размер блока по умолчанию до 1 МБ, и все та же проблема.

Ниже мой тестовый код:

import Salter.Salts._
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2

object SaltedKeyExample2 extends App {
  System.setProperty("HADOOP_USER_NAME", "cloudera")

  val hive_session = SparkSession
    .builder()
    .appName("Salted Key Example 2")
    .master("local[*]")
    .config("spark.submit.deployMode", "client")
    .config("spark.yarn.jars", "hdfs://192.168.30.12:8020/user/cloudera/jars/*.jar")
    //.config("hive.metastore.uris", "thrift://192.168.30.12:9083")
    .enableHiveSupport()
    .getOrCreate()

  import hive_session.sql

  val df_07 = sql("SELECT * from sample_07")
  val df_08 = sql("SELECT * from sample_08")
  df_07.filter(df_07("salary") > 150000).show()
  val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"), df_07.col("description"))
  //val sourceRDD = df_09.rdd
  val sourceRDD = df_07.filter(df_07("salary") > 150000).rdd
  df_09.show()

  val spp = new SaltPrefixPartitioner(modulus = 2)

  val saltedRDD = sourceRDD.flatMap(r => {Seq((salt(r.getString(0), 2), (r.getString(0), r.get(1), r.get(2), r.get(3))))})

  saltedRDD.foreach(x => println(x))

  val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(spp)

  partitionedRDD.foreach(x => println(x))

  val cells = saltedRDD.sortByKey(true).flatMap(r => {
    val salted_keys = salt(r._1, 2)
    val codes = r._2._1.toString()
    val descriptions = r._2._2.toString()
    val total = r._2._3.toString()
    val salary = r._2._4.toString()
    val colFamily = "epsg_3857"
    val colFamily2 = "epsg_3857_2"
    val colNameCodes = "Code"
    val colNameDesc = "Description"
    val colNameTotal = "Total"
    val colNameSalary = "Salary"

    Seq((new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameCodes.getBytes(), codes.getBytes())),
      (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameDesc.getBytes(), descriptions.getBytes())),
      (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameTotal.getBytes(), total.getBytes())),
      (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), colNameSalary.getBytes(), salary.getBytes())))
  })

  cells.foreach(x => println(x))

  // setup the HBase configuration
  val baseConf = HBaseConfiguration.create(hive_session.sparkContext.hadoopConfiguration)

  // NOTE: job creates a copy of the conf
  val job = Job.getInstance(baseConf, "test2")
  val connection = ConnectionFactory.createConnection(baseConf)
  val table = connection.getTable(TableName.valueOf("test2"))
  val regionLoc = connection.getRegionLocator(table.getName)
  cells.foreach(x => println(x))
  // Major gotcha(!) - see comments that follow
  HFileOutputFormat2.configureIncrementalLoad(job, table, regionLoc)

  val conf = job.getConfiguration // important(!)

  cells.foreach(x => println(x))
  // write HFiles onto HDFS
  cells.saveAsNewAPIHadoopFile(
    "/tmp/test/hfiles",
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    conf)

  println("hello")
}

Я ожидаю вставить более 3 столбцов в семейство столбцов в HBase, но на самом деле я не могу в данный момент. Ценим любую помощь в решении этой проблемы. Спасибо.


person Kok-Lim Wong    schedule 07.05.2019    source источник
comment
stackoverflow.com/questions/46325233/ отвечает ли это на ваш вопрос?   -  person Ben Watson    schedule 07.05.2019
comment
Привет @Бен Уотсон. Дайте мне немного времени, чтобы увидеть, как я могу это сделать. Обычно я новичок в искрах, скалах и т. д. Практически во всех технологиях больших данных. Я вернусь к вам.   -  person Kok-Lim Wong    schedule 07.05.2019
comment
Справедливо. Я бы сказал, что запись напрямую в HFiles — довольно сложная задача, но, надеюсь, ссылка поможет.   -  person Ben Watson    schedule 07.05.2019
comment
Святая корова! это сработало! Так что придется все сортировать в HFile. Быстрый вопрос.... как отметить комментарий, который сработал? вы знаете зеленую галочку, чтобы показать, что комментарий сработал?   -  person Kok-Lim Wong    schedule 07.05.2019
comment
Я не отвечал, поэтому галочки нет. Я отмечу это как дубликат ответа, на который я ссылался; таким образом другие люди увидят это.   -  person Ben Watson    schedule 07.05.2019