diff --git a/src/main/java/com/metamx/emitter/core/Batch.java b/src/main/java/com/metamx/emitter/core/Batch.java index b9d2360..84bc850 100644 --- a/src/main/java/com/metamx/emitter/core/Batch.java +++ b/src/main/java/com/metamx/emitter/core/Batch.java @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Buffer for batched data + synchronization state. @@ -73,6 +75,11 @@ private static boolean isEmittingAllowed(long state) private static final long UNLOCK_AND_SEAL_TAG = 1; private static final long SEAL_TAG = 2; + /** + * Lock for guarding the firstEventTimestamp. + */ + private final ReadWriteLock firstEventLock = new ReentrantReadWriteLock(); + /** * The emitter this batch belongs to. */ @@ -145,6 +152,7 @@ private boolean tryAddFirstEvent(byte[] event) return false; } try { + firstEventLock.writeLock().lock(); int bufferOffset = emitter.batchingStrategy.writeBatchStart(buffer); writeEvent(event, bufferOffset); eventCount.incrementAndGet(); @@ -152,6 +160,7 @@ private boolean tryAddFirstEvent(byte[] event) return true; } finally { + firstEventLock.writeLock().unlock(); unlock(); } } @@ -212,8 +221,11 @@ private void unlockAndSealIfNeeded() if (eventCount.incrementAndGet() >= emitter.config.getFlushCount()) { unlockAndSeal(); } else { - long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp; - if (firstEventTimestamp > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) { + firstEventLock.readLock().lock(); + long firstEventTimestampValue = firstEventTimestamp; + firstEventLock.readLock().unlock(); + long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestampValue; + if (firstEventTimestampValue > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) { unlockAndSeal(); } else { unlock(); @@ -222,8 +234,11 @@ private void unlockAndSealIfNeeded() } void sealIfFlushNeeded() { - long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp; - if (firstEventTimestamp > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) { + firstEventLock.readLock().lock(); + long firstEventTimestampValue = firstEventTimestamp; + firstEventLock.readLock().unlock(); + long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestampValue; + if (firstEventTimestampValue > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) { seal(); } } @@ -275,9 +290,12 @@ protected boolean tryReleaseShared(long tag) if (compareAndSetState(state, newState)) { // Ensures only one thread calls emitter.onSealExclusive() for each batch. if (!isSealed(state)) { + firstEventLock.readLock().lock(); + long firstEventTimestampValue = firstEventTimestamp; + firstEventLock.readLock().unlock(); emitter.onSealExclusive( this, - firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1 + firstEventTimestampValue > 0 ? System.currentTimeMillis() - firstEventTimestampValue : -1 ); } return isEmittingAllowed(newState);