基础配置
创建虚拟机,后首先测试是否ping通
ping www.baidu.com
1.修改服务器名称
修改主机名:
$ hostnamectl set-hostname master01 #临时改名,重启还原
bash
永久修改:
修改配置文件
vi /etc/hostname 将内容改为master,保存退出
vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=master 修改为新名
使每一台机器都能通过名字识别另一个机器【配置 hosts实现主机名和ip地址映射】
vi /etc/hosts
192.168.222.171 Competition2024master Competition2024master.root
192.168.222.172 Competition2022node1 Competition2022node1.root
192.168.222.173 Competition2022node2 Competition2022node2.root
2.大数据集群网络配置
2.1编辑虚拟网络
首先以管理员身份进入,在页面左上角寻找“编辑”,点击进入后选择“虚拟网络编辑器”,点击“添加网络”,选择”VMnet8”,将”仅主机模式”改为“NAT模式”,
第一步:
子网IP:192.168.222.0 #子网的最后一位一定是0
子网掩码:255.255.255.0
第二步:DHCP设置
起始IP地址:192.168.222.128
结束IP地址:192.168.222.254
第三步:NAT设置
网关IP:192.168.222.2
以上三步做完点击“应用”,再点击“确定”
现在把所有虚拟机都打开设置里面的IP
2.2虚拟机网络配置与远程连接
网络配置
一.先对master进行操作
1.cd /etc/sysconfig/network-scripts
2.vi ifcfg-ens33
3.BOOTPROTO=dhcp 改为BOOTPROTO=static #静态的意思
4.ONBOOT=no 改为ONBOOT=yes
5.再添加5条记录
IPADDR=192.168.222.171
GATEWAY=192.168.222.2
NETMASK=255.255.255.0
DNS1=192.168.222.2
DNS2=8.8.8.8
#IPADDR 是IP地址的意思
#GATEWAY 是网关的意思
#NETWORK是子网掩码的意思
6.网络服务重启
service network restart
关闭防火墙
systemctl stop firewalld
克隆虚拟机--
二.再对其他三个虚拟机进行操作
以上操作相同不同点是
IPADDR分别为
IPADDR=192.168.222.172
IPADDR=192.168.222.173
IPADDR=192.168.222.174
集群时间同步
时间同步的前提:网络配置完成
时间同步的分工
master 作为时间同步的服务器,slave1,slave2和masterbak向master同步
准备工作:挑选时区
tzselect 5 9 1 1
可在profile 文件中添加命令,使时区保存
vi /etc/profile 最低部添加 TZ='Asia/Shanghai'; export TZ
安装时间同步的软件ntp
【注】三台机器都要安装:
yum install -y ntp 直接下载失效,首先配置
禁用镜像列表(mirrorlist)并启用直接链接(baseurl)
方法一:
到网页
mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup
上传Centos-7.repo
mv /etc/yum.repos.d/Centos-7.repo /etc/yum.repos.d/CentOS-Base.repo
方法二
sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo
sed -i s/^#.*baseurl=http/baseurl=https/g /etc/yum.repos.d/*.repo
sed -i s/^mirrorlist=http/#mirrorlist=https/g /etc/yum.repos.d/*.repo
方法三:
sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 显示wget找不到命令
sudo yum clean all
sudo yum makecache
3.修改ntp 配置文件
ntpq -c version
ntpq 4.2.6p5@1.2349-o Tue Jun 23 15:38:21 UTC 2020 (1)
只需要在master中做:
vi /etc/ntp.conf
在末尾追加
server 127.0.0.1
fudge 127.0.0.1 stratum 10
注意先关闭防火墙-----一般没关
systemctl stop firewalld
sudo ntpdate pool.ntp.org 手动同步时间
sudo firewall-cmd --list-all 查看状态
4.重启时间服务
systemctl restart ntpd.service
5.在slave1和slave2上去更新
ntpdate master #以master为标准进行时间的校对【无用】
SSH免密登录
1.三台机器上生成公钥-私钥
ssh-keygen -t dsa #注意事项 ssh-keygen空格-t空格dsa
2.分节点先切换到.ssh目录下:
cd /root/.ssh #注意事项 不要忽略ssh前面的点(“.”)
3.将公钥传递过去:
scp id_dsa.pub root@master:/root/.ssh/s1.pub
#s1.pub一定要加,意思是传递过去 之前名为id_dsa.pub,和master中公钥名称相同
s2.pub,mb.pub 操作同上
4.在master中将三个公钥合并成一个公钥包
先切换到/root/.ssh目录下面:
cd /root/.ssh
将三个公钥合并成一个公钥包:
cat id_dsa.pub mb.pub s1.pub s2.pub >> authorized_keys
5.将master中合成的公钥包分别传递到slave1和slave2的/root/.ssh下:
scp authorized_keys root@slave1:/root/.ssh
scp authorized_keys root@slave2:/root/.ssh
scp authorized_keys root@masterbak:/root/.ssh
#传递过去后不用另起别名了,因为/root/.ssh下面没有叫authorized_keys的
6.现在可以免密登录了
假设现在在master中
ssh slave1 #免密登录到slave1
exit #退出slave1,重回master中
Java配置
将JDK安装到特定的目录下:
mkdir -p /usr/java #创建一个目录用来安装jdk
5.安装:
tar -zxvf jdk-8u17-linux-x64.tar.gz -C /usr/java #这里的C是大写
名称
Java -version : jdk1.8.0_171
6.vi /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_212
export CLASSPATH=.:$JAVA_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin
从打红色箭头的一行开始编辑,一共编辑以上四行,然后退出
7.让环境变量生效
source /etc/profile
大数据组件-Hadoop
- Hadoop 完全分布式搭建
创建/usr/Hadoop
tar -zxvf -C 解压文件到/usr/hadoop中
1.配置环境变量
vi /etc/profile
#hadoop
export HADOOP_HOME=/usr/hadoop/hadoop-3.1.3
export CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib
export PATH=$PATH:$HADOOP_HOME/bin
使生效 source /etc/profile
4.3 hadoop集群配置
在hadoop-3.1.3 位置下有 etc/hadoop 位置
cd etc/hadoop 进入
1.vi hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_212
2.vi core-site.xml
<property>
<name>fs.default.name</name>
<value>hdfs://master:9820</value>
</property>
<property>
<name>hadoop.temp.dir</name>
<value>/usr/hadoop/hadoop-3.1.3/tmp</value>
</property>
**3.vi yarn-site.xml **
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
4.vi workers【关键步骤】
填入
master
slave1
slave2
5.vi hdfs-site.xml
<property>
<name>dfs.replication</name> #Block副本数,默认3
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/usr/hadoop/hadoop-3.1.3/hdfs/name</value>
<final>true</final>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/usr/hadoop/hadoop-3.1.3/hdfs/data</value>
<final>true</final>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:9001</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>192.168.222.171:9870</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
C:\Windows\System32\drivers\etc\hosts 修改好对应配置,否则hdfs Web端无法预览文件
6.vi mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
HDFS系统
分发:
scp -r /usr/hadoop root@slave1:/usr
scp -r /usr/Hadoop root@slave2:/usr
hadoop namenode -format 格式化
下面对应hdfs文件夹直接删除才可重新
master:
开:
hdfs --daemon start namenode
yarn --daemon start resourcemanager
hdfs --daemon start secondarynamenode
关:
hdfs --daemon stop namenode
yarn --daemon stop resourcemanager
hdfs --daemon stop secondarynamenode
slaves:
开:
hdfs --daemon start datanode
yarn --daemon start nodemanager
关:
hdfs --daemon stop datanode
yarn --daemon stop nodemanager
注:不可start-all一次性启动了
jps查看进程
即成功
http://192.168.222.171:50070/ 浏览器查看namenode信息
还有9001 查看2nn 信息
2nn信息显示不出来,解决方案:
cd etc/hadoop
find /usr/hadoop/hadoop-3.1.3/ -name dfs-dust.js
vi /usr/hadoop/hadoop-3.1.3/share/hadoop/hdfs/webapps/static/dfs-dust.js
:se nu显示行数
61行修改为:return new Date(Number(v)).toLocaleString();
清除网页缓存 刷新后显示
打开网站之前注意关闭防火墙(否则连不上)systemctl stop firewalld
关闭安全模式:
hdfs dfsadmin -safemode enter/leave
- HDFS 基本操作命令、Java-API 编写
2.1 HDFS 基本操作命令
- 上传文件到 HDFS:
hdfs dfs -put /path/to/local/file /path/to/hdfs/directory
- 从 HDFS 下载文件:
hdfs dfs -get /path/to/hdfs/file /path/to/local/directory
- 列出 HDFS 目录:
hdfs dfs -ls /path/to/hdfs/directory
- 删除 HDFS 中的文件或目录:
hdfs dfs -rm -r /path/to/hdfs/directory
- 查看文件内容:
hdfs dfs -cat /path/to/hdfs/file
2.2 HDFS Java API 编写
使用 Hadoop Java API 可以操作 HDFS。以下是一个简单的 HDFS 操作示例,包括文件上传、下载和读取。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.InputStream;
import java.net.URI;
public class HDFSExample {
public static void main(String[] args) throws Exception {
// 1. 创建 HDFS 配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9000");
// 2. 获取 HDFS 文件系统对象
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf, "hadoop");
// 3. 上传文件到 HDFS
fs.copyFromLocalFile(new Path("/local/path/file.txt"), new Path("/hdfs/path/file.txt"));
System.out.println("File uploaded to HDFS.");
// 4. 读取 HDFS 文件内容
InputStream in = fs.open(new Path("/hdfs/path/file.txt"));
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = in.read(buffer)) > 0) {
System.out.write(buffer, 0, bytesRead);
}
// 5. 下载文件到本地
fs.copyToLocalFile(new Path("/hdfs/path/file.txt"), new Path("/local/path/downloaded.txt"));
System.out.println("File downloaded from HDFS.");
// 6. 关闭 HDFS 文件系统
fs.close();
}
}
3. MapReduce 编程、数据预处理、指标计算
3.1 MapReduce 基本概念
MapReduce 是 Hadoop 的分布式计算框架,用于大规模数据处理。它包括两个阶段:
- Map 阶段:将输入数据分割成键值对,并执行处理。
- Reduce 阶段:对 Map 阶段的输出进行聚合或汇总。
3.2 MapReduce 编程示例
以下是一个简单的 WordCount 示例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.3 数据预处理和指标计算
使用 MapReduce 可以对原始数据进行预处理和计算常见指标,比如去重、排序、聚合等。你可以通过自定义 Mapper 和 Reducer 实现这些操作。
4. 任务提交、优化、排错
4.1 提交 MapReduce 任务
将编写的 MapReduce 任务打包为 JAR 文件,然后提交到集群上运行:
hadoop jar WordCount.jar WordCount /input/path /output/path
4.2 优化
- 调优 Mapper 和 Reducer 数量:
增加 mapreduce.job.maps 和 mapreduce.job.reduces 的数量,以提高并行度。
- 压缩 Map 输出:
启用 Map 输出压缩来减少网络传输:
mapreduce.map.output.compress=true
- 使用合适的数据格式:
使用更高效的文件格式(如 Parquet、ORC)以减少 I/O 开销。
- 数据本地化:
确保计算尽可能靠近数据存储节点,减少网络传输。
4.3 排错
- 检查日志文件:
Hadoop 任务的日志可以帮助调试。日志通常存储在 HDFS 中的 logs 目录或本地机器的 /tmp/hadoop-logs 目录中。
- YARN UI:
通过 YARN 的 web UI (http://master:8088) 可以查看任务的执行状态、日志和资源消耗。
- 使用 jobhistory 工具:
通过 Hadoop Job History 可以查看任务执行的详细信息。
大数据组件-HBase
1. HBase 集群搭建
1.hbase安装:master上
1.准备将windows系统中的hbase上传到/opt/soft:
cd /opt/soft #由于之前创建过了这里直接切换到/opt/soft目录
2.点击上传按钮,将hbase上传
3.将hbase安装到特定的目录下面:
mkdir -p /usr/hbase #创建一个目录用来安装hbase
4.安装:
tar -zxvf /opt/soft/hbase-1.2.4-bin.tar.gz -C /usr/hbase
配置conf/hbase-env.sh
/usr/hbase/hbase-2.4.11/conf
export HBASE_MANAGES_ZK=false
export JAVA_HOME=/usr/java/jdk1.8.0_212
export HBASE_CLASSPATH=/usr/hadoop/hadoop-3.1.3/etc/Hadoop
配置conf/hbase-site.xml
(5项)注意:机器名:端口号 与Hadoop core-site.xml主机一致
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9820/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.master</name>
<value>hdfs://master:6000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>master,slave1,slave2</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdata</value>
</property>
如果zookeeper的zoo.cfg中port:2181则不必配置下面一项。
<property><name>hbase.zookeeper.property.clientPort</name>
<value>12181</value>
</property>#----------------------
#说明clientPort需要与zookeeper一致
#-----------------------------
配置conf/regionservers
(仅master)
slave1
slave2
masterbak
5.cp hadoop的两个配置文件过来
cp hadoop的hdfs-site.xml和core-site.xml到hbase的conf下
cp /usr/hadoop/hadoop-3.1.3/etc/hadoop/hdfs-site.xml /usr/hbase/hbase-2.4.11/conf/
cp /usr/hadoop/hadoop-3.1.3/etc/hadoop/core-site.xml /usr/hbase/hbase-2.4.11/conf/
6.分发到其他三台机器
scp -r /usr/hbase root@slave1:/usr
scp -r /usr/hbase root@slave2:/usr
scp -r /usr/hbase root@masterbak:/usr
配置环境变量:(所有机器)
#my hbase
export HBASE_HOME=/usr/hbase/hbase-2.4.11
export PATH=$PATH:$HBASE_HOME/bin
生效环境变量(4台机器)
master运行hbase【先启动zookeeper,hadoop】
hbase2.4.11: bin/start-hbase.sh
jps
192.168.222.171:16010
7.hbase shell
启动hbase shell
list
显示还没有任何表
退出:quit
status 查看状态
2. HBase Shell 操作
进入 HBase Shell:
$HBASE_HOME/bin/hbase shell
DDL 操作
- 创建表:
create 'mytable', 'cf'
- 'mytable' 是表名。- 'cf' 是列族名(列族是HBase表的基本存储单元)。
- 查看表:
list
- 查看表的详细描述:
describe 'mytable'
- 删除表:
禁用表:
disable 'mytable'
删除表:
drop 'mytable'
DML 操作
- 插入数据:
put 'mytable', 'row1', 'cf:column1', 'value1'
- 'mytable' 是表名。- 'row1' 是行键(Row Key)。- 'cf:column1' 是列族和列名。- 'value1' 是要插入的值。
- 读取数据:
get 'mytable', 'row1'
- 扫描表数据:
scan 'mytable'
- 删除数据:
delete 'mytable', 'row1', 'cf:column1'
- 计数表中的行数:
count 'mytable'
3. Java-API 编写
步骤 1: 准备 HBase 和依赖 JAR
手动下载和添加 HBase、Hadoop 和 Zookeeper 的 JAR 文件。
步骤 2: 创建 Eclipse Java 项目
- 打开 Eclipse,选择 File -> New -> Java Project。
- 输入项目名称(例如:HBaseExample),并点击 Finish。
步骤 3: 手动添加 JAR 文件
- 右键点击项目(HBaseExample),选择 Build Path -> Configure Build Path。
- 在 Libraries 选项卡中点击 Add External JARs。
- 导航到 HBase、Hadoop、Zookeeper 的 lib 目录,手动添加所有 JAR 文件。 - HBase JAR:添加 hbase-client.jar、hbase-server.jar 以及 hbase-common.jar 等。- Hadoop JAR:添加 hadoop-common.jar、hadoop-hdfs.jar 等。- Zookeeper JAR:添加 zookeeper.jar。
- 确保所有必要的 JAR 文件都已添加,并点击 Apply and Close。
步骤 4: 编写 HBase Java API 代码
- 右键点击 src 文件夹,选择 New -> Class,输入类名(如 HBaseExample),并勾选 public static void main(String[] args),点击 Finish。
- 编写 HBase Java API 代码。以下是示例代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HBaseExample {
public static void main(String[] args) throws IOException {
// 创建 HBase 配置
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "master,slave1,slave2");
config.set("hbase.zookeeper.property.clientPort", "2181");
// 创建 HBase 连接
try (Connection connection = ConnectionFactory.createConnection(config)) {
Admin admin = connection.getAdmin();
// 创建表
TableName tableName = TableName.valueOf("mytable");
if (!admin.tableExists(tableName)) {
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
.build();
admin.createTable(tableDescriptor);
System.out.println("Table created: " + tableName);
}
// 插入数据
try (Table table = connection.getTable(tableName)) {
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));
table.put(put);
System.out.println("Data inserted.");
// 读取数据
Get get = new Get(Bytes.toBytes("row1"));
Result result = table.get(get);
byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1"));
System.out.println("Retrieved value: " + Bytes.toString(value));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
步骤 5: 运行代码
- 确保 HBase 集群已启动:
启动 HBase:
$HBASE_HOME/bin/start-hbase.sh
使用 jps 确认 HMaster 和 HRegionServer 是否正常运行。
在 Eclipse 中运行 Java 项目:
右键点击 HBaseExample.java 文件,选择 Run As -> Java Application。
Eclipse 将编译并运行你的代码,控制台会显示程序的输出。
步骤 6: 验证输出
执行成功后,控制台会输出类似如下信息:
Table created: mytable
Data inserted.
Retrieved value: value1
这表明程序已成功连接到 HBase,创建了表 mytable,并执行了数据插入和读取操作。
大数据组件-Hive
- Hive 搭建
步骤 1: 安装 Hadoop
Hive 依赖 Hadoop,所以首先需要安装 Hadoop。如果 Hadoop 已经安装,请确保其配置正确。
步骤 2: 安装 Hive
(slave1)【Hadoop先启动好 --- 防火墙一定得关上】
1 上传安装包
# cd /opt/soft
上传apache-hive-3.1.2-bin.tar.gz压缩包
mkdir /usr/hive
2 解压缩安装包
tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /usr/hive
3 修改hive的文件夹名称
cd /usr/hive
mv apache-hive-3.1.2-bin hive
4 添加hive的环境变量
vi /etc/profile
=======添加内容如下======
----HIVE_HOME
export HIVE_HOME=/usr/hive/hive
export PATH=$PATH:$HIVE_HOME/bin
soruce /etc/profile
cd $HIVE_HOME
5 拷贝MySQL驱动包
cd /opt/soft
上传mysql驱动包 mysql-connector-java-5.1.37.jar
将mysql驱动包拷贝到hive中
cp mysql-connector-java-5.1.37.jar $HIVE_HOME/lib
在slave1:apache-hive/conf
cp hive-env.sh.template hive-env.sh
vi hive-env.sh
修改如下内容:
Set HADOOP_HOME to point to a specific hadoop
HADOOP_HOME=/usr/hadoop/hadoop-3.1.3
slave1中
cp hive-default.xml.template hive-site.xml
vi hive-site.xml
Shift+g跳到最后
添加内容:
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/warehousedir/home</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://slave2:3306/hive?createDatabaseIfNotExist=true&useSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
<property>
<name>hive.exec.scratchdir</name>
<value>/tmp/hive</value>
</property>
<property>
<name>hive.exec.local.scratchdir</name>
<value>/tmp/hive/local</value>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<value>/tmp/hive/resources</value>
</property>
slave1上启动hive服务
执行初始化Hive元数据库命令
schematool -initSchema -dbType mysql -verbos
先启动HADOOP
在/hive/ 输入命令: bin/hive (注意hive-site.xml配置是否正确,否则无法通过
启动hive
cd /usr/hive/hive/
bin/hive
hive>show databases;
hive> use default;
hive>show tables;
hive>quit;
- 基于Linux的MySQL安装、元数据存储到MySQL配置
步骤 1: 安装 MySQL(slave2)
MySQL安装包上传(在slave2上)
cd /opt/soft
1.上传到该目录mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar
2.解压缩第一层包(在slave2cd上)
cd /opt/soft
tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar
sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm --force --nodeps
sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm --force --nodeps
sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm
sudo yum install -y libaio
sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm --force --nodeps
3.删除配置文件(在slave2上)
查看mysql所安装的目录(查看datadir的目录结果)
vi /etc/my.cnf 就是/var/lib/mysql这里
删除datadir指向的目录所有文件内容
cd /var/lib/mysql
sudo rm -rf ./*
4 初始化数据库
sudo mysqld --initialize --user=mysql
5 查看初始化密码( -localhost后面)
sudo cat /var/log/mysqld.log
6 启动MySQL的服务
sudo systemctl start mysqld
7 登录MySQL数据库
mysql -u root -p
Enter password:输入mysqld.log中的密码
8 修改数据库密码
mysql>set password = password("123456");
9 修改数据库任意连接
mysql>update mysql.user set host='%' where user='root'; mysql>flush privileges; mysql>quit;
10 测试mysql数据库
mysql -u root -p
Enter password:123456
mysql>quit;
show databases;
创建一个数据库名为Hive,并设置编码为latin1,用于存储Hive的元数据
create database Hive default character set latin1;
创建一个特定的数据库用户hive,密码设置为123456,并将步骤3中创建的Hive库授权给该用户
create user 'hive'@'localhost' identified by '123456';
grant create,alter,drop,select,insert,update,delete on Hive.user_info to hive@localhost identified by '123456';
- DDL、DML、查询(基本查询、分组、join语句、排序)
Hive 支持常见的 SQL DDL 和 DML 操作。以下是常用的操作示例。
DDL 操作
- 创建数据库:
CREATE DATABASE mydb;
- 创建表:
CREATE TABLE students (
id INT,
name STRING,
age INT,
gpa FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
- 删除表:
DROP TABLE students;
DML 操作
- 插入数据:
INSERT INTO students VALUES (1, 'Alice', 21, 3.5);
- 加载数据:
LOAD DATA INPATH '/path/to/data' INTO TABLE students;
查询操作
- 基本查询:
SELECT * FROM students;
- 分组查询:
SELECT age, COUNT(*) FROM students GROUP BY age;
- 排序查询:
SELECT * FROM students ORDER BY gpa DESC;
- Join 查询:
SELECT a.name, b.course_name
FROM students a
JOIN courses b
ON a.id = b.student_id;
4. 函数(单行、聚合、炸裂、窗口)、自定义函数UDF
单行函数
- 字符串函数:
SELECT UPPER(name), LOWER(name) FROM students;
- 日期函数:
SELECT CURRENT_DATE();
- 数学函数:
SELECT ABS(-10), ROUND(3.1415, 2);
聚合函数
- 求和、平均、计数:
SELECT SUM(gpa), AVG(gpa), COUNT(*) FROM students;
炸裂函数(explode)
- 用于将数组或嵌套数据展开为多行。
SELECT name, explode(array(1, 2, 3)) AS num FROM students;
窗口函数
- **ROW_NUMBER()**:返回行号。
SELECT name, gpa, ROW_NUMBER() OVER (PARTITION BY age ORDER BY gpa DESC) AS rank FROM students;
自定义函数UDF
Hive 允许用户编写自定义函数(UDF)来扩展其功能。以下是如何编写和使用自定义UDF的步骤。
编写 UDF 示例
- 编写一个简单的 Hive UDF:
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class UpperCaseUDF extends UDF {
public Text evaluate(Text input) {
if (input == null) {
return null;
}
return new Text(input.toString().toUpperCase());
}
}
- 编译 UDF: - 使用 Hadoop 和 Hive 的 jar 文件编译这个 Java 类。
javac -cp $(hadoop classpath):$(hive classpath) UpperCaseUDF.java
jar -cf UpperCaseUDF.jar UpperCaseUDF.class
- 将 UDF 注册到 Hive 中: - 将 UpperCaseUDF.jar 复制到 Hive 集群,并通过 Hive CLI 注册该 UDF。
ADD JAR /path/to/UpperCaseUDF.jar;
CREATE TEMPORARY FUNCTION to_upper AS 'UpperCaseUDF';
- 使用 UDF:
SELECT to_upper(name) FROM students;
大数据组件-Spark
- Spark三种搭建方式
1.1 Standalone模式
Standalone模式是Spark自带的集群管理器,适合快速测试或简单的生产环境。你不需要任何外部资源管理工具,Spark的Master节点充当资源调度器,Task节点作为工作节点。
优点:
- 简单易用,适合小型集群或测试环境。
- 无需外部的集群管理工具。
搭建步骤:
- 下载并解压Spark:
上传spark源码包到/opt/soft
mkdir /usr/spark
cd /opt/soft
tar -zxvf spark-3.1.1-bin-hadoop3.2.tgz -C /usr/spark
- 启动Spark Master和Worker:
./sbin/start-master.sh
./sbin/start-worker.sh spark://<master-ip>:7077
- 提交任务:
./bin/spark-submit --master spark://<master-ip>:7077 <your_spark_job.py>
安装scala [master]
[root@master ~]# mkdir /usr/scala
上传scala-2.11.12.tgz 到/opt/soft
yum install unzip
unzip -o -d /usr/scala scala-2.12.11.zip
[root@master soft]# tar -zxvf scala-2.11.12.tgz -C /usr/scala
配置环境变量
[root@master scala-2.11.12]# vi /etc/profile
加入如下三行:
#scala
export SCALA_HOME=/usr/scala/scala-2.12.11
export PATH=$PATH:$SCALA_HOME/bin
保存退出
[root@master scala-2.11.12]# source /etc/profile
[root@master scala-2.11.12]# scala -version
Scala code runner version 2.12.11 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.
分发到其他服务器,并配置环境变量
scp -r /usr/scala/ root@slave1:/usr
scp -r /usr/scala/ root@slave2:/usr
[我自己的Standalone]配置spark
进入spark安装目录下的conf
cd /usr/spark/spark-3.1.1-bin-hadoop3.2/conf/
cp workers.template slaves 并修改
vi进入:
最后一行
删掉localhost
填上
master
slave1
slave2
cp spark-env.sh.template spark-env.sh
spark-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_212
export SCALA_HOME=/usr/scala/scala-2.12.11
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/usr/hadoop/hadoop-3.1.3/etc/hadoop
cp spark-defaults.conf.template spark-site.conf
Example:
spark.master spark://master:7077
这两个文件暂时不必修改
cp fairscheduler.xml.template fairscheduler.xml
cp log4j.properties.template log4j.properties
分发到其他机器,并修改环境变量
scp -r /usr/spark root@slave1:/usr
scp -r /usr/spark root@slave2:/usr
vi /etc/profile
#spark
export SPARK_HOME=/usr/spark/spark-3.1.1-bin-hadoop3.2/
export PATH=$PATH:SPARK_HOME/bin
启动spark
在spark安装目录下:
sbin/start-all.sh
jps
spark安装目录下执行:bin/spark-shell
ctrl-z 退出
1.2 YARN模式
YARN(Yet Another Resource Negotiator)是Hadoop生态系统中的资源管理器。使用YARN可以让Spark与Hadoop无缝集成,允许它在Hadoop集群中运行。
优点:
- 与Hadoop集群完美结合,利用HDFS、MapReduce等资源。
- 支持动态资源调度,适合大规模集群。
搭建步骤:
- 确保Hadoop和YARN集群已配置和运行。
- 启动YARN Resource Manager:
start-yarn.sh
- 提交Spark任务到YARN:
./bin/spark-submit --master yarn <your_spark_job.py>
1.3 Mesos模式
Mesos是一个通用的集群管理系统,允许Spark与其他服务共享集群资源。
优点:
- 适用于需要多个框架共享集群资源的环境。
- 支持多种类型的任务调度。
搭建步骤:
- 安装并配置Mesos。
- 启动Mesos Master和Slave。
- 使用Spark提交任务到Mesos:
./bin/spark-submit --master mesos://<mesos-master-ip>:5050 <your_spark_job.py>
2. Spark Core
Spark Core是Spark的基础模块,负责内存管理、调度、任务分发、容错和与存储系统的交互。Spark Core包含以下核心概念:
2.1 RDD (Resilient Distributed Dataset)
RDD是Spark的核心抽象,代表一个不可变的分布式数据集。通过对RDD进行变换(transformation)或行动(action)来执行计算。
- Transformation:转换操作,惰性执行。例如,map()、filter()。
- Action:执行操作,立即触发计算。例如,count()、collect()。
示例代码:RDD的基本操作(scala)
import org.apache.spark.{SparkConf, SparkContext}
// 配置和初始化 Spark
val conf = new SparkConf().setAppName("Spark Core Example").setMaster("local")
val sc = new SparkContext(conf)
// 创建 RDD
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
// Transformation: map 将每个元素乘以 2
val rdd2 = rdd.map(x => x * 2)
// Action: collect 收集结果并打印
val result = rdd2.collect()
result.foreach(println)
// 关闭 Spark
sc.stop()
3. Spark SQL
Spark SQL 提供了在Spark上使用结构化数据的API。它允许用户通过SQL查询数据,并且无缝集成到Spark的批处理和流处理任务中。Spark SQL使用DataFrame和Dataset API进行操作。
示例代码:使用DataFrame和SQL查询(scala)
import org.apache.spark.sql.SparkSession
// 初始化 SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Example")
.master("local")
.getOrCreate()
// 创建 DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")
// 显示 DataFrame 内容
df.show()
// 使用 SQL 查询
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT name, age FROM people WHERE age > 30")
sqlDF.show()
// 关闭 SparkSession
spark.stop()
优点:
- 支持标准的SQL查询。
- 可以直接与结构化数据(如JSON、Parquet、Hive表)集成。
- 动态查询优化(Catalyst引擎)提高性能。
4. Spark Streaming
Spark Streaming 是用于实时数据处理的组件。它可以通过微批处理将实时数据流分批处理,每一小批数据都以RDD的形式进行处理。
示例代码:Spark Streaming 处理Socket数据流
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 从 TCP Socket 接收数据
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
// 统计单词数
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
// 输出结果
wordCounts.print()
// 启动 StreamingContext
ssc.start()
ssc.awaitTermination()
优点:
- 支持多种数据源:Kafka、Flume、HDFS等。
- 适合处理实时流数据。
- 支持窗口化操作和故障恢复。
5. Spark MLlib
MLlib 是 Spark 中的机器学习库,包含常见的机器学习算法和工具,如分类、回归、聚类和协同过滤。它基于Spark的分布式架构,能够对大规模数据集进行高效的机器学习任务。
示例代码:使用MLlib进行线性回归
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("LinearRegressionExample").getOrCreate()
// 加载数据
val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
// 初始化线性回归模型
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// 训练模型
val lrModel = lr.fit(data)
// 打印模型系数和截距
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}"
// 评估模型
val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
// 关闭 SparkSession
spark.stop()
MLlib 支持的主要功能:
- 分类:逻辑回归、决策树、随机森林等。
- 回归:线性回归、岭回归等。
- 聚类:KMeans、Gaussian Mixture等。
- 降维:PCA、SVD等。
大数据组件-Zookeeper
- Zookeeper 集群安装
1.准备将windows系统中的zookeeper上传到/opt/soft:
cd /opt/soft #由于之前创建过了这里直接切换到/opt/soft目录
2.点击上传按钮,将zookeeper上传
3.将zookeeper安装到特定的目录下面:
mkdir -p /usr/zookeeper #创建一个目录用来安装zookeeper
4.安装:
tar -zxvf zookeeper-3.4.10.tar.gz -C /usr/zookeeper
2.Zookeeper配置(所有机器都需要)
1.切换到相关目录
cd /usr/zookeeper/ zookeeper-3.4.10/conf #接下来的步骤我们在这个目录下进行
2.ls #查看该目录下面有什么
3.我们需要配置文件,但是从“ls”可知,只给了我们一个模板,需要我们复制一份,然后再配置文件
4.cp zoo_sample.cfg zoo.cfg #zoo.cfg是它的新名字,内容和zoo_sample.cfg一样
5.编辑zoo.cfg
vi zoo.cfg
dataDir后面
改为
dataDir=/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdata
dataLogDir=/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdatalog
server.1=192.168.222.171:2888:3888
server.2=192.168.222.172:2888:3888
server.3=192.168.222.173:2888:3888
6.返回上一级位置
cd ..
7.创建我们在第5步要创建的文件
mkdir zkdata
mkdir zkdatalog
8.进入到zkdata目录下创建一个新文件并编辑它
cd zkdata
vi myid
编辑的内容和虚拟机有关系,如果是master,里面写入1,如果是slave1里面写入2,如果是slave2,里面写入3,如果是masterbak,里面写入4
9.接下来我们还要配置另外两台虚拟机,为了方便,我们直接远用scp,将master的zookeeper整个文件夹远程拷贝给slave1,slave2,masterbak,这样就不需要配置了
scp -r /usr/zookeeper root@slave1:/usr #这里考的是文件夹所以是scp -r 命令
scp -r /usr/zookeeper root@slave2:/usr #同样拷一份到slave2中去
scp -r /usr/zookeeper root@masterbak:/usr
10.拷贝过来的内容仍然需要做一些修改
到slave1中
cd /usr/zookeeper/zookeeper-3.4.10/zkdata #切换到该目录下
11.对myid 文件中的内容进行修改,因为不同的虚拟机,里面的内容不一样
vi myid
将里面的内容改为2
12.slave2同上10,11步骤,不同的是myid文件中,内容改为3
3.配置zookeeper的环境变量(所有都需要)
1.进入到指定的文件夹
vi /etc/profile
zookeeper的环境变量放在Java的环境变量下面
#Zookeeper------
export ZOOKEEPER_HOME=/usr/zookeeper/apache-zookeeper-3.5.7-bin/
export PATH=$PATH:$ZOOKEEPER_HOME
- 使环境变量生效
source /etc/profile
3.运行zookeeper
必须在zookeeper安装目录下
cd /usr/zookeeper/apache-zookeeper-3.5.7-bin/
bin/zkServer.sh start #这一步必须所有机器同时运行
可能会出现错误:关闭防火墙再看状态即可(systemctl stop firewalld)
端点占用:sudo yum install net-tools
netstat -apn | grep 2181
kill -9 xx
bin/zkServer.sh status #这一步选出谁是领导者谁是跟随者
jps 查看进程
4. Zookeeper 启停脚本编写
为了简化启动和停止 Zookeeper 集群的操作,我们可以编写一个脚本,通过 SSH 登录集群节点并启动或停止 Zookeeper。
启停脚本示例:zk_manage.sh
#!/bin/bash
ZOOKEEPER_HOME=/usr/zookeeper/apache-zookeeper-3.5.7-bin
ZOOKEEPER_NODES="node01 node02 node03"
case $1 in
"start")
for node in $ZOOKEEPER_NODES
do
echo "Starting Zookeeper on $node"
ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh start"
done
;;
"stop")
for node in $ZOOKEEPER_NODES
do
echo "Stopping Zookeeper on $node"
ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh stop"
done
;;
"status")
for node in $ZOOKEEPER_NODES
do
echo "Checking Zookeeper status on $node"
ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh status"
done
;;
*)
echo "Usage: $0 {start|stop|status}"
exit 1
;;
esac
说明:
- 该脚本读取集群节点列表(ZOOKEEPER_NODES),并通过SSH连接到每个节点执行启动、停止或状态检查命令。
使用方式:
- 启动:./zk_manage.sh start- 停止:./zk_manage.sh stop- 查看状态:./zk_manage.sh status
设置可执行权限:
chmod +x zk_manage.sh
5. 客户端命令行操作
Zookeeper 提供了命令行客户端工具 zkCli.sh,可以用来连接到 Zookeeper 服务并执行命令,如创建、删除 Znodes,查询节点数据等。
连接 Zookeeper:
在任意节点上使用 zkCli.sh 连接到 Zookeeper 服务:
/usr/zookeeper/apache-zookeeper-3.5.7-bin/bin/zkCli.sh -server node01:2181
常用命令:
**1.**创建节点:
create /myNode "hello zookeeper"
**2.**查询节点数据:
get /myNode
3.查看子节点:
ls /
4.修改节点数据:
set /myNode "new data"
5.删除节点:
delete /myNode
6.退出客户端:
quit
6. Java-API编写
步骤 1: 创建 Java 项目
- 创建新项目: - 打开Eclipse,选择 File -> New -> Java Project。- 输入项目名称(例如:ZookeeperExample)。- 点击 Finish 完成项目创建。
步骤 2: 下载 Zookeeper JAR 文件
- 下载Zookeeper的JAR包:解压Zookeeper的JAR包。
通常解压后会看到 zookeeper-<version>/lib 目录,其中包含了多个依赖库(包括Zookeeper的核心库和其他所需依赖)。
- 主要的JAR文件:
需要包括 zookeeper-<version>.jar(位于解压目录的根目录)和 lib 文件夹中的所有JAR文件。
步骤 3 在 Eclipse 中添加 Zookeeper 库
- 将 Zookeeper 的 JAR 文件添加到项目中:
右键点击你的项目(例如:ZookeeperExample),选择 Build Path -> Configure Build Path。
选择 Libraries 选项卡,然后点击 Add External JARs。
导航到你下载并解压的Zookeeper目录,选择 zookeeper-<version>.jar 和 lib 目录中的所有JAR文件。
点击 Apply and Close,这将把Zookeeper的依赖库添加到你的项目中。
步骤 4: 编写 Zookeeper Java API 代码
创建 Java 类:
右键点击 src 目录,选择 New -> Class。
将类命名为 ZookeeperExample,并勾选 public static void main(String[] args)。
点击 Finish。
编写 Zookeeper 示例代码:
在 ZookeeperExample.java 中,输入以下代码:
package com.example;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class ZookeeperExample {
private static ZooKeeper zooKeeper;
private static ZooKeeperConnection conn;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 连接到 Zookeeper
conn = new ZooKeeperConnection();
zooKeeper = conn.connect("localhost"); // 假设 Zookeeper 在本地运行,端口为默认的 2181
// 创建 Znode
String path = "/myNode";
byte[] data = "Hello Zookeeper".getBytes();
create(path, data);
// 获取 Znode 数据
System.out.println("Data of Znode: " + new String(getData(path)));
// 修改 Znode 数据
byte[] newData = "New data".getBytes();
setData(path, newData);
// 获取修改后的 Znode 数据
System.out.println("Updated data of Znode: " + new String(getData(path)));
// 删除 Znode
delete(path);
// 关闭连接
conn.close();
}
// 创建 Znode
public static void create(String path, byte[] data) throws KeeperException, InterruptedException {
zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 获取 Znode 数据
public static byte[] getData(String path) throws KeeperException, InterruptedException {
Stat stat = zooKeeper.exists(path, true);
return zooKeeper.getData(path, false, stat);
}
// 修改 Znode 数据
public static void setData(String path, byte[] data) throws KeeperException, InterruptedException {
zooKeeper.setData(path, data, zooKeeper.exists(path, true).getVersion());
}
// 删除 Znode
public static void delete(String path) throws KeeperException, InterruptedException {
zooKeeper.delete(path, zooKeeper.exists(path, true).getVersion());
}
}
class ZooKeeperConnection {
private ZooKeeper zoo;
// 连接到 Zookeeper 服务器
public ZooKeeper connect(String host) throws IOException, InterruptedException {
zoo = new ZooKeeper(host, 5000, we -> {});
return zoo;
}
// 关闭连接
public void close() throws InterruptedException {
zoo.close();
}
}
步骤 5: 运行代码
- 确保 Zookeeper 服务已经启动:
在本地或服务器上启动Zookeeper服务:
/usr/zookeeper/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start
- 在 Eclipse 中运行 Java 项目:
右键点击 ZookeeperExample.java,选择 Run As -> Java Application。
控制台将会显示程序的输出,执行成功后,你将看到Znode被创建、读取、更新和删除的操作。
步骤 6: 验证输出
在Eclipse的控制台中,你应该看到类似如下的输出:
Data of Znode: Hello Zookeeper
Updated data of Znode: New data
这表明程序已经成功连接到Zookeeper,创建了Znode /myNode,并对其数据进行了读写操作。
大数据组件-Fink
1. Flink Yarn 模式搭建
在大数据环境中,Flink可以通过YARN进行集群部署管理。YARN作为资源管理器,负责分配和管理Flink作业的资源。
**Step 1: **下载和解压Flink
软件包 放入集群 解压 /usr/flink
cd /usr/flink/flink-1.14.0/
**Step 2: **配置Flink
1.修改配置文件:进入Flink安装目录,并修改 flink-conf.yaml 文件。
vi conf/flink-conf.yaml
在flink-conf.yaml中添加或修改以下配置:
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2
2.** YARN配置**:确保Hadoop配置路径在Flink中可见,并设置HADOOP_CONF_DIR。
export HADOOP_CONF_DIR=/usr/hadoop/Hadoop-3.1.3/etc/hadoop
**Step 3: **启动YARN集群
确保你的Hadoop集群已经启动并运行:
start-dfs.sh
start-yarn.sh
**Step 4: **通过YARN启动Flink
你可以通过以下命令启动Flink的JobManager和TaskManager,交由YARN管理:
./bin/yarn-session.sh -d -n 3 -s 4 -jm 1024m -tm 2048m
- -n 表示启动的TaskManager数量
- -s 表示每个TaskManager的插槽数量
- -jm 表示JobManager使用的内存
- -tm 表示每个TaskManager使用的内存
此命令会在YARN上启动Flink会话模式,Flink作业可以提交到YARN集群中。
【Standalone模式】
vi /etc/profile
配置环境变量
---Flink
export FLINK_HOME=/usr/flink/flink-1.14.0/
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
$FLINK_HOME
2.配置conf文件
cd conf
配置flink-conf.yaml
#1. 配置jobmanager rpc 地址
jobmanager.rpc.address: master
#2. 修改taskmanager内存大小,可改可不改
taskmanager.memory.process.size: 2048m
#3. 修改一个taskmanager中对于的taskslot个数,可改可不改
taskmanager.numberOfTaskSlots: 4
#修改并行度,可改可不改
parallelism.default: 4
3.vi masters
master:8081
vi workers
master
4.配置zoo
新建snapshot存放的目录,在flink目录下建
mkdir tmp
cd tmp
mkdir zookeeper
#修改conf下zoo.cfg配置
vi zoo.cfg
#snapshot存放的目录
dataDir=/usr/flink/flink-1.14.0/tmp/zookeeper
#配置zookeeper 地址
server.1=192.168.222.171:2888:3888
5.启动集群
bin/start-cluster.sh
192.168.222.171:8081进入浏览器界面
2. Flink 运行架构
Flink的运行架构基于JobManager和TaskManager,并结合分布式集群来处理作业。
- JobManager:管理作业的执行,是Flink的调度器,负责协调各个任务,管理故障恢复。
- TaskManager:执行实际的数据处理任务。它们接受来自JobManager的任务指令,负责执行并报告任务状态。
- Dispatcher:负责提交作业并与外部接口进行通信。
- ResourceManager:负责管理集群资源,并向JobManager分配资源。
Flink可以在不同模式下运行:
- Standalone模式:Flink可以单独运行在一台或多台机器上。
- YARN模式:Flink作业运行在YARN集群上,动态分配资源。
- Kubernetes模式:Flink作业运行在Kubernetes集群上。
3. DataStream API
DataStream API 是Flink的核心API之一,用于处理无界数据流。以下是使用DataStream API进行简单数据流处理的代码示例。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从socket读取数据,假设监听在localhost:9999
env.socketTextStream("localhost", 9999)
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1)
.print();
// 执行流处理作业
env.execute("Flink Streaming Word Count");
}
// FlatMap function,用于将输入数据分割成单词
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 分割单词并计数
for (String word : value.split("\\s")) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
步骤:
- 设置执行环境:使用 StreamExecutionEnvironment 创建Flink执行环境。
- 读取数据流:从Socket(localhost:9999)读取流式数据。
- 定义FlatMapFunction:用于将接收到的行分割成单词,并创建 Tuple2 形式的(word, count) 对象。
- 按key进行分组:通过 keyBy 操作,根据单词(Tuple2.f0)进行分组。
- 聚合:使用 sum(1) 对每个单词出现次数进行累加。
- 输出结果:使用 print() 输出到控制台。
- 启动执行:通过 env.execute 启动作业。
4. Flink中的时间和窗口
Flink有两种时间语义:事件时间和处理时间。窗口是Flink处理无界数据流的核心机制,允许将数据划分为时间段来进行聚合处理。
时间类型:
**1.**事件时间:数据发生的实际时间,通常从数据源携带的时间戳中提取。
**2.**处理时间:数据到达Flink处理系统的时间。
窗口类型:
- 滚动窗口(Tumbling Window):将数据分割成不重叠的固定大小的窗口。
- 滑动窗口(Sliding Window):窗口有一个固定的大小和滑动步长,允许窗口重叠。
- 会话窗口(Session Window):根据用户的行为进行窗口划分,窗口之间没有固定的大小。
窗口示例:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
public class WindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据流
DataStream<String> textStream = env.socketTextStream("localhost", 9999);
// 进行窗口操作,按单词分组,并在时间窗口内进行计数
textStream
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 使用滚动窗口,每10秒聚合一次
.sum(1)
.print();
env.execute("Window Example");
}
}
在此示例中,使用 TumblingEventTimeWindows 创建了一个10秒的滚动窗口。
**5. **处理函数(Process Function)
ProcessFunction 是Flink提供的低级别的处理API,允许开发者对数据流进行灵活的操作。
示例代码:Process Function的使用
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessFunctionExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9999)
.process(new MyProcessFunction())
.print();
env.execute("ProcessFunction Example");
}
public static class MyProcessFunction extends ProcessFunction<String, String> {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// 输出原始数据并添加前缀
out.collect("Processed: " + value);
}
}
}
说明:
- processElement 方法允许对流中的每个元素进行处理。
- Collector 用于收集处理后的数据并输出。
大数据组件-Flume、Kafka
- 安装Flume
Step 1: 解压Flume安装包
- 将Master节点Flume安装包解压到/opt/soft目录下,创建好/usr/flume
tar -zxvf /opt/soft/apache-flume-1.9.0-bin.tar.gz -C /usr/flume
Step 2: 配置环境变量
- 配置环境变量
vi /etc/profile
添加以下内容:
#FLUME_HOME
export FLUME_HOME=/usr/flume/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin
- 配置文件(位于:/usr/flume/flume-1.9.1/conf)
将 flume-env.sh.template 复制更名为 flume-env.sh
cp flume-env.sh.template flume-env.sh
并添加以下内容:
vi flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_212
使配置文件生效
source /etc/profile
前提:
关闭hbase配置文件
将 hbase 的 hbase.env.sh 的一行配置注释掉
#Extra Java CLASSPATH elements. Optional.
#export HBASE_CLASSPATH=/home/hadoop/hbase/conf
验证
输入命令:
flume-ng version
三、任务
启动Flume传输Hadoop日志(namenode或datanode日志),查看HDFS中/tmp/flume目录下生成的内容
1,将hadoop与flume中 guava-27.0-jre.jar 包版本保持一致( 因为hadoop中此包版本是27而flume中版本是11 )
首先删除Flume自带的旧版本的Guava库
rm -rf /usr/flume/flume-1.9.1/lib/guava-11.0.2.jar
然后将hadoop里面的此包复制给flume
cp /usr/hadoop/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /usr/flume/flume-1.9.0/lib/
2.在 /usr/flume/flume-1.9.0/conf下新建文件 conf-file ,并写配置文件
创建Flume配置文件(conf-file)
Flume的工作方式是通过定义Source、Channel和Sink来收集、传输和存储数据。这些术语的含义如下:
Source(源):从某个位置读取数据(在这里是Hadoop的日志文件)。
Channel(通道):数据在从Source到Sink传输的过程中,需要通过Channel进行中转。
Sink(接收器):将数据存储到目标位置(在这里是HDFS)。
在/usr/flume/flume-1.9.0/conf目录下创建一个名为conf-file的Flume配置文件,内容如下:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
Source配置: 使用TAILDIR类型读取Hadoop的日志文件
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=as
a1.sources.r1.filegroups.as=/usr/hadoop/hadoop-3.1.3/logs/hadoop-root-namenode-master.log
Channel配置: 使用内存通道存储数据
a1.channels.c1.type=memory
Sink配置: 将数据写入HDFS
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9820/tmp/flume
绑定Source和Channel
a1.sources.r1.channels=c1
绑定Sink和Channel
a1.sinks.k1.channel=c1
3.启动Flume传输Hadoop日志,查看HDFS中/tmp/flume目录下生成的文件
前提:启动hadoop集群
启动命令:
flume-ng agent -c conf -f conf-file -n a1 -Dflume.root.logger=INFO,console &
查看HDFS中/tmp/flume目录下生成的文件命令:
hdfs dfs -ls /tmp/flume
停止它:
通过ps命令查找并停止Flume进程
Step 1: 查找Flume进程ID (PID)
使用 ps 命令查找运行中的Flume进程:
ps -ef | grep flume
你将看到类似如下输出:
user 12345 6789 0 10:23 ? 00:00:15 java -cp /opt/flume/lib/... flume-ng agent -c conf ...
user 12350 6789 0 10:23 pts/1 00:00:00 grep --color=auto flume
这里,12345 是Flume agent进程的PID(进程ID)。你需要记住这个PID。
Step 2: 停止Flume进程
使用 kill -9命令来强制停止这个进程:
kill -9 12345
bin/flume-manage.sh stop force
- 安装Kafka
1、使用服务器名称进行通信
编辑 /etc/hosts 文件,在最后添加如下内容
192.168.222.171 node01
192.168.222.172 node02
192.168.222.173 node03
2、关闭防火墙
systemctl stop firewalld
3、安装 Zookeeper 集群
4.上传并解压文件到指定位置 tar -zxvf
5.配置
vi config/server.properties
添加内容:
与 zookeeper 的 myid 一样,每个实例拥有唯一的 ID
broker.id=1
日志文件,数据文件目录
log.dirs=/usr/kafka/kafka_2.12-3.0.0/logs
zookeeper 集群,使用逗号分隔
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka
6.配置环境变量
编辑 /etc/profile 文件,添加如下内容至文件末尾
kafka
export KAFKA_HOME=/usr/kafka/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
使配置生效
source /etc/profile
7.配置其它两台服务器
使用 scp 命令将 kafka 文件夹发送至其它两台服务器
scp /usr/kafka/kafka_2.12-3.0.0 @node02:/usr/kafka
scp /usr/kafka/kafka_2.12-3.0.0 @node03:/usr/kafka
修改 server.properties 配置文件中的** broker.id**
部署完成,启动服务。
8.Kafka基本操作
先启动zookeeper服务
启动服务
kafka-server-start.sh /usr/kafka/kafka_2.12-3.0.0/config/server.properties &
3. Flume端口监听
设置Flume监听端口
在Flume的配置文件中设置source为netcat类型,并绑定到一个特定的IP地址和端口,这样Flume会在这个端口监听传入的数据。例如:
agent.sources.source1.type = netcat
agent.sources.source1.bind = 0.0.0.0
agent.sources.source1.port = 44444
启动Flume后,它会开始在44444端口监听。你可以使用telnet或netcat工具发送数据到这个端口:
telnet localhost 44444
- Kafka启停脚本编写
注意:
1.需先在/etc/hosts 文件中配置 IP 映射。
2.启动kafka集群前,要先启动zoopkeeper集群。
一、kafka集群启动脚本
#!/bin/bash
BROKERS="node01 node02 node03"
KAFKA_HOME="/usr/kafka/kafka_2.12-3.0.0"
KAFKA_NAME=”kafka_2.12-3.0.0”
for i in ${BROKERS}
do
echo "Starting ${KAFKA_NAME} on ${i} "
ssh ${i} "source /etc/profile; nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"
if [[ $? -ne 0 ]]; then
echo "Starting ${KAFKA_NAME} on ${i} is ok"
fi
done
echo All ${KAFKA_NAME} are started
exit 0
作用:通过ssh远程连接到当前服务器${i}(即hadoop01、hadoop02或hadoop03),并执行Kafka的启动命令。
详细解释:
- ssh ${i}:通过SSH连接到当前的服务器${i}。
- "source /etc/profile":远程服务器执行source /etc/profile,加载环境变量,确保Kafka所需的环境变量已加载。
- nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &:启动Kafka服务器,具体操作为: - nohup:使命令在后台运行,即使SSH连接断开,Kafka仍会继续运行。- sh ${KAFKA_HOME}/bin/kafka-server-start.sh:调用Kafka的启动脚本。- ${KAFKA_HOME}/config/server.properties:指定Kafka的配置文件位置。- > /dev/null 2>&1 &:将所有输出(包括标准输出和标准错误输出)重定向到/dev/null,即忽略所有输出,并在后台运行进程。
二、kafka集群停止脚本
#!/bin/bash
BROKERS="node01 node02 node03"
KAFKA_HOME="/usr/kafka/kafka_2.12-3.0.0"
KAFKA_NAME="kafka_2.12-3.0.0"
for i in $BROKERS
do
echo "Stopping ${KAFKA_NAME} on ${i}..."
# 使用 ssh 执行 Kafka 停止命令,并捕获其返回状态
ssh ${i} "source /etc/profile; bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"
# 检查 ssh 命令是否成功
if [[ $? -eq 0 ]]; then
echo "Kafka stopped successfully on ${i}"
else
echo "Failed to stop Kafka on ${i}"
fi
done
echo "All ${KAFKA_NAME} are stopped"
exit 0
为保证这两个脚本能够执行,需要设置可执行权限:
chmod +x start-kafka.sh
chmod +x stop-kafka.sh
三、启停合并脚本
#! /bin/bash
case $2 in
"start")
for i in $(cat $1)
do
echo " --------启动 $i Kafka-------"
# 用于KafkaManager监控,JMX_PORT 设置
ssh $i "export JMX_PORT=9988; /usr/local/kafka/bin/kafka-server-start.sh -daemon /opt/local/kafka/config/server.properties"
done
;;
"stop")
for i in $(cat $1)
do
echo " --------停止 $i Kafka-------"
ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh"
done
;;
*)
echo "Usage: $0 <hosts_file> <start|stop>"
exit 1
;;
esac
使用方法:
启动Kafka:
./script.sh hosts.txt start
其中 hosts.txt 包含你要启动Kafka的所有服务器名称或IP地址,每行一个。例如:
node01
node02
node03
停止Kafka:
./script.sh hosts.txt stop
- Kafka命令行操作
5.1 创建主题
创建一个名为test的主题,分区数为1,副本数为1:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
5.2 查看主题列表
查看当前集群中的主题列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5.3 发送消息
使用命令行工具向主题test发送消息:
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
然后你可以在控制台输入消息,按Enter发送。
5.4 消费消息
在另一个终端窗口中,使用命令行工具消费test主题的消息:bash
复制代码
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
此命令将从主题的开始读取所有消息,并显示在控制台上。
5.5 查看主题详情
查看主题test的详细信息,包括分区数、副本数等:
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
大数据组件-ClickHouse【考纲新增】
1. ClickHouse单机安装
安装步骤
Step 1: 安装依赖
更新系统并安装必要的依赖:
sudo yum update -y
sudo yum install -y epel-release
sudo yum install -y libicu libtool-ltdl openssl
Step 2: 安装ClickHouse
- 安装** clickhouse-common-static**
该包包含ClickHouse的核心文件和二进制:
sudo rpm -ivh clickhouse-common-static-21.7.3.14-2.x86_64.rpm
安装** clickhouse-common-static-dbg**
这是用于调试的包,可以在排查问题时使用:
sudo rpm -ivh clickhouse-common-static-dbg-21.7.3.14-2.x86_64.rpm
安装** clickhouse-client**
安装ClickHouse客户端,用于连接并操作ClickHouse数据库:
sudo rpm -ivh clickhouse-client-21.7.3.14-2.noarch.rpm
安装** clickhouse-server**
安装ClickHouse服务器,这个包是ClickHouse的核心服务:
sudo rpm -ivh clickhouse-server-21.7.3.14-2.noarch.rpm
- 启动ClickHouse服务:
sudo systemctl start clickhouse-server
- 设置ClickHouse服务在系统启动时自动启动:
sudo systemctl enable clickhouse-server
Step 3: 验证安装
安装完成后,使用clickhouse-client
命令连接到本地的ClickHouse实例,并执行简单的查询验证安装:
clickhouse-client
执行查询命令查看版本:
SELECT version();
返回ClickHouse的版本信息,表示安装成功。
2. 创建表、插入数据
2.1 创建数据库和表
在ClickHouse中,可以根据需要创建数据库和表。首先,创建一个数据库:
CREATE DATABASE test_db;
使用刚创建的数据库:
USE test_db;
创建一个简单的表,包含用户ID、姓名和年龄三个字段:
CREATE TABLE users (
id UInt32,
name String,
age UInt8
) ENGINE = MergeTree()
ORDER BY id;
MergeTree
是ClickHouse中常用的表引擎,支持高效的读写和索引。ORDER BY
指定了表的排序键,在插入数据时自动进行排序。
2.2 插入数据
向表中插入一些数据,可以使用SQL的INSERT
语句:
INSERT INTO users (id, name, age) VALUES (1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 35);
查看表中的数据:
SELECT * FROM users;
输出结果类似于:
id
name
age
1
Alice
25
2
Bob
30
3
Charlie
35
3. 查询、修改、删除
3.1 查询数据
ClickHouse支持标准的SQL查询。常见查询操作包括:
查询所有数据:
SELECT * FROM users;
根据条件查询:
查询年龄大于30的用户:
SELECT * FROM users WHERE age > 30;
查询特定字段:
查询用户的姓名和年龄:
SELECT name, age FROM users;
排序查询:
按年龄降序排列:
SELECT * FROM users ORDER BY age DESC;
3.2 修改数据
在ClickHouse中直接修改已有的数据比较特殊,ClickHouse本身不支持传统的UPDATE
操作。
可以通过删除并重新插入的方式进行修改,或使用ALTER
语句来调整表的结构。
例如,要修改ID为2的用户年龄为32,可以先删除该条数据,再插入新值:
删除数据:
ALTER TABLE users DELETE WHERE id = 2;
插入新数据:
INSERT INTO users (id, name, age) VALUES (2, 'Bob', 32);
查询更新后的数据:
SELECT * FROM users WHERE id = 2;
输出结果:
id
name
age
2
Bob
32
3.3 删除数据
删除单条数据:
删除ID为3的用户:
ALTER TABLE users DELETE WHERE id = 3;
删除表中的所有数据:
TRUNCATE TABLE users;
删除表:
如果你不再需要某个表,可以将其删除:
DROP TABLE users;
4. 常见操作总结
操作
示例命令
创建数据库
CREATE DATABASE test_db;
创建表
CREATE TABLE users (id UInt32, name String, age UInt8) ENGINE = MergeTree() ORDER BY id;
插入数据
INSERT INTO users (id, name, age) VALUES (1, 'Alice', 25);
查询所有数据
SELECT * FROM users;
条件查询
SELECT * FROM users WHERE age > 30;
修改数据
先ALTER TABLE users DELETE WHERE id = 2;
然后INSERT INTO users (id, name, age) VALUES (2, 'Bob', 32);
删除单条数据
ALTER TABLE users DELETE WHERE id = 3;
删除表
DROP TABLE users;
版权归原作者 小伍_Five 所有, 如有侵权,请联系我们删除。