Skip to content

Instantly share code, notes, and snippets.

@k2xl
Created February 9, 2012 19:16
Show Gist options
  • Save k2xl/1782187 to your computer and use it in GitHub Desktop.
Save k2xl/1782187 to your computer and use it in GitHub Desktop.
/**
* @author Danny Miller [email protected]
*/
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Tuple;
/**
 * Test double for Storm's {@link OutputCollector}.
 *
 * <p>Instead of routing tuples anywhere, every emit/ack/fail/error call is
 * forwarded to the {@link IBoltListener}s registered through
 * {@link #addBoltListener(IBoltListener)}, in registration order, so a test
 * case can observe what a bolt did with its collector.
 */
public class MockOutputCollector extends OutputCollector {

    /** Listeners to notify, kept in registration order. */
    private final List<IBoltListener> listeners = new LinkedList<IBoltListener>();

    /** Creates a collector with no registered listeners. */
    public MockOutputCollector() {
        super();
    }

    /**
     * Registers a listener that will be notified of subsequent collector calls.
     *
     * @param listener the listener to add; must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBoltListener(IBoltListener listener) {
        // Fail fast here rather than with an NPE in the middle of a later dispatch loop.
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        listeners.add(listener);
    }

    /**
     * Reports the emitted values to every listener via
     * {@link IBoltListener#onEmit(List)}. The stream id and anchors are ignored.
     *
     * @param streamId ignored
     * @param anchors  ignored
     * @param tuple    the emitted values, passed to the listeners as-is
     * @return an empty list (never {@code null}): the mock delivers the tuple
     *         to no task, and callers can safely iterate over the result
     */
    @Override
    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        for (IBoltListener listener : listeners) {
            listener.onEmit(tuple);
        }
        return new LinkedList<Integer>();
    }

    /**
     * Does nothing: direct emits are not forwarded to the listeners.
     *
     * <p>NOTE(review): consider forwarding to {@link IBoltListener#onEmit(List)}
     * if tests need to observe direct emits -- confirm with the test authors.
     */
    @Override
    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        // Intentionally empty.
    }

    /**
     * Reports the acked tuple to every listener via
     * {@link IBoltListener#onAck(Tuple)}.
     *
     * @param input the tuple being acked, passed to the listeners as-is
     */
    @Override
    public void ack(Tuple input) {
        // The tuple is not inspected here, so acking an empty tuple still reaches the listeners.
        for (IBoltListener listener : listeners) {
            listener.onAck(input);
        }
    }

    /**
     * Reports the failed tuple to every listener via
     * {@link IBoltListener#onFail(Tuple)}.
     *
     * @param input the tuple being failed, passed to the listeners as-is
     */
    @Override
    public void fail(Tuple input) {
        for (IBoltListener listener : listeners) {
            listener.onFail(input);
        }
    }

    /**
     * Reports the error to every listener via
     * {@link IBoltListener#onError(Throwable)}.
     *
     * @param error the error reported by the bolt, passed to the listeners as-is
     */
    @Override
    public void reportError(Throwable error) {
        for (IBoltListener listener : listeners) {
            listener.onError(error);
        }
    }
}
/////////////////// IBoltListener.java (separate source file) ///////////////////
/**
* @author Danny Miller [email protected]
*/
import java.util.List;
import backtype.storm.tuple.Tuple;
/**
 * Callback interface through which {@code MockOutputCollector} reports the
 * calls made on it. Each method receives the argument of the corresponding
 * collector call unchanged.
 */
public interface IBoltListener {

    /**
     * Called when a tuple is acked on the collector.
     *
     * @param input the acked tuple
     */
    void onAck(Tuple input);

    /**
     * Called when a tuple is failed on the collector.
     *
     * @param input the failed tuple
     */
    void onFail(Tuple input);

    /**
     * Called when an error is reported on the collector.
     *
     * @param error the reported error
     */
    void onError(Throwable error);

    /**
     * Called when values are emitted on the collector.
     *
     * @param tuple the emitted values
     */
    void onEmit(List<Object> tuple);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment