새소식

반응형
Development

[Reactive] Reactive Streams(리액티브 스트림즈)

  • -
반응형

리액티브 라이브러리를 어떻게 구현할지 정의해 놓은 별도의 표준 사양

데이터 스트림을 non-blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양

그 구현체 예시 RxJava, Reactor.. 

 

  • publisher : 데이터 생성하고 통지
  • subscriber : 구독한 publisher로 부터 통지된 데이터를 전달받아서 처리
  • subscription : publisher에 요청할 데이터의 개수를 지정, 데이터의 구독을 취소
  • processor : publisher + subscriber

출처 : https://www.moelzayat.com/reactive-programming-reactor-java-best-practices-tips-and-tricks/

 

 

public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }

 

subscribe 메서드를 통해서 Subscriber를 등록 

 

public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }

 

onSubscribe : 구독 시작 시점에 어떤 처리를 하는 역할 ( ex : publisher에게 요철 할 데이터의 개수를 지정..) 

onNext : publisher가 통지한 데이터를 처리하는 역할

onError : publisher에서 에러가 발생했을 때, subscriber의 해당 메서드 호출

onComplete : publisher가 데이터 통지를 완료했음을 알릴 때 호출 

 

public interface Subscription { public void request(long n); public void cancel(); }

publisher가 subscriber에게 전달하는 객체로

해당 객체를 통해 publisher, subscirber가 필요한 정보를 주고 받는다 

 

public class ForPractice { @Test void test() { Iterable<Integer> iter = List.of(1, 2, 3, 4, 5); Publisher pub = new Publisher() { @Override public void subscribe(Subscriber sub) { Iterator<Integer> iterator = iter.iterator(); // 해당 요청이 오면 어떻게 동작할건지 정의되어 있음 .. // 이 동작은 pub가 정의하는게 맞을듯 .. sub가 이 요청을 하면 나는 이렇게 해야지 // 그리고 그걸 sub에게 건내서 sub가 pub에게 요청을 보내고 싶은 경우에 사용함 // pub-sub 사이에 매개체 Subscription subscription = new Subscription() { @Override public void request(long n) { System.out.println("[request] "); if (iterator.hasNext()) { sub.onNext(iterator.next()); } else { sub.onComplete(); } } @Override public void cancel() { } }; sub.onSubscribe(subscription); } }; Subscriber<Integer> sub = new Subscriber<>() { Subscription subscription; int REQUEST_NUM = 1; @Override public void onSubscribe(Subscription subscription) { System.out.println("[sub][onSubscribe] "); this.subscription = subscription; this.subscription.request(REQUEST_NUM); // 요청준비가 되었어?! 그럼 이 만큼 메시지 줘! } @Override public void onNext(Integer i) { System.out.println("[sub][onNext] " + i); subscription.request(REQUEST_NUM); } @Override public void onError(Throwable t) { } @Override public void onComplete() { System.out.println("[sub][onComplete]"); } }; pub.subscribe(sub); } }

 

output 

[sub][onSubscribe] [request] [sub][onNext] 1 [request] [sub][onNext] 2 [request] [sub][onNext] 3 [request] [sub][onNext] 4 [request] [sub][onNext] 5 [request] [sub][onComplete]
반응형

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.