編寫MR程序,讓其可以適合大部分的HBase表數(shù)據(jù)導(dǎo)入到HBase表數(shù)據(jù)。其中包括可以設(shè)置版本數(shù)、可以設(shè)置輸入表的列導(dǎo)入設(shè)置(選取其中某幾列)、可以設(shè)置輸出表的列導(dǎo)出設(shè)置(選取其中某幾列)。
原始表test1數(shù)據(jù)如下:
每個row key都有兩個版本的數(shù)據(jù),這里只顯示了row key為1的數(shù)據(jù)
在hbase shell 中創(chuàng)建數(shù)據(jù)表:
create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、有列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)
main函數(shù)入口:
package GeneralHBaseToHBase;
import org.apache.hadoop.util.ToolRunner;
public class DriverTest {
public static void main(String[] args) throws Exception {
// 無版本設(shè)置、無列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
String[] myArgs1= new String[]{
"test1", // 輸入表
"test2", // 輸出表
"0", // 版本大小數(shù),如果值為0,則為默認(rèn)從輸入表導(dǎo)出最新的數(shù)據(jù)到輸出表
"-1", // 列導(dǎo)入設(shè)置,如果為-1 ,則沒有設(shè)置列導(dǎo)入
"-1" // 列導(dǎo)出設(shè)置,如果為-1,則沒有設(shè)置列導(dǎo)出
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs1);
// 無版本設(shè)置、有列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
String[] myArgs2= new String[]{
"test1",
"test3",
"0",
"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
"-1"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs2);
// 無版本設(shè)置,無列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置
String[] myArgs3= new String[]{
"test1",
"test4",
"0",
"-1",
"cf1:c1,cf1:c10,cf1:c14"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs3);
// 有版本設(shè)置,無列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
String[] myArgs4= new String[]{
"test1",
"test5",
"2",
"-1",
"-1"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs4);
// 有版本設(shè)置、有列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
String[] myArgs5= new String[]{
"test1",
"test6",
"2",
"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
"-1"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs5);
// 有版本設(shè)置、無列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置
String[] myArgs6= new String[]{
"test1",
"test7",
"2",
"-1",
"cf1:c1,cf1:c10,cf1:c14"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs6);
// 有版本設(shè)置、有列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置
String[] myArgs7= new String[]{
"test1",
"test8",
"2",
"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
"cf1:c1,cf1:c10,cf1:c14"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs7);
}
}
driver:
package GeneralHBaseToHBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import util.JarUtil;
public class HBaseDriver extends Configured implements Tool{
public static String FROMTABLE=""; //導(dǎo)入表
public static String TOTABLE=""; //導(dǎo)出表
public static String SETVERSION=""; //是否設(shè)置版本
// args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}
@Override
public int run(String[] args) throws Exception {
if(args.length!=5){
System.err.println("Usage:\n demo.job.HBaseDriver input> inputTable> "
+ "output> outputTable>"
+" versions >"
+ " set columns from inputTable> like cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or -1> "
+ "set columns from outputTable> like cf1:c1,cf1:c10,cf1:c14> or -1>");
return -1;
}
Configuration conf = getConf();
FROMTABLE = args[0];
TOTABLE = args[1];
SETVERSION = args[2];
conf.set("SETVERSION", SETVERSION);
if(!args[3].equals("-1")){
conf.set("COLUMNFROMTABLE", args[3]);
}
if(!args[4].equals("-1")){
conf.set("COLUMNTOTABLE", args[4]);
}
String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(HBaseDriver.class);
Scan scan = new Scan();
// 判斷是否需要設(shè)置版本
if(SETVERSION != "0" || SETVERSION != "1"){
scan.setMaxVersions(Integer.parseInt(SETVERSION));
}
// 設(shè)置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型
TableMapReduceUtil.initTableMapperJob(
FROMTABLE,
scan,
HBaseToHBaseMapper.class,
ImmutableBytesWritable.class,
Put.class,
job);
// 設(shè)置HBase表輸出:表名,reducer類
TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);
// 沒有 reducers, 直接寫入到 輸出文件
job.setNumReduceTasks(0);
return job.waitForCompletion(true) ? 0 : 1;
}
private static Configuration configuration;
public static Configuration getConfiguration(){
if(configuration==null){
/**
* TODO 了解如何直接從Windows提交代碼到Hadoop集群
* 并修改其中的配置為實(shí)際配置
*/
configuration = new Configuration();
configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平臺提交任務(wù)
configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode
configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager
configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定資源分配器
configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver
configuration.set("hbase.master", "master:16000");
configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");
configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
//TODO 需export->jar file ; 設(shè)置正確的jar包所在位置
configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 設(shè)置jar包路徑
}
return configuration;
}
}
mapper:
package GeneralHBaseToHBase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseToHBaseMapper extends TableMapperImmutableBytesWritable, Put> {
Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);
private static int versionNum = 0;
private static String[] columnFromTable = null;
private static String[] columnToTable = null;
private static String column1 = null;
private static String column2 = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));
column1 = conf.get("COLUMNFROMTABLE",null);
if(!(column1 == null)){
columnFromTable = column1.split(",");
}
column2 = conf.get("COLUMNTOTABLE",null);
if(!(column2 == null)){
columnToTable = column2.split(",");
}
}
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
context.write(key, resultToPut(key,value));
}
/***
* 把key,value轉(zhuǎn)換為Put
* @param key
* @param value
* @return
* @throws IOException
*/
private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {
HashMapString, String> fTableMap = new HashMap>();
HashMapString, String> tTableMap = new HashMap>();
Put put = new Put(key.get());
if(! (columnFromTable == null || columnFromTable.length == 0)){
fTableMap = getFamilyAndColumn(columnFromTable);
}
if(! (columnToTable == null || columnToTable.length == 0)){
tTableMap = getFamilyAndColumn(columnToTable);
}
if(versionNum==0){
if(fTableMap.size() == 0){
if(tTableMap.size() == 0){
for (Cell kv : value.rawCells()) {
put.add(kv); // 沒有設(shè)置版本,沒有設(shè)置列導(dǎo)入,沒有設(shè)置列導(dǎo)出
}
return put;
} else{
return getPut(put, value, tTableMap); // 無版本、無列導(dǎo)入、有列導(dǎo)出
}
} else {
if(tTableMap.size() == 0){
return getPut(put, value, fTableMap);// 無版本、有列導(dǎo)入、無列導(dǎo)出
} else {
return getPut(put, value, tTableMap);// 無版本、有列導(dǎo)入、有列導(dǎo)出
}
}
} else{
if(fTableMap.size() == 0){
if(tTableMap.size() == 0){
return getPut1(put, value); // 有版本,無列導(dǎo)入,無列導(dǎo)出
}else{
return getPut2(put, value, tTableMap); //有版本,無列導(dǎo)入,有列導(dǎo)出
}
}else{
if(tTableMap.size() == 0){
return getPut2(put,value,fTableMap);// 有版本,有列導(dǎo)入,無列導(dǎo)出
}else{
return getPut2(put,value,tTableMap); // 有版本,有列導(dǎo)入,有列導(dǎo)出
}
}
}
}
/***
* 無版本設(shè)置的情況下,對于有列導(dǎo)入或者列導(dǎo)出
* @param put
* @param value
* @param tableMap
* @return
* @throws IOException
*/
private Put getPut(Put put,Result value,HashMapString, String> tableMap) throws IOException{
for(Cell kv : value.rawCells()){
byte[] family = kv.getFamily();
if(tableMap.containsKey(new String(family))){
String columnStr = tableMap.get(new String(family));
ArrayListString> columnBy = toByte(columnStr);
if(columnBy.contains(new String(kv.getQualifier()))){
put.add(kv); //沒有設(shè)置版本,沒有設(shè)置列導(dǎo)入,有設(shè)置列導(dǎo)出
}
}
}
return put;
}
/***
* (有版本,無列導(dǎo)入,有列導(dǎo)出)或者(有版本,有列導(dǎo)入,無列導(dǎo)出)
* @param put
* @param value
* @param tTableMap
* @return
*/
private Put getPut2(Put put,Result value,HashMapString, String> tableMap){
NavigableMapbyte[], NavigableMapbyte[], NavigableMapLong, byte[]>>> map=value.getMap();
for(byte[] family:map.keySet()){
if(tableMap.containsKey(new String(family))){
String columnStr = tableMap.get(new String(family));
log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);
ArrayListString> columnBy = toByte(columnStr);
NavigableMapbyte[], NavigableMapLong, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關(guān)數(shù)據(jù)
for(byte[] column:familyMap.keySet()){ //根據(jù)列名循壞
log.info("!!!!!!!!!!!"+new String(column));
if(columnBy.contains(new String(column))){
NavigableMapLong, byte[]> valuesMap = familyMap.get(column);
for(EntryLong, byte[]> s:valuesMap.entrySet()){//獲取列對應(yīng)的不同版本數(shù)據(jù),默認(rèn)最新的一個
System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));
put.addColumn(family, column, s.getKey(),s.getValue());
}
}
}
}
}
return put;
}
/***
* 有版本、無列導(dǎo)入、無列導(dǎo)出
* @param put
* @param value
* @return
*/
private Put getPut1(Put put,Result value){
NavigableMapbyte[], NavigableMapbyte[], NavigableMapLong, byte[]>>> map=value.getMap();
for(byte[] family:map.keySet()){
NavigableMapbyte[], NavigableMapLong, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關(guān)數(shù)據(jù)
for(byte[] column:familyMap.keySet()){ //根據(jù)列名循壞
NavigableMapLong, byte[]> valuesMap = familyMap.get(column);
for(EntryLong, byte[]> s:valuesMap.entrySet()){ //獲取列對應(yīng)的不同版本數(shù)據(jù),默認(rèn)最新的一個
put.addColumn(family, column, s.getKey(),s.getValue());
}
}
}
return put;
}
// str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
/***
* 得到列簇名與列名的k,v形式的map
* @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
* @return map => {"cf1" => "c1,c2,c10,c11,c14"}
*/
private static HashMapString, String> getFamilyAndColumn(String[] str){
HashMapString, String> map = new HashMap>();
HashSetString> set = new HashSet>();
for(String s : str){
set.add(s.split(":")[0]);
}
Object[] ob = set.toArray();
for(int i=0; iob.length;i++){
String family = String.valueOf(ob[i]);
String columns = "";
for(int j=0;j str.length;j++){
if(family.equals(str[j].split(":")[0])){
columns += str[j].split(":")[1]+",";
}
}
map.put(family, columns.substring(0, columns.length()-1));
}
return map;
}
private static ArrayListString> toByte(String s){
ArrayListString> b = new ArrayList>();
String[] sarr = s.split(",");
for(int i=0;isarr.length;i++){
b.add(sarr[i]);
}
return b;
}
}
程序運(yùn)行完之后,在hbase shell中查看每個表,看是否數(shù)據(jù)導(dǎo)入正確:
test2:(無版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置)
test3 (無版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導(dǎo)出設(shè)置)
test4(無版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))
test5(有版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置)
test6(有版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導(dǎo)出設(shè)置)
test7(有版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))
test8(有版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:- Javascript面試經(jīng)典套路reduce函數(shù)查重
- MapReduce核心思想圖文詳解
- shuffle的關(guān)鍵階段sort(Map端和Reduce端)源碼分析
- Array數(shù)組對象中的forEach、map、filter及reduce詳析
- 對tf.reduce_sum tensorflow維度上的操作詳解
- js數(shù)組方法reduce經(jīng)典用法代碼分享
- MongoDB中MapReduce的使用方法詳解
- Java/Web調(diào)用Hadoop進(jìn)行MapReduce示例代碼
- 詳解JS數(shù)組Reduce()方法詳解及高級技巧
- js中的reduce()函數(shù)講解