diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/Dockerfile b/dataflow-website/recipes/polyglot/polyglot-python-task/Dockerfile new file mode 100644 index 0000000..781df0c --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.7.3-slim +RUN apt-get update +RUN apt-get install build-essential -y +RUN apt-get install default-libmysqlclient-dev -y +RUN pip install mysqlclient +RUN pip install sqlalchemy +ADD python_task.py / +ADD util/* /util/ +ENTRYPOINT ["python","/python_task.py"] +CMD [] diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/README.md b/dataflow-website/recipes/polyglot/polyglot-python-task/README.md new file mode 100644 index 0000000..8947c3d --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/README.md @@ -0,0 +1,21 @@ + +Build docker image and push to Docker Hub. +```bash +docker build -t tzolov/python_task_with_status:0.1 . +docker push tzolov/python_task_with_status:0.1 +``` +NOTE: replace `tzolov` with your docker hub prefix. + +Register the docker image as SCDF `task` application: +```bash +dataflow:>app register --type task --name python-task-with-status --uri docker://tzolov/python_task_with_status:0.1 +``` + +Create task instance and launch it: +```bash +dataflow:>app register --type task --name python-task-with-status --uri docker://tzolov/python_task_with_status:0.1 +dataflow:>task create --name python-task --definition "python-task-with-status" +dataflow:>task launch --name python-task +``` + +TIP: if `--error.message=` is passed as a launch argument, the task will throw an error with the specified text. \ No newline at end of file diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/python_task.py b/dataflow-website/recipes/polyglot/polyglot-python-task/python_task.py new file mode 100644 index 0000000..c2c0967 --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/python_task.py @@ -0,0 +1,35 @@ +import sys +import time + +from util.task_status import TaskStatus +from util.task_args import get_task_id, get_db_url, get_task_name, get_cmd_arg + +try: + print('sys arguments {}'.format(sys.argv)) + + status = TaskStatus(get_task_id(), get_db_url()) + + # Set task status to RUNNING + status.running() + + # Do something + print('Start task:{}, id:{}, sqlalchemy-url:{}'.format(get_task_name(), get_task_id(), get_db_url())) + print('Wait for 60 seconds ...') + sys.stdout.flush() + time.sleep(60) + + # if you add --error.message=Bla to the launch properties, an exception is thrown + if get_cmd_arg('error.message') is not None: + raise Exception(get_cmd_arg('error.message')) + + print("Goodbye!") + + # Mark task completion status change + status.completed() + +except Exception as exp: + error_message = 'Task failed: {}'.format(exp) + status.failed(1, error_message, error_message) + print(error_message) + + diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/requirements.txt b/dataflow-website/recipes/polyglot/polyglot-python-task/requirements.txt new file mode 100644 index 0000000..87febb5 --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/requirements.txt @@ -0,0 +1,2 @@ +sqlalchemy +mysqlclient \ No newline at end of file diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/util/__init__.py b/dataflow-website/recipes/polyglot/polyglot-python-task/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py b/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py new file mode 100644 index 0000000..2b4fd73 --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py @@ -0,0 +1,53 @@ +import sys +from collections import defaultdict + + +def get_cmd_arg(name): + """Extracts argument value by name. (@author: Chris Schaefer) + + Assumes the exec (default) spring-cloud-deployer-k8s argument passing mode. + + Args: + name: argument name. + Returns: + value of the requested argument. + """ + d = defaultdict(list) + for k, v in ((k.lstrip('-'), v) for k, v in (a.split('=') for a in sys.argv[1:])): + d[k].append(v) + + return d[name][0] + + +def get_db_url(): + """Computes sqlalchemy connection URL + + Uses the s.d.username, s.d.password and s.d.url properties to compute the sqlalchemy url. + This provides access to the SCDF internal DB and tables such as TASK_EXECUTION. + + Returns: + sqlalchemy compatible URL compatible with the target DB. + """ + username = get_cmd_arg('spring.datasource.username') + password = get_cmd_arg('spring.datasource.password') + jdbc_url = get_cmd_arg('spring.datasource.url') + + return str(jdbc_url) \ + .replace('jdbc:', '') \ + .replace('sqlserver:', 'mssql+pyodbc:') \ + .replace('//', '//{username}:{password}@'.format(username=username, password=password)) + + +def get_task_id(): + """Task ID as handled inside SCDF. + + When launching tasks SCDF provides the spring.cloud.task.executionid as command line argument. + + Returns: + The task id as handled inside SCDF. + """ + return get_cmd_arg('spring.cloud.task.executionid') + + +def get_task_name(): + return get_cmd_arg('spring.cloud.task.name') diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_status.py b/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_status.py new file mode 100644 index 0000000..a1917cd --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_status.py @@ -0,0 +1,40 @@ +from sqlalchemy import create_engine +from sqlalchemy.sql import text +import datetime + + +class TaskStatus: + """Helper class to help manage Task's status in the SCDF DB. """ + + def __init__(self, task_id, jdbc_url): + self.task_id = task_id + self.engine = create_engine(jdbc_url) + self.connection = self.engine.connect() + + def running(self): + """Set the TASK_EXECUTION's START_TIME """ + now = datetime.datetime.now() + start_task_statement = text( + "UPDATE TASK_EXECUTION SET START_TIME=:start_time, EXIT_CODE=null, LAST_UPDATED=:last_updated " + "WHERE TASK_EXECUTION_ID=:task_id") + self.connection.execute(start_task_statement, start_time=now, last_updated=now, task_id=self.task_id) + + def completed(self): + """Set the TASK_EXECUTION's END_TIME, EXIST_CODE=0 and EXIST_MESSAGE/ERROR_MESSAGE must be null """ + now = datetime.datetime.now() + complete_task_statement = text( + "UPDATE TASK_EXECUTION SET END_TIME=:end_time, EXIT_CODE=0, EXIT_MESSAGE=null, ERROR_MESSAGE=null, " + "LAST_UPDATED=:last_updated WHERE TASK_EXECUTION_ID=:task_id") + self.connection.execute(complete_task_statement, end_time=now, last_updated=now, task_id=self.task_id) + + def failed(self, exit_code, exit_message, error_message=''): + """Set the TASK_EXECUTION's END_TIME, EXIST_CODE is the error code and EXIST_MESSAGE/ERROR_MESSAGE describe + the error """ + now = datetime.datetime.now() + complete_task_statement = text( + "UPDATE TASK_EXECUTION SET END_TIME=:end_time, EXIT_CODE=:exit_code, EXIT_MESSAGE=:exit_message, " + " ERROR_MESSAGE=:error_message, LAST_UPDATED=:last_updated " + "WHERE TASK_EXECUTION_ID=:task_id") + self.connection.execute(complete_task_statement, end_time=now, exit_code=exit_code, + exit_message=exit_message, error_message=error_message, last_updated=now, + task_id=self.task_id)