Skip to content

Commit

Permalink
Waiting threads wait on an object
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang committed Aug 14, 2023
1 parent e2a736d commit 3d46b41
Showing 1 changed file with 23 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -222,8 +221,8 @@ class MetricStoreImpl implements MetricStore {
private final Set<Integer> identities = new ConcurrentSkipListSet<>();

private volatile Map<MetricSensor, BiConsumer<Integer, Exception>> lastSensors = Map.of();
// Thread ids and latches for detecting cluster bean changing.
private final Map<Long, CountDownLatch> waitingList = new ConcurrentHashMap<>();
// Monitor for detecting cluster bean changing.
private final Object beanUpdateMonitor = new Object();
// For mbean register. To distinguish mbeans of different metricStore.
private final String uid = Utils.randomString();
private final Sensor<Long> beanReceivedSensor =
Expand Down Expand Up @@ -293,8 +292,10 @@ private MetricStoreImpl(
if (!allBeans.isEmpty()) {
// generate new cluster bean
updateClusterBean();
// Tell all waiting threads that cluster bean has been changed
this.waitingList.values().forEach(CountDownLatch::countDown);
// Tell waiting threads that cluster bean has been changed
synchronized (beanUpdateMonitor) {
beanUpdateMonitor.notifyAll();
}
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -352,39 +353,30 @@ public void close() {
}

/**
* User thread will "wait" until checker pass or timeout. First, register a latch to the waiting
* list. Second run the checker with current clusterBean. If the checker passes, done.
* Otherwise, wait for the cluster bean changing (/the latch counted down) and try again.
* User thread will wait until checker pass or timeout. When cluster bean has changed, the
* waiting threads will be notified.
*/
@Override
public void wait(Predicate<ClusterBean> checker, Duration duration) {
long timeout = System.currentTimeMillis() + duration.toMillis();
var threadId = Thread.currentThread().getId();
// For first check, we don't need to wait.
var latch = new CountDownLatch(0);
try {
while (System.currentTimeMillis() < timeout) {
try {
// Wait for clusterBean being updated
if (!latch.await(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Timeout waiting for the checker");
}
// Add new latch for detecting clusterBean updated
latch = new CountDownLatch(1);
this.waitingList.put(threadId, latch);

// Return if check pass.
if (checker.test(clusterBean())) return;
} catch (NoSufficientMetricsException e) {
// Check failed. Try again next time.
} catch (InterruptedException ie) {
throw new IllegalStateException("Interrupted while waiting for the checker");
if (checker.test(clusterBean())) return;

while (System.currentTimeMillis() < timeout) {
try {
synchronized (beanUpdateMonitor) {
// Release the lock and wait for clusterBean being updated
this.beanUpdateMonitor.wait(timeout - System.currentTimeMillis());
// Tell other threads clusterBean has been updated
beanUpdateMonitor.notifyAll();
}
if (checker.test(clusterBean())) return;
} catch (NoSufficientMetricsException e) {
// Check failed. Try again next time.
} catch (InterruptedException ie) {
throw new IllegalStateException("Interrupted while waiting for the checker");
}
throw new IllegalStateException("Timeout waiting for the checker");
} finally {
this.waitingList.remove(threadId);
}
throw new IllegalStateException("Timeout waiting for the checker");
}

private void updateClusterBean() {
Expand Down

0 comments on commit 3d46b41

Please sign in to comment.