[NIO] WatchService

Language/Java 2020. 7. 28. 15:06 Posted by 사용자 devJoshua

이번 글에서는 NIO에 포함된 class 중에서 WatchService에 대해서 정리하고자 한다.

 

WatchService는 별도의 thread로 등록된 파일 혹은 디렉토리의 변경사항을 감지해서 이벤트로 리턴한다. 

보안 모니터링 및 속성 파일 변경 등 여러 작업에 대한 알림 용도로 사용할 때 유용하다.

 

예제 코드를 통해서 자세히 살펴보자.

  1. WatchService는 FileSystem.newWatchService() 메서드를 통해 생성한다. 생성한 WatchSservice와 감지하고자 하는 종류(ENTRY_MODIFY - 수정)를 등록한다.

  2. 무한루프에서 감지를 한다.

  3. WathService의 take() 메서드로 변경사항이 발생한 WatchKey를 찾을 때까지 기다린다. WatchKey를 얻으면 WatchEvent를 가져온다.

  4. WatchEvent가 ENTRY_MODIFY인 것을 찾는다.

  5. key를 리셋한다.

위와 같은 과정을 무한히 반복하면서 등록된 파일 혹은 디렉토리를 감지한다.

이벤트의 다른 종류로서 ENTRY_CREATE, ENTRY_DELETE, OVERFLOW (잃거나 버려짐)

 

WatchService가 동작하는 방식이 궁금해서 추가적으로 구현체가 있는 PollingWatchService를 살펴보았다.

WatchService를 생성할 때 newSingleThreadScheduledExecutor로 새로운 Thread를 생성했다.

PollingWatchService() {
	// TBD: Make the number of threads configurable
    scheduledExecutor = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 Thread t = new Thread(null, r, "FileSystemWatcher", 0, false);
                 t.setDaemon(true);
                 return t;
             }});
}

그리고 Thread는 감지하고자하는 디렉토리를 등록하는 register 함수를 호출할 때 시작했다.

감지는 10초 단위로 이루어졌다.

this.poller = scheduledExecutor
    .scheduleAtFixedRate(thunk, period, period, TimeUnit.SECONDS);

 

변경사항이 발생한 것을 확인해야 하기에 감지대상이 되는 파일들을 내부적으로 캐싱을 하고 있었다.

 

WatchService에 대한 추가적인 정보는 참고문서로 올려둔 자료를 참고하면 된다.

 

 

참고문서

 

'Language > Java' 카테고리의 다른 글

[NIO] WatchService  (0) 2020.07.28
[multi thread] CountDownLatch  (0) 2020.06.04
[multi thread] Semaphore  (0) 2020.06.02
[Java9] Collections, Streams  (0) 2020.03.13
Proxy를 통한 Restful API 호출 by Java  (0) 2019.12.09
[Java8] yyyyMMddHHmmssSSS LocalDateTime parse 오류  (0) 2019.07.11

댓글을 달아 주세요

[Kafka] Add connector contexts to Connect worker logs

Big Data/Kafka 2020. 7. 22. 13:55 Posted by 사용자 devJoshua

Kafka Connect(이하 커넥트) 2.3.0 개선사항에 대한 두 번재 글이다. Logging 개선에 대해서 살펴본다.

 

기존 커넥트의 로그는 커넥트에 포함된 작업의 로그들이 구분되지 않고 뒤섞여서 확인하기 쉽지 않았다. 특히 리밸런스 과정이 일어나면 커넥트가 담당하는 파티션들이 변경되는데 로그가 구분이 안되니 어떤 작업이 진행중인지 명확하게 알 수 없었다.

 

KIP-449: Add connector contexts to Connect worker logs을 통해서 커넥트에 포함된 작업들의 로그를 쉽게 분리해서 볼 수 있다.

 

적용하는 방법은 간단하다. 기존 connect log4j 설정의 패턴을 다음과 같이 바꾸면 된다.

# 과거 버전 설정
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

# 변경된 설정
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

 

더보기

참고사항

%d : 로깅 이벤트가 발생한 시간을 출력 ex)포맷은 %d{HH:mm:ss} 같은 형태의 SimpleDateFormat

%p : debug, info, warn, error, fatal 등의 priority 출력

%m : 로그내용 출력

%c : 카테고리 출력 ex)카테고리가 a.b.c 처럼 되어있다면 %c{2}는 b.c 출력

%L : 로깅이 발생한 caller의 라인수 출력

%n : 플랫폼 종속적인 개행문자 출력

 

작업별 내용을 변경해서 표시해 주는 패턴은 다음 설정으로 이루어진다.

%X{connector.context}

 

그럼 로그를 실제로 다음과 같은 포맷으로 기록된다.

[<connectorName>|<scope>]<sp>

connectorName: 커넥터 이름, sp: 후행공간이다.

 

scope 항목은 좀 더 자세히 살펴보자.

  • task-<n>: 0부터 시작하는 번호가 붙여진 작업.

  • task-<n>|offset : 번호가 붙여진 작업에 대한 source 오프셋 커밋.

  • worker : connector 인스턴스 생성 및 사용

로그 기록 내용을 대략적으로 알 수 있도록 KIP-449에 예로 나타낸 로그의 일부를 가져왔다.

[2019-04-02 17:01:38,314] INFO [local-file-source|worker] ConnectorConfig values:
[2019-04-02 17:01:38,315] INFO [local-file-source|worker] Creating connector local-file-source of type FileStreamSource (org.apache.kafka.connect.runtime.Worker:227)
[2019-04-02 17:01:38,317] INFO [local-file-source|worker] Instantiated connector local-file-source with version 2.3.0-SNAPSHOT of type class org.apache.kafka.connect.file.FileStreamSourceConnector (org.apache.kafka.connect.runtime.Worker:230)
...
[2019-04-02 17:01:38,320] INFO [local-file-source|task-0] Creating task local-file-source-0 (org.apache.kafka.connect.runtime.Worker:395)
[2019-04-02 17:01:38,320] INFO [local-file-source|task-0] ConnectorConfig values:
[2019-04-02 17:01:38,355] INFO [local-file-source|task-0] Kafka commitId: da2bddb1331e740c (org.apache.kafka.common.utils.AppInfoParser:110)
...
[2019-04-02 17:01:40,901] INFO [local-file-source|task-0|offsets] WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)

Connector의 이름과 task 번호를 구분해서 로그를 기록했기 때문에 추후 특정 Connector에 대한 로그만 뽑아서 확인한다거나 리밸런싱 동작 과정 등도 추론해 볼 수 있다.

 

 

관련 문서

 

댓글을 달아 주세요

Kafka 2.3.0 버전에서는 Connect(이하 커넥트) 관련 개선사항이 많았다. 관련 내용을 계속 정리해 갈 예정인데, 첫 번째 글로 KIP-458: Connector Client Config Override Policy에 대해서 정리한다.

KIP(Key Improvement Proposals) 문서 제목에서 알 수 있듯이 Connector에 포함된 Client(Producer, Consumer)의 설정을 override 할 수 있게 되었다.

 

필자가 해당 내용에 대해서 관심을 갖게 된 이유는

커넥트에 포함된 Sink Connector들의 Source 카프카 클러스터가 하나로만 유지할 수 있어 여러 카프카 클러스터를 보유했을 경우, 커넥트 클러스터도 여러개를 구축해야 했다.

그런데 커넥트의 컨슈머 설정을 Task 별로 변경할 수 있다면 하나의 커넥트 클러스터로 여러 카프카 클러스터의 데이터를 소비할 수 있다.

 

그럼 Override 방법에 대해 알아보자.

 

1. consumer 설정을 바꿔서 Connector 실행

적용하는 방법은 간단하다. Connector를 실행하는 Restful API에 설정값만 추가하면 된다.

다만 설정값의 규칙이 있다.

  • 'producer.override' : source connector가 사용하는 프로듀서 혹은 sink connector의 DLQ 프로듀서 (DLQ에 대해서는 나중에 다른 글로 정리하겠다)

  • 'consumer.override' : sink connector 사용 consumer

  • 'admin.override' : sink connector에 의해 만들어지는 DLQ 토픽

필자의 관심사는 sink connector의 boostrap.servers 설정을 바꾸는데 있으므로 아래의 예와 같이 API를 구성한다.

API의 예는 실제로 수행한 API가 아니라 confluent 블로그 글에서 참조했다. (테스트를 했지만 노출되지 않을 정보가 많아서 예만 든다)

curl -i -X PUT -H  "Content-Type:application/json" \
      http://localhost:8083/connectors/sink-elastic-orders-01-latest/config \
      -d '{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "orders",
  "consumer.override.auto.offset.reset": "latest",
  "consumer.override.bootstrap.servers": "test-broker01:9092,test-broker02:9092,test-broker03:9092"
  "tasks.max": 1,
  "connection.url": "http://elasticsearch:9200",  "type.name": "type.name=kafkaconnect",
  ...
}'

그렇게 바꿔서 실행을 했더니 필자는 다음과 같은 오류를 받았다.

Connect 설정 override 호출 error

 

오류 내용을 간단히 해석하자면 'None' 정책은 bootstrap.servers 설정을 바꿀 수 없단다.

사실 설정을 바꾸려면 커넥트의 정책을 변경해야 한다.

 

 

2. 커넥트 정책 변경

정책은 다음 3가지로 구분된다. 참고로 설정 변경 대상은 producer, consumer, admin이다.

  • None : 기본값. 변경을 허용하지 않는다.

  • Principal : security.protocol, sasl.jaas.config, sasl.mechanism prefix를 가진 설정값만 변경 가능.

  • All : 모든 설정 변경 가능

 

그래서 커넥트의 설정을 바꿔줬다. 다음과 같이 설정하면 된다.

connector.client.config.override.policy=All

connector.client.config.override.policy=All

 

커넥트의 설정을 바꾸고 1번 단계에서 수행한 API 호출을 다시 했더니, 다른 카프카 클러스터의 데이터가 소비됨을 알 수 있었다.

다만 커넥트의 설정이 카프카의 특정 토픽에 기록되는데, 여러 클러스터를 대상으로 활용하다 보면 관리 차원에서 부담이 될 수 있다.

 

그런 생각은 Github pull request 논의에서도 볼 수 있다.

향후에 커넥트의 설정 변경을 어디까지 허용하고, 관리 차원의 논란도 어떻게 정리해 갈 지 앞으로의 업데이트의 내용도 관심있게 봐야겠다.

 

 

관련 문서

 

댓글을 달아 주세요

카프카 관리자 업무를 진행하면서 가장 많이 받는 질문 중 하나는 "파티션 개수는 몇 개가 적당합니까?"이다.

그래서 이번 글은 Confluent 공동 창업자 중 한명인 Jun Rao가 쓴 How to choose the number of topics/partitions in a Kafka cluster? 라는 블로그 글을 인용해서 파티션 숫자와 관련하여 정리한다.

 

필자의 글은 Jun Rao의 글을 바탕으로 이해한 부분들을 풀어서 썼기 때문에 해당 주제에 관심이 있는 분들은 Jun Rao의 원문을 다시 읽어보시길 추천한다.

 

많은 파티션은 높은 처리량을 이끈다.

카프카에 기본적인 지식을 가지고 계신 분이라면 카프카가 분산 처리의 목적으로 설계되었음을 안다. 카프카의 분산 처리를 가능하게 하는 핵심 개념이 토픽의 파티션이다. 프로듀서와 컨슈머는 여러 파티션에 동시에 접근해서 데이터를 처리하기 때문에 파티션이 많다면 처리량이 높아진다.

 

파티션 수를 결정하는 공식은 처리량에 기반으로 한다. 프로듀서(p), 컨슈머(c)의 한 파티션이 요구하는 처리량을 기준으로 파티션 수를 결정한다. 프로듀서의 처리량에 영향을 끼치는 요소로  배치 사이즈, 압축 코덱, Acks 타입, 복제 수 등이 있다. 2014년에 링크드인에서 수행한 벤치마크를 보면 한 파티션에 10MB/sec까지 속도가 나온다는 자료가 있다. 반면에 컨슈머는 컨슈머가 데이터를 처리하는 로직에 따라 처리량이 바뀐다.

 

기본적으로 파티션 수를 결정할 때, 1~2년 후의 처리량을 고려해서 결정한다. 또한 현재 카프카 클러스터의 처리량 대비 파티션 수를 결정하고 클러스터의 서버가 늘어나면 파티션을 늘리고 파티션을 적당히 분배시켜 줄 수 있다.

 

[메세지 Key 관련 추가 참고사항]

더보기

프로프로듀서에서 Key를 사용하면 파티션을 늘릴 때 고려해야 할 사항이 있다. key를 포함해서 카프카에 데이터를 보내면 key의 해시값을 기준으로 파티션이 결정된다. 만약 컨슈머가 처리하는 데이터가 Key를 기준으로 동일한 파티션에 위치해야 한다면 문제가 발생할 수 있다.

 

아래에서는 처리량 이외에 파티션 수를 결정할 때 고려해야 할 요소들을 다뤄보도록 하자. 계속 살펴보겠지만 너무 많은 파티션 수를 부정적인 영향을 끼칠 수 있다.

 

많은 파티션은 많은 파일 핸들러의 오픈이 필요하다 (More Partitions Requires More Open File Handles)

카프카 클러스터의 파티션은 개별적으로 디렉토리를 가진다. log segment 당 2개의 파일(index, actual data)을 생성한다. 각 브로커는 모든 log segment의 index, data 파일 핸들을 엽니다. 파티션이 너무 많다면 OS의 파일 핸들 제한 설정을 바꿔야 할 수 있다.

 

많은 파티션은 비가용성이 높아진다 (More Partitions May Increase Unavailability)

카프카는 내부적으로 파티션을 복제한다. 프로듀서와 컨슈머는 파티션의 리더 브로커와 통신을 하고 팔로워 브로커가 리더 브로커로부터 데이터를 가져와서 복제한다. 만약 브로커가 다운되면 해당 브로커가 담당하는 리더 파티션을 사용하지 못하게 된다. 카프카는 자동적으로 다른 팔로워에게 리더 역할을 맞긴다. 다른 리더를 선출하는 역할은 컨트롤러라 부르는 브로커 중 한 대의 서버에서 한다. 파티션의 메타 데이터는 주키퍼에 존재한다.

 

브로커를 정상적으로 종료할 때는 컨트롤러가 파티션 리더를 순차적으로 이동한다. 리더 이동은 몇 밀리초 밖에 소요되지 않는다. 그렇지만 브로커가 비정상 종료되었을 때는 상황이 다르다. 비가용성은 파티션 수에 비례해서 늘어난다. 200개의 파티션에 2개의 복제를 가진다고 하자. 대략적으로 1000개의 파티션 리더가 존재한다. 한 파티션의 새로운 리더를 결정하는데 5ms가 걸린다면 1000개의 파티션 리더를 결정할 때 5초가 소요된다. 클라이언트에서는 문제가 발생할 수 있는 시간이 될 수 있다.

만약 비정상 종료된 브로커가 컨트롤러 역할을 한다면 문제가 더 심해진다. 새로운 컨트롤러를 먼저 선출해야 하며, 새로운 컨트롤러는 주키퍼로부터 파티션 메타 데이터를 새로 받아와야 한다. 10000개의 파티션이 존재하고 한 파티션 메타 데이터를 가져오는데 2ms가 걸린다면 20초의 비가용 시간이 발생한다.

 

브로커의 비정상 종료는 일반적인 상황은 아니다. 그래도 파티션 수와 연관된 문제이니 한 브로커당 최대 파티션 숫자를 고려해서 관리해야 한다. 한 브로커 당 2000~4000개의 파티션, 전체 클러스터에 10000개로 제한하는 것을 추천한다. (다만 클러스터 숫자를 보여주지 않았고 현재는 컨트롤러의 재기동 시간이 2015년 시점에 비해 개선되었다. 관련 내용은 필자의 [Kafka] 컨트롤러 분석 글을 참고하자)

 

많은 파티션은 처리(대기)시간이 늘어난다 (More Partitions May Increase End-to-end Latency)

end-to-end latency (이하 처리시간)은 프로듀서가 보낸 데이터가 컨슈머에 이르기까지의 시간으로 하자. 컨슈머는 카프카에 커밋된 데이터만을 가져올 수 있다. 커밋되었다는 것은 모든 브로커에 복제가 완료되었음을 나타낸다. 브로커는 한 스레드에 의해서 복제를 처리한다 (기본 설정). 실험 상 1000개의 파티션을 다른 브로커에 복제할 때 20ms의 시간이 걸렸다. 이것은 컨슈머가 데이터를 가져가는데 20ms의 대기시간이 있음을 의미한다.

브로커가 많은 큰 클러스터에서 이 문제는 완화가 된다. 파티션 리더가 1000개이고 10개의 다른 브로커가 있다고 가정하자. 나머지 10개의 브로커는 각각 100개의 파티션만 가져오면 된다. 따라서 커밋으로 인한 대기 시간은 수십 ms가 아니라 몇 ms에 불과하다.

 

그래서 클러스터에 여러 개의 브로커가 존재하면 복제 시간은 줄어든다. 다음 수식으로 각 브로커당 파티션 수 최대치를 제안한다.

100 * b (브로커 수) * r (복제 수)

 

많은 파티션은 많은 클라이언트 메모리를 필요로한다 (More Partitions May Require More Memory In the Client)

0.8.2 버전에서 프로듀서의 주요한 기능 중 하나로서 사용자가 메시지를 버퍼링하는데 사용되는 메모리 상한을 설정할 수 있다는 것이다. 프로듀서는 파티션 당 메세지를 버퍼를 가진다. 충분한 시간이 지나면 버퍼에 누적된 메세지가 브로커에 전송된다. 파티션이 늘어나게 되면 Producer가 사용하는 메모리양이 제한을 초가할 수 있다.

Consumer도 비슷한 문제가 있다. Consumer는 파티션당 메세지를 일괄로 가져온다. 더 많은 파티션을 소비하려면 많은 메모리가 필요하다.

 

 

파티션 수 결정과 관련하여 정리를 했다. 카프카를 통해 서비스를 개발하는 입장에서는 처리량을 높이기 위해 파티션을 무조건 높여야 한다고 생각할 수 있다. 그러나 카프카 관리자 측면에서는 다른 측면도 고려해야 한다. 명확한 정답은 없다. 하지만 처리량과 카프카 운영을 위해 파티션 수를 상황에 맞춰 관리하는 것 만이 정답으로 보인다.

 

 

관련 문서

댓글을 달아 주세요

[Kafka] Connect distributed mode 참고사항

Big Data/Kafka 2020. 7. 7. 16:08 Posted by 사용자 devJoshua

이번글은 Kafka Connect를 distributed mode로 사용할 때 필자가 실수했던 내용을 공유하는 목적이다.

 

Kafka Connect는 Standalone, Distributed 2가지의 mode가 있다.

테스트나 로컬에서 데이터를 이동할 때는 Standalone을 사용하지만 대부분의 운영 상황에서는 Distributed mode를 사용하게 된다.

 

필자의 실수에 대한 내용을 간략히 기술하면 다음과 같다.

  1. Custom한 Connector를 개발한 이후, Connect에 plugin을 배포하고 실행했다.

  2. REST API로 하나의 Job을 실행했다. 정상적으로 동작했다.

  3. 여러개의 Job의 동작 유무를 확인하기 위해 추가로 Job을 실행했다.

  4. 에러가 발생

 

에러의 내용은 다음과 같다.

org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Cannot complete request because of a conflicting operation (e.g worker rebalance)

 

새로운 Job 실행 요청에 대해 충돌이 발생해서 완료되지 않았고 충돌의 예로 리밸런스가 있단다.

 

그래서 필자가 만든 Connector이 각 파티션의 설정을 사용하는 로직이 있는데 거기서 버그가 있나 의심했다.

그런데 다시 생각해보니 리밸런스는 여러 Connect들에서 워커를 조정하면서 발생할 텐데 새로 만든 Connector가 파티션을 할당 받기 전에 발생한 문제 같았다.

 

Connect의 설정의 문제가 있나 의심하기 시작했고, 구글링으로 multi Job을 돌렸을 때 발생하는 케이스가 있는지 찾아봤다.

 

평소에 가끔 들어가보던 블로그에서 Connect 관련 내용이 있었다. 해당 블로그는 Confluent에 소속된 개발자이다.

블로그에서 주요하게 설명하는 설정은 rest.advertised.host.name 이었다.

 

Distributed mode로 동작하면 여러 Connect들이 서로 통신하며 Job을 분배한다.

그때 다른 Connect가 접근을 할 수 있도록 hostname이 설정되어 있어야 한다.

필자가 경험한 문제는 Job 실행 이후 다른 Connect가 실행한 Job에 대해 정보를 가져가지 못하면서 발생한 것으로 추측됐다.

 

그래서 rest.advertised.host.name, rest.advertised.host.port 2개 정보를 각 Connect에 모두 추가했다.

그 이후에 multi Job을 실행했을 때 정상적으로 동작했다.

 

필자는 기존에 Connect의 plugin 만드는 부분에 대해서만 검토를 했었는데, Distributed mode에서 Connect들이 서로 어떤 통신을 무엇을 위해 하는지 더 찾아봐야 겠다고 생각했다.

추후 해당 내용에 대해서 공부를 더 하면 정리하는 글을 써보도록 하겠다.

 

 

참고 문서

댓글을 달아 주세요

Parquet (파케이)

Big Data/Hadoop 2020. 6. 23. 09:41 Posted by 사용자 devJoshua

이번 글은 하둡 생태계에서 많이 사용되는 파일 포맷인 Parquet (이하 파케이)에 대해 정리한다.

글의 포함된 설명 중 많은 부분은 하둡 완벽 가이드에서 발췌한 것임을 밝힌다.

 

파케이는 columnar 저장 포맷이다. 구글에서 발표한 논문 Dremel: Interactive Analysis of Web-Scale Datasets를 토대로 Twitter, Cloudera에서 같이 개발했다.

columnar 포맷을 기존에 많이 사용하던 Row 기반 포맷과 비교하면 이해하는데 도움이 된다.

Columnar vs Row-based

전통적인 row 기반 저장 방식은 A1, B1, C1과 같이 같은 row 값이 연속적으로 저장된다.

반면에 columnar 저장 방식은 A1, A2, A3과 같이 컬럼의 데이터가 연속된 구조로 저장한다.

 

columnar 저장 방식을 개발하게 된 배경은 무엇일까?

 

파케이 개발에 참여한 Twitter는 대표젹인 SNS 회사로서 대량의 데이터를 HDFS에 저장하고 있었다. HDFS가 낮은 가격의 장비에 빅데이터를 저장하는 용도로 만들어졌다 하더라도 SNS의 데이터를 저장할 때 너무 많은 디스크를 소모하므로 데이터 사이즈를 줄이는 파일 포맷 개발이 필요했다. 그래서 파케이 구조를 살펴보면 작은 파일 사이즈와 낮은 I/O 사용을 목적으로 개발되었음을 알 수 있다.

 

파케이의 장점을 먼저 살펴보고, 어떻게 장점이 나타났는지 파케이 아키텍처를 분석하자.

 

장점

  • 압축률이 좋다. 컬럼 단위로 구성하면 데이터가 더 균일하므로 압축률이 높아진다.

  • 데이터를 가져갈 때, 전체 컬럼 중에서 일부 컬럼을 선택해서 가져가면 I/O가 많이 줄어든다. 컬럼단위로 데이터가 저장되어 선택되지 않은 컬럼의 데이터에서는 I/O가 발생하지 않기 때문이다.

  • 컬럼에 동일한 데이터 타입이 저장되기 때문에 컬럼별로 적합한(데이터형에 유리한) 인코딩을 사용할 수 있다.

장점으로 기술된 압축률, I/O가 줄어드는 부분은 전적으로 columnar 구조이기 때문에 가능하다. 컬럼 단위로 데이터를 저장하기에 비슷한 데이터가 나와서 압축의 효과도 높아지고, 일부 컬럼들만 가져오기도 용이한 것이다.

 

 

그럼 본격적으로 파케이의 구조를 살펴보자.

 

자료형

각 필드는 반복자 (required, optional, repeated), 자료형, 이름으로 되어 있다.

필드에 사용 가능한 기본 자료형은 다음과 같다.

타입 설명
boolean 바이너리 값
int32 부호 있는 32비트 정수
int64 부호 있는 64비트 정수
int96 부호 있는 96비트 정수
float 단정밀도(32비트) IEEE 754 부동소수점 숫자
double 배정밀도(64비트) IEEE 754 부동소수점 숫자
binary 순차 8비트 부호 없는 바이트
fixed_len_byte_array 고정길이 8비트 부호 없는 바이트

주목한 점은 기본 문자열 자료형이 없다. 대신 기본 자료형에 대한 해석 방식을 정의한 논리 자료형을 정의한다.

문자열은 UTF8 어노테이션을 가진 binary 기본 자료형으로 표현된다.

 

자료형에서 깊게 살펴볼 부분은 논리 자료형을 반복자, 그룹을 사용해 논리적으로 구성하는데 있다.

간단한 타입으로 LIST, MAP을 구현하지 쉬지 않은데, 명세 수준과 반복 수준을 사용해서 중첩 구조로 구현했다. 해당 구조가 구글의 Dremel 논문에 포함된 개념이다.

 

말로 이렇게 설명하면 이해하기 쉽지 않은데, LIST, MAP를 적용한 스키마를 살펴보자. 그리고 데이터 예제를 통해 명세 수준과 반복 수준이 어떻게 표현되는지 보자.

# List
message m {
    required group a (LIST) {
        repeated group list {
            required int32 element;
        }
    }
}

# Map
message m {
    required group a (MAP) {
        repeated group key_value {
            required binary key (UTF8);
            optional int32 value;
        }
    }
}

LIST와 MAP은 두 단게 그룹 구조로 만든다. LIST는 element 필드를 가진 반복 그룹으로 표현했다. 동일 타입의 필드가 여러개 있는 필드 list를 가지도록 했다.

MAP도 비슷하게 구현되어 있는데, 키-값 구조이기 때문에 key_value 필드에 key, value가 각각 필드로 포함되었다.

 

구조를 표현하는 대략적인 개념은 익혔는데, 명세 수준과 반복 수준은 무엇인지 어떻게 표현되는지 다른 예를 살펴보자.

다음과 같은 파케이 스키마가 있다고 하자.

message ExampleDefinitionLevel {
    optional group a {
        optional group b {
            optional string c;
        }
    }
}

값이 없는 단층 레코드는 null을 사용하고 중첩이나 반복 수준이 올라가면 null이 아닌 값을 사용해서 비트 필드를 인코딩하는 일반적인 기법으로 명세 수준과 반복 수준을 저장한다.

위 구조에 해당하는 예시 값과 명세 수준을 보자. optional로 정의한 필드의 값 유뮤에 따라 명세 수준이 달라진다.

반복 수준은 LIST의 시작값이 어느 곳인지 나타내는 마커의 역할을 한다.

 

그럼 데이터는 실제 파일로 어떻게 저장될까?

 

파일 구조

파케이 파일은 헤더, 하나 이상의 블록, 꼬리말 순으로 구성된다. 헤더는 파케이 포맷의 파일임을 알려주는 4바이트 매직 숫자인 PAR1만 포함하고 있다. 파일의 모든 메타데이터는 꼬리말에 저장된다.

꼬리말 데이터는 포맷버전, 스키마, 추가 키-값 쌍, 파일의 모든 블록에 대한 메타데이터와 같은 정보를 포함하고 있다.

파케이 파일 구조

파케이 파일의 각 블록은 행 그룹을 저장한다. 행 그룹은 행에 대한 컬럼 데이터를 포함한 컬럼 청크로 되어 있다. 각 컬럼 청크의 데이터는 페이지에 기록된다.

각 페이지는 동일한 컬럼의 값만 포함하고 있다. 따라서 페이지에 있는 값은 비슷한 경향이 있기 때문에 페이지를 압축할 때 매우 유리하다.

 

파케이 블록, 페이지

 

데이터의 가장 최소 단위인 페이지에는 동일 컬럼의 데이터만 존재한다. 그래서 인코딩/ 압축을 할 때, 페이지 단위로 수행하면 된다.

 

위 구조가 머리 속에 그려지면 파케이 파일을 만들 때의 설정 값이 이해된다.

 

 

파케이 설정

속성 명 타입 기본값 설명
parquet.block.size int 128MB 블록의 바이트 크기 (행 그룹)
parquet.page.size int 1MB 페이지의 바이트 크기
parquet.dictionary.page.size int 1MB 일반 인코딩으로 돌아가기 전의 사전의 최대 허용 바이트 크기
parquet.enable.dictionary boolean true 사전 인코딩 사용 여부
parquet.compression String UNCOMPRESSED

파케이 파일에서 사용할 압축 종류
 - UNCOMPRESSED, SNAPPY, GZIP, LZO

블록 크기를 설정할 때 스킨 효율성과 메모리 사용률 사이의 트레이드오프 관계를 고려해야 한다. 블록의 크기를 높이면 더 많은 행을 가지므로 순차 I/O의 성능을 높일 수 있어 효율적으로 스캔할 수 있다. 하지만 개별 블록을 읽고 쓸 때 모든 데이터가 메모리에 저장되어야 하기 때문에 너무 큰 블록을 사용하는 것은 한계가 있다.

파이케 블록과 HDFS 블록을 동일하게 설정하는 것이 일반적이며, 실제 두 블록의 기본값은 128MB 이다.

 

패이지는 파케이 파일의 최소 저장 단위로, 원하는 행을 읽기 위해서는 그 행을 포함한 페이지의 압축을 해제하고 디코딩해야 한다. 단일 행 검색은 페이지가 작을수록 효율적인데, 원하는 값을 찾기 전에 읽어야 하는 값이 더 적어지기 때문이다. 하지만 페이지의 크기가 작으면 필요한 페이지의 수가 늘어남으로써 발생하는 추가적인 메타데이터(오프셋, 사전)로 인해 저장 용량과 처리 시간이 증가하는 단점이 있다.

 

설정 내용 중에 위 설명에서 언급하지 않은 사전이 있다.

사전은 데이터 인코딩 시에 사용한다. 값의 사전을 만들어 인코딩한 후 사전의 인덱스를 나타내는 정수로 그 값을 저장한다.

파케이는 파일을 기록할 때 컬럼의 자료형을 기준으로 적합한 인코딩을 자동으로 선택한다. boolean을 제외한 대부분의 자료형은 사전 인코딩을 주로 이용한다. 사전의 크기는 페이지의 크기와 관련이 있다. 이유는 한 페이지에 사전이 모두 들어가야 하기 때문인데, 페이지 크기보다 사전이 커지면 일반 인코딩(값을 그대로 기록)으로 대체된다.

 

파케이의 이론적인 부분을 설명했는데, 실제로 파케이 파일을 만들어보고 만든 파일을 확인하는 실습을 해보자.

 

 

파케이 파일 만들기

파케이를 만들려면 일단 스키마가 필요하다. 파케이는 스키마를 포함한 데이터 구조이기 때문이다.

필자는 Avro 프로코톨을 통해서 파케이 파일을 만들 예정이다. Avro를 사용해서 파케이 파일을 만들 경우, AvroParquetWriter를 사용하면 된다.

 

아래 코드를 보자.

 

 

AvroParquetWriter에 전달할 avro schema를 먼저 만든다. schema는 2개의 string, 1개의 int 타입을 갖도록 간단히 구성했다.

그리고 AvroParquetWriter.builder를 통해 미리 만든 schema를 전달했다.

그리고 옵션들을 하나씩 설정하는데 빌더 패턴으로 되어 있으며, 모든 옵션 설정이 끝나면 build로 writer를 생성한다.

 

생성한 writer는 GenericRecord 타입으로 데이터를 입력받게 했는데, GenericRecord는 avro schema에 의해 만들어진 데이터이다.

임의의 값으로 record를 생성하고 AvroParquetWriter.write(record) 함수를 통해 데이터를 넣는다.

 

데이터를 다 넣었으면 writer를 close 함으로써 파케이 파일을 만든다.

 

파케이 파일 검토

생성된 파케이 파일을 확인하기 위해 parquet-tools을 사용한다.

parquet-tools은 parquet 모듈에 포함된 것으로서 CLI를 통해 파일의 스키마, 메타, 데이터를 확인할 수 있게 한다.

 

GitHub: apache/parquet-mr을 빌드해서 만든 jar 파일로 실행하면 되는데, 필자는 mac 환경에서 brew로 간편하게 설치했다.

// https://formulae.brew.sh/formula/parquet-tools
// parquet-tools 설치
brew install parquet-tools

parquet-tools schema {file}
parquet-tools meta {file}
parquet-tools head {file

위와 같이 brew로 parquet-tools을 간편하게 설치 가능하다.

 

그럼 parquet-tools로 위에서 만든 파케이 파일을 확인해 보자.

먼저 schema 부터 확인하자.

parquet-tools schema

schema를 살펴보면 정의한 대로 3개의 필드가 보인다. 또한 앞서 설명한 대로 string은 binary가 UTF8로 인코딩된 값임을 알 수 있다.

 

다음은 메타 정보이다.

메타를 보면 파케이 파일의 버전과 함께 압축 정보도 표시된다.

파케이 파일을 만들 때 별도의 압축 알고리즘을 넣지 않아서 기본값인 UNCOMPRESSED로 되어 있음을 알 수 있다.

 

그리고 avro schma로 파케이 파일을 만들어서 avro schema도 메타정보에서 확인할 수 있다.

parquet-tools meta

마지막으로 실제 파케이 파일에 있는 데이터를 살펴보자. 

parquet-tools에 head 옵션을 주면 일부 데이터를 확인할 수 있다.

parquet-tools head

이번 글에서는 파케이의 전반적인 개념과 함께 파케이 파일을 생성하고 확인하는 작업까지 해봤다.

실무에서 HDFS에 파케이 파일을 저장하거나 읽어서 가공하는 작업들을 할텐데 파케이의 개념을 숙지하고 작업하면 도움이 되지 않을까 싶다.

 

 

참고 문서

 

 

'Big Data > Hadoop' 카테고리의 다른 글

Parquet (파케이)  (0) 2020.06.23
[Spring] Hadoop hdfs 파일 업로드  (0) 2019.03.06

댓글을 달아 주세요

[Kafka] Producer config 정리

Big Data/Kafka 2020. 6. 16. 15:36 Posted by 사용자 devJoshua

이번 글에서는 카프카 Producer(이하 프로듀서)의 주요 설정 값이 프로듀서의 아키텍처에서 어떤 역할을 하는지 정리한다.

카프카 문서에서는 각 설정값이 설명으로만 나열되어 있어서 이해하기 어려울 수 있다. 그래서 프로듀서의 주요 컴포넌트를 그림으로 표현하고 각 컴포넌트에서 어떤 설정 값을 사용해서 무슨 역할을 하는지 정리할 필요가 있다.

 

설정을 정리함에 있어서 카프카 문서를 제일 먼저 참조했지만 참고 문서에 포함한 내용도 추가해서 이해를 높이고자 했다.

 

1. 프로듀서 설정을 분석하는 이유

프로듀서의 정의를 사전에서 찾아보면 '생산자, 제작자'로 나온다.

카프카에서 프로듀서는 말 그래도 데이터를 생산하는 역할을 한다.

 

프로듀서의 설정값들은 데이터를 브로커에 발송할 때, 발송하는 데이터의 양/ 주기 및 데이터를 받는 브로커와의 네트웍 연결 등을 조절하는데 사용한다.

프로듀서 설정에는 여러 타입 (브로커 주소, 인증 관련 등) 들이 있지만 이 글에서는 발송하는 과정에서 주의깊게 봐야하는 설정들 위주로 살펴볼 예정이다.

 

앞으로 볼 설정들을 주의깊게 봐야하는 이유는 프로듀서가 비동기로 브로커에 데이터를 발송하기 때문이다.

프로듀서 코드에서 ProducerRecord를 생성해서 send() 메서드로 보낼 때, 바로 데이터가 브로커로 발송되지 않고 비동기로 데이터를 전송한다.

그래서 실제로 브로커에 데이터를 발송하기 전까지 데이터를 모아둘 버퍼가 필요하며, 얼마만큼 모아서 보내고, 보낸 데이터의 성공을 어떻게 체크할 지 등의 로직이 설정값에 녹여져 있다.

 

설정값을 이해하게 되면 프로듀서의 아키텍처도 더 깊이 이해할 수 있다.

 

 

2. 프로듀서 아키텍처

Accumulator(축적자), Sender(발송자) 2개의 컴포넌트를 주요하게 살펴보자.

 

다음 클래스의 주석에 다음과 같은 설명이 있다.

  • RecordAccumulator

    • 큐 역할을 하는 클래스

    • 한계가 있는 메모리 양을 사용하며 메모리를 다 사용했을 때 블록한다.

  • Sender

    • 카프카 브로커에 레코드를 보내는 백그라운드 스레드

아래는 프로듀서에서 브로커로 레코드 발송할 때의 구조를 그린 그림인데 Accumulator와 Sender가 포함되어 있다.

그림에 각 컴포넌트에서 사용하는 설정들을 같이 명시했다. 그림에 설정값이 어디에 표시되어 있는지 참고하면서 각 설정의 의미를 살펴보자.

 

그림 1. 프로듀서 아키텍처 및 모듈 별 사용 설정 정리

3. Accumulator

Accumulator는 레코드를 축적하는 역할이다.

그래서 레코드를 축적할 때 사용하는 버퍼의 크기 (buffer.memory), 레코드를 묶는 양 (batch.size), 버퍼가 찾을 때 블록할 시간 (max.block.ms) 설정이 있다.

 

  • buffer.memory

    • 프로듀서가 브로커에 보낼 레코드를 모아두는 버퍼의 전체 메모리 바이트 수 이다. 프로듀서가 사용하는 전체 메모리 크기는 buffer.memory와 대략 비슷하다. 정확히 일치하지 않는 것은 압축하는 과정과 브로커에 보내는 과정에서 사용하는 메모리가 추가로 들기 때문이다.

    • 브로커로 레코드를 전달하는 것보다 쌓이는 양이 더 많다면 버퍼가 부족할 수 있다. 버퍼가 부족할 때, max.block.ms 설정이 필요하다.

  • max.block.ms

    • 버퍼가 가득 찾을 때 send(), partitionsFor() 메서드를 블록하는 시간이다.

  • batch.size

    • 프로듀서는 레코드를 한번에 하나씩 발송하지 않고 묶어서 발송한다. 묶어서 발송해야 네트웍 단계가 줄어드며 성능 향상이 도움이 된다.

    • batch.size는 레코드를 묶는 byte 단위의 사이즈이다. 이 크기보다 크게 레코드를 묶지 않는다.

    • 그렇다고 배치가 가득찰 때까지 프로듀서가 기다린다는 것은 아니다.

4. Sender

Sender는 레코드를 브로커에 전달하는 역할이다. Accumulator가 쌓아놓은 레코드를 비동기로 브로커에 계속 발송한다.

그래서 Sender와 연결된 컴포넌트를 보면 Accumulator와 Kafka(브로커)가 있다. 

 

Accumulator와 연결된 선에 linger.ms 값이 있다.

  • linger.ms

    • 사전으로 정의를 찾아봤더니 '꾸물거리다'가 눈의 들어왔다.

    • 브로커로 발송하기 전에 Accumulator에 축적된 데이터를 꾸물거리며 가져오는 것이다. 이유는 부하 상황에 브로커에 요청 수를 줄이기 위함이다. 다음 메세지를 바로 보내지 않고 딜레이 시간을 주는 것이다.

    • 한 파티션에 batch.size 만큼 레코드가 있으면 linger.ms 값을 무시하고 발송한다.

    • 기본값은 0으로 대기하지 않는다.

다음은 브로커와 통신할 때 사용하는 설정들이다.

  • retries

    • 레코드 발송에 실패할 경우 재시도 하는 회수이다. 기본값이 2147483647 이다.

    • 여기서 의문은 이렇게 큰 재시도 횟수를 한 메세지에 대해서 계속 발송하면 실패할 경우 다른 메세지는 전송 시도도 못하고 버퍼에 데이터가 쌓이는지 여부이다. retries를 다 채우지 않고도 재시도를 멈추는 다른 설정이 있다. delivery.timeout.ms 이다.

    • 일반적으로 재발송 관련해서 retries 보다 delivery.timeout.ms로 제어한다.

  • delivery.timeout.ms

    • send() 메서드를 호출하고 성공과 실패를 결정하는 상한시간이다. 브로커로부터 ack을 받기 위해 대기하는 시간이며 실패 시 재전송에 허용된 시간이다. 복구할 수 없는 오류가 발생하거나 재시도 횟수를 다 소모하면 delivery.timeout.ms 설정 시간 보다 먼저 에러를 낼 수 있다.

    • request.timeout.ms과 linger.ms의 합보다 같거나 커야 한다.

  • max.request.size

    • 요청할 수 있는 최대 bytes 사이즈.

    • 거대한 요청을 피하기 위해 프로듀서에 의한 한 배치에 보내는 사이즈를 제한한다. 압축되지 않은 배치의 최대 사이즈.

    • 서버에서는 별도의 배치 사이즈 설정을 가지고 있다.

  • max.in.flight.request.per.connection

    • 블록되기 전에 클라이언트가 보내고 받지 못한 요청의 최대 개수. 한번에 브로커와 통신하는 개수로 이해했다.

    • 만약 1보다 높은 값을 설정하면 재발송 과정에서 순서가 변경되는 위험이 발생한다.

  • request.timeout.ms

    • 요청 응답에 대한 클라이언트의 최대 대기 시간. 타임아웃 시간 동안 응답을 받지 못하면 요청을 다시 보낸다.

    • 불필요한 재시도를 줄이려면 브로커 설정 replica.lag.time.max.ms 보다 큰 값이어야 한다.

  • replica.lag.time.max.ms

    • 브로커 팔로워가 복제를 위한 패치 요청을 하지 않을 경우 ISR에서 제외하는 시간이다.

프로듀서 설정에 대해서 정리를 해보았다.

개인적으로 설정들의 항목들을 전체적으로 확인함으로써 프로듀서 아키텍처를 더 이해할 수 있는 시간이 되었다.

 

해당 글을 읽으시는 분들이 카프카 프로듀서를 이해하는데 좀 더 도움이 되었으면 한다.

컨슈머, 커넥터, 브로커에 대한 설정들도 하나씩 정리해 볼 예정이다.

 

 

참고 문서

 

 

댓글을 달아 주세요

[multi thread] CountDownLatch

Language/Java 2020. 6. 4. 14:48 Posted by 사용자 devJoshua

지난 글에서는 Semaphore(이하 세마포어)를 알아봤다. 이번에는 Thread의 동시 실행과 관련된 기능을 하지만 차이가 있어 헷갈리는 CountDownLatch(이하 래치)를 정리한다.

 

세마포어는 동시에 실행할 수 있는 스레드 수를 조절해서 고정된 리소스의 사용을 제한할 수 있었다.

이해하기 쉽게 예를 들자면, 어떤 공연의 좌석이 정해져 있는데 입구에서 관리자가 좌석 만큼의 인원만 수용하도록 체크하는 것과 같다.

 

그에 비해 래치는 경마장에서 출발 선상에 있는 말들이 모두 오기를 기다리는 것과 같다.

모든 말이 출발 준비를 마치고 출발선에 오면 경주가 시작되듯 그 시점까지 기다리게 한다.

 

래치를 생성할 때 수행할 스레드 수를 지정한다.

그리고 await 메서드를 실행하는데, 이 지점에서 모든 스레드가 준비되었는지 대기한다.

준비되었다는 신호는 countDown 메서드로 알려준다. countDown의 이름처럼 준비가 필요해서 기다려야 하는 스레드가 하나씩 줄어든다.

 

아래 코드도 세마포어 처럼 IBM 블로그에서 제공한 코드를 가져왔다.

코드를 살펴보면 특정 이름을 가진 말들을 준비한다.

그리고 레이스가 시작하면 임의의 레이스 거리가 만들어지며, 랜덤한 수행시간에 달린 랜덤한 거리를 레이스 거리에서 빼가면서 말들이 각자 다른 속도로 달려감을 표현했다.

 

래치는 마지막 결승점에 모든 말들이 도착할 때까지 대기하도록 await를 걸고, 각 말들이 레이스를 끝내면 countDown 메서드로 끝났음을 알렸다.

모든 말들의 레이스가 끝나면 래치로 인한 대기가 끝나게 되고 레이스 결과를 표시해 준다.

 

레이스를 예로 래치를 적용하니 이해하기가 쉬웠다.

병렬 프로그래밍을 하다가 모든 스레드를 동시에 수행시켜서 성능 테스트를 할 때가 있다. 이런 경우 래치를 사용하면 된다.

 

 

관련 문서

 

'Language > Java' 카테고리의 다른 글

[NIO] WatchService  (0) 2020.07.28
[multi thread] CountDownLatch  (0) 2020.06.04
[multi thread] Semaphore  (0) 2020.06.02
[Java9] Collections, Streams  (0) 2020.03.13
Proxy를 통한 Restful API 호출 by Java  (0) 2019.12.09
[Java8] yyyyMMddHHmmssSSS LocalDateTime parse 오류  (0) 2019.07.11

댓글을 달아 주세요

[multi thread] Semaphore

Language/Java 2020. 6. 2. 14:22 Posted by 사용자 devJoshua

이전에 java.util.concurrent에 포함된 유용한 동기화 클래스들을 정리한 적이 있다.

해당 글은 다음 링크에서 확인할 수 있다.

 

동기화 클래스들이 더 있는데 이번 글에서는 Semaphore(이하 세마포어)에 대해서 정리한다.

 

세마포어는 특정 자원이나 특정 연산을 동시에 사용하거나 호출할 수 있는 스레드의 수를 제한하고자 할 때 사용한다.

자원 풀이나 컬렉션의 크기에 제한을 두고자 할 때 유용하다.

 

세마포어는 생성자에 퍼밋의 숫자를 받아서 해당 수 만큼 동시 사용을 허용한다.

더보기

permit을 사전에서 찾아보면 다음과 같은데 특정 수 만큼 허가증을 발급한다고 이해하자.

[불][가]〔…의〕 인가(서), 허가(증), 면허(장) (※특히 여행·노동·수출 등 기한이 있는 것을 말함)감찰〔for …〕

 

acquire() 메서드를 호출하면 최대 퍼밋이 될 때까지 등록을 허가하며, release() 메서드는 한 퍼밋을 해제한다.

 

아래 코드를 살펴보자. 코드는 IBM 블로그에 있는 코드에서 가져왔다.

코드를 보면 10개의 스레드를 생성하는데 세마포어는 3개의 퍼밋만 허용했다.

각 스레드는 임의의 수행시간(최대 15초)이 걸리기 때문에 종료 시점이 다르다.

코드를 실행해 보면 수행시간이 짧은 스레드가 작업을 마치고 release() 메서드를 호출하면 대기하던 다른 스레드가 실행된다.

 

의도했던 대로 여러 작업을 고정된 리소스에서 처리하고자 할 때 유용하다.

 

 

참고 문서

 

 

'Language > Java' 카테고리의 다른 글

[NIO] WatchService  (0) 2020.07.28
[multi thread] CountDownLatch  (0) 2020.06.04
[multi thread] Semaphore  (0) 2020.06.02
[Java9] Collections, Streams  (0) 2020.03.13
Proxy를 통한 Restful API 호출 by Java  (0) 2019.12.09
[Java8] yyyyMMddHHmmssSSS LocalDateTime parse 오류  (0) 2019.07.11

댓글을 달아 주세요

오래 준비해온 대답

2020. 6. 2. 09:46 Posted by 사용자 devJoshua

오랜만에 책과 관련된 글을 쓰고자 한다. 책을 꾸준히 읽으려고 노력하고 있으나 읽고 생각한 바를 글로 남기지 않으니 생각이 남지 않고 사라져 버려 아쉬울 때가 많다. 개인적으로 자주는 가지 못하지만 여행을 좋아하고 여행기를 쓴 책도 좋아한다. 여행이란 일상에서 벗어나서 새로운 경험을 하게 되니 새로운 생각과 관점도 생기게 된다. 내가 가지는 못하지만 다른 사람의 여행기에서 나의 예전 여행의 기억도 살아나고 기분전환도 된다.

 

최근에 여행과 관련한 책 2권을 읽었다. '김영하' 작가의 책 「여행의 이유」 그리고 「오래 준비해온 대답」 이다. TV에 출연을 많이 하는 작가라서 그의 삶을 본인에게 들을 수 있는 기회가 있었는데 여행을 많이 다녔다고 했다. 기회가 되면 여행을 다니고 책도 해외에서 쓰기고 한단다. 여행을 많이 하는 작가가 쓰는 여행의 이유와 여행기는 내 관심을 끌기에 충분했다.

 

여행의 이유
국내도서
저자 : 김영하(Young Ha Kim)
출판 : 문학동네 2019.04.17
상세보기

 

「여행의 이유」를 읽고 김영하 작가의 여행은 채움이 아닌 비움에 있음을 알게 됐다. 책의 한 부분이다.

 

"영감을 얻기 위해서 혹은 글을 쓰기 위해서 여행을 떠나지는 않는다. 여행은 오히려 그것들과 멀어지기 위해 떠나는 것이다. 격렬한 운동으로 다른 어떤 것도 생각할 수 없을 때 마침내 정신의 편안함이 찾아오듯이 잡념이 사라지는 곳, 모국어가 들리지 않는 땅에서 때로 평화를 느낀다."

 

일상에서 계속 여행을 꿈꾸는 것은 힘겨움이 몸에 조금씩 쌓여서 힘든 주변은 모두 정리하고 싶어서는 아닐까?

사랑하는 가족들과 여행을 하면 거추장스러운 것들과 멀리할 수 있고, 가족에게 집중하며 여유를 느끼기 때문인 듯 하다.

 

오래 준비해온 대답
국내도서
저자 : 김영하(Young Ha Kim)
출판 : 복복서가 2020.04.29
상세보기

 

「오래 준비해온 대답」은 작가의 여행을 실제로 볼 수 있어 흥미롭다. 그리고 여행지가 '시칠리아' 인데 많이 찾지 않는 곳이여서 궁금했다. 사실 시칠리아는 이탈리아인데 이탈리아의 유명한 관광지를 여행하는 것과는 달랐다.

이름도 낯선 도시들 '리파리', '시라쿠사', '아그리젠토'. 낯선 이름처럼 그들의 삶도 우리와는 달라 낯설지만 재밌다.

 

여행의 순서대로 이야기 하다가 특별한 에피소드들을 하나씩 소개한다. 개인적으로 '리파리'에서 낯선 곳에서 배타적인 사람들과 친밀하게 지내게 되는 사연이 인상 깊다. 한국에서는 바로 옆 집 사람들과도 대화를 하지 않게 되는데, 리파리의 사람들은 모두가 이웃이다. 새로운 사람이 오면 이웃이 아니기에 이방인이지만 친구가 되어 가는 과정이 좋아 보였다. 현실에서는 경쟁하고 이기적이 되어가며 관계로 인해 스트레스를 받는데 친구가 되어가는 과정은 관계의 행복을 다시 일깨워줬다. 

그리고 스쿠터를 타고 자유로움을 느끼며 자연을 보는 여유는 삶을 바쁘게 달리기만 했던 내 모습과 비교해 부러웠다.

 

코로나로 인해 여행도 쉽지 않지만, 여행을 갈 수 있는 때가 되면 다시 한번 책장을 열어서 여행자의 마음으로 되돌려 줄 것 같다.

 

 

'' 카테고리의 다른 글

오래 준비해온 대답  (0) 2020.06.02
아트 인문학 여행  (0) 2018.08.06
위대하고 위험한 약 이야기  (0) 2018.06.25
주경철의 유럽인 이야기1  (0) 2018.05.17
아날로그의 반격 - 몰스킨  (0) 2017.07.26
숨결이 바람될 때  (0) 2017.07.20

댓글을 달아 주세요