학습일지/Language

Java Concurrent Programming - Executors, Callable, Future, CompletableFuture

inspirit941 2020. 10. 22. 09:55
반응형

백기선님의 인프런 더 자바8 강의 내용 정리.


Concurrent Programming

이전까지의 자바 Concurrent Programming을 알아야 한다.

자바의 기본 Concurrent Programming

Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 SW

자바에서는 멀티프로세싱, 멀티쓰레드 둘 다 가능하다. 이 중 멀티쓰레드를 다룸.

package com.inspirit941.java8to11;

public class testConcurrentProgramming {
    static class MyThead extends Thread {
        @Override
        public void run() {
            System.out.println("Thread: " + Thread.currentThread().getName());
        }
    }
    public static void main(String[] args) {
        // 메인 메소드가 작동하는 메인 쓰레드
        System.out.println("main Thread");
        System.out.println(Thread.currentThread().getName());
        // 메인쓰레드에서 다른 쓰레드를 생성하는 방법은 두 가지.

        // 1. 불편한 방법 - 쓰레드 상속받은 객체 생성
        MyThead myThead = new MyThead();
        myThead.start();
        // return 순서는 보장하지 않는다. 콘솔에 출력되는 순서는 매번 다름

        // 2. Runnable 구현체로 생성
        Thread thread = new Thread(() -> {
            while (true) {
                System.out.println("Thread: " + Thread.currentThread().getName());
                try {
                    // sleep : 현재 쓰레드를 대기상태로 두는 것.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // 자는 동안 누군가가 interrupt할 경우
//                    e.printStackTrace();
                    // 누군가가 쓰레드를 깨울 경우 종료하기.
                    return;
                }
            }
        });
        thread.start();

        // 1. sleep : 현재 쓰레드를 대기 상태로 두는 것.
        // 2. interrupt : 쓰레드가 외부의 개입을 받는 것. 이걸로 특정 쓰레드를 깨우거나 종료하는 등의 작업이 가능.
        // 종료시키는 메소드가 따로 있는 게 아니라, 이런 식으로 interrupt를 받았을 때 적용할 수 있는 방법 중 하나.

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // thread에 interrupt 명령어를 보내서 return하도록 설정.
        thread.interrupt();


        // 3. join : 다른 쓰레드를 기다리는 것.
        Thread thread2 = new Thread(() -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            try {
                // 2초 대기했다가 종료되는 쓰레드.
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                return;
            }
        });
        thread2.start();

        try {
            // 쓰레드가 작동을 끝낼 때까지 대기
            thread2.join();
        } catch (InterruptedException e) {
            // 해당 쓰레드를 대기하는 중에 interrupt될 경우
            // (이 경우는 메인쓰레드에 interrupt가 들어올 경우)
            e.printStackTrace();
        }
        System.out.println(thread2+ " is finished");

        // 다 좋은데, 쓰레드 두 개만 생성했는데도 코드량이 많다.
        // 개수가 많아질수록 코딩으로 쓰레드를 관리할 수가 없음
        // 그래서 등장한 게 Executor.
    }
}

Executors

Runnable이나 Thread처럼 Low-level API로 코드를 다루는 대신

보다 하이레벨의 Concurrency 프로그래밍을 의미함.

담당 작업

  • 쓰레드 생성. 개발자는 Runnable 안에 로직만 제공한다.
  • 쓰레드 생명주기 관리
  • 작업 처리 및 실행을 담당하는 API 제공.

주요 인터페이스

  • Executor : execute(Runnable). 직접 이 타입을 쓰진 않음.
  • ExecutorService : Executor 상속받은 인터페이스. Callable도 실행 가능하며, Executor를 종료하거나 여러 Callable을 동시에 실행하는 기능 제공.
  • ScheduledExecutorService : ExecutorService 상속받은 것. 특정 시간대에 작업을 실행하거나 / 주기적으로 작업을 실행하고 싶을 때 등등에 사용
package com.inspirit941.java8to11;

import java.util.concurrent.*;

