загрузка 1 ГБ данных в hbase занимает 1 час

Я хочу загрузить CSV-файл размером 1 ГБ (10 миллионов записей) в Hbase. Я написал для него программу Map-Reduce. Мой код работает нормально, но на выполнение требуется 1 час. Последний редуктор занимает более получаса. Может ли кто-нибудь помочь мне?

Мой код выглядит следующим образом:

Driver.Java


    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example
* Data preparation MapReduce job driver *
    *
  1. args[0]: HDFS input path *
  2. args[1]: HDFS output path *
  3. args[2]: HBase table name *
*/ public class Driver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /* * NBA Final 2010 game 1 tip-off time (seconds from epoch) * Thu, 03 Jun 2010 18:00:00 PDT */ // conf.setInt("epoch.seconds.tipoff", 1275613200); conf.set("hbase.table.name", args[2]); // Load hbase-site.xml HBaseConfiguration.addHbaseResources(conf); Job job = new Job(conf, "HBase Bulk Import Example"); job.setJarByClass(HBaseKVMapper.class); job.setMapperClass(HBaseKVMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(TextInputFormat.class); HTable hTable = new HTable(conf, args[2]); // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); // Load generated HFiles into table // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); // loader.doBulkLoad(new Path(args[1]), hTable); } }

HColumnEnum.java


        package com.cloudera.examples.hbase.bulkimport;

    /**
     * HBase table columns for the 'srv' column family
     */
    public enum HColumnEnum {
      SRV_COL_employeeid ("employeeid".getBytes()),
      SRV_COL_eventdesc ("eventdesc".getBytes()),
      SRV_COL_eventdate ("eventdate".getBytes()),
      SRV_COL_objectname ("objectname".getBytes()),
      SRV_COL_objectfolder ("objectfolder".getBytes()),
      SRV_COL_ipaddress ("ipaddress".getBytes());

      private final byte[] columnName;

      HColumnEnum (byte[] column) {
        this.columnName = column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }

HBaseKVMapper.java

package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

/**
 * HBase bulk import example
 * <p>
 * Parses Facebook and Twitter messages from CSV files and outputs
 * <ImmutableBytesWritable, KeyValue>.
 * <p>
 * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
 * into the correct HBase table region.
 * <p>
 * The KeyValue value holds the HBase mutation information (column family,
 * column, and value)
 */
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 6;

  CSVParser csvParser = new CSVParser();
  int tipOffSeconds = 0;
  String tableName = "";

  // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
  //    .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();

  //  tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }

  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    /*if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }*/

    String[] fields = null;

    try {
      fields = value.toString().split(",");
      //csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
  /*  DateTime dt = null;

    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }

    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }*/

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
        .getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[5].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
      context.write(hKey, kv);
    }


    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
  /*  if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }*/
  }
}

Пожалуйста, помогите мне повысить производительность или сообщите мне, если у вас есть альтернативное решение с образцом кода.

МОЙ mapred-site.xml

 <?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
  <name>mapred.job.tracker</name>
    <value>namenode:54311</value>
    </property>

<property>
  <name>mapred.reduce.parallel.copies</name>
    <value>20</value>
    </property>

<property>
  <name>tasktracker.http.threads</name>
    <value>50</value>
    </property>

<property>
  <name>mapred.job.shuffle.input.buffer.percent</name>
    <value>0.70</value>
    </property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.map.tasks</name>
    <value>4</value>
    </property>

<property>
  <name>reduce.map.tasks</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.job.shuffle.merge.percent</name>
    <value>0.65</value>
    </property>

<property>
  <name>mapred.task.timeout</name>
    <value>1200000</value>
    </property>

<property>
    <name>mapred.child.java.opts</name>
        <value>-Xms1024M -Xmx2048M</value>
        </property>



<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>-1</value>
    </property>

<property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
</property>

<property>
    <name>mapred.map.output.compression.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<property>
    <name>io.sort.mb</name>
    <value>800</value>
</property>


<property>
  <name>mapred.child.ulimit</name>
    <value>unlimited</value>
    </property>

<property>
<name>io.sort.factor</name>
<value>100</value>
<description>More streams merged at once while sorting files.</description>
</property>  


 <property>
 <name>mapreduce.admin.map.child.java.opts</name>
 <value>-Djava.net.preferIPv4Stack=true</value>
 </property>
 <property>
 <name>mapreduce.admin.reduce.child.java.opts</name>
 <value>-Djava.net.preferIPv4Stack=true</value>
 </property>


<property>
   <name>mapred.min.split.size</name>
   <value>0</value>
</property>

<property>
   <name>mapred.job.map.memory.mb</name>
     <value>-1</value>
     </property>

<property>
   <name>mapred.jobtracker.maxtasks.per.job</name>
        <value>-1</value>
             </property>


</configuration>

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://namenode:54310/hbase</value>
    <description>The directory shared by RegionServers.
    </description>
</property>

<property>
    <name>hbase.master</name>
    <value>slave:60000</value>
    <description>The host and port that the HBase master runs at.
    A value of 'local' runs the master and a regionserver
    in a single process.
    </description>
</property>

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are
    false: standalone and pseudo-distributed setups with managed Zookeeper
    true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
    </description>
</property>

<property>
    <name>hbase.zookeeper.quorum</name>
    <value>slave</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.
    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
    By default this is set to localhost for local and pseudo-distributed modes
    of operation. For a fully-distributed setup, this should be set to a full
    list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
    this is the list of servers which we will start/stop ZooKeeper on.
    </description>
</property>

<property>
       <name>hbase.zookeeper.property.clientPort</name>
       <value>2181</value>
</property>

<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/work/zoo_data</value>
    <description>Property from ZooKeeper's config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
</property>

</configuration>

Пожалуйста, помогите мне, чтобы я мог улучшить свою работу.


person Viru    schedule 02.05.2014    source источник
comment
Вы пробовали профилировать и контролировать ввод-вывод?   -  person awksp    schedule 02.05.2014
comment
Для меня время дольше минуты звучит довольно медленно. Похоже, что-то не так, и профилирование вашего приложения кажется лучшим подходом. Я бы начал с процесса, который использует больше всего ЦП.   -  person Peter Lawrey    schedule 02.05.2014
comment
Возможно, это как-то связано с размером транзакции, если вы еще этого не делаете, я бы попытался объединить вставки, скажем, по 5 КБ на транзакцию.   -  person Gimby    schedule 02.05.2014
comment
Я поместил всю лучшую конфигурацию в свой mapred-site.xml, но все еще не увеличил производительность. Я использую 5-узловой кластер. У меня все узлы с процессорами Quard Core и 8 ГБ оперативной памяти. Пожалуйста, подскажите, как я могу улучшить производительность   -  person Viru    schedule 02.05.2014
comment
Какие у вас mapred-site.xml и hbase-site.xml? Есть ли у вас какие-либо показатели оборудования, такие как сеть, ввод-вывод, загрузка ЦП?   -  person b4hand    schedule 03.05.2014
comment
@ b4hand Вы можете найти ниже mapredsite.xml и hbase-site.xml.   -  person Viru    schedule 04.05.2014
comment
Как насчет размера кучи JVM? А размер кучи Hadoop JVM? Вы пытались это проверить? Попробуй установить на 2048мб мин.   -  person eliasah    schedule 04.05.2014
comment
Вы упомянули, что последний редуктор требует больше времени. Возможно ли, что ваши ключи неправильно разбиты на разделы. Это означает, что первые три редуктора получают, скажем, 25% рабочей нагрузки, а последний - 75% из 10 миллионов записей. Можете ли вы хешировать ключи или создать собственный метод раздела.   -  person Arun Janarthnam    schedule 05.05.2014


Ответы (3)


Во-первых, зачем нужна программа Mapreduce для загрузки данных в Hbase для такого маленького файла (1 ГБ).

По моему опыту, я обработал 5 ГБ Json с помощью потоковой передачи Джексона (я не хочу переносить весь json в память) и сохранил в Hbase через 8 минут, используя технику пакетной обработки.

Я использовал hbase помещает в пакетный список объекты 100000 записей.

Ниже приведен фрагмент кода, с помощью которого я добился этого. То же самое можно сделать и при разборе других форматов)

Может быть вам нужно вызвать этот метод в 2-х местах

1) с пакетом 100000 записей.

2) Для обработки напоминания о ваших пакетных записях меньше 100000

  public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
        try {
            final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
            table.put(puts);
            LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            LOG.info("Processed ---> " + puts.size());
            if (puts != null) {
                puts.clear();
            }
        }
    }
person Ram Ghadiyaram    schedule 02.05.2016

Я создал только класс сопоставления и взял класс выходного формата hbase. Теперь это займет 10 минут. Скорость моей сети очень низкая, поэтому это занимает много времени.

person Viru    schedule 26.05.2014
comment
Это может быть достигнуто с помощью обычной Java-программы с меньшим, чем временной интервал mapreduce. пожалуйста, посмотрите мой ответ. - person Ram Ghadiyaram; 02.05.2016

Его можно дополнительно настроить, указав количество разделов на регионы, которые будут использоваться при создании таблицы Hbase. Поскольку количество экземпляров редукторов для массовой загрузки также будет зависеть от количества регионов. Это можно сделать с помощью следующей команды

hbase org.apache.hadoop.hbase.util.RegionSplitter -c <number of regions> -f <column families> <New Hbase Table Name> <splitAlgorithm>

для алгоритма разделения можно указать

  • UniformSplit - обрабатывает ключи как произвольные байты

  • HexStringSplit - обрабатывает ключи как шестнадцатеричный ASCII

person BigDataArjun    schedule 25.09.2014