目录
下面是关于 Storm 的单机和集群环境部署教程,以及部署过程中的注意事项和 Java、Python 的使用案例。
一、Storm 单机环境部署
1. 环境准备
- 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
- Java:Storm 需要 Java 环境,推荐使用 OpenJDK 8 或 11。
- ZooKeeper:Storm 依赖 ZooKeeper,需要安装和配置 ZooKeeper。
2. 安装 Java
在 Ubuntu 中:
sudoapt update
sudoaptinstall openjdk-11-jdk
在 CentOS 中:
sudo yum install java-11-openjdk
验证 Java 安装:
java-version
3. 安装 ZooKeeper
参考之前的 ZooKeeper 部署教程,安装并启动 ZooKeeper。
4. 下载并解压 Storm
访问 Storm 官方网站 下载最新版本的 Storm。
wget https://downloads.apache.org/storm/apache-storm-2.4.0/apache-storm-2.4.0.tar.gz
tar-xzvf apache-storm-2.4.0.tar.gz
mv apache-storm-2.4.0 /usr/local/storm
5. 配置 Storm
- **编辑配置文件
storm.yaml
**:在/usr/local/storm/conf/storm.yaml
中添加以下内容:storm.zookeeper.servers:-"localhost"nimbus.seeds:["localhost"]storm.local.dir:"/usr/local/storm/data"supervisor.slots.ports:-6700-6701-6702-6703
- 配置环境变量:编辑
~/.bashrc
文件,添加以下内容:exportSTORM_HOME=/usr/local/stormexportPATH=$PATH:$STORM_HOME/bin
然后加载配置:source ~/.bashrc
6. 启动 Storm
- 启动 Nimbus(主节点):
storm nimbus &
- 启动 Supervisor(工作节点):
storm supervisor &
- 启动 UI 服务:
storm ui &
- 启动 Logviewer 服务(可选):
storm logviewer &
7. 验证 Storm 是否正常运行
访问 Storm UI 界面
http://localhost:8080
,可以查看集群的运行状态。
8. Storm 单机部署的注意事项
- Java 版本:确保 Java 环境配置正确。
- ZooKeeper:确保 ZooKeeper 服务正常运行,且 Storm 能够连接到 ZooKeeper。
- 内存与资源配置:根据机器配置调整 Storm 的内存和资源使用。
- 日志管理:配置 Logviewer 并监控日志文件。
二、Storm 集群环境部署
1. 环境准备
- 多台服务器:至少 3 台(推荐 5 台以上,1 台作为 Nimbus,其他作为 Supervisor)
- 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
- Java:在所有节点上安装 Java
- ZooKeeper:在集群中安装并配置 ZooKeeper
2. 配置 Storm 集群
2.1 安装 Storm
在每台服务器上安装 Storm(参考单机环境部署的步骤)。
2.2 配置 Nimbus 节点
在 Nimbus 节点上编辑
storm.yaml
文件:
storm.zookeeper.servers:-"zookeeper1"-"zookeeper2"-"zookeeper3"nimbus.seeds:["nimbus-node"]storm.local.dir:"/usr/local/storm/data"storm.cluster.mode:"distributed"supervisor.slots.ports:-6700-6701-6702-6703
2.3 配置 Supervisor 节点
在每个 Supervisor 节点上编辑
storm.yaml
文件:
storm.zookeeper.servers:-"zookeeper1"-"zookeeper2"-"zookeeper3"nimbus.seeds:["nimbus-node"]storm.local.dir:"/usr/local/storm/data"storm.cluster.mode:"distributed"supervisor.slots.ports:-6700-6701-6702-6703
2.4 启动 Storm 集群
在 Nimbus 节点上启动 Nimbus 和 UI 服务:
storm nimbus &
storm ui &
在每个 Supervisor 节点上启动 Supervisor 服务:
storm supervisor &
3. 验证 Storm 集群状态
访问 Nimbus 节点的 Storm UI 界面
http://nimbus-node:8080
,可以查看集群中所有 Supervisor 节点的状态。
4. Storm 集群部署的注意事项
- ZooKeeper 配置:确保所有 Storm 节点可以正常连接 ZooKeeper。
- 网络和端口:确保 Nimbus 和 Supervisor 节点之间的网络连接正常,端口未被防火墙阻挡。
- 内存与资源配置:根据节点硬件配置合理分配内存和 CPU 资源,避免资源不足或浪费。
- 监控与日志管理:使用监控工具监控 Storm 集群状态,并配置日志收集和分析工具。
三、Storm 使用案例
1. Java 示例:编写简单的 Storm Topology
1.1 添加 Maven 依赖
在
pom.xml
中添加 Storm 依赖:
<dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency></dependencies>
1.2 实现一个简单的 Topology
importorg.apache.storm.Config;importorg.apache.storm.LocalCluster;importorg.apache.storm.topology.TopologyBuilder;importorg.apache.storm.tuple.Fields;importorg.apache.storm.utils.Utils;publicclassWordCountTopology{publicstaticvoidmain(String[] args){// 创建TopologyBuilderTopologyBuilder builder =newTopologyBuilder();// 设置Spout和Bolt
builder.setSpout("spout",newRandomSentenceSpout());
builder.setBolt("split",newSplitSentenceBolt()).shuffleGrouping("spout");
builder.setBolt("count",newWordCountBolt()).fieldsGrouping("split",newFields("word"));// 配置Config config =newConfig();
config.setDebug(true);// 本地模式运行LocalCluster cluster =newLocalCluster();
cluster.submitTopology("word-count", config, builder.createTopology());// 运行一段时间后关闭Utils.sleep(10000);
cluster.shutdown();}}
2. Python 示例:使用
streamparse
编写简单的 Storm Topology
2.1 安装
streamparse
pip install streamparse
2.2 创建 Storm Topology
- 创建项目:
sparse quickstart word_countcd word_countsparse run
- **编辑
word_count/word_count/spouts/words.py
**:from streamparse import SpoutclassWordSpout(Spout):defnext_tuple(self): words =["stream","parse","storm","python","topology"] self.emit([random.choice(words)])
- **编辑
word_count/word_count/bolts/wordcount.py
**:from collections import Counterfrom streamparse import BoltclassWordCountBolt(Bolt):definitialize(self, conf, ctx): self.counts = Counter()defprocess(self, tup): word = tup.values[0] self.counts[word]+=1 self.emit([word, self.counts[word]]) self.log('%s: %d'%(word, self.counts[word]))
- **编辑
word_count/word_count/topologies/wordcount.py
**:from streamparse import Grouping, Topologyfrom spouts.words import WordSpoutfrom bolts.wordcount import WordCountBoltclassWordCountTopology(Topology): word_spout = WordSpout.spec() count_bolt = WordCountBolt.spec(inputs=[word_spout], groupings={WordCountBolt: Grouping.fields('word')})
- 运行 Topology:
sparse submit
总结
通过以上步骤,我们完成了 Storm 的单机和集群环境部署,并实现了 Java 和 Python 的简单 Topology 示例。Storm 作为一款分布式实时计算系统,能够处理大量的实时数据流,广泛应用于实时数据分析、监控和处理场景。
部署过程中的注意事项
- ZooKeeper 配置:确保 Storm 集群能够正常连接 ZooKeeper。
- 资源管理:根据实际应用场景合理配置内存和 CPU 资源,避免资源浪费或不足。
- 网络配置:确保所有节点之间的网络连接正常,端口开放。
- 监控和日志管理:配置监控工具和日志分析工具,及时发现和处理问题,保障 Storm 集群的稳定运行。
版权归原作者 闲人编程 所有, 如有侵权,请联系我们删除。