ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • CHAP7. 병렬 데이터 처리와 성능
    book/모던 자바 인 액션 2020. 8. 26. 23:56

    7.1 병렬 스트림

    스트림 인터페이스를 이용하면 아주 간단하게 요소를 병렬로 처리할 수 있다.

    컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다.

    병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.

     

    아래는 숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드 구현 예제이다.

    public long sequentialSum(long n){
        return Stream.iterate(1L, i -> i + 1) //무한 자연수 스트림 생성
                     .limit(n) // n개로 제한
                     .reduce(0L, Long::sum); //모든 숫자를 더하는 스트림 리듀싱 연산

     

    7.1.1 순차 스트림을 병렬 스트림으로 변환하기

    순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리된다.

    public long parallelSum(long n){
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .parallel() // 스트림을 병렬 스트림으로 변환
                     .reduce(0L, Long::sum);

    병렬 리듀싱 연산 - 출처 : https://yongho1037.tistory.com/705

    위의 사진처럼 리듀싱 연산을 여러 청크에 병렬 수행할 수 있다.

    마지막으로 리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.

     

    순차 스트림에 parrallel를 호출하면 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정된다.

    반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수도 있다.

    두 메서드를 동시에 호출할 경우 마지막으로 호출된 메서드로 실행이 된다.

    병렬 스트림은 내부적으로 ForkJoinPool을 사용한다.
    기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime(). availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.

     

    7.1.2 스트림 성능 측정

    병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측하지만 추측은 소프트웨어 공학에서는 위험한 방법이다.

    성능을 최적화하기 위해선 측정을 해야한다.

    n개의 숫자를 더하는 예제로 측정을 해보았다.

    전통적인 for 루프를 사용해 반복하는 방법이 더 저수준으로 동작할 뿐만 아니라 기본값을 박싱 하거나 언박싱할 필요가 없음으로 더 빠르다.

    public long iterativeSum(){
        long result = 0;
        for(long i=1L; i<=N; i++){
            result += i;
        }
        return result;
    }

    순차적 스트림을 사용하는 경우에 비해서 더 빨랐다.

    public long sequentialSum(long n){
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .reduce(0L, Long::sum);

    순차적 스트림을 병렬 스트림으로 바꾸어 실행하면 어떻게 될까?

    병렬 버전이 순차 버전에 비해 훨씬 더 느린 실망스러운 결과가 나왔다.

    이때 두 가지 문제를 발견할 수 있다.

    • 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.
    • 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.

     

    위 문제를 해결하기 위해선 어떻게 해야 할까?

    더 특화된 메서드를 사용하면 된다.

    LongStream.rangeClosed 메서드는 iterate에 비해 다음과 같은 장점이 있다.

    • 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
    • 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다.
    public long parallelRangedSum(){
        return LongStream.rangeClosed(1, N)
                         .parallel()
                         .reduce(0L, Long::sum);

    위의 코드처럼 하게 되면 순차 실행보다도 빠른 성능을 갖게 된다.

     

    병렬화를 하기 위해선 많은 비용이 들어간다.

    병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고 결과를 하나의 값으로 합쳐야 한다.

    따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.

     

    7.1.3 병렬 스트림의 올바른 사용법

    병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 데이터를 바꾸는 알고리즘을 사용하기 때문에 일어난다.

    병렬 스트림이 올바르게 동작하기 위해선 공유된 가변 데이터를 피해야 한다는 사실이 중요하다.

     

    7.1.4 병렬 스트림 효과적으로 사용하기

    '천 개 이상의 요소가 있을 때만 병렬 스트림을 사용하라'와 같이 양을 기준으로 병렬 스트림 사용을 결정하는 것은 적절하지 않다.

    • 확신이 서지 않으면 직접 측정하라
    • 박싱을 주의하라. 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소다.
    • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. ( ex. limit, findFirst)
    • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
    • 소량의 데이터에서는 병렬 스트림이 도움되지 않는다.
    • 스트림을 구성하는 자료구조가 적절한지 확인하라.
    • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
    • 최종 연산의 병합 과정 비용을 살펴보라.

     

    7.2 포크/조인 프레임워크

    포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브 태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.

    포크/조인 프레임워크에서는 서브 태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

     

    7.2.1 RecursiveTask 활용

    스레드 풀을 이용하려면 RecursiveTask <R>의 서브클래스를 만들어야 한다.

    여기서 R은 병렬화된 태스크가 생성하는 결과 형식, 또는 결과가 없는 RecursiveAction 형식이다.

    RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.

    protected abstract R compute();

    compute 메서드는 태스크를 서브 태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브 태스크의 결과를 생산할 알고리즘을 정의한다.

    따라서 대부분의 compute 메서드 구현은 다음과 같은 의사 코드 형식을 유지한다.

    if (태스크가 충분히 작거나 더 이상 분할 할 수 없으면) {
        순차적으로 태스크 계산
    } else {
        태스크를 두 서브태스크로 분할
        태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
        모든 서브태스크의 연산이 완료될 때까지 기다림
        각 서브태스크의 결과를 합침
    }

    포크/조인 과정 - 출처 : https://yongho1037.tistory.com/705

     

    n까지의 자연수 덧셈 작업을 병렬로 수행하는 예제이다.

    public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {
        private final long[] numbers;
        private final int start;
        private final int end;
        public static final long THRESHOLD = 10_000;
        
        public ForkJoinSumCalculator(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
        
        pirvate ForkJoinSumCalculator(long[] numbers, int start, int end) {
             this.numbers = numbers;
             this.start = start;
             this.end = end;
         }
         
         @Override
         protected Long compute() {
             int length = end - start;
             if (length <= THRESHOLD)
                 return computeSequentially();
             
             ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
             leftTask.fork();
             
             ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
             
             Long rightResult = rightTask.compute();
             Long leftResult = leftTask.join();
             return leftResult + rightResult;
         }
         
         private long computeSquentially() {
             long sum = 0;
             for(int i = start; i<end; i++){
                 sum += numbers[i];
             }
             return sum;
         }
    }

    위의 compute 메서드는 병렬로 실행할 수 있을 만큼 태스크의 크기가 충분히 작아졌는지 확인하고 아직 태스크의 크기가 크다고 판단되면 숫자 배열을 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당한다.

    그러면 다시 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator을 실행한다.

    결국 이 과정이 재귀적으로 반복되면서 덧셈을 수행할 항목이 10000개 이하가 될 때까지 태스크 분할을 반복한다.

    분할된 서브 태스크는 순차적으로 처리되며 부분 결과를 합쳐서 태스크의 최종 결과를 계산한다.

     

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
    

     위의 코드처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘겨줄 수 있다.

     

    7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법

    포크/조인 프레임워크는 쉽게 사용할 수 있는 편이지만 항상 주의를 기울여야 한다.

    포크/조인 프레임워크를 효과적으로 사용하는 방법은 아래와 같다.

    • 두 서브 태스크가 모두 시작된 다음에 join을 호출해야 한다.
    • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드 대신 compute나 fork 메서드를 직접 호출해야 한다.
    • 두 서브 태스크에서 한쪽은 fork를, 한쪽은 compute를 호출하는 것이 두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다는 점에서 효율적이다.
    • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
    • 멀티코어에서 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 것이라는 생각은 버려야 한다.

     

    7.2.3 작업 훔치기

    멀티 스레드로 처리를 하다 보면 각각의 태스크가 크기가 같기 때문에 같은 시간에 종료될 것이라고 생각할 수 있지만 사실은 서브 태스크의 작업 완료 시간이 크게 달라질 수 있다.

    분할 기법이 효율적이지 않았을 수도 있고 예기치 못한 디스크 접근 속도의 저하, 또는 외부 서비스와 협력하는 과정에서 지연이 생길 수 있다.

    포크/조인 프레임워크에서는 작업 훔치기라는 기법을 통해 위 문제를 해결한다.

    작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다.

    각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 테스트를 가져와서 작업을 처리한다.

    이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있다.

    이때 태스크를 빨리 처리한 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.

    모든 태스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복한다.

    따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업 부하를 비슷한 수준으로 유지할 수 있다.

    작업 훔치기 알고리즘 - 출처 : https://yongho1037.tistory.com/705

     

    7.3 Spliterator 인터페이스

    Java8은 Spliterator라는 새로운 인터페이스를 제공한다

    Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다.

    public interface Spliterator<T> {
        boolean tryAdvance(Consumer<? super T> action);
        Spliterator<T> trySplit();
        long estimateSize();
        int characteristics();
    }

    T는 Spliterator에서 탐색하는 요소의 타입을 가리킨다.

    • tryAdvance는 Spliterator의 요소를 하나씩 순차적으로 소비해가면서 탐색해야 할 요소가 남아있다면 True를 반환
    • trySplit는 Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성
    • estimateSize는 메서드로 탐색해야 할 요소의 수 정보를 제공
    • characteristics는 Spliterator에서 정의한 int 형. 각 값은 의미하고 있는 것이 있음

     

    7.3.1 분할과정

    Spliterator는 trySplit 메서드를 통해 스트림 요소를 재귀적으로 분할한다.

    재귀 분할 과정 - 출처 : https://yongho1037.tistory.com/705

    이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.

     

    Spliterator 특성

    Spliterator는 characteristics라는 추상 메서드도 정의한다.

    characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.

    특성 의미
    ORDERED 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 떄 이 순서에 유의해야 한다.
    DISTINCT x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다.
    SORTED 탐색된 요소는 미리 정의된 정렬 순서를 따른다.
    SIZED 크기가 알려진 소스(예를들어 Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환한다.
    NOT-NULL 탐색하는 모든 요소는 null이 아니다.
    IMMUTABLE 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가/삭제/수정이 불가능.
    CONCURRENT 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
    SUBSIZED 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.

     

    'book > 모던 자바 인 액션' 카테고리의 다른 글

    CHAP6. 스트림으로 데이터 수집  (0) 2020.08.25
    CHAP5. 스트림 활용  (0) 2020.08.23
    CHAP4. 스트림 소개  (0) 2020.08.11
    CHAP3. 람다 표현식  (0) 2020.07.28
    CHAP2. 동작 파라미터화 코드 전달하기  (0) 2020.07.24
Designed by Tistory.