나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Kafka

schema registry HA 구성

devidea 2024. 3. 12. 15:03

이번 글에서는 schema registry를 HA로 설정하는 방법에 대해서 설명한다. 하지만 HA를 구성하기 위한 설정은 생각보다 단순하기 때문에 schema registry에 대해서 대략적으로 소개한다. 그리고 설치가 완료된 schema registry를 사용해서 카프카에 avro 포맷으로 데이터를 보내고 읽는 kafka-avro-console-producer와 kafka-avro-console-consumer를 사용해서 테스트하는 방법까지 해보자.

 

1. 개요

먼저 schema registry에 대해서 간략히 설명한다. 한 문장으로 설명하면 카프카에 기록할 메시지의 데이터 스키마를 관리하고 검증하는 서비스이다. 스키마를 관리하고 검증한다고 표현했는데 관리는 스키마를 기록하고 유지하는 역할이며, 검증은 새롭게 들어오는 데이터의 스키마가 기존에 등록된 스키마와 호환성을 체크하는 역할이다.

 

아래 그림은 confluent 공식 문서에서 가져온 그림이다. 필자가 앞서 말한 스키마의 관리 역할로 프로듀서와 컨슈머에 의해서 스키마를 필요로 할 때, 스키마를 제공한다. 그리고 검증의 역할은 카프카에서 연결된 화살표에 표시된 validate schema로 표현됐는데 카프카에 들어갈 데이터의 스키마가 기존에 등록된 스키마와 호환성이 맞는지 검증한다. 호환성에 맞지 않는 스키마일 경우 카프카에 데이터가 등록되지 않는다.

출처: https://docs.confluent.io/platform/current/schema-registry/index.html

 

호환성은 크게 2개의 타입의 확장으로 이해하면 된다. BACKWARD, FORWARD.

  • BACKWARD
    • 과거 데이터를 최신 스키마로 읽을 경우.
    • 최신 스키마에 추가된 과거에 존재하지 않는 필드를 default 값으로 채운다.
  • FORWARD
    • 최신 데이터를 과거 스키마로 읽을 경우.
    • 과거 스키마에 없는 최신 필드를 제거한다.

스키마는 단로롭게 계속 진화한다고 가정하는데, 바로 전/ 후 단계의 스키마만 호환성을 가져갈지, 더 확장할지에 따라 설정방법이 달라진다. 그리고 물론 BACKWARD, FORWARD를 모두 지원하는 FULL도 있다. 스키마 호환성과 관련해서는 다음 confluent 공식 문서(Schema Evolution and Compatibility for Schema Registry)를 더 참조하자.

 

대략적으로 소개만 드린 schema registry에 대한 설명은 confluent에서 발표한 자료를 살펴보면 도움이 된다.

 

 

 

2. HA 구성

이제 schema registry를 HA 구성하는 방법에 대해서 살펴보자. 이번에 설명할 HA는 하나의 데이터 센터에서 여러 대의 schema registry를 구성한다고 가정한다. HA로 구성됨은 leader와 여러 개의 follower로 서버가 나뉜다고 정의한다. 이 구조는 카프카 브로커의 파티션 리더를 선정함과 동일하다. 아래 그림을 살펴보자.

출처: https://docs.confluent.io/platform/current/schema-registry/multidc.html

 

카프카 브로커의 파티션 리더도 토픽의 해당 파티션으로 쓰고 읽는 모든 역할을 감당한다. 프로듀서, 컨슈머가 모두 파티션 리더 서버에 접근한다. Schema registry도 동일하다. 여러 서버 중에서 하나의 서버가 리더 역할을 하며, 해당 서버가 스키마 처리를 담당한다. 리더 서버가 다운되면 팔로워 서버 중 한대가 리더 역할을 이어받는다.

 

2.1. HA 설정

HA를 설정하는 방법은 단순하다. schema registry에 아래 설정을 추가하면 된다.

schema.registry.group.id={group_id}
leader.eligibility=true

 

카프카 컨슈머를 사용해 본 분이라면 설정에서 group.id라는 값이 눈에 익을 것이다. 컨슈머의 그룹을 구분하는 값으로 동일 그룹으로 띄워진 컨슈머들 사이에 리밸런싱이 일어나기 때문이다. Schema registry도 카프카 컨슈머의 그룹 리더를 선정하는 방식을 그대로 따랐기 때문에 group.id로 표현한 것으로 추측했다.

 

아래 코드는 schema registry의 리더를 선정하는 코드의 구현을 제외한 메서드 정의들만 가져왔다. AbstractCoordinator를 상속받아 abstruct method들을 override 한 것을 볼 수 있다. AbstractCoordinator는 카프카 컨슈머의 코드로서 컨슈머 그룹을 관리하는 코디네이터 관련 코드이다. 카프카 컨슈머에서 그룹 리더를 선정하는 코드가 컨슈머 내부에 존재하는데 schema registry가  컨슈머의 AbstractCoordinator를 상속받아 동일하게 구현했다.

final class SchemaRegistryCoordinator extends AbstractCoordinator implements Closeable {

  @Override
  public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {}

  @Override
  protected void onJoinComplete() {}
  
  @Override
  protected Map<String, ByteBuffer> onLeaderElected() {}
  
  @Override
  protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {}
  
  @Override
  protected synchronized boolean ensureCoordinatorReady(Timer timer) {}

  @Override
  protected boolean rejoinNeededOrPending() {}
}

 

앞서 설정한 schema.registry.group.id 설정 값은 AbstractCoordinator이 포함한 GroupRebalanceConfig 클래스 안에 포함된다. 컨슈머의 group.id와 동일한 값이다.

 

2.2. 카프카 클라이언트(프로듀서, 컨슈머) 설정

schema registry를 사용해서 avro 타입의 데이터를 카프카에 전송하는 코드를 살펴보면 설정에 schema registry url을 등록하게 된다.

 

아래 코드에서 "schema.registry.url" 설정에 schema registry의 url이 등록된다. 샘플 코드에서 살펴보면 2개의 url이 등록됨을 볼 수 있다. HA 구성된 schema registry는 리더 서버가 내려갔을 때, 접근해야 하는 팔로워 서버도 클라이언트가 알아야 하므로 schema registry의 서버들을 모두 포함한 설정을 넣어야 한다.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

props.put("schema.registry.url", "http://localhost:8081,http://localhost2:8081");

KafkaProducer<Long, GenericRecord> producer = new KafkaProducer<>(props);

 

2.3. Follower로 오는 요청 처리

앞서 schema registry로 들어오는 모든 요청을 리더 서버에서 담당하고 있다고 했다. 그럼 팔로워로 요청이 들어왔을 때, schema registry는 어떻게 대응을 할까? 

 

아래 코드는 KafkaSchemaRegistry.java 파일의 isLeader 메서드이다. 해당 메서드로 현재 서버가 리더인지 여부를 판별할 수 있다. 해당 메서드를 호출하는 코드들을 보면 현재 서버가 리더인지 여부를 확인하고, 리더가 아니라면 리더 서버에 요청을 forward 한다. 그래서 팔로워 서버로 요청을 하더라도 리더가 처리를 하게 된다.

public boolean isLeader() {
    kafkaStore.leaderLock().lock();
    try {
      if (leaderIdentity != null && leaderIdentity.equals(myIdentity)) {
        return true;
      } else {
        return false;
      }
    } finally {
      kafkaStore.leaderLock().unlock();
    }
  }

 

 

3. 결론

schema registry는 카프카로 유입되는 데이터 스키마의 관리를 위해서 더욱 중요성이 커지고 있다. 운영 환경에서 schema registry를 사용하기 위해서는 HA 구성이 필수이다. 그래서 HA 설정에 대해서 찾아봤는데, schema registry 내부에서 리더 선정을 위해서 카프카 컨슈머의 코드를 활용했다는 점이 흥미로웠다. 컨슈머 코디네이터 관리에 대해서는 더 찾아볼 주제로 생각된다.

 

 

 

참고 문서

반응형

'Big Data > Kafka' 카테고리의 다른 글

Kafka Connect docker image 만들기  (0) 2024.07.19
[Kafka] Producer Partitioner 변천사 (no key)  (0) 2023.05.16
[ksqlDB] concepts  (0) 2022.12.22
[ksqlDB] High Availability를 위한 설정  (0) 2022.12.17
MirrorMaker2 소개 발표  (1) 2022.04.15