最近要弄一个java调用kettle的代码,查找网上的例子有很多的jar包下不下来,弄下来了各种报错,花了一点时间趟平了坑。临近新年,最后祝各位新年快乐!
成功示例
确认版本
依赖的jar包以pom的形式引入,有诸多版本,如果与kettle的版本不匹配则会调用失败。因为在java代码里会初始化插件来执行任务,查看插件版本9.0.0.0-423.
引入依赖
仓库镜像地址:
<repository>
<id>pentaho-public</id>
<name>Pentaho Public</name>
<url>https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>daily</updatePolicy>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>interval:15</updatePolicy>
</snapshots>
</repository>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>9.0.0.0-423</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>
<version>9.0.0.0-423</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>9.0.0.0-423</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>metastore</artifactId>
<version>9.0.0.0-423</version>
</dependency>
以前的pentao镜像地址已经打不开了,https://nexus.pentaho.org/content/groups/omni/ ,估计以后也不会恢复了,所以使用新地址https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/
java源代码
可以执行转换和任务,特别注意到pluginsPath需要修改到pentao的插件目录 x:\xx\data-integration\plugins
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginFolder;
import org.pentaho.di.core.plugins.PluginFolderInterface;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.io.File;
import java.util.List;
/**
* kettle服务
* @author
*/
public class KettleTool {
private static String username = "xxx";
private static String password = "xxx";
private static String pluginsPath = "D:\\workspace\\data-integration\\data-integration\\plugins";
/**
* 加载kettle插件
*/
static {
File dir = new File(pluginsPath);
File[] files = dir.listFiles();
List<PluginFolderInterface> pluginFolders = StepPluginType.getInstance().getPluginFolders();
for (File file: files ) {
pluginFolders.add(new PluginFolder(pluginsPath + "\\" + file.getName() , false , true));
}
}
/**
* 数据库连接元对象
* (kettle数据库连接名称(KETTLE工具右上角显示),资源库类型,连接方式,IP,数据库名,端口,用户名,密码) //cgmRepositoryConn
*/
private static DatabaseMeta getDatabaseMeta(){
DatabaseMeta databaseMeta = new DatabaseMeta("123", "MYSQL", "Native(JDBC)", "127.0.0.1",
"kettle_repo", "3306", "root", "123456");
// 关闭mysql推荐SSL连接提示
databaseMeta.addExtraOption("MYSQL" , "useSSL" , "false");
return databaseMeta;
}
public static void main(String[] args) throws Exception {
executeTrans("/ETL_TEST" , "ETL_TABLE_SINGLE_TEST");
}
/**
* 执行转换
* @param folder
* @param transId
* @throws KettleException
*/
public static void executeTrans(String folder, String transId) throws KettleException{
KettleDatabaseRepository repository = RepositoryCon();
// 获取目录
RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree().findDirectory(folder);
// 根据作业名称获取作业id,加载作业
ObjectId id = repository.getTransformationID(transId , directory);
TransMeta transMeta = repository.loadTransformation(id, null);
Trans trans = new Trans(transMeta);
String[] arguments = new String[2];
trans.execute(arguments);
trans.waitUntilFinished();
}
/**
* 执行任务
*
* @param folder
* @param jobId
* @throws KettleException
*/
public static void executeTask(String folder, String jobId) throws KettleException {
KettleDatabaseRepository repository = RepositoryCon();
// 获取目录
RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree().findDirectory(folder);
// 根据作业名称获取作业id
ObjectId id = repository.getJobId(jobId, directory);
// 加载作业
JobMeta jobMeta = repository.loadJob(id, null);
Job job = new Job(repository, jobMeta);
// 执行作业
job.run();
// 等待作业执行完毕
job.waitUntilFinished();
}
/**
* * 连接到资源库
*/
private static KettleDatabaseRepository RepositoryCon() throws KettleException {
// 初始化环境
if (!KettleEnvironment.isInitialized()) {
try {
KettleEnvironment.init();
} catch (KettleException e) {
e.printStackTrace();
}
}
// 数据库形式的资源库元对象
KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
kettleDatabaseRepositoryMeta.setConnection(getDatabaseMeta());
// 数据库形式的资源库对象
KettleDatabaseRepository kettleDatabaseRepository = new KettleDatabaseRepository();
// 用资源库元对象初始化资源库对象
kettleDatabaseRepository.init(kettleDatabaseRepositoryMeta);
// 连接到资源库 , 默认的连接资源库的用户名和密码
kettleDatabaseRepository.connect(username, password);
if (kettleDatabaseRepository.isConnected()) {
System.out.println("连接成功");
return kettleDatabaseRepository;
} else {
System.out.println("连接失败");
return null;
}
}
}
版权归原作者 汤圆不吃鱼 所有, 如有侵权,请联系我们删除。