Understanding Java Utils of Concurrency by Examples

Atomic

AtomicBoolean

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @program: aqs
* @description: AtomicBoolean Example 1
* @author: CJ Sun
* @create: 2019-04-29 01:25
**/

public class AtomicBooleanExample1 {
static int threadNum = 800;
static CountDownLatch latch = new CountDownLatch(threadNum);

static class DemoTask implements Runnable {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

SingletonService.getInstance("SingletonService");

latch.countDown();
}
}

static class SingletonService {
private String name = "";
private static AtomicBoolean state = new AtomicBoolean(false);
private static SingletonService instance;

private SingletonService(String name) {
this.name = name;
System.out.println(name + " initialized");
}

public static SingletonService getInstance(String name) {
if (state.compareAndSet(false, true)) {
instance = new SingletonService(name);
}

while(!state.get()){
Thread.yield();
}

return instance;
}

public void serve() {
System.out.println(name + " starts to serve");
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
exec.submit(new DemoTask());
}

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

SingletonService.getInstance("SingletonService").serve();

exec.shutdown();
}
}

AtomicInteger

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @program: aqs
* @description: AtomicInteger Example 1
* @author: CJ Sun
* @create: 2019-04-29 01:26
**/

public class AtomicIntegerExample1 {
static int threadNum = 200;
static CountDownLatch latch = new CountDownLatch(threadNum);
static AtomicInteger counter = new AtomicInteger(0);

static class DemoTask implements Runnable {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

int rst = counter.addAndGet(1);
System.out.println("Do addition, counter is " + rst);

latch.countDown();
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
exec.submit(new DemoTask());
}

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("final counter is " + counter.get() + ", expected is " + threadNum);
exec.shutdown();
}
}

AtomicReference

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceExample1 {

static class Person {
volatile long id;

public Person(long id) {
this.id = id;
}
}

public static void main(String[] args) {
Person p1 = new Person(1);
Person p2 = new Person(1);

AtomicReference ar = new AtomicReference(p1);
System.out.println("ar.compareAndSet(p1, p2) = " + ar.compareAndSet(p1, p2));

ar = new AtomicReference(p2);
System.out.println("ar.compareAndSet(p1, p2) = " + ar.compareAndSet(p1, p2));
}
}

AtomicStampedReference

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedReferenceExample1 {
private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);

static class DemoTaskA implements Runnable {

@Override
public void run() {
int ts = atomicStampedRef.getStamp();

try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}

atomicStampedRef.compareAndSet(100, 101, ts, ts + 1);
atomicStampedRef.compareAndSet(101, 100, ts, ts + 1);
}
}

static class DemoTaskB implements Runnable {

@Override
public void run() {
int ts = atomicStampedRef.getStamp();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean c3 = atomicStampedRef.compareAndSet(100, 101, ts, ts+ 1);

System.out.println(c3); // false
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();

exec.submit(new DemoTaskA());
exec.submit(new DemoTaskB());

exec.shutdown();
}
}

Queue

ArrayBlockingQueue

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ArrayBlockingQueueExample1 {
static BlockingQueue queue = new ArrayBlockingQueue(10);

static class ProducinngTask implements Runnable {
@Override
public void run() {
while (true) {
try {
String threadName = Thread.currentThread().getName();
Thread.sleep((long) (Math.random() * 100));

queue.put(1);

System.out.println(threadName + " produce");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class ConsumingTask implements Runnable {
@Override
public void run() {
while (true) {
try {
String threadName = Thread.currentThread().getName();
Thread.sleep(1000);

queue.take();

System.out.println(threadName + " comsume");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();

exec.submit(new ProducinngTask());
exec.submit(new ConsumingTask());

exec.shutdown();
}
}

DelayQueue

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import java.util.Random;
import java.util.concurrent.*;


/**
* @program: aqs
* @description: DelayQueue Example 1
* @author: CJ Sun
* @create: 2019-04-29 07:57
**/

public class DelayQueueExample1 {

static class CacheQueue<K, V> {

public DelayQueue<CacheItem<K>> leaseQueue = new DelayQueue<>();

public void put(K k, V v, long ttl) {
CacheItem<K> tmpItem = new CacheItem<>(k, ttl);
leaseQueue.put(tmpItem);
}

class CacheCleanTask extends Thread {
@Override
public void run() {
while (true) { // remove expired keys
while(true) {
CacheItem<K> cacheItem = leaseQueue.poll();
if (cacheItem == null) {
break;
}

System.out.println(System.nanoTime() + " remove " + cacheItem.getT() + " from cache");
}

try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public CacheQueue() {
CacheCleanTask task = new CacheCleanTask();
task.setDaemon(true);

ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new CacheCleanTask());

exec.shutdown();
}

class CacheItem<T> implements Delayed {

private T t;
private long ttl;
private long removeTime;

public CacheItem(T t, long ttl) {
this.setT(t);
this.ttl = ttl;
this.removeTime = System.nanoTime() + ttl;
}

@Override
public int compareTo(Delayed o) {
CacheItem<T> tmpCacheItem = (CacheItem<T>) o;
if (ttl > tmpCacheItem.ttl) {
return 1;
}
if (ttl == tmpCacheItem.ttl) {
return 0;
}

return -1;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.nanoTime(), unit);
}

public T getT() {
return t;
}

public void setT(T t) {
this.t = t;
}
}

public static void main(String[] args) throws InterruptedException {
Random random = new Random();
CacheQueue<Integer, Integer> cache = new CacheQueue<>();

int cacheSize = 1000000;
for (int i = 0; i < cacheSize; i++) {
int ttl = random.nextInt(3000000);
System.out.println("put " + i + " ttl: " + ttl);

cache.put(i, i, ttl);

Thread.sleep(100);
}
}
}
}

Callable

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableExample1 {

static class CallableTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Callable starting task");
return "Callable done";
}
}

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
Future<String> future = exec.submit(new CallableTask());
System.out.println("Get Callable result:" + future.get());
exec.shutdown();
}
}

Example 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.*;

public class CallableExample2 {

static class CallableTask implements Callable<String> {
public String call() throws Exception {
System.out.println("Callable starting task");
return "Callable done";
}
}

public static void main(String[] args) {
FutureTask<String> future = new FutureTask<>(new CallableTask());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Thread(future));

try {
System.out.println("Get Callable result:" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

exec.shutdown();
}
}

Lock

CountDownLatch

Example 1

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted, or the specified waiting time elapses.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchExample1 {
static int threadNumTotal = 8;
static CountDownLatch countDownLatch = new CountDownLatch(threadNumTotal);

static class DemoTask implements Runnable {
@Override
public void run() {
System.out.println("Running thread: " + Thread.currentThread().getName());

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newCachedThreadPool();
for (int index = 0; index < threadNumTotal; index++) {
threadPool.execute(new DemoTask());
}

// countDownLatch.await(); // There could be deadlock without any timeout mechanism
countDownLatch.await(300, TimeUnit.MICROSECONDS); // At least, this way will never enter dead lock

threadPool.shutdown(); // Shutdown the thread pool

System.out.println("All Threads Done.");
}
}

CylicBarrier

Example 1

Waits until all {@linkplain #getParties parties} have invoked await on this barrier, or the specified waiting time elapses. If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  1. The last thread arrives; or
  2. The specified timeout elapses; or
  3. Some other thread interrupts the current thread; or Some other thread interrupts one of the other waiting threads; or
  4. Some other thread times out while waiting for barrier; or
  5. Some other thread invokes reset on this barrier.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.*;

public class CyclicBarrierExample1 {

static CyclicBarrier barrier = new CyclicBarrier(10);

static class DemoTask implements Runnable {
@Override
public void run() {
String tn = Thread.currentThread().getName();

System.out.println(tn + " is ready, waiting for others");
try {
barrier.await(1000, TimeUnit.MILLISECONDS);
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}

System.out.println(tn + " continues");
}
}

public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
threadPool.execute(new DemoTask());
}

threadPool.shutdown();
System.out.println("All Thread Done");
}
}

ReentrantLock

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
* @program: aqs
* @description: ReentrantLockExample1
* @author: CJ Sun
* @create: 2019-04-28 18:15
**/

public class ReentrantLockExample1 {
static CountDownLatch latch = new CountDownLatch(2);
static ReentrantLock reentrantLock = new ReentrantLock();

static AtomicInteger atInt = new AtomicInteger(0);
static class DemoTask implements Runnable {
String threadName = "";
public DemoTask(String name) {
threadName = name;
}

@Override
public void run() {
int count = 1000;
while(count-- > 0) {
reentrantLock.lock();
System.out.println(threadName + " add and get " + atInt.addAndGet(1));
reentrantLock.unlock();
}

latch.countDown();
}
}

public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new DemoTask("A"));
Thread t2 = new Thread(new DemoTask("B"));

t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("All Tasks Done");
}
}

Example 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample2 {
static ReentrantLock reentrantLock = new ReentrantLock();
static Condition condition = reentrantLock.newCondition();
static AtomicInteger atInt = new AtomicInteger(0);

static class DemoTask implements Runnable {
String threadName = "";
public DemoTask(String name) {
threadName = name;
}

@Override
public void run() {
int count = 1000;
reentrantLock.lock();

while(count-- > 0) {
System.out.println(threadName + " add and get " + atInt.addAndGet(1));

try {
condition.signal();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

condition.signal();
reentrantLock.unlock();
}
}

public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new DemoTask("A"));
Thread t2 = new Thread(new DemoTask("B"));

t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("All Tasks Done");
}
}

ReentrantReadWriteLockExample

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantReadWriteLockExample1 {

static class ReadWriteQueue {
private Object data = null;
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

public void get() {
rwl.readLock().lock();
System.out.println(Thread.currentThread().getName() + " is ready to read data!");

try {
Thread.sleep( 5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + " read data :" + data);
rwl.readLock().unlock();
}

public void put(Object data) {
rwl.writeLock().lock();
System.out.println(Thread.currentThread().getName() + " is ready to write data!");

try {
Thread.sleep( 5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

this.data = data;
System.out.println(Thread.currentThread().getName() + " write data: " + data);
rwl.writeLock().unlock();
}
}

static ReadWriteQueue rwq = new ReadWriteQueue();

static class ReadTask implements Runnable {

@Override
public void run() {
while (true) {
rwq.get();
}
}
}

static class WriteTask implements Runnable {

@Override
public void run() {
while (true) {
rwq.put(new Random().nextInt(1000));
}
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();

exec.submit(new ReadTask());
exec.submit(new ReadTask());
exec.submit(new ReadTask());
exec.submit(new WriteTask());
exec.submit(new WriteTask());
exec.submit(new WriteTask());

exec.shutdown();
}
}

Semaphore

Example 1

Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is interrupted.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreExample1 {
private static int lockSize = 20;
static Semaphore semp = new Semaphore(lockSize);

static AtomicInteger atInc = new AtomicInteger(0);

static class DemoRunnable implements Runnable {
@Override
public void run() {
System.out.println("Running thread: " + Thread.currentThread().getName());

try {
semp.acquire();

String ct = Thread.currentThread().getName();
System.out.println("Thread: " + ct + " acquires the lock, executinng task " + atInc.addAndGet(1));

semp.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();

for (int index = 0; index < 2000; index++) {
exec.execute(new DemoRunnable());
}

exec.shutdown();
}
}

Example 2

Acquires the given number of permits from this semaphore, only if all are available at the time of invocation. If insufficient permits are available then this method will return immediately with the value {@code false} and the number of available permits is unchanged.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreExample2 {
private static int lockSize = 20;
static Semaphore semp = new Semaphore(lockSize);
static AtomicInteger atInc = new AtomicInteger(0);

static class DemoTask implements Runnable {
@Override
public void run() {
String ct = Thread.currentThread().getName();
System.out.println("Running thread: " + ct);

int permits = 3;
if (!semp.tryAcquire(permits)) {
System.err.println("Thread: " + ct + " fails to acquire the lock(s), executing task " + atInc.addAndGet(1));
} else {
System.out.println("Thread: " + ct + " acquires the lock(s), executing task " + atInc.addAndGet(1));
semp.release(permits);
}
}
}

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();

for (int index = 0; index < 1000; index++) {
exec.execute(new SemaphoreExample2.DemoTask());
}

exec.shutdown();
}
}

Example 3

Acquires the given number of permits from this semaphore, if all become available within the given waiting time and the current thread has not been interrupted. If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

  1. Sufficient permits assigned.
  2. Some other thread interrupts the current thread
  3. The specified waiting time elapses
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreExample3 {
private static int lockSize = 20;
static Semaphore semp = new Semaphore(lockSize);
static AtomicInteger atInc = new AtomicInteger(0);

static class DemoTask implements Runnable {
@Override
public void run() {
String ct = Thread.currentThread().getName();
System.out.println("Running thread: " + ct);

try {
int permits = 3;
int timeoutInMilliSecond = 1000;
if (!semp.tryAcquire(permits, timeoutInMilliSecond, TimeUnit.MILLISECONDS)) {
System.err.println("Thread: " + ct + " fails to acquire the lock(s), executing task " + atInc.addAndGet(1));
} else {
System.out.println("Thread: " + ct + " acquires the lock(s), executing task " + atInc.addAndGet(1));
semp.release(permits);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();

for (int index = 0; index < 1000; index++) {
exec.execute(new SemaphoreExample2.DemoTask());
}

exec.shutdown();
}

Exchanger

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExchangerExample1 {
private static volatile boolean isDone = false;

static class DemoTask implements Runnable {
private String role = "";
private Exchanger<String> exchanger;

DemoTask(String role, Exchanger<String> exchanger) {
this.role = role;
this.exchanger = exchanger;
}

@Override
public void run() {
while (!Thread.interrupted() && !isDone) {
for (int i = 1; i <= 30; i++) {
try {
Thread.sleep(1000);
System.out.println(role + " send message " + role);
String data = exchanger.exchange(role);
System.out.println(role + " receive message " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isDone = true;
}
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<String> exchanger = new Exchanger<>();
DemoTask producer = new DemoTask("A", exchanger);
DemoTask consumer = new DemoTask("B", exchanger);
exec.execute(producer);
exec.execute(consumer);
exec.shutdown();
try {
exec.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

ForkJoinPoll

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPollExample1 {
static class SumTask extends RecursiveTask<Integer> {
private int start;
private int end;

public SumTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;

boolean canDivide = (end - start) <= 10;
if (canDivide) {
for (int i = start; i <= end; i++) {
sum += i;
}
}

if (!canDivide){
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(start, middle);
SumTask rightTask = new SumTask(middle + 1, end);

leftTask.fork();
rightTask.fork();

int leftResult = leftTask.join();
int rightResult = rightTask.join();

sum = leftResult + rightResult;
}

return sum;
}

}
public static void main(String[] args) {
ForkJoinPool poll = new ForkJoinPool();

SumTask task = new SumTask(1, 100);

Future<Integer> result = poll.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

References