0


2023年JAVA集成调用Kettle示例

最近要弄一个java调用kettle的代码,查找网上的例子有很多的jar包下不下来,弄下来了各种报错,花了一点时间趟平了坑。临近新年,最后祝各位新年快乐!

成功示例

确认版本

依赖的jar包以pom的形式引入,有诸多版本,如果与kettle的版本不匹配则会调用失败。因为在java代码里会初始化插件来执行任务,查看插件版本9.0.0.0-423.

引入依赖

仓库镜像地址:
<!-- Maven repository that hosts the Pentaho/Kettle artifacts used below.
     Place this element inside the <repositories> section of the pom.xml. -->
<repository>
    <id>pentaho-public</id>
    <name>Pentaho Public</name>
    <url>https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/</url>
    <!-- Release artifacts are resolved from this repository; remote updates are checked once a day. -->
    <releases>
        <enabled>true</enabled>
        <updatePolicy>daily</updatePolicy>
    </releases>
    <!-- Snapshot artifacts are also resolved; remote updates are checked every 15 minutes. -->
    <snapshots>
        <enabled>true</enabled>
        <updatePolicy>interval:15</updatePolicy>
    </snapshots>
</repository>
<!-- Kettle libraries required to run transformations and jobs from Java.
     Place these elements inside the <dependencies> section of the pom.xml.
     All four are pinned to the same version (9.0.0.0-423); keep them aligned
     with the version of the installed Kettle plugins. -->
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>9.0.0.0-423</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-dbdialog</artifactId>
    <version>9.0.0.0-423</version>
</dependency>

<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>9.0.0.0-423</version>
</dependency>

<!-- Note: this artifact uses the "pentaho" groupId, not "pentaho-kettle". -->
<dependency>
    <groupId>pentaho</groupId>
    <artifactId>metastore</artifactId>
    <version>9.0.0.0-423</version>
</dependency>

以前的 Pentaho 镜像地址 https://nexus.pentaho.org/content/groups/omni/ 已经打不开了,估计以后也不会恢复,所以改用新地址 https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/

java源代码

可以执行转换和任务。特别注意:pluginsPath 需要修改为本机 Pentaho 的插件目录,例如 x:\xx\data-integration\plugins

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginFolder;
import org.pentaho.di.core.plugins.PluginFolderInterface;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

import java.io.File;
import java.util.List;

/**
 * Helper for running Kettle (Pentaho Data Integration) transformations and jobs
 * that are stored in a database-backed Kettle repository.
 *
 * <p>The connection settings and the plugin directory below are sample values
 * and must be adapted to the local installation before use.
 */
public class KettleTool {

    /** Login name used when connecting to the Kettle repository (placeholder). */
    private static final String USERNAME = "xxx";
    /** Password used when connecting to the Kettle repository (placeholder). */
    private static final String PASSWORD = "xxx";
    /** Directory whose entries are registered as Kettle step-plugin folders. */
    private static final String PLUGINS_PATH = "D:\\workspace\\data-integration\\data-integration\\plugins";

    /*
     * Registers every entry of PLUGINS_PATH as a step-plugin folder. This runs
     * at class-load time, i.e. before KettleEnvironment.init() is called.
     */
    static {
        File[] entries = new File(PLUGINS_PATH).listFiles();
        if (entries == null) {
            // listFiles() returns null when the path does not denote a readable
            // directory; report it instead of failing class initialization with an NPE.
            System.err.println("Kettle plugin directory not found or not readable: " + PLUGINS_PATH);
        } else {
            List<PluginFolderInterface> pluginFolders = StepPluginType.getInstance().getPluginFolders();
            for (File entry : entries) {
                // getPath() yields "<PLUGINS_PATH><separator><name>" with the platform's
                // own separator, so no hard-coded "\\" is needed.
                pluginFolders.add(new PluginFolder(entry.getPath(), false, true));
            }
        }
    }

    /**
     * Builds the metadata of the database connection that backs the repository.
     *
     * <p>Constructor arguments, in order: connection name (as displayed in the
     * Kettle tool), database type, access method, host, database name, port,
     * user name, password.
     *
     * @return the connection metadata for the repository database
     */
    private static DatabaseMeta getDatabaseMeta() {
        DatabaseMeta databaseMeta = new DatabaseMeta("123", "MYSQL", "Native(JDBC)", "127.0.0.1",
                "kettle_repo", "3306", "root", "123456");
        // Turn off SSL to silence the MySQL "SSL connection recommended" notice.
        databaseMeta.addExtraOption("MYSQL", "useSSL", "false");
        return databaseMeta;
    }

    /**
     * Sample entry point: runs one transformation from the repository.
     *
     * @param args unused
     * @throws Exception if the transformation cannot be loaded or executed
     */
    public static void main(String[] args) throws Exception {
        executeTrans("/ETL_TEST", "ETL_TABLE_SINGLE_TEST");
    }

    /**
     * Loads a transformation from the repository, runs it and waits for it to finish.
     *
     * @param folder  repository directory path that contains the transformation
     * @param transId name of the transformation inside {@code folder}
     * @throws KettleException if the repository is unreachable, the directory or
     *                         transformation cannot be found, or the run ends with errors
     */
    public static void executeTrans(String folder, String transId) throws KettleException {
        KettleDatabaseRepository repository = connectRepository();
        try {
            RepositoryDirectoryInterface directory = findDirectory(repository, folder);
            // Resolve the transformation name to its repository id, then load it.
            ObjectId id = repository.getTransformationID(transId, directory);
            if (id == null) {
                throw new KettleException("Transformation '" + transId + "' not found in '" + folder + "'");
            }
            TransMeta transMeta = repository.loadTransformation(id, null);
            Trans trans = new Trans(transMeta);
            // Same argument array as before: two slots, both left null.
            trans.execute(new String[2]);
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                throw new KettleException("Transformation '" + transId + "' finished with "
                        + trans.getErrors() + " error(s)");
            }
        } finally {
            // Always release the repository connection, even when the run fails.
            repository.disconnect();
        }
    }

    /**
     * Loads a job from the repository, runs it and waits for it to finish.
     *
     * @param folder repository directory path that contains the job
     * @param jobId  name of the job inside {@code folder}
     * @throws KettleException if the repository is unreachable, the directory or
     *                         job cannot be found, or the run ends with errors
     */
    public static void executeTask(String folder, String jobId) throws KettleException {
        KettleDatabaseRepository repository = connectRepository();
        try {
            RepositoryDirectoryInterface directory = findDirectory(repository, folder);
            // Resolve the job name to its repository id.
            ObjectId id = repository.getJobId(jobId, directory);
            if (id == null) {
                throw new KettleException("Job '" + jobId + "' not found in '" + folder + "'");
            }
            JobMeta jobMeta = repository.loadJob(id, null);
            Job job = new Job(repository, jobMeta);
            // run() is invoked directly (not start()), as in the original sample.
            job.run();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new KettleException("Job '" + jobId + "' finished with "
                        + job.getErrors() + " error(s)");
            }
        } finally {
            // Always release the repository connection, even when the run fails.
            repository.disconnect();
        }
    }

    /**
     * Looks up a directory in the repository's directory tree.
     *
     * @param repository connected repository
     * @param folder     directory path to look up
     * @return the matching directory, never {@code null}
     * @throws KettleException if the tree cannot be loaded or the directory does not exist
     */
    private static RepositoryDirectoryInterface findDirectory(KettleDatabaseRepository repository, String folder)
            throws KettleException {
        RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree().findDirectory(folder);
        if (directory == null) {
            throw new KettleException("Repository directory not found: " + folder);
        }
        return directory;
    }

    /**
     * Initializes the Kettle environment (once) and opens a connection to the
     * database repository.
     *
     * @return a connected repository, never {@code null}; the caller must disconnect it
     * @throws KettleException if environment initialization or the connection fails
     */
    private static KettleDatabaseRepository connectRepository() throws KettleException {
        // Initialize the Kettle environment on first use; a failure here is
        // propagated because nothing below can work without it.
        if (!KettleEnvironment.isInitialized()) {
            KettleEnvironment.init();
        }
        // Metadata describing a database-backed repository.
        KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta();
        repositoryMeta.setConnection(getDatabaseMeta());
        // Create the repository object and initialize it from the metadata.
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        repository.init(repositoryMeta);
        // Log in with the configured repository user name and password.
        repository.connect(USERNAME, PASSWORD);
        if (!repository.isConnected()) {
            // Fail loudly instead of returning null, which callers would dereference.
            throw new KettleException("Failed to connect to the Kettle repository");
        }
        System.out.println("连接成功");
        return repository;
    }

}

本文转载自: https://blog.csdn.net/weixin_38820750/article/details/128625518
版权归原作者 汤圆不吃鱼 所有, 如有侵权,请联系我们删除。

“2023年JAVA集成调用Kettle示例”的评论:

还没有评论