kafka python – Bluemix MessageHub – ConnectionError: сокет отключен

Я использую клиент kafka python для отправки сообщений в концентратор сообщений, но заметил, что через некоторое время после запуска моего приложения оно перестанет отправлять сообщения в концентратор сообщений.

Затем я заметил следующее в своих файлах журнала:

ConnectionError: socket disconnected

Я обновил свой код, чтобы добавить retries=5:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl

sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(bootstrap_servers = app.config['KAFKA_BROKERS_SASL'],
                         sasl_plain_username = app.config['KAFKA_USERNAME'],
                         sasl_plain_password = app.config['KAFKA_PASSWORD'],
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism,
                         api_version = (0,10),
                         retries=5)

def send_message(message):

    try:
        producer.send(app.config['KAFKA_TOPIC'], message.encode('utf-8'))

        # FIXME sending seems to be unreliable unless we flush
        producer.flush()
    except:
        print("Unexpected error:", sys.exc_info()[0])
        raise

Мой код запускается из приложения python flask. Каждый поступающий запрос определенного типа вызывает метод send_message().

Вот соответствующие строки журнала из Bluemix. Возможно, я пропустил пару строк при копировании и вставке, но, надеюсь, этого достаточно, чтобы понять, что происходит:

APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:54 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (4 attempts left). Error: ConnectionError: socket disconnectedMay 15, 2017 8:50:54 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxxMay 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:54 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (2 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (1 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (0 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:56 PM
APP/1 ConnectionError: socket disconnected May 15, 2017 8:50:56 PM
APP/1 Unable to import 'sasl'. Fallback to 'puresasl'. May 15, 2017 8:51:00 PM
APP/1 Closing active operation May 15, 2017 8:51:01 PM

Моя тема существует. Полный код моего клиента находится здесь: https://github.com/snowch/movie-recommender-demo/blob/effc981cc9f799c41952719619f693172eebcd6a/web_app/app/messagehub_client.py

Любые указатели наиболее ценятся...


person Chris Snow    schedule 15.05.2017    source источник
comment
@chris.snow обновите свой Python Kafka с 1.3.1 до чего-то более нового, например 1.3.3. Они исправили серьезную ошибку sasl в PR github.com/dpkp/kafka-python/pull/. 1003, что, как мне кажется, было причиной этого.   -  person Dominic Evans    schedule 16.05.2017


Ответы (1)


Согласно комментарию Доминика, обновление kafka python до версии 1.3.3 устранило проблему для меня.

person Chris Snow    schedule 20.05.2017