Created
July 5, 2016 15:58
-
-
Save ywelsch/8a5334cd59d922f5c48074fec578e71c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.elasticsearch.versioning; | |
import org.elasticsearch.ElasticsearchTimeoutException; | |
import org.elasticsearch.action.get.GetResponse; | |
import org.elasticsearch.action.index.IndexResponse; | |
import org.elasticsearch.cluster.ClusterState; | |
import org.elasticsearch.cluster.metadata.IndexMetaData; | |
import org.elasticsearch.cluster.routing.ShardRouting; | |
import org.elasticsearch.common.settings.Settings; | |
import org.elasticsearch.common.util.set.Sets; | |
import org.elasticsearch.discovery.DiscoverySettings; | |
import org.elasticsearch.discovery.zen.ZenDiscovery; | |
import org.elasticsearch.discovery.zen.elect.ElectMasterService; | |
import org.elasticsearch.discovery.zen.fd.FaultDetection; | |
import org.elasticsearch.plugins.Plugin; | |
import org.elasticsearch.test.ESIntegTestCase; | |
import org.elasticsearch.test.disruption.NetworkDisconnectPartition; | |
import org.elasticsearch.test.disruption.NetworkPartition; | |
import org.elasticsearch.test.transport.MockTransportService; | |
import org.junit.Test; | |
import java.util.*; | |
import java.util.concurrent.BrokenBarrierException; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.CyclicBarrier; | |
import static java.lang.Thread.sleep; | |
import static org.hamcrest.Matchers.equalTo; | |
import static org.hamcrest.Matchers.greaterThan; | |
import static org.hamcrest.Matchers.is; | |
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) | |
@ESIntegTestCase.SuppressLocalMode | |
public class VersionConsistencyIT extends ESIntegTestCase { | |
@Override | |
protected Collection<Class<? extends Plugin>> nodePlugins() { | |
final HashSet<Class<? extends Plugin>> classes = new HashSet<>(super.nodePlugins()); | |
classes.add(MockTransportService.TestPlugin.class); | |
return classes; | |
} | |
public void testVersionIsUniqueForEachValueFails() throws Throwable { | |
final Settings sharedSettings = Settings.builder() | |
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly | |
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly | |
.put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s") // still long to induce failures but to long so test won't time out | |
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly | |
.build(); | |
logger.info("--> start 3 nodes"); | |
String masterNode = internalCluster().startMasterOnlyNode(sharedSettings); | |
String dataNode1 = internalCluster().startDataOnlyNode(sharedSettings); | |
String dataNode2 = internalCluster().startDataOnlyNode(sharedSettings); | |
logger.info("--> wait for all nodes to join the cluster"); | |
ensureStableCluster(3); | |
client().admin().indices().prepareCreate("registers") | |
.addMapping("foo", "value", "type=text") | |
.setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1", IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1") | |
.get(); | |
ensureGreen(); | |
ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); | |
String dataNode1Id = clusterState.getNodes().resolveNode(dataNode1).getId(); | |
String dataNode2Id = clusterState.getNodes().resolveNode(dataNode2).getId(); | |
boolean dataNode1HasPrimary = clusterState.getRoutingNodes().node(dataNode1Id).copyShards().get(0).primary(); | |
// isolate node with the primary | |
String isolatedNode = dataNode1HasPrimary ? dataNode1 : dataNode2; | |
Set<String> otherNodes = dataNode1HasPrimary ? Sets.newHashSet(masterNode, dataNode2) : Sets.newHashSet(masterNode, dataNode1); | |
NetworkPartition partition = new NetworkDisconnectPartition(otherNodes, Collections.singleton(isolatedNode), random()); | |
internalCluster().setDisruptionScheme(partition); | |
IndexResponse indexResponse = client().prepareIndex("registers", "foo", "bar").setSource("value", "orig").get(); | |
assertThat(indexResponse.isCreated(), equalTo(true)); | |
assertThat(indexResponse.getVersion(), equalTo(1L)); | |
assertThat(client().prepareGet("registers", "foo", "bar").get().getVersion(), equalTo(1L)); | |
logger.info("--> start disrupting network"); | |
partition.startDisrupting(); | |
try { | |
client(isolatedNode).prepareIndex("registers", "foo", "bar").setSource("value", "dirtyval").get("5s"); | |
fail("expected timeout exception"); | |
} catch (ElasticsearchTimeoutException elasticsearchTimeoutException) { | |
} | |
GetResponse dirtyRead = client(isolatedNode).prepareGet("registers", "foo", "bar").get(); | |
assertThat(dirtyRead.getSourceAsMap().get("value"), equalTo("dirtyval")); | |
assertThat(dirtyRead.getVersion(), equalTo(2L)); | |
String nonIsolatedDataNodeId = dataNode1HasPrimary ? dataNode2Id : dataNode1Id; | |
logger.info("--> wait for replica to be promoted to primary"); | |
assertBusy(() -> { | |
ShardRouting primaryShard = client(masterNode).admin().cluster().prepareState().get().getState().getRoutingTable() | |
.index("registers").shard(0).primaryShard(); | |
assertThat(primaryShard.currentNodeId(), equalTo(nonIsolatedDataNodeId)); | |
assertTrue(primaryShard.active()); | |
}); | |
client(masterNode).prepareIndex("registers", "foo", "bar").setSource("value", "somethingelse").get(); | |
logger.info("--> stop disrupting network"); | |
partition.stopDisrupting(); | |
logger.info("--> wait for index to become green again"); | |
ensureGreen(); | |
GetResponse freshRead = client().prepareGet("registers", "foo", "bar").get(); | |
assertThat(freshRead.getSourceAsMap().get("value"), equalTo("somethingelse")); | |
assertThat(freshRead.getVersion(), equalTo(3L)); // fails, version is 2 (same as the dirty read) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment