Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
Expand Down Expand Up @@ -71,27 +71,22 @@
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.mockito.Mockito;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;

public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {

@Parameter
public boolean isCompressionEnabled;
protected boolean isCompressionEnabled;

@Parameters(name = "{index}: isCompressionEnabled={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
protected TestBasicWALEntryStream(boolean isCompressionEnabled) {
this.isCompressionEnabled = isCompressionEnabled;
}

@Before
@BeforeEach
public void setUp() throws Exception {
CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
initWAL();
Expand All @@ -105,7 +100,7 @@ private WAL.Entry next(WALEntryStream entryStream) {
/**
* Tests basic reading of log appends
*/
@Test
@TestTemplate
public void testAppendsWithRolls() throws Exception {
appendToLogAndSync();
long oldPos;
Expand Down Expand Up @@ -160,7 +155,7 @@ oldPos, log, new MetricsSource("1"), fakeWalGroupId)) {
* Tests that if after a stream is opened, more entries come in and then the log is rolled, we
* don't mistakenly dequeue the current log thinking we're done with it
*/
@Test
@TestTemplate
public void testLogRollWhileStreaming() throws Exception {
appendToLog("1");
// 2
Expand Down Expand Up @@ -197,7 +192,7 @@ public void testLogRollWhileStreaming() throws Exception {
* Tests that if writes come in while we have a stream open, we shouldn't miss them
*/

@Test
@TestTemplate
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
Expand All @@ -219,7 +214,7 @@ public void testNewEntriesWhileStreaming() throws Exception {
}
}

@Test
@TestTemplate
public void testResumeStreamingFromPosition() throws Exception {
long lastPosition = 0;
appendToLog("1");
Expand All @@ -244,7 +239,7 @@ public void testResumeStreamingFromPosition() throws Exception {
* Tests that if we stop before hitting the end of a stream, we can continue where we left off
* using the last position
*/
@Test
@TestTemplate
public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
Expand All @@ -263,23 +258,23 @@ public void testPosition() throws Exception {
}
}

@Test
@TestTemplate
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}

@Test
@TestTemplate
public void testWALKeySerialization() throws Exception {
Map<String, byte[]> attributes = new HashMap<String, byte[]>();
attributes.put("foo", Bytes.toBytes("foo-value"));
attributes.put("bar", Bytes.toBytes("bar-value"));
WALKeyImpl key =
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
Assert.assertEquals(attributes, key.getExtendedAttributes());
assertEquals(attributes, key.getExtendedAttributes());

WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
WALProtos.WALKey serializedKey = builder.build();
Expand All @@ -288,14 +283,14 @@ public void testWALKeySerialization() throws Exception {
deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());

// equals() only checks region name, sequence id and write time
Assert.assertEquals(key, deserializedKey);
assertEquals(key, deserializedKey);
// can't use Map.equals() because byte arrays use reference equality
Assert.assertEquals(key.getExtendedAttributes().keySet(),
assertEquals(key.getExtendedAttributes().keySet(),
deserializedKey.getExtendedAttributes().keySet());
for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
}
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
}

private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf)
Expand Down Expand Up @@ -345,7 +340,7 @@ private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numF
return reader;
}

@Test
@TestTemplate
public void testReplicationSourceWALReader() throws Exception {
appendEntriesToLogAndSync(3);
// get ending position
Expand Down Expand Up @@ -376,7 +371,7 @@ public void testReplicationSourceWALReader() throws Exception {
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
}

@Test
@TestTemplate
public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
appendEntriesToLogAndSync(3);
// get ending position
Expand Down Expand Up @@ -405,7 +400,7 @@ public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
assertEquals(3, entryBatch.getNbRowKeys());
}

@Test
@TestTemplate
public void testReplicationSourceWALReaderRecovered() throws Exception {
appendEntriesToLogAndSync(10);
Path walPath = getQueue().peek();
Expand Down Expand Up @@ -438,7 +433,7 @@ public void testReplicationSourceWALReaderRecovered() throws Exception {
}

// Testcase for HBASE-20206
@Test
@TestTemplate
public void testReplicationSourceWALReaderWrongPosition() throws Exception {
appendEntriesToLogAndSync(1);
Path walPath = getQueue().peek();
Expand All @@ -465,8 +460,8 @@ public String explainFailure() throws Exception {
assertEquals(walPath, entryBatch.getLastWalPath());

long walLength = fs.getFileStatus(walPath).getLen();
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is "
+ walLength, entryBatch.getLastWalPosition() <= walLength);
assertTrue(entryBatch.getLastWalPosition() <= walLength, "Position "
+ entryBatch.getLastWalPosition() + " is out of range, file length is " + walLength);
assertEquals(1, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());

Expand All @@ -490,7 +485,7 @@ public String explainFailure() throws Exception {
assertFalse(entryBatch.isEndOfFile());
}

@Test
@TestTemplate
public void testReplicationSourceWALReaderDisabled()
throws IOException, InterruptedException, ExecutionException {
appendEntriesToLogAndSync(3);
Expand Down Expand Up @@ -598,7 +593,7 @@ public static int numFailures() {
}
}

@Test
@TestTemplate
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1");
appendToLog("2");
Expand All @@ -625,7 +620,7 @@ public void testReadBeyondCommittedLength() throws IOException, InterruptedExcep
* Test removal of 0 length log from logQueue if the source is a recovered source and size of
* logQueue is only 1.
*/
@Test
@TestTemplate
public void testEOFExceptionForRecoveredQueue() throws Exception {
// Create a 0 length log.
Path emptyLog = new Path("emptyLog");
Expand Down Expand Up @@ -656,7 +651,7 @@ public void testEOFExceptionForRecoveredQueue() throws Exception {
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
}

@Test
@TestTemplate
public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
Configuration conf = new Configuration(CONF);
MetricsSource metrics = mock(MetricsSource.class);
Expand All @@ -681,14 +676,14 @@ public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception
// Create a reader thread.
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
getDummyFilter(), source, fakeWalGroupId);
assertEquals("Initial log queue size is not correct", 2,
localLogQueue.getQueueSize(fakeWalGroupId));
assertEquals(2, localLogQueue.getQueueSize(fakeWalGroupId),
"Initial log queue size is not correct");
reader.start();
reader.join();

// remove empty log from logQueue.
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId), "Log queue should be empty");
}

private PriorityBlockingQueue<Path> getQueue() {
Expand All @@ -713,7 +708,7 @@ private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOE
/***
* Tests size of log queue is incremented and decremented properly.
*/
@Test
@TestTemplate
public void testSizeOfLogQueue() throws Exception {
// There should be always 1 log which is current wal.
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
Expand Down Expand Up @@ -744,7 +739,7 @@ public void testSizeOfLogQueue() throws Exception {
* Tests that wals are closed cleanly and we read the trailer when we remove wal from
* WALEntryStream.
*/
@Test
@TestTemplate
public void testCleanClosedWALs() throws Exception {
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log,
logQueue.getMetrics(), fakeWalGroupId)) {
Expand All @@ -761,7 +756,7 @@ public void testCleanClosedWALs() throws Exception {
/**
* Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
*/
@Test
@TestTemplate
public void testEOFExceptionInOldWALsDirectory() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
Expand Down Expand Up @@ -805,7 +800,7 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception {
* decreased because {@link WALEntryBatch} is not put to
* {@link ReplicationSourceWALReader#entryBatchQueue}.
*/
@Test
@TestTemplate
public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
appendEntriesToLogAndSync(3);
// get ending position
Expand Down Expand Up @@ -844,7 +839,7 @@ public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() thr
}

// testcase for HBASE-28748
@Test
@TestTemplate
public void testWALEntryStreamEOFRightAfterHeader() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
Expand Down Expand Up @@ -887,7 +882,7 @@ public void testWALEntryStreamEOFRightAfterHeader() throws Exception {
* WALEntryFilterRetryableException does not cause the new file to be opened at the old file's
* position.
*/
@Test
@TestTemplate
public void testPositionResetOnFileSwitchWithRetryableFilter() throws Exception {
appendEntriesToLogAndSync(3);
log.rollWriter();
Expand Down Expand Up @@ -922,15 +917,15 @@ public void testPositionResetOnFileSwitchWithRetryableFilter() throws Exception
long deadline = System.currentTimeMillis() + 30000;
while (totalEntries < 6) {
long remaining = deadline - System.currentTimeMillis();
assertTrue("Reader appears stuck - likely position corruption. Only got " + totalEntries
+ " of 6 entries", remaining > 0);
assertTrue(remaining > 0, "Reader appears stuck - likely position corruption. Only got "
+ totalEntries + " of 6 entries");
WALEntryBatch batch = reader.poll(1);
if (batch != null && batch != WALEntryBatch.NO_MORE_DATA) {
totalEntries += batch.getNbEntries();
}
}
assertEquals(6, totalEntries);
assertTrue("Filter should have thrown at least once", threwOnce.get());
assertTrue(threwOnce.get(), "Filter should have thrown at least once");
}

private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,31 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.provider.Arguments;

/**
* TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider.
*/
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, MediumTests.class })
@Tag(ReplicationTests.TAG)
@Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: isCompressionEnabled={0}")
public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBasicWALEntryStreamAsyncFSWAL.class);
public TestBasicWALEntryStreamAsyncFSWAL(boolean isCompressionEnabled) {
super(isCompressionEnabled);
}

public static Stream<Arguments> parameters() {
return Stream.of(Arguments.of(false), Arguments.of(true));
}

@BeforeClass
@BeforeAll
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class,
AbstractFSWALProvider.class);
Expand Down
Loading
Loading