학습일지/Language
Java Concurrent Programming - Executors, Callable, Future, CompletableFuture
inspirit941
2020. 10. 22. 09:55
반응형
백기선님의 인프런 더 자바8 강의 내용 정리.
Concurrent Programming
이전까지의 자바 Concurrent Programming을 알아야 한다.
자바의 기본 Concurrent Programming
Concurrent 소프트웨어
- 동시에 여러 작업을 할 수 있는 SW
자바에서는 멀티프로세싱, 멀티쓰레드 둘 다 가능하다. 이 중 멀티쓰레드를 다룸.
package com.inspirit941.java8to11;
public class testConcurrentProgramming {
static class MyThead extends Thread {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName());
}
}
public static void main(String[] args) {
// 메인 메소드가 작동하는 메인 쓰레드
System.out.println("main Thread");
System.out.println(Thread.currentThread().getName());
// 메인쓰레드에서 다른 쓰레드를 생성하는 방법은 두 가지.
// 1. 불편한 방법 - 쓰레드 상속받은 객체 생성
MyThead myThead = new MyThead();
myThead.start();
// return 순서는 보장하지 않는다. 콘솔에 출력되는 순서는 매번 다름
// 2. Runnable 구현체로 생성
Thread thread = new Thread(() -> {
while (true) {
System.out.println("Thread: " + Thread.currentThread().getName());
try {
// sleep : 현재 쓰레드를 대기상태로 두는 것.
Thread.sleep(1000);
} catch (InterruptedException e) {
// 자는 동안 누군가가 interrupt할 경우
// e.printStackTrace();
// 누군가가 쓰레드를 깨울 경우 종료하기.
return;
}
}
});
thread.start();
// 1. sleep : 현재 쓰레드를 대기 상태로 두는 것.
// 2. interrupt : 쓰레드가 외부의 개입을 받는 것. 이걸로 특정 쓰레드를 깨우거나 종료하는 등의 작업이 가능.
// 종료시키는 메소드가 따로 있는 게 아니라, 이런 식으로 interrupt를 받았을 때 적용할 수 있는 방법 중 하나.
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// thread에 interrupt 명령어를 보내서 return하도록 설정.
thread.interrupt();
// 3. join : 다른 쓰레드를 기다리는 것.
Thread thread2 = new Thread(() -> {
System.out.println("Thread: " + Thread.currentThread().getName());
try {
// 2초 대기했다가 종료되는 쓰레드.
Thread.sleep(2000);
} catch (InterruptedException e) {
return;
}
});
thread2.start();
try {
// 쓰레드가 작동을 끝낼 때까지 대기
thread2.join();
} catch (InterruptedException e) {
// 해당 쓰레드를 대기하는 중에 interrupt될 경우
// (이 경우는 메인쓰레드에 interrupt가 들어올 경우)
e.printStackTrace();
}
System.out.println(thread2+ " is finished");
// 다 좋은데, 쓰레드 두 개만 생성했는데도 코드량이 많다.
// 개수가 많아질수록 코딩으로 쓰레드를 관리할 수가 없음
// 그래서 등장한 게 Executor.
}
}
Executors
Runnable이나 Thread처럼 Low-level API로 코드를 다루는 대신
보다 하이레벨의 Concurrency 프로그래밍을 의미함.
담당 작업
- 쓰레드 생성. 개발자는 Runnable 안에 로직만 제공한다.
- 쓰레드 생명주기 관리
- 작업 처리 및 실행을 담당하는 API 제공.
주요 인터페이스
- Executor : execute(Runnable). 직접 이 타입을 쓰진 않음.
- ExecutorService : Executor 상속받은 인터페이스. Callable도 실행 가능하며, Executor를 종료하거나 여러 Callable을 동시에 실행하는 기능 제공.
- ScheduledExecutorService : ExecutorService 상속받은 것. 특정 시간대에 작업을 실행하거나 / 주기적으로 작업을 실행하고 싶을 때 등등에 사용
package com.inspirit941.java8to11;
import java.util.concurrent.*;
public class testExecutors {
public static Runnable getRunnable(String message) {
return () -> System.out.println(message + Thread.currentThread().getName());
}
public static void main(String[] args) {
// 쓰레드 하나만 쓰는 Executor : Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 실행하기. execute 대신 submit을 써도 된다.
// ex) executorService.execute(() -> System.out.println("Thread " + Thread.currentThread().getName()));
executorService.submit(getRunnable("hello"));
executorService.submit(getRunnable("hello2"));
executorService.submit(getRunnable("hello3"));
executorService.submit(getRunnable("hello4"));
executorService.submit(getRunnable("hello5"));
// 이렇게 되면, 2개의 쓰레드로 다섯 개의 실행요청을 처리한다.
// ExecutorService에서 Thread Pool을 관리. 여기에 쓰레드 개수가 존재
// Blocking Queue: 쓰레드가 해야 할 작업을 쌓아놓는 곳.
// 이전 작업을 마치면 Queue에서 순차적으로 새 작업을 쓰레드에 할당하는 식으로 동작한다.
executorService.shutdown();
// ExecutorService는 한 번 실행해서 작업을 수행하면, 다음 작업이 들어올 때까지 대기상태가 된다.
// 따라서 shutdown()을 명시적으로 해줘야 함.
// shutdown (): Graceful shutdown. 실행하는 작업들 다 마친 후에 종료시키는 작업.
// shutdownNow() : 그런 거 없이 실행 즉시 바로 종료.
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// 5초 뒤에 메소드 실행하도록 세팅
scheduledExecutorService.schedule(getRunnable("hello"), 5, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
// 1초 대기 후 첫 실행, 2초마다 반복실행
scheduledExecutorService.scheduleAtFixedRate(getRunnable("fixedRate"), 1, 2, TimeUnit.SECONDS);
// fork / join 기능도 있지만, 이건 멀티프로세스 프로그래밍에서 유용한 개념.
}
}
쓰레드에서 무언가 결과를 리턴받고 싶다면, Runnable은 사용할 수 없다. void return이기 때문.
자바 1.5 버전에서부터 Callable이 등장했다. 리턴 기능을 제공.
리턴값을 받아올 수 있는 무언가가 필요함. 그게 Future.
Callable과 Future
리턴값을 가질 수 있는 Runnable = Callable.
package com.inspirit941.java8to11;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class testCallable {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000);
return "hello";
};
// callable의 리턴 타입이 Function.
Future<String> submit = executorService.submit(hello);
System.out.println("Started");
// get을 만나는 순간 메인쓰레드는 멈추고, submit이 연산을 마칠 때까지 기다린다.
// isDone() : submit 쓰레드가 끝났는지 아닌지 확인하는 메소드
System.out.println(submit.isDone()); // return false
// cancel(bool) : 해당 쓰레드 종료 명령. 이렇게 종료된 service는 get()으로 결과를 가져올 수 없다.
// true 넣으면 진행중인 작업을 interrupt하고 종료, false는 연산 끝날때까지 기다렸다가 종료
try {
String returns = submit.get();
System.out.println(returns);
// return hello
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Callable<String> world = () -> {
Thread.sleep(3000);
return "world";
};
Callable<String> java = () -> {
Thread.sleep(1000);
return "java";
};
try {
// 여러 개의 callable을 한번에 보내기. 리턴값으로 Future 리스트를 반환함.
// invokeAll : 투입한 모든 Callable이 전부 끝날 때까지 대기함.
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, world));
futures.forEach(stringFuture -> {
try {
System.out.println(stringFuture.get());
// return hello, java, world
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
// invokeAny : 모든 쓰레드가 끝날 때까지 기다리는 대신 뭐라도 하나 결과가 나오면 바로 리턴.
// 리턴타입도 Future 안에 정의된 가상클래스.
ExecutorService newExecutorService = Executors.newFixedThreadPool(4);
try {
// 쓰레드 4개, 메소드 3개 -> 각각의 스레드에 하나씩 메소드 할당됨.
// 가장 먼저 리턴되는 결과값을 Future<클래스>의 "클래스" 타입으로 반환한다.
String result = newExecutorService.invokeAny(Arrays.asList(hello, java, world));
System.out.println(result);
// return java
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
newExecutorService.shutdown();
}
}
CompletableFuture
비동기 프로그램에 가까운 코딩이 가능해진다.
Future의 단점
- 외부에서 완료시킬 수 없음. get()에 타임아웃을 설정하는 식으로 해결해야 함
- 블로킹 코드 (ex)get)을 실행하지 않으면 작업이 끝났을 때 콜백 실행이 불가능
- 여러 Future를 조합할 수 없다. (event 정보를 가져온 뒤, 이벤트에 참석하는 회원 목록 불러오기 같은 기능 구현이 불가능)
- 예외처리 API 없음
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(()->"hello");
// future 결과값을 가지고 어떤 작업을 하려면
// 무조건 get 코드 뒤에서 작업해야 함. = 비동기 코딩패턴을 만들기 어려웠음
future.get();
외부에서 Complete을 시킬 수 있어서 CompleteFuture.
package com.inspirit941.java8to11;
import java.util.concurrent.*;
public class testCompletableFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 명시적으로 Executor 생성할 필요 없음.
CompletableFuture<String> future = new CompletableFuture<>();
// future의 기본값을 hello로 명시.
future.complete("hello");
System.out.println(future.get());
// 위와 같은 기능을 수행하는 static 메소드.
// CompletableFuture<String> future2 = CompletableFuture.completedFuture("hello");
// 1. 리턴이 없는 경우 runAsync 메소드 사용.
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() ->{
System.out.println("hello " + Thread.currentThread().getName());
});
future3.get();
// 2. 리턴이 있는 경우 supplyAsync 사용.
// 콜백을 주는 방법: .thenApply()
// 따로 리턴을 하지 않을 경우 .thenAccept(Comsumer) 사용. 이 경우 CompleteFuture<Void>가 리턴타입이 됨.
// 인풋을 따로 받지 않고 리턴값도 없을 경우 .thenRun(Runnable).
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync hello " + Thread.currentThread().getName());
// currentThread 이름으로 ForkJoinPool이 리턴됨.
return "SupplyAsync";
}).thenApply((string) -> {
return string.toUpperCase();
});
System.out.println(stringCompletableFuture.get());
// Q. 따로 쓰레드 풀 정의도 하지 않았는데 어떻게 쓰레드를 생성해서 사용하는가
// A. ForkJoinPool 때문. 자바 7에서 도입되었고, Executor 구현체 중 하나.
// Deque 형태. 쓰레드가 자기 할 일이 없으면 직접 가져와서 처리 + 작업 단위를 서브테스크 단위로 쪼개서 다른 쓰레드에 분산처리 + join하는 것이라고 함
// 물론, ThreadPool을 직접 정의해 사용하는 것도 가능하다.
// runAsync나 supplyAsync 메소드의 두번째 인자값으로 executorService를 입력.
// thenRun이나 thenApply같은 콜백메소드에 사용할 경우
// 메소드 뒤에 Async 붙이고, 두번째 인자값으로 쓰레드 풀을 입력할 수 있음.
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
return "fixedThreadPool" + Thread.currentThread().getName();
}, executorService).thenRunAsync(() -> {
System.out.println("thenRun");
}, executorService);
System.out.println(future2.get());
executorService.shutdown();
}
}
여러 작업을 조합해 사용하기, 예외처리하기
package com.inspirit941.java8to11;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class testCompletableFuture2 {
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("world " + Thread.currentThread().getName());
return message + " world";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("hello " + Thread.currentThread().getName());
return "hello";
});
// 1. hello 끝나고 world해야 함. (의존성 존재하는 경우) : thenCompose
// 이전까지는 hello.get(); world.get(); 형태로 수행
// thenCompose 메소드를 사용할 수 있다.
CompletableFuture<String> future = hello.thenCompose(s -> getWorld(s));
System.out.println(future.get());
// 2. 둘 사이에 연관관계가 없는 경우 - 비동기적으로 실행 (따로따로) : thenCombine
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("world " + Thread.currentThread().getName());
return "world";
});
// hello와 world를 각각 실행한 후
// 그 결과를 받아 처리하는 인터페이스 사용 = BiFunction
CompletableFuture<String> future2 = hello.thenCombine(world, (h, w) -> {
return h + " " + w;
});
System.out.println(future2.get());
// 3. 2개 이상의 subtask를 모두 실행한 뒤에 다음 작업 진행하기
// allOf()
// 그런데, 각각의 subtask가 리턴타입이 다를 경우 / Interrupt로 종료될 경우 null을 리턴.
CompletableFuture.allOf(hello, world)
.thenAccept(System.out::println); // hello와 world 리턴타입이 다를 경우 null 리턴
// 해결법
List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
CompletableFuture[] futureArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<List<String>> results = CompletableFuture.allOf(futureArray)
.thenApply(v -> {
// thenApply로 넘어온 시점에서, 이미 쓰레드 각각의 연산은 다 끝난 상황.
return futures.stream()
// tip. get과 join은 기능상 동일, get보다 코드가 깔끔함.
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
// results는 모든 쓰레드 연산이 끝났을 때 결과값을 받아오게 됨.
// non-blocking방식으로 처리
results.get().forEach(System.out::println);
// 아무거나 먼저 결과나오는 것 빨리 받기. 출력하는 메소드 적용이므로 thenAccept
CompletableFuture<Void> voidCompletableFuture =
CompletableFuture.anyOf(hello, world)
.thenAccept((s) -> System.out.println(s));
voidCompletableFuture.get();
// 예외 처리
// 비동기처리 task에서 에러 발생 시
// exceptionally 메소드가 Exception 인자를 받은 뒤 필요한 로직을 처리하도록 생성.
// handle()의 경우 정상적으로 종료될 시 / 그렇지 않을 경우를 나눠 연
boolean throwError = true;
CompletableFuture<String> throwExceptionFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) throw new IllegalArgumentException();
System.out.println("hello" + Thread.currentThread().getName());
return "hello";
}).handle((result, error) -> {
if (error != null) return "error";
return result;
});
// .exceptionally(ex -> {
// System.out.println(ex);
// return "error";
// });
System.out.println(throwExceptionFuture.get());
}
}
반응형