Загрузите вложение в Confluence с помощью Nifi ExecuteScript на Python

Я пытаюсь загрузить файл PDF в Confluence с помощью процессора Nifi ExecuteScript. Я могу успешно загрузить файл, но когда я загружаю и открываю его, он ПУСТО. Что-то не так с моим обращением. Может ли кто-нибудь помочь проверить?

Вот как я это делаю:

  1. загрузите файл PDF из внутреннего API  Атрибуты Nifi после загрузки файла
  2. ExecuteScript Groovy - для преобразования содержимого потокового файла в атрибут
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile)return
def text = ''
session.read(flowFile, {inputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)

flowFile = session?.putAttribute(flowFile, "file_content", text)
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)

attribute file_content 3. ExecuteScript Python - для загрузки файла PDF в Confluence.

Вот мой код для №3. Думаю, здесь что-то не так ->

import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from org.apache.nifi.processor.io import OutputStreamCallback

class OutputWrite(OutputStreamCallback):
  def __init__(self, obj):
  self.obj = obj
  def process(self, outputStream):
     outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
  url = 'https://myconfluence.com/rest/api/content/12345/child/attachment'
  auth = 'myauthorization'
  file_name = 'mypdf.pdf'
  file_content = flowFile.getAttribute('file_content')

  s = requests.Session()

  m = MultipartEncoder(fields={'file': (file_name, file_content, 'application/pdf')})
  headers = {"X-Atlassian-Token":"nocheck", "Authorization":auth, "Content-Type":m.content_type}

  r = s.post(url, data=m, headers=headers, verify=False)

  session.write(flowFile, OutputWrite(json.loads(r.text)))
  session.transfer(flowFile, REL_SUCCESS)
  session.commit()

ОБНОВЛЕНИЕ 28.06.2019

Я решил последовать совету Питера и объединить коды 1 и 2. Он все еще не работает. Раньше размер PDF-файла составлял 2 МБ, но он был ПУСТЫМ. Теперь его размер составляет 0 КБ. Любая помощь будет принята с благодарностью!

import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

class PyInputStreamCallback(InputStreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

class OutputWrite(OutputStreamCallback):
    def __init__(self, obj):
        self.obj = obj
    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

text = ''
flowFile = session.get()
if(flowFile != None):
    session.read(flowFile, PyInputStreamCallback())
    confluence_attachment_api = flowFile.getAttribute('confluence_attachment_api')
    confluence_authorization = flowFile.getAttribute('confluence_authorization')
    file_name = flowFile.getAttribute('file_name')

    s = requests.Session()
    m = MultipartEncoder(fields={'file': (file_name, text, 'application/pdf')})
    headers = {"X-Atlassian-Token":"nocheck", "Authorization":confluence_authorization, "Content-Type":m.content_type}
    r = s.post(confluence_attachment_api, data=m, headers=headers, verify=False)

    session.write(flowFile, OutputWrite(json.loads(r.text)))
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

person Raii    schedule 28.06.2019    source источник
comment
просто добавьте несколько отпечатков на консоль в своем коде, чтобы увидеть, где вы теряете данные. единственный момент, который я вижу - вы читаете файл как текст с PyInputStreamCallback, но может быть двоичное содержимое ..   -  person daggett    schedule 09.07.2019


Ответы (1)


Не похоже, что вы действительно отправляете содержимое FlowFile. Вместо этого вы отправляете атрибут с именем file_content в качестве содержимого файла, что, вероятно, не то, что вы намеревались

Вам нужно будет сделать session.read, чтобы получить поток файлов. Приведенный ниже код не работает как есть, но показывает, как вы можете получить доступ к потоку.

class PyInputStreamCallback(InputStreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream):
    m = MultipartEncoder(fields={'file': (file_name, inputStream, 'application/pdf')})

session.read(flowFile, PyInputStreamCallback())

Ссылка: https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

person Peter    schedule 28.06.2019
comment
Привет, @Peter, думаю, я сделал это здесь - ›ExecuteScript Python - для преобразования содержимого потокового файла в атрибут. Он находится в [file_content]. Я отредактирую свой вопрос и включу код (а не только скриншот). - person Raii; 28.06.2019
comment
@Raii - Извините, изображение было маленьким, поэтому я предположил, что это тот же процессор, который вы использовали для публикации данных. В любом случае, я бы посоветовал так не поступать. Просто объедините их в один сценарий, и посмотрите, как это получится. - person Peter; 28.06.2019
comment
Привет @Peter, я сделал все, что мог, и слил код. Я выложил это как обновление выше. Можете помочь проверить, есть ли у вас время? Спасибо! - person Raii; 28.06.2019