-
-
Save codeslinger/614063 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This file is licensed to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance with the | |
* License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
*/ | |
import java.io.IOException; | |
import java.nio.ByteBuffer; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Comparator; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.NavigableMap; | |
import java.util.NavigableSet; | |
import java.util.TreeMap; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.HTableDescriptor; | |
import org.apache.hadoop.hbase.KeyValue; | |
import org.apache.hadoop.hbase.client.Delete; | |
import org.apache.hadoop.hbase.client.Get; | |
import org.apache.hadoop.hbase.client.HTableInterface; | |
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.client.ResultScanner; | |
import org.apache.hadoop.hbase.client.RowLock; | |
import org.apache.hadoop.hbase.client.Scan; | |
import org.apache.hadoop.hbase.util.Bytes; | |
/** | |
* Mock implementation of HTableInterface. Holds any supplied data in a
* multi-dimensional NavigableMap which acts as an in-memory database. Useful for
* testing classes that operate on data using an HTableInterface.
* <p> | |
* Instances should be obtained using <code>MockHTable.create()</code>. So while a
* DAO with a saving operation like
* | |
* <pre> | |
* public class MyDAO { | |
* private HTableInterface table; | |
* | |
* public MyDAO(HTableInterface table) { | |
* this.table = table; | |
* } | |
* | |
* public void saveData(byte[] id, byte[] data) throws IOException{ | |
* Put put = new Put(id);
* put.add(family, qualifier, data); | |
* table.put(put); | |
* } | |
* } | |
* </pre> | |
* <p> | |
* is used in production like | |
* | |
* <pre> | |
* MyDAO(new HTable(conf, tableName)).saveData(id, data); | |
* </pre> | |
* <p> | |
* can be tested like | |
* | |
* <pre> | |
* @Test | |
* public void testSave() { | |
* MockHTable table = MockHTable.create(); | |
* MyDAO(table).saveData(id, data); | |
* Get get = new Get(id); | |
* Result result = table.get(get); | |
* assertArrayEquals(data, result.getValue(family, qualifier)); | |
* } | |
* </pre> | |
* <p> | |
* MockHTable instances can also be initialized with pre-loaded data using one
* of the String[][] or Map&lt;String, Map&lt;String, String&gt;&gt; data formats. The
* String[][] parameter allows loading data directly from source code, while the
* Map can be generated from a YAML document using a parser.
* | |
* <pre> | |
* // String[][] | |
* MockHTable table = MockHTable.with(new String[][] { | |
* { "<rowid>", "<column>", "<value>" }, | |
* { "id", "family:qualifier1", "data1" }, | |
* { "id", "family:qualifier2", "data2" } | |
* }); | |
* // YAML | |
* String database = "id:\n family:qualifier1: data1\n family:qualifier2: data2\n"; | |
* MockHTable table = MockHTable.with((Map<String, Map<String, String>>) new Yaml().load(database));
* </pre> | |
* <p> | |
* If value is not supposed to be a String, but an int, double or anything, | |
* <code>MockHTable.toEString()</code> can be used to turn it into a String. | |
* | |
* <p> | |
* In order to simplify assertions for tests that write to the database,
* MockHTable.read() takes two parameters (id and column) and returns the
* latest value written to that row/column. So, the previous test can be reduced to
* | |
* <pre> | |
* @Test | |
* public void testSave() { | |
* MockHTable table = MockHTable.create(); | |
* MyDAO(table).saveData(id, data); | |
* assertArrayEquals(data, table.read(id, "family:qualifier")); | |
* } | |
* </pre> | |
* <p> | |
* TODO: Don't know if timestamps work correctly | |
* | |
* @author erdem | |
* | |
*/ | |
public class MockHTable implements HTableInterface { | |
/** | |
* Simple class to help create a TreeMap with byte[] key. | |
* @author erdem | |
* | |
*/ | |
private static class BC implements Comparator<byte[]> { | |
public int compare(byte[] o1, byte[] o2) { | |
return ByteBuffer.wrap(o1).compareTo(ByteBuffer.wrap(o2)); | |
} | |
} | |
/** | |
* This is all the data for a MockHTable instance | |
*/ | |
private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<byte[], NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>>>(new BC()); | |
/** | |
* Helper method to convert some data into a list of KeyValue's | |
* | |
* @param row | |
* row value of the KeyValue's | |
* @param rowdata | |
* data to decode | |
* @return List of KeyValue's | |
*/ | |
private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata){ | |
return toKeyValue(row, rowdata, 0, Long.MAX_VALUE); | |
} | |
/** | |
* Helper method to convert some data into a list of KeyValue's with timestamp | |
* constraint | |
* | |
* @param row | |
* row value of the KeyValue's | |
* @param rowdata | |
* data to decode | |
* @param timestampStart | |
* start of the timestamp constraint | |
* @param timestampEnd | |
* end of the timestamp constraint | |
* @return List of KeyValue's | |
*/ | |
private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd){ | |
List<KeyValue> ret = new ArrayList<KeyValue>(); | |
for (byte[] family : rowdata.keySet()) | |
for (byte[] qualifier : rowdata.get(family).keySet()) | |
for (Long timestamp : rowdata.get(family).get(qualifier).keySet()){ | |
if (timestamp < timestampStart) | |
continue; | |
if (timestamp > timestampEnd) | |
continue; | |
byte[] value = rowdata.get(family).get(qualifier).get(timestamp); | |
ret.add(new KeyValue(row, family, qualifier, timestamp, value)); | |
} | |
return ret; | |
} | |
/** | |
* Clients should not rely on table names so this returns null. | |
* @return null | |
*/ | |
@Override | |
public byte[] getTableName() { return null; } | |
/** | |
* No configuration needed to work so this returns null. | |
* @return null | |
*/ | |
@Override | |
public Configuration getConfiguration() { return null; } | |
/** | |
* No table descriptor needed so this returns null. | |
* @return null | |
*/ | |
@Override | |
public HTableDescriptor getTableDescriptor() { return null; } | |
@Override | |
public boolean exists(Get get) throws IOException { | |
return data.containsKey(get.getRow()); | |
} | |
@Override | |
public Result get(Get get) throws IOException { | |
if (!exists(get)) | |
return new Result(); | |
byte[] row = get.getRow(); | |
if (!get.hasFamilies()) { | |
return new Result(toKeyValue(row, data.get(row))); | |
} | |
List<KeyValue> kvs = new ArrayList<KeyValue>(); | |
for (byte[] family : get.getFamilyMap().keySet()){ | |
NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family); | |
if (qualifiers == null) | |
qualifiers = data.get(row).get(family).navigableKeySet(); | |
for (byte[] qualifier : qualifiers){ | |
if (qualifier == null) | |
qualifier = "".getBytes(); | |
if (!data.get(row).containsKey(family) || | |
!data.get(row).get(family).containsKey(qualifier) || | |
data.get(row).get(family).get(qualifier).isEmpty()) | |
continue; | |
Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry(); | |
kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue())); | |
} | |
} | |
return new Result(kvs); | |
} | |
@Override | |
public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { | |
// FIXME: implement | |
return null; | |
} | |
@Override | |
public ResultScanner getScanner(Scan scan) throws IOException { | |
final List<Result> ret = new ArrayList<Result>(); | |
for (byte[] row : data.keySet()){ | |
if (scan.getStartRow() != null && scan.getStartRow().length > 0 && | |
ByteBuffer.wrap(scan.getStartRow()).compareTo(ByteBuffer.wrap(row)) > 0) | |
continue; | |
if (scan.getStopRow() != null && scan.getStopRow().length > 0 && | |
ByteBuffer.wrap(scan.getStopRow()).compareTo(ByteBuffer.wrap(row)) <= 0) | |
continue; | |
List<KeyValue> kvs = null; | |
if (!scan.hasFamilies()) { | |
kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); | |
} else { | |
kvs = new ArrayList<KeyValue>(); | |
for (byte[] family : scan.getFamilyMap().keySet()){ | |
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); | |
if (qualifiers == null) | |
qualifiers = data.get(row).get(family).navigableKeySet(); | |
for (byte[] qualifier : qualifiers){ | |
for (Long timestamp : data.get(row).get(family).get(qualifier).keySet()){ | |
if (timestamp < scan.getTimeRange().getMin()) | |
continue; | |
if (timestamp > scan.getTimeRange().getMax()) | |
continue; | |
byte[] value = data.get(row).get(family).get(qualifier).get(timestamp); | |
kvs.add(new KeyValue(row, family, qualifier, timestamp, value)); | |
} | |
} | |
} | |
} | |
if (scan.getFilter() != null) | |
scan.getFilter().filterRow(kvs); | |
ret.add(new Result(kvs)); | |
} | |
return new ResultScanner() { | |
public Iterator<Result> iterator() { | |
return ret.iterator(); | |
} | |
public Result[] next(int nbRows) throws IOException { | |
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows); | |
for(int i = 0; i < nbRows; i++) { | |
Result next = next(); | |
if (next != null) { | |
resultSets.add(next); | |
} else { | |
break; | |
} | |
} | |
return resultSets.toArray(new Result[resultSets.size()]); | |
} | |
public Result next() throws IOException { | |
return iterator().next(); | |
} | |
public void close() {} | |
}; | |
} | |
@Override | |
public ResultScanner getScanner(byte[] family) throws IOException { | |
Scan scan = new Scan(); | |
scan.addFamily(family); | |
return getScanner(scan); | |
} | |
@Override | |
public ResultScanner getScanner(byte[] family, byte[] qualifier) | |
throws IOException { | |
Scan scan = new Scan(); | |
scan.addColumn(family, qualifier); | |
return getScanner(scan); | |
} | |
@Override | |
public void put(Put put) throws IOException { | |
byte[] row = put.getRow(); | |
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(new BC())); | |
for (byte[] family : put.getFamilyMap().keySet()){ | |
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(new BC())); | |
for (KeyValue kv : put.getFamilyMap().get(family)){ | |
byte[] qualifier = kv.getQualifier(); | |
NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>()); | |
qualifierData.put(kv.getTimestamp(), kv.getValue()); | |
} | |
} | |
} | |
/** | |
* Helper method to find a key in a map. If key is not found, newObject is | |
* added to map and returned | |
* | |
* @param map | |
* map to extract value from | |
* @param key | |
* key to look for | |
* @param newObject | |
* set key to this if not found | |
* @return found value or newObject if not found | |
*/ | |
private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){ | |
V data = map.get(key); | |
if (data == null){ | |
data = newObject; | |
map.put(key, data); | |
} | |
return data; | |
} | |
@Override | |
public void put(List<Put> puts) throws IOException { | |
for (Put put : puts) | |
put(put); | |
} | |
/** | |
* Checks if the value with given details exists in database, or is | |
* non-existent in the case of value being null | |
* | |
* @param row | |
* row | |
* @param family | |
* family | |
* @param qualifier | |
* qualifier | |
* @param value | |
* value | |
* @return true if value is not null and exists in db, or value is null and | |
* not exists in db, false otherwise | |
*/ | |
private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value){ | |
if (value == null || value.length == 0) | |
return ! data.containsKey(row) || | |
! data.get(row).containsKey(family) || | |
! data.get(row).get(family).containsKey(qualifier); | |
else | |
return data.containsKey(row) && | |
data.get(row).containsKey(family) && | |
data.get(row).get(family).containsKey(qualifier) && | |
! data.get(row).get(family).get(qualifier).isEmpty() && | |
Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value); | |
} | |
@Override | |
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, | |
byte[] value, Put put) throws IOException { | |
if (check(row, family, qualifier, value)){ | |
put(put); | |
return true; | |
} | |
return false; | |
} | |
@Override | |
public void delete(Delete delete) throws IOException { | |
byte[] row = delete.getRow(); | |
if (data.get(row) == null) | |
return; | |
if (delete.getFamilyMap().size() == 0){ | |
data.remove(row); | |
return; | |
} | |
for (byte[] family : delete.getFamilyMap().keySet()){ | |
if (data.get(row).get(family) == null) | |
continue; | |
if (delete.getFamilyMap().get(family).isEmpty()){ | |
data.get(row).remove(family); | |
continue; | |
} | |
for (KeyValue kv : delete.getFamilyMap().get(family)){ | |
data.get(row).get(kv.getFamily()).remove(kv.getQualifier()); | |
} | |
} | |
} | |
@Override | |
public void delete(List<Delete> deletes) throws IOException { | |
for (Delete delete : deletes) | |
delete(delete); | |
} | |
@Override | |
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, | |
byte[] value, Delete delete) throws IOException { | |
if(check(row, family, qualifier, value)){ | |
delete(delete); | |
return true; | |
} | |
return false; | |
} | |
@Override | |
public long incrementColumnValue(byte[] row, byte[] family, | |
byte[] qualifier, long amount) throws IOException { | |
return incrementColumnValue(row, family, qualifier, amount, true); | |
} | |
@Override | |
public long incrementColumnValue(byte[] row, byte[] family, | |
byte[] qualifier, long amount, boolean writeToWAL) throws IOException { | |
if (check(row, family, qualifier, null)){ | |
Put put = new Put(row); | |
put.add(family, qualifier, Bytes.toBytes(amount)); | |
put(put); | |
return amount; | |
} | |
long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue())+amount; | |
data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), | |
Bytes.toBytes(newValue)); | |
return newValue; | |
} | |
@Override | |
public boolean isAutoFlush() { | |
return true; | |
} | |
@Override | |
public void flushCommits() throws IOException { | |
} | |
@Override | |
public void close() throws IOException { | |
} | |
@Override | |
public RowLock lockRow(byte[] row) throws IOException { | |
return null; | |
} | |
@Override | |
public void unlockRow(RowLock rl) throws IOException { | |
} | |
/**
 * Private constructor; instances are handed out by the static create() and
 * with() factory methods.
 */
private MockHTable() {
}
/** | |
* Default way of constructing a MockHTable | |
* @return a new MockHTable | |
*/ | |
public static MockHTable create(){ | |
return new MockHTable(); | |
} | |
/** | |
* Create a MockHTable with some pre-loaded data. Parameter should be a map of | |
* column-to-data mappings of rows. It can be created with a YAML like | |
* | |
* <pre> | |
* rowid: | |
* family1:qualifier1: value1 | |
* family2:qualifier2: value2 | |
* </pre> | |
* | |
* @param dump | |
* pre-loaded data | |
* @return a new MockHTable loaded with given data | |
*/ | |
public static MockHTable with(Map<String, Map<String, String>> dump){ | |
MockHTable ret = new MockHTable(); | |
for (String row : dump.keySet()){ | |
for (String column : dump.get(row).keySet()){ | |
String val = dump.get(row).get(column); | |
put(ret, row, column, val); | |
} | |
} | |
return ret; | |
} | |
/** | |
* Helper method of pre-loaders, adds parameters to data. | |
* | |
* @param ret | |
* data to load into | |
* @param row | |
* rowid | |
* @param column | |
* family:qualifier encoded value | |
* @param val | |
* value | |
*/ | |
private static void put(MockHTable ret, String row, String column, | |
String val) { | |
String[] fq = split(column); | |
byte[] family = Bytes.toBytesBinary(fq[0]); | |
byte[] qualifier = Bytes.toBytesBinary(fq[1]); | |
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = ret.forceFind(ret.data, Bytes.toBytesBinary(row), new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(new BC())); | |
NavigableMap<byte[], NavigableMap<Long, byte[]>> qualifiers = ret.forceFind(families, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(new BC())); | |
NavigableMap<Long, byte[]> values = ret.forceFind(qualifiers, qualifier, new TreeMap<Long, byte[]>()); | |
values.put(System.currentTimeMillis(), Bytes.toBytesBinary(val)); | |
} | |
/** | |
* Create a MockHTable with some pre-loaded data. Parameter should be an array | |
* of string arrays which define every column value individually. | |
* | |
* <pre> | |
* new String[][] { | |
* { "<rowid>", "<column>", "<value>" }, | |
* { "id", "family:qualifier1", "data1" }, | |
* { "id", "family:qualifier2", "data2" } | |
* }); | |
* </pre> | |
* | |
* @param dump | |
* @return | |
*/ | |
public static MockHTable with(String[][] dump){ | |
MockHTable ret = new MockHTable(); | |
for(String[] row : dump){ | |
put(ret, row[0], row[1], row[2]); | |
} | |
return ret; | |
} | |
/** | |
* Column identification helper | |
* | |
* @param column | |
* column name in the format family:qualifier | |
* @return <code>{"family", "qualifier"}</code> | |
*/ | |
private static String[] split(String column){ | |
return new String[]{ | |
column.substring(0, column.indexOf(':')), | |
column.substring(column.indexOf(':')+1)}; | |
} | |
/** | |
* Read a value saved in the object. Useful for making assertions in tests. | |
* | |
* @param rowid | |
* rowid of the data to read | |
* @param column | |
* family:qualifier of the data to read | |
* @return value or null if row or column of the row does not exist | |
*/ | |
public byte[] read(String rowid, String column){ | |
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> row = data.get(Bytes.toBytesBinary(rowid)); | |
if (row == null) | |
return null; | |
String[] fq = split(column); | |
byte[] family = Bytes.toBytesBinary(fq[0]); | |
byte[] qualifier = Bytes.toBytesBinary(fq[1]); | |
if (!row.containsKey(family)) | |
return null; | |
if (!row.get(family).containsKey(qualifier)) | |
return null; | |
return row.get(family).get(qualifier).lastEntry().getValue(); | |
} | |
public static String toEString(boolean val){ | |
return Bytes.toStringBinary(Bytes.toBytes(val)); | |
} | |
public static String toEString(double val){ | |
return Bytes.toStringBinary(Bytes.toBytes(val)); | |
} | |
public static String toEString(float val){ | |
return Bytes.toStringBinary(Bytes.toBytes(val)); | |
} | |
public static String toEString(int val){ | |
return Bytes.toStringBinary(Bytes.toBytes(val)); | |
} | |
public static String toEString(long val){ | |
return Bytes.toStringBinary(Bytes.toBytes(val)); | |
} | |
public static String toEString(short val){ | |
return Bytes.toStringBinary(Bytes.toBytes(val)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment