マルチスレッドプログラミングの基本
最近、並行処理ではまった所をメモ
CountDownLatchの使い方が曖昧だったのでそこも兼ねて書いた
AbstractCounter
package jp.co.matsuokah.counter; import java.util.concurrent.atomic.AtomicInteger; /** * */ public abstract class AbstractCounter implements Counter { protected AtomicInteger counter = new AtomicInteger(); /** limit到達時のハンドリング */ protected void limitHandle() throws Exception { throw new Exception("reached the limit."); } /** limitに達しているかの検出 */ protected void isLimiting() throws Exception { if (counter.get() >= LIMIT) { limitHandle(); } } }
SafeCounter
package jp.co.matsuokah.counter; import jp.co.matsuokah.counter.AbstractCounter; /** * スレッドセーフなカウンター */ public class SafeCounter extends AbstractCounter { private final Object lock = new Object(); @Override public void countUp() throws Exception { synchronized (lock) { isLimiting(); counter.incrementAndGet(); } } @Override public int getCount() { return counter.get(); } }
UnsafeCounter
package jp.co.matsuokah.counter; /** * スレッドセーフではないカウンター */ public class UnsafeCounter extends AbstractCounter { @Override public void countUp() throws Exception { isLimiting(); counter.incrementAndGet(); } @Override public int getCount() { return counter.get(); } }
Tester
package jp.co.matsuokah; import jp.co.matsuokah.counter.Counter; import java.util.concurrent.CountDownLatch; /** * テストクラス */ public class Tester { public void test(final Counter counter) throws InterruptedException { final int threadNum = 1000; final CountDownLatch startSignal = new CountDownLatch(1); final CountDownLatch endSignal = new CountDownLatch(threadNum); long timestamp = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { new Thread(new Runnable() { @Override public void run() { try { startSignal.await(); } catch (InterruptedException e) { e.printStackTrace(); } // threadNum回分countUpが失敗する for (int j = 0; j <= (Counter.LIMIT / threadNum); j++) { try { counter.countUp(); } catch (Exception e) { } } endSignal.countDown(); } }).start(); } startSignal.countDown(); //すべてのスレッドの処理をスタートさせる endSignal.await(); //すべてのスレッドの処理の終了を待つ System.out.println(counter.getClass() + ": " + counter.getCount()); System.out.println("実行時間: " + String.valueOf(System.currentTimeMillis() - timestamp)); } }
Main
package jp.co.matsuokah; import jp.co.matsuokah.counter.SafeCounter; import jp.co.matsuokah.counter.UnsafeCounter; /** * */ public class Main { public static void main(String[] args) throws InterruptedException { Tester tester = new Tester(); tester.test(new SafeCounter()); tester.test(new UnsafeCounter()); } }
実行結果
class jp.co.matsuokah.counter.SafeCounter: 1000000 実行時間: 259 class jp.co.matsuokah.counter.UnsafeCounter: 1000000 実行時間: 173 class jp.co.matsuokah.counter.SafeCounter: 1000000 実行時間: 270 class jp.co.matsuokah.counter.UnsafeCounter: 1000002 実行時間: 178 class jp.co.matsuokah.counter.SafeCounter: 1000000 実行時間: 231 class jp.co.matsuokah.counter.UnsafeCounter: 1000000 実行時間: 171 class jp.co.matsuokah.counter.SafeCounter: 1000000 実行時間: 226 class jp.co.matsuokah.counter.UnsafeCounter: 1000001 実行時間: 170
isLimitingとcountUpの協調
countUpとisLimitingはスレッド間の共有オブジェクトであるcounterにアクセスしていて、
同時にisLimitingメソッドが実行されてしまい、2つ以上のスレッドがそこを通り抜けて、インクリメントされてしまうために、予期せぬ振る舞いをしてしまう。
countUpとisLimitingは1つのトランザクションとして考えるべきで、1つのスレッドがカウントアップする間に他のスレッドでcounterの値に依存した処理をさせない事がスレッドセーフへの道筋。
呼び出し元でgetCountとcountUpが協調するロジックを実装しているならgetCountもsynchronized(lock)する必要がある。