

Reading Data from MongoDB in Batches with Spark

Connecting Spark to MongoDB and switching between collections multiple times.

Option 1: connect to MongoDB through a JavaSparkContext and fetch collection data with MongoSpark.load()

The test demo is as follows:

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.ArrayList;
import java.util.HashMap;

public class SparkReadMongodbs {
    public static void main(String[] args) {
        String mongoUrl = "mongodb://root:[email protected]:27017,192.168.1.123:27017,192.168.1.125:27017/";
        String database = "lhiot";
        String dbCollection = "0762a06a97b3628bd00037e6f66c7d16";
        String port = "27017";
        SparkSession.Builder builder = SparkSession.builder()
                .master("local[*]")
                .appName("SparkCalculateRecommend")
                .config("spark.mongodb.input.uri", mongoUrl + database + "." + dbCollection + "?authSource=admin")
                .config("spark.executor.memory", "512mb");
        SparkSession spark = builder.getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        // Load through the Spark connector using the sparkContext to get an RDD of Documents
        JavaMongoRDD<Document> c1 = MongoSpark.load(jsc);
        ArrayList<String> collections = new ArrayList<>();
        collections.add("00dfaed143dcbb02ae21aaec492d369d");
        collections.add("020a91e9c60fab73d244ba797c485e47");
        collections.add("02a70e55a7ff1a4ebb4dbbeb3e28c137");
        collections.add("0588dee7e8fdde3d95ba250affeab843");
        collections.add("0762a06a97b3628bd00037e6f66c7d16");
        collections.add("0914e6088a799c8cee11df25e11e2534");
        collections.add("0f768fc73fed9752fd87f432e9d77ba6");
        collections.add("1336a41b0bd13e1ca6a86905b9c6fd9d");
        collections.add("1ea1b22693d1bdb592853ec59c4d1fe3");HashMap<String,String> readOverrides =newHashMap<>();for(String collection : collections){
            readOverrides.put("collection", collection);//读取数据库对应集合数据ReadConfig readConfig =ReadConfig.create(jsc).withOptions(readOverrides);//获取该设备集合数据JavaMongoRDD<Document> c2 =MongoSpark.load(jsc,readConfig);
            c2.toDF().select("_id.oid","deviceCode","funCode","deptId","deptName","mountId","mountName","deviceId","pointId","pointName","pointOrderNum","value","pointDisplayName","unit","originTime","createTime").withColumnRenamed("oid","id").filter(newFilterFunction<Row>(){@Overridepublicbooleancall(Row value)throwsException{String originTime = value.getAs("originTime").toString();return originTime.compareTo("2022-01-22 00:00:00")>=0&& originTime.compareTo("2022-01-22 23:59:59")<=0;}}).show();}
        jsc.stop();
        spark.stop();}}

With this approach, every switch to a new collection opens a batch of new connections; they are all released only when the program ends.
If the job needs to read data from a large number of collections, this is not a good fit: keeping that many connections alive costs Spark a lot of memory, and at the same time the MongoDB side comes under heavy pressure, to the point where the database service may even go down. You can observe the connection growth with the small helper sketched below.
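To see this effect for yourself, watch MongoDB's connection counters while the job is looping over collections. The following is a minimal sketch of my own (not part of the original demo), assuming the synchronous MongoDB Java driver (mongodb-driver-sync) is on the classpath and the same connection string as the demo; the ConnectionWatcher class name is made up for illustration. Run it before, during and after the Spark job and compare the numbers.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

// Hypothetical helper: run it alongside the Spark job to watch server-side connection counts.
public class ConnectionWatcher {
    public static void main(String[] args) {
        // Same connection string as the demo above
        String uri = "mongodb://root:[email protected]:27017,192.168.1.123:27017,192.168.1.125:27017/?authSource=admin";
        try (MongoClient client = MongoClients.create(uri)) {
            // serverStatus is an admin command; its "connections" sub-document reports
            // the current, available and total-created connection counts on the server.
            Document status = client.getDatabase("admin").runCommand(new Document("serverStatus", 1));
            Document connections = status.get("connections", Document.class);
            System.out.println("current:      " + connections.get("current"));
            System.out.println("available:    " + connections.get("available"));
            System.out.println("totalCreated: " + connections.get("totalCreated"));
        }
    }
}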

Option 2: connect to MongoDB through a JavaSparkContext and fetch collection data with sqlContext.read().load()

The test demo is as follows:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class sparkReadMongodbWithoutCol {
    public static void main(String[] args) {
        String mongoUrl = "mongodb://root:[email protected]:27017,192.168.1.123:27017,192.168.1.125:27017/";
        String database = "lhiot";
        String dbCollection = "0762a06a97b3628bd00037e6f66c7d16";
        String port = "27017";
        // Keep the read options in a map
        Map<String, String> map = new HashMap<String, String>();
//        map.put("uri", mongoUrl);
        map.put("database", database);
//        map.put("collection", dbCollection);
        // Connect to the MongoDB servers
        SparkConf sc = new SparkConf()
                .setMaster("local")
                .setAppName("SparkConnectMongo")
                .set("spark.app.id", "MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", mongoUrl + "?authSource=admin")
                .set("spark.testing.memory", "471859200");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        SQLContext sqlContext = new SQLContext(jsc);
        ArrayList<String> collections = new ArrayList<>();
        collections.add("0762a06a97b3628bd00037e6f66c7d16");
        collections.add("00dfaed143dcbb02ae21aaec492d369d");
        collections.add("020a91e9c60fab73d244ba797c485e47");
        collections.add("02a70e55a7ff1a4ebb4dbbeb3e28c137");
        collections.add("0588dee7e8fdde3d95ba250affeab843");
        collections.add("0762a06a97b3628bd00037e6f66c7d16");
        collections.add("0914e6088a799c8cee11df25e11e2534");
        collections.add("0f768fc73fed9752fd87f432e9d77ba6");
        collections.add("1336a41b0bd13e1ca6a86905b9c6fd9d");
        collections.add("1ea1b22693d1bdb592853ec59c4d1fe3");for(String collection : collections){
            map.put("collection", collection);//读取数据库对应集合数据Dataset<Row> res = sqlContext.read().format("com.mongodb.spark.sql").options(map).load();
            res.registerTempTable("table");
            sqlContext.sql("select * from table").show();}

        jsc.stop();
    }
}

When this approach switches between collections, it does not spawn a flood of connections: the whole program opens only 2 connections on the MongoDB side, and both are closed automatically when the program ends.
This option is therefore a much better fit for scenarios that need to read data from a large number of collections at once. A SparkSession-based variant of the same loop is sketched below.
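For reference, here is a sketch of my own (not from the original post) of the same per-collection loop written against the SparkSession API, which supersedes the deprecated SQLContext and registerTempTable. It assumes a mongo-spark-connector 2.x/3.x build where the data source can still be addressed by the class name com.mongodb.spark.sql used in the demo; the class name and the device_data view name are made up for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

public class SparkReadMongodbSessionSketch {
    public static void main(String[] args) {
        // Same connection settings as the demos above; adjust to your environment.
        String mongoUrl = "mongodb://root:[email protected]:27017,192.168.1.123:27017,192.168.1.125:27017/";
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("SparkConnectMongoSession")
                .config("spark.mongodb.input.uri", mongoUrl + "?authSource=admin")
                .getOrCreate();

        List<String> collections = Arrays.asList(
                "0762a06a97b3628bd00037e6f66c7d16",
                "00dfaed143dcbb02ae21aaec492d369d");

        for (String collection : collections) {
            // Only the collection option changes per iteration; the uri comes from the session config.
            Dataset<Row> res = spark.read()
                    .format("com.mongodb.spark.sql")
                    .option("database", "lhiot")
                    .option("collection", collection)
                    .load();
            // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable
            res.createOrReplaceTempView("device_data");
            spark.sql("select * from device_data").show();
        }

        spark.stop();
    }
}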

The above is only my simple test setup and my understanding is still fairly shallow; comments and corrections from more experienced readers are welcome. Thanks for reading.

Tags: mongodb spark database

This article is reposted from: https://blog.csdn.net/weixin_42151880/article/details/127588287
Copyright belongs to the original author, 欲乘风,潇潇雨. In case of infringement, please contact us for removal.
