Как получить тип полей записи StreamSets внутри Jython Evaluator

У меня есть конвейер StreamSets, где я читаю из удаленной базы данных SQL Server, используя компонент JDBC в качестве источника, и помещаю данные в Hive и озеро данных Kudu.

Я столкнулся с некоторыми проблемами с типом двоичных столбцов, поскольку в Impala нет поддержки двоичного типа, который я использую для доступа как к Hive, так и к Kudu.

Я решил преобразовать столбцы типа Binary (которые передаются в конвейере как тип Byte_Array) в String и вставить их вот так.

Я попытался использовать элемент преобразователя типов полей для преобразования всех типов Byte_Array в String, но это не сработало. Поэтому я использовал компонент Jython для преобразования всех типов arr.arr в String. Он работает нормально, пока я не получил значение Null в этом поле, поэтому тип Jython был None.type, и я не смог обнаружить тип Byte_Array и не смог преобразовать его в String. Поэтому я не мог вставить его в Куду.

Любая помощь, как получить типы полей записи StreamSets внутри Jython Evaluator? Или какие-либо предлагаемые решения проблемы, с которой я столкнулся?


person Abdelrahman Aly    schedule 23.07.2019    source источник


Ответы (2)


Вам нужно использовать sdcFunctions.getFieldNull(), чтобы проверить, является ли поле NULL_BYTE_ARRAY. Например:

import array

def convert(item):
  return ':-)'

def is_byte_array(record, k, v):
  # getFieldNull expect a field path, so we need to prepend the '/'
  return (sdcFunctions.getFieldNull(record, '/'+k) == NULL_BYTE_ARRAY 
          or (type(v) == array.array and v.typecode == 'b'))

for record in records:
  try:
    record.value = {k: convert(v) if is_byte_array(record, k, v) else v 
                    for k, v in record.value.items()}
    output.write(record)

  except Exception as e:
    error.write(record, str(e))

введите здесь описание изображения

person metadaddy    schedule 24.07.2019
comment
Спасибо, что нашли время ответить на этот - person Abdelrahman Aly; 25.07.2019

Итак, вот мое окончательное решение:

  • Вы можете использовать приведенную ниже логику для обнаружения любого типа StreamSet внутри компонента Jython, используя NULL_CONSTANTS:

    NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG, 
    NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL, 
    NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP
    

Идея состоит в том, чтобы сохранить значение поля во временной переменной, установить для поля значение None и использовать функцию sdcFunctions.getFieldNull, чтобы узнать тип StreamSets, сравнив его с одним из NULL_CONSTANTS.

import binascii

def toByteArrayToHexString(value):
  if value is None:
    return NULL_STRING
  value = '0x'+binascii.hexlify(value).upper()
  return value

for record in records:
  try:

    for colName,value in record.value.items():
      temp = record.value[colName]
      record.value[colName] = None
      if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_BYTE_ARRAY:
        temp = toByteArrayToHexString(temp)
      record.value[colName] = temp

    output.write(record)
  except Exception as e
    error.write(record, str(e))

Ограничение: приведенный выше код преобразует тип Date в тип Datetime, только если он имеет значение (если он не NULL).

person Abdelrahman Aly    schedule 25.07.2019
comment
Любопытно, почему вы не использовали тест (type(v) == array.array and v.typecode == 'b'), чтобы проверить, является ли значение массивом байтов? - person metadaddy; 26.07.2019
comment
Потому что я хочу получить точный тип, используемый в StreamSets, в любом случае без использования преобразований Jython. Я думаю, что тип Jython является избыточной информацией, если у вас уже есть тип StreamSets. Кроме того, я не уверен, что преобразование типов StreamSets в Jython является сопоставлением один к одному. Что ты об этом думаешь? Другое дело, что после этой строки record.value[colName] = None тип Jython будет None.type, и я не хочу это проверять. - person Abdelrahman Aly; 28.07.2019