0%

Phoenix测试

Phoenix

Phoenix实践

Phoenix启动安装基本操作

二级索引支持(gobal index + local index)

编译SQL成为原生HBase的可并行执行的Scan

Phoenix结构

4.jpg

Phoenix在Hadoop生态系统中的位置

5.jpg

HBase性能提升

hbase1.2性能能提:

1.2相对1.1,提升是十分显著的,在某些方面的额提升,延迟低了好几倍,

更别说Hive over HBase,Hive over Hbase的性能下,Phoenix的性能是这种的好多倍。

启动

配置:配置相对来说比较简单

Download:http://phoenix.apache.org/download.html,下载hbase对应版本的phoenix;解压bin.tar.gz包,拷贝**phoenix server jar**包到hbase集群的每个region server 的lib目录下,然后重启hbase 集群。

phoniex 的启动命令是sqlline.py

使用bin/sqlline.py 172.16.0.128:2181连接上zk,就可以连接上HBase

命令行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
!all               Execute the specified SQL against all the current connections
!autocommit Set autocommit mode on or off
!batch Start or execute a batch of statements
!brief Set verbose mode off
!call Execute a callable statement
!close Close the current connection to the database
!closeall Close all current open connections
!columns List all the columns for the specified table
!commit Commit the current transaction (if autocommit is off)
!connect Open a new connection to the database.
!dbinfo Give metadata information about the database
!describe Describe a table
!dropall Drop all tables in the current database
!exportedkeys List all the exported keys for the specified table
!go Select the current connection
!help Print a summary of command usage
!history Display the command history
!importedkeys List all the imported keys for the specified table
!indexes List all the indexes for the specified table
!isolation Set the transaction isolation for this connection
!list List the current connections
!manual Display the SQLLine manual
!metadata Obtain metadata information
!nativesql Show the native SQL for the specified statement
!outputformat Set the output format for displaying results
(table,vertical,csv,tsv,xmlattrs,xmlelements)
!primarykeys List all the primary keys for the specified table
!procedures List all the procedures
!properties Connect to the database specified in the properties file(s)
!quit Exits the program
!reconnect Reconnect to the database
!record Record all output to the specified file
!rehash Fetch table and column names for command completion
!rollback Roll back the current transaction (if autocommit is off)
!run Run a script from the specified file
!save Save the current variabes and aliases
!scan Scan for installed JDBC drivers
!script Start saving a script to a file
!set Set a sqlline variable

Variable Value
Description
=============== ==========
autoCommit true/false
Enable/disable automatic
transaction commit
autoSave
true/false Automatically save preferences
color true/false
Control whether color is used
for display
fastConnect
true/false Skip building table/column list
for
tab-completion
force true/false Continue running script
even
after errors
headerInterval integer The interval between
which
headers are displayed
historyFile path File in which to
save command
history. Default is
$HOME/.sqlline/history
(UNIX,
Linux, Mac OS),
$HOME/sqlline/history
(Windows)
incremental true/false Do not receive all rows
from
server before printing the first
row. Uses fewer
resources,
especially for long-running
queries, but column
widths may
be incorrect.
isolation LEVEL Set transaction
isolation level
maxColumnWidth integer The maximum width to
use when
displaying columns
maxHeight integer The maximum
height of the
terminal
maxWidth integer The maximum width of
the
terminal
numberFormat pattern Format numbers
using
DecimalFormat pattern
outputFormat
table/vertical/csv/tsv Format mode for
result
display
propertiesFile path File from which SqlLine
reads
properties on startup; default
is
$HOME/.sqlline/sqlline.properties
(UNIX, Linux, Mac
OS),
$HOME/sqlline/sqlline.properties
(Windows)
rowLimit
integer Maximum number of rows returned
from a query; zero
means no
limit
showElapsedTime true/false Display execution
time when
verbose
showHeader true/false Show column names in
query
results
showNestedErrs true/false Display nested
errors
showWarnings true/false Display connection
warnings
silent true/false Be more silent
timeout integer
Query timeout in seconds; less
than zero means no
timeout
trimScripts true/false Remove trailing spaces
from
lines read from script files
verbose true/false Show
verbose error messages and
debug info
!sql Execute a SQL command
!tables List all the tables in the database
!typeinfo Display the type map for the current connection
!verbose Set verbose mode on

Comments, bug reports, and patches go to ???

这里面特别常见的有

!table 查看表

!quit 退出

平时命令行大多数都是输入SQL查看结果

对SQL的支持命令:

· SELECT

· UPSERT VALUES

· UPSERT SELECT

· DELETE

· CREATE TABLE

· DROP TABLE

· CREATE FUNCTION

· DROP FUNCTION

· CREATE VIEW

· DROP VIEW

· CREATE SEQUENCE

· DROP SEQUENCE

· ALTER

· CREATE INDEX

· DROP SEQUENCE

· ALTER

· CREATE INDEX

· DROP INDEX

· ALTER INDEX

· EXPLAIN

· UPDATE STATISTICS

· CREATE SCHEMA

· USE

· DROP SCHEMA

注意:在没有索引的情况下,针对大表使用非索引查询会非常耗时,很有可能会报超时错误.

JDBC对Phoenix的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
/**
* @Author Administrator
* @create 2019/9/9 17:13
*/

import java.sql.*;

public class BaseDB {

public static Connection getConnection() {

try {
// load driver
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

// get connection
// jdbc 的 url 类似为 jdbc:phoenix [ :<zookeeper quorum> [ :<port number> ] [ :<root node> ] ],
// 需要引用三个参数:hbase.zookeeper.quorum、hbase.zookeeper.property.clientPort、and zookeeper.znode.parent,
// 这些参数可以缺省不填而在 hbase-site.xml 中定义。
return DriverManager.getConnection("jdbc:phoenix:172.16.0.128:2181");
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


public static void create() {
Connection conn = null;
try {
// get connection
conn = BaseDB.getConnection();

// check connection
if (conn == null) {
System.out.println("conn is null...");
return;
}

// check if the table exist
ResultSet rs = conn.getMetaData().getTables(null, null, "USER",
null);
if (rs.next()) {
System.out.println("table user is exist...");
return;
}
// create sql
String sql = "CREATE TABLE user (id varchar PRIMARY KEY,INFO.account varchar ,INFO.passwd varchar)";

PreparedStatement ps = conn.prepareStatement(sql);

// execute
ps.execute();
System.out.println("create success...");

} catch (SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}


public static void upsert() {

Connection conn = null;
try {
// get connection
conn = BaseDB.getConnection();

// check connection
if (conn == null) {
System.out.println("conn is null...");
return;
}

// create sql
String sql = "upsert into user(id, INFO.account, INFO.passwd) values('001', 'admin', 'admin')";

PreparedStatement ps = conn.prepareStatement(sql);

// execute upsert
String msg = ps.executeUpdate() > 0 ? "insert success..."
: "insert fail...";

// you must commit
conn.commit();
System.out.println(msg);

} catch (SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}


public static void query() {

Connection conn = null;
try {
// get connection
conn = BaseDB.getConnection();

// check connection
if (conn == null) {
System.out.println("conn is null...");
return;
}

// create sql
String sql = "select * from user";

PreparedStatement ps = conn.prepareStatement(sql);

ResultSet rs = ps.executeQuery();

System.out.println("id" + "\t" + "account" + "\t" + "passwd");
System.out.println("======================");

if (rs != null) {
while (rs.next()) {
System.out.print(rs.getString("id") + "\t");
System.out.print(rs.getString("account") + "\t");
System.out.println(rs.getString("passwd"));
}
}

} catch (SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}


public static void delete() {

Connection conn = null;
try {
// get connection
conn = BaseDB.getConnection();

// check connection
if (conn == null) {
System.out.println("conn is null...");
return;
}

// create sql
String sql = "delete from user where id='001'";

PreparedStatement ps = conn.prepareStatement(sql);

// execute upsert
String msg = ps.executeUpdate() > 0 ? "delete success..."
: "delete fail...";

// you must commit
conn.commit();
System.out.println(msg);

} catch (SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

}



public static void drop() {

Connection conn = null;
try {
// get connection
conn = BaseDB.getConnection();

// check connection
if (conn == null) {
System.out.println("conn is null...");
return;
}

// create sql
String sql = "drop table user";

PreparedStatement ps = conn.prepareStatement(sql);

// execute
ps.execute();

System.out.println("drop success...");

} catch (SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

}

public static void main(String[] args) {

// 创角标
create();

// 注意更新用的是upsert 插入和更新一样的
upsert();

// 查询 查询出结果并打印
query();

// 删除表
// drop();
}


}

基本操作和JDBC基本一样,只要在Class里面修改成Phoenix就可以

无索引简单测试

首先上结论:

针对主键根据单纬度查询,数据量对搜索结果影响非常小,如果仅仅是返回单条结果的查询,能够达到毫秒级反应速度,根据主键批量查询的话速度由表大小和返回数量决定,从目前数据量11亿左右响应速度也在秒级.

但是如果没有建立索引,想根据非主键查询,反应时间会非常久.

响应时间测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) throws Throwable {

try {

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

String url = "jdbc:phoenix:datanode128:2181";

Connection conn = DriverManager.getConnection(url);

Statement statement = conn.createStatement();

long time = System.currentTimeMillis();

ResultSet rs = statement.executeQuery("select * from test");

while (rs.next()) {
String myKey = rs.getString("MYKEY");
String myColumn = rs.getString("MYCOLUMN");

System.out.println("myKey=" + myKey + "myColumn=" + myColumn);
}

long timeUsed = System.currentTimeMillis() - time;

System.out.println("time " + timeUsed + "mm");

// 关闭连接
rs.close();
statement.close();
conn.close();

} catch (Exception e) {
e.printStackTrace();
}
}

结果:

120ms

CREATE TABLE IF NOT EXISTS “employee” (“no” VARCHAR(10) NOT NULL PRIMARY KEY, “company”.”name” VARCHAR(30),”company”.”position” VARCHAR(20), “family”.”tel” VARCHAR(20), “family”.”age” INTEGER);

csv columns from database. CSV Upsert complete. 1000000 rows upserted

测试结果:

100w: insert 70s count:1.032s groupby PK: 0.025s

500w:insert 314s count:1.246s groupby PK:0.024s

从结果看,随着数量级的增加,查询时耗也随之增加,有一个例外,就是当用索引字段为主键时作聚合查询时,用时相差不大。总的来说,Phoenix在用到索引时查询性能会比较好。那对于Count来说,如果不用Phoenix,用HBase自带的Count耗时是怎样的呢,测了一下,HBase Count 100万需要33s, 500万需要139s,性能还是很差的。对于大表来说基本不能用Count来统计行数,还得依赖于基于Coprocessor机制来统计。

JDBC模拟数据代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package test;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.UUID;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.time.DateUtils;

//id,日期,号牌号码,车型,颜色
public class BuildData {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
private static String[] ys = new String[] { "红", "橙", "黄", "绿", "青", "蓝", "紫" };
private static String[] cx = new String[] { "大众", "别克", "奥迪", "宝马" };
private static String[] zm = new String[] { "A", "B", "C", "D", "E", "F", "G", "H", "I", "G", "K", "L", "M", "N",
"O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" };
private static String[] zmSz = new String[] { "A", "B", "C", "D", "E", "F", "G", "H", "I", "G", "K", "L", "M", "N",
"O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "1", "2", "3", "4", "5", "6", "7", "8", "9",
"0" };
private static String[] prv = new String[] { "京", "津", "沪", "渝", "冀", "晋", "辽", "吉", "黑", "苏", "浙", "皖", "闽", "赣",
"鲁", "豫", "鄂", "湘", "粤", "琼", "川", "贵", "云", "陕", "甘", "青", "台", "蒙", "桂", "宁", "新", "藏", "港", "澳" };

private Calendar start = Calendar.getInstance();
private Calendar end = Calendar.getInstance();

private int dayOfSecond = 250;

public BuildData(int dayOfSecond, Date start, Date end) {
this.start.setTime(start);
this.end.setTime(end);
this.dayOfSecond = dayOfSecond;
}

// 生成数据
public String[][] buildData() {
if (dayOfSecond > 0) {
String date = getDate();
if (date == null) {
return null;
}
String[][] result = new String[dayOfSecond][5];
for (int i = 0; i < dayOfSecond; i++) {
result[i][0] = getId();
result[i][1] = date;
result[i][2] = getHphm();
result[i][3] = getCx();
result[i][4] = getYs();
}
return result;
}
return null;
}

// id
private String getId() {
String string = UUID.randomUUID().toString().replaceAll("-", "");
return string;
}

// 日期
private String getDate() {
if (end.getTime().getTime() >= start.getTime().getTime()) {
start.setTime(DateUtils.addSeconds(start.getTime(), 1));
return sdf.format(start.getTime());
}
return null;
}

// 车牌 苏E3G02D
private String getHphm() {
String p = prv[RandomUtils.nextInt(0, prv.length)];
StringBuilder sb = new StringBuilder(p);
sb.append(zm[RandomUtils.nextInt(0, zm.length)]);
for (int i = 0; i < 5; i++) {
sb.append(zmSz[RandomUtils.nextInt(0, zmSz.length)]);
}
return sb.toString();
}

// 车型
private String getCx() {
int nextInt = RandomUtils.nextInt(0, cx.length);
return cx[nextInt];
}

// 颜色
private String getYs() {
int nextInt = RandomUtils.nextInt(0, ys.length);
return ys[nextInt];
}

public static void main(String[] args) {
}
}

package test;

public class CarVo {

private String id;
private String date;
private String hphm;
private String cx;
private String ys;

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getDate() {
return date;
}

public void setDate(String date) {
this.date = date;
}

public String getHphm() {
return hphm;
}

public void setHphm(String hphm) {
this.hphm = hphm;
}

public String getCx() {
return cx;
}

public void setCx(String cx) {
this.cx = cx;
}

public String getYs() {
return ys;
}

public void setYs(String ys) {
this.ys = ys;
}

}

package test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.text.ParseException;
import java.util.Date;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InsertData {
private static Logger log = LoggerFactory.getLogger(InsertData.class);

public static void main(String[] args) throws ParseException {
args = new String[] { "20170103000000", "20170131000000" };
Date start = DateUtils.parseDate(args[0], "yyyyMMddHHmmss");
Date end = DateUtils.parseDate(args[1], "yyyyMMddHHmmss");

try {
Connection connection = null;
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection("jdbc:phoenix:172.16.0.128:2181", "", "");
QueryRunner queryRunner = new QueryRunner();

BuildData buildData = new BuildData(250, start, end);
String[][] buildData2 = null;
int num = 0;
long all = 0;
while ((buildData2 = buildData.buildData()) != null) {
int[] batch = queryRunner.batch(connection, "upsert into car_test values(?,?,?,?,?)", buildData2);
num += batch.length;
all += batch.length;
if (num > 1000) {
long time1 = System.currentTimeMillis();
connection.commit();
long time2 = System.currentTimeMillis();
num = 0;
System.out.println(new Date() + ":" + "-start:" + args[0] + "-end:" + args[1] + "--" + all + "--"
+ (time2 - time1));
}
// num++;
// all++;
// if (num > 1000) {
// num=0;
// System.out.println(new Date() + ":" + "-start:" + args[0] +
// "-end:" + args[1] + "--" + all);
// }
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

Phoenix和Spark整合

数据格式:

imei alarm_type lat lng device_status mc_type read_status speed addr index_name user_id user_parent_id

spark写入HBase(通过Phoenix)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
object sparkPhoenixSave{
def main(args:Array[String]){
val sparkConf = new SparkConf().setAppName("sparkToPhoenix")
val sc = new SparkContext(SparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// 从集合创建rdd
val rdd = List((1L,"1",1),(2L,"2",2),(3L,"3",3))
// 从rdd创建DF
val df = rdd.toDF("id","col1","col2")
df.show()

// Save to OUTPUT_TABLE
df.save("org.apache.phoenix.spark",SaveMode.Overwrite,Map("table" -> "GPS","zkUrl" -> "172.16.0.126:2181/hbase-unsecure"))
}
}

问题点:

依赖方面有几个问题:

集群升级后采用了Phoenix 4.14 - HBase 1.2.0版本。

这个版本的依赖经过一段时间的试验之后选择了如下的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- phoenix spark-->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>4.14.0-HBase-1.2</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.14.0-HBase-1.2</version>
</dependency>

理论上只要用

1
2
3
4
5
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-server</artifactId>
<version>4.14.0-cdh5.13.2</version>
</dependency>

就能解决问题,但是这个依赖一直有问题,手动下载也没有解决加载问题

我们集群是5.13.3的,这边选择的是5.13.2的,应该能够兼容的,深入进去看里面的组件,其实HBase版本啥的都一样。

但是Phoenix4.14.0-CDH版本安装的时候在HBase-site.xml中已经有了两个改动,所以这边有两个选择,要么在代码中配置hbase的选项,

1
2
hbConfig.set("phoenix.schema.isNamespaceMappingEnabled","true");
hbConfig.set("phoenix.schema.mapSystemTablesToNamespace ","true");

要么在resource文件夹中添加hbase-site.xml,后者更加方便一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
19/10/14 17:07:39 INFO ConnectionQueryServicesImpl: HConnection established. Stacktrace for informational purposes: hconnection-0x1de9b505 java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.phoenix.util.LogUtil.getCallerStackTrace(LogUtil.java:55)
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:427)
org.apache.phoenix.query.ConnectionQueryServicesImpl.access$400(ConnectionQueryServicesImpl.java:267)
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2515)
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2491)
org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2491)
org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:150)
org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
java.sql.DriverManager.getConnection(DriverManager.java:664)
java.sql.DriverManager.getConnection(DriverManager.java:208)
org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:113)
org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:58)
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:354)
org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:118)
org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:60)
org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
phoenix.SparkPhoenixRead$.main(SparkPhoenixRead.scala:17)
phoenix.SparkPhoenixRead.main(SparkPhoenixRead.scala)

配置完成之后,使用代码的时候,始终在报如上错误。

分析之后发现这并不是报错,而是长得像报错的日志格式。

1
2
3
4
5
6
7
8
9
10
11
12
//ConnectionQueryServicesImpl.java
logger.info("HConnection established. Stacktrace for informational purposes: " + connection + " " + LogUtil.getCallerStackTrace());
//LogUtil.java
public static String getCallerStackTrace() {
StackTraceElement[] st = Thread.currentThread().getStackTrace();
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : st) {
sb.append(element.toString());
sb.append("\n");
}
return sb.toString();
}

获取调用栈并记入日记,不是bug,==

最终使用的代码是Spark2.0以上格式的SparkSession

1
2
3
4
5
6
7
8
9
10
11
//读取
val spark = SparkSession.builder().master("local[*]").appName("sparkPhoenix").getOrCreate()

val df = spark.read.format("org.apache.phoenix.spark")
.option("zkUrl","172.16.0.127:2181")
.option("table","TABLE_NAME")
.load()

df.show()

spark.stop()

Spark与Phoenix Jar包冲突说明:

1
--conf spark.driver.extraClassPath=phoenix-spark-4.14.0-HBase-1.2.jar --conf spark.executor.extraClassPath=phoenix-spark-4.14.0-HBase-1.2.jar

Phoenix Jar包冲突(大坑)

Jar包冲突的原因是HBase…这个包和CDH…这个包加载顺序的问题

先加载CDH这个包的话就会导致问题Jar包冲突,必须先加载HBase包,要手动指定。

8.png

图上还缺少一个HBase包放在oozie的lib里面,这些是加载必备的。

不然会出现奇怪的错误,因为加载是随机加载的,如果先加载了CDH包就会报错。

映射、索引分区

映射:

默认情况下,直接在hbase中创建的表,通过phoenix是查看不到的

test是在hbase中直接创建的,默认情况下,在phoenix中是查看不到test的。

有两种映射方法,一种是视图映射,一种是表映射。

视图创建过后,直接删除,Hbase中的原表不会受影响,如果创建的是表映射,删除Phoenix中的映射表,会把原表也删除.

一、基础知识

Salted Table 是phoenix为了防止hbase表rowkey设计为自增序列而引发热点region读和热点region写而采取的一种表设计手段。通过在创建表的时候指定Salt_Buckets来实现pre-split,下面的建表语句建表的时候将会把表预分割到20个region里面。

1
2
CREATE TABLE SALT_TEST (A_KEY VARCHAR PRIMARY KEY,
A_col VARCHAR) SALT_BUCKETS = 20

默认情况下,对salted table 创建耳机索引,二级索引表会随同原表进行salted切分,salt_buckets与原表保持一致,当然,在创建耳机索引表的时候也可以自定义salt_buckets的数量,phoenix没有强制数量必须和原表一致。

二、实现原理

讲一个散列取余后的byte值插入到rowkey的第一个字节里,通过定义每个region的start key和end key将数据分割到不同region,以此来防止自增序列引入的热点问题。从而达到平衡hbase集群的读写性能问题。

salted byte的计算方式大致如下

hash(rowkey) % SALT_BUCKETS

默认下salted byte将作为每个region的start key及 end key,以此分割数据到不同的region,

这样能做到具有相同的salted byte处在一个region。

三、本质

本质就是在hbase中

rowkey前面加上一个字节,在表中实际存储时,就可以自动分布到不同的region中去了。

四、实例

1
2
3
4
5
CREATE TABLE SALT_TEST (a_key VARCHAR PRIMARY KEY, a_col VARCHAR) SALT_BUCKETS = 4;

UPSERT INTO SALT_TEST(a_key, a_col) VALUES('key_abc', 'col_abc');
UPSERT INTO SALT_TEST(a_key, a_col) VALUES('key_ABC', 'col_ABC');
UPSERT INTO SALT_TEST(a_key, a_col) VALUES('key_rowkey01', 'col01');

从Phoenix sqlline.py查询数据 看不出区别,去hbase scan 就能看到phoenix是在rowkey的第一个字节插入一个byte字节。

五、注意

每条rowkey前面加一个Byte,这里显示为16进制,创建完成之后,应该使用Phoenix SQL来读取数据,不要混合使用Phoenix Sql插入数据,使得原始rowkey前面被自动加上一个byte。

同步索引和异步索引

一般我可以使用create index来创建一个索引,这是一种同步的方法,但是有时候我们创建索引的表非常大,

我们需要等很长时间,Phoenix 4.5 以后有一个异步创建索引的方式,使用关键字ASYNC来创建索引:

实际上在二级索引中,我们需要先将phoenix的client包放入hbase的lib中然后在启动,这个IndexTool底层走的是MR,MR任务运行完毕后,索引会被自动引导,当我们在phoenix命令行中使用!table,看到索引表已经处于enable状态就可以使用该索引了。

官网demo里面的建立索引,仅仅针对一个字段,这样涉及稍微复杂的业务,索引并不能起效,还是全表索引。

二级索引分为以下几种:

全局索引、本地索引、覆盖索引

全局索引:

全局索引是默认索引类型,适用于读多写少的场景,由于全局索引会拦截(DELETE,UPSERT VALUES and UPSERT SELECT)数据更新并更新索引表,而索引表十分不在不用数据节点上的,跨节点的数据传输带来了较大的性能损耗。

本地索引

本地索引适用于少读多写

必须通过下面两部才能完成异步索引的构建。

1
CRAETE INDEX anync_index ON PHOENIX_ALARM_DIS(LAT,LNG,IMEI,CREATETIME) ASYNC

创建异步索引,命令执行后一开始不会有数据,还必须使用单独的命令行工具来执行数据的创建,当语句给执行的时候,后端会启动mr任务,只有等到这个任务结束,数据都被生成在索引表中后,这个索引才能被使用,创建语句执行完后,还需要用工具导入数据。

官网说明:

1
2
3
4
5
6
7
8
9
一般我们可以使用CREATE INDEX来创建一个索引,这是一种同步的方法。但是有时候我们创建索引的表非常大,我们需要等很长时间。Phoenix 4.5以后有一个异步创建索引的方式,使用关键字ASYNC来创建索引:

CREATE INDEX index1_c ON hao1 (age) INCLUDE(name) ASYNC;
这时候创建的索引表中不会有数据。你还必须要单独的使用命令行工具来执行数据的创建。当语句给执行的时候,后端会启动一个map reduce任务,只有等到这个任务结束,数据都被生成在索引表中后,这个索引才能被使用。启动工具的方法:

${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool
--schema MY_SCHEMA --data-table MY_TABLE --index-table ASYNC_IDX
--output-path ASYNC_IDX_HFILES
这个任务不会因为客户端给关闭而结束,是在后台运行。你可以在指定的文件ASYNC_IDX_HFILES中找到最终实行的结果。

测试和遇到的一些问题

使用Spark写入11亿5千万测试数据,分为24个分区,使用异步索引针对time和imei字段各建立了一个索引.

这边有个问题,索引在创建完成之后,数据类型自动变化了,详细:

10.jpg

11.jpg

上面两张图分别是索引表和原数据表的数据类型,可以看到原表和索引表的数据类型并不相同,让人有点费解。

考虑到phoenix有很多时间种类和别的一些情况,于是提出了几种解决办法:

1.建表的时候不用timestamp作为时间类型

2.改二级索引表的字段类型,将decimal改为bigint

3.将seq_id建表时候,设置为not null,并属于联合主键

4.从搜索的角度考虑,能不能将搜索的数据类型更改

第一种方法尝试换了一种时间类型后,建立索引任然更改了为了decimal

第二种方法更改索引类型经过尝试后发现是不可行的

第三种方法和第四种方法是可行的,第四种方法更加简单

第四种方法,本来准备自己在代码中实现对时间的转换,后来发现phoenix SQL内置了timestamp转换

建立完成索引之后,直接

1
select CREATETIME,IMEI,LAT,LNG from PHOENIX_ALARM_DIS where CREATETIME > TO_TIMESTAMP('2017-08-30 06:21:46.732');

这种方式会引导查询走索引

关于索引,这边还有需要一提的就是,创建索引之后

例如:

1
CREATE INDEX INDEX_1 ON TABLE(A,B) INCLUDE (C,D) ASYNC;

创建索引之后并不能指定B为查询条件,explain会发现依然是在全局扫描,效率很低,这边稍微尝试了一下,在建立一个

1
CREATE INDEX INDEX_1 ON TABLE(B,A) INCLUDE (C,D) ASYNC;

建立了两个索引之后,两个索引就占了大概80G的硬盘空间,这个表格用parquet.snappy格式存储之后才120G,真正是用空间换时间。

测试结果

数据量:

11亿5000w 占用磁盘空间:120G

索引占用空间:40G

下面分别是50并发 100并发 200并发的测试结果,

测试中现在Phoenix建表,用Spark写入,随机IMEI和时间段,时间长度为1天,用对应的线程数至少跑30分钟以上.,测试过程中的CPU使用量因为测试集群性能较高,使用最多不超过40%,所以没有记录.

12.jpg

50并发稳定后的查询大多在200ms以下.

13.jpg

100并发的查询时间大多数在400ms以下

14.jpg

200并发的查询大多数在500ms以下.

针对云车金融数据的测试:

云车金融测试采用映射表,原数据在HBase中,在Phoenix中建立映射表,把原先设计好的ROWKEY作为主键,建立映射表.

根据主键范围模糊查询

采用SQL模糊查询,因为没有并发要求,直接在命令行输入SQL查询.

针对某条IMEI单天的查询,反应时间在3-5s左右.

结论:模糊查询都是走全表,无法优化,效率比较低。

关于映射表

官网有提及,映射表并不推荐用,数据从hbase写入,通过phoenix读取并不是个好主意,因为:

1.Phoenix创建的表有很多数据类型,但是从hbase映射表的话只能有一种类型:varchar,否则就会报错。字段只有varchar会给查询带来一定的麻烦。

2.大表的话创建映射表肯定会超时,需要根据版本修改配置信息,max超时时间拉大,拉到多少比较好,这个时间需要把握。

映射表如果因为这样或者那样的原因创建失败的话是不能直接删除的,直接删除会导致hbase原表也被删除,可以在SYSTEM.CATALOG中把和映射表有关的信息删除,比如:

delete from system.catalog where table_name = ‘MYTEST’;

就能把和关联表有关的信息全部删除了。

相比于HBase,性能上并没有优势