Created
June 26, 2012 21:13
-
-
Save blinsay/2999083 to your computer and use it in GitHub Desktop.
HashJoin after Merge throwing an IllegalStateException
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1,dog
2,cat
3,fox
4,bear
5,snake
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import org.junit.Test; | |
import cascading.PlatformTestCase; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowDef; | |
import cascading.pipe.HashJoin; | |
import cascading.pipe.Merge; | |
import cascading.pipe.Pipe; | |
import cascading.pipe.assembly.Retain; | |
import cascading.pipe.joiner.InnerJoin; | |
import cascading.tap.SinkMode; | |
import cascading.test.HadoopPlatform; | |
import cascading.test.LocalPlatform; | |
import cascading.test.PlatformRunner; | |
import cascading.tuple.Fields; | |
@PlatformRunner.Platform({LocalPlatform.class, HadoopPlatform.class}) | |
public class TestHashJoin extends PlatformTestCase { | |
private static final String COMMA = ","; | |
@Test | |
public void testHashJoin() throws IOException { | |
final FlowDef flowDef = new FlowDef().setName("testHashJoin"); | |
Pipe animals = new Pipe("animals"); | |
flowDef.addSource(animals, getPlatform().getDelimitedFile(new Fields("id", "item"), COMMA, "src/test/data/hash_join_test/animals.txt")); | |
Pipe vegetables = new Pipe("vegetables"); | |
flowDef.addSource(vegetables, getPlatform().getDelimitedFile(new Fields("id", "item"), COMMA, "src/test/data/hash_join_test/vegetables.txt")); | |
Pipe allItems = new Merge(animals, vegetables); | |
Pipe names = new Pipe("names"); | |
flowDef.addSource(names, getPlatform().getDelimitedFile(new Fields("name", "id"), COMMA, "src/test/data/hash_join_test/names.txt")); | |
Pipe joined = new HashJoin(new Pipe[] {allItems, names}, new Fields[] {new Fields("id"), new Fields("id")}, new Fields("id", "item", "name", "_id"), new InnerJoin()); | |
joined = new Retain(joined, new Fields("id", "item", "name")); | |
flowDef.addTailSink(joined, getPlatform().getDelimitedFile(Fields.ALL, COMMA, "src/test/data/hash_join_test/output", SinkMode.REPLACE)); | |
final Flow flow = getPlatform().getFlowConnector().connect(flowDef); | |
flow.complete(); | |
validateLength(flow, 7); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alice,1
bob,2
john,3
fred,4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2012-06-26 14:08:28,099 WARN mapred.LocalJobRunner (LocalJobRunner.java:run(298)) - job_local_0001
java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 5 more
Caused by: cascading.flow.FlowException: internal error during mapper configuration
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:96)
... 10 more
Caused by: java.lang.IllegalStateException: accumulated source conf property missing
at cascading.flow.hadoop.stream.HadoopMapStreamGraph.buildGraph(HadoopMapStreamGraph.java:92)
at cascading.flow.hadoop.stream.HadoopMapStreamGraph.<init>(HadoopMapStreamGraph.java:60)
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:80)
... 10 more
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1,potato
2,tomato
3,banana
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment