Spark实现wordcount
在Spark中实现Word Count是一个经典的入门级示例,展示了如何处理和统计文本文件中的单词数量。下面分别给出使用Scala、Python(PySpark)和Java编写的Word Count程序示例。
Scala
import org.apache.spark.sql.SparkSession
object WordCountScala {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("WordCount Example").getOrCreate()
val textFile = spark.sparkContext.textFile("hdfs://localhost:9000/user/input/text.txt") // 替换为你的文件路径
val counts = textFile.flatMap(line => line.split("\\W+"))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://localhost:9000/user/output/wordcount") // 输出路径
spark.stop()
}
}
Python (PySpark)
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.appName("WordCount Example").getOrCreate()
text_file = spark.sparkContext.textFile("hdfs://localhost:9000/user/input/text.txt") # 替换为你的文件路径
counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://localhost:9000/user/output/wordcount") # 输出路径
spark.stop()
Java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public class WordCountJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("WordCount Example");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:9000/user/input/text.txt"); // 替换为你的文件路径
JavaPairRDD<String, Integer> counts = textFile.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("hdfs://localhost:9000/user/output/wordcount"); // 输出路径
sc.stop();
}
}
这些示例中,我们首先创建一个SparkSession或SparkContext,然后读取指定路径的文本文件。接着,通过一系列转换操作(flatMap将每行文本分割成单词,map将每个单词映射为(word, 1)的键值对,reduceByKey则将相同单词的计数相加)实现单词计数。最后,将结果保存到指定的输出路径。请注意替换示例中的文件路径和输出路径以适应你的实际环境。
Spark Streaming怎么实现数据持久化保存?
Spark Streaming 提供了几种方法来实现数据的持久化保存,确保数据处理后的结果能够被可靠地存储和后续使用。以下是实现数据持久化的几种主要方式:
1、写入文件系统:
最直接的方法是将数据流保存到文件系统中,如HDFS、Amazon S3或其他支持的文件系统。这可以通过调用DStream的相应方法实现,例如:
- saveAsTextFiles(prefix[, suffix]):将DStream的内容以文本格式保存,每个批次的输出文件会带有时间戳的前缀和可选的后缀。
- saveAsObjectFiles(prefix[, suffix]):将DStream中的对象序列化后保存,适用于需要保存复杂对象结构的场景。
- saveAsHadoopFiles(prefix[, suffix]):按照Hadoop的文件格式保存数据,适用于需要与其他Hadoop工具兼容的场景。
2、内存持久化:
- 类似于RDD,DStream也支持将数据持久化到内存中,这可以通过调用persist()或cache()方法实现。这样可以在后续的转换操作中复用已经计算过的结果,提高处理效率。需要注意的是,DStream的默认持久化级别通常是MEMORY_AND_DISK_SER_2,这意味着数据会被序列化后存储在内存中,如果内存不足,则溢写到磁盘。
3、检查点(Checkpointing):
- 为了实现状态ful的流处理和故障恢复,Spark Streaming支持检查点机制。通过配置检查点目录,Spark会定期将流应用程序的元数据和状态信息保存到该目录下,使得在Driver进程失败时可以从检查点恢复,确保数据处理的连续性和状态的一致性。
4、使用外部存储系统:
- 对于需要长期存储或进一步分析的数据,可以设计流程将处理后的DStream数据写入到数据库(如MySQL、HBase)、NoSQL存储(如Cassandra)、消息队列(如Kafka)或其他数据仓库系统中。
5、WAL(Write Ahead Log):
- 在与某些数据源(如Kafka)集成时,启用WAL可以确保接收到的数据在内存缓冲区满后立即被写入到持久存储中,即使在接收数据的Executor失败,也能从日志中恢复数据,避免数据丢失。
通过以上方法,Spark Streaming不仅能够处理实时数据流,还能确保数据处理结果的可靠性存储和后续访问。选择哪种持久化策略取决于具体的业务需求、数据量、可用的存储资源以及对数据可靠性和性能的要求。
Spark SQL读取文件,内存不够使用,如何处理?
当在Spark SQL中读取文件时,如果内存不够使用(即出现OOM,OutOfMemoryError),通常有以下几种策略来处理这个问题:
1、增加内存:
- 如果可能的话,给Spark执行器(Executors)分配更多的内存。这可以在Spark配置中设置,例如通过spark.executor.memory。
- 如果你的集群支持,也可以增加执行器的数量,通过spark.executor.instances。
2、优化数据读取:
- 如果文件非常大,考虑只读取需要的部分数据,而不是整个数据集。这可以通过在读取时使用谓词(predicates)或分区(partitioning)来实现。
- 使用更高效的文件格式,如Parquet或ORC,它们提供了更好的压缩和编码机制,可以减少内存使用。
3、优化Spark作业:
- 减少shuffle操作,因为shuffle操作会消耗大量内存和磁盘空间。可以通过合理的分区和join操作来减少shuffle。
- 优化广播join(Broadcast Join),如果可能的话,将较小的DataFrame广播到所有节点,以减少shuffle和内存使用。
- 使用cache()或persist()缓存经常使用的DataFrame,但请注意这也会占用内存。
4、调整垃圾回收(GC)设置:
- 根据JVM的垃圾回收器(GC)和Spark作业的特点,调整GC设置。例如,使用G1GC而不是默认的CMS或ParallelGC。
- 调整GC参数,如-XX:+UseG1GC, -XX:InitiatingHeapOccupancyPercent等,以更好地适应你的工作负载。
5、使用内存管理策略:
- Spark提供了统一的内存管理模型,允许你控制内存的使用方式。通过调整spark.memory.fraction, spark.memory.storageFraction, 和 spark.memory.offHeapFraction等参数,可以影响堆内存、存储内存和堆外内存的使用。
6、优化序列化:
- 使用更高效的序列化库,如Kryo,而不是默认的Java序列化。Kryo通常可以提供更小的序列化大小和更快的性能。
7、使用列式存储:
- 如果你的数据是宽表(即有很多列),并且不是所有列都经常访问,考虑使用列式存储(如Parquet或ORC)。这样,Spark可以只读取需要的列,从而减少内存使用。
8、数据倾斜处理:
- 如果数据倾斜严重(即某些键的数据量远大于其他键),考虑使用salting技术将数据分散到多个键中,或者使用倾斜感知的join策略。
9、监控和诊断:
- 使用Spark UI和监控工具(如Ganglia, Prometheus, Grafana等)来监控Spark作业的执行和资源使用情况。
- 分析Spark作业的执行计划和日志,找出可能的性能瓶颈和内存问题。
10、考虑使用其他技术:
- 如果Spark SQL无法满足你的需求,考虑使用其他大数据技术栈或工具,如Flink, Beam, Trino(原名PrestoSQL)等。这些工具可能有不同的内存管理和优化策略,更适合你的工作负载。
Spark的lazy体现在哪里?
Spark的lazy特性主要体现在其RDD(Resilient Distributed Dataset)的转换(transformation)操作上。以下是关于Spark lazy特性的详细解释:
1、转换(Transformation)操作的延迟执行:
- Spark中的RDD支持两种类型的操作:转换(transformation)和行动(action)。转换操作会针对已有的RDD创建一个新的RDD,但它本身并不会立即执行,而是具有lazy特性。这意味着,当你定义了一个或多个转换操作时,Spark并不会立即开始计算这些操作的结果。
- 常见的转换操作包括map(), filter(), flatMap(), reduceByKey(), groupByKey()等。这些操作会返回一个新的RDD,但并不会触发实际的计算。
2、行动(Action)操作触发计算:
- 只有当执行了行动操作时,Spark才会开始从输入数据源读取数据,并执行之前定义的所有转换操作。行动操作会将结果返回给Driver程序,或者写入外部存储系统。
- 常见的行动操作包括collect(), count(), reduce(), saveAsTextFile()等。这些操作会触发Spark作业的执行,从而触发之前所有转换操作的执行。
3、优化和容错:
- Spark的lazy特性使得它能够进行底层的优化,例如通过合并多个转换操作来减少中间RDD的创建和存储,从而提高计算效率。
- 此外,lazy特性也使得Spark能够支持容错。由于转换操作是延迟执行的,如果某个节点在计算过程中失败,Spark可以重新计算丢失的分区,而不是重新计算整个RDD。
4、示例:
- 假设你有一个RDD lines,你对其执行了filter()和map()两个转换操作,得到了一个新的RDD filteredLines。此时,Spark并不会立即开始计算这两个操作的结果。只有当你对filteredLines执行了一个行动操作(如collect())时,Spark才会开始从输入数据源读取lines的数据,并执行filter()和map()操作,最终返回结果。
总结来说,Spark的lazy特性主要体现在其RDD的转换操作上。这些操作在定义时并不会立即执行,而是会延迟到执行行动操作时才开始计算。这种特性使得Spark能够进行底层的优化和容错处理,提高了计算效率和可靠性。
Spark中的并行度等于什么
在Apache Spark中,并行度(Parallelism)是指在执行Spark作业时,能够同时运行的任务(Tasks)数量。这直接影响到Spark作业的执行效率和资源利用。并行度的概念体现在多个层面,包括但不限于:
1、RDD(弹性分布式数据集)的分区:RDD是Spark处理数据的基本单位,其分区数量决定了数据处理时的基本并行单元。每个分区都会对应一个Task,因此增加RDD的分区数量可以提高并行度。
2、Stage内的Task数量:Spark作业会被DAG Scheduler分解为多个Stage,每个Stage又包含若干个Task。Stage内的Task数量直接反映了该阶段的并行度。
3、Spark作业的整体并行度:这是所有Stage中Task数量的总和,体现了整个作业的并行处理能力。
4、配置参数:并行度可以通过配置参数来控制,最直接相关的配置项是spark.default.parallelism,这个参数如果没有显式设置,默认值会根据运行模式有所不同,例如在本地模式下默认为当前节点的CPU核心数,在分布式模式下可能是集群中所有CPU核心数的某个倍数,以确保充分利用资源。
5、手动设置:在创建RDD、执行行动操作(Action)或在Spark SQL中,用户可以手动指定分区数或并行度,以更精细地控制并行计算的程度。
总结来说,Spark中的并行度是一个动态的概念,它与RDD的分区数、作业划分的Stage和Task数量、以及配置参数紧密相关,通过合理的设置可以优化执行效率,避免资源浪费或过度竞争。
Spark运行时并行度的设署
Spark运行时并行度的设置对于充分利用集群资源、提高处理效率至关重要。以下是关于Spark并行度设置的详细解释和参考信息:
1. 并行度的概念
- Spark的并行度指的是在集群中同时执行任务的数量,也可以理解为同时处理的分区数量。
- 并行度的高低直接影响到Spark作业的执行效率和资源利用率。
2. 并行度的设置方式
** 2.1 在代码中设置**
- 在创建RDD时,可以使用parallelize()方法并传入一个数字作为参数来指定并行度。例如:sc.parallelize(data, 4),表示将数据分成4个分区进行并行处理。
- 在调用transformation操作时,如repartition()或coalesce(),也可以指定新的并行度。例如:rdd.repartition(8),表示将RDD重新分为8个分区。
** 2.2 在配置文件中设置**
在spark-defaults.conf配置文件中,可以设置全局的默认并行度。通过spark.default.parallelism属性来指定。例如:spark.default.parallelism 100。
2.3 在提交程序的客户端参数中设置
在使用spark-submit提交作业时,可以通过--conf参数来设置spark.default.parallelism。例如:bin/spark-submit --conf "spark.default.parallelism=100" 。
3. 并行度设置的建议
- 根据集群资源调整:最好的情况是根据实际的集群资源情况来调整并行度,以获得最佳的性能。如果集群资源充足且任务复杂,可以增加并行度;如果资源有限或任务简单,可以减少并行度。
- 设置为CPU总核心的2~10倍:一个常见的建议是将并行度设置为集群中CPU总核心的2到10倍。例如,如果集群有100个CPU核心,那么建议的并行度范围是200到1000。
- 考虑任务特性:对于某些特殊的任务,如reduceByKey等会发生shuffle的操作,可以使用并行度最大的父RDD的并行度。
- 避免资源浪费:如果设置的并行度过高,可能会导致某些CPU核心空闲,造成资源浪费。因此,需要根据实际情况进行调整。
4. 注意事项
- 在设置并行度时,要确保是CPU核心的整数倍,这有助于更有效地利用集群资源。
- 并行度的设置需要根据具体的应用场景和集群环境进行调整,没有固定的最佳值。
- 在调整并行度时,建议通过监控任务运行情况和资源利用率来评估性能,并根据需要进行调整。
Spark SQL的数据倾斜
Spark SQL中的数据倾斜是一个常见的问题,它指的是在数据处理过程中,部分数据分布不均匀,导致某些任务的处理时间明显长于其他任务,从而影响整体性能。以下是对Spark SQL数据倾斜问题的详细解释和解决方案:
数据倾斜的原因
** 1) 数据问题:**
- Key本身分布不均匀,包括大量的key为空或key的设置不合理。
- 无效数据、大量重复的测试数据或是对结果影响不大的有效数据也可能导致数据倾斜。
** 2) Spark使用问题:**
- Shuffle时的并发度不够,如spark.sql.shuffle.partitions参数设置过小,导致数据倾斜。
- 计算方式有误,例如不恰当的join操作可能导致数据倾斜。
数据倾斜的解决方案
** 1) 随机打散:**
使用repartition或coalesce方法将数据随机打乱,使数据分布更加均匀。
** 2) 增加分区数:**
通过增加分区数,使得数据能够更加均匀地分布在不同的分区中。可以使用repartition方法增加分区数。
** 3) 聚合合并:**
如果数据倾斜的原因是某个key对应的数据量过大,可以将倾斜的key进行聚合合并,减少数据量。可以使用groupBy和aggregate等方法进行聚合操作。
** 4) 使用随机前缀:**
对于某些导致数据倾斜的key,可以在key值前面添加随机前缀,使得数据在处理过程中更加均匀分布。可以使用spark.sql.functions.rand函数生成随机前缀。
** 5) 数据重分布:**
将倾斜的数据拆分成多个小文件,然后重新分配到不同的分区中。可以使用repartition方法进行数据重分布。
** 6) 避免shuffle过程:**
如果Spark作业的数据来源于Hive表,可以先在Hive表中对数据进行聚合,之后只进行map操作,避免shuffle操作。
** 7) 提高shuffle并行度:**
设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200,可以根据实际情况进行调整。
** 8) 使用随机key实现双重聚合:**
在使用类似于groupByKey、reduceByKey这样的算子时,可以通过map算子给每个数据的key添加随机数前缀,进行第一次聚合,然后去除前缀进行第二次聚合。
** 9) 将reduce join转换为map join:**
如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,避免shuffle操作。
** 10) sample采样对倾斜key单独进行join:**
通过sample采样找出倾斜的key,然后对这些key单独进行join操作,最后与正常数据进行union。
总结
解决Spark SQL数据倾斜的根本方法是通过优化数据模型,尽量避免数据倾斜的发生。这包括使用合适的数据结构、优化数据分布等方法。在实际应用中,可以根据具体情况选择合适的方法来解决数据倾斜的问题。
Spark的exactly-once
在Apache Spark中,"Exactly-Once"语义指的是在数据处理过程中,每个数据记录恰好被处理一次,不多也不少,即使在出现故障和重新处理的情况下也是如此。这对于需要高一致性的应用场景(如金融交易、计费系统)至关重要。Spark提供了几种机制来支持Exactly-Once语义,尤其是在Structured Streaming和Spark Streaming中:
Structured Streaming
1、使用事务日志和幂等写入:Structured Streaming 支持与外部系统(如Kafka、HDFS等)集成时实现Exactly-Once。它通过 checkpoint 机制维护一个事务日志,跟踪哪些数据已经被处理。同时,与支持事务的外部存储(如支持事务的数据库、HDFS上的事务日志)配合,确保写入操作是幂等的,即多次执行同一操作对系统的影响和执行一次相同。
2、Watermark机制:Structured Streaming 引入了Watermark概念来处理事件时间(Event Time),并确保在处理延迟数据时不违反Exactly-Once原则。Watermarks帮助系统识别哪些数据是“迟到”的,从而在不影响结果正确性的情况下处理窗口聚合和其他时间相关的操作。
3、Kafka Exactly-Once支持:从Spark 2.3开始,Structured Streaming与Apache Kafka集成时可以实现端到端的Exactly-Once语义,这依赖于Kafka的事务功能和Spark的checkpoint机制。
Spark Streaming
在早期版本的Spark Streaming中,Exactly-Once语义较难保证,特别是当涉及状态更新和输出到外部系统时。但通过与外部系统的特定配置和一些额外的逻辑(比如使用Kafka的offset管理),可以尽量接近这一目标。
注意事项
- 依赖外部系统特性:实现Exactly-Once通常需要外部数据源和接收端都支持事务或幂等写入。
- 资源消耗:追求Exactly-Once语义可能会增加系统的复杂度和资源消耗,特别是在处理大量数据和高吞吐量的场景下。
- 性能考量:在某些情况下,At-Least-Once(至少一次处理)或At-Most-Once(最多一次处理)的语义可能因其实现简单且性能开销小而被优先考虑。
总之,Spark通过不断演进的Structured Streaming框架和与外部系统的深度集成,提供了强大的支持来实现端到端的Exactly-Once语义,满足了现代大数据处理中对数据准确性和一致性的严格要求。
Spark的RDD和partition的联系
1、数据切分与并行处理
RDD中的数据按照一定的逻辑(如HDFS的block大小、数据库查询的range等)被切分成多个partition。
每个partition对应一个task,可以在集群中的一个或多个节点上并行执行。
因此,RDD的partition数量决定了Spark作业的并行度,影响着处理数据的效率和资源利用率。
2、Partition的数量与设置
Partition的数量可以在创建RDD时通过参数指定,如sc.textFile("path", numPartitions)中的numPartitions参数。
如果不指定partition数量,Spark会根据数据的特性和集群的配置自动计算partition数量。例如,在读取HDFS文件时,Spark会尽量让partition的数量与文件的block数量相同,以便更好地利用HDFS的存储特性。
3、Partition与数据局部性
Spark在调度任务时会考虑数据局部性,即尽量让处理某个partition的任务在存储该partition数据的节点上执行,以减少数据传输的开销。
这通过Partition的getPreferredLocations方法实现,该方法返回一个节点列表,表示处理该partition的优选节点。
4、Partition与故障恢复
由于RDD是不可变的,一旦创建就不能被修改。因此,当某个partition因为节点故障而丢失时,Spark可以根据RDD的依赖关系(Dependency)重新计算该partition,从而实现故障恢复。
总结
Spark的RDD和partition是紧密相关的概念。RDD代表了分布式的数据集合,而partition则是RDD中数据的切分单元,用于实现数据的并行处理。Partition的数量和设置影响着Spark作业的并行度和处理效率,而Partition的数据局部性和故障恢复能力则保证了Spark作业的稳定性和可靠性。在编写Spark作业时,需要根据数据的特性和集群的配置来合理设置partition的数量和分布,以获得最佳的性能和效果。
Spark 3.0特性
Apache Spark 3.0是在2020年发布的一个重要版本,它引入了许多新特性、性能改进和API增强,旨在提升用户体验、优化执行效率以及增强机器学习和SQL功能。以下是Spark 3.0的一些关键特性:
1、Adaptive Query Execution (AQE):这是Spark SQL中最显著的改进之一,AQE能够在查询执行期间动态调整执行计划,包括合并小任务、重新分区数据和选择更适合的执行策略,从而无需用户手动调优就能显著提升查询性能。
2、Sub-Query Pruning:优化了SQL查询解析器,能够智能地剪枝不必要的子查询,减少不必要的计算,提升查询效率。
3、Dynamic Partition Pruning for Joins:在JOIN操作中引入了动态分区剪枝,进一步减少了不必要的数据扫描,提高了JOIN操作的效率。
4、Enhanced Pandas UDFs (Vectorized UDFs):在PySpark中增强了Pandas UDFs,引入了新的矢量化UDF,能够更高效地处理数据,特别是在处理大规模数据集时,通过减少Python和JVM之间的数据转换,显著提升了性能。
5、Better Support for Continuous Processing in Structured Streaming:改进了Structured Streaming的连续处理模式,增强了对事件时间(event time)和 watermark的支持,提高了处理延迟数据的能力,更易于实现端到端的Exactly-Once语义。
6、SQL ANSI Compliance Improvements:增加了对更多SQL标准的支持,包括INTERSECT, EXCEPT, MERGE INTO等操作,提高了与传统数据库系统的兼容性。
7、Native Kubernetes Support:Spark 3.0原生支持Kubernetes作为资源管理器,简化了在Kubernetes集群上部署和管理Spark应用的过程,提供了更好的容器化支持。
8、MLlib Performance and API Enhancements:对MLlib进行了多项性能优化,并引入了一些新的机器学习算法和模型评估工具,以及更易用的API。
9、Scala 2.12 Support:Spark 3.0开始支持Scala 2.12,同时保持对Scala 2.11的兼容,允许用户利用Scala最新版本的功能。
10、Improved Memory Management:优化了内存管理,减少了内存溢出的风险,特别是在处理大内存需求任务时。
这些改进和新特性使得Spark 3.0成为一个更强大、更灵活、更易用的大数据处理平台,满足了从数据工程到高级分析的各种需求。
Spark计算的灵活性体现在哪里
1、多种数据源支持:
Spark支持从多种数据源读取和写入数据,如HDFS、HBase、Cassandra、Kafka、JDBC、Parquet、ORC等。这种广泛的数据源支持使得Spark能够轻松集成到现有的大数据生态系统中。
2、丰富的API:
Spark提供了多种编程API,包括Scala、Java、Python和R。这些API为开发者提供了丰富的功能和灵活的编程方式,使得开发者能够使用自己熟悉的语言进行数据处理和分析。
3、多种计算模式:
Spark支持批处理(Spark Core)、流处理(Spark Streaming)、交互式查询(Spark SQL)、图计算(GraphX)和机器学习(MLlib)等多种计算模式。这种多模式支持使得Spark能够处理各种类型的数据和满足不同的业务需求。
4、高度可定制性:
Spark允许开发者通过自定义RDD(弹性分布式数据集)操作、自定义转换(transformations)和动作(actions)来定义自己的数据处理逻辑。这种灵活性使得Spark能够处理复杂的数据处理任务。
5、优化器:
Spark拥有一个强大的Catalyst优化器,它能够在查询执行之前对查询计划进行优化,以提高查询性能。Catalyst优化器采用基于规则的优化和基于成本的优化技术,能够自动选择最优的执行计划。
6、动态资源调度:
Spark使用YARN、Mesos或Kubernetes等集群管理器进行资源调度,可以根据作业的需求动态分配和释放资源。这种动态资源调度机制使得Spark能够充分利用集群资源,提高资源利用率。
7、容错性:
Spark具有强大的容错性,能够在节点故障时自动恢复作业的执行。Spark使用RDD的不可变性和lineage(血统)信息来重新计算丢失的分区,确保数据的完整性和一致性。
8、集成性和扩展性:
Spark能够与Hadoop生态系统中的其他组件(如HDFS、YARN等)无缝集成,使得开发者能够充分利用现有的Hadoop基础设施。此外,Spark还提供了丰富的扩展点,使得开发者能够根据自己的需求定制Spark的功能。
9、交互式查询:
Spark SQL提供了类SQL的查询语言,使得开发者能够使用SQL语句进行交互式查询。这种交互式查询方式使得数据分析师和数据科学家能够更快地获取数据洞察和发现数据中的价值。
引用:https://www.nowcoder.com/discuss/353159520220291072
通义千问、文心一言
版权归原作者 小的~~ 所有, 如有侵权,请联系我们删除。