脚本任务实现:Shell和Python

v1.7
xueli.xue 8 years ago
parent 7a129468a4
commit 935cc1276f

@ -193,17 +193,15 @@ public class DemoGlueJobHandler extends IJobHandler {
<textarea class="glueSource_shell" style="display:none;" > <textarea class="glueSource_shell" style="display:none;" >
#!/bin/bash #!/bin/bash
echo hello shell echo "xxl-job: hello shell"
for x in 1 2 3 4 for item in 1 2 3
do do
echo number=$x echo "shell : $item"
sleep 1s sleep 1s
done done
echo1 111 echo "Good bye!"
printf 666
echo2 222
</textarea> </textarea>
<textarea class="glueSource_python" style="display:none;" > <textarea class="glueSource_python" style="display:none;" >
#!/usr/bin/python #!/usr/bin/python
@ -214,11 +212,11 @@ import time
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logging.info('hello python') logging.info('xxl-job: hello python')
for num in range(0, 3): for num in range(1, 3):
time.sleep(1) time.sleep(1)
logging.info(' :' + str(num) ) logging.info('python :' + str(num) )
logging.info('Good bye!') logging.info('Good bye!')
</textarea> </textarea>

@ -9,15 +9,12 @@ import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.glue.GlueTypeEnum; import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.impl.GlueJobHandler; import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.handler.impl.ScriptJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.util.ScriptUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Date; import java.util.Date;
/** /**
@ -85,11 +82,11 @@ public class ExecutorBizImpl implements ExecutorBiz {
} else if (GlueTypeEnum.GLUE_GROOVY==GlueTypeEnum.match(triggerParam.getGlueType())) { } else if (GlueTypeEnum.GLUE_GROOVY==GlueTypeEnum.match(triggerParam.getGlueType())) {
// valid exists job threadchange handler or glue timeout, need kill old thread // valid exists job threadchange handler or gluesource updated, need kill old thread
if (jobThread != null && if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler !(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change glue model or glue timeout, kill old job thread // change glue model or gluesource updated, kill old job thread
jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
jobThread.interrupt(); jobThread.interrupt();
XxlJobExecutor.removeJobThread(triggerParam.getJobId()); XxlJobExecutor.removeJobThread(triggerParam.getJobId());
@ -107,61 +104,25 @@ public class ExecutorBizImpl implements ExecutorBiz {
} }
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(jobHandler, triggerParam.getGlueUpdatetime())); jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(jobHandler, triggerParam.getGlueUpdatetime()));
} }
} else if (GlueTypeEnum.GLUE_SHELL==GlueTypeEnum.match(triggerParam.getGlueType())) { } else if (GlueTypeEnum.GLUE_SHELL==GlueTypeEnum.match(triggerParam.getGlueType())
|| GlueTypeEnum.GLUE_PYTHON==GlueTypeEnum.match(triggerParam.getGlueType()) ) {
// make path
String scriptPath = XxlJobFileAppender.filePath + "gluesource/";
String scriptFileName = triggerParam.getJobId() + "_" + triggerParam.getGlueUpdatetime() + ".sh";
// valid file
File scriptFile = new File(scriptPath, scriptFileName);
if (!scriptFile.exists()) {
// valid glue source
if (triggerParam.getGlueSource()==null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueSource is null.");
}
// .../gluesource/
File scriptPathDir = new File(scriptPath);
if (!scriptPathDir.exists()) {
scriptPathDir.mkdirs();
}
// .../gluesource/666-156465656.sh
scriptFile = new File(scriptPath, scriptFileName);
FileOutputStream fos = null;
try {
scriptFile.createNewFile();
fos = new FileOutputStream(scriptFile, true);
fos.write(triggerParam.getGlueSource().getBytes("utf-8"));
fos.flush();
} catch (IOException e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
// valid exists job threadchange script or gluesource updated, need kill old thread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change glue model or gluesource updated, kill old job thread
jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
jobThread.interrupt();
XxlJobExecutor.removeJobThread(triggerParam.getJobId());
jobThread = null;
} }
// log File // make thread: new or exists invalid
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); if (jobThread == null) {
ScriptJobHandler scriptJobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
// run script jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), scriptJobHandler);
ScriptUtil.execToFile("python", scriptFile.getName(), (XxlJobFileAppender.filePath + logFileName) ); }
return ReturnT.FAIL;
} else if (GlueTypeEnum.GLUE_PYTHON==GlueTypeEnum.match(triggerParam.getGlueType())) {
String scriptFilePath = XxlJobFileAppender.filePath + "gluesource/" + triggerParam.getJobId() + "_" + triggerParam.getGlueUpdatetime() + ".py";
} else { } else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
} }

@ -0,0 +1,55 @@
package com.xxl.job.core.handler.impl;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.ScriptUtil;
/**
* Created by xuxueli on 17/4/27.
*/
public class ScriptJobHandler extends IJobHandler {
private int jobId;
private long glueUpdatetime;
private String gluesource;
private GlueTypeEnum glueType;
public ScriptJobHandler(int jobId, long glueUpdatetime, String gluesource, GlueTypeEnum glueType){
this.jobId = jobId;
this.glueUpdatetime = glueUpdatetime;
this.gluesource = gluesource;
this.glueType = glueType;
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}
@Override
public ReturnT<String> execute(String... params) throws Exception {
// cmd + script-file-name
String cmd = "bash";
String scriptFileName = null;
if (GlueTypeEnum.GLUE_SHELL == glueType) {
cmd = "bash";
scriptFileName = XxlJobFileAppender.filePath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".sh");
} else if (GlueTypeEnum.GLUE_PYTHON == glueType) {
cmd = "python";
scriptFileName = XxlJobFileAppender.filePath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".py");
}
// make script file
ScriptUtil.markScriptFile(scriptFileName, gluesource);
// log file
String logFileName = XxlJobFileAppender.filePath.concat(XxlJobFileAppender.contextHolder.get());
// invoke
ScriptUtil.execToFile(cmd, scriptFileName, logFileName);
return ReturnT.SUCCESS;
}
}

@ -1,49 +1,57 @@
package com.xxl.job.core.util; package com.xxl.job.core.util;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.PumpStreamHandler;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException;
/** /**
* 1"PythonInterpreter"使java"Runtime.getRuntime().exec()"(shellpython) * 1"PythonInterpreter"使java"Runtime.getRuntime().exec()"(shellpython)
* 2javaPATH * 2javaPATH
* 3 * 3
* * 4python Logloggingprinf
*
* 1[>>logfile 2>&1]21logfile
* 2python Logloggingprinf
* *
* Created by xuxueli on 17/2/25. * Created by xuxueli on 17/2/25.
*/ */
public class ScriptUtil { public class ScriptUtil {
private static String pyCmd = "python"; /**
private static String shllCmd = "bash"; * make script file
private static String pyFile = "/Users/xuxueli/workspaces/idea-git-workspace/github/xxl-incubator/xxl-util/src/main/resources/script/pytest.py"; *
private static String shellFile = "/Users/xuxueli/workspaces/idea-git-workspace/github/xxl-incubator/xxl-util/src/main/resources/script/shelltest.sh"; * @param scriptFileName
private static String pyLogFile = "/Users/xuxueli/Downloads/tmp/pylog.log"; * @param content
private static String shLogFile = "/Users/xuxueli/Downloads/tmp/shlog.log"; * @throws IOException
*/
public static void main(String[] args) { public static void markScriptFile(String scriptFileName, String content) throws IOException {
// filePath/
String command = pyCmd; File filePathDir = new File(XxlJobFileAppender.filePath);
String filename = pyFile; if (!filePathDir.exists()) {
String logFile = pyLogFile; filePathDir.mkdirs();
if (false) {
command = shllCmd;
filename = shellFile;
logFile = shLogFile;
} }
execToFile(command, filename, logFile); // filePath/gluesource/
File filePathSourceDir = new File(filePathDir, "gluesource");
} if (!filePathSourceDir.exists()) {
filePathSourceDir.mkdirs();
}
public static File markScriptFile(){ // make file, filePath/gluesource/666-123456789.py
return null; FileOutputStream fileOutputStream = null;
try {
fileOutputStream = new FileOutputStream(scriptFileName);
fileOutputStream.write(content.getBytes("UTF-8"));
fileOutputStream.close();
} catch (Exception e) {
throw e;
}finally{
if(fileOutputStream != null){
fileOutputStream.close();
}
}
} }
/** /**
@ -58,29 +66,22 @@ public class ScriptUtil {
* @param scriptFile * @param scriptFile
* @param logFile * @param logFile
*/ */
public static void execToFile(String command, String scriptFile, String logFile){ public static void execToFile(String command, String scriptFile, String logFile) throws IOException {
try { // 标准输出print null if watchdog timeout
// 标准输出print null if watchdog timeout // 错误输出logging + 异常 still exists if watchdog timeout
// 错误输出logging + 异常 still exists if watchdog timeout // 标准输入
// 标准输出 FileOutputStream fileOutputStream = new FileOutputStream(logFile, true);
FileOutputStream fileOutputStream = new FileOutputStream(logFile); PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null);
PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null);
// command // command
CommandLine commandline = new CommandLine(command); CommandLine commandline = new CommandLine(command);
commandline.addArgument(scriptFile); commandline.addArgument(scriptFile);
// exec // exec
DefaultExecutor exec = new DefaultExecutor(); DefaultExecutor exec = new DefaultExecutor();
exec.setExitValues(null); exec.setExitValues(null);
exec.setStreamHandler(streamHandler); exec.setStreamHandler(streamHandler);
int exitValue = exec.execute(commandline); int exitValue = exec.execute(commandline);
} catch (Exception e) {
e.printStackTrace();
}
/*Process process = Runtime.getRuntime().exec(cmdarray);
IOUtils.copy(process.getInputStream(), out);
IOUtils.copy(process.getErrorStream(), out);*/
} }
} }

Loading…
Cancel
Save