Phoenix Phoenix实践
Phoenix启动安装基本操作 二级索引支持(gobal index + local index)
编译SQL成为原生HBase的可并行执行的Scan
Phoenix结构
Phoenix在Hadoop生态系统中的位置
HBase性能提升
hbase1.2性能能提:
1.2相对1.1,提升是十分显著的,在某些方面的额提升,延迟低了好几倍,
更别说Hive over HBase,Hive over Hbase的性能下,Phoenix的性能是这种的好多倍。
启动 配置:配置相对来说比较简单
Download:http://phoenix.apache.org/download.html,下载hbase对应版本的phoenix;解压bin.tar.gz包,拷贝**phoenix server jar**包到hbase集群的每个region server 的lib目录下,然后重启hbase 集群。
phoniex 的启动命令是sqlline.py
使用bin/sqlline.py 172.16.0.128:2181连接上zk,就可以连接上HBase
命令行操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 !all Execute the specified SQL against all the current connections !autocommit Set autocommit mode on or off !batch Start or execute a batch of statements !brief Set verbose mode off !call Execute a callable statement !close Close the current connection to the database !closeall Close all current open connections !columns List all the columns for the specified table !commit Commit the current transaction (if autocommit is off) !connect Open a new connection to the database. !dbinfo Give metadata information about the database !describe Describe a table !dropall Drop all tables in the current database !exportedkeys List all the exported keys for the specified table !go Select the current connection !help Print a summary of command usage !history Display the command history !importedkeys List all the imported keys for the specified table !indexes List all the indexes for the specified table !isolation Set the transaction isolation for this connection !list List the current connections !manual Display the SQLLine manual !metadata Obtain metadata information !nativesql Show the native SQL for the specified statement !outputformat Set the output format for displaying results (table,vertical,csv,tsv,xmlattrs,xmlelements) !primarykeys List all the primary keys for the specified table !procedures List all the procedures !properties Connect to the database specified in the properties file(s) !quit Exits the program !reconnect Reconnect to the database !record Record all output to the specified file !rehash Fetch table and column names for command completion !rollback Roll back the current transaction (if autocommit is off) !run Run a script from the specified file !save Save the current variabes and aliases !scan Scan for installed JDBC drivers !script Start saving a script to a file !set Set a sqlline variable Variable Value Description =============== ========== autoCommit true/false Enable/disable automatic transaction commit autoSave true/false Automatically save preferences color true/false Control whether color is used for display fastConnect true/false Skip building table/column list for tab-completion force true/false Continue running script even after errors headerInterval integer The interval between which headers are displayed historyFile path File in which to save command history. Default is $ HOME/.sqlline/history (UNIX, Linux, Mac OS), $ HOME/sqlline/history (Windows) incremental true/false Do not receive all rows from server before printing the first row. Uses fewer resources, especially for long-running queries, but column widths may be incorrect. isolation LEVEL Set transaction isolation level maxColumnWidth integer The maximum width to use when displaying columns maxHeight integer The maximum height of the terminal maxWidth integer The maximum width of the terminal numberFormat pattern Format numbers using DecimalFormat pattern outputFormat table/vertical/csv/tsv Format mode for result display propertiesFile path File from which SqlLine reads properties on startup; default is $ HOME/.sqlline/sqlline.properties(UNIX, Linux, Mac OS), $ HOME/sqlline/sqlline.properties(Windows) rowLimit integer Maximum number of rows returned from a query; zero means no limit showElapsedTime true/false Display execution time when verbose showHeader true/false Show column names in query results showNestedErrs true/false Display nested errors showWarnings true/false Display connection warnings silent true/false Be more silent timeout integer Query timeout in seconds; less than zero means no timeout trimScripts true/false Remove trailing spaces from lines read from script files verbose true/false Show verbose error messages and debug info !sql Execute a SQL command !tables List all the tables in the database !typeinfo Display the type map for the current connection !verbose Set verbose mode on Comments, bug reports, and patches go to ???
这里面特别常见的有
!table 查看表
!quit 退出
平时命令行大多数都是输入SQL查看结果
对SQL的支持命令:
· SELECT
· UPSERT VALUES
· UPSERT SELECT
· DELETE
· CREATE TABLE
· DROP TABLE
· CREATE FUNCTION
· DROP FUNCTION
· CREATE VIEW
· DROP VIEW
· CREATE SEQUENCE
· DROP SEQUENCE
· ALTER
· CREATE INDEX
· DROP SEQUENCE
· ALTER
· CREATE INDEX
· DROP INDEX
· ALTER INDEX
· EXPLAIN
· UPDATE STATISTICS
· CREATE SCHEMA
· USE
· DROP SCHEMA
注意:在没有索引的情况下,针对大表使用非索引查询会非常耗时,很有可能会报超时错误.
JDBC对Phoenix的基本操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 import java.sql.*;public class BaseDB { public static Connection getConnection () { try { Class.forName("org.apache.phoenix.jdbc.PhoenixDriver" ); return DriverManager.getConnection("jdbc:phoenix:172.16.0.128:2181" ); } catch (Exception e) { e.printStackTrace(); return null ; } } public static void create () { Connection conn = null ; try { conn = BaseDB.getConnection(); if (conn == null ) { System.out.println("conn is null..." ); return ; } ResultSet rs = conn.getMetaData().getTables(null , null , "USER" , null ); if (rs.next()) { System.out.println("table user is exist..." ); return ; } String sql = "CREATE TABLE user (id varchar PRIMARY KEY,INFO.account varchar ,INFO.passwd varchar)" ; PreparedStatement ps = conn.prepareStatement(sql); ps.execute(); System.out.println("create success..." ); } catch (SQLException e) { e.printStackTrace(); } finally { if (conn != null ) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void upsert () { Connection conn = null ; try { conn = BaseDB.getConnection(); if (conn == null ) { System.out.println("conn is null..." ); return ; } String sql = "upsert into user(id, INFO.account, INFO.passwd) values('001', 'admin', 'admin')" ; PreparedStatement ps = conn.prepareStatement(sql); String msg = ps.executeUpdate() > 0 ? "insert success..." : "insert fail..." ; conn.commit(); System.out.println(msg); } catch (SQLException e) { e.printStackTrace(); } finally { if (conn != null ) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void query () { Connection conn = null ; try { conn = BaseDB.getConnection(); if (conn == null ) { System.out.println("conn is null..." ); return ; } String sql = "select * from user" ; PreparedStatement ps = conn.prepareStatement(sql); ResultSet rs = ps.executeQuery(); System.out.println("id" + "\t" + "account" + "\t" + "passwd" ); System.out.println("======================" ); if (rs != null ) { while (rs.next()) { System.out.print(rs.getString("id" ) + "\t" ); System.out.print(rs.getString("account" ) + "\t" ); System.out.println(rs.getString("passwd" )); } } } catch (SQLException e) { e.printStackTrace(); } finally { if (conn != null ) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void delete () { Connection conn = null ; try { conn = BaseDB.getConnection(); if (conn == null ) { System.out.println("conn is null..." ); return ; } String sql = "delete from user where id='001'" ; PreparedStatement ps = conn.prepareStatement(sql); String msg = ps.executeUpdate() > 0 ? "delete success..." : "delete fail..." ; conn.commit(); System.out.println(msg); } catch (SQLException e) { e.printStackTrace(); } finally { if (conn != null ) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void drop () { Connection conn = null ; try { conn = BaseDB.getConnection(); if (conn == null ) { System.out.println("conn is null..." ); return ; } String sql = "drop table user" ; PreparedStatement ps = conn.prepareStatement(sql); ps.execute(); System.out.println("drop success..." ); } catch (SQLException e) { e.printStackTrace(); } finally { if (conn != null ) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void main (String[] args) { create(); upsert(); query(); } }
基本操作和JDBC基本一样,只要在Class里面修改成Phoenix就可以
无索引简单测试 首先上结论: 针对主键根据单纬度查询,数据量对搜索结果影响非常小,如果仅仅是返回单条结果的查询,能够达到毫秒级反应速度,根据主键批量查询的话速度由表大小和返回数量决定,从目前数据量11亿左右响应速度也在秒级.
但是如果没有建立索引,想根据非主键查询,反应时间会非常久.
响应时间测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public static void main (String[] args) throws Throwable { try { Class.forName("org.apache.phoenix.jdbc.PhoenixDriver" ); String url = "jdbc:phoenix:datanode128:2181" ; Connection conn = DriverManager.getConnection(url); Statement statement = conn.createStatement(); long time = System.currentTimeMillis(); ResultSet rs = statement.executeQuery("select * from test" ); while (rs.next()) { String myKey = rs.getString("MYKEY" ); String myColumn = rs.getString("MYCOLUMN" ); System.out.println("myKey=" + myKey + "myColumn=" + myColumn); } long timeUsed = System.currentTimeMillis() - time; System.out.println("time " + timeUsed + "mm" ); rs.close(); statement.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } }
结果:
120ms
CREATE TABLE IF NOT EXISTS “employee” (“no” VARCHAR(10) NOT NULL PRIMARY KEY, “company”.”name” VARCHAR(30),”company”.”position” VARCHAR(20), “family”.”tel” VARCHAR(20), “family”.”age” INTEGER);
csv columns from database. CSV Upsert complete. 1000000 rows upserted
测试结果:
100w: insert 70s count:1.032s groupby PK: 0.025s
500w:insert 314s count:1.246s groupby PK:0.024s
从结果看,随着数量级的增加,查询时耗也随之增加,有一个例外,就是当用索引字段为主键时作聚合查询时,用时相差不大。总的来说,Phoenix在用到索引时查询性能会比较好。那对于Count来说,如果不用Phoenix,用HBase自带的Count耗时是怎样的呢,测了一下,HBase Count 100万需要33s, 500万需要139s,性能还是很差的。对于大表来说基本不能用Count来统计行数,还得依赖于基于Coprocessor机制来统计。
JDBC模拟数据代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 package test;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.UUID;import org.apache.commons.lang3.RandomUtils;import org.apache.commons.lang3.time.DateUtils;public class BuildData {private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss" );private static String[] ys = new String[] { "红" , "橙" , "黄" , "绿" , "青" , "蓝" , "紫" };private static String[] cx = new String[] { "大众" , "别克" , "奥迪" , "宝马" };private static String[] zm = new String[] { "A" , "B" , "C" , "D" , "E" , "F" , "G" , "H" , "I" , "G" , "K" , "L" , "M" , "N" ,"O" , "P" , "Q" , "R" , "S" , "T" , "U" , "V" , "W" , "X" , "Y" , "Z" };private static String[] zmSz = new String[] { "A" , "B" , "C" , "D" , "E" , "F" , "G" , "H" , "I" , "G" , "K" , "L" , "M" , "N" ,"O" , "P" , "Q" , "R" , "S" , "T" , "U" , "V" , "W" , "X" , "Y" , "Z" , "1" , "2" , "3" , "4" , "5" , "6" , "7" , "8" , "9" ,"0" };private static String[] prv = new String[] { "京" , "津" , "沪" , "渝" , "冀" , "晋" , "辽" , "吉" , "黑" , "苏" , "浙" , "皖" , "闽" , "赣" ,"鲁" , "豫" , "鄂" , "湘" , "粤" , "琼" , "川" , "贵" , "云" , "陕" , "甘" , "青" , "台" , "蒙" , "桂" , "宁" , "新" , "藏" , "港" , "澳" };private Calendar start = Calendar.getInstance();private Calendar end = Calendar.getInstance();private int dayOfSecond = 250 ;public BuildData (int dayOfSecond, Date start, Date end) {this .start.setTime(start);this .end.setTime(end);this .dayOfSecond = dayOfSecond;} public String[][] buildData() {if (dayOfSecond > 0 ) {String date = getDate(); if (date == null ) {return null ;} String[][] result = new String[dayOfSecond][5 ]; for (int i = 0 ; i < dayOfSecond; i++) {result[i][0 ] = getId(); result[i][1 ] = date; result[i][2 ] = getHphm(); result[i][3 ] = getCx(); result[i][4 ] = getYs(); } return result;} return null ;} private String getId () {String string = UUID.randomUUID().toString().replaceAll("-" , "" ); return string;} private String getDate () {if (end.getTime().getTime() >= start.getTime().getTime()) {start.setTime(DateUtils.addSeconds(start.getTime(), 1 )); return sdf.format(start.getTime());} return null ;} private String getHphm () {String p = prv[RandomUtils.nextInt(0 , prv.length)]; StringBuilder sb = new StringBuilder(p); sb.append(zm[RandomUtils.nextInt(0 , zm.length)]); for (int i = 0 ; i < 5 ; i++) {sb.append(zmSz[RandomUtils.nextInt(0 , zmSz.length)]); } return sb.toString();} private String getCx () {int nextInt = RandomUtils.nextInt(0 , cx.length);return cx[nextInt];} private String getYs () {int nextInt = RandomUtils.nextInt(0 , ys.length);return ys[nextInt];} public static void main (String[] args) {} } package test;public class CarVo {private String id;private String date;private String hphm;private String cx;private String ys;public String getId () {return id;} public void setId (String id) {this .id = id;} public String getDate () {return date;} public void setDate (String date) {this .date = date;} public String getHphm () {return hphm;} public void setHphm (String hphm) {this .hphm = hphm;} public String getCx () {return cx;} public void setCx (String cx) {this .cx = cx;} public String getYs () {return ys;} public void setYs (String ys) {this .ys = ys;} } package test;import java.sql.Connection;import java.sql.DriverManager;import java.text.ParseException;import java.util.Date;import org.apache.commons.dbutils.QueryRunner;import org.apache.commons.lang3.time.DateUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class InsertData {private static Logger log = LoggerFactory.getLogger(InsertData.class);public static void main (String[] args) throws ParseException {args = new String[] { "20170103000000" , "20170131000000" }; Date start = DateUtils.parseDate(args[0 ], "yyyyMMddHHmmss" ); Date end = DateUtils.parseDate(args[1 ], "yyyyMMddHHmmss" ); try {Connection connection = null ; Class.forName("org.apache.phoenix.jdbc.PhoenixDriver" ); connection = DriverManager.getConnection("jdbc:phoenix:172.16.0.128:2181" , "" , "" ); QueryRunner queryRunner = new QueryRunner(); BuildData buildData = new BuildData(250 , start, end); String[][] buildData2 = null ; int num = 0 ;long all = 0 ;while ((buildData2 = buildData.buildData()) != null ) {int [] batch = queryRunner.batch(connection, "upsert into car_test values(?,?,?,?,?)" , buildData2);num += batch.length; all += batch.length; if (num > 1000 ) {long time1 = System.currentTimeMillis();connection.commit(); long time2 = System.currentTimeMillis();num = 0 ; System.out.println(new Date() + ":" + "-start:" + args[0 ] + "-end:" + args[1 ] + "--" + all + "--" + (time2 - time1)); } } } catch (Exception e) { e.printStackTrace(); } } }
Phoenix和Spark整合 数据格式:
imei
alarm_type
lat
lng
device_status
mc_type
read_status
speed
addr
index_name
user_id
user_parent_id
spark写入HBase(通过Phoenix)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 object sparkPhoenixSave { def main (args:Array [String ]){ val sparkConf = new SparkConf ().setAppName("sparkToPhoenix" ) val sc = new SparkContext (SparkConf ) val sqlContext = new org.apache.spark.sql.SQLContext (sc) import sqlContext.implicits._ val rdd = List ((1 L,"1" ,1 ),(2 L,"2" ,2 ),(3 L,"3" ,3 )) val df = rdd.toDF("id" ,"col1" ,"col2" ) df.show() df.save("org.apache.phoenix.spark" ,SaveMode .Overwrite ,Map ("table" -> "GPS" ,"zkUrl" -> "172.16.0.126:2181/hbase-unsecure" )) } }
问题点:
依赖方面有几个问题:
集群升级后采用了Phoenix 4.14 - HBase 1.2.0版本。
这个版本的依赖经过一段时间的试验之后选择了如下的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > org.apache.phoenix</groupId > <artifactId > phoenix-spark</artifactId > <version > 4.14.0-HBase-1.2</version > </dependency > <dependency > <groupId > org.apache.hbase</groupId > <artifactId > hbase-client</artifactId > <version > 1.2.0</version > </dependency > <dependency > <groupId > org.apache.phoenix</groupId > <artifactId > phoenix-core</artifactId > <version > 4.14.0-HBase-1.2</version > </dependency >
理论上只要用
1 2 3 4 5 <dependency > <groupId > org.apache.phoenix</groupId > <artifactId > phoenix-server</artifactId > <version > 4.14.0-cdh5.13.2</version > </dependency >
就能解决问题,但是这个依赖一直有问题,手动下载也没有解决加载问题
我们集群是5.13.3的,这边选择的是5.13.2的,应该能够兼容的,深入进去看里面的组件,其实HBase版本啥的都一样。
但是Phoenix4.14.0-CDH版本安装的时候在HBase-site.xml中已经有了两个改动,所以这边有两个选择,要么在代码中配置hbase的选项,
1 2 hbConfig.set("phoenix.schema.isNamespaceMappingEnabled" ,"true" ); hbConfig.set("phoenix.schema.mapSystemTablesToNamespace " ,"true" );
要么在resource文件夹中添加hbase-site.xml,后者更加方便一点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 19 /10 /14 17 :07 :39 INFO ConnectionQueryServicesImpl: HConnection established. Stacktrace for informational purposes: hconnection-0x1de9b505 java.lang.Thread.getStackTrace(Thread.java:1559 )org.apache.phoenix.util.LogUtil.getCallerStackTrace(LogUtil.java:55 ) org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:427 ) org.apache.phoenix.query.ConnectionQueryServicesImpl.access$400 (ConnectionQueryServicesImpl.java:267 ) org.apache.phoenix.query.ConnectionQueryServicesImpl$12 .call(ConnectionQueryServicesImpl.java:2515 ) org.apache.phoenix.query.ConnectionQueryServicesImpl$12 .call(ConnectionQueryServicesImpl.java:2491 ) org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76 ) org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2491 ) org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255 ) org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:150 ) org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221 ) java.sql.DriverManager.getConnection(DriverManager.java:664 ) java.sql.DriverManager.getConnection(DriverManager.java:208 ) org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:113 ) org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:58 ) org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:354 ) org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:118 ) org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:60 ) org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40 ) org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389 ) org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146 ) org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125 ) phoenix.SparkPhoenixRead$.main(SparkPhoenixRead.scala:17 ) phoenix.SparkPhoenixRead.main(SparkPhoenixRead.scala)
配置完成之后,使用代码的时候,始终在报如上错误。
分析之后发现这并不是报错,而是长得像报错的日志格式。
1 2 3 4 5 6 7 8 9 10 11 12 logger.info("HConnection established. Stacktrace for informational purposes: " + connection + " " + LogUtil.getCallerStackTrace()); public static String getCallerStackTrace () { StackTraceElement[] st = Thread.currentThread().getStackTrace(); StringBuilder sb = new StringBuilder(); for (StackTraceElement element : st) { sb.append(element.toString()); sb.append("\n" ); } return sb.toString(); }
获取调用栈并记入日记,不是bug,==
最终使用的代码是Spark2.0以上格式的SparkSession
1 2 3 4 5 6 7 8 9 10 11 val spark = SparkSession .builder().master("local[*]" ).appName("sparkPhoenix" ).getOrCreate() val df = spark.read.format("org.apache.phoenix.spark" ) .option("zkUrl" ,"172.16.0.127:2181" ) .option("table" ,"TABLE_NAME" ) .load() df.show() spark.stop()
Spark与Phoenix Jar包冲突说明:
1 --conf spark.driver.extraClassPath=phoenix-spark-4.14.0-HBase-1.2.jar --conf spark.executor.extraClassPath=phoenix-spark-4.14.0-HBase-1.2.jar
Phoenix Jar包冲突(大坑) Jar包冲突的原因是HBase…这个包和CDH…这个包加载顺序的问题
先加载CDH这个包的话就会导致问题Jar包冲突,必须先加载HBase包,要手动指定。
图上还缺少一个HBase包放在oozie的lib里面,这些是加载必备的。
不然会出现奇怪的错误,因为加载是随机加载的,如果先加载了CDH包就会报错。
映射、索引分区 映射: 默认情况下,直接在hbase中创建的表,通过phoenix是查看不到的
test是在hbase中直接创建的,默认情况下,在phoenix中是查看不到test的。
有两种映射方法,一种是视图映射,一种是表映射。
视图创建过后,直接删除,Hbase中的原表不会受影响,如果创建的是表映射,删除Phoenix中的映射表,会把原表也删除.
一、基础知识
Salted Table 是phoenix为了防止hbase表rowkey设计为自增序列而引发热点region读和热点region写而采取的一种表设计手段。通过在创建表的时候指定Salt_Buckets来实现pre-split,下面的建表语句建表的时候将会把表预分割到20个region里面。
1 2 CREATE TABLE SALT_TEST (A_KEY VARCHAR PRIMARY KEY, A_col VARCHAR) SALT_BUCKETS = 20
默认情况下,对salted table 创建耳机索引,二级索引表会随同原表进行salted切分,salt_buckets与原表保持一致,当然,在创建耳机索引表的时候也可以自定义salt_buckets的数量,phoenix没有强制数量必须和原表一致。
二、实现原理
讲一个散列取余后的byte值插入到rowkey的第一个字节里,通过定义每个region的start key和end key将数据分割到不同region,以此来防止自增序列引入的热点问题。从而达到平衡hbase集群的读写性能问题。
salted byte的计算方式大致如下
hash(rowkey) % SALT_BUCKETS
默认下salted byte将作为每个region的start key及 end key,以此分割数据到不同的region,
这样能做到具有相同的salted byte处在一个region。
三、本质
本质就是在hbase中
rowkey前面加上一个字节,在表中实际存储时,就可以自动分布到不同的region中去了。
四、实例
1 2 3 4 5 CREATE TABLE SALT_TEST (a_key VARCHAR PRIMARY KEY, a_col VARCHAR) SALT_BUCKETS = 4; UPSERT INTO SALT_TEST(a_key, a_col) VALUES('key_abc', 'col_abc'); UPSERT INTO SALT_TEST(a_key, a_col) VALUES('key_ABC', 'col_ABC'); UPSERT INTO SALT_TEST(a_key, a_col) VALUES('key_rowkey01', 'col01');
从Phoenix sqlline.py查询数据 看不出区别,去hbase scan 就能看到phoenix是在rowkey的第一个字节插入一个byte字节。
五、注意
每条rowkey前面加一个Byte,这里显示为16进制,创建完成之后,应该使用Phoenix SQL来读取数据,不要混合使用Phoenix Sql插入数据,使得原始rowkey前面被自动加上一个byte。
同步索引和异步索引
一般我可以使用create index来创建一个索引,这是一种同步的方法,但是有时候我们创建索引的表非常大,
我们需要等很长时间,Phoenix 4.5 以后有一个异步创建索引的方式,使用关键字ASYNC来创建索引:
实际上在二级索引中,我们需要先将phoenix的client包放入hbase的lib中然后在启动,这个IndexTool底层走的是MR,MR任务运行完毕后,索引会被自动引导,当我们在phoenix命令行中使用!table,看到索引表已经处于enable状态就可以使用该索引了。
官网demo里面的建立索引,仅仅针对一个字段,这样涉及稍微复杂的业务,索引并不能起效,还是全表索引。
二级索引分为以下几种:
全局索引、本地索引、覆盖索引
全局索引:
全局索引是默认索引类型,适用于读多写少的场景,由于全局索引会拦截(DELETE,UPSERT VALUES and UPSERT SELECT)数据更新并更新索引表,而索引表十分不在不用数据节点上的,跨节点的数据传输带来了较大的性能损耗。
本地索引
本地索引适用于少读多写
必须通过下面两部才能完成异步索引的构建。
1 CRAETE INDEX anync_index ON PHOENIX_ALARM_DIS(LAT,LNG,IMEI,CREATETIME) ASYNC
创建异步索引,命令执行后一开始不会有数据,还必须使用单独的命令行工具来执行数据的创建,当语句给执行的时候,后端会启动mr任务,只有等到这个任务结束,数据都被生成在索引表中后,这个索引才能被使用,创建语句执行完后,还需要用工具导入数据。
官网说明:
1 2 3 4 5 6 7 8 9 一般我们可以使用CREATE INDEX来创建一个索引,这是一种同步的方法。但是有时候我们创建索引的表非常大,我们需要等很长时间。Phoenix 4.5以后有一个异步创建索引的方式,使用关键字ASYNC来创建索引: CREATE INDEX index1_c ON hao1 (age) INCLUDE(name) ASYNC; 这时候创建的索引表中不会有数据。你还必须要单独的使用命令行工具来执行数据的创建。当语句给执行的时候,后端会启动一个map reduce任务,只有等到这个任务结束,数据都被生成在索引表中后,这个索引才能被使用。启动工具的方法: ${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool --schema MY_SCHEMA --data-table MY_TABLE --index-table ASYNC_IDX --output-path ASYNC_IDX_HFILES 这个任务不会因为客户端给关闭而结束,是在后台运行。你可以在指定的文件ASYNC_IDX_HFILES中找到最终实行的结果。
测试和遇到的一些问题 使用Spark写入11亿5千万测试数据,分为24个分区,使用异步索引针对time和imei字段各建立了一个索引.
这边有个问题,索引在创建完成之后,数据类型自动变化了,详细:
上面两张图分别是索引表和原数据表的数据类型,可以看到原表和索引表的数据类型并不相同,让人有点费解。
考虑到phoenix有很多时间种类和别的一些情况,于是提出了几种解决办法:
1.建表的时候不用timestamp作为时间类型
2.改二级索引表的字段类型,将decimal改为bigint
3.将seq_id建表时候,设置为not null,并属于联合主键
4.从搜索的角度考虑,能不能将搜索的数据类型更改
第一种方法尝试换了一种时间类型后,建立索引任然更改了为了decimal
第二种方法更改索引类型经过尝试后发现是不可行的
第三种方法和第四种方法是可行的,第四种方法更加简单
第四种方法,本来准备自己在代码中实现对时间的转换,后来发现phoenix SQL内置了timestamp转换
建立完成索引之后,直接
1 select CREATETIME,IMEI,LAT,LNG from PHOENIX_ALARM_DIS where CREATETIME > TO_TIMESTAMP('2017-08-30 06:21:46.732' );
这种方式会引导查询走索引
关于索引,这边还有需要一提的就是,创建索引之后
例如:
1 CREATE INDEX INDEX_1 ON TABLE (A,B) INCLUDE (C,D) ASYNC;
创建索引之后并不能指定B为查询条件,explain会发现依然是在全局扫描,效率很低,这边稍微尝试了一下,在建立一个
1 CREATE INDEX INDEX_1 ON TABLE (B,A) INCLUDE (C,D) ASYNC;
建立了两个索引之后,两个索引就占了大概80G的硬盘空间,这个表格用parquet.snappy格式存储之后才120G,真正是用空间换时间。
测试结果
数据量:
11亿5000w 占用磁盘空间:120G
索引占用空间:40G
下面分别是50并发 100并发 200并发的测试结果,
测试中现在Phoenix建表,用Spark写入,随机IMEI和时间段,时间长度为1天,用对应的线程数至少跑30分钟以上.,测试过程中的CPU使用量因为测试集群性能较高,使用最多不超过40%,所以没有记录.
50并发稳定后的查询大多在200ms以下.
100并发的查询时间大多数在400ms以下
200并发的查询大多数在500ms以下.
针对云车金融数据的测试:
云车金融测试采用映射表,原数据在HBase中,在Phoenix中建立映射表,把原先设计好的ROWKEY作为主键,建立映射表.
根据主键范围模糊查询 采用SQL模糊查询,因为没有并发要求,直接在命令行输入SQL查询.
针对某条IMEI单天的查询,反应时间在3-5s左右.
结论:模糊查询都是走全表,无法优化,效率比较低。
关于映射表 官网有提及,映射表并不推荐用,数据从hbase写入,通过phoenix读取并不是个好主意,因为:
1.Phoenix创建的表有很多数据类型,但是从hbase映射表的话只能有一种类型:varchar,否则就会报错。字段只有varchar会给查询带来一定的麻烦。
2.大表的话创建映射表肯定会超时,需要根据版本修改配置信息,max超时时间拉大,拉到多少比较好,这个时间需要把握。
映射表如果因为这样或者那样的原因创建失败的话是不能直接删除的,直接删除会导致hbase原表也被删除,可以在SYSTEM.CATALOG中把和映射表有关的信息删除,比如:
delete from system.catalog where table_name = ‘MYTEST’;
就能把和关联表有关的信息全部删除了。
相比于HBase,性能上并没有优势