博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hbase java 常见操作
阅读量:6371 次
发布时间:2019-06-23

本文共 13109 字,大约阅读时间需要 43 分钟。

 

 

 

import java.io.IOException; import java.util.ArrayList; import java.util.HashMap;import java.util.List;import java.util.Map; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Delete;import org.apache.hadoop.hbase.client.Get;import org.apache.hadoop.hbase.client.HBaseAdmin;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.ResultScanner;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.filter.FilterList;import org.apache.hadoop.hbase.util.Bytes;import org.mortbay.log.Log; public class HbaseUtil implements IOperator{	private static Configuration conf = null; 	private static String configFile = "hbase-site-test_bj.xml";	private   Map
aMap = null; private String mapTable = null; private String[] tableFamily = null; public HbaseUtil() { } public HbaseUtil( String mapAppTable , String[] appTableFamily ) { this.aMap = new HashMap
(); this.mapTable = mapAppTable; this.tableFamily = appTableFamily; } static { Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.addResource(configFile); conf = HBaseConfiguration.create(HBASE_CONFIG); System.err.println(conf.get("hbase.zookeeper.property.dataDir")); } /** * 创建表操作 * * @throws IOException */ public void createTable(String tablename, String[] cfs) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (admin.tableExists(tablename)) { System.out.println("表已经存在!"); } else { HTableDescriptor tableDesc = new HTableDescriptor(tablename); for (int i = 0; i < cfs.length; i++) { tableDesc.addFamily(new HColumnDescriptor(cfs[i])); } admin.createTable(tableDesc); System.out.println("表创建成功!"); } admin.close(); } /** * 删除表操作 * * @param tablename * @throws IOException */ public void deleteTable(String tablename) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists(tablename)) { System.out.println("table(" + tablename + ") not exists, won't delete"); } else { admin.disableTable(tablename); admin.deleteTable(tablename); System.out.println("table(" + tablename + ") delete success"); } admin.close(); } public void insertRow() throws IOException { HTable table = new HTable(conf, "test"); Put put = new Put(Bytes.toBytes("row3")); put.add(Bytes.toBytes("cf"), Bytes.toBytes("444"), Bytes.toBytes("value444")); table.put(put); table.close(); } /** * 插入一行记录 * * @param tablename * @param cfs * @throws IOException */ public void writeRow(String tablename, String[] cfs) throws IOException { HTable table = new HTable(conf, tablename); Put put = new Put(Bytes.toBytes(cfs[0])); put.add(Bytes.toBytes(cfs[1]), Bytes.toBytes(cfs[2]), Bytes.toBytes(cfs[3])); table.put(put); System.out.println("写入成功!"); table.close(); } // 写多条记录 public void writeMultRow(String tablename, String[][] cfs) throws IOException { List
lists = new ArrayList
(); HTable table = new HTable(conf, tablename); for (int i = 0; i < cfs.length; i++) { Put put = new Put(Bytes.toBytes(cfs[i][0])); put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3])); lists.add(put); } table.put(lists); table.close(); } // 写多条记录 public void writeMultRowByDevice(HTable table, String tablename, String[][] cfs) throws IOException { List
lists = new ArrayList
(); // HTable table = new HTable(conf, tablename); for (int i = 0; i < cfs.length; i++) { Put put = new Put(Bytes.toBytes(cfs[i][0])); Log.info("writeMultRowByDevice "+Bytes.toBytes(cfs[i][1])+"="+Bytes.toBytes(cfs[i][2])+"="+Bytes.toBytes(cfs[i][3])); put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3])); lists.add(put); } Log.info("push start"); table.put(lists); Log.info("push end"); } /** * 删除一行记录 * * @param tablename * @param rowkey * @throws IOException */ public void deleteRow(String tablename, String rowkey) throws IOException { HTable table = new HTable(conf, tablename); List
list = new ArrayList
(); Delete d1 = new Delete(rowkey.getBytes()); list.add(d1); table.delete(list); System.out.println("delete row(" + rowkey + ") sucess"); table.close(); } /** * 查找一行记录 * * @param tablename * @param rowkey */ public void selectRow(String tablename, String rowKey) throws IOException { HTable table = new HTable(conf, tablename); Get g = new Get(rowKey.getBytes()); // g.addColumn(Bytes.toBytes("cf:1")); Result rs = table.get(g); for (KeyValue kv : rs.raw()) { System.out.print(new String(kv.getRow()) + " "); System.out.print(new String(kv.getFamily()) + ":"); System.out.print(new String(kv.getQualifier()) + " "); System.out.print(kv.getTimestamp() + " "); System.out.println(new String(kv.getValue())); } table.close(); } /** * 查询表中所有行 * * @param tablename * @throws IOException */ public void scaner(String tablename) throws IOException { HTable table = new HTable(conf, tablename); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { KeyValue[] kv = r.raw(); // for (int i = 0; i < kv.length; i++) { /* * System.out.print(new String(kv[i].getRow()) + " "); * System.out.print(new String(kv[i].getFamily()) + ":"); * System.out.print(new String(kv[i].getQualifier()) + " "); * System.out.print(kv[i].getTimestamp() + " "); * System.out.println(new String(kv[i].getValue())); */ System.out.println(new String(kv[1].getValue()) + "==" + new String(kv[0].getValue())); // } } rs.close(); table.close(); } public void scanByTimestamp(String tablename, long maxtime) throws IOException { HTable table = new HTable(conf, tablename); Scan s = new Scan(); // TODO 存放所有的结果 FilterList allInfo = new FilterList(); // allInfo.addFilter(); s.setFilter(allInfo); } public Map
getMap() { Map
map = new HashMap
(); try { HTable table = new HTable(conf, mapTable); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { KeyValue[] kv = r.raw(); map.put(new String(kv[0].getRow()), new String(kv[0].getValue())); } } catch (IOException e) { e.printStackTrace(); } return map; } }

 

 

import java.io.IOException;import java.util.Map;public interface IOperator {	public void createTable(String tablename, String[] cfs) throws IOException ;	public void deleteTable(String tablename) throws IOException;	public void insertRow() throws IOException;	public void writeRow(String tablename, String[] cfs) throws IOException;	public void writeMultRow(String tablename, String[][] cfs) throws IOException;	public void deleteRow(String tablename, String rowkey) throws IOException;	public void selectRow(String tablename, String rowKey) throws IOException;	public void scaner(String tablename) throws IOException;	public void scanByTimestamp(String tablename, long maxtime) throws IOException;	public Map
getMap() throws IOException; }

 

public abstract class BaseRunnabler implements Runnable{	String sourceFile=""; // 读取文件路径	String numberFile="";     	String hbaseTable="";  // hbase  表名	String [] hbaseFamily=null;    // 行列簇名	String keywords ="";		public BaseRunnabler(String sourceFile,String hbaseTable,String [] hbaseFamily,String numberFile ,String keywords ){		this.sourceFile=sourceFile;		this.numberFile=numberFile;		this.hbaseTable=hbaseTable;		this.hbaseFamily = hbaseFamily;		this.keywords = keywords;	}			@Override	public void run() {		try{		IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily);    	hu.createTable(hbaseTable,hbaseFamily ); 		processFile(hu );		}catch (Exception e) {			e.printStackTrace();		}	}	public abstract void processFile(IOperator hu) throws Exception; 	 }

 

import java.io.BufferedReader;import java.io.File;import java.io.FileReader; import java.io.IOException;import java.util.Date;  import org.slf4j.Logger;import org.slf4j.LoggerFactory; import Model.Device; import com.alibaba.fastjson.JSON; public class DeviceReadThread extends BaseRunnabler {	  static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class);		public DeviceReadThread(String sourceFile, String hbaseTable,			String[] hbaseFamily, String numberFile, String keywords) {		super(sourceFile, hbaseTable, hbaseFamily, numberFile, keywords);	}	@Override	public void processFile(IOperator hu) {		FileReader logReader = null;		BufferedReader logBufferedReader = null;		try { 			File logFile = new File(sourceFile);			logReader = new FileReader(logFile);			logBufferedReader = new BufferedReader(logReader);			String temp = logBufferedReader.readLine();			//logger.error(" temp is  " + temp );			while ( temp  != null) {				Device device = JSON.parseObject(temp, Device.class); 				//logger.error(" device is null ? " + ( device == null ) );								String[][] s = new String[][] {						{ device.getLid(), hbaseFamily[0], "lid" , device.getLid() } ,						{ device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, 						{ device.getLid(), hbaseFamily[2], "os", "2" },						{ device.getLid(), hbaseFamily[2], "osv", "3" } };				hu.writeMultRow(hbaseTable, s);				logger.info(" hbase util end "   );				temp = logBufferedReader.readLine();			}		} catch (Exception e) {			logger.error(" DeviceReadThread error "   );			e.printStackTrace();		} finally { 			try {				logBufferedReader.close();			} catch (IOException e) { 				e.printStackTrace();			}			try {				logReader.close();			} catch (IOException e) { 				e.printStackTrace();			}		}	}}

 

 

import java.io.FileInputStream;import java.io.FileNotFoundException;import java.util.Properties;public class HbaseStarter {	public static void main(String[] args) throws  Exception {		Properties properties=new Properties();		//String config = "D:/work/util/aoi-hbase/trunk/src/main/resources/testua.properties";		String config = "/home/aoi/aoi-hbase/conf/config.properties"; 		FileInputStream fis = new FileInputStream(config);		properties.load(fis);		fis.close(); 				String sourceFile=properties.getProperty("sourceFile")+"device2.log"+","+properties.getProperty("sourceFile")+"applist.log";		String hbaseTable = properties.getProperty("hbaseTable");		String hbaseFamily = properties.getProperty("hbaseFamily");		String numFile=properties.getProperty("sourceFile")+"num.txt";						String[] sourceFileName=sourceFile.split(",");  // file 		String[] hbaseTableName=hbaseTable.split(",");  // table		String[] hbaseFamilyName=hbaseFamily.split("&");     // family  						DeviceReadThread device = new DeviceReadThread(sourceFileName[0],hbaseTableName[0],hbaseFamilyName[0].split(","),"","");		new Thread(device).start();				AppReadThread app = new AppReadThread(sourceFileName[1],hbaseTableName[1],hbaseFamilyName[1].split(","),numFile,"");		new Thread(app).start();			}}
 

 

 

config.properties
sourceFile=//data//logs//hbaseTable=device-ua,app-uahbaseFamily="device","history","Description"&"app", "history", "Description"
 

 

hbase-site-test_bj.xml

hbase.rootdir
hdfs://xxx.com:9000/hbase
hbase.cluster.distributed
true
hfile.block.cache.size
0.4
hbase.regionserver.handler.count
150
hbase.zookeeper.property.dataDir
/var/lib/zookeeper
hbase.zookeeper.property.clientPort
2181
hbase.zookeeper.quorum
xxx.com,xxx.com,rabbitmq1
zookeeper.session.timeout
60000
hbase.master.maxclockskew
180000
Time difference of regionserver from master
hbase.hregion.memstore.flush.size
512
hbase.zookeeper.property.maxClientCnxns
1000
hbase.hregion.max.filesize
1024

 

 

 

device2.log

 

 

 

 结果:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。

 
 
 谢谢您的赞助,我会做的更好!

 

 

转载地址:http://iayqa.baihongyu.com/

你可能感兴趣的文章
集成计算引擎在大型企业绩效考核系统的应用方案
查看>>
恕我直言,你可能误解了微服务
查看>>
【跃迁之路】【730天】程序员高效学习方法论探索系列(实验阶段487-2019.2.20)...
查看>>
[sublime系列文章] sublime text 3插件配置说明
查看>>
前端工程化二(requirejs + gulp)
查看>>
JavaScript:JSON 和 JS 对象
查看>>
[Leetcode] Generalized Abbreviation 列举单词缩写
查看>>
说说分布式事务(一)
查看>>
js中的几种继承实现
查看>>
二、SIP应用服务开发: 注册心跳管理
查看>>
蚂蚁金服分布式链路跟踪组件采样策略和源码 | 剖析 ...
查看>>
Docker的那些事(Kubernetes+docker)
查看>>
「镁客早报」未来中国数据量将超美国;巴菲特四季度股票资产缩水380亿美元,减持苹果甲骨文 ...
查看>>
mica cglib 增强——[1]cglib bean copy 介绍
查看>>
解决flask服务器使用gunicorn启动时,获取全局变量失败的问题
查看>>
美团首次展现无人配送链条,发布末端配送机器人 | CES 2019
查看>>
2019年3月微软补丁日多个漏洞预警
查看>>
oracle常用sql整理
查看>>
成为Java高级程序员需要掌握哪些?
查看>>
用python实现接口测试(一 、使用POST和GET请求api)
查看>>