public class testExecutors {
    public static Runnable getRunnable(String message) {
        return () -> System.out.println(message + Thread.currentThread().getName());
    }
    public static void main(String[] args) {
        // 쓰레드 하나만 쓰는 Executor : Executors.newSingleThreadExecutor();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 실행하기. execute 대신 submit을 써도 된다.
        // ex) executorService.execute(() -> System.out.println("Thread " + Thread.currentThread().getName()));
        executorService.submit(getRunnable("hello"));
        executorService.submit(getRunnable("hello2"));
        executorService.submit(getRunnable("hello3"));
        executorService.submit(getRunnable("hello4"));
        executorService.submit(getRunnable("hello5"));
        // 이렇게 되면, 2개의 쓰레드로 다섯 개의 실행요청을 처리한다.

        // ExecutorService에서 Thread Pool을 관리. 여기에 쓰레드 개수가 존재
        // Blocking Queue: 쓰레드가 해야 할 작업을 쌓아놓는 곳.
        // 이전 작업을 마치면 Queue에서 순차적으로 새 작업을 쓰레드에 할당하는 식으로 동작한다.

        executorService.shutdown();
        // ExecutorService는 한 번 실행해서 작업을 수행하면, 다음 작업이 들어올 때까지 대기상태가 된다.
        // 따라서 shutdown()을 명시적으로 해줘야 함.
        // shutdown (): Graceful shutdown. 실행하는 작업들 다 마친 후에 종료시키는 작업.
        // shutdownNow() : 그런 거 없이 실행 즉시 바로 종료.

        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        // 5초 뒤에 메소드 실행하도록 세팅
        scheduledExecutorService.schedule(getRunnable("hello"), 5, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
        // 1초 대기 후 첫 실행, 2초마다 반복실행
        scheduledExecutorService.scheduleAtFixedRate(getRunnable("fixedRate"), 1, 2, TimeUnit.SECONDS);

        // fork / join 기능도 있지만, 이건 멀티프로세스 프로그래밍에서 유용한 개념.
    }
}

쓰레드에서 무언가 결과를 리턴받고 싶다면, Runnable은 사용할 수 없다. void return이기 때문.

자바 1.5 버전에서부터 Callable이 등장했다. 리턴 기능을 제공.

리턴값을 받아올 수 있는 무언가가 필요함. 그게 Future.

Callable과 Future

리턴값을 가질 수 있는 Runnable = Callable.

package com.inspirit941.java8to11;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class testCallable {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000);
            return "hello";
        };
        // callable의 리턴 타입이 Function.
        Future<String> submit = executorService.submit(hello);
        System.out.println("Started");
        // get을 만나는 순간 메인쓰레드는 멈추고, submit이 연산을 마칠 때까지 기다린다.
        // isDone() : submit 쓰레드가 끝났는지 아닌지 확인하는 메소드
        System.out.println(submit.isDone()); // return false
        // cancel(bool) : 해당 쓰레드 종료 명령. 이렇게 종료된 service는 get()으로 결과를 가져올 수 없다.
        // true 넣으면 진행중인 작업을 interrupt하고 종료, false는 연산 끝날때까지 기다렸다가 종료

        try {
            String returns = submit.get();
            System.out.println(returns);
            // return hello
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        Callable<String> world = () -> {
            Thread.sleep(3000);
            return "world";
        };
        Callable<String> java = () -> {
            Thread.sleep(1000);
            return "java";
        };

        try {
            // 여러 개의 callable을 한번에 보내기. 리턴값으로 Future 리스트를 반환함.
            // invokeAll : 투입한 모든 Callable이 전부 끝날 때까지 대기함.

            List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, world));
            futures.forEach(stringFuture -> {
                try {
                    System.out.println(stringFuture.get());
                    // return hello, java, world
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();

        // invokeAny : 모든 쓰레드가 끝날 때까지 기다리는 대신 뭐라도 하나 결과가 나오면 바로 리턴.
        // 리턴타입도 Future 안에 정의된 가상클래스.
        ExecutorService newExecutorService = Executors.newFixedThreadPool(4);
        try {
            // 쓰레드 4개, 메소드 3개 -> 각각의 스레드에 하나씩 메소드 할당됨.
            // 가장 먼저 리턴되는 결과값을 Future<클래스>의 "클래스" 타입으로 반환한다.
            String result = newExecutorService.invokeAny(Arrays.asList(hello, java, world));
            System.out.println(result);
            // return java
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        newExecutorService.shutdown();
    }
}

CompletableFuture

비동기 프로그램에 가까운 코딩이 가능해진다.

Future의 단점

  • 외부에서 완료시킬 수 없음. get()에 타임아웃을 설정하는 식으로 해결해야 함
  • 블로킹 코드 (ex)get)을 실행하지 않으면 작업이 끝났을 때 콜백 실행이 불가능
  • 여러 Future를 조합할 수 없다. (event 정보를 가져온 뒤, 이벤트에 참석하는 회원 목록 불러오기 같은 기능 구현이 불가능)
  • 예외처리 API 없음
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(()->"hello");
// future 결과값을 가지고 어떤 작업을 하려면
// 무조건 get 코드 뒤에서 작업해야 함. = 비동기 코딩패턴을 만들기 어려웠음
future.get();

외부에서 Complete을 시킬 수 있어서 CompleteFuture.


package com.inspirit941.java8to11;

import java.util.concurrent.*;

public class testCompletableFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 명시적으로 Executor 생성할 필요 없음.
        CompletableFuture<String> future = new CompletableFuture<>();
        // future의 기본값을 hello로 명시.
        future.complete("hello");
        System.out.println(future.get());
        // 위와 같은 기능을 수행하는 static 메소드.
        // CompletableFuture<String> future2 = CompletableFuture.completedFuture("hello");


        // 1. 리턴이 없는 경우 runAsync 메소드 사용.
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() ->{
            System.out.println("hello " + Thread.currentThread().getName());
        });
        future3.get();

        // 2. 리턴이 있는 경우 supplyAsync 사용.
        // 콜백을 주는 방법: .thenApply()
        // 따로 리턴을 하지 않을 경우 .thenAccept(Comsumer) 사용. 이 경우 CompleteFuture<Void>가 리턴타입이 됨.
        // 인풋을 따로 받지 않고 리턴값도 없을 경우 .thenRun(Runnable).
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync hello " + Thread.currentThread().getName());
            // currentThread 이름으로 ForkJoinPool이 리턴됨.
            return "SupplyAsync";
        }).thenApply((string) -> {
            return string.toUpperCase();
        });
        System.out.println(stringCompletableFuture.get());

        // Q. 따로 쓰레드 풀 정의도 하지 않았는데 어떻게 쓰레드를 생성해서 사용하는가
        // A. ForkJoinPool 때문. 자바 7에서 도입되었고, Executor 구현체 중 하나.
        // Deque 형태. 쓰레드가 자기 할 일이 없으면 직접 가져와서 처리 + 작업 단위를 서브테스크 단위로 쪼개서 다른 쓰레드에 분산처리 + join하는 것이라고 함

        // 물론, ThreadPool을 직접 정의해 사용하는 것도 가능하다.
        // runAsync나 supplyAsync 메소드의 두번째 인자값으로 executorService를 입력.
        // thenRun이나 thenApply같은 콜백메소드에 사용할 경우
        // 메소드 뒤에 Async 붙이고, 두번째 인자값으로 쓰레드 풀을 입력할 수 있음.
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
            return "fixedThreadPool" + Thread.currentThread().getName();
        }, executorService).thenRunAsync(() -> {
            System.out.println("thenRun");
        }, executorService);
        System.out.println(future2.get());

        executorService.shutdown();
    }
}

여러 작업을 조합해 사용하기, 예외처리하기

package com.inspirit941.java8to11;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class testCompletableFuture2 {
    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("world " + Thread.currentThread().getName());
            return message + " world";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "hello";
        });
        // 1. hello 끝나고 world해야 함. (의존성 존재하는 경우) : thenCompose
        // 이전까지는 hello.get(); world.get(); 형태로 수행
        // thenCompose 메소드를 사용할 수 있다.
        CompletableFuture<String> future = hello.thenCompose(s -> getWorld(s));
        System.out.println(future.get());

        // 2. 둘 사이에 연관관계가 없는 경우 - 비동기적으로 실행 (따로따로) : thenCombine
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("world " + Thread.currentThread().getName());
            return "world";
        });
        // hello와 world를 각각 실행한 후
        // 그 결과를 받아 처리하는 인터페이스 사용 = BiFunction
        CompletableFuture<String> future2 = hello.thenCombine(world, (h, w) -> {
            return h + " " + w;
        });
        System.out.println(future2.get());

        // 3. 2개 이상의 subtask를 모두 실행한 뒤에 다음 작업 진행하기
        // allOf()
        // 그런데, 각각의 subtask가 리턴타입이 다를 경우 / Interrupt로 종료될 경우 null을 리턴.
        CompletableFuture.allOf(hello, world)
                .thenAccept(System.out::println); // hello와 world 리턴타입이 다를 경우 null 리턴

        // 해결법
        List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
        CompletableFuture[] futureArray = futures.toArray(new CompletableFuture[futures.size()]);

        CompletableFuture<List<String>> results = CompletableFuture.allOf(futureArray)
                .thenApply(v -> {
                    // thenApply로 넘어온 시점에서, 이미 쓰레드 각각의 연산은 다 끝난 상황.
                    return futures.stream()
                    // tip. get과 join은 기능상 동일, get보다 코드가 깔끔함.
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
                });
        // results는 모든 쓰레드 연산이 끝났을 때 결과값을 받아오게 됨.
        // non-blocking방식으로 처리
        results.get().forEach(System.out::println);

        // 아무거나 먼저 결과나오는 것 빨리 받기. 출력하는 메소드 적용이므로 thenAccept
        CompletableFuture<Void> voidCompletableFuture =
                CompletableFuture.anyOf(hello, world)
                        .thenAccept((s) -> System.out.println(s));
        voidCompletableFuture.get();

        // 예외 처리
        // 비동기처리 task에서 에러 발생 시
        // exceptionally 메소드가 Exception 인자를 받은 뒤 필요한 로직을 처리하도록 생성.
        // handle()의 경우 정상적으로 종료될 시 / 그렇지 않을 경우를 나눠 연
        boolean throwError = true;
        CompletableFuture<String> throwExceptionFuture = CompletableFuture.supplyAsync(() -> {
            if (throwError) throw new IllegalArgumentException();
            System.out.println("hello" + Thread.currentThread().getName());
            return "hello";
        }).handle((result, error) -> {
            if (error != null) return "error";
            return result;
        });
//       .exceptionally(ex -> {
//            System.out.println(ex);
//            return "error";
//        });
        System.out.println(throwExceptionFuture.get());
    }
}
반응형