IT/Spring

[리액터] 리액터 사용하기

김 정 환 2022. 7. 11. 01:02
반응형

 

일련의 작업 단계를 기술하는 명령형 프로그램과 달리, 리액티브 프로그래밍은 데이터가 전달될 파이프라인을 구성하는 것입니다. 그리고 이 파이프라인을 통해 데이터가 전달되는 동안 어떤 형태로든 변경 또는 사용될 수 있습니다.

 

명령형 프로그래밍과 리액티브 프로그래밍의 차이를 예시로 알아보겠습니다.

 

명령형 프로그래밍 예시부터 보겠습니다. 과일의 이름을 가져와서 모두 대문자로 변경한 후 하나의 문장을 만들어 출력해보겠습니다. 이 경우는 각 줄의 코드가 같은 스레드에서 한 단계씩 차례대로 실행됩니다. 그리고 각 단계가 완료될 때까지 다음 단계로 이동하지 못하게 실행 중인 스레드를 막습니다.

String fruit = "Apple";
String capitalFruit = fruit.toUpperCase();
String likeFruit = "I like " + capitalFruit + "!";
System.out.println(likeFruit);

 

리액티브 프로그래밍 예시를 보겠습니다. 아래 코드는 마치 단계별로 실행되는 것처럼 보이지만, 실제로는 데이터가 전달되는 파이프라인을 구성하는 것입니다. 그리고 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경됩니다. 또한, 각 오퍼레이션(여기서는 map)은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있습니다. 아래 과정을 나타내면, just() 오퍼레이션으로 "Apple"을 가진 Mono를 만듭니다. 그리고 이것이 map() 오퍼레이션으로 전달되어 대문자로 변경되고 다른 Mono를 만듭니다. 그리고 이것은 다음 map() 오퍼레이션으로 전달되어 문자열 결합을 수행하고 또 다른 Mono를 만듭니다. 이렇게 세 개의 Mono를 생성하고 끝으로 subscribe()를 호출하여 마지막에 생성된 Mono를 구독하여 데이터를 수신하고 출력합니다.

Mono.just("Apple")
    .map(n -> n.toUpperCase())
    .map(cn -> "I like " + cn + "!")
    .subscribe(System.out::println);

 

 

 

리액티브 플로우는 마블 다이어그램(marble diagram)으로 나타내곤 합니다. 마블 다이어그램의 제일 위에는 Flux나 Mono를 통해 전달되는 데이터의 타임라인을 나타내고, 중앙에는 오퍼레이션을, 제일 밑에는 결과로 생성되는 Flux나 Mono의 타임라인을 나타냅니다.

https://mister11.github.io/posts/project_reactor_intro/

 

 

 

Flux와 Mono가 제공하는 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성합니다. 오페레이션의 종류는 생성 오퍼레이션, 조합 오퍼레이션, 변환 오퍼레이션, 로직 오퍼레이션가 있습니다. 자주 사용되는 오퍼레이션을 살펴보겠습니다.

 

생성 오퍼레이션부터 보겠습니다. just() 메서드를 이용해서 Flux나 Mono 리액티브 타입 객체를 생성할 수 있습니다. 리액티브 객체를 생성한 후에 subscribe() 메서드를 호출하면 구독할 수 있게 되고 데이터가 흘러갈 수 있게 됩니다.

public void createAFlux_just(){

    Flux<String> fruitFlux = Flux.just("Apple", "Grape", "Orange");
    fruitFlux.subscribe(f -> System.out.println("Here's some fruit: " + f));
}

 

fromArray()를 호출하여 배열로부터 Flux를 생성할 수 있습니다. 그리고 fromIterable()를 호출하여 List와 Set과 같은 Iterable으로부터 Flux를 생성할 수 있습니다. 이외에 생성하는 오퍼페이션은 여기에서 찾아보실 수 있습니다. 'create '라고 검색하면 찾을 수 있습니다.

public void createAFlux_FromArray(){
    
    String[] fruits = new String[] {"Apple", "Orange", "Grape"};
    Flux<String> fruitFlux = Flux.fromArray(fruits);
}

public void createAFlux_FromIterable(){
    List<String> fruitList = new ArrayList<>();
    fruitList.add("Apple");
    fruitList.add("Orange");
    fruitList.add("Grape");
    Flux<String> fruitFlux = Flux.fromIterable(fruitList);
}

 

 

조합 오퍼레이션을 보겠습니다. 두 개의 리액티브 타입을 결합해야 하거나 하나의 Flux를 두 개 이상의 리액티브 타입으로 분할해야 하는 경우에 사용합니다. mergeWith()를 사용하여 두 Flux를 하나의 Flux로 생성할 수 있습니다.

public void mergeFluxes(){
    
    Flux<String> UpperFlux = Flux
        .just("AAA", "BBB", "CCC")
        .delayElements(Duration.ofMillis(500)); # 방출시간 늦춤
    
    Flux<String> lowerFlux = Flux
        .just("aaa", "bbb", "ccc")
        .delaySubscription(Duration.ofMillis(250)) # 구독시간 늦춤
        .delayElements(Duration.ofMillis(500));
    
    Flux<String> mergedFlux = upperFlux.mergeWith(lowerFlux);
}

 

이외에, zip()을 사용하여 각 Flux 소스로부터 한 항목씩 번갈아 가져와 새로운 Flux를 생성할 수 있고, zip()에 람다식을 넣어서 새로운 Flux를 생성할 수 있습니다. first()를 사용하여 먼저 값을 방출하는 소스 Flux를 선택해서 새로운 Flux를 생성합니다.

 

 

변환 오퍼레이션을 보겠습니다. 데이터 스트림으로 데이터가 흐르는 동안 일부 값을 필터링하거나 변환해야하는 경우에 사용할 수 있습니다. filter()에 조건식을 넣어서 선택적으로 발행할 수 있습니다. 아래 필터 오퍼레이션을 거치면 "Yellowstone"과 "Yosemite"를 가진 Flux가 생성됩니다.

public void filter(){

    Flux<String> nationalParkFlux = Flux
        .just("Yellowstone", "Yosemite", "Grand Canyon")
        .filter(np -> !np.contains(" "));
}

 

map()은 지정된 함수를 이용하여 메시지의 변환을 수행하고 새로운 메시지를 만들어서 Flux를 생성합니다. 아래의 예시를 보겠습니다. just()로 생성된 Flux는 String 객체를 발행하고 map()의 결과로 생성된 Flux는 Player 객체를 발행합니다. map()에서 각 항목이 소스 Flux로부터 발행될 때 동기적으로(각 항목이 순차 처리) 매핑되어 수행됩니다. 비동기적으로(각 항목이 병렬 처리) 수행하고 싶다면 flatMap()을 사용하면 됩니다.

public void map(){

    Flux<Player> playerFlux = Flux
        .just("Jeong Kim", "Il Kim", "Soo Tam")
        .map(n -> {
            String[] split = n.split("\\s");
            return new Player(split[0], split[1]);
        })
}

 

flatMap()은 수행 도중 생성되는 임시 Flux를 사용해서 변환을 수행하므로 비동기 변환이 가능합니다. map()에서는 한 객체를 다른 객체로 매핑하는 정도였지만, flatMap()에서는 각 객체를 새로운 Mono나 Flux로 매핑하며, 해당 Mono나 Flux로 새로운 변환을 하여 새로운 Flux를 생성합니다. 그리고 subscribeOn()과 함께 사용하면 리액터 타입의 변환을 비동기적으로 수행할 수 있습니다.

 

아래 예시에서, flatMap()에는 String 타입을 Mono로 변환하는 람다가 지정되어 있습니다. 그 다음 map()이 해당 Mono에 적용되어 String 객체를 Player 객체로 변환합니다. 여기까지만 하고 멈추면 순차적으로(동기적으로) 수행됩니다. 그러나 마지막에 subscribeOn()을 호출하면 각 구독이 병렬 스레드로 수행됩니다. 따라서 다수의 입력 객체들의 map()이 비동기적으로 병렬 처리됩니다. StepVerifier로 처리된 항목을 확인하면 {"Jeong", "Kim"}, {"Il", "Jae"}, {"Soo", "Park"}으로 나올 때도 있고 {"Soo", "Park"}, {"Il", "Jae"},  {"Jeong", "Kim"}으로 나올 때도 있습니다.

public void flatMap(){
    Flux<Player> playerFlux = Flux
        .just("Jeong Kim", "Il Jae", "Soo Park")
        .flatMap(n -> Mono.just(n)
            .map(p -> {
                String[] split = p.split("\\s");
                return new Player(split[0], split[1]);
                })
            .subscribeOn(Schedulers.parallel())
    );
    
    List<Player> playerList = Arrays.asList(
       new Player("Jeong", "Kim"),
       new Player("Il", "Jae"),
       new Player("Soo", "Park")
    );
    
    # Mono와 Flux를 구독하고 스트림을 통해서 전달되는 데이터를 확인하여 테스트
    StepVerifier.create(playerFlux)
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .verifyComplete();
}

 

이외에, skip()을 사용하여 지정된 수의 메시지를 건너뛴 후에 Flux를 생성합니다. take()를 사용하여 처음부터 지정된 수의 메시지만 가져와서 Flux를 생성합니다. distinct()를 사용하여 발행된 적이 없는(중복되지 않는) 항목만 모아서 Flux를 생성합니다.

 

 

 

로직 오퍼레이션을 보겠습니다. Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지만 알아야 할 경우가 있습니다. 이때는 all()이나 any() 오퍼레이션을 사용할 수 있습니다.

 

all()은 모든 메시지가 조건을 충족하는지 확인하는데 사용합니다. 아래 예시에서 모든 동물은 알파벳 'a'를 포함하지 않으므로 false를 갖는 Mono를 생성합니다.

public void all(){

    Flux<String> animalFlux = Flux.just("cat", "dog", "eagle");
    Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
}

 

any()를 사용하여 일부 조건과 일치하는지 확인할 수 있습니다. 아래 예시에서 일부 동물은 알파벳 't'를 포함하므로 true를 갖는 Mono를 생성합니다.

public void any(){

    Flux<String> animalFlux = Flux.just("cat", "dog", "eagle");
    Mono<Boolean> hasTMono = animalFlux.any(a -> a.contains("t"));
}

 

 

 

요약

  • 리액티브 프로그래밍에서는 데이터가 흘러가는 파이프라인을 생성합니다.
  • 리액티브 스트림은 Publisher, Subscriber, Subscription, Processor의 네 가지 타입을 정의합니다.
  • 프로젝트 리엑터는 리액티브 스트림을 구현하며, 수많은 오퍼레이션을 제공하는 Flux와 Mono의 두 가지 타입으로 스트림을 정의합니다.

 

[Spring in Action 서적을 바탕으로 작성되었습니다.]

반응형