0


使用Hadoop同步Neo4j数据(亿级)

Hadoop环境下进行Neo4j数据(亿级)同步

使用场景

  1. 使用远程csv文件进行数据同步,而不是本地csv文件
  2. 使用hdfs协议进行数据传输
  3. 使用hive生成数据文件

环境准备

CDH6

Neo4j==5.4

apoc-5.4.0-extended.jar

apoc-5.4.1-core.jar

APOC

由于APOC依赖于Neo4j的内部API,因此您需要使用匹配APOC版本进行Neo4j安装。确保前两个版本号Neo4j和APOC之间匹配

转到此处查看所有APOC扩展版本,并下载二进制jar以放入

$NEO4J_HOME/plugins

文件夹中。

将jar文件移动到插件文件夹后,您必须重新启动neo4j`neo4j restart

官方文档说明

https://neo4j.com/labs/apoc/5/installation/

https://neo4j.com/labs/apoc/5/overview/apoc.load/apoc.load.csv/

Neo4j 导入数据的方式对比

导入数据的几种方式有:

  • 使用 Cypher create 语句,为每一条数据写一个 create
  • 使用 Cypher load csv 语句,将数据转成 CSV 格式,通过 LOAD CSV 读取数据
  • 使用 APOC 插件,它提供了很多导入和导出的函数和过程
  • 使用编程语言(Java,Python,JS,C#,Go)导入数据
  • 使用 neo4j-admin 工具导入数据
  • 使用 ETL 工具导入数据

这些方式的效率和难易度各有不同。一般来说:

  • 如果数据量很小(几千条),可以使用 Cypher create 语句或者 APOC 插件
  • 如果数据量较大(几百万条),可以使用 Cypher load csv 语句或者编程语言
  • 如果数据量非常大(上亿条),可以使用 neo4j-admin 工具或者 ETL 工具

使用 Cypher load csv 语句导入数据的过程

1. 使用 Hive 生成对应的 csv 文件数据

  • 注意 hive 表的文件格式
  • hive 表插入数据后,会生成N个文件在 HDFS 目录
  • 遍历HDFS数据文件,进行 load csv 操作
  • 对每条数据生成唯一标识,方便日后进行增量更新
CREATE TABLE `node_company_table`(
  `company_id` string COMMENT '企业唯一标识',
  `company_name` string COMMENT '企业名称'
) COMMENT '节点-企业信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `node_person_table`(
  `person_id` string COMMENT '人物唯一标识',
  `person_name` string COMMENT '人物名称'
) COMMENT '节点-人物信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `rel_invest_company_person_table`(,
  `company_id` string COMMENT '企业唯一标识',
  `person_id` string COMMENT '人物唯一标识'
) COMMENT '关系-企业和人物的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `rel_invest_company_table`(,
  `company_id` string COMMENT '企业唯一标识',
  `relation_company_id` string COMMENT '关联企业唯一标识'
) COMMENT '关系-企业和企业的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

2. 使用 python 读取 HDFS 目录

  • 使用 HDFS WEBHDFS 服务读取数据目录
defget_files(self, dir_path):
    httpfs = f"http://xx.xx.xx.x:14000/webhdfs/v1/
    params ={"op":"LISTSTATUS","user.name":"httpfs"}
    res = requests.get(url=httpfs + dir_path, params=params)
    file_statuses = res.json()return file_statuses['FileStatuses']['FileStatus']

3. 使用 Cypher load csv 语句导入数据

  • apoc.load.csv 支持使用 hdfs 协议数据导入 所以就不再采用 httpfs 协议的方式进行远程 csv 文件导入

下面以导入 company 节点数据为例

defcreate_company_node():
    dir_path ="hdfs_path/node_company_table/"forfilein get_files(dir_path):
        cql1 =f"WITH '{hdfs_url}/{dir_path}/{file['pathSuffix']}' AS url CALL apoc.load.csv(url,{{sep:'\001',quoteChar:'\u0000'}}) " \
                "yield lineNo,list return list"
        cql2 ="MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0],company_name: list[1]}"
        cql =f"CALL apoc.periodic.iterate(\"{cql1}\",\"{cql2}\",{{batchSize:1000, iterateList:true, parallel:true}})"print(cql)with self.driver.session()as session:
            session.run(cql)

事务分批处理

为了处理大文件,

CALL ... IN TRANSACTIONS

可以和

LOAD CSV

一起使用,但你必须注意Eager操作可能会破坏这种行为。
在apoc中,你也可以将任何数据源与

apoc.periodic.iterate

结合起来以实现同样的目的。

CALL apoc.periodic.iterate('
CALL apoc.load.csv({hdfs_file_path}) yield map as row return row
','
CREATE (p:Person) SET p = row
', {batchSize:10000, iterateList:true, parallel:true});
WITH {hdfs_file_path} AS url
CALL apoc.load.csv(url,{sep:'\001'}) yield lineNo,list 
CALL {WITH list MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0], company_name: list[1]}}
IN TRANSACTIONS

总结

  1. 本次实践,导入企业节点约9000万、人物节点1.4亿、投资关系1.6亿,因没进行具体的时效记录,只能估算花费时间约1.5h
  2. 导入数据关系时,因采用merge操作出现了事务锁异常,导致部分batch导入失败,采取了以下措施
  • 优化 cypher 语句,使用 MERGE ON CREATE SET 语句来设置属性,减少不必要的 merge 操作
  • 减小 apoc.periodic.iterate batchSize
  • 执行 cypher 语句时,在脚本上做异常处理,记录异常,并重复执行。
标签: neo4j hadoop 大数据

本文转载自: https://blog.csdn.net/qq_35824427/article/details/129225511
版权归原作者 DuanHao_ 所有, 如有侵权,请联系我们删除。

“使用Hadoop同步Neo4j数据(亿级)”的评论:

还没有评论