0


备战大数据比赛:个人经验与实战技巧分享

基础配置

创建虚拟机,后首先测试是否ping通

ping www.baidu.com

1.修改服务器名称

修改主机名:

$ hostnamectl set-hostname master01  #临时改名,重启还原

 bash

永久修改:

修改配置文件

vi /etc/hostname 将内容改为master,保存退出

vi /etc/sysconfig/network

NETWORKING=yes

HOSTNAME=master  修改为新名

使每一台机器都能通过名字识别另一个机器【配置 hosts实现主机名和ip地址映射】

vi /etc/hosts

192.168.222.171 Competition2024master Competition2024master.root

192.168.222.172 Competition2022node1 Competition2022node1.root

192.168.222.173 Competition2022node2 Competition2022node2.root

2.大数据集群网络配置

2.1编辑虚拟网络

首先以管理员身份进入,在页面左上角寻找“编辑”,点击进入后选择“虚拟网络编辑器”,点击“添加网络”,选择”VMnet8”,将”仅主机模式”改为“NAT模式”,

第一步:

子网IP:192.168.222.0 #子网的最后一位一定是0

子网掩码:255.255.255.0

第二步:DHCP设置

起始IP地址:192.168.222.128

结束IP地址:192.168.222.254

第三步:NAT设置

网关IP:192.168.222.2

以上三步做完点击“应用”,再点击“确定”

现在把所有虚拟机都打开设置里面的IP

2.2虚拟机网络配置与远程连接

网络配置

一.先对master进行操作

1.cd /etc/sysconfig/network-scripts

2.vi ifcfg-ens33

3.BOOTPROTO=dhcp 改为BOOTPROTO=static #静态的意思

4.ONBOOT=no 改为ONBOOT=yes

5.再添加5条记录

IPADDR=192.168.222.171

GATEWAY=192.168.222.2

NETMASK=255.255.255.0

DNS1=192.168.222.2

DNS2=8.8.8.8

#IPADDR 是IP地址的意思

#GATEWAY 是网关的意思

#NETWORK是子网掩码的意思

6.网络服务重启

  service network restart

关闭防火墙

systemctl stop firewalld
   克隆虚拟机--

二.再对其他三个虚拟机进行操作

以上操作相同不同点是

IPADDR分别为

IPADDR=192.168.222.172

IPADDR=192.168.222.173

IPADDR=192.168.222.174

集群时间同步

时间同步的前提:网络配置完成

时间同步的分工

master 作为时间同步的服务器,slave1,slave2和masterbak向master同步

准备工作:挑选时区

tzselect 5 9 1 1

可在profile 文件中添加命令,使时区保存

vi /etc/profile 最低部添加 TZ='Asia/Shanghai'; export TZ

安装时间同步的软件ntp

【注】三台机器都要安装:

yum install -y ntp 直接下载失效,首先配置

禁用镜像列表(mirrorlist)并启用直接链接(baseurl)

方法一:

到网页

mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup

上传Centos-7.repo

mv /etc/yum.repos.d/Centos-7.repo /etc/yum.repos.d/CentOS-Base.repo

方法二

sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo

sed -i s/^#.*baseurl=http/baseurl=https/g /etc/yum.repos.d/*.repo

sed -i s/^mirrorlist=http/#mirrorlist=https/g /etc/yum.repos.d/*.repo

方法三:

sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 显示wget找不到命令

sudo yum clean all

sudo yum makecache

3.修改ntp 配置文件

ntpq -c version

ntpq 4.2.6p5@1.2349-o Tue Jun 23 15:38:21 UTC 2020 (1)

只需要在master中做:

vi /etc/ntp.conf

在末尾追加

server 127.0.0.1

fudge 127.0.0.1 stratum 10

注意先关闭防火墙-----一般没关

systemctl stop firewalld

sudo ntpdate pool.ntp.org 手动同步时间

sudo firewall-cmd --list-all 查看状态

4.重启时间服务

systemctl restart ntpd.service

5.在slave1和slave2上去更新

ntpdate master #以master为标准进行时间的校对【无用】

SSH免密登录

1.三台机器上生成公钥-私钥

ssh-keygen -t dsa #注意事项 ssh-keygen空格-t空格dsa

2.分节点先切换到.ssh目录下:

cd /root/.ssh #注意事项 不要忽略ssh前面的点(“.”)

3.将公钥传递过去:

scp id_dsa.pub root@master:/root/.ssh/s1.pub

#s1.pub一定要加,意思是传递过去 之前名为id_dsa.pub,和master中公钥名称相同

   s2.pub,mb.pub 操作同上

4.在master中将三个公钥合并成一个公钥包

先切换到/root/.ssh目录下面:

cd /root/.ssh

将三个公钥合并成一个公钥包:

cat id_dsa.pub mb.pub s1.pub s2.pub >> authorized_keys

5.将master中合成的公钥包分别传递到slave1和slave2的/root/.ssh下:

scp authorized_keys root@slave1:/root/.ssh

scp authorized_keys root@slave2:/root/.ssh 

scp authorized_keys root@masterbak:/root/.ssh

#传递过去后不用另起别名了,因为/root/.ssh下面没有叫authorized_keys的

6.现在可以免密登录了

假设现在在master中

ssh slave1 #免密登录到slave1

exit #退出slave1,重回master中

Java配置

将JDK安装到特定的目录下:

mkdir -p /usr/java #创建一个目录用来安装jdk

5.安装:

tar -zxvf jdk-8u17-linux-x64.tar.gz -C /usr/java #这里的C是大写

               名称

Java -version : jdk1.8.0_171

6.vi /etc/profile

export JAVA_HOME=/usr/java/jdk1.8.0_212

export CLASSPATH=.:$JAVA_HOME/lib

export PATH=$PATH:$JAVA_HOME/bin

从打红色箭头的一行开始编辑,一共编辑以上四行,然后退出

7.让环境变量生效

source /etc/profile

大数据组件-Hadoop

  1. Hadoop 完全分布式搭建

创建/usr/Hadoop

tar -zxvf -C 解压文件到/usr/hadoop中

1.配置环境变量

vi /etc/profile

#hadoop

export HADOOP_HOME=/usr/hadoop/hadoop-3.1.3

export CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib

export PATH=$PATH:$HADOOP_HOME/bin

使生效 source /etc/profile

4.3 hadoop集群配置

在hadoop-3.1.3 位置下有 etc/hadoop 位置

cd etc/hadoop 进入

1.vi hadoop-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_212

2.vi core-site.xml

<property>

<name>fs.default.name</name>

<value>hdfs://master:9820</value>

</property>

<property>

<name>hadoop.temp.dir</name>

<value>/usr/hadoop/hadoop-3.1.3/tmp</value>

</property>

**3.vi yarn-site.xml **

<property>

    <name>yarn.resourcemanager.hostname</name>

    <value>master</value>

</property>

<property>

    <name>yarn.nodemanager.aux-services</name>

    <value>mapreduce_shuffle</value>

</property>

<property>

    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

    <value>org.apache.hadoop.mapred.ShuffleHandler</value>

</property>

4.vi workers【关键步骤】

填入

master

slave1

slave2

5.vi hdfs-site.xml

<property>

<name>dfs.replication</name>  #Block副本数,默认3

<value>2</value>

</property>

<property>

<name>dfs.namenode.name.dir</name>

<value>file:/usr/hadoop/hadoop-3.1.3/hdfs/name</value>

<final>true</final>

</property>

<property>

<name>dfs.datanode.data.dir</name>

<value>file:/usr/hadoop/hadoop-3.1.3/hdfs/data</value>

<final>true</final>

</property>

<property>

<name>dfs.namenode.secondary.http-address</name>

<value>master:9001</value>

</property>

<property>

<name>dfs.namenode.http-address</name>

<value>192.168.222.171:9870</value>

</property>

<property>

<name>dfs.permissions</name>

<value>false</value>

</property>

<property>

    <name>dfs.webhdfs.enabled</name>

    <value>true</value>

</property>

C:\Windows\System32\drivers\etc\hosts 修改好对应配置,否则hdfs Web端无法预览文件

6.vi mapred-site.xml

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

HDFS系统

分发:

scp -r /usr/hadoop root@slave1:/usr

scp -r /usr/Hadoop root@slave2:/usr

hadoop namenode -format 格式化

下面对应hdfs文件夹直接删除才可重新

master:

开:

hdfs --daemon start namenode

yarn --daemon start resourcemanager

hdfs --daemon start secondarynamenode

关:

hdfs --daemon stop namenode

yarn --daemon stop resourcemanager

hdfs --daemon stop secondarynamenode

slaves:

开:

hdfs --daemon start datanode

yarn --daemon start nodemanager

关:

hdfs --daemon stop datanode

yarn --daemon stop nodemanager

注:不可start-all一次性启动了

jps查看进程

即成功

http://192.168.222.171:50070/ 浏览器查看namenode信息

还有9001 查看2nn 信息

2nn信息显示不出来,解决方案:

cd etc/hadoop

find /usr/hadoop/hadoop-3.1.3/ -name dfs-dust.js

vi /usr/hadoop/hadoop-3.1.3/share/hadoop/hdfs/webapps/static/dfs-dust.js

:se nu显示行数

61行修改为:return new Date(Number(v)).toLocaleString();

清除网页缓存 刷新后显示

打开网站之前注意关闭防火墙(否则连不上)systemctl stop firewalld

关闭安全模式:

hdfs dfsadmin -safemode enter/leave

  1. HDFS 基本操作命令、Java-API 编写

2.1 HDFS 基本操作命令

  1. 上传文件到 HDFS

hdfs dfs -put /path/to/local/file /path/to/hdfs/directory

  1. 从 HDFS 下载文件

hdfs dfs -get /path/to/hdfs/file /path/to/local/directory

  1. 列出 HDFS 目录

hdfs dfs -ls /path/to/hdfs/directory

  1. 删除 HDFS 中的文件或目录

hdfs dfs -rm -r /path/to/hdfs/directory

  1. 查看文件内容

hdfs dfs -cat /path/to/hdfs/file

2.2 HDFS Java API 编写

使用 Hadoop Java API 可以操作 HDFS。以下是一个简单的 HDFS 操作示例,包括文件上传、下载和读取。

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import java.io.InputStream;

import java.net.URI;

public class HDFSExample {

    public static void main(String[] args) throws Exception {

        // 1. 创建 HDFS 配置

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "hdfs://master:9000");

        // 2. 获取 HDFS 文件系统对象

        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf, "hadoop");

        // 3. 上传文件到 HDFS

        fs.copyFromLocalFile(new Path("/local/path/file.txt"), new Path("/hdfs/path/file.txt"));

        System.out.println("File uploaded to HDFS.");

        // 4. 读取 HDFS 文件内容

        InputStream in = fs.open(new Path("/hdfs/path/file.txt"));

        byte[] buffer = new byte[1024];

        int bytesRead;

        while ((bytesRead = in.read(buffer)) > 0) {

            System.out.write(buffer, 0, bytesRead);

        }

        // 5. 下载文件到本地

        fs.copyToLocalFile(new Path("/hdfs/path/file.txt"), new Path("/local/path/downloaded.txt"));

        System.out.println("File downloaded from HDFS.");

        // 6. 关闭 HDFS 文件系统

        fs.close();

    }

}

3. MapReduce 编程、数据预处理、指标计算

3.1 MapReduce 基本概念

MapReduce 是 Hadoop 的分布式计算框架,用于大规模数据处理。它包括两个阶段:

  • Map 阶段:将输入数据分割成键值对,并执行处理。
  • Reduce 阶段:对 Map 阶段的输出进行聚合或汇总。

3.2 MapReduce 编程示例

以下是一个简单的 WordCount 示例:

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import java.util.StringTokenizer;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {

                word.set(itr.nextToken());

                context.write(word, one);

            }

        }

    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int sum = 0;

            for (IntWritable val : values) {

                sum += val.get();

            }

            result.set(sum);

            context.write(key, result);

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);

        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

3.3 数据预处理和指标计算

使用 MapReduce 可以对原始数据进行预处理和计算常见指标,比如去重、排序、聚合等。你可以通过自定义 Mapper 和 Reducer 实现这些操作。

4. 任务提交、优化、排错

4.1 提交 MapReduce 任务

将编写的 MapReduce 任务打包为 JAR 文件,然后提交到集群上运行:

hadoop jar WordCount.jar WordCount /input/path /output/path

4.2 优化

  1. 调优 Mapper 和 Reducer 数量

增加 mapreduce.job.maps 和 mapreduce.job.reduces 的数量,以提高并行度。

  1. 压缩 Map 输出

启用 Map 输出压缩来减少网络传输:

mapreduce.map.output.compress=true

  1. 使用合适的数据格式

使用更高效的文件格式(如 Parquet、ORC)以减少 I/O 开销。

  1. 数据本地化

确保计算尽可能靠近数据存储节点,减少网络传输。

4.3 排错

  1. 检查日志文件

Hadoop 任务的日志可以帮助调试。日志通常存储在 HDFS 中的 logs 目录或本地机器的 /tmp/hadoop-logs 目录中。

  1. YARN UI

通过 YARN 的 web UI (http://master:8088) 可以查看任务的执行状态、日志和资源消耗。

  1. 使用 jobhistory 工具

通过 Hadoop Job History 可以查看任务执行的详细信息。

大数据组件-HBase

1. HBase 集群搭建

1.hbase安装:master上

1.准备将windows系统中的hbase上传到/opt/soft:

cd /opt/soft #由于之前创建过了这里直接切换到/opt/soft目录

2.点击上传按钮,将hbase上传

3.将hbase安装到特定的目录下面:

mkdir -p /usr/hbase #创建一个目录用来安装hbase

4.安装:

tar -zxvf /opt/soft/hbase-1.2.4-bin.tar.gz -C /usr/hbase

配置conf/hbase-env.sh

/usr/hbase/hbase-2.4.11/conf

export HBASE_MANAGES_ZK=false

export JAVA_HOME=/usr/java/jdk1.8.0_212

export HBASE_CLASSPATH=/usr/hadoop/hadoop-3.1.3/etc/Hadoop

配置conf/hbase-site.xml

(5项)注意:机器名:端口号 与Hadoop core-site.xml主机一致

<property>

<name>hbase.rootdir</name>

<value>hdfs://master:9820/hbase</value>

</property>

<property>

<name>hbase.cluster.distributed</name>

<value>true</value>

</property>

<property>

<name>hbase.master</name>

<value>hdfs://master:6000</value>

</property>

<property>

<name>hbase.zookeeper.quorum</name>

<value>master,slave1,slave2</value>

</property>

<property>

<name>hbase.zookeeper.property.dataDir</name>

<value>/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdata</value>

</property>

如果zookeeper的zoo.cfg中port:2181则不必配置下面一项。

<property>

<name>hbase.zookeeper.property.clientPort</name>

<value>12181</value>

</property>

#----------------------

#说明clientPort需要与zookeeper一致

#-----------------------------

配置conf/regionservers

(仅master)

slave1

slave2

masterbak

5.cp hadoop的两个配置文件过来

cp hadoop的hdfs-site.xml和core-site.xml到hbase的conf下

cp /usr/hadoop/hadoop-3.1.3/etc/hadoop/hdfs-site.xml /usr/hbase/hbase-2.4.11/conf/

cp /usr/hadoop/hadoop-3.1.3/etc/hadoop/core-site.xml /usr/hbase/hbase-2.4.11/conf/

6.分发到其他三台机器

scp -r /usr/hbase root@slave1:/usr

scp -r /usr/hbase root@slave2:/usr

scp -r /usr/hbase root@masterbak:/usr

配置环境变量:(所有机器)

#my hbase

export HBASE_HOME=/usr/hbase/hbase-2.4.11

export PATH=$PATH:$HBASE_HOME/bin

生效环境变量(4台机器)

master运行hbase【先启动zookeeper,hadoop】

hbase2.4.11: bin/start-hbase.sh

jps

192.168.222.171:16010

7.hbase shell

启动hbase shell

list

显示还没有任何表

退出:quit

status 查看状态

2. HBase Shell 操作

进入 HBase Shell:

$HBASE_HOME/bin/hbase shell

DDL 操作

  1. 创建表

create 'mytable', 'cf'

    • 'mytable' 是表名。- 'cf' 是列族名(列族是HBase表的基本存储单元)。
  1. 查看表

list

  1. 查看表的详细描述

describe 'mytable'

  1. 删除表

禁用表:

disable 'mytable'

删除表:

drop 'mytable'

DML 操作

  1. 插入数据

put 'mytable', 'row1', 'cf:column1', 'value1'

    • 'mytable' 是表名。- 'row1' 是行键(Row Key)。- 'cf:column1' 是列族和列名。- 'value1' 是要插入的值。
  1. 读取数据

get 'mytable', 'row1'

  1. 扫描表数据

scan 'mytable'

  1. 删除数据

delete 'mytable', 'row1', 'cf:column1'

  1. 计数表中的行数

count 'mytable'

3. Java-API 编写

步骤 1: 准备 HBase 和依赖 JAR

手动下载和添加 HBase、Hadoop 和 Zookeeper 的 JAR 文件。

步骤 2: 创建 Eclipse Java 项目

  1. 打开 Eclipse,选择 File -> New -> Java Project。
  2. 输入项目名称(例如:HBaseExample),并点击 Finish。

步骤 3: 手动添加 JAR 文件

  1. 右键点击项目(HBaseExample),选择 Build Path -> Configure Build Path。
  2. 在 Libraries 选项卡中点击 Add External JARs。
  3. 导航到 HBase、Hadoop、Zookeeper 的 lib 目录,手动添加所有 JAR 文件。 - HBase JAR:添加 hbase-client.jar、hbase-server.jar 以及 hbase-common.jar 等。- Hadoop JAR:添加 hadoop-common.jar、hadoop-hdfs.jar 等。- Zookeeper JAR:添加 zookeeper.jar。
  4. 确保所有必要的 JAR 文件都已添加,并点击 Apply and Close。

步骤 4: 编写 HBase Java API 代码

  1. 右键点击 src 文件夹,选择 New -> Class,输入类名(如 HBaseExample),并勾选 public static void main(String[] args),点击 Finish。
  2. 编写 HBase Java API 代码。以下是示例代码:
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.*;

import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseExample {

    public static void main(String[] args) throws IOException {

        // 创建 HBase 配置

        Configuration config = HBaseConfiguration.create();

        config.set("hbase.zookeeper.quorum", "master,slave1,slave2");

        config.set("hbase.zookeeper.property.clientPort", "2181");

        // 创建 HBase 连接

        try (Connection connection = ConnectionFactory.createConnection(config)) {

            Admin admin = connection.getAdmin();

            // 创建表

            TableName tableName = TableName.valueOf("mytable");

            if (!admin.tableExists(tableName)) {

                TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)

                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))

                        .build();

                admin.createTable(tableDescriptor);

                System.out.println("Table created: " + tableName);

            }

            // 插入数据

            try (Table table = connection.getTable(tableName)) {

                Put put = new Put(Bytes.toBytes("row1"));

                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));

                table.put(put);

                System.out.println("Data inserted.");

                // 读取数据

                Get get = new Get(Bytes.toBytes("row1"));

                Result result = table.get(get);

                byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1"));

                System.out.println("Retrieved value: " + Bytes.toString(value));

            }

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

}

步骤 5: 运行代码

  1. 确保 HBase 集群已启动

启动 HBase:

$HBASE_HOME/bin/start-hbase.sh

使用 jps 确认 HMaster 和 HRegionServer 是否正常运行。

在 Eclipse 中运行 Java 项目

右键点击 HBaseExample.java 文件,选择 Run As -> Java Application。

Eclipse 将编译并运行你的代码,控制台会显示程序的输出。

步骤 6: 验证输出

执行成功后,控制台会输出类似如下信息:

Table created: mytable

Data inserted.

Retrieved value: value1

这表明程序已成功连接到 HBase,创建了表 mytable,并执行了数据插入和读取操作。

大数据组件-Hive

  1. Hive 搭建

步骤 1: 安装 Hadoop

Hive 依赖 Hadoop,所以首先需要安装 Hadoop。如果 Hadoop 已经安装,请确保其配置正确。

步骤 2: 安装 Hive

(slave1)【Hadoop先启动好 --- 防火墙一定得关上】

1 上传安装包

   # cd /opt/soft

上传apache-hive-3.1.2-bin.tar.gz压缩包

mkdir /usr/hive

2 解压缩安装包

tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /usr/hive

3 修改hive的文件夹名称

cd /usr/hive

mv apache-hive-3.1.2-bin hive

4 添加hive的环境变量

vi /etc/profile

          =======添加内容如下======

----HIVE_HOME

export HIVE_HOME=/usr/hive/hive

export PATH=$PATH:$HIVE_HOME/bin

soruce /etc/profile

cd $HIVE_HOME

5 拷贝MySQL驱动包

cd /opt/soft

上传mysql驱动包 mysql-connector-java-5.1.37.jar

将mysql驱动包拷贝到hive中

cp mysql-connector-java-5.1.37.jar $HIVE_HOME/lib

在slave1:apache-hive/conf

cp hive-env.sh.template hive-env.sh

vi hive-env.sh

修改如下内容:

Set HADOOP_HOME to point to a specific hadoop

HADOOP_HOME=/usr/hadoop/hadoop-3.1.3

slave1中

cp hive-default.xml.template hive-site.xml

vi hive-site.xml

Shift+g跳到最后

添加内容:

<property>

    <name>hive.metastore.warehouse.dir</name> 

    <value>/warehousedir/home</value> 

</property>

<property>

  <name>javax.jdo.option.ConnectionURL</name>

<value>jdbc:mysql://slave2:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>

</property>

<property>

  <name>javax.jdo.option.ConnectionDriverName</name>

  <value>com.mysql.jdbc.Driver</value>

</property>

<property>

  <name>javax.jdo.option.ConnectionUserName</name>

  <value>root</value>

</property>

<property>

  <name>javax.jdo.option.ConnectionPassword</name>

  <value>123456</value>

</property>

<property>

  <name>hive.metastore.schema.verification</name>

  <value>false</value>

</property>

<property>

  <name>datanucleus.schema.autoCreateAll</name>

  <value>true</value>

</property>

<property>

    <name>hive.exec.scratchdir</name>

    <value>/tmp/hive</value>

  </property>

  <property>

    <name>hive.exec.local.scratchdir</name>

    <value>/tmp/hive/local</value>

  </property>

  <property>

    <name>hive.downloaded.resources.dir</name>

    <value>/tmp/hive/resources</value>

  </property>

slave1上启动hive服务

执行初始化Hive元数据库命令

schematool -initSchema -dbType mysql -verbos

先启动HADOOP

在/hive/ 输入命令: bin/hive (注意hive-site.xml配置是否正确,否则无法通过

启动hive

cd /usr/hive/hive/

bin/hive

hive>show databases;

hive> use default;

hive>show tables;

hive>quit;

  1. 基于Linux的MySQL安装、元数据存储到MySQL配置

步骤 1: 安装 MySQL(slave2)

MySQL安装包上传(在slave2上)

cd /opt/soft

1.上传到该目录mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

2.解压缩第一层包(在slave2cd上)

cd /opt/soft

tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm

sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm --force --nodeps

sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm --force --nodeps

sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm

sudo yum install -y libaio

sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm --force --nodeps

3.删除配置文件(在slave2上)

   查看mysql所安装的目录(查看datadir的目录结果)

vi /etc/my.cnf 就是/var/lib/mysql这里

   删除datadir指向的目录所有文件内容

cd /var/lib/mysql

sudo rm -rf ./*

4 初始化数据库

    sudo mysqld --initialize --user=mysql

5 查看初始化密码( -localhost后面)

    sudo cat /var/log/mysqld.log

6 启动MySQL的服务

    sudo systemctl start mysqld

7 登录MySQL数据库

    mysql -u root -p 
          Enter password:输入mysqld.log中的密码

8 修改数据库密码

    mysql>set password = password("123456");

9 修改数据库任意连接

    mysql>update mysql.user set host='%' where user='root';

    mysql>flush privileges;

    mysql>quit;

10 测试mysql数据库

mysql -u root -p
   Enter password:123456

mysql>quit;

show databases;

创建一个数据库名为Hive,并设置编码为latin1,用于存储Hive的元数据

create database Hive default character set latin1;

创建一个特定的数据库用户hive,密码设置为123456,并将步骤3中创建的Hive库授权给该用户

create user 'hive'@'localhost' identified by '123456';

grant create,alter,drop,select,insert,update,delete on Hive.user_info to hive@localhost identified by '123456';

  1. DDL、DML、查询(基本查询、分组、join语句、排序)

Hive 支持常见的 SQL DDL 和 DML 操作。以下是常用的操作示例。

DDL 操作

  1. 创建数据库

CREATE DATABASE mydb;

  1. 创建表
CREATE TABLE students (

  id INT,

  name STRING,

  age INT,

  gpa FLOAT

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
  1. 删除表

DROP TABLE students;

DML 操作

  1. 插入数据

INSERT INTO students VALUES (1, 'Alice', 21, 3.5);

  1. 加载数据

LOAD DATA INPATH '/path/to/data' INTO TABLE students;

查询操作

  1. 基本查询

SELECT * FROM students;

  1. 分组查询

SELECT age, COUNT(*) FROM students GROUP BY age;

  1. 排序查询

SELECT * FROM students ORDER BY gpa DESC;

  1. Join 查询
SELECT a.name, b.course_name

FROM students a

JOIN courses b

ON a.id = b.student_id;

4. 函数(单行、聚合、炸裂、窗口)、自定义函数UDF

单行函数

  • 字符串函数

SELECT UPPER(name), LOWER(name) FROM students;

  • 日期函数

SELECT CURRENT_DATE();

  • 数学函数

SELECT ABS(-10), ROUND(3.1415, 2);

聚合函数

  • 求和、平均、计数

SELECT SUM(gpa), AVG(gpa), COUNT(*) FROM students;

炸裂函数(explode)

  • 用于将数组或嵌套数据展开为多行。

SELECT name, explode(array(1, 2, 3)) AS num FROM students;

窗口函数

  • **ROW_NUMBER()**:返回行号。
SELECT name, gpa, ROW_NUMBER() OVER (PARTITION BY age ORDER BY gpa DESC) AS rank FROM students;

自定义函数UDF

Hive 允许用户编写自定义函数(UDF)来扩展其功能。以下是如何编写和使用自定义UDF的步骤。

编写 UDF 示例

  1. 编写一个简单的 Hive UDF:
import org.apache.hadoop.hive.ql.exec.UDF;

import org.apache.hadoop.io.Text;

public class UpperCaseUDF extends UDF {

    public Text evaluate(Text input) {

        if (input == null) {

            return null;

        }

        return new Text(input.toString().toUpperCase());

    }

}
  1. 编译 UDF: - 使用 Hadoop 和 Hive 的 jar 文件编译这个 Java 类。

javac -cp $(hadoop classpath):$(hive classpath) UpperCaseUDF.java

jar -cf UpperCaseUDF.jar UpperCaseUDF.class

  1. 将 UDF 注册到 Hive 中: - 将 UpperCaseUDF.jar 复制到 Hive 集群,并通过 Hive CLI 注册该 UDF。

ADD JAR /path/to/UpperCaseUDF.jar;

CREATE TEMPORARY FUNCTION to_upper AS 'UpperCaseUDF';

  1. 使用 UDF:

SELECT to_upper(name) FROM students;

大数据组件-Spark

  1. Spark三种搭建方式

1.1 Standalone模式

Standalone模式是Spark自带的集群管理器,适合快速测试或简单的生产环境。你不需要任何外部资源管理工具,Spark的Master节点充当资源调度器,Task节点作为工作节点。

优点

  • 简单易用,适合小型集群或测试环境。
  • 无需外部的集群管理工具。

搭建步骤

  1. 下载并解压Spark:

上传spark源码包到/opt/soft

mkdir /usr/spark

cd /opt/soft

tar -zxvf spark-3.1.1-bin-hadoop3.2.tgz -C /usr/spark

  1. 启动Spark Master和Worker:

./sbin/start-master.sh

./sbin/start-worker.sh spark://<master-ip>:7077

  1. 提交任务:

./bin/spark-submit --master spark://<master-ip>:7077 <your_spark_job.py>

安装scala [master]

[root@master ~]# mkdir /usr/scala

上传scala-2.11.12.tgz 到/opt/soft

yum install unzip

unzip -o -d /usr/scala scala-2.12.11.zip

[root@master soft]# tar -zxvf scala-2.11.12.tgz -C /usr/scala

配置环境变量

[root@master scala-2.11.12]# vi /etc/profile

加入如下三行:

#scala

export SCALA_HOME=/usr/scala/scala-2.12.11

export PATH=$PATH:$SCALA_HOME/bin

保存退出

[root@master scala-2.11.12]# source /etc/profile

[root@master scala-2.11.12]# scala -version

Scala code runner version 2.12.11 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.

分发到其他服务器,并配置环境变量

scp -r /usr/scala/ root@slave1:/usr

scp -r /usr/scala/ root@slave2:/usr

[我自己的Standalone]配置spark

进入spark安装目录下的conf

cd /usr/spark/spark-3.1.1-bin-hadoop3.2/conf/

cp workers.template slaves 并修改

vi进入:

最后一行

删掉localhost

填上

master

slave1

slave2

cp spark-env.sh.template spark-env.sh

spark-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_212

export SCALA_HOME=/usr/scala/scala-2.12.11

export SPARK_WORKER_MEMORY=1g

export HADOOP_CONF_DIR=/usr/hadoop/hadoop-3.1.3/etc/hadoop

cp spark-defaults.conf.template spark-site.conf

Example:

spark.master spark://master:7077

这两个文件暂时不必修改

cp fairscheduler.xml.template fairscheduler.xml

cp log4j.properties.template log4j.properties

分发到其他机器,并修改环境变量

scp -r /usr/spark root@slave1:/usr

scp -r /usr/spark root@slave2:/usr

vi /etc/profile

#spark

export SPARK_HOME=/usr/spark/spark-3.1.1-bin-hadoop3.2/

export PATH=$PATH:SPARK_HOME/bin

启动spark

在spark安装目录下:

sbin/start-all.sh

jps

spark安装目录下执行:bin/spark-shell

ctrl-z 退出

1.2 YARN模式

YARN(Yet Another Resource Negotiator)是Hadoop生态系统中的资源管理器。使用YARN可以让Spark与Hadoop无缝集成,允许它在Hadoop集群中运行。

优点

  • 与Hadoop集群完美结合,利用HDFS、MapReduce等资源。
  • 支持动态资源调度,适合大规模集群。

搭建步骤

  1. 确保Hadoop和YARN集群已配置和运行。
  2. 启动YARN Resource Manager:

start-yarn.sh

  1. 提交Spark任务到YARN:

./bin/spark-submit --master yarn <your_spark_job.py>

1.3 Mesos模式

Mesos是一个通用的集群管理系统,允许Spark与其他服务共享集群资源。

优点

  • 适用于需要多个框架共享集群资源的环境。
  • 支持多种类型的任务调度。

搭建步骤

  1. 安装并配置Mesos。
  2. 启动Mesos Master和Slave。
  3. 使用Spark提交任务到Mesos:

./bin/spark-submit --master mesos://<mesos-master-ip>:5050 <your_spark_job.py>

2. Spark Core

Spark Core是Spark的基础模块,负责内存管理、调度、任务分发、容错和与存储系统的交互。Spark Core包含以下核心概念:

2.1 RDD (Resilient Distributed Dataset)

RDD是Spark的核心抽象,代表一个不可变的分布式数据集。通过对RDD进行变换(transformation)或行动(action)来执行计算。

  • Transformation:转换操作,惰性执行。例如,map()、filter()。
  • Action:执行操作,立即触发计算。例如,count()、collect()。

示例代码:RDD的基本操作(scala)

import org.apache.spark.{SparkConf, SparkContext}

// 配置和初始化 Spark

val conf = new SparkConf().setAppName("Spark Core Example").setMaster("local")

val sc = new SparkContext(conf)

// 创建 RDD

val data = Array(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

// Transformation: map 将每个元素乘以 2

val rdd2 = rdd.map(x => x * 2)

// Action: collect 收集结果并打印

val result = rdd2.collect()

result.foreach(println)

// 关闭 Spark

sc.stop()

3. Spark SQL

Spark SQL 提供了在Spark上使用结构化数据的API。它允许用户通过SQL查询数据,并且无缝集成到Spark的批处理和流处理任务中。Spark SQL使用DataFrame和Dataset API进行操作。

示例代码:使用DataFrame和SQL查询(scala)

import org.apache.spark.sql.SparkSession

// 初始化 SparkSession

val spark = SparkSession.builder()

    .appName("Spark SQL Example")

    .master("local")

    .getOrCreate()

// 创建 DataFrame

val df = spark.read.json("examples/src/main/resources/people.json")

// 显示 DataFrame 内容

df.show()

// 使用 SQL 查询

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT name, age FROM people WHERE age > 30")

sqlDF.show()

// 关闭 SparkSession

spark.stop()

优点:

  • 支持标准的SQL查询。
  • 可以直接与结构化数据(如JSON、Parquet、Hive表)集成。
  • 动态查询优化(Catalyst引擎)提高性能。

4. Spark Streaming

Spark Streaming 是用于实时数据处理的组件。它可以通过微批处理将实时数据流分批处理,每一小批数据都以RDD的形式进行处理。

示例代码:Spark Streaming 处理Socket数据流

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

// 从 TCP Socket 接收数据

val lines = ssc.socketTextStream("localhost", 9999)

val words = lines.flatMap(_.split(" "))

// 统计单词数

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

// 输出结果

wordCounts.print()

// 启动 StreamingContext

ssc.start()

ssc.awaitTermination()

优点:

  • 支持多种数据源:Kafka、Flume、HDFS等。
  • 适合处理实时流数据。
  • 支持窗口化操作和故障恢复。

5. Spark MLlib

MLlib 是 Spark 中的机器学习库,包含常见的机器学习算法和工具,如分类、回归、聚类和协同过滤。它基于Spark的分布式架构,能够对大规模数据集进行高效的机器学习任务。

示例代码:使用MLlib进行线性回归

import org.apache.spark.ml.regression.LinearRegression

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("LinearRegressionExample").getOrCreate()

// 加载数据

val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

// 初始化线性回归模型

val lr = new LinearRegression()

    .setMaxIter(10)

    .setRegParam(0.3)

    .setElasticNetParam(0.8)

// 训练模型

val lrModel = lr.fit(data)

// 打印模型系数和截距

println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}"

// 评估模型

val trainingSummary = lrModel.summary

println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

// 关闭 SparkSession

spark.stop()

MLlib 支持的主要功能:

  • 分类:逻辑回归、决策树、随机森林等。
  • 回归:线性回归、岭回归等。
  • 聚类:KMeans、Gaussian Mixture等。
  • 降维:PCA、SVD等。

大数据组件-Zookeeper

  1. Zookeeper 集群安装

1.准备将windows系统中的zookeeper上传到/opt/soft:

cd /opt/soft #由于之前创建过了这里直接切换到/opt/soft目录

2.点击上传按钮,将zookeeper上传

3.将zookeeper安装到特定的目录下面:

mkdir -p /usr/zookeeper #创建一个目录用来安装zookeeper

4.安装:

tar -zxvf zookeeper-3.4.10.tar.gz -C /usr/zookeeper

2.Zookeeper配置(所有机器都需要)

1.切换到相关目录

cd /usr/zookeeper/ zookeeper-3.4.10/conf #接下来的步骤我们在这个目录下进行

2.ls #查看该目录下面有什么

3.我们需要配置文件,但是从“ls”可知,只给了我们一个模板,需要我们复制一份,然后再配置文件

4.cp zoo_sample.cfg zoo.cfg #zoo.cfg是它的新名字,内容和zoo_sample.cfg一样

5.编辑zoo.cfg

vi zoo.cfg

dataDir后面

改为

dataDir=/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdata

dataLogDir=/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdatalog

server.1=192.168.222.171:2888:3888

server.2=192.168.222.172:2888:3888

server.3=192.168.222.173:2888:3888

6.返回上一级位置

cd ..

7.创建我们在第5步要创建的文件

mkdir zkdata

mkdir zkdatalog

8.进入到zkdata目录下创建一个新文件并编辑它

cd zkdata

vi myid

编辑的内容和虚拟机有关系,如果是master,里面写入1,如果是slave1里面写入2,如果是slave2,里面写入3,如果是masterbak,里面写入4

9.接下来我们还要配置另外两台虚拟机,为了方便,我们直接远用scp,将master的zookeeper整个文件夹远程拷贝给slave1,slave2,masterbak,这样就不需要配置了

scp -r /usr/zookeeper root@slave1:/usr #这里考的是文件夹所以是scp -r 命令

scp -r /usr/zookeeper root@slave2:/usr #同样拷一份到slave2中去

scp -r /usr/zookeeper root@masterbak:/usr

10.拷贝过来的内容仍然需要做一些修改

到slave1中

cd /usr/zookeeper/zookeeper-3.4.10/zkdata #切换到该目录下

11.对myid 文件中的内容进行修改,因为不同的虚拟机,里面的内容不一样

vi myid

将里面的内容改为2

12.slave2同上10,11步骤,不同的是myid文件中,内容改为3

3.配置zookeeper的环境变量(所有都需要)

1.进入到指定的文件夹

vi /etc/profile

zookeeper的环境变量放在Java的环境变量下面

#Zookeeper------

export ZOOKEEPER_HOME=/usr/zookeeper/apache-zookeeper-3.5.7-bin/

export PATH=$PATH:$ZOOKEEPER_HOME

  1. 使环境变量生效

source /etc/profile

3.运行zookeeper

必须在zookeeper安装目录下

cd /usr/zookeeper/apache-zookeeper-3.5.7-bin/

bin/zkServer.sh start #这一步必须所有机器同时运行

可能会出现错误:关闭防火墙再看状态即可(systemctl stop firewalld)

端点占用:sudo yum install net-tools

netstat -apn | grep 2181

kill -9 xx

bin/zkServer.sh status #这一步选出谁是领导者谁是跟随者

jps 查看进程

4. Zookeeper 启停脚本编写

为了简化启动和停止 Zookeeper 集群的操作,我们可以编写一个脚本,通过 SSH 登录集群节点并启动或停止 Zookeeper。

启停脚本示例:zk_manage.sh

#!/bin/bash

ZOOKEEPER_HOME=/usr/zookeeper/apache-zookeeper-3.5.7-bin

ZOOKEEPER_NODES="node01 node02 node03"

case $1 in

"start")

    for node in $ZOOKEEPER_NODES

    do

        echo "Starting Zookeeper on $node"

        ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh start"

    done

    ;;

"stop")

    for node in $ZOOKEEPER_NODES

    do

        echo "Stopping Zookeeper on $node"

        ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh stop"

    done

    ;;

"status")

    for node in $ZOOKEEPER_NODES

    do

        echo "Checking Zookeeper status on $node"

        ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh status"

    done

    ;;

*)

    echo "Usage: $0 {start|stop|status}"

    exit 1

    ;;

esac

说明:

  • 该脚本读取集群节点列表(ZOOKEEPER_NODES),并通过SSH连接到每个节点执行启动、停止或状态检查命令。

使用方式:

    • 启动:./zk_manage.sh start- 停止:./zk_manage.sh stop- 查看状态:./zk_manage.sh status

设置可执行权限:

chmod +x zk_manage.sh

5. 客户端命令行操作

Zookeeper 提供了命令行客户端工具 zkCli.sh,可以用来连接到 Zookeeper 服务并执行命令,如创建、删除 Znodes,查询节点数据等。

连接 Zookeeper:

在任意节点上使用 zkCli.sh 连接到 Zookeeper 服务:

/usr/zookeeper/apache-zookeeper-3.5.7-bin/bin/zkCli.sh -server node01:2181

常用命令:

**1.**创建节点

create /myNode "hello zookeeper"

**2.**查询节点数据

get /myNode

3.查看子节点

ls /

4.修改节点数据

set /myNode "new data"

5.删除节点

delete /myNode

6.退出客户端

quit

6. Java-API编写

步骤 1: 创建 Java 项目

  1. 创建新项目: - 打开Eclipse,选择 File -> New -> Java Project。- 输入项目名称(例如:ZookeeperExample)。- 点击 Finish 完成项目创建。

步骤 2: 下载 Zookeeper JAR 文件

  1. 下载Zookeeper的JAR包:解压Zookeeper的JAR包

通常解压后会看到 zookeeper-<version>/lib 目录,其中包含了多个依赖库(包括Zookeeper的核心库和其他所需依赖)。

  1. 主要的JAR文件

需要包括 zookeeper-<version>.jar(位于解压目录的根目录)和 lib 文件夹中的所有JAR文件。

步骤 3 在 Eclipse 中添加 Zookeeper 库

  1. 将 Zookeeper 的 JAR 文件添加到项目中

右键点击你的项目(例如:ZookeeperExample),选择 Build Path -> Configure Build Path。

选择 Libraries 选项卡,然后点击 Add External JARs。

导航到你下载并解压的Zookeeper目录,选择 zookeeper-<version>.jar 和 lib 目录中的所有JAR文件。

点击 Apply and Close,这将把Zookeeper的依赖库添加到你的项目中。

步骤 4: 编写 Zookeeper Java API 代码

创建 Java 类

右键点击 src 目录,选择 New -> Class。

将类命名为 ZookeeperExample,并勾选 public static void main(String[] args)。

点击 Finish。

编写 Zookeeper 示例代码

在 ZookeeperExample.java 中,输入以下代码:

package com.example;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class ZookeeperExample {

    private static ZooKeeper zooKeeper;

    private static ZooKeeperConnection conn;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

        // 连接到 Zookeeper

        conn = new ZooKeeperConnection();

        zooKeeper = conn.connect("localhost");  // 假设 Zookeeper 在本地运行,端口为默认的 2181

        // 创建 Znode

        String path = "/myNode";

        byte[] data = "Hello Zookeeper".getBytes();

        create(path, data);

        // 获取 Znode 数据

        System.out.println("Data of Znode: " + new String(getData(path)));

        // 修改 Znode 数据

        byte[] newData = "New data".getBytes();

        setData(path, newData);

        // 获取修改后的 Znode 数据

        System.out.println("Updated data of Znode: " + new String(getData(path)));

        // 删除 Znode

        delete(path);

        // 关闭连接

        conn.close();

    }

    // 创建 Znode

    public static void create(String path, byte[] data) throws KeeperException, InterruptedException {

        zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    }

    // 获取 Znode 数据

    public static byte[] getData(String path) throws KeeperException, InterruptedException {

        Stat stat = zooKeeper.exists(path, true);

        return zooKeeper.getData(path, false, stat);

    }

    // 修改 Znode 数据

    public static void setData(String path, byte[] data) throws KeeperException, InterruptedException {

        zooKeeper.setData(path, data, zooKeeper.exists(path, true).getVersion());

    }

    // 删除 Znode

    public static void delete(String path) throws KeeperException, InterruptedException {

        zooKeeper.delete(path, zooKeeper.exists(path, true).getVersion());

    }

}

class ZooKeeperConnection {

    private ZooKeeper zoo;

    // 连接到 Zookeeper 服务器

    public ZooKeeper connect(String host) throws IOException, InterruptedException {

        zoo = new ZooKeeper(host, 5000, we -> {});

        return zoo;

    }

    // 关闭连接

    public void close() throws InterruptedException {

        zoo.close();

    }

}

步骤 5: 运行代码

  1. 确保 Zookeeper 服务已经启动

在本地或服务器上启动Zookeeper服务:

/usr/zookeeper/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

  1. 在 Eclipse 中运行 Java 项目

右键点击 ZookeeperExample.java,选择 Run As -> Java Application。

控制台将会显示程序的输出,执行成功后,你将看到Znode被创建、读取、更新和删除的操作。

步骤 6: 验证输出

在Eclipse的控制台中,你应该看到类似如下的输出:

Data of Znode: Hello Zookeeper

Updated data of Znode: New data

这表明程序已经成功连接到Zookeeper,创建了Znode /myNode,并对其数据进行了读写操作。

大数据组件-Fink

1. Flink Yarn 模式搭建

在大数据环境中,Flink可以通过YARN进行集群部署管理。YARN作为资源管理器,负责分配和管理Flink作业的资源。

**Step 1: **下载和解压Flink

软件包 放入集群 解压 /usr/flink

cd /usr/flink/flink-1.14.0/

**Step 2: **配置Flink

1.修改配置文件:进入Flink安装目录,并修改 flink-conf.yaml 文件。

vi conf/flink-conf.yaml

在flink-conf.yaml中添加或修改以下配置:

jobmanager.memory.process.size: 1024m

taskmanager.memory.process.size: 2048m

taskmanager.numberOfTaskSlots: 4

parallelism.default: 2

2.** YARN配置**:确保Hadoop配置路径在Flink中可见,并设置HADOOP_CONF_DIR。

export HADOOP_CONF_DIR=/usr/hadoop/Hadoop-3.1.3/etc/hadoop

**Step 3: **启动YARN集群

确保你的Hadoop集群已经启动并运行:

start-dfs.sh

start-yarn.sh

**Step 4: **通过YARN启动Flink

你可以通过以下命令启动Flink的JobManager和TaskManager,交由YARN管理:

./bin/yarn-session.sh -d -n 3 -s 4 -jm 1024m -tm 2048m

  • -n 表示启动的TaskManager数量
  • -s 表示每个TaskManager的插槽数量
  • -jm 表示JobManager使用的内存
  • -tm 表示每个TaskManager使用的内存

此命令会在YARN上启动Flink会话模式,Flink作业可以提交到YARN集群中。

【Standalone模式】

vi /etc/profile

配置环境变量

---Flink

export FLINK_HOME=/usr/flink/flink-1.14.0/

export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

$FLINK_HOME

2.配置conf文件

cd conf

配置flink-conf.yaml

#1. 配置jobmanager rpc 地址

jobmanager.rpc.address: master

#2. 修改taskmanager内存大小,可改可不改

taskmanager.memory.process.size: 2048m

#3. 修改一个taskmanager中对于的taskslot个数,可改可不改

taskmanager.numberOfTaskSlots: 4

#修改并行度,可改可不改

parallelism.default: 4

3.vi masters

master:8081

vi workers

master

4.配置zoo

新建snapshot存放的目录,在flink目录下建

mkdir tmp

cd tmp

mkdir zookeeper

#修改conf下zoo.cfg配置

vi zoo.cfg

#snapshot存放的目录

dataDir=/usr/flink/flink-1.14.0/tmp/zookeeper

#配置zookeeper 地址

server.1=192.168.222.171:2888:3888

5.启动集群

bin/start-cluster.sh

192.168.222.171:8081进入浏览器界面

2. Flink 运行架构

Flink的运行架构基于JobManagerTaskManager,并结合分布式集群来处理作业。

  • JobManager:管理作业的执行,是Flink的调度器,负责协调各个任务,管理故障恢复。
  • TaskManager:执行实际的数据处理任务。它们接受来自JobManager的任务指令,负责执行并报告任务状态。
  • Dispatcher:负责提交作业并与外部接口进行通信。
  • ResourceManager:负责管理集群资源,并向JobManager分配资源。

Flink可以在不同模式下运行:

  • Standalone模式:Flink可以单独运行在一台或多台机器上。
  • YARN模式:Flink作业运行在YARN集群上,动态分配资源。
  • Kubernetes模式:Flink作业运行在Kubernetes集群上。

3. DataStream API

DataStream API 是Flink的核心API之一,用于处理无界数据流。以下是使用DataStream API进行简单数据流处理的代码示例。

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {

        // 设置执行环境

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从socket读取数据,假设监听在localhost:9999

        env.socketTextStream("localhost", 9999)

                .flatMap(new Tokenizer())

                .keyBy(value -> value.f0)

                .sum(1)

                .print();

        // 执行流处理作业

        env.execute("Flink Streaming Word Count");

    }

    // FlatMap function,用于将输入数据分割成单词

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // 分割单词并计数

            for (String word : value.split("\\s")) {

                if (word.length() > 0) {

                    out.collect(new Tuple2<>(word, 1));

                }

            }

        }

    }

}

步骤:

  1. 设置执行环境:使用 StreamExecutionEnvironment 创建Flink执行环境。
  2. 读取数据流:从Socket(localhost:9999)读取流式数据。
  3. 定义FlatMapFunction:用于将接收到的行分割成单词,并创建 Tuple2 形式的(word, count) 对象。
  4. 按key进行分组:通过 keyBy 操作,根据单词(Tuple2.f0)进行分组。
  5. 聚合:使用 sum(1) 对每个单词出现次数进行累加。
  6. 输出结果:使用 print() 输出到控制台。
  7. 启动执行:通过 env.execute 启动作业。

4. Flink中的时间和窗口

Flink有两种时间语义:事件时间处理时间。窗口是Flink处理无界数据流的核心机制,允许将数据划分为时间段来进行聚合处理。

时间类型:

**1.**事件时间:数据发生的实际时间,通常从数据源携带的时间戳中提取。

**2.**处理时间:数据到达Flink处理系统的时间。

窗口类型:

  1. 滚动窗口(Tumbling Window):将数据分割成不重叠的固定大小的窗口。
  2. 滑动窗口(Sliding Window):窗口有一个固定的大小和滑动步长,允许窗口重叠。
  3. 会话窗口(Session Window):根据用户的行为进行窗口划分,窗口之间没有固定的大小。

窗口示例:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

public class WindowExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取数据流

        DataStream<String> textStream = env.socketTextStream("localhost", 9999);

        // 进行窗口操作,按单词分组,并在时间窗口内进行计数

        textStream

                .flatMap(new Tokenizer())

                .keyBy(value -> value.f0)

                .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 使用滚动窗口,每10秒聚合一次

                .sum(1)

                .print();

        env.execute("Window Example");

    }

}

在此示例中,使用 TumblingEventTimeWindows 创建了一个10秒的滚动窗口。

**5. **处理函数(Process Function)

ProcessFunction 是Flink提供的低级别的处理API,允许开发者对数据流进行灵活的操作。

示例代码:Process Function的使用

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

public class ProcessFunctionExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)

                .process(new MyProcessFunction())

                .print();

        env.execute("ProcessFunction Example");

    }

    public static class MyProcessFunction extends ProcessFunction<String, String> {

        @Override

        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {

            // 输出原始数据并添加前缀

            out.collect("Processed: " + value);

        }

    }

}

说明:

  • processElement 方法允许对流中的每个元素进行处理。
  • Collector 用于收集处理后的数据并输出。

大数据组件-Flume、Kafka

  1. 安装Flume

Step 1: 解压Flume安装包

  1. 将Master节点Flume安装包解压到/opt/soft目录下,创建好/usr/flume

tar -zxvf /opt/soft/apache-flume-1.9.0-bin.tar.gz -C /usr/flume

Step 2: 配置环境变量

  1. 配置环境变量

vi /etc/profile

添加以下内容:

#FLUME_HOME

export FLUME_HOME=/usr/flume/flume-1.9.0

export PATH=$PATH:$FLUME_HOME/bin

  1. 配置文件(位于:/usr/flume/flume-1.9.1/conf)

将 flume-env.sh.template 复制更名为 flume-env.sh

cp flume-env.sh.template flume-env.sh

并添加以下内容:

vi flume-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_212

使配置文件生效

source /etc/profile

前提:

关闭hbase配置文件

将 hbase 的 hbase.env.sh 的一行配置注释掉

#Extra Java CLASSPATH elements. Optional.

#export HBASE_CLASSPATH=/home/hadoop/hbase/conf

验证

输入命令:

flume-ng version

三、任务

启动Flume传输Hadoop日志(namenode或datanode日志),查看HDFS中/tmp/flume目录下生成的内容

1,将hadoop与flume中 guava-27.0-jre.jar 包版本保持一致( 因为hadoop中此包版本是27而flume中版本是11 )

首先删除Flume自带的旧版本的Guava库

rm -rf /usr/flume/flume-1.9.1/lib/guava-11.0.2.jar

然后将hadoop里面的此包复制给flume

cp /usr/hadoop/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /usr/flume/flume-1.9.0/lib/

2.在 /usr/flume/flume-1.9.0/conf下新建文件 conf-file ,并写配置文件

创建Flume配置文件(conf-file)

Flume的工作方式是通过定义Source、Channel和Sink来收集、传输和存储数据。这些术语的含义如下:

Source(源):从某个位置读取数据(在这里是Hadoop的日志文件)。

Channel(通道):数据在从Source到Sink传输的过程中,需要通过Channel进行中转。

Sink(接收器):将数据存储到目标位置(在这里是HDFS)。

在/usr/flume/flume-1.9.0/conf目录下创建一个名为conf-file的Flume配置文件,内容如下:

a1.sources=r1

a1.sinks=k1

a1.channels=c1

Source配置: 使用TAILDIR类型读取Hadoop的日志文件

a1.sources.r1.type=TAILDIR

a1.sources.r1.filegroups=as

a1.sources.r1.filegroups.as=/usr/hadoop/hadoop-3.1.3/logs/hadoop-root-namenode-master.log

Channel配置: 使用内存通道存储数据

a1.channels.c1.type=memory

Sink配置: 将数据写入HDFS

a1.sinks.k1.type=hdfs

a1.sinks.k1.hdfs.path=hdfs://master:9820/tmp/flume

绑定Source和Channel

a1.sources.r1.channels=c1

绑定Sink和Channel

a1.sinks.k1.channel=c1

3.启动Flume传输Hadoop日志,查看HDFS中/tmp/flume目录下生成的文件

前提:启动hadoop集群

启动命令:

flume-ng agent -c conf -f conf-file -n a1 -Dflume.root.logger=INFO,console &

查看HDFS中/tmp/flume目录下生成的文件命令:

hdfs dfs -ls /tmp/flume

停止它:

通过ps命令查找并停止Flume进程

Step 1: 查找Flume进程ID (PID)

使用 ps 命令查找运行中的Flume进程:

ps -ef | grep flume

你将看到类似如下输出:

user 12345 6789 0 10:23 ? 00:00:15 java -cp /opt/flume/lib/... flume-ng agent -c conf ...

user 12350 6789 0 10:23 pts/1 00:00:00 grep --color=auto flume

这里,12345 是Flume agent进程的PID(进程ID)。你需要记住这个PID。

Step 2: 停止Flume进程

使用 kill -9命令来强制停止这个进程:

kill -9 12345

bin/flume-manage.sh stop force

  1. 安装Kafka

1、使用服务器名称进行通信

编辑 /etc/hosts 文件,在最后添加如下内容

192.168.222.171 node01

192.168.222.172 node02

192.168.222.173 node03

2、关闭防火墙

systemctl stop firewalld

3、安装 Zookeeper 集群

4.上传并解压文件到指定位置 tar -zxvf

5.配置

vi config/server.properties

添加内容:

与 zookeeper 的 myid 一样,每个实例拥有唯一的 ID

broker.id=1

日志文件,数据文件目录

log.dirs=/usr/kafka/kafka_2.12-3.0.0/logs

zookeeper 集群,使用逗号分隔

zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka

6.配置环境变量

编辑 /etc/profile 文件,添加如下内容至文件末尾

kafka

export KAFKA_HOME=/usr/kafka/kafka_2.12-3.0.0

export PATH=$PATH:$KAFKA_HOME/bin

使配置生效

source /etc/profile

7.配置其它两台服务器

使用 scp 命令将 kafka 文件夹发送至其它两台服务器

scp /usr/kafka/kafka_2.12-3.0.0 @node02:/usr/kafka

scp /usr/kafka/kafka_2.12-3.0.0 @node03:/usr/kafka

修改 server.properties 配置文件中的** broker.id**

部署完成,启动服务。

8.Kafka基本操作

先启动zookeeper服务

启动服务

kafka-server-start.sh /usr/kafka/kafka_2.12-3.0.0/config/server.properties &

3. Flume端口监听

设置Flume监听端口

在Flume的配置文件中设置source为netcat类型,并绑定到一个特定的IP地址和端口,这样Flume会在这个端口监听传入的数据。例如:

agent.sources.source1.type = netcat

agent.sources.source1.bind = 0.0.0.0

agent.sources.source1.port = 44444

启动Flume后,它会开始在44444端口监听。你可以使用telnet或netcat工具发送数据到这个端口:

telnet localhost 44444

  1. Kafka启停脚本编写

注意:

1.需先在/etc/hosts 文件中配置 IP 映射。

2.启动kafka集群前,要先启动zoopkeeper集群。

一、kafka集群启动脚本

#!/bin/bash

BROKERS="node01 node02 node03" 

KAFKA_HOME="/usr/kafka/kafka_2.12-3.0.0" 

KAFKA_NAME=”kafka_2.12-3.0.0”

for i in ${BROKERS}

do

    echo "Starting ${KAFKA_NAME} on ${i} "

    ssh ${i} "source /etc/profile; nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"

    if [[ $? -ne 0 ]]; then

        echo "Starting ${KAFKA_NAME} on ${i} is  ok"

    fi

done

echo All ${KAFKA_NAME} are started

exit 0

作用:通过ssh远程连接到当前服务器${i}(即hadoop01、hadoop02或hadoop03),并执行Kafka的启动命令。

详细解释

  • ssh ${i}:通过SSH连接到当前的服务器${i}。
  • "source /etc/profile":远程服务器执行source /etc/profile,加载环境变量,确保Kafka所需的环境变量已加载。
  • nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &:启动Kafka服务器,具体操作为: - nohup:使命令在后台运行,即使SSH连接断开,Kafka仍会继续运行。- sh ${KAFKA_HOME}/bin/kafka-server-start.sh:调用Kafka的启动脚本。- ${KAFKA_HOME}/config/server.properties:指定Kafka的配置文件位置。- > /dev/null 2>&1 &:将所有输出(包括标准输出和标准错误输出)重定向到/dev/null,即忽略所有输出,并在后台运行进程。

二、kafka集群停止脚本

#!/bin/bash

BROKERS="node01 node02 node03" 

KAFKA_HOME="/usr/kafka/kafka_2.12-3.0.0" 

KAFKA_NAME="kafka_2.12-3.0.0"

for i in $BROKERS

do

    echo "Stopping ${KAFKA_NAME} on ${i}..."

    # 使用 ssh 执行 Kafka 停止命令,并捕获其返回状态

    ssh ${i} "source /etc/profile; bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"

    # 检查 ssh 命令是否成功

    if [[ $? -eq 0 ]]; then

        echo "Kafka stopped successfully on ${i}"

    else

        echo "Failed to stop Kafka on ${i}"

    fi

done

echo "All ${KAFKA_NAME} are stopped"

exit 0

为保证这两个脚本能够执行,需要设置可执行权限:

chmod +x start-kafka.sh

chmod +x stop-kafka.sh

三、启停合并脚本

#! /bin/bash

case $2 in

"start")

    for i in $(cat $1)

    do

        echo " --------启动 $i Kafka-------"

        # 用于KafkaManager监控,JMX_PORT 设置

        ssh $i "export JMX_PORT=9988; /usr/local/kafka/bin/kafka-server-start.sh -daemon /opt/local/kafka/config/server.properties"

    done

    ;;

"stop")

    for i in $(cat $1)

    do

        echo " --------停止 $i Kafka-------"

        ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh"

    done

    ;;

*)

    echo "Usage: $0 <hosts_file> <start|stop>"

    exit 1

    ;;

esac

使用方法:

启动Kafka

./script.sh hosts.txt start

其中 hosts.txt 包含你要启动Kafka的所有服务器名称或IP地址,每行一个。例如:

node01

node02

node03

停止Kafka

./script.sh hosts.txt stop

  1. Kafka命令行操作

5.1 创建主题

创建一个名为test的主题,分区数为1,副本数为1:

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 查看主题列表

查看当前集群中的主题列表:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

5.3 发送消息

使用命令行工具向主题test发送消息:

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

然后你可以在控制台输入消息,按Enter发送。

5.4 消费消息

在另一个终端窗口中,使用命令行工具消费test主题的消息:bash

复制代码

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

此命令将从主题的开始读取所有消息,并显示在控制台上。

5.5 查看主题详情

查看主题test的详细信息,包括分区数、副本数等:

bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

大数据组件-ClickHouse【考纲新增】

1. ClickHouse单机安装

安装步骤

Step 1: 安装依赖

更新系统并安装必要的依赖:

sudo yum update -y

sudo yum install -y epel-release

sudo yum install -y libicu libtool-ltdl openssl

Step 2: 安装ClickHouse

  1. 安装** clickhouse-common-static**

该包包含ClickHouse的核心文件和二进制:

sudo rpm -ivh clickhouse-common-static-21.7.3.14-2.x86_64.rpm

安装** clickhouse-common-static-dbg**

这是用于调试的包,可以在排查问题时使用:

sudo rpm -ivh clickhouse-common-static-dbg-21.7.3.14-2.x86_64.rpm

安装** clickhouse-client**

安装ClickHouse客户端,用于连接并操作ClickHouse数据库:

sudo rpm -ivh clickhouse-client-21.7.3.14-2.noarch.rpm

安装** clickhouse-server**

安装ClickHouse服务器,这个包是ClickHouse的核心服务:

sudo rpm -ivh clickhouse-server-21.7.3.14-2.noarch.rpm

  1. 启动ClickHouse服务:

sudo systemctl start clickhouse-server

  1. 设置ClickHouse服务在系统启动时自动启动:

sudo systemctl enable clickhouse-server

Step 3: 验证安装

安装完成后,使用clickhouse-client命令连接到本地的ClickHouse实例,并执行简单的查询验证安装:

clickhouse-client

执行查询命令查看版本:

SELECT version();

返回ClickHouse的版本信息,表示安装成功。

2. 创建表、插入数据

2.1 创建数据库和表

在ClickHouse中,可以根据需要创建数据库和表。首先,创建一个数据库:

CREATE DATABASE test_db;

使用刚创建的数据库:

USE test_db;

创建一个简单的表,包含用户ID、姓名和年龄三个字段:

CREATE TABLE users (

    id UInt32,

    name String,

    age UInt8

) ENGINE = MergeTree()

ORDER BY id;
  • MergeTree 是ClickHouse中常用的表引擎,支持高效的读写和索引。

  • ORDER BY 指定了表的排序键,在插入数据时自动进行排序。

2.2 插入数据

向表中插入一些数据,可以使用SQL的INSERT语句:

INSERT INTO users (id, name, age) VALUES (1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 35);

查看表中的数据:

SELECT * FROM users;

输出结果类似于:

id

name

age

1

Alice

25

2

Bob

30

3

Charlie

35

3. 查询、修改、删除

3.1 查询数据

ClickHouse支持标准的SQL查询。常见查询操作包括:

查询所有数据:

SELECT * FROM users;

根据条件查询:

查询年龄大于30的用户:

SELECT * FROM users WHERE age > 30;

查询特定字段:

查询用户的姓名和年龄:

SELECT name, age FROM users;

排序查询:

按年龄降序排列:

SELECT * FROM users ORDER BY age DESC;

3.2 修改数据

在ClickHouse中直接修改已有的数据比较特殊,ClickHouse本身不支持传统的UPDATE操作。

可以通过删除并重新插入的方式进行修改,或使用ALTER语句来调整表的结构。

例如,要修改ID为2的用户年龄为32,可以先删除该条数据,再插入新值:

删除数据:

ALTER TABLE users DELETE WHERE id = 2;

插入新数据:

INSERT INTO users (id, name, age) VALUES (2, 'Bob', 32);

查询更新后的数据:

SELECT * FROM users WHERE id = 2;

输出结果:

id

name

age

2

Bob

32

3.3 删除数据

删除单条数据:

删除ID为3的用户:

ALTER TABLE users DELETE WHERE id = 3;

删除表中的所有数据:

TRUNCATE TABLE users;

删除表:

如果你不再需要某个表,可以将其删除:

DROP TABLE users;

4. 常见操作总结

操作

示例命令

创建数据库

CREATE DATABASE test_db;

创建表

CREATE TABLE users (id UInt32, name String, age UInt8) ENGINE = MergeTree() ORDER BY id;

插入数据

INSERT INTO users (id, name, age) VALUES (1, 'Alice', 25);

查询所有数据

SELECT * FROM users;

条件查询

SELECT * FROM users WHERE age > 30;

修改数据

先ALTER TABLE users DELETE WHERE id = 2;

然后INSERT INTO users (id, name, age) VALUES (2, 'Bob', 32);

删除单条数据

ALTER TABLE users DELETE WHERE id = 3;

删除表

DROP TABLE users;


本文转载自: https://blog.csdn.net/m0_74070923/article/details/142730064
版权归原作者 小伍_Five 所有, 如有侵权,请联系我们删除。

“备战大数据比赛:个人经验与实战技巧分享”的评论:

还没有评论