데이터 스트림을 non-blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
그 구현체 예시 RxJava, Reactor..
리액티브 스트림즈(reactive streams) 구성 요소
publisher : 데이터 생성하고 통지
subscriber : 구독한 publisher로 부터 통지된 데이터를 전달받아서 처리
subscription : publisher에 요청할 데이터의 개수를 지정, 데이터의 구독을 취소
processor : publisher + subscriber
Publisher interface
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
subscribe 메서드를 통해서 Subscriber를 등록
Subscriber interface
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
onSubscribe : 구독 시작 시점에 어떤 처리를 하는 역할 ( ex : publisher에게 요철 할 데이터의 개수를 지정..)
onNext : publisher가 통지한 데이터를 처리하는 역할
onError : publisher에서 에러가 발생했을 때, subscriber의 해당 메서드 호출
onComplete : publisher가 데이터 통지를 완료했음을 알릴 때 호출
Subscription interface
public interface Subscription {
public void request(long n);
public void cancel();
}
publisher가 subscriber에게 전달하는 객체로
해당 객체를 통해 publisher, subscirber가 필요한 정보를 주고 받는다
해당 interface들을 구현한 예시
public class ForPractice {
@Test
void test() {
Iterable<Integer> iter = List.of(1, 2, 3, 4, 5);
Publisher pub = new Publisher() {
@Override
public void subscribe(Subscriber sub) {
Iterator<Integer> iterator = iter.iterator();
// 해당 요청이 오면 어떻게 동작할건지 정의되어 있음 ..
// 이 동작은 pub가 정의하는게 맞을듯 .. sub가 이 요청을 하면 나는 이렇게 해야지
// 그리고 그걸 sub에게 건내서 sub가 pub에게 요청을 보내고 싶은 경우에 사용함
// pub-sub 사이에 매개체
Subscription subscription = new Subscription() {
@Override
public void request(long n) {
System.out.println("[request] ");
if (iterator.hasNext()) {
sub.onNext(iterator.next());
} else {
sub.onComplete();
}
}
@Override
public void cancel() {
}
};
sub.onSubscribe(subscription);
}
};
Subscriber<Integer> sub = new Subscriber<>() {
Subscription subscription;
int REQUEST_NUM = 1;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("[sub][onSubscribe] ");
this.subscription = subscription;
this.subscription.request(REQUEST_NUM); // 요청준비가 되었어?! 그럼 이 만큼 메시지 줘!
}
@Override
public void onNext(Integer i) {
System.out.println("[sub][onNext] " + i);
subscription.request(REQUEST_NUM);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("[sub][onComplete]");
}
};
pub.subscribe(sub);
}
}