0


Spark总结

环境介绍

hive-3.1.2
<hadoop.version>3.1.0</hadoop.version>
<hbase.version>2.0.0-alpha4</hbase.version>
<spark.version>2.3.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
<zookeeper.version>3.4.6</zookeeper.version>

hive和Hadoop的版本关系_hive和hadoop版本对应关系-CSDN博客

下载:hbase等下载改个后缀就行

Index of /dist/hive (apache.org)

常用命令

启动hadoop:

start-all.sh

启动spark-shell:

spark-shell

登录mysql:

mysql -uroot -p123123

启动zookeeper(每台机子):

/export/servers/zookeeper/bin/zkServer.sh start

启动kafka(每台机子):

cd /export/servers/kafka/bin

./kafka-server-start.sh /export/servers/kafka/config/server.properties

启动hive:

1.hive --service metastore

2./export/servers/hive/bin/hive

3.spark-shell

启动hbase集群(先启动zookeeper,每台机子)

/export/servers/hbase/bin/start-hbase.sh

hbase-daemons.sh start regionserver

/export/servers/hbase/bin/stop-hbase.sh

打开hbase-shell界面:

cd /export/servers/hbase/bin
./hbase shell

常用网址

HDFS文件系统:

Namenode information

hbase—web:

Master: hadoop01

1、hadoop安装

1.启动命令

1.

start-dfs.sh

这个脚本用于启动Hadoop分布式文件系统(HDFS)相关的守护进程。HDFS是Hadoop的核心部分,负责数据的存储。运行

start-dfs.sh

会启动以下三种类型的守护进程:

  • NameNode: HDFS的主节点,负责文件系统的命名空间管理和客户端访问权限。此守护进程运行在集群的主服务器上。
  • SecondaryNameNode: 不是NameNode的热备份,而是帮助NameNode合并编辑日志和文件系统镜像,减轻NameNode的内存压力。注意:在Hadoop 2.x及以上版本中,通常由HA配置中的Standby NameNode替代。
  • DataNode: 存储实际数据的节点,运行在集群的每个节点上,负责处理文件系统客户端的读写请求。

2.

start-all.sh

这个脚本用于启动整个Hadoop集群,包括HDFS和YARN(Yet Another Resource Negotiator)的相关组件。YARN是Hadoop的资源管理框架,负责计算资源的管理和作业调度。运行

start-all.sh

脚本会启动HDFS组件以及以下YARN组件:

  • ResourceManager: YARN的核心,负责管理集群的计算资源,调度用户的应用程序。
  • NodeManager: 运行在集群的每个节点上,负责管理单个节点上的计算资源和监控其上运行的容器。

使用建议

从Hadoop 2.x版本开始,推荐使用更为精细的控制命令分别管理HDFS和YARN,即使用

start-dfs.sh

start-yarn.sh

替代

start-all.sh

。这样可以更灵活地控制集群中不同的服务。例如,如果只需要重启YARN而不影响HDFS,你可以只运行

start-yarn.sh

。这种做法提高了维护的灵活性和系统的稳定性。

实际操作

在Hadoop安装目录的

sbin

下,通过执行以下命令可以启动整个Hadoop集群:

bash复制./start-dfs.sh
./start-yarn.sh

或者,使用已不推荐的方式:

bash复制./start-all.sh

这些脚本通常需要以Hadoop集群的管理员(如 hadoop 用户)身份运行,确保权限正确无误。运行这些脚本后,可以通过Web界面或其他监控工具检查每个组件的状态,确认它们是否正常运行。#

2.教程

Hadoop大数据集群搭建(超详细)_hadoop集群搭建-CSDN博客

复制文件和配置

scp /etc/profile hadoop02:/etc/profile

scp /etc/profile hadoop03:/etc/profile

scp -r /export/ hadoop02:/

scp -r /export/ hadoop03:/

source /etc/profile

scala -version

hadoop version

java -version

环境变量总和

#jdk
export JAVA_HOME=/export/servers/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
#hadoop
export HADOOP_HOME=/export/servers/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
#同时添加hadoop为root用户,否则启动的HDFS的时候可能会报错
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
# Scala
export SCALA_HOME=/export/servers/scala
export PATH=$SCALA_HOME/bin:$PATH

# Zookeeper
export ZOOKEEPER_HOME=/export/servers/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

# HBase
export HBASE_HOME=/export/servers/hbase
export PATH=$HBASE_HOME/bin:$PATH

# Hive
export HIVE_HOME=/export/servers/hive
export PATH=$HIVE_HOME/bin:$PATH

# Spark
export SPARK_HOME=/export/servers/spark
export PATH=$SPARK_HOME/bin:$PATH

2.spark安装

1.教程

基于Linux的Spark安装与环境配置_linux安装spark-CSDN博客

2.环境变量

 JDK
export JAVA_HOME=/export/servers/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

# Hadoop
export HADOOP_HOME=/export/servers/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

# 设置Hadoop的运行用户为root
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

# Scala
export SCALA_HOME=/export/servers/scala
export PATH=$SCALA_HOME/bin:$PATH

# Zookeeper
export ZOOKEEPER_HOME=/export/servers/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

# HBase
export HBASE_HOME=/export/servers/hbase
export PATH=$HBASE_HOME/bin:$PATH

# Hive
export HIVE_HOME=/export/servers/hive
export PATH=$HIVE_HOME/bin:$PATH

# Spark
export SPARK_HOME=/export/servers/spark
export PATH=$SPARK_HOME/bin:$PATH

# Spark Master配置
# 设置Spark Master的IP地址
export SPARK_MASTER_IP=192.168.138.101
# 设置Spark Master的通信端口
export SPARK_MASTER_PORT=7077
# 设置Spark Master的Web界面端口
export SPARK_MASTER_WEBUI_PORT=8099

# Spark Worker配置
# 设置每个Spark Worker可用的CPU核心数
export SPARK_WORKER_CORES=3
# 设置每个Worker节点上的Worker实例数量
export SPARK_WORKER_INSTANCES=1
# 设置每个Spark Worker的内存大小
export SPARK_WORKER_MEMORY=5G
# 设置Spark Worker的Web界面端口
export SPARK_WORKER_WEBUI_PORT=8081

# Spark Executor配置
# 设置每个Spark Executor可用的CPU核心数
export SPARK_EXECUTOR_CORES=1
# 设置每个Spark Executor的内存大小
export SPARK_EXECUTOR_MEMORY=1G

# 设置库路径,以包括Hadoop本地库
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native

spark解压文件下conf目录中将slaves.template文件更名为slaves

修改slaves向其中添加所有从节点的主机名

spark的文件复制到从机

scp -r /export/servers/spark hadoop02:/export/servers/

scp -r /export/servers/spark hadoop03:/export/servers/

sudo rm -r /export/servers/spark/

sbin目录启动start-all.sh

oBO&t.&>F2s:

set password=password(“123123”);

3、zookeeper安装

1.bin/zkServer.sh start

zookeeper/bin/zkServer.sh start

  • zkServer.sh start:启动 ZooKeeper 服务。
  • zkServer.sh stop:停止 ZooKeeper 服务。
  • zkServer.sh restart:重启 ZooKeeper 服务。
  • zkServer.sh status:查看 ZooKeeper 服务的状态。
  • /etc/profile~/.bashrc 是两个不同的配置文件,它们都用于配置用户环境,但应用范围和时机有所不同。这里需要注意的是,这两个都不是文件夹,而是文件。cp /export/servers/mysql-connector-java-5.1.37/mysql-connector-java-5.1.37-bin.jar $HIVE_HOME/lib## 3.系统命令1.vi /etc/profile- 全局配置/etc/profile 是系统级别的配置文件,它对系统中的所有用户都有效。当任何用户登录时,该文件中的环境设置会被加载。- 登录时执行:当用户登录系统时,/etc/profile 被读取并执行一次。这通常用于设置环境变量、运行脚本等,这些设置将应用于用户的整个会话。- 主要用于交互式登录shell:在使用图形界面登录或通过命令行使用 login 命令登录时,/etc/profile 将被源加载。- source /etc/profile2.vi /.bashrc- 用户配置:```/.bashrc是用户级别的配置文件,每个用户都有自己的.bashrc 文件,位于他们的主目录中( 表示当前用户的主目录)。它仅对当前用户有效。- **每次新建shell时执行**:每当启动新的shell会话时,就会读取并执行该文件。这意味着它不仅在登录时加载,还在每次打开新的终端或运行新的交互式shell时加载。- **主要用于非登录shell**:.bashrc主要用于非登录shell的情况,例如当你打开一个新的终端窗口时。但是,许多发行版配置了.bash_profile.profile在登录时从.bashrc 读取配置,以确保这些设置也适用于登录shell。- source ~/.bashrc#### 总结差异- **适用范围**:/etc/profile对所有用户生效,而/.bashrc 仅对其所属用户生效。- **执行时机**:/etc/profile 在用户登录时执行一次,/.bashrc在每次打开新的shell会话时执行。通常,系统级别的环境配置会放在/etc/profile中或/etc/profile.d/目录下的脚本中;个性化的或特定用户的环境配置则会放在该用户的/.bashrc``` 文件中。这样,系统管理员可以对所有用户设置统一的环境变量,而用户可以为自己的shell会话设置个性化的配置。

4.hive安装

1.教程:

HIve安装配置(超详细)-CSDN博客

1.mysql依赖:cp /usr/share/java/mysql-connector-java.jar $HIVE_HOME/lib

2.vi $HIVE_HOME/conf/hive-site.xml配置文件

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- jdbc连接的URL -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop01:3306/metastore?useSSL=false&amp;serverTimezone=Asia/Shanghai</value>
    </property>
    
    <!-- jdbc连接的Driver-->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    
    <!-- jdbc连接的username-->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- jdbc连接的password -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123123</value>
    </property>

    <!-- Hive默认在HDFS的工作目录 -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/usr/hive/warehouse</value>
    </property>
</configuration>
您提供的 XML 配置文件是用于配置 Hive Metastore 的连接信息。这个配置文件通常位于 Hive 的配置目录中,例如 `/etc/hive/conf` 或 `$HIVE_HOME/conf`。这个配置文件指定了 Hive 如何连接到 MySQL 数据库来存储和管理元数据。

以下是对配置文件中各个部分的解释:

1. **ConnectionURL**:
   - `<name>javax.jdo.option.ConnectionURL</name>`
   - `<value>jdbc:mysql://hadoop01:3306/person?useSSL=false</value>`
   - 这行配置指定了 JDBC 连接的 URL,它指向运行在 `hadoop001` 主机上端口 `3306` 的 MySQL 数据库,数据库名为 `metastore`,并且禁用了 SSL 连接。

2. **ConnectionDriverName**:
   - `<name>javax.jdo.option.ConnectionDriverName</name>`
   - `<value>com.mysql.jdbc.Driver</value>`
   - 这行配置指定了 JDBC 连接使用的驱动类名,这里是 MySQL 的 JDBC 驱动。

3. **ConnectionUserName**:
   - `<name>javax.jdo.option.ConnectionUserName</name>`
   - `<value>root</value>`
   - 这行配置指定了连接数据库的用户名。

4. **ConnectionPassword**:
   - `<name>javax.jdo.option.ConnectionPassword</name>`
   - `<value>123456</value>`
   - 这行配置指定了连接数据库的密码。

5. **hive.metastore.warehouse.dir**:
   - `<name>hive.metastore.warehouse.dir</name>`
   - `<value>/user/hive/warehouse</value>`
   - 这行配置指定了 Hive 默认在 HDFS 上的工作目录,用于存储数据。

确保这些配置正确无误,并且 MySQL 数据库已经在指定的主机和端口上运行,用户名和密码也正确,这样 Hive 才能成功连接到 Metastore 数据库。如果您遇到连接问题,请检查网络连接、数据库服务状态以及配置文件中的信息是否准确。
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- jdbc连接的URL -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop01:3306/metastore?useSSL=false</value>
    </property>
    
    <!-- jdbc连接的Driver-->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    
    <!-- jdbc连接的username-->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- jdbc连接的password -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123123</value>
    </property>

    <!-- Hive默认在HDFS的工作目录 -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
<!-- 指定metastore服务的地址 -->
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop01:9083</value>
</property>

</configuration>

2.命令

3.启动

/export/servers/hive/bin/hive

hive --service metastore

/export/servers/spark/sbin/start-master.sh

spark-shell --master spark://hadoop01:7077

hive> show databases;
hive> show tables;
hive> create table stu(id int, name string);
hive> insert into stu values(2110461220,"lyl");
hive> select * from stu;
// 导入必要的库
import spark.implicits._
import org.apache.spark.sql.SaveMode

// 创建一个简单的DataFrame
val data = Seq((220, "liyuelong"), (1,"zhangsan"), (2, "wangwu")) 
val df = data.toDF("id", "name")

// 将DataFrame写入Hive表。表名为stu
// 确保Hive支持已经被启用
df.write.format("Hive").mode(SaveMode.Append).saveAsTable("stu")
spark.sql("select * from stu").show
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.SaveMode

// 创建SparkSession实例
val spark = SparkSession.builder().
     |   appName("Insert Data into Hive with Adjusted Schema").
     |   config("spark.master", "local").
     |   enableHiveSupport().
     |   getOrCreate()

// 创建一个示例RDD
val data = Seq(
Row(1, "Alice"),
Row(2, "Bob")
)
val rdd = spark.sparkContext.parallelize(data)

// 定义与数据相对应的调整后的schema
val schema = new StructType()
.add(StructField("id", IntegerType, true))
.add(StructField("name", StringType, true))

// 创建DataFrame
val df2 = spark.createDataFrame(rdd, schema)

// 插入数据到Hive表
df2.write.format("Hive").mode(SaveMode.Append).saveAsTable("stu")

spark.sql("select * from stu").show

// 停止SparkSession
spark.stop()

3.实验

create database spark;

CREATE TABLE IF NOT EXISTS spark.stu (
    id INT,
    name STRING,
    age INT
);

INSERT INTO TABLE spark.stu VALUES (220, 'lyl', 21), (101, 'lisi', 22);

5.MySQL

1.教程

Linux安装mysql8.0(官方教程!)-CSDN博客

HIve安装配置(超详细)-CSDN博客

hive安装与配置(提供文件、脚本,有字幕)_哔哩哔哩_bilibili

2.下载MySQL依赖及本体

MySQL :: Download MySQL Connector/J (Archived Versions)

MySQL :: Download MySQL Community Server (Archived Versions)

3.命令

hSGeKql0KG

tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar -C /usr/local/mysql_lib/

GRANT ALL PRIVILEGES ON . TO ‘mysql’@‘%’ IDENTIFIED BY ‘123123’ WITH GRANT OPTION;

mysql -uroot -p123123

systemctl start mysqld #启动mysql服务器

systemctl status mysqld #查看服务器状态

systemctl enable mysqld #设置虚拟机开机mysql服务自动启动

import org.apache.spark.sql.SparkSession

// 创建或获取SparkSession
val spark = SparkSession.builder()
  .appName("MySQL Example")
  .master("local") // 如果您在集群上运行,请更改此设置
  .getOrCreate()

// 设置连接MySQL的参数
val url = "jdbc:mysql://localhost:3306/your_database"
val table = "your_table"
val driver = "com.mysql.cj.jdbc.Driver"
val user = "your_username"
val password = "your_password"

// 从MySQL数据库读取数据
val df = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .option("driver", driver)
  .load()

// 显示数据
df.show()

// 执行查询
val result = df.select("column1", "column2").filter("column1 > 100")
result.show()

// 关闭SparkSession(可选)
// spark.stop()

6.sparkSQL操作hive注意事项

①sparkSQL连接hive需要复制配置文件:hive-site.xml,命令如下

cp conf/hive-site.xml /export/servers/spark/conf/

②还需要将mysql的依赖包复制到spark的jars目录下

cp /export/servers/hive/lib/mysql-connector-java-5.1.37-bin.jar /export/servers/spark/jars/

具体参考书本97页

③先启动hadoop,MySQL,后启动hive以及spark-shell

④先在hive中插入数据

hive> show databases;
hive> show tables;
hive> create table stu(id int, name string);
hive> insert into stu values(2110461220,“lyl”);
hive> select * from stu;

4.进入spark-shell

进入刚才创建好的数据仓库

spark.sql("use metastore ")

查看有哪些表

spark.sql(“show tables”).show;

查看表的数据

spark.sql(“select * from info”).show

编写scala代码向info插入数据

import org.apache.spark.sql.SparkSession

// 创建或获取 SparkSession,开启 Hive 支持
val spark = SparkSession.builder()
.appName(“Write to Hive Example”)
.enableHiveSupport()
.getOrCreate()

// 确保导入隐式转换
import spark.implicits._

// 创建一个 DataFrame 来模拟您想要写入的数据
val newData = Seq((211, “lyl”,21), (0461, “liyuelong”,21)).toDF(“220”, “张三”,20)

// 将 DataFrame 写入 Hive 表,这里使用的是追加模式
newData.write.mode(“append”).saveAsTable(“info”)

// (可选)读取 Hive 表并显示结果来验证写入是否成功
spark.sql(“SELECT * FROM my_table”).show()

// 停止 SparkSession
spark.stop()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// 创建SparkSession(确保enableHiveSupport被调用)
val spark = SparkSession.builder()
  .appName("RDD to Hive")
  .enableHiveSupport()
  .getOrCreate()

// 使用SparkSession对象获取SparkContext
val sc = spark.sparkContext

// 创建一个由元组组成的RDD
val dataRDD: RDD[(Int, String, Int)] = sc.parallelize(Seq((211, "lyl", 21), (461, "liyuelong", 21)))

// 将RDD[(Int, String, Int)]转换为RDD[Row]
val rowRDD: RDD[Row] = dataRDD.map(attributes => Row(attributes._1, attributes._2, attributes._3))

// 定义模式
val schema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// 将RDD[Row]转换为DataFrame
val dataDF = spark.createDataFrame(rowRDD, schema)

// 将DataFrame写入Hive表
dataDF.write.mode("append").saveAsTable("info")

// 停止SparkSession
spark.stop()

7.habase

Hadoop之Hbase安装和配置_hadoop之hbase的安装与配置-CSDN博客

vim hbase-site.xml
vim hbase-env.sh

<property>
    <name>hbase:rootdir</name>
    <value>hdfs://192.168.138.101:9000/hbase</value>#所有节点一样
</property>
<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/export/servers/zookeeper/zkdata/</value>
</property>
<property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
</property>
export JAVA_HOME=/export/servers/jdk

export HBASE_CLASSPATH=/export/servers/zookeeper/conf
export HBASE_MANAGES_ZK=false
<configuration>
       <property>
              <name>hbase.rootdir</name>
              <value>hdfs://hadoop01:9000/hbase</value>
       </property>
       <property>  
              <name>hbase.cluster.distributed</name>
              <value>true</value>
       </property>
       <property>  
              <name>hbase.master.info.port</name>
              <value>16010</value>
       </property>
       <property>   
              <name>hbase.regionserver.info.port</name>
              <value>16030</value>
       </property>
       <property>
              <name>hbase.zookeeper.quorum</name>
              <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
       </property>
       <property>
              <name>hbase.zookeeper.property.dataDir</name>
              <value>/export/servers/zookeeper/zkdata</value>
       </property>
       <property>
               <name>hbase.unsafe.stream.capability.enforce</name>
               <value>false</value>
       </property>
       <property>
              <name>dfs.replication</name>
              <value>2</value>
       </property>
       <property>
              <name>hbase.master.maxclockskew</name>
              <value>150000</value>
       </property>
</configuration>

scp -r /export/servers/hbase hadoop02:/export/servers

scp -r /export/servers/hbase hadoop03:/export/servers

2.命令

/export/servers/hbase/bin/start-hbase.sh

[root@hadoop01 bin]# cd /export/servers/hbase/bin
[root@hadoop01 bin]# ./hbase shell

7.zookeeper

1.教程

配置HBase和zookeeper_hbase-site 配置-CSDN博客

HBase、zookeeper的安装和配置详细教程_ookeeper+hadoop+hbase+hive(集成hbase)安装部署教程(超详细-CSDN博客

2.配置

zoo.cfg:

The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/export/servers/zookeeper/zkdata
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=hadoop01:2888:3888
server.2=hadoop02:2888:3888
server.3=hadoop03:2888:3888

~
您提供的信息是Apache ZooKeeper的`zoo.cfg`配置文件的内容。这个配置文件定义了ZooKeeper服务器的各种参数,包括基本的时间单位、集群同步限制、数据目录、客户端连接端口以及其他可选的高级设置。下面是对您提供的配置的详细解释:

- `tickTime=2000`:这是ZooKeeper使用的基本时间单位,以毫秒为单位。它用于心跳和超时。例如,下面的`initLimit`和`syncLimit`都是以`tickTime`的倍数来表示的。

- `initLimit=10`:这是从服务器(follower)连接到主服务器(leader)并完成初始同步所需的最大时间。它是以`tickTime`的倍数来表示的,因此在这个例子中,初始同步最多可以花费`10 * 2000 = 20000`毫秒(或20秒)。

- `syncLimit=5`:这是从服务器与主服务器之间的同步通信超时时间。在这个例子中,它是`5 * 2000 = 10000`毫秒(或10秒)。

- `dataDir=/tmp/zookeeper`:这是ZooKeeper存储持久数据的目录。在这个例子中,它被设置为`/tmp/zookeeper`。通常不建议使用`/tmp`作为数据目录,因为`/tmp`目录可能会被系统定期清理。

- `clientPort=2181`:这是ZooKeeper服务器监听客户端连接的端口。

- `maxClientCnxns=60`:这是可选配置,用于限制单个客户端IP与ZooKeeper服务器之间的并发连接数。在这个例子中,它被注释掉了,意味着没有设置限制。

- `autopurge.snapRetainCount=3`和`autopurge.purgeInterval=1`:这些是自动清理功能的配置。`autopurge.snapRetainCount`定义了要保留的快照文件数量,而`autopurge.purgeInterval`定义了自动清理任务的执行间隔(小时)。在这个例子中,它们都被注释掉了,意味着自动清理功能被禁用。

- `server.1=hadoop01:2888:3888`、`server.2=hadoop02:2888:3888`和`server.3=hadoop03:2888:3888`:这些是ZooKeeper集群中服务器的配置。每个服务器都有一个唯一的ID(在这个例子中是1、2和3),以及主机名和两个端口。第一个端口(2888)用于集群内服务器之间的通信,第二个端口(3888)用于领导者选举。

在部署ZooKeeper集群时,请确保每个服务器上的`zoo.cfg`文件都正确配置,并且每个服务器上的`myid`文件(位于`dataDir`指定的目录中)包含对应于`server.X`中的`X`的唯一数字。此外,确保所有服务器都可以通过网络相互访问,并且防火墙规则允许通过这些端口进行通信。

scp -r /export/servers/zookeeper hadoop02:/export/servers

3.命令

/export/servers/zookeeper/bin/zkServer.sh start

8.kafka

1.教程

阿帕奇卡夫卡 (apache.org)

kafka和scala,版本有关,依赖zookeeper

version:2.12-3.5.1,表示Scala版本是2.12,Kafka版本是基于此的3.5.1版本。

listeners=PLAINTEXT://0.0.0.0:9092

broker.id=3

advertised.listeners=PLAINTEXT://hadoop03:9092

log.dirs=/export/servers/kafka/log

zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181

scp -r /export/servers/kafka hadoop02:/export/servers

2.命令

bin/kafka-server-start.sh config/server.properties

3.实验内容

cd /export/servers/kafka
$kafka
./bin/kafka-topics.sh --create --bootstrap-server hadoop01:9092 --replication-factor 3 --partitions 1 --topic itcasttopic

hadoop01创建主题top1
$kafka kafka-topics.sh --create --topic top1 --partitions 3 --replication-factor 1 --bootstrap-server hadoop01:9092

列出哪些主题
$kafka kafka-topics.sh --list --bootstrap-server hadoop01:9092
删除主题
$kafka kafka-topics.sh --delete --topic top1 --bootstrap-server hadoop01:9092

实验4
[root@hadoop01 kafka]#
生产者
$kafka kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic itcasttopic

>hello kafka
>2110461220 lyl
>

[root@hadoop02 kafka]# 
消费者
$kafka kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic itcasttopic

hello kafka
2110461220 lyl
实验4
词频统计的两个主题hadoop01创建

$kafka kafka-topics.sh --create --bootstrap-server hadoop01:9092 --replication-factor 3 --partitions 1 --topic testStreams1

$kafka kafka-topics.sh --create --bootstrap-server hadoop01:9092 --replication-factor 3 --partitions 1 --topic testStreams2

$kafka kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testStreams1

$kafka kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hado03:9092 --topic testStreams2

9.sparkstreaming

Spark Streaming是Apache Spark的一部分,它不需要单独安装,而是作为Spark框架的一个组件提供。要使用Spark Streaming,您需要确保已经安装了Apache Spark,并且版本支持Streaming功能。以下是使用Spark Streaming的基本步骤:

  1. 安装Apache Spark:- 下载Spark的二进制发行版。您可以从Apache Spark官方网站下载预编译的版本,或者从源代码编译。- 解压下载的文件到您的系统中。- 配置环境变量,例如将Spark的bin目录添加到您的PATH中。
  2. 配置Spark:- 根据您的需求配置Spark。例如,如果您打算使用Spark Streaming处理来自Kafka的数据,您可能需要配置Spark与Kafka的集成。- 编辑conf/spark-defaults.confconf/spark-env.sh文件来设置必要的参数。
  3. 编写Spark Streaming应用程序:- 使用Scala、Java或Python编写Spark Streaming应用程序。- 在您的代码中,您需要导入Spark Streaming相关的库,并创建StreamingContext来初始化Streaming应用程序。
  4. 运行Spark Streaming应用程序:- 使用Spark的spark-submit工具来提交您的Spark Streaming应用程序。例如:./bin/spark-submit --class your.main.Class --master local[*] your-application.jar- 在这里,your.main.Class是包含Spark Streaming应用程序主类的完全限定名,your-application.jar是包含您的应用程序代码的JAR文件。
  5. 监控和管理:- 使用Spark的Web UI(通常在端口40

10.电影推荐

hdfs创建文件夹:hdfs dfs -mkdir -p /spark/mldata

hdfs 删除文件:hadoop fs -rm /spark/mldata/ml-100.zip

dfs -put /export/servers/spark_MLlib/data/ml-100k.zip /spark/mldata/

unzip /export/servers/spark_MLlib/data/ml-100k.zip -d /export/servers/spark_MLlib/data/unzipped/

hdfs dfs -put /export/servers/spark_MLlib/data/unzipped/* /spark/mldata/

1.上传训练模型数据至hdfs/spark/mldata文件夹
  创建文件夹:
hdfs dfs -mkdir -p /spark/mldata
  上传文件:
hdfsdfs-put/export/servers/spark_MLlib/data/unzipped/* /spark/mldata/
   HDFS中的文件截图:
2.数据处理
     u.data中有4个字段,分别表示用户id,电影id,等级评价和时间戳
①Spark-shell读取u.data数据文件,将其转换成RDD
val dataRdd = sc.textFile("/spark/mldata/ml-100k/u.data")
#输出rdd第一行数据
dataRDD.first()
②#去除时间戳这一列
val dataRdds = dataRdd.map(_.split('\t').take(3))
dataRdds.first()

3. 导入ALS算法模型库,调用train()函数训练模型
①训练模型需要Rating格式的数据,将dataRdds使用map方法进行转换
import org.apache.spark.mllib.recommendation.Rating
valratings=dataRdds.map{ caseArray(user,movie,rating) => Rating(user.toInt,movie.toInt,rating.toDouble)}

②训练模型
import org.apache.spark.mllib.recommendation.ALS
val model = ALS.train(ratings,50,10,0.01)
③创建推荐引擎模型矩阵分解对象
val predictedRating = model.predict(100,200)
用户id100,电影id200,预测电影评级约为2.45
4.为用户推荐多个电影
①recommendProducts函数实现功能
    #定义用户id
val userid = 100
#定义推荐数量
    val num = 10
val topRecoPro = model.recommendProducts(userid,num)
#返回值为Rating数据类型数组,分别表示用户id,推荐电影id,算法得出的评分,评分越高,引擎优先推荐

②使推荐效果更直观
     将u.item文件中电影id和电影名称进行映射
val moviesRdd = sc.textFile("/spark/mldata/ml-100k/u.item")

valtitles=moviesRdd.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap()

topRecoPro.map(rating=>(titles(rating.product),rating.rating)).foreach(println)

本文转载自: https://blog.csdn.net/nd15911827/article/details/142139328
版权归原作者 想象中的程序员 所有, 如有侵权,请联系我们删除。

“Spark总结”的评论:

还没有评论