I. Preparation
1. Install the virtual machine
Install CentOS 7 in VMware: choose the Minimal edition, English as the language, and NAT for the network.
http://mirrors.aliyun.com/centos/7.9.2009/isos/x86_64/CentOS-7-x86_64-Minimal-2009.iso
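Enable the network interface at boot. The Minimal image does not ship vim (it is installed later in this section), so use vi:
vi /etc/sysconfig/network-scripts/ifcfg-ens33
Change the last line (ONBOOT) to:
ONBOOT="yes"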
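Restart the network service and make sure you can ping baidu.com (for example ping -c 4 baidu.com). If that still fails, just reboot the VM with reboot.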
systemctl restart network
Check the IP address (ifconfig comes from the net-tools package):
yum install net-tools
ifconfig
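If the ifconfig output is hard to read, the VM's outbound IPv4 address can also be looked up from Python (for example later, from the notebook in section VI). This is a sketch of a common trick, not a required step: calling connect() on a UDP socket sends no packets, it only makes the kernel pick the outgoing interface. The probe address 8.8.8.8 is an assumption used purely for route selection, and primary_ipv4 is a hypothetical helper name.

```python
import socket


def primary_ipv4(probe_host="8.8.8.8", probe_port=80):
    """Return the IPv4 address of the interface used for outbound traffic."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect((probe_host, probe_port))  # UDP connect: no packet is sent
            return sock.getsockname()[0]
    except OSError:
        return "127.0.0.1"  # no route (network down): fall back to loopback


if __name__ == "__main__":
    print(primary_ipv4())  # the address to use as <server IP> in section VI
```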
2. Install the Java environment
yum install java-1.8.*
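To confirm that the JDK installed and that a 1.8 runtime is the default java, a small check (runnable once Python 3 from section V is available) can parse the output of java -version, which is written to stderr. parse_java_version and installed_java_version are hypothetical helper names; stdout/stderr pipes are used instead of capture_output so it also runs on the Python 3.6 installed later.

```python
import re
import subprocess


def parse_java_version(text):
    """Extract the quoted version from `java -version` output, e.g. '1.8.0_412'."""
    match = re.search(r'version "([^"]+)"', text)
    return match.group(1) if match else None


def installed_java_version():
    """Run `java -version` and return its version string, or None if java is missing."""
    try:
        result = subprocess.run(["java", "-version"], stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, universal_newlines=True)
    except OSError:
        return None  # java is not on PATH
    return parse_java_version(result.stderr)  # java prints its version to stderr


if __name__ == "__main__":
    print(installed_java_version())
```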
3. Install Scala
yum install https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.rpm
4. Install screen
yum install screen
# create a session: screen -S xxx
# detach: press Ctrl+A, then D
# reattach: screen -r
# list sessions: screen -ls
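The interactive screen -S plus Ctrl+A, D routine can also be done in one step: screen -dmS NAME COMMAND starts a named session that is already detached. A small sketch wrapping it (Python 3, installed in section V; the helper names are hypothetical), which can be handy for restarting the services below after a reboot:

```python
import subprocess


def detached_screen_cmd(session, *command):
    """Build `screen -dmS <session> <command...>`: a new named session, not attached."""
    return ["screen", "-dmS", session] + list(command)


def start_detached(session, *command, cwd=None):
    """Start `command` in a detached screen session; reattach later with `screen -r`."""
    subprocess.check_call(detached_screen_cmd(session, *command), cwd=cwd)


if __name__ == "__main__":
    # Example paths assume the Kafka directory used in section II
    print(detached_screen_cmd("zookeeper", "bin/zookeeper-server-start.sh",
                              "config/zookeeper.properties"))
```

For example, start_detached("kafka", "bin/kafka-server-start.sh", "config/server.properties", cwd="/root/kafka_2.13-3.7.0") would replace the manual screen steps for Kafka.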
5. Install wget and vim
yum install wget
yum install vim
6. Stop the firewall
systemctl stop firewalld
systemctl disable firewalld   # optional: keep it off after a reboot
II. Install Kafka
1. Download
cd
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz --no-check-certificate
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0
2. Start ZooKeeper
# run ZooKeeper in its own screen session
screen -S zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# press Ctrl+A, then D, to detach from screen
3. Start Kafka (in a second screen session, once ZooKeeper is up)
screen -S kafka
bin/kafka-server-start.sh config/server.properties
# press Ctrl+A, then D, to detach from screen
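Before moving on, it is worth confirming that both processes are actually listening. A minimal Python 3 check, assuming the default ports from the shipped config files (clientPort 2181 in zookeeper.properties, 9092 for the Kafka listener); is_listening is a hypothetical helper:

```python
import socket

# Default ports from config/zookeeper.properties and config/server.properties
SERVICES = {"zookeeper": 2181, "kafka": 9092}


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        print(name, "up" if is_listening("localhost", port) else "DOWN")
```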
III. Install Flink
cd
wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz --no-check-certificate
tar -xf flink-1.13.0-bin-scala_2.11.tgz && cd flink-1.13.0
# start a local standalone cluster (the web UI listens on port 8081 by default)
./bin/start-cluster.sh
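start-cluster.sh returns immediately, so a quick way to confirm that the JobManager and a TaskManager actually came up is the REST API that also backs the web UI. A Python 3 sketch, assuming the default rest.port 8081 and the taskmanagers / slots-total fields of the /overview endpoint; the helper names are hypothetical:

```python
import json
import urllib.error
import urllib.request


def flink_overview(base_url="http://localhost:8081", timeout=3.0):
    """Fetch the cluster summary from Flink's REST API; None if it is unreachable."""
    try:
        with urllib.request.urlopen(base_url + "/overview", timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except (urllib.error.URLError, OSError, ValueError):
        return None


def cluster_ready(overview):
    """Jobs can run once at least one TaskManager has registered task slots."""
    return bool(overview) and overview.get("taskmanagers", 0) > 0 \
        and overview.get("slots-total", 0) > 0


if __name__ == "__main__":
    info = flink_overview()
    print(info)
    print("ready" if cluster_ready(info) else "not ready")
```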
IV. Install Spark
# Reference: https://spark.apache.org/docs/3.2.0/
cd
wget https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz --no-check-certificate
tar -xf spark-3.2.0-bin-hadoop3.2.tgz
cd spark-3.2.0-bin-hadoop3.2
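./bin/pyspark   # smoke test: opens the interactive PySpark shell (needs python3, installed in section V; leave with exit())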
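./bin/pyspark is interactive; for a scripted smoke test the same distribution ships bin/spark-submit, which runs a Python file. A sketch that builds that command line, assuming the install directory above; wordcount.py is a hypothetical script name and local[*] runs Spark in-process on all cores:

```python
import os
import shlex


def spark_submit_cmd(spark_home, script, master="local[*]", extra_args=()):
    """Build a non-interactive spark-submit command that runs `script`."""
    return [os.path.join(spark_home, "bin", "spark-submit"),
            "--master", master, script] + list(extra_args)


if __name__ == "__main__":
    cmd = spark_submit_cmd(os.path.expanduser("~/spark-3.2.0-bin-hadoop3.2"), "wordcount.py")
    print(" ".join(shlex.quote(part) for part in cmd))  # paste into the shell to run it
```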
V. Configure the Python environment
1. Install Python
yum install python3 python3-devel
yum install gcc gcc-c++
pip3 install pip --upgrade
2. Install pyalink
pip3 install pyalink --user -i https://mirrors.aliyun.com/pypi/simple --ignore-installed PyYAML
ln -si /root/.local/bin/* /usr/bin/   # expose scripts installed with --user (in ~/.local/bin) on the default PATH
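pip3 install --user puts console scripts from the installed packages in ~/.local/bin; if that directory is not on PATH in your shell, those commands are not found, which is what the ln step works around. A sketch that lists the executables that would be affected (scripts_missing_from_path is a hypothetical helper):

```python
import os


def scripts_missing_from_path(bin_dir=None, path=None):
    """Return executables in bin_dir (default ~/.local/bin) if bin_dir is not on PATH."""
    bin_dir = os.path.abspath(bin_dir or os.path.expanduser("~/.local/bin"))
    path = os.environ.get("PATH", "") if path is None else path
    on_path = {os.path.abspath(p) for p in path.split(os.pathsep) if p}
    if not os.path.isdir(bin_dir) or bin_dir in on_path:
        return []  # nothing installed there, or already reachable
    return sorted(
        name for name in os.listdir(bin_dir)
        if os.path.isfile(os.path.join(bin_dir, name))
        and os.access(os.path.join(bin_dir, name), os.X_OK)
    )


if __name__ == "__main__":
    print(scripts_missing_from_path() or "nothing to link")
```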
3. Install pyspark
pip3 install pyspark==3.2.0 -i https://mirrors.aliyun.com/pypi/simple/   # same version as the Spark download in section IV
4. Install kafka-python
pip3 install kafka-python
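Since pip3 and the notebook kernel must use the same interpreter, a quick check with python3 (or in a notebook cell) that the three packages import is cheap insurance. The package list comes from the steps above; note that kafka-python's import name is kafka. missing_modules is a hypothetical helper:

```python
import importlib.util

REQUIRED = ["pyalink", "pyspark", "kafka"]  # kafka-python is imported as `kafka`


def missing_modules(names=REQUIRED):
    """Return the top-level modules in `names` that this interpreter cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    print(missing_modules() or "all packages importable")
```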
5. Configure Jupyter
jupyter notebook --generate-config
jupyter notebook password
Edit the configuration file:
vim /root/.jupyter/jupyter_notebook_config.py
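# set these two lines (they are commented out in the generated file)
c.NotebookApp.ip = '*'               # listen on all interfaces so the host machine can connect
c.NotebookApp.open_browser = False   # the VM has no browser to open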
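The two edits can also be scripted. In the generated file the options appear commented out (for example #c.NotebookApp.ip = 'localhost'), so this sketch uncomments and overwrites the matching line, or appends it if absent. set_option is a hypothetical helper, and the config path is the one used above:

```python
import re
from pathlib import Path


def set_option(config_text, key, value_literal):
    """Set `c.<key> = <value_literal>`, uncommenting an existing line if there is one."""
    pattern = re.compile(r"^[ \t]*#?[ \t]*c\." + re.escape(key) + r"[ \t]*=.*$", re.MULTILINE)
    new_line = "c.{} = {}".format(key, value_literal)
    if pattern.search(config_text):
        # A function replacement avoids backslash escapes being interpreted
        return pattern.sub(lambda m: new_line, config_text, count=1)
    return config_text.rstrip("\n") + "\n" + new_line + "\n"


if __name__ == "__main__":
    cfg = Path.home() / ".jupyter" / "jupyter_notebook_config.py"
    if cfg.exists():
        text = set_option(cfg.read_text(), "NotebookApp.ip", "'*'")
        text = set_option(text, "NotebookApp.open_browser", "False")
        cfg.write_text(text)
```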
Start Jupyter (--allow-root is needed because it runs as root here):
jupyter notebook --allow-root
VI. Testing
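On the host machine, open a browser at <server IP>:8888, e.g. 192.168.128.140:8888, log in with the password set above, and run the following tests in a Python 3 notebook.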
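1. Test Kafka
- Producer
from kafka import KafkaProducer
import json

# serialize keys and values as JSON-encoded bytes
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: json.dumps(k).encode(),
    value_serializer=lambda v: json.dumps(v).encode()
)
msg = "Hello World"
producer.send('result', msg)  # the broker auto-creates the 'result' topic by default
producer.flush()              # block until buffered messages have actually been sent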
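- Consumer
from kafka import KafkaConsumer
import json

# auto_offset_reset='earliest' also returns messages produced before the consumer started;
# the loop waits for new messages forever, so interrupt the kernel to stop it
consumer = KafkaConsumer('result', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
for msg in consumer:
    print(json.loads(msg.value))  # decode the JSON bytes written by the producer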
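Because the producer JSON-encodes its values, a consumer that does not decode them receives bytes such as b'"Hello World"', with the JSON quotes embedded. This stdlib-only sketch shows the round trip; deserialize has the shape kafka-python accepts for value_deserializer (a callable taking the raw message bytes), and both function names are hypothetical:

```python
import json


def serialize(value):
    """Same as the producer's value_serializer: object -> JSON text -> UTF-8 bytes."""
    return json.dumps(value).encode()


def deserialize(raw):
    """Inverse for the consumer side, e.g. KafkaConsumer(..., value_deserializer=deserialize)."""
    return json.loads(raw.decode())


raw = serialize("Hello World")
print(raw)               # b'"Hello World"'
print(deserialize(raw))  # Hello World
```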
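2. Test Flink (via PyAlink, which runs on Flink)
from pyalink.alink import *
import pandas as pd

useLocalEnv(1)  # local Flink environment with parallelism 1

df = pd.DataFrame(
    [
        [2009, 0.5],
        [2010, 9.36],
        [2011, 52.0],
        [2012, 191.0],
        [2013, 350.0],
        [2014, 571.0],
        [2015, 912.0],
        [2016, 1207.0],
        [2017, 1682.0]
    ]
)
train_set = BatchOperator.fromDataframe(df, schemaStr='sumMoney double, fraud double')
trainer = LinearRegTrainBatchOp()\
    .setFeatureCols(["sumMoney"])\
    .setLabelCol("fraud")
model = train_set.link(trainer)  # connect the training step to the input data
train_set.print()  # show the input data
model.print()      # run the training job and show the resulting model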
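Without regularization, linear regression minimizes squared error, so the model PyAlink trains should land close to the closed-form least-squares solution (Alink may fit it with an iterative optimizer, so expect small numerical differences). A stdlib sketch to cross-check the coefficients on the same data; ols_fit is a hypothetical helper:

```python
def ols_fit(xs, ys):
    """Closed-form least squares for y = w * x + b with a single feature."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)                       # spread of x
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))  # covariance term
    w = sxy / sxx
    b = mean_y - w * mean_x  # the fitted line passes through (mean_x, mean_y)
    return w, b


# Same rows as the pandas DataFrame above: (sumMoney, fraud)
data = [(2009, 0.5), (2010, 9.36), (2011, 52.0), (2012, 191.0), (2013, 350.0),
        (2014, 571.0), (2015, 912.0), (2016, 1207.0), (2017, 1682.0)]
w, b = ols_fit([x for x, _ in data], [y for _, y in data])
print("slope:", w, "intercept:", b)
```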
3. Test Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ml').getOrCreate()

# DDL-style schema strings: training rows carry a label y, test rows do not
_schema1 = 'x1 int, x2 int, x3 int, y int'
_schema2 = 'x1 int, x2 int, x3 int'

trainDf = spark.createDataFrame([
    [900, 50, 90, 1],
    [800, 50, 90, 1],
    [600, 50, 120, 1],
    [500, 40, 100, 0],
    [750, 60, 150, 0]
], schema=_schema1)
testDf = spark.createDataFrame([
    [650, 60, 90],
    [600, 40, 90],
    [750, 50, 60]
], schema=_schema2)
trainDf.show()
testDf.show()
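createDataFrame accepts the schema as a DDL-formatted string, and with its default verifySchema=True, rows whose values disagree with the schema raise an error inside Spark. A stdlib-only sketch that parses the simple flat schemas used here and pre-checks rows before they reach Spark; it is not Spark's own parser, handles only flat "name type" pairs, and the helper names are hypothetical:

```python
def parse_ddl(schema):
    """Split a flat DDL schema such as 'x1 int, x2 int' into [(name, type), ...]."""
    fields = []
    for part in schema.split(","):
        name, dtype = part.split()  # ValueError if a field is not exactly 'name type'
        fields.append((name, dtype.lower()))
    return fields


def check_rows(rows, schema):
    """Raise ValueError if a row's length or int-typed values disagree with the schema."""
    fields = parse_ddl(schema)
    for i, row in enumerate(rows):
        if len(row) != len(fields):
            raise ValueError("row {} has {} values, schema has {}".format(i, len(row), len(fields)))
        for (name, dtype), value in zip(fields, row):
            if dtype == "int" and not isinstance(value, int):
                raise ValueError("row {} column {}: {!r} is not an int".format(i, name, value))


check_rows([[650, 60, 90], [600, 40, 90], [750, 50, 60]], 'x1 int, x2 int, x3 int')
print(parse_ddl('x1 int, x2 int, x3 int, y int'))
```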
Copyright belongs to the original author, qq929931589. If this infringes any rights, please contact us to have it removed.