Skip to content

Instantly share code, notes, and snippets.

@ywelsch
Created July 5, 2016 15:58
Show Gist options
  • Save ywelsch/8a5334cd59d922f5c48074fec578e71c to your computer and use it in GitHub Desktop.
Save ywelsch/8a5334cd59d922f5c48074fec578e71c to your computer and use it in GitHub Desktop.
package org.elasticsearch.versioning;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
import org.elasticsearch.test.disruption.NetworkPartition;
import org.elasticsearch.test.transport.MockTransportService;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import static java.lang.Thread.sleep;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class VersionConsistencyIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
final HashSet<Class<? extends Plugin>> classes = new HashSet<>(super.nodePlugins());
classes.add(MockTransportService.TestPlugin.class);
return classes;
}
public void testVersionIsUniqueForEachValueFails() throws Throwable {
final Settings sharedSettings = Settings.builder()
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
.put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
.build();
logger.info("--> start 3 nodes");
String masterNode = internalCluster().startMasterOnlyNode(sharedSettings);
String dataNode1 = internalCluster().startDataOnlyNode(sharedSettings);
String dataNode2 = internalCluster().startDataOnlyNode(sharedSettings);
logger.info("--> wait for all nodes to join the cluster");
ensureStableCluster(3);
client().admin().indices().prepareCreate("registers")
.addMapping("foo", "value", "type=text")
.setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1", IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.get();
ensureGreen();
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String dataNode1Id = clusterState.getNodes().resolveNode(dataNode1).getId();
String dataNode2Id = clusterState.getNodes().resolveNode(dataNode2).getId();
boolean dataNode1HasPrimary = clusterState.getRoutingNodes().node(dataNode1Id).copyShards().get(0).primary();
// isolate node with the primary
String isolatedNode = dataNode1HasPrimary ? dataNode1 : dataNode2;
Set<String> otherNodes = dataNode1HasPrimary ? Sets.newHashSet(masterNode, dataNode2) : Sets.newHashSet(masterNode, dataNode1);
NetworkPartition partition = new NetworkDisconnectPartition(otherNodes, Collections.singleton(isolatedNode), random());
internalCluster().setDisruptionScheme(partition);
IndexResponse indexResponse = client().prepareIndex("registers", "foo", "bar").setSource("value", "orig").get();
assertThat(indexResponse.isCreated(), equalTo(true));
assertThat(indexResponse.getVersion(), equalTo(1L));
assertThat(client().prepareGet("registers", "foo", "bar").get().getVersion(), equalTo(1L));
logger.info("--> start disrupting network");
partition.startDisrupting();
try {
client(isolatedNode).prepareIndex("registers", "foo", "bar").setSource("value", "dirtyval").get("5s");
fail("expected timeout exception");
} catch (ElasticsearchTimeoutException elasticsearchTimeoutException) {
}
GetResponse dirtyRead = client(isolatedNode).prepareGet("registers", "foo", "bar").get();
assertThat(dirtyRead.getSourceAsMap().get("value"), equalTo("dirtyval"));
assertThat(dirtyRead.getVersion(), equalTo(2L));
String nonIsolatedDataNodeId = dataNode1HasPrimary ? dataNode2Id : dataNode1Id;
logger.info("--> wait for replica to be promoted to primary");
assertBusy(() -> {
ShardRouting primaryShard = client(masterNode).admin().cluster().prepareState().get().getState().getRoutingTable()
.index("registers").shard(0).primaryShard();
assertThat(primaryShard.currentNodeId(), equalTo(nonIsolatedDataNodeId));
assertTrue(primaryShard.active());
});
client(masterNode).prepareIndex("registers", "foo", "bar").setSource("value", "somethingelse").get();
logger.info("--> stop disrupting network");
partition.stopDisrupting();
logger.info("--> wait for index to become green again");
ensureGreen();
GetResponse freshRead = client().prepareGet("registers", "foo", "bar").get();
assertThat(freshRead.getSourceAsMap().get("value"), equalTo("somethingelse"));
assertThat(freshRead.getVersion(), equalTo(3L)); // fails, version is 2 (same as the dirty read)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment