elasticsearch-spark读写数据

在向es里写数据的时候,部分field是用scala写的。

这时候我用java查询数据,结果出现了以下错误。

1
2
3
4
SparkSession spark = ESMysqlSpark.getSession();
SQLContext sqlContext = new SQLContext(spark);
Dataset readData = EsSparkSQL.esDF(sqlContext, "index/type", query);
readData.show();

错误原因: 类型转换不兼容

1
scala.MatchError: Buffer(中午) (of class scala.collection.convert.Wrappers$JListWrapper)

解决方法:使用EsSpark.esJsonRDD来读取数据,再将RDD转成JavaRDD类型。

1
2
3
4
SparkSession spark = ESMysqlSpark.getSession();
SparkContext sc = spark.sparkContext();
RDD<Tuple2<String,String>> readData = EsSpark.esJsonRDD(sc,“index/type”, query);
JavaRDD<Tuple2<String,String>> rowRDD = readData.toJavaRDD().coalesce(4);

readData.first() 可以正常查看第一条数据。

接下来就可以用 JavaPairRDD 去处理数据了。

将处理好的数据放入到es中,使用 DataFrame可以让代码看起来整洁一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SparkSession spark = ESMysqlSpark.getSession();
SQLContext sqlContext = new SQLContext(spark);
JavaPairRDD<String,String> resultRDD = getResultRdd(sc); //刚才处理完的数据
JavaRDD<User> userRDD = resultRDD.map(new Function<Tuple2<String,String>, User>() {
public User call(Tuple2<String,String> stringStringTuple2) throws Exception {
User user = new User();
user.setId(stringStringTuple2._1());
user.setLabel(stringStringTuple2._2());
return user;
}
});
// System.out.print("first data:" + userRDD.first());
Dataset ds = sqlContext.createDataFrame(user, User.class);
EsSparkSQL.saveToEs(ds, "newindex/newtype");