Разделение ввода для функции Map в Hadoop

Это моя первая реализация в Hadoop. Я пытаюсь реализовать свой алгоритм вероятностного набора данных в Map Reduce. В моем наборе данных последний столбец будет иметь некоторый идентификатор (количество уникальных идентификаторов в наборе данных равно количеству узлов в моем кластере). Я должен разделить свой набор данных на основе этого значения столбца, и каждый набор записей должен обрабатываться каждым узлом в моем кластере.

Например, если у меня есть три узла в моем кластере, для приведенного ниже набора данных один узел должен обрабатывать все записи с идентификатором = 1, другой — с идентификатором = 2, еще один — с идентификатором = 3.

name time  dept  id
--------------------
 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

Моя функция карты должна принимать каждое разделение в качестве входных данных и обрабатывать его параллельно в каждом узле.

Я просто пытаюсь понять, какой подход возможен в Hadoop. Либо ввести этот набор данных в качестве входных данных для моей функции карты, либо передать дополнительный аргумент с картой, чтобы разделить данные на основе значения идентификатора. Или заранее разделить данные на подмножества «n» (количество узлов) и загрузить их в узлы, если это правильный подход, как можно разделить данные на основе значения и загрузить в разные узлы. Потому что, как я понял из своих чтений, это то, что Hadoop разбивает данные на блоки в зависимости от указанного размера. Как мы можем указать конкретное условие при загрузке. Просто чтобы добавить, я пишу свою программу на python.

Кто-нибудь, пожалуйста, посоветуйте. Спасибо


person ds_user    schedule 08.09.2014    source источник


Ответы (2)


Самое простое для вас, вероятно, было бы сделать так, чтобы сопоставитель выдавал данные с идентификатором в качестве ключа, что гарантирует, что один редуктор получит все записи для определенного идентификатора, а затем выполнит вашу обработку на этапе редьюсера.

Например,

Входные данные:

 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

Код сопоставления:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[-1]
    print key + "\t" + line

Вывод карты:

 1  b1  2:00pm z1   1
 2  b2  3:00pm z2   2
 1  c1  4:00pm y2   1
 3  b3  3:00pm z3   3
 2  c4  4:00pm x2   2

Редуктор 1:

 1  b1  2:00pm z1   1
 1  c1  4:00pm y2   1

Редуктор 2:

 2  b2  3:00pm z2   2

Редуктор 3:

 3  b3  3:00pm z3   3

Код редуктора:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    orig_line = "\t".join(cols[1:])
    # do stuff...

Обратите внимание, что таким образом один редьюсер может получить несколько ключей, но данные будут упорядочены, и вы можете контролировать количество редюсеров с помощью параметра mapred.reduce.tasks.

EDIT Если вы хотите собрать свои данные в редукторе для каждого ключа, вы можете сделать что-то вроде этого (не уверен, что он будет работать как есть, но вы поняли идею)

#!/usr/bin/env python
import sys
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

last_key = None
data = []
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    if last_key and key != last_key:
        process_data(last_key, data)
        data = []
    orig_line = "\t".join(cols[1:])
    data.append(orig_line)
    last_key = key
process_data(last_key, data)

Если вас не беспокоит нехватка памяти на шаге редуктора, вы можете упростить код следующим образом:

#!/usr/bin/env python
import sys
from collections import defaultdict
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

all_data = defaultdict(list)
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    orig_line = "\t".join(cols[1:])
    all_data[key].append(orig_line)
for key, data in all_data.iteritems():
    process_data(key, data)
person Nonnib    schedule 16.09.2014
comment
Один редуктор не должен получать несколько ключей, так как я хочу, чтобы мой алгоритм запускался для каждой записи набора данных (записи, принадлежащие одному ключу), это повлияет на мой вывод. И после получения результатов моего алгоритма по всем ключам мне нужно еще больше уменьшить его, найдя минимальные подмножества вывода. как я могу этого добиться? - person ds_user; 16.09.2014
comment
Несмотря на то, что редюсер получит более одного ключа, вы можете обрабатывать его как хотите. Например, собирайте данные, пока не дойдете до следующего ключа (помните, что он упорядочен), и в этот момент вы обрабатываете данные для предыдущего ключа. Используя этот метод, чтобы еще больше уменьшить его, вам нужно будет запустить еще одно задание map/reduce на выходе или просто запустить скрипт python для выходных данных (если он достаточно мал). - person Nonnib; 16.09.2014
comment
В этом есть смысл. прохладно. Возможно ли, чтобы один ключ был разделен более чем на один редуктор? Потому что мне нужны все записи для одного ключа за раз, чтобы запустить мой алгоритм - person ds_user; 16.09.2014
comment
Нет, Hadoop гарантирует, что все записи для одного ключа будут обрабатываться одним и тем же редюсером. - person Nonnib; 16.09.2014
comment
Отлично, после того, как я получу ввод в редуктор, мне нужно только это b1 14:00 z1 из ввода сокращения 1 b1 14:00 z1 1, я добавил 1 для обозначения номера моего набора данных, в таком случае, как я могу манипулировать этим в python, потому что я не уверен, какой редуктор формата получает ввод и как его анализировать. - person ds_user; 16.09.2014
comment
И когда вы говорите для строки в sys.stdin: в коде сокращения, обрабатывает ли он каждую строку за раз или все данные, принадлежащие каждому ключу за раз, потому что мне нужны все данные для определенного ключа за раз для обработки. Если я могу собрать все данные для ключа один раз и обработать, как я могу это реализовать. Любая помощь? - person ds_user; 16.09.2014
comment
При потоковой передаче вы получите одну строку в стандартном вводе на каждую строку вывода из преобразователя (что отличается от родной Java, где редуктор будет получать все данные для каждого ключа сразу). Вам нужно будет собрать данные самостоятельно, но это довольно просто. Просто поместите его в словарь на этапе for line in sys.stdin, а затем выполните итерацию по словарю. - person Nonnib; 16.09.2014
comment
Спасибо большое. Это очень помогает, последний вопрос, вы что-то упомянули - собирайте данные, пока не дойдете до следующего ключа, есть ли для этого специальная функция в python? - person ds_user; 16.09.2014
comment
Нет, не совсем. Это просто оптимизация, если вы не можете поместить весь набор данных в память. Собирайте вещи в список, пока не нажмете следующую клавишу, затем вызовите process_data(my_data) для предыдущей клавиши. Просто поместить данные в словарь, а затем обработать их отдельно после того, как все строки будут обработаны, проще. - person Nonnib; 16.09.2014
comment
Неа. Я спрашивал, как реализовать эту логику: собирать до тех пор, пока вы не нажмете следующую клавишу, есть ли функция для получения данных для одной клавиши или мне нужно сделать это вручную, если бы вы могли написать код, это было бы здорово . - person ds_user; 16.09.2014
comment
Я изменил ответ, добавив запрошенный код. Надеюсь, это поможет! - person Nonnib; 16.09.2014
comment
Давайте продолжим обсуждение в чате. - person ds_user; 16.09.2014

Если я понял ваш вопрос, лучший способ — загрузить набор данных в таблицу hive, а затем написать UDF на python. После этого сделайте что-то вроде этого:

select your_python_udf(name, time, dept, id) from table group by id;

Это похоже на фазу сокращения, поэтому вам, возможно, это понадобится перед запуском запроса.

set mapred.reduce.tasks=50;

Как создать пользовательскую UDF:

Плагины Hive

Создать функцию

person nosmo    schedule 12.09.2014
comment
Привет. Спасибо за объяснение, но не могли бы вы уточнить, как я могу тогда использовать данные. Это должен быть мой шаг карты, так как я должен запустить свой алгоритм для каждого подмножества (на основе идентификатора) в каждом узле. Все не должно работать в одном узле. Затем выходные данные всех этих узлов будут использоваться на этапе сокращения для обработки и получения объединенного вывода. - person ds_user; 13.09.2014