새소식

반응형
Development

[Reactive] Reactive Streams(리액티브 스트림즈)

  • -
반응형

리액티브 스트림즈(reactive streams)란

리액티브 라이브러리를 어떻게 구현할지 정의해 놓은 별도의 표준 사양

데이터 스트림을 non-blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양

그 구현체 예시 RxJava, Reactor.. 

 

리액티브 스트림즈(reactive streams) 구성 요소 

  • publisher : 데이터 생성하고 통지
  • subscriber : 구독한 publisher로 부터 통지된 데이터를 전달받아서 처리
  • subscription : publisher에 요청할 데이터의 개수를 지정, 데이터의 구독을 취소
  • processor : publisher + subscriber

출처 : https://www.moelzayat.com/reactive-programming-reactor-java-best-practices-tips-and-tricks/

 

 

Publisher interface

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

 

subscribe 메서드를 통해서 Subscriber를 등록 

 

Subscriber interface

public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

 

onSubscribe : 구독 시작 시점에 어떤 처리를 하는 역할 ( ex : publisher에게 요철 할 데이터의 개수를 지정..) 

onNext : publisher가 통지한 데이터를 처리하는 역할

onError : publisher에서 에러가 발생했을 때, subscriber의 해당 메서드 호출

onComplete : publisher가 데이터 통지를 완료했음을 알릴 때 호출 

 

Subscription interface

public interface Subscription {

    public void request(long n);

    public void cancel();
}

publisher가 subscriber에게 전달하는 객체로

해당 객체를 통해 publisher, subscirber가 필요한 정보를 주고 받는다 

 

해당 interface들을 구현한 예시


public class ForPractice {

    @Test
    void test() {
        Iterable<Integer> iter = List.of(1, 2, 3, 4, 5);
        Publisher pub = new Publisher() {
            @Override
            public void subscribe(Subscriber sub) {
                Iterator<Integer> iterator = iter.iterator();
                // 해당 요청이 오면 어떻게 동작할건지 정의되어 있음 ..
                // 이 동작은 pub가 정의하는게 맞을듯 .. sub가 이 요청을 하면 나는 이렇게 해야지
                // 그리고 그걸 sub에게 건내서 sub가 pub에게 요청을 보내고 싶은 경우에 사용함 
                // pub-sub 사이에 매개체
                Subscription subscription = new Subscription() {

                    @Override
                    public void request(long n) {
                        System.out.println("[request] ");
                        if (iterator.hasNext()) {
                            sub.onNext(iterator.next());
                        } else {
                            sub.onComplete();
                        }

                    }

                    @Override
                    public void cancel() {

                    }
                };
                sub.onSubscribe(subscription);
            }
        };

        Subscriber<Integer> sub = new Subscriber<>() {
            Subscription subscription;
            int REQUEST_NUM = 1;

            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("[sub][onSubscribe] ");
                this.subscription = subscription;
                this.subscription.request(REQUEST_NUM); // 요청준비가 되었어?! 그럼 이 만큼 메시지 줘!
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("[sub][onNext] " + i);
                subscription.request(REQUEST_NUM);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {
                System.out.println("[sub][onComplete]");
            }
        };

        pub.subscribe(sub);
    }
}

 

output 

[sub][onSubscribe] 
[request] 
[sub][onNext] 1
[request] 
[sub][onNext] 2
[request] 
[sub][onNext] 3
[request] 
[sub][onNext] 4
[request] 
[sub][onNext] 5
[request] 
[sub][onComplete]
반응형
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.