Reading Hive data with Spark (Java)

Requirement: read the data out of Hive and write it into ES.

Environment: Spark 2.0.2

1. Call enableHiveSupport() when building the SparkSession
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

SparkConf conf = new SparkConf().setAppName("appName").setMaster("local[*]");
SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL basic example hive")
        .config(conf)
        .enableHiveSupport() // enable Hive support
        .getOrCreate();
2. Add the dependency to the pom (no particular Hive version is required)
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.2.1</version>
</dependency>

or

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
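
Step 6 below writes to ES through EsSparkSQL, which comes from the elasticsearch-hadoop project, so the pom also needs that dependency. A minimal sketch (the artifact and version here are assumptions; pick the ones matching your ES cluster and Scala version):

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId> <!-- assumed artifact for Spark 2.x / Scala 2.11 -->
    <version>6.2.4</version> <!-- assumed version; match your ES cluster -->
</dependency>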
3. Place the configuration files in Spark's conf/ directory

From the official documentation:

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.
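
In practice the setting that matters most in hive-site.xml is the metastore address. A minimal sketch, assuming a metastore service listening at thrift://localhost:9083:

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value> <!-- assumed metastore address -->
    </property>
</configuration>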
4. Read data with spark.sql
SparkSession spark = ESMysqlSpark.getSession(); // project helper returning the SparkSession built in step 1
String querySql = "SELECT * FROM test.table";
spark.sql(querySql);
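
spark.sql returns a Dataset<Row>, so the result can be inspected right away with the standard Spark API, for example:

Dataset<Row> df = spark.sql(querySql);
df.printSchema(); // confirm the column names and types read from Hive
df.show(10);      // print the first 10 rows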
5. Hive SQL statements

Requirement: concatenate two fields into a new string.

You can first register a function as a UDF:

spark.udf().register("mode", new UDF2<String, Long, String>() {
    public String call(String types, Long time) throws Exception {
        // drop the dots from the first field and append the timestamp
        return types.replace(".", "") + String.valueOf(time);
    }
}, DataTypes.StringType);
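
Once registered, the UDF can be called from spark.sql like any built-in function (the query in the next step does exactly this):

spark.sql("SELECT mode(ip, unix_timestamp()) AS id FROM test.table").show();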

Computing a field's average (output as an integer), a field's max/min, formatting a date field, and so on can all be done directly in the Hive SQL statement.

String querySql = String.format("SELECT mode(ip, unix_timestamp()) id, " +
        "ip, " +
        "cast(avg(t1) as bigint) f1, " +
        "cast(avg(t2) as bigint) f2, " +
        "min(t3) minSpeed, " +
        "max(t4) maxSpeed, " +
        "from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') time " +
        "FROM test.table " +
        "where time > %s " +
        "group by ip ", timeLimit);
  • unix_timestamp — returns the current Unix timestamp
  • cast(expression AS data_type) — converts between data types
  • from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') — formats the current time as a date string
6. Write to ES

Use ds.show() to check that the data is correct.

Dataset<Row> ds = spark.sql(querySql);
EsSparkSQL.saveToEs(ds, "sha_parking/t_speedInformation");
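
EsSparkSQL picks up its connection settings from the Spark configuration. A minimal sketch, assuming ES runs locally on the default port:

SparkConf conf = new SparkConf()
        .setAppName("appName")
        .setMaster("local[*]")
        .set("es.nodes", "localhost")         // assumed ES host
        .set("es.port", "9200")               // assumed ES port
        .set("es.index.auto.create", "true"); // create the index if it does not exist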

Packaging the project

  • mvn package — regular packaging
  • mvn assembly:assembly — packaging with all dependencies included (see the plugin sketch below)
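
mvn assembly:assembly requires the maven-assembly-plugin in the pom. A minimal sketch using the standard jar-with-dependencies descriptor:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>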

Problems encountered

If no data can be read, first check the following configuration:

  1. Make sure the line 127.0.0.1 hostname has been added to /etc/hosts
  2. Check $SPARK_HOME/conf/spark-env.sh and make sure the IP address is correct
Problem: Hive Schema version 2.1.0 does not match metastore. The fix below updates the version recorded in the metastore database:
mysql -uroot -p
use hive;
select * from VERSION;  -- check the schema version currently recorded
update VERSION set SCHEMA_VERSION='2.1.1' where VER_ID=1;
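
Alternatively, schema verification can be switched off in hive-site.xml instead of editing the metastore database directly (a sketch; check this option against your Hive version):

<property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value> <!-- skip the metastore schema version check -->
</property>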