나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Kafka

[Kafka] Connector-level producer/consumer configuration overrides

devidea 2020. 7. 21. 17:06

Kafka 2.3.0 버전에서는 Connect(이하 커넥트) 관련 개선사항이 많았다. 관련 내용을 계속 정리해 갈 예정인데, 첫 번째 글로 KIP-458: Connector Client Config Override Policy에 대해서 정리한다.

KIP(Key Improvement Proposals) 문서 제목에서 알 수 있듯이 Connector에 포함된 Client(Producer, Consumer)의 설정을 override 할 수 있게 되었다.

 

필자가 해당 내용에 대해서 관심을 갖게 된 이유는

커넥트에 포함된 Sink Connector들의 Source 카프카 클러스터가 하나로만 유지할 수 있어 여러 카프카 클러스터를 보유했을 경우, 커넥트 클러스터도 여러개를 구축해야 했다.

그런데 커넥트의 컨슈머 설정을 Task 별로 변경할 수 있다면 하나의 커넥트 클러스터로 여러 카프카 클러스터의 데이터를 소비할 수 있다.

 

그럼 Override 방법에 대해 알아보자.

 

1. consumer 설정을 바꿔서 Connector 실행

적용하는 방법은 간단하다. Connector를 실행하는 Restful API에 설정값만 추가하면 된다.

다만 설정값의 규칙이 있다.

  • 'producer.override' : source connector가 사용하는 프로듀서 혹은 sink connector의 DLQ 프로듀서 (DLQ에 대해서는 나중에 다른 글로 정리하겠다)

  • 'consumer.override' : sink connector 사용 consumer

  • 'admin.override' : sink connector에 의해 만들어지는 DLQ 토픽

필자의 관심사는 sink connector의 boostrap.servers 설정을 바꾸는데 있으므로 아래의 예와 같이 API를 구성한다.

API의 예는 실제로 수행한 API가 아니라 confluent 블로그 글에서 참조했다. (테스트를 했지만 노출되지 않을 정보가 많아서 예만 든다)

curl -i -X PUT -H  "Content-Type:application/json" \
      http://localhost:8083/connectors/sink-elastic-orders-01-latest/config \
      -d '{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "orders",
  "consumer.override.auto.offset.reset": "latest",
  "consumer.override.bootstrap.servers": "test-broker01:9092,test-broker02:9092,test-broker03:9092"
  "tasks.max": 1,
  "connection.url": "http://elasticsearch:9200",  "type.name": "type.name=kafkaconnect",
  ...
}'

그렇게 바꿔서 실행을 했더니 필자는 다음과 같은 오류를 받았다.

Connect 설정 override 호출 error

 

오류 내용을 간단히 해석하자면 'None' 정책은 bootstrap.servers 설정을 바꿀 수 없단다.

사실 설정을 바꾸려면 커넥트의 정책을 변경해야 한다.

 

 

2. 커넥트 정책 변경

정책은 다음 3가지로 구분된다. 참고로 설정 변경 대상은 producer, consumer, admin이다.

  • None : 기본값. 변경을 허용하지 않는다.

  • Principal : security.protocol, sasl.jaas.config, sasl.mechanism prefix를 가진 설정값만 변경 가능.

  • All : 모든 설정 변경 가능

 

그래서 커넥트의 설정을 바꿔줬다. 다음과 같이 설정하면 된다.

connector.client.config.override.policy=All

connector.client.config.override.policy=All

 

커넥트의 설정을 바꾸고 1번 단계에서 수행한 API 호출을 다시 했더니, 다른 카프카 클러스터의 데이터가 소비됨을 알 수 있었다.

다만 커넥트의 설정이 카프카의 특정 토픽에 기록되는데, 여러 클러스터를 대상으로 활용하다 보면 관리 차원에서 부담이 될 수 있다.

 

그런 생각은 Github pull request 논의에서도 볼 수 있다.

향후에 커넥트의 설정 변경을 어디까지 허용하고, 관리 차원의 논란도 어떻게 정리해 갈 지 앞으로의 업데이트의 내용도 관심있게 봐야겠다.

 

 

관련 문서

 

반응형