나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Hadoop

[Spring] Hadoop hdfs 파일 업로드

devidea 2019. 3. 6. 11:40
회사에서 서비스를 만들다가 Restful API를 통해 파일을 업로드 받고 HDFS에 저장하는 기능이 필요했다. API 서버는 Spring으로 구현되어 있었기에 Spring에서 Hadoop 관련 repository도 찾아보고 JavaConfig 설정도 해보았는데 이번 포스트에서 간단히 정리해 보고자 한다.

내용은 아래의 순서로 진행하고자 한다.

1. Spring Apache Hadoop 라이브러리 찾기.
2. Hadoop 관련 JavaConfig 추가.
3. Hadoop NameNode HA 구성 맛보기.
4. HDFS에 파일 업로드.


1. Spring apache hadoop 라이브러리 찾기.
Spring Data 프로젝트 하위에 Spring for Apache Hadoop이 포함되어 있다. 라이브러리의 기능을 설명하는 Features 부분에 다음과 같은 내용이 있다. 
"Integration with Spring Boot to simply creat Spring apps that connect to HDFS to read and write data." 이 블로그에서 하고자 했던 HDFS로 데이터의 읽기/ 쓰기를 Spring Boot에 간단히 통합할 수 있게 한다는 내용이다.

그럼 테스트를 하고자 하는 Hadoop 버전에 맞는 라이브러리 버전을 먼저 찾아보자.
spring-projects/spring-hadoop 다음 Github wiki 페이지에서 Hadoop 버전 (혹은 배포판)에 맞는 라이브러리 버전을 소개하고 있다. 그런데 자세히 살펴보니 Hadoop 2.7.3 버전까지만 지원하고 있다. 필자의 로컬 컴퓨터에는 Hadoop 2.8.2 버전이 설치되어 있지만 client 호환성에는 문제가 없어보였다(HDFS 파일 쓰기의 테스트 결과로서). 

이유를 찾기 위해 구글링을 했더니 Hortonworks의 Hadoop 3 migration 문서를 살펴보았을 때, client는 Hadoop 2에서도 호환성이 유지된다고 쓰여있다. 그래서 HDFS 쓰기 용도로만 client를 사용해서 문제가 없는 듯 했다. 다만 Shell Script가 다시 띄어져 호환이 되지 않고, 일부 deprecated된 API가 있으니 더 자세히 살펴볼 필요가 있다.


2. Hadoop 관련 JavaConfig 추가.
그러면은 본격적으로 Spring에 Hadoop 설정을 적용해 보도록 하자. Spring boot가 등장한 이후로는 POJO 형태의 Java 코드로 설정을 추가한다. 그래서 Spring for Hadoop 문서에서 JavaConfig 관련 내용을 제일 먼저 찾았다.

아래 코드는 JavaConfig 관련 문서에 찾은 소스 그대로 이다. 설정은 정말 간단하다. SpringHadoopConfigurerAdapter 상속받은 Class를 하나 만들고, configure method를 override해서 필요한 설정을 추가하면 된다. 일단 hdfs 경로만 넣어줬다. 3번 항목에서 HA 구성된 Hadoop 관련 설정을 추가하는 내용을 살펴보겠다.

import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.config.annotation.EnableHadoop
import org.springframework.data.hadoop.config.annotation.SpringHadoopConfigurerAdapter
import org.springframework.data.hadoop.config.annotation.builders.HadoopConfigConfigurer;

@Configuration
@EnableHadoop
static class Config extends SpringHadoopConfigurerAdapter {

  @Override
  public void configure(HadoopConfigConfigurer config) throws Exception {
    config
      .fileSystemUri("hdfs://localhost:8020");
  }
}


3. Hadoop NameNode HA 구성 맛보기.
Hadoop의 안정적 운영을 위해서는 NameNode의 HA 구성이 필수적으로 보인다. 회사에서 운영 중인 Hadoop 역시 NameNode의 HA 구성이 되어 있었다. NameNode의 HA 구성은 이 블로그글의 범위를 벗어나서 자세히 기술하지는 않겠다. 
JavaConfig에 설정을 추가해야 하는 부분을 위해서 설명하자면, Active/ Standby NameNode로 구성되고 fsimage, edits log를 통해서 NameNode의 데이터 sync를 맞추게 된다. HA의 아키텍처를 구성에 여러 방법들(Shared Storage, JournalNode)이 존재한다. Spring에서 Hadoop 설정을 할 때, Active/Standby NameNode의 정보가 모두 필요함을 의미한다. 하지만 edits log를 공유하는 방법은 NameNode의 내부의 기능에 포함되므로 현재로서는 자세히 알지 않아도 된다.
필자 역시 NameNode HA에 대해서 이해가 부족했는데 다음 블로그 글이 "Hadoop NameNode 이중화 시 fencing의 역할"로 도움을 많이 받았다.

추가해야 하는 설정에 대해서 먼저 코드부터 보자. 필자는 코드부터 보는게 이해가 빠를 때가 많았다.

config
  .withProperties()
  .property("dfs.nameservices", "mycluster")
  .property("dfs.ha.namenodes.mycluster", "cluster_n1,cluster_n2")
  .property("dfs.namenode.rpc-address.mycluster.cluster_n1", "nameNode1:8020")
  .property("dfs.namenode.rpc-address.mycluster.cluster_n2", "nameNode2:8020")
  .property(
    "dfs.client.failover.proxy.provider.mycluster",
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
  );


그리고 각 설정을 살펴보자. Hadoop 공식 문서에 보면 설명이 자세히 나와 있다.
  • dfs.nameservices : 새로운 nameservice를 위한 논리적인 이름.
  • dfs.ha.namenodes.[nameservice ID] : nameservice 안에 있는 각 NameNode의 식별자.
  • dfs.namenode.rpc-address.[nameservice ID].[name node ID] : 각 NameNode가 수신 대기하는 RPC 주소.
  • dfs.client.failover.proxy.provider.[nameservice ID] : Java client가 어떤 NameNode를 호출해야 하는지 판별하는 기능
    • ConfiguredFailoverProxyProvider : 첫 호출에서 모든 NameNode를 동시에 호출하여 판별하고, active node에 fail-over가 일어나기 전까지 active NameNode만 호출 함)


4. HDFS에 파일 업로드.
이제 HDFS에 파일을 올릴 준비만 되었다. Spring Service를 하나 만들어서 MultipartFile로 파일을 받아서 올리는 함수를 구현했다. 이번에도 코드부터 보자.

@Slf4j
@Service
public class HdfsService {

    @Autowired
    private Configuration hadoopConfiguration;

    public void uploadHdfs(MultipartFile file) {

        try {
            // hadoop 설정 가져오기.
            FileSystem hdfs = FileSystem.get(hadoopConfiguration);
           
            Path outputPath = new Path("/user/test");

            // Duplicate check for jar file.
            if (hdfs.exists(outputPath)) {               
                throw new Exception("Exist hdfs file");
            }

            OutputStream outStream = hdfs.create(outputPath);
            IOUtils.copyBytes(file.getInputStream(), outStream, hadoopConfiguration);

            hdfs.close();

        } catch (IOException e) {
            log.error("HDFS IOException. message:{}", e.getMessage());
        }
    }
}


정말 간단한 코드이다. 앞서 2,3번 항목에서 설명한 설정을 @Autowired로 불러와서 org.apache.hadoop.fs.FileSystem 객체를 만드는데 사용했다.
hdfs.create(outputPath)에서 관련 파일을 생성했다. 그리고 org.apache.hadoop.io.IOUtils를 통해서 MultipartFile의 InputStream을 복사해 주었다.

4장의 간단한 내용을 설명하기 위해서 서론이 길었던 것 같다.
Spring Apache Hadoop을 활용해서 Hadoop의 다른 기능도 쓸 수 있는 부분이 많을 텐데 맛보기로 살펴보았다. 기회가 되다면 다른 기능들도 하나씩 살펴보고자 한다.


관련 문서


반응형

'Big Data > Hadoop' 카테고리의 다른 글

HBase 활용을 위한 기본 개념  (2) 2021.09.03
Parquet (파케이)  (1) 2020.06.23