Overview

지난 글 Rxjava를 이용한 안드로이드 개발에서는 RxJava의 Android 연결 방법과 기본적인 사용법을 다뤘습니다. 이번 글에서는 RxJava의 강력하고 다양한 함수들을 살펴보고자 합니다. Android에서 복잡하게 구현되는 내용들을 단 몇 개의 함수로 처리할 수 있는 RxJava를 꼭 사용해보길 권합니다.

1. just
2. fromArray/fromlterable
3. range/rangLong
4. interval
5. timer
6. map
7. flatMap
8. concatMap
9. toList
10. toMap
11. toMultiMap
12. filter
13. distinct
14. take
15. skip
16. throttleFirst
17. throttleLast
18. throttleWithTimeout

참고: 공통적으로 사용하는 구독(수신) 클래스는 아래와 같습니다.

static class CustomSubscriber<T> extends DisposableSubscriber<T> {
    @Override
    public void onNext(T t) {
        System.out.println(Thread.currentThread().getName() + " onNext( " + t + " )");
    }

    @Override
    public void onError(Throwable t) {
        System.out.println(Thread.currentThread().getName() + " onError( " + t + ")");
    }

    @Override
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + " onComplete()");
    }
}




1. just

파라미터를 통해 받은 데이터로 Flowable을 생성하는 연산자입니다. 최대 10까지 전달할 수 있고, 모든 데이터가 수신되면 onComplete() 수신됩니다. 기본적인 Flowable 생성자 함수로 볼 수 있으며 단순 작업에서 많이 사용합니다.

public static void just() {
    //파라미터 값을 순차적으로 송신하는 Flowable 생성
    Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E", "F");

    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과 
main onNext( A )
main onNext( B )
main onNext( C )
main onNext( D )
main onNext( E )
main onNext( F )
main onComplete()




2. fromArray/fromIterable

fromArray, fromIterable 함수는 파리미터로 배열 또는 Iterable(리스트 등)에 담긴 데이터를 순서대로 Flowable을 생성하는 연산자입니다. 모든 데이터를 순차적으로 송신 후 완료됩니다. 반복적인 데이터 변환 작업 같은 경우 for 문 대신 대체할 수 있습니다. 결과를 보면 main Thread 에서 작업 결과가 나오지만, flatMap 을 사용한다면 별도의 Thread로 main Thread의 부하를 막을 수 있습니다.

1. fromArray
public static void fromArray() {
    //fromArray 배열로 파라미터를 전달 받는다.
    Flowable<String> flowable = Flowable.fromArray("A", "B", "C", "D", "E");

    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
main onNext( A )
main onNext( B )
main onNext( C )
main onNext( D )
main onNext( E )
main onComplete()

2. fromIterable
public static void fromIterable() {
    List<String> list = Arrays.asList("A", "B", "C", "D", "E");
    //fromIterable 리스트로 파라미터를 전달받는다.
    Flowable<String> flowable = Flowable.fromIterable(list);
    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
main onNext( A )
main onNext( B )
main onNext( C )
main onNext( D )
main onNext( E )
main onComplete()

파라미터와 함수는 다르지만 동일하게 처리된다.




3. range/rangLong

range 함수는 지정한 숫자부터 지정한 개수만큼 증가하는 Integer 값 데이터를 송신하는 Flowable를 생성합니다. rangLong 함수는 range와 동일하며 데이터 타입은 Long을 사용합니다. 두 함수 데이터 송신을 마치면 onComplete를 송신합니다.

1. range
public static void range() {
    //range(int start, int count)
    //start : 시작 값
    //end : 발생하는 횟수 
    Flowable<Integer> flowable = Flowable.range(10, 5);
        
    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
main onNext( 10 )
main onNext( 11 )
main onNext( 12 )
main onNext( 13 )
main onNext( 14 )
main onComplete()

2. rangLong
public static void rangeLong() {
    //range(int start, int count)
    //start : 시작 값
    //end : 발생하는 횟수 
    Flowable<Long> flowable = Flowable.rangeLong(10, 5);

    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
main onNext( 10 )
main onNext( 11 )
main onNext( 12 )
main onNext( 13 )
main onNext( 14 )
main onComplete()




4. interval

지정한 간격마다 0부터 시작해 Long 타입 숫자의 데이터를 송신하는 Flowable을 생성합니다. 데이터는 0, 1, 2, 4 순차적으로 증가된 데이터를 송신합니다. Android 에서는 반복적인 작업인 TimerTask를 대신해서 interval로 간단하게 처리할 수 있습니다. UI 변경이 필요한 부분에서는 interval scheduler를 AndroidSchedulers.mainThread() 를 변경해 적용할 수 있습니다.

public static void interval() {
    //(long time, TimeUnit unit, Scheduler scheduler)
    //time : 발생 간격 시간
    //unit : 간격 시간 단위 
    //scheduler : 발생 scheduler를 변경하여 사용할 수 있습니다. 
    //            ex)AndroidSchedulers.mainThread()
    // - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - 9
    // 1초 간격으로 데이터 요청을 송신하다.
    Flowable<Long> flowable = Flowable
            .interval(1000L, TimeUnit.MILLISECONDS).take(10);
        
         //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
RxComputationThreadPool-1 onNext( 0 )
RxComputationThreadPool-1 onNext( 1 )
RxComputationThreadPool-1 onNext( 2 )
RxComputationThreadPool-1 onNext( 3 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onNext( 5 )
RxComputationThreadPool-1 onNext( 6 )
RxComputationThreadPool-1 onNext( 7 )
RxComputationThreadPool-1 onNext( 8 )
RxComputationThreadPool-1 onNext( 9 )




5. timer

timer 함수는 호출된 시간부터 일정한 시간 동안 대기하고 Long 타입 0을 송신 및 종료하는 flowable을 생성합니다. interval이 조건까지 반복적으로 송신한다면, timer는 한번만 송신하고 종료됩니다.

public static void timer() {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy.MM.dd hh:mm ss");

    System.out.println("현재시간 : " + simpleDateFormat.format(System.currentTimeMillis()));

    //(long time, TimeUnit unit, Scheduler scheduler)
    //time : 발생 간격 시간
    //unit : 간격 시간 단위 
    //scheduler : 발생 scheduler를 변경하여 사용할 수 있습니다. 
    //            ex)AndroidSchedulers.mainThread()
    Flowable<Long> flowable = Flowable.timer(1000L, TimeUnit.MILLISECONDS);

    //구독을 시작한다.
    flowable.subscribe(value -> {
        System.out.println(" timer : " + simpleDateFormat.format(System.currentTimeMillis()));
    }, throwable -> {
        System.out.println(throwable);
    }, () -> {
        System.out.println(" complete");
    });
}

결과
현재시간 : 2019.04.29 09:09 56
timer : 2019.04.29 09:09 57
complete




6. map

Flowable 에서 송신하는 데이터를 변환하고, 변환된 데이터를 송신하는 연산자입니다. 하나의 데이터만 송신할 수 있으며, 반드시 데이터를 송신해야 합니다. 혹여 송신되는 데이터가 null 을 포함하면 map 대신 아래의 flatMap 을사용하는 것이 좋습니다.

public static void map() {
    Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E")
            //map(Function mapper)
            //mapper : 받은 데이터를 가공하는 함수형 인터페이스 
            //알파벳 값을 소문자로 변경하여 return 한다
            .map(value -> value.toLowerCase());

    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
main onNext( a )
main onNext( b )
main onNext( c )
main onNext( d )
main onNext( e )
main onComplete()




7. flatMap

flatMap은 map과 동일한 함수이지만, map과는 달리 여러 데이터가 담긴 Flowable을 반환할 수 있습니다. 또한 빈 Flowable를 리턴해 특정 데이터를 건너뛰거나 에러 Flowable를 송신할 수 있습니다.

파라미터 mapper에서 새로운 Flowable의 데이터 전달이 아닌 다른 타임라인 Flowable로 작업하면 들어온 데이터 순서대로 출력을 지원하지 않습니다. 타임라인 Flowable(timer, delay, interval 등)에서는 가급적 사용을 피하거나, 순서에 지장이 없을 때 사용하는 것이 좋습니다.

public static void flatMap() {
    Flowable<String> flowable = Flowable.range(10, 2)
        //flatMap(Function mapper, BiFunction combiner)
        //mapper : 받은 데이터로 새로운 Flowable를 생성하는 함수형 인터페이스
        //combiner : mapper가 새로 생성한 Flowable 과 원본 데이터를 조합해 새로운 송신 데이트를 생성하는 함수형 인터페이스
        //첫 번째 데이터를 받으면 새로운 Flowable를 생성한다.
        //take(3) : 3개까지만 발생한다.
        .flatMap(value -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3), (value, newData) ->
                    "value " + value + " newData " + newData);
    
    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
RxComputationThreadPool-1 onNext( value 10 newData 0 )
RxComputationThreadPool-2 onNext( value 11 newData 0 )
RxComputationThreadPool-1 onNext( value 10 newData 1 )
RxComputationThreadPool-2 onNext( value 11 newData 1 )
RxComputationThreadPool-1 onNext( value 10 newData 2 )
RxComputationThreadPool-2 onNext( value 11 newData 2 )
RxComputationThreadPool-2 onComplete()

결과를 보면 각기 생성된 Flowable 비동기식으로 송신 되기때문에 
서로 다른 스레드에서 실행돼 데이터를 받는 순서대로 송신하지 않는다는 점을 주목하자




8. concatMap

받은 데이터를 Flowable로 변환하고 변환된 Flowable을 하나씩 순서대로 실행해서 수신자에서 송신합니다. 다시 말해 여러 데이터를 계속 받더라도 첫 번째 데이터로 생성한 Flowable 의 처리가 끝나야 다음 데이터로 생성한 Flowable을 실행하는 것입니다.

생성된 Flowable의 스레드에서 실행되더라도 데이터를 받은 순서대로 처리하는 것을 보장하지만, 처리 성능에 영향을 줄 수 있습니다.

public static void concatMap() {
    Flowable<String> flowable = Flowable.range(10, 5)
    //map(Function mapper)
    //mapper : 받은 데이터를 가공하는 함수형 인터페이스
        .concatMap(value ->
                Flowable.interval(100L, TimeUnit.MILLISECONDS).take(2)
                        .map(data -> ("value : " + value + " data : " + data)));

    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
RxComputationThreadPool-1 onNext( value : 10 data : 0 )
RxComputationThreadPool-1 onNext( value : 10 data : 1 )
RxComputationThreadPool-2 onNext( value : 11 data : 0 )
RxComputationThreadPool-2 onNext( value : 11 data : 1 )
RxComputationThreadPool-3 onNext( value : 12 data : 0 )
RxComputationThreadPool-3 onNext( value : 12 data : 1 )
RxComputationThreadPool-4 onNext( value : 13 data : 0 )
RxComputationThreadPool-4 onNext( value : 13 data : 1 )
RxComputationThreadPool-5 onNext( value : 14 data : 0 )
RxComputationThreadPool-5 onNext( value : 14 data : 1 )
RxComputationThreadPool-5 onComplete()

결과를 보면 생성된 Flowable 스레드와 데이터 순서대로 출력이 보장된다 것을   있다.




9. toList

toList는 송신할 데이터를 모두 리스트에 담아 전달합니다. 한꺼번에 데이터를 List로 가공해서 받기에 좋습니다. 하지만 많은 양의 데이터를 처리할 경우 버퍼가 생길 수 있고, 쌓은 데이터 때문에 메모리가 부족해질 수도 있습니다. 또한 수신되는 데이터는 하나이므로 Flowable이 아닌 Single 반환값을 사용합니다.

public static void toList() {
    Single<List<String>> single =
            Flowable.just("A", "B", "C", "D", "E", "F")
                    .toList();
        
    // 구독을 시작한다.
    single.subscribe(new SingleObserver<List<String>>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println(Thread.currentThread().getName() + " onNext()");
        }

        @Override
        public void onSuccess(List<String> strings) {
                        //최종 완료된 리스트를 순서대로 출력한다.
            for (String text : strings) {
                System.out.println(Thread.currentThread().getName() + " onSuccess( " + text + " )");
            }
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(Thread.currentThread().getName() + " onError() " + e);
        }
    });
}

결과
main onNext()
main onSuccess( A )
main onSuccess( B )
main onSuccess( C )
main onSuccess( D )
main onSuccess( E )
main onSuccess( F )




10. toMap

toMap은 송신할 데이터를 모두 키와 값의 쌍으로 Map에 담아 전달합니다. 나머지는 toList의 특징과 같습니다. 송신되는 데이터 타입은 Map에 담아서 송신하는데 동일한 key에서 value는 마지막 데이터가 덮어 씁니다. 요청되는 값보다 결과 값이 적을 수도 있습니다. List 값을 손쉽게 key, value로 분리할 수 있는 함수이기도 합니다.

public static void toMap() {
    Single<Map<Long, String>> single = Flowable.just("1A", "2B", "3C", "1D", "2E")
        //toMap(Fuction keySelector, Function valueSelector,  Callable mapSupplier)
        //keySelector : 받은 데이터로 Map에서 사용할 키를 생성하는 함수형 인터페이스
        //valueSelector : 받은 데이터로 Map 넣을 값을 생성하는 함수형 인터페이스
        .toMap(value -> Long.valueOf(value.substring(0, 1)), data -> data.substring(1));

    //구독을 시작한다.
    single.subscribe(new SingleObserver<Map<Long, String>>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println(Thread.currentThread().getName() + " onNext()");
        }

        @Override
        public void onSuccess(Map<Long, String> longStringMap) {
                        //최종 완료된 map을 순서대로 출력한다.
            for (long id : longStringMap.keySet()) {
                System.out.println(Thread.currentThread().getName() + " onSuccess( id : " + id + ", value " + longStringMap.get(id) + " )");
            }
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(Thread.currentThread().getName() + " onError() " + e);
        }
    });
}

결과
main onNext()
main onSuccess( id : 1, value D )
main onSuccess( id : 2, value E )
main onSuccess( id : 3, value C )




11. toMultiMap

키와 컬렉션 값으로 이루어진 Map을 데이터로 변환하여 송신하는 함수입니다. 나머지 특징은 toList, toMap과 같습니다. toMap에서 중복되는 value를 관리하는 건 없었지만, value를 collection으로 관리하여 전달되는 데이터를 모두 수신할 수 있습니다.

public static void toMultiMap() {
    Single<Map<String, Collection<Long>>> single =
        Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .take(5)
                //toMultimap(Function keySelector, Function valueSelector)
                .toMultimap(value -> {
                    //value가 홀수인지 짝수 인지 판단해서 key값을 리턴한다.
                    if (value % 2 == 0) {
                        return "짝수";
                    } else {
                        return "홀수";
                    }
                });

    //구독을 시작한다.
    single.subscribe(new SingleObserver<Map<String, Collection<Long>>>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println(Thread.currentThread().getName() + " onNext( " + d + " )");
        }

        @Override
        public void onSuccess(Map<String, Collection<Long>> stringCollectionMap) {
            for (String key : stringCollectionMap.keySet()) {

                StringBuffer stringBuffer = new StringBuffer();

                for (long value : stringCollectionMap.get(key)) {
                    stringBuffer.append(" " + value);
                }

                System.out.println(Thread.currentThread().getName() + " onSuccess( id : " + key + ", value " + stringBuffer.toString() + ")");
            }
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(Thread.currentThread().getName() + " onError() " + e);
        }
    });
}

결과
main onNext()
RxComputationThreadPool-1 onSuccess( id : 짝수, value  0 2 4 )
RxComputationThreadPool-1 onSuccess( id : 홀수, value  1 3 )




12. filter

filter는 받은 데이터가 조건에 맞는지 판단해 결과가 true인 값만 송신합니다. 위의 just, fromArray, interval이 반복적인 케이스였다면, filter는 if문처럼 조건문의 역할을 할 수 있습니다. 반복문 함수와 조건문 함수를 같이 사용해 몇 줄 안에 for, if와 똑같이 구현할 수 있죠.

public static void filter() {
    Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        //짝수만 통과한다.                   3개만큼
        .filter(value -> value % 2 == 0).take(3);

    //구독을 시작한다.
    flowable.subscribe(new CustomSubscriber<>());
}

결과
RxComputationThreadPool-1 onNext( 0 )
RxComputationThreadPool-1 onNext( 2 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()




13. distinct

이미 처리된 데이터를 다시 볼 필요가 없을 때 사용하는 함수입니다. 송신하려는 데이터가 이미 송신된 데이터와 같다면 해당 데이터는 무시합니다. 이 함수는 내부에서 HashSet으로 데이터가 같은지 확인합니다.

public static void distinct() {
        Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a", "B", "b")
            //distinct(Function keySelector)
            //keySelector : 받은 데이터와 비교할 데이터를 확인하는 함수
            //모두 소문자로 변환하여 알파벳 기준으로 데이터를 판단한다. 
            .distinct(value -> value.toLowerCase());
            
        //구독을 시작한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    main onNext( A )
    main onNext( B )
    main onComplete()




14. take

1.take
take 함수로 지정된 횟수만큼 받은 데이터를 송신합니다. 지정된 횟수에 도달하면 완료를 송신해 처리 종료합니다.

2.takeUntil
지정된 조건까지 데이터를 송신하는 연산자입니다. 조건이 되면 완료를 송신해 종료합니다.

3.takeWhile
지정된 조건이 해당할 때만 데이터를 송신하는 연산자입니다.

4.takeLast
데이터의 끝에서부터 지정한 조건까지 데이터를 송신하는 연산자입니다.

take 함수는 한 화면에 출력되거나 칠요한 데이터만큼 리스트에서 값을 하나씩 수신할 때 사용합니다. 예를 들어 화면에 데이터가 6개가 필요하면 take를 이용해 원하는 만큼의 데이터를 가져올 수 있습니다.

Flowable.take(6)



또한 이후에 나올 skip 함수를 같이 사용하면 두 번째 화면에서 필요한 데이터를 6개 가져올 수 있습니다.

Flowable.skip(6).take(12)



1. take
    public static void take() {
        // 100 밀리세컨드만큼 반복하며 총 5개를 출력후 종료한다.
        Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .take(5);
    
        //구독을 시작한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과 
    RxComputationThreadPool-1 onNext( 0 )
    RxComputationThreadPool-1 onNext( 1 )
    RxComputationThreadPool-1 onNext( 2 )
    RxComputationThreadPool-1 onNext( 3 )
    RxComputationThreadPool-1 onNext( 4 )
    RxComputationThreadPool-1 onComplete()
    
    
    2. takeUntil
    public static void takeUntil() {
        // 100 밀리세컨드만큼 반복하며 값이 5가 될때까지 송신한다.
        Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .takeUntil(value -> value == 5);
    
        //구독을 시작한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-1 onNext( 0 )
    RxComputationThreadPool-1 onNext( 1 )
    RxComputationThreadPool-1 onNext( 2 )
    RxComputationThreadPool-1 onNext( 3 )
    RxComputationThreadPool-1 onNext( 4 )
    RxComputationThreadPool-1 onNext( 5 )
    RxComputationThreadPool-1 onComplete()
    
    
    3. takeWhile
    public static void takeWhile() {
        // 100 밀리세컨드만큼 반복하며 값이 5가 아닐경우까지 송신한다.
        Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .takeWhile(value -> value != 5);
    
        //구독을 시작한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-1 onNext( 0 )
    RxComputationThreadPool-1 onNext( 1 )
    RxComputationThreadPool-1 onNext( 2 )
    RxComputationThreadPool-1 onNext( 3 )
    RxComputationThreadPool-1 onNext( 4 )
    RxComputationThreadPool-1 onComplete()
    
    
    4. takeLast
    public static void takeLast() {
        //100밀리 세컨트만큼 반복하며 5개의 출력중 뒤에 2개만 송신한다. 
        Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .take(5)
                .takeLast(2);
            
        //구독을 시작한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-1 onNext( 3 )
    RxComputationThreadPool-1 onNext( 4 )
    RxComputationThreadPool-1 onComplete()




15. skip

1.skip
함수로 지정된 횟수만큼 받은 데이터 송신을 제외합니다. 지정된 횟수가 초과되면 나머지 데이터를 송신합니다.

2.skipUntil
지정된 조건까지 데이터 송신을 제외하는 연산자입니다. 조건이 되면 나머지 데이터를 송신합니다.

3.skipWhile
지정된 조건이 해당될 때만 데이터 송신을 제외하는 함수입니다.

4.skipLast
데이터의 끝에서부터 지정한 조건까지 데이터 송신을 제외하는 함수입니다.

take와 반대의 기능을 갖고 있습니다. 보통 페이저나 리스트에서 paging을 처리할 때는 take와 skip을 혼용합니다.

1. skip
    public static void skip() {
        //100 밀리세컨드만큼 반복하며 5번 발행하고, 처음 2개를 제외합니다.
        Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .take(5)
                .skip(2);
    
        //구독을 시잔한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-1 onNext( 2 )
    RxComputationThreadPool-1 onNext( 3 )
    RxComputationThreadPool-1 onNext( 4 )
    RxComputationThreadPool-1 onComplete()
    
    
    2. skipUntil
    public static void skipUntil() {
        //300밀리 세컨드만큼 반복하며 5개를 발행하고, 1000 밀리세컨드 제외 후 송신합니다.
        Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .skipUntil(Flowable.timer(1000L, TimeUnit.MILLISECONDS))
                .take(5);
    
        //구독을 시잔한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-2 onNext( 3 )
    RxComputationThreadPool-2 onNext( 4 )
    RxComputationThreadPool-2 onNext( 5 )
    RxComputationThreadPool-2 onNext( 6 )
    RxComputationThreadPool-2 onNext( 7 )
    RxComputationThreadPool-2 onComplete()
    
    
    3. skipWhile
    public static void skipWhile() {
        //300밀리세컨드만큼  반복하며 5개를 발행하고, 데이터 3이 올때까지 데이터를 제외힙니다.
        Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .skipWhile(value -> value != 3)
                .take(5);
    
        //구독을 시잔한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-1 onNext( 3 )
    RxComputationThreadPool-1 onNext( 4 )
    RxComputationThreadPool-1 onNext( 5 )
    RxComputationThreadPool-1 onNext( 6 )
    RxComputationThreadPool-1 onNext( 7 )
    RxComputationThreadPool-1 onComplete()
    
    
    4. skipLast
    public static void skipLast() {
        //1000 밀리세컨드만큼 반복하며 5개를 발행하고 마지막 2개는 제외합니다
        Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
                .take(5)
                .skipLast(2);
    
        //구독을 시작한다.
        flowable.subscribe(new CustomSubscriber<>());
    }
    
    결과
    RxComputationThreadPool-1 onNext( 0 )
    RxComputationThreadPool-1 onNext( 1 )
    RxComputationThreadPool-1 onNext( 2 )
    RxComputationThreadPool-1 onComplete()




16. throttleFirst

데이터를 송신하고 지정된 시간 동안 들어오는 요청을 무시합니다. 이 함수는 View의 Event 처리에서 많이 사용됩니다. 중복되는 처리를 막기 위해 최초 실행 후 일정 시간 동안 View의 클릭 이벤트나 API 이벤트를 막을 수 있기 때문에 비동기 처리와 화면에 직접적인 피드백이 발생했을 때 throttleFirst를 자주 사용하고 있습니다.

    //데이터 요청이 30 밀리초마다 5번 발생합니다.
    //데이터 요청 발생시 100 밀리세컨트 동안 들어오는 데이터 요청을 무시합니다.
    // — 0 — 1 — 2 — 3 — 4   interval        30 밀리초 마다
    //       —   —   -*-   —     throttleFirst  100 밀리초 무시
   Flowable<Long> flowable = Flowable.interval(30L, TimeUnit.MILLISECONDS)
           .take(5).throttleFirst(100L, TimeUnit.MILLISECONDS);

   flowable.subscribe(new CustomSubscriber<>());
}

결과
RxComputationThreadPool-1 onNext( 0 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()




17. throttleLast

throttleLast 함수는 데이터를 송신하고 지정된 시간 동안 들어오는 마지막 요청을 송신합니다. 이 함수도 throttleFirst처럼 반복적인 선택 이벤트 처리에 유용하게 사용할 수 있습니다. 간단하게 장바구니 카운트 변경을 요청할 때 마지막 변경 이벤트 데이터만 처리하면 되므로 값이 선택되고 일정 시간이 지났을 때 API를 요청해 리소스 낭비를 줄일 수 있습니다.

public static void throttleLast() {
    //데이터 요청이 1 초 마다 6번 발생합니다.
    //데이터 요청 발생시 2 초 동안 들어오는 마지막 요청을 송신하다.
    // - 0 - 1 - 2 - 3 - 4   interval       1 초 마다
    //     -   -   -*  -     throttleLast   2 초의 마지막 값 송신
    Flowable<Long> flowable = Flowable.interval(1, TimeUnit.SECONDS)
                .take(5)
                .throttleLast(2, TimeUnit.SECONDS);
    flowable.subscribe(new CustomSubscriber<>());
}

결과
RxComputationThreadPool-1 onNext( 2 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()




18. throttleWithTimeout

throttleWithTimeout 함수는 데이터를 송신하고 지정된 시간 동안 다음 데이터를 받지 못하면 현재 데이터를 송신합니다. 완료 시엔 마지막 데이터를 송신하고 종료됩니다.

public static void throttleWithTimeout() {
    Flowable<String> flowable = Flowable.<String>create(emitter -> {
        emitter.onNext("A");
        Thread.sleep(1000L);
                // 1000 밀리세컨드 슬립
                // 500 밀리세컨드 동안  데이터 다음 데이터 요청이 없으므로 A 송신

        emitter.onNext("B");
        Thread.sleep(300L);
                // 300 밀리세컨드 슬립

        emitter.onNext("C");
        Thread.sleep(300L);
                // 300 밀리세컨드 슬립

        emitter.onNext("D");
        Thread.sleep(1000L);
                // 1000 밀리세컨드 슬립
                // 500 밀리세컨드 동안  데이터 다음 데이터 요청이 없으므로 D 송신

        emitter.onNext("E");
        Thread.sleep(100L);
                // 100 밀리세컨드 슬립

        emitter.onComplete();
                //완료 요청 시 마지막 데이터 송신 후 종료

    }, BackpressureStrategy.BUFFER)
        .throttleWithTimeout(500L, TimeUnit.MILLISECONDS);
        flowable.subscribe(new CustomSubscriber<>());
}
    
    결과
    RxComputationThreadPool-1 onNext( A )
    RxComputationThreadPool-1 onNext( D )
    main onNext( E )
    main onComplete()




Conclusion

RxJava에서 많이 사용되고, 또 알고 있으면 좋은 함수들을 살펴봤습니다. 브랜디에서도 이 함수들을 응용해 그동안 다양한 기능을 구현했고, 복잡한 함수도 사용하고 있습니다. 지금까지는 Flowable로 송신과 수신이 1 : 1 로 진행되었지만, 다양한 수신자를 사용해 하나의 Flowable로도 다른 화면에서 여러 수신자를 등록하여 반복적인 작업을 할 수 있습니다. 덕분에 같은 작업을 코드 중복 없이 간단하게 구현할 수 있죠.

다음 글에서는 2개 이상의 Flowable을 결합해 사용하는 방법과 Android View에서 RxJava를 응용하는 방법, 구독을 관리하는 방법 등 Android에서 유용하게 쓰는 방법들을 알아보겠습니다.


고재성 팀장 | R&D 개발MA팀
gojs@brandi.co.kr
브랜디, 오직 예쁜 옷만