Spark + Flink + Kafka Environment Setup

I. Preparation

1. Install a virtual machine

Install CentOS 7 in VMware: choose the Minimal image, English, and NAT networking.

http://mirrors.aliyun.com/centos/7.9.2009/isos/x86_64/CentOS-7-x86_64-Minimal-2009.iso

vim /etc/sysconfig/network-scripts/ifcfg-ens33

Change the last line to:
ONBOOT="yes"

Restart the network service and make sure you can ping baidu; if it still fails, reboot the virtual machine directly.

systemctl restart network
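
For example, a quick connectivity check (any reachable host works):

ping -c 3 www.baidu.com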

Check the IP address:

yum install net-tools

ifconfig 

2. Install the Java environment

yum install java-1.8.*
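
Verify the installation:

java -version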

3. Install Scala

yum install https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.rpm

4. Install screen

yum install screen

#create a session: screen -S xxx
#detach: Ctrl+A, then D
#reattach: screen -r
#list sessions: screen -ls

5. Install wget and vim

yum install wget
yum install vim

6. Disable the firewall

systemctl stop firewalld
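
Note that stop only turns the firewall off for the current boot; to keep it disabled across reboots, also run:

systemctl disable firewalld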

II. Installing Kafka

1. Download
cd

wget https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz --no-check-certificate

tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

2. Start ZooKeeper

#start ZooKeeper inside a screen session
screen -S zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#detach from the screen session with Ctrl+A, then D

3. Start Kafka

screen -S kafka
bin/kafka-server-start.sh config/server.properties
#detach from the screen session with Ctrl+A, then D
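
Optionally, pre-create the result topic used by the tests in Section VI and confirm that the broker responds (Kafka auto-creates topics by default, so this is only a sanity check):

bin/kafka-topics.sh --create --topic result --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --bootstrap-server localhost:9092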

III. Installing Flink

cd

wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz --no-check-certificate
tar -xf flink-1.13.0-bin-scala_2.11.tgz && cd flink-1.13.0

#start the local cluster
./bin/start-cluster.sh
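
The standalone cluster serves a web UI on port 8081 by default; with the firewall stopped you can open server-ip:8081 in the host browser, or check locally:

curl http://localhost:8081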

IV. Installing Spark

#install Spark
#reference: https://spark.apache.org/docs/3.2.0/

cd

wget https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz --no-check-certificate
tar -xf spark-3.2.0-bin-hadoop3.2.tgz
cd spark-3.2.0-bin-hadoop3.2

./bin/pyspark
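
This opens an interactive PySpark shell; a one-line smoke test inside it:

spark.range(5).show()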

V. Configuring the Python Environment

1. Install Python

yum install python36 python3-devel
yum install gcc gcc-c++

pip3 install pip --upgrade

2. Install PyAlink

pip3 install pyalink --user -i https://mirrors.aliyun.com/pypi/simple --ignore-installed PyYAML

#link the user-installed entry points (jupyter, etc.) into the system PATH
ln -si /root/.local/bin/* /usr/bin/

3. Install PySpark
pip3 install pyspark -i https://mirrors.aliyun.com/pypi/simple/

4. Install kafka-python
pip3 install kafka-python

5. Configure Jupyter
jupyter notebook --generate-config
jupyter notebook password

Edit the configuration file:

vim /root/.jupyter/jupyter_notebook_config.py

#change the corresponding two lines
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False

Start it:

jupyter notebook --allow-root
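
As with ZooKeeper and Kafka, you can run Jupyter inside a screen session so it keeps running after you disconnect:

screen -S jupyter
jupyter notebook --allow-root
#detach with Ctrl+A, then D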

VI. Testing

  • Open a browser on the host machine and visit server-ip:8888, e.g. 192.168.128.140:8888

1. Test Kafka

  • Producer
from kafka import KafkaProducer
import json

#serialize keys and values as JSON before sending
producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode()
)

msg = "Hello World"
producer.send('result', msg)
#send() is asynchronous; flush to make sure the message is actually delivered
producer.flush()
  • Consumer
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('result', bootstrap_servers=['localhost:9092'])

#values arrive as JSON-encoded bytes, so decode them on the way out
for msg in consumer:
    print(json.loads(msg.value))

2. Test Flink

from pyalink.alink import *
import pandas as pd

useLocalEnv(1)

df = pd.DataFrame(
    [
        [2009, 0.5],
        [2010, 9.36],
        [2011, 52.0],
        [2012, 191.0],
        [2013, 350.0],
        [2014, 571.0],
        [2015, 912.0],
        [2016, 1207.0],
        [2017, 1682.0]
    ]
)

train_set = BatchOperator.fromDataframe(df, schemaStr='sumMoney double, fraud double')

trainer = LinearRegTrainBatchOp()\
    .setFeatureCols(["sumMoney"])\
    .setLabelCol("fraud")

#link() returns the linked trainer, whose output is the fitted model
model = train_set.link(trainer)
train_set.print()
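
As a follow-on sketch, assuming PyAlink's LinearRegPredictBatchOp with its usual linkFrom(model, data) pattern (the "pred" column name here is arbitrary), the trained model can be applied back to the training set:

predictor = LinearRegPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, train_set).print()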

3. Test Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml').getOrCreate()

_schema1 = 'x1 int, x2 int, x3 int, y int'
_schema2 = 'x1 int, x2 int, x3 int'

trainDf = spark.createDataFrame([
    [900,50,90,1],
    [800,50,90,1],
    [600,50,120,1],
    [500,40,100,0],
    [750,60,150,0]
],schema=_schema1)

testDf = spark.createDataFrame([
    [650,60,90],
    [600,40,90],
    [750,50,60]
],schema=_schema2)

trainDf.show()
testDf.show()
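
The snippet above only displays the two DataFrames. As a minimal sketch of an actual model on this data, assuming the stock pyspark.ml API (the 'features' and 'prediction' column names are the library defaults), you could fit a logistic regression on trainDf and score testDf:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

#assemble the three inputs into a single feature vector column
assembler = VectorAssembler(inputCols=['x1', 'x2', 'x3'], outputCol='features')

#fit a logistic regression on the labeled training rows
lr = LogisticRegression(featuresCol='features', labelCol='y')
model = lr.fit(assembler.transform(trainDf))

#score the unlabeled test rows
model.transform(assembler.transform(testDf)).select('x1', 'x2', 'x3', 'prediction').show()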

Reposted from: https://blog.csdn.net/qq929931589/article/details/137112318
Copyright belongs to the original author, qq929931589.
