

Big Data in Practice (Hadoop + Spark + Python): Taobao E-commerce Data Analysis

I. Runtime Environment and Required Resources

  1. Virtual machine: Ubuntu 20.04.6 LTS

  2. Docker containers

  3. hadoop-3.3.4

  4. spark-3.3.2-bin-hadoop3

  5. Python with pyspark, pandas, and matplotlib

  6. MySQL and mysql-connector-j-8.0.32.jar (the jar can be downloaded for free, no points required)

  7. Taobao user behavior data

  8. Working knowledge of the tools above is assumed

II. Creating and Configuring the Distributed Cluster Image

  1. Create the master node

  • Create the container (code blocks marked ##ubuntu are run on the Ubuntu host; blocks without a special note are run inside the Docker container)

  ##ubuntu
  # Create the host directory that will be bind-mounted into the containers
  sudo mkdir /bigdata
  # Pull the base image
  docker pull ubuntu
  # Create the custom bridge network used below (the subnet just has to cover 172.20.0.x)
  docker network create --subnet=172.20.0.0/16 hadoopnet
  # Create the master container
  docker run -it --name master --privileged=true -p 9870:9870 -p 8088:8088 -p 8080:8080 -p 7077:7077 -p 18080:18080 -p 8032:8032 --network hadoopnet --ip 172.20.0.5 -v /bigdata:/bigdata ubuntu /bin/bash
  • Update the package list and install Java, SSH, pip, pyspark, pyarrow, vim, MySQL, pandas, zip, and matplotlib

Code
  ##ubuntu
  # Upload mysql-connector-j-8.0.32.jar to the mounted volume
  sudo mv mysql-connector-j-8.0.32.jar /bigdata
  apt update
  apt install vim
  apt install zip
  apt install pip
  pip install pyspark
  pip install pyarrow
  pip install pandas
  pip install matplotlib
  apt install openjdk-8-jdk
  # Verify the installation
  java -version
  # Install SSH, generate a key pair, add it to the authorized keys, and start the ssh service
  apt install openssh-server
  apt install openssh-client
  ssh-keygen -t rsa -P ""
  cd ~/.ssh
  cat id_rsa.pub >> authorized_keys
  service ssh start
  # Verify the installation
  ssh localhost
  # Install MySQL and set a password; the first login has no password, just press Enter
  apt install mysql-server
  service mysql start
  mysql -u root -p
  # Set the password (run this statement inside mysql); the password here is mynewpassword
  ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password by 'mynewpassword';
  quit;
  # Harden the installation (run inside the Docker container)
  mysql_secure_installation
  # Connecting Spark to MySQL requires mysql-connector-j-8.0.32.jar,
  # so the jar has to be copied into the pyspark package directory and into $JAVA_HOME/jre/lib/ext
  # (do this after $JAVA_HOME has been set via ~/.bashrc below; an alternative that hands the jar
  # to Spark at session creation is sketched after the run results below).
  # First locate the Python environment: no separate Python is configured for Spark, so the default
  # Ubuntu Python is used. Running find /usr -name python shows that the packages live under /usr/local/lib.
  find /usr -name python
  cd /usr/local/lib/python3.10/dist-packages
  cp /bigdata/mysql-connector-j-8.0.32.jar ./pyspark
  cp /bigdata/mysql-connector-j-8.0.32.jar $JAVA_HOME/jre/lib/ext
Run results
  # Output of java -version
  openjdk version "1.8.0_362"
  OpenJDK Runtime Environment (build 1.8.0_362-8u362-ga-0ubuntu1~22.04-b09)
  OpenJDK 64-Bit Server VM (build 25.362-b09, mixed mode)
  # Output of ssh localhost; answer yes when prompted
  root@355a1f302b29:~/.ssh# ssh localhost
  The authenticity of host 'localhost (127.0.0.1)' can't be established.
  ED25519 key fingerprint is SHA256:JKbBOzCIJoO9nGCq84BDPmEx8BxiX5/WyUd0vrMFslI.
  This key is not known by any other names
  Are you sure you want to continue connecting (yes/no/[fingerprint])? yes
  Warning: Permanently added 'localhost' (ED25519) to the list of known hosts.
  Welcome to Ubuntu 22.04.2 LTS (GNU/Linux 5.15.0-67-generic x86_64)
  * Documentation: https://help.ubuntu.com
  * Management: https://landscape.canonical.com
  * Support: https://ubuntu.com/advantage
  This system has been minimized by removing packages and content that are
  not required on a system that users do not log into.
  To restore this content, you can run the 'unminimize' command.
  The programs included with the Ubuntu system are free software;
  the exact distribution terms for each program are described in the
  individual files in /usr/share/doc/*/copyright.
  Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
  applicable law.
  ## mysql_secure_installation prompts
  # First y/n prompt: whether to enable the VALIDATE PASSWORD component
  VALIDATE PASSWORD COMPONENT can be used to test passwords
  and improve security. It checks the strength of password
  and allows the users to set only those passwords which are
  secure enough. Would you like to setup VALIDATE PASSWORD component?
  Press y|Y for Yes, any other key for No: y
  # Choose the password strength level
  There are three levels of password validation policy:
  LOW Length >= 8
  MEDIUM Length >= 8, numeric, mixed case, and special characters
  STRONG Length >= 8, numeric, mixed case, special characters and dictionary file
  Please enter 0 = LOW, 1 = MEDIUM and 2 = STRONG: 0
  # Enter the new password
  New password:
  Re-enter new password:
  # Whether to remove anonymous users
  Remove anonymous users? (Press y|Y for Yes, any other key for No) : y
  # Whether to disallow remote root login
  Disallow root login remotely? (Press y|Y for Yes, any other key for No) : y
  # Whether to remove the test database
  Remove test database and access to it? (Press y|Y for Yes, any other key for No) : n
  # Reload the privilege tables to apply the changes
  Reload privilege tables now? (Press y|Y for Yes, any other key for No) : y
  # Output of find /usr -name python
  /usr/local/lib/python3.10/dist-packages/pyspark/python
  /usr/local/lib/python3.10/dist-packages/pyspark/examples/src/main/python
  /usr/local/spark/kubernetes/dockerfiles/spark/bindings/python
  /usr/local/spark/python
  /usr/local/spark/python/pyspark/python
  /usr/local/spark/examples/src/main/python
  /usr/share/gcc/python
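
As an alternative to copying the connector jar into the pyspark package and $JAVA_HOME/jre/lib/ext, the jar can be handed to Spark explicitly when the session is created. A minimal sketch, assuming the jar is still at /bigdata/mysql-connector-j-8.0.32.jar and MySQL is reachable on localhost:3306 with the root password set above:

  from pyspark.sql import SparkSession

  # Point Spark at the MySQL JDBC driver explicitly instead of relying on copies
  # placed inside the pyspark package or $JAVA_HOME/jre/lib/ext.
  spark = (SparkSession.builder
           .appName("mysql-connector-check")
           .config("spark.jars", "/bigdata/mysql-connector-j-8.0.32.jar")
           .getOrCreate())

  # Trivial connectivity check: run SELECT 1 through the JDBC connection.
  check = spark.read.jdbc(
      url="jdbc:mysql://localhost:3306/mysql",
      table="(SELECT 1 AS ok) AS t",
      properties={"user": "root", "password": "mynewpassword",
                  "driver": "com.mysql.cj.jdbc.Driver"})
  check.show()

When submitting with spark-submit, the --jars flag plays the same role.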
  • Install Hadoop and Spark

  ##ubuntu
  # Upload the tarballs to the mounted directory (download them yourself beforehand)
  sudo mv hadoop-3.3.4.tar.gz /bigdata
  sudo mv spark-3.3.2-bin-hadoop3.tgz /bigdata

  # Check that the tarballs are on the mounted volume
  ls /bigdata
  # Unpack Hadoop and Spark
  tar -zxvf /bigdata/hadoop-3.3.4.tar.gz -C /usr/local/
  tar -zxvf /bigdata/spark-3.3.2-bin-hadoop3.tgz -C /usr/local/
  # Rename the directories for convenience
  cd /usr/local
  mv hadoop-3.3.4 hadoop
  mv spark-3.3.2-bin-hadoop3 spark
  • Configure the environment

  # The contents to put in each file are listed below
  vim ~/.bashrc
  source ~/.bashrc
  cd /usr/local/hadoop/etc/hadoop
  vim hadoop-env.sh
  vim hdfs-site.xml
  vim core-site.xml
  vim yarn-site.xml
  vim mapred-site.xml
  vim workers
  cd /usr/local/spark/conf
  mv spark-env.sh.template spark-env.sh
  vim spark-env.sh
  mv workers.template workers
  vim workers
  # Format the NameNode
  cd /usr/local/hadoop
  ./bin/hdfs namenode -format
  # Check the result
  echo $JAVA_HOME
.bashrc
  # Java; adjust the path to your own machine
  export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
  export JRE_HOME=${JAVA_HOME}/jre
  export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
  export PATH=${JAVA_HOME}/bin:$PATH
  # Hadoop; adjust the path to your own machine
  export HADOOP_HOME=/usr/local/hadoop
  # Spark; adjust the path to your own machine
  export SPARK_HOME=/usr/local/spark
  export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH
hadoop-env.sh
  # Adjust the paths to your own machine. HADOOP_PID_DIR is where Hadoop keeps its PID files;
  # do not leave it under Ubuntu's /tmp, because Ubuntu periodically cleans /tmp and that kills Hadoop.
  export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
  export HDFS_NAMENODE_USER=root
  export HDFS_DATANODE_USER=root
  export HDFS_SECONDARYNAMENODE_USER=root
  export YARN_RESOURCEMANAGER_USER=root
  export YARN_NODEMANAGER_USER=root
  export HADOOP_PID_DIR=/usr/local/hadoop/tmp
hdfs-site.xml
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/usr/local/hadoop/tmp/dfs/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/usr/local/hadoop/tmp/dfs/data</value>
  </property>
core-site.xml
  <!-- Unlike the pseudo-distributed setup, 172.20.0.5 here is the master node's IP address -->
  <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/usr/local/hadoop/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://172.20.0.5:9000</value>
  </property>
yarn-site.xml
  <!-- yarn.nodemanager.aux-services: enables the shuffle service that MapReduce applications rely on -->
  <!-- yarn.resourcemanager.address: the ResourceManager address; if it is not set, Spark cannot find the ResourceManager -->
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>172.20.0.5</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>
      /usr/local/hadoop/etc/hadoop,
      /usr/local/hadoop/share/hadoop/common/*,
      /usr/local/hadoop/share/hadoop/common/lib/*,
      /usr/local/hadoop/share/hadoop/hdfs/*,
      /usr/local/hadoop/share/hadoop/hdfs/lib/*,
      /usr/local/hadoop/share/hadoop/mapreduce/*,
      /usr/local/hadoop/share/hadoop/mapreduce/lib/*,
      /usr/local/hadoop/share/hadoop/yarn/*,
      /usr/local/hadoop/share/hadoop/yarn/lib/*
    </value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>172.20.0.5:8032</value>
  </property>
mapred-site.xml
  <!-- Run MapReduce jobs on YARN -->
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
workers
  # Hadoop workers file, located in /usr/local/hadoop/etc/hadoop
  # IP addresses of the data nodes; the worker containers are created later
  172.20.0.5
  172.20.0.6
  172.20.0.7
  172.20.0.8
spark-env.sh
  # HADOOP_CONF_DIR tells Spark where the Hadoop configuration lives, e.g. which address HDFS is listening on
  export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
  export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native
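
Because HADOOP_CONF_DIR points Spark at the Hadoop configuration, Spark picks up fs.defaultFS from core-site.xml, so HDFS paths written without a scheme resolve against hdfs://172.20.0.5:9000. A minimal sketch of the two equivalent ways of addressing the same file, assuming the CSV uploaded in Part III:

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  # Scheme-less path: resolved against fs.defaultFS from core-site.xml
  df_short = spark.read.csv("/user/root/taobao/taobao_user_behavior_cut.csv", header=True)

  # Fully qualified URI pointing at the same NameNode
  df_full = spark.read.csv("hdfs://172.20.0.5:9000/user/root/taobao/taobao_user_behavior_cut.csv", header=True)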
workers
  # Spark workers file, located in /usr/local/spark/conf
  # IP addresses of the worker nodes; the worker containers are created later
  172.20.0.5
  172.20.0.6
  172.20.0.7
  172.20.0.8
Run results
  # Tail of the NameNode format output
  /************************************************************
  SHUTDOWN_MSG: Shutting down NameNode at 355a1f302b29/172.20.0.5
  ************************************************************/
  # echo $JAVA_HOME
  /usr/lib/jvm/java-8-openjdk-amd64
  • Save the image

  ##ubuntu
  docker commit master spark_images:v1
  2. Create the worker nodes and build the distributed Hadoop and Spark cluster

  • Code
  ##ubuntu
  docker run -itd --name worker1 --net hadoopnet --ip 172.20.0.6 -v /bigdata:/bigdata spark_images:v1 /bin/bash
  docker run -itd --name worker2 --net hadoopnet --ip 172.20.0.7 -v /bigdata:/bigdata spark_images:v1 /bin/bash
  docker run -itd --name worker3 --net hadoopnet --ip 172.20.0.8 -v /bigdata:/bigdata spark_images:v1 /bin/bash

  # Start ssh in every container and connect from each node to every other node once (worker1 shown here);
  # make sure sshd is running on every node before connecting
  service ssh start
  ssh 172.20.0.5
  ssh 172.20.0.7
  ssh 172.20.0.8
  # Once every node can reach the others, go back to the master node and start Hadoop, Spark, and MySQL.
  # Then, back on the Ubuntu host, open localhost:9870 and localhost:8080 in a browser to check that the HDFS and Spark web UIs are reachable.
  service mysql start
  cd /usr/local/hadoop
  ./sbin/start-all.sh
  cd /usr/local/spark
  ./sbin/start-all.sh
  jps
  # Example 1: MapReduce wordcount
  cd /usr/local/hadoop
  cp ./etc/hadoop/core-site.xml input
  ./bin/hdfs dfs -mkdir -p /user/root
  ./bin/hdfs dfs -put ./input .
  ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar wordcount input output
  # Example 2: SparkPi on YARN
  cd /usr/local/spark
  ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 1g \
  --executor-cores 2 \
  examples/jars/spark-examples*.jar \
  10
  • Run results
  # jps
  2628 Jps
  2118 NodeManager
  2008 ResourceManager
  2570 Worker
  1772 SecondaryNameNode
  1484 NameNode
  2494 Master
  1599 DataNode
  # Example 1 output (tail)
  Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
  File Input Format Counters
  Bytes Read=1068
  File Output Format Counters
  Bytes Written=1039
  # Example 2 output (tail)
  client token: N/A
  diagnostics: N/A
  ApplicationMaster host: 0a69e22f207a
  ApplicationMaster RPC port: 41551
  queue: default
  start time: 1679407401940
  final status: SUCCEEDED
  tracking URL: http://355a1f302b29:8088/proxy/application_1679405990349_0003/
  user: root
  23/03/21 14:03:46 INFO ShutdownHookManager: Shutdown hook called
  23/03/21 14:03:46 INFO ShutdownHookManager: Deleting directory /tmp/spark-1f53cd0a-1ecb-467c-a59c-c84bd2a1196b
  23/03/21 14:03:46 INFO ShutdownHookManager: Deleting directory /tmp/spark-891d26da-45b7-40c5-8e64-a3d2d96b132c

III. Analyzing the Taobao Data

  1. Upload the data to the mounted directory, then put it into HDFS

  ##ubuntu
  cd ~
  sudo mv taobao_user_behavior_cut.csv /bigdata

  cd /usr/local/hadoop
  ./bin/hdfs dfs -mkdir taobao
  ./bin/hdfs dfs -put /bigdata/taobao_user_behavior_cut.csv taobao

2. Data analysis

  • Read and display the data
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import *
  from pyspark import SparkContext, StorageLevel
  spark = SparkSession.builder.getOrCreate()
  raw_data = spark.read.csv("/user/root/taobao/taobao_user_behavior_cut.csv", header=True)
  raw_data.show(20)
  # Write the code above into a py file and run it (a quick schema check is sketched below)
  cd /bigdata
  vim spark_taobao.py
  python3 spark_taobao.py
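
Before moving on, it can be worth appending two lines to spark_taobao.py to confirm the column names the later processing relies on (user_id, item_id, behavior_type, user_geohash, item_category, time) and the behavior codes present in the data:

  # Print the schema and the distinct behavior codes; the processing script in the next step
  # uses user_id, item_id, behavior_type, item_category and time, and drops user_geohash.
  raw_data.printSchema()
  raw_data.select("behavior_type").distinct().show()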

  • Process the data on YARN and save the results to MySQL
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import *
  from pyspark import SparkContext, StorageLevel
  import pandas as pd
  # Read the data
  spark = SparkSession.builder.getOrCreate()
  raw_data = spark.read.csv("/user/root/taobao/taobao_user_behavior_cut.csv", header=True)
  # Drop the sparse and useless columns, split the timestamp into day and hour,
  # map the behavior codes to labels, and persist the result
  def tran(x):
      tmp = x["time"].split(" ")
      day = tmp[0]
      hour = tmp[1]
      user_id = x["user_id"]
      if int(x["behavior_type"]) == 1:
          behavior = "点击详情页"    # view detail page
      elif int(x["behavior_type"]) == 2:
          behavior = "收藏"          # add to favourites
      elif int(x["behavior_type"]) == 3:
          behavior = "加入购物车"    # add to cart
      else:
          behavior = "购买"          # purchase
      item_id = x["item_id"]
      item_category = x["item_category"]
      return user_id, item_id, behavior, item_category, day, hour
  raw_df = raw_data.drop("user_geohash", "_c0").rdd.map(tran).toDF(schema="user_id string,item_id string,behavior string,item_category string,day string,hour string")
  raw_df.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
  # For each user, compute the gap between the first and last recorded action (for the retention analysis)
  # and record the action taken right before a purchase (the purchase path). The dataset covers
  # 2014-11-18 to 2014-12-18, so each date is mapped to a day index: November dates become day-18,
  # December dates become 12+day. A small local sanity check of tran and countRetention follows this script.
  def countRetention(key, x):
      data = x["day"].values
      id = str(x["user_id"].values[0])
      behavior_values = x["behavior"].values
      behavior_index = len(behavior_values)
      buy_path = "没有购买"          # no purchase
      first_time = [int(data[0].split("-")[1]), int(data[0].split("-")[2])]
      last_time = [int(data[-1].split("-")[1]), int(data[-1].split("-")[2])]
      if first_time[0] == 11:
          first_time[1] = first_time[1] - 18
      else:
          first_time[1] = 12 + first_time[1]
      if last_time[0] == 11:
          last_time[1] = last_time[1] - 18
      else:
          last_time[1] = 12 + last_time[1]
      interval_time = last_time[1] - first_time[1]
      for index in range(behavior_index):
          if behavior_values[index] == "购买":
              buy_path = behavior_values[index - 1]
      return pd.DataFrame([[id, interval_time, buy_path]])
  user_retention_data = raw_df.groupBy("user_id").applyInPandas(countRetention, schema="user_id string,retention_date int,buy_path string")
  # Hourly and daily traffic
  date_flow = raw_df.groupBy("hour").count()
  month_flow = raw_df.groupBy("day").count()
  # Count the clicks and purchases for each item category
  def countClike(key, x):
      behavior = x["behavior"].values
      id = x["item_category"].values[0]
      clike_num = 0
      buy_num = 0
      for i in behavior:
          if i == "购买":
              buy_num += 1
          clike_num += 1
      return pd.DataFrame([[id, clike_num, buy_num]])
  click_data = raw_df.select("behavior", "item_category").groupBy("item_category").applyInPandas(countClike, schema="item_category string, clike_num int, buy_num int")
  # Save the results to MySQL
  properties = {
      "user": "root",
      "password": "mysql123",
      "driver": "com.mysql.cj.jdbc.Driver"
  }
  raw_df.write.jdbc("jdbc:mysql://localhost:3306/taobao_data", table="raw_df", mode="overwrite", properties=properties)
  user_retention_data.write.jdbc("jdbc:mysql://localhost:3306/taobao_data", table="user_retention_data", mode="overwrite", properties=properties)
  date_flow.write.jdbc("jdbc:mysql://localhost:3306/taobao_data", table="date_flow", mode="overwrite", properties=properties)
  month_flow.write.jdbc("jdbc:mysql://localhost:3306/taobao_data", table="month_flow", mode="overwrite", properties=properties)
  click_data.write.jdbc("jdbc:mysql://localhost:3306/taobao_data", table="click_data", mode="overwrite", properties=properties)
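
Before submitting the job to YARN, the row-level and per-user logic above can be sanity-checked locally on a couple of hand-made rows. A minimal sketch, assuming tran and countRetention are defined as in the script above; the two sample rows are made up purely for illustration:

  from pyspark.sql import SparkSession, Row

  spark = SparkSession.builder.master("local[1]").getOrCreate()

  # Two made-up rows in the raw column layout (user_geohash omitted since tran does not use it)
  sample = spark.createDataFrame([
      Row(user_id="1", item_id="10", behavior_type="1", item_category="99", time="2014-11-18 21"),
      Row(user_id="1", item_id="10", behavior_type="4", item_category="99", time="2014-12-12 22"),
  ])

  sample_df = sample.rdd.map(tran).toDF(
      schema="user_id string,item_id string,behavior string,item_category string,day string,hour string")
  sample_df.show()

  # With the rows in input order this should give retention_date = 24
  # (2014-11-18 -> index 0, 2014-12-12 -> index 12 + 12 = 24) and buy_path = "点击详情页"
  sample_df.groupBy("user_id").applyInPandas(
      countRetention,
      schema="user_id string,retention_date int,buy_path string").show()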
  # Enter MySQL and create the taobao_data database
  mysql -u root -p
  create database taobao_data;
  exit
  # Package the dependencies the job needs (pyspark and friends)
  cp -r /usr/local/lib/python3.10/dist-packages /bigdata
  cd /bigdata
  zip -r spark_taobao.zip dist-packages
  # Edit the py file and paste in the Python code above
  vim spark_taobao.py
  # Submit the py file to YARN.
  # Note: the executors' total memory must not exceed the VM's actual resources, otherwise YARN kills the executors
  cd /usr/local/spark
  ./bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 1g \
  --executor-cores 1 \
  --num-executors 1 \
  --py-files='/bigdata/spark_taobao.zip' \
  /bigdata/spark_taobao.py
  • Read the data back from MySQL and analyse it
  from pyspark.sql import SparkSession
  from pyspark import SparkContext, StorageLevel
  from pyspark.sql.functions import *
  import pandas
  import matplotlib.pyplot as plt
  plt.rcParams['font.sans-serif'] = 'simhei'
  spark = SparkSession.builder.getOrCreate()
  # Read the tables back from MySQL
  url = "jdbc:mysql://localhost:3306/taobao_data"
  propertie = {
      "user": "root",
      "password": "mysql123",
      "driver": "com.mysql.cj.jdbc.Driver"
  }
  user_retention_data = spark.read.jdbc(url=url, table="user_retention_data", properties=propertie)
  date_flow = spark.read.jdbc(url=url, table="date_flow", properties=propertie)
  month_flow = spark.read.jdbc(url=url, table="month_flow", properties=propertie)
  click_data = spark.read.jdbc(url=url, table="click_data", properties=propertie)
  # 7-day, 14-day and 28-day retention rates
  all_user_num = user_retention_data.count()
  seven_retention = user_retention_data.filter(user_retention_data.retention_date >= 7).count() / all_user_num
  fourteen_retention = user_retention_data.filter(user_retention_data.retention_date >= 14).count() / all_user_num
  te_retention = user_retention_data.filter(user_retention_data.retention_date >= 28).count() / all_user_num
  retention_y = [seven_retention, fourteen_retention, te_retention]
  retention_x = ["7日留存率", "14日留存率", "28日留存率"]
  plt.plot(retention_x, retention_y, color='r', linewidth=2, linestyle='dashdot')
  for x in range(3):
      plt.text(x - 0.13, retention_y[x], str(retention_y[x]), ha='center', va='bottom', fontsize=9)
  plt.savefig("retention.jpg")
  plt.clf()
  # Hourly traffic
  date_flow = date_flow.sort("hour").toPandas()
  date_flow_x = date_flow["hour"].values
  date_flow_y = date_flow["count"].values
  plt.figure(figsize=(8, 4))
  plt.plot(date_flow_x, date_flow_y, color='r', linewidth=2, linestyle='dashdot')
  for x in range(24):
      plt.text(x - 0.13, date_flow_y[x], str(date_flow_y[x]), ha='center', va='bottom', fontsize=9)
  plt.savefig("date_flow.jpg")
  plt.clf()
  # Daily traffic over the month
  month_flow = month_flow.sort("day").toPandas()
  month_flow_x = month_flow["day"].values
  month_flow_y = month_flow["count"].values
  plt.figure(figsize=(15, 4))
  plt.xticks(rotation=90)
  plt.plot(month_flow_x, month_flow_y, color='r', linewidth=2, linestyle='dashdot')
  plt.savefig("month_flow.jpg", bbox_inches='tight')
  plt.clf()
  # Top-10 item categories
  def take(x):
      data_list = []
      for i in range(10):
          data_list.append(x[i])
      return data_list
  visit_data = click_data.sort(desc("clike_num")).toPandas()
  visit_x = take(visit_data["item_category"].values)
  visit_y = take(visit_data["clike_num"].values)
  visit_plt = plt.bar(visit_x, visit_y, lw=0.5, fc="b", width=0.5)
  plt.bar_label(visit_plt, label_type='edge')
  plt.savefig("visit_top10.jpg", bbox_inches='tight')
  plt.clf()
  buy_data = click_data.sort(desc("buy_num")).toPandas()
  buy_x = take(buy_data["item_category"].values)
  buy_y = take(buy_data["buy_num"].values)
  buy_plt = plt.bar(buy_x, buy_y, lw=0.5, fc="b", width=0.5)
  plt.bar_label(buy_plt, label_type='edge')
  plt.savefig("buy_top10.jpg", bbox_inches='tight')
  plt.clf()
  # Purchase paths
  buy_path_data = user_retention_data.filter(user_retention_data.buy_path != "没有购买").groupBy("buy_path").count().sort(desc("count")).toPandas()
  buy_path_y = buy_path_data["count"].values
  buy_path_x = buy_path_data["buy_path"].values
  buy_path_plt = plt.bar(buy_path_x, buy_path_y, lw=0.5, fc="b", width=0.5)
  plt.bar_label(buy_path_plt, label_type='edge')
  plt.savefig("buy_path.jpg", bbox_inches='tight')
  # Edit the py file, paste in the Python code above, and run it; the figures are saved in the current directory
  cd /bigdata
  vim taobao_analy.py
  python3 taobao_analy.py
  • Results
  1. Hourly traffic: visits peak between 21:00 and 22:00 each evening.

  2. Daily traffic over the month: visits jump sharply on December 12, which suggests the promotion was effective.

  3. Retention: the 7-day and 14-day retention rates are close, but retention drops sharply by day 28, so it may be worth running some activity within that window.

  4. Top-10 item categories by visits.

  5. Top-10 item categories by purchases.

  6. Purchase paths: most users buy after clicking through to the item detail page, so a good detail page has a significant impact on an item's sales.

IV. Miscellaneous

For the matplotlib Chinese font display problem, see:

"findfont: Font family ['sans-serif'] not found" — solution (ACE-Mayer's blog on CSDN)
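
In short, matplotlib needs a font that actually contains the CJK glyphs used in the plot labels. A minimal sketch of one common fix, assuming a SimHei font file (e.g. simhei.ttf) has been copied into the container; the font path below is an assumption and should be adjusted to wherever the file really lives:

  import matplotlib.pyplot as plt
  from matplotlib import font_manager

  # Register a CJK-capable font with matplotlib, select it, and keep the minus sign rendering correctly
  font_manager.fontManager.addfont("/usr/share/fonts/truetype/simhei.ttf")  # assumed path
  plt.rcParams['font.sans-serif'] = ['SimHei']
  plt.rcParams['axes.unicode_minus'] = False

  # Dummy values, only to check that the Chinese labels render without findfont warnings
  plt.plot(["7日留存率", "14日留存率", "28日留存率"], [0.9, 0.8, 0.3])
  plt.savefig("font_check.jpg")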


Reposted from: https://blog.csdn.net/linpaomian/article/details/129687054
Copyright belongs to the original author, linpaomian.
