In this tutorial, you will take an existing piece of software (in this case, it's a very simply Python script which doesn't do anything else but to wait for a while) and wrap a simple asynchronous service around it, so that the software can be used on the CAxMan infrastructure stack. In a more realistic scenario, the software would be anything which takes a longer time to complete and where intermediate status updates should be displayed to the user during service execution.
This tutorial starts from the code example async_waiter_tutorial. To begin with, copy this folder to a new location or alternatively create a new, local git branch to work on.
Now, change into the folder containing your copy of the code example and open a terminal there.
As for the calculator-service tutorial, the first thing to do is to adapt the existing code so that it can run on your deployment setup. To be able to listen to the correct http requests, the webservice needs to know its relative deployment path or context root.
Example: If you aim to run the service on a VM which is reachable via
<somehost>/mycompany/myvm
(<somehost>
can, for example, be
caxman.clesgo.net
), the context root needs to be set to /mycompany/myvm
.
The code example we're working with is a Docker container which is configured
via environment variables defined in the env
file in the source folder. Edit
this file and change the context-root definition to the string appropriate to
your deployment path.
The example directory already contains a full service skeleton and a Docker
container to wrap the service in. For now, have a look at
app/wait_a_while.py
, which is our placeholder for a long-running complex
calculation. The script expects three input parameters when executed: the
number of seconds to wait, the path to a statusfile, and the path to a results
file. In that respect, this script is not different from many "real" software
packages: It expects user input (how many seconds to wait), it will create log
files (the status file) during execution and results once done (the results
file).
The main part of the pre-implemented webservice skeleton is app/Waiter.py
.
Open that file in a text editor and have a quick look around. You will see an
almost empty definition of a WaiterService
class as well as a function
create_html_progresspage()
, which takes a number in percent and creates a
very simple html status page from it.
In the following, we will add two webmethods to the so far empty
WaiterService
class: one to start the asynchronous service, and one to obtain
its current exection status. Both these functions will be called by the
workflow manager when the webservice is registered as a CAxMan asynchronous
service.
Before we implement the method to start the waiter service, let's think shortly
about the method interface. Every asynchronous service needs to accept at least
two input parameters: The serviceID
(a unique identifier assigned by the
workflow manager) and the sessionToken
(an authentication token that can be
used to verify user credentials). In this example, we also want to have the
number of seconds to wait as an input, meaning that we will implement a total
of three input parameters.
Furthermore, every asynchronous service at least needs to have one output
parameter, namely status_base64
, a base64-encoded status string.
Additionally, we want to have another output parameter representing the result
of the long- running program (= the waiter in this case) we start from within
the service.
With this in mind, add the following method definition to the WaiterService
class:
@spyne.srpc(Unicode, Unicode, Integer, _returns=(Unicode, Unicode),
_out_variable_names=("status_base64", "result"))
def startWaiter(serviceID, sessionToken, secondsToWait=300):
The function decorator @spyne.srpc(...)
marks the following function
definition as a Spyne SOAP method, which will be made available via the
webservice's wsdl file. In this decorator, we define the function signature,
specifying two strings (Unicode
) and one integer input value as well as two
string output values. The input values map directly to the arguments of the
function definition in the next line (serviceID
, sessionToken
, and
secondsToWait
). Since in Python, return arguments are never named, we also
explicitly define the names the return variables will have in the SOAP service
definition.
Our start method needs to perform three tasks:
- Prepare a unique environment (meaning a location for input, status, and output data) for the waiter script to run in.
- Start the waiter script.
- Create a first status report to send back to the workflow manager as a return argument.
Important: Note that a CAxMan service can be run several times in parallel. It is therefore important that subsequent status-query calls to the service return information from the correct long-running background process (the waiter script in this example). The workflow manager assigns a unique service ID to every service which is executed (the service ID is really more a service-execution ID), which we can use to distinguish between these different service executions.
Add the following lines to the method you just created:
waiterdir = os.path.join(WAITER_LOG_FOLDER, serviceID)
if not os.path.exists(waiterdir):
os.mkdir(waiterdir)
statusfile = os.path.join(waiterdir, 'status.txt')
resultfile = os.path.join(waiterdir, 'result.txt')
We create a temporary folder named after the service ID (WAITER_LOG_FOLDER
is read from an environment variable at the top of Waiter.py
) to have a
unique environment for the waiter script to run in. We furthermore define paths
for the status and results files.
Now, we add the execution of the waiter script to the function:
command = ['python', 'wait_a_while.py', str(secondsToWait),
statusfile, resultfile]
subprocess.Popen(command)
These lines start the waiter script wait_a_while.py
as a detached subprocess.
It is crucial here that the startWaiter
method does not wait for this
process to return, as this would ruin the idea of an asynchronous service.
Instead, the subprocess.Popen()
call returns immediately.
Finally, add the following lines to conclude the startWaiter
method:
status = base64.b64encode(create_html_progresspage(0))
result = "UNSET"
return (status, result)
We call create_html_progresspage()
with an initial progress of 0 % (which is
our "best guess" since we actually don't know the status yet). We then
base64-encode the result page to make it digestable for the workflow manager.
We furthermore set the result value to some value indicating that it is not
assigned yet. (We have to supply this output value, but it won't have any
meaningful content as long as the waiter script is still running.)
Note: The status page we return can be anything from a very simple text to a full-fledged html page including, for example, images. This way, we can create a rich feedback for the user during the execution of asynchronous services.
The second function missing in our service has the pre-defined name
getServiceStatus()
. This function is called every few seconds by the workflow
manager to query the current execution status of the service, until the service
terminates.
Add the following function definition to the WaiterService
class:
@spyne.srpc(Unicode, Unicode, _returns=(Unicode, Unicode),
_out_variable_names=("status_base64", "result"))
def getServiceStatus(serviceID, sessionToken):
The function signature is similar to that of the startWaiter
method. The two
input arguments serviceID
and sessionToken
are again mandatory for
asynchronous services. Note that the output arguments are exactly identical to
those of the start method.
First, our function needs to make sure to query the correct background waiter script. Therefore, add the following lines to the function:
waiterdir = os.path.join(WAITER_LOG_FOLDER, serviceID)
statusfile = os.path.join(waiterdir, 'status.txt')
resultfile = os.path.join(waiterdir, 'result.txt')
You can see that this again creates directory and file names using the unique
service ID, just as we did in the startWaiter
method.
Next, the function should read the status file (which is periodically updated by the waiter script) and act depending on its content. Add the following lines to your code:
with open(statusfile) as f:
current_status = f.read().strip()
The waiter script simply writes a number between 0 and 100 into the status file to indicate its progress, so we can use this number to decide how to proceed. We first add the logic for when the waiter script has finished its execution:
if current_status == "100":
status = "COMPLETED"
# Read result page from waiter
with open(resultfile) as f:
result = base64.b64encode(f.read())
return (status, result)
In this case, we report the pre-defined status string "COMPLETED"
, which
will tell the workflow manager that the service has finished and that the next
element in the workflow can now be executed. We furthermore read the waiter
script's result file (which should at this state be filled with something
meaningful), base64-encode it and send it back to the workflow manager together
with the status string.
Finally, we add the logic for when the waiter script has not yet finished its work:
result = "UNSET"
status = base64.b64encode(create_html_progressbar(int(current_status)))
return (status, result)
In that case, we use the current status value to again create a html status page and once again return "UNSET" as the result output.
You might have noticed an oddity about the return values we defined for the two
methods we implemented. Specifically, we defined the result
return value in
startWaiter
without actually using it for anything meaningful. The reason for
this is simple: The workflow manager will use the main method of an
asynchronous service (here, that is startWaiter
) to deduce the service's
input and output arguments. So any output value which should be used later on
in the workflow has to be defined in this main method. Since the final result
of an asynchronous service cannot be known at the execution time of this main
method, the last call to getServiceStatus
has the responsibility to deliver
the final value for all output arguments.
In our implementation of getServiceStatus
, the status we report is either a
base64-encoded html page (when the service is still running) or the string
"COMPLETED"
in case the service has finished its work. The workflow manager
can process a second pre-defined status string, namely "UNCHANGED"
. This
status means that there was no change in status compared to the previous call
to getServiceStatus
, and the workflow manager will continue showing the last
status page created. This is useful in situations where the creation of a
status page itself is costly, for example when a lot of log processing, image
creation etc. needs to be done. In our simple example here, we omitted this
status option for the sake of brevity.
Your waiter webservice is now complete and ready for deployment. If you were not sure about some of the code additions, you can compare your code with the code example async_waiter, which contains the waiter service in its complete form. (It contains actually a bit more than the implementation in this tutorial, but you can ignore those differences here.)
As in tutorial 3-1, we use a pre-defined build script for deployment:
./rebuildandrun.sh <port>
The service will be started in a Docker container named waiter
. Again, you
can run docker ps
and docker logs waiter
to check that the service is
running properly.
For making test calls to the service, the code again ships with a test client
available in the test_client/
folder. To run the test client as a Docker
container, run:
./build.sh
./run.sh <port> start
The calls the startWaiter
method with a dummy service ID and session token,
which should give you a similar output to this:
Using port 8080
wsdl URL is http://localhost:8080/sintef/docker_services/waiter/Waiter?wsdl
Starting service
(reply){
status_base64 = "PGh0bWw+CjxoZWFkPgo8dGl0bGU+V2FpdGVyIHN0YXR1czwvdGl0bGU+CjwvaGVhZD4KPGJvZHkgc3R5bGU9Im1hcmdpbjogMjBweDsgcGFkZGluZzogMjBweDsiPgo8aDE+V2FpdGVyIHN0YXR1cyBhdCAyMjoyMzoxMTwvaDE+CjxkaXYgc3R5bGU9ImJvcmRlci1yYWRpdXM6IDVweDsgYm9yZGVyLWNvbG9yOiBsaWdodGJsdWVibHVlOyBib3JkZXItc3R5bGU6ZGFzaGVkOyB3aWR0aDogODAwcHg7IGhlaWdodDogODBweDtwYWRkaW5nOjA7IG1hcmdpbjogMDsgYm9yZGVyLXdpZHRoOiAzcHg7Ij4KPGRpdiBzdHlsZT0icG9zaXRpb246IHJlbGF0aXZlOyB0b3A6IC0zcHg7IGxlZnQ6IC0zcHg7IGJvcmRlci1yYWRpdXM6IDVweDsgYm9yZGVyLWNvbG9yOiBsaWdodGJsdWU7IGJvcmRlci1zdHlsZTpzb2xpZDsgd2lkdGg6IDAuMHB4OyBoZWlnaHQ6IDgwcHg7cGFkZGluZzowOyBtYXJnaW46IDA7IGJvcmRlci13aWR0aDogM3B4OyBiYWNrZ3JvdW5kLWNvbG9yOiBsaWdodGJsdWU7Ij4KPGgxIHN0eWxlPSJtYXJnaW4tbGVmdDogMjBweDsiID4wJTwvaDE+CjwvZGl2Pgo8L2Rpdj4KPC9oZWFkPgo8L2JvZHk+"
result = "UNSET"
}
You see the base64-encoded status page as well as the still unset result.
Now, execute the run script again, but with a different argument:
./run.sh <port> status
This will call the getServiceStatus
method, which should return output
similar to the following:
Using port 8080
wsdl URL is http://localhost:8080/sintef/docker_services/waiter/Waiter?wsdl
Calling getServiceStatus:
(reply){
status_base64 = "PGh0bWw+CjxoZWFkPgo8dGl0bGU+V2FpdGVyIHN0YXR1czwvdGl0bGU+CjwvaGVhZD4KPGJvZHkgc3R5bGU9Im1hcmdpbjogMjBweDsgcGFkZGluZzogMjBweDsiPgo8aDE+V2FpdGVyIHN0YXR1cyBhdCAyMjoyMzoyODwvaDE+CjxkaXYgc3R5bGU9ImJvcmRlci1yYWRpdXM6IDVweDsgYm9yZGVyLWNvbG9yOiBsaWdodGJsdWVibHVlOyBib3JkZXItc3R5bGU6ZGFzaGVkOyB3aWR0aDogODAwcHg7IGhlaWdodDogODBweDtwYWRkaW5nOjA7IG1hcmdpbjogMDsgYm9yZGVyLXdpZHRoOiAzcHg7Ij4KPGRpdiBzdHlsZT0icG9zaXRpb246IHJlbGF0aXZlOyB0b3A6IC0zcHg7IGxlZnQ6IC0zcHg7IGJvcmRlci1yYWRpdXM6IDVweDsgYm9yZGVyLWNvbG9yOiBsaWdodGJsdWU7IGJvcmRlci1zdHlsZTpzb2xpZDsgd2lkdGg6IDIyNC4wcHg7IGhlaWdodDogODBweDtwYWRkaW5nOjA7IG1hcmdpbjogMDsgYm9yZGVyLXdpZHRoOiAzcHg7IGJhY2tncm91bmQtY29sb3I6IGxpZ2h0Ymx1ZTsiPgo8aDEgc3R5bGU9Im1hcmdpbi1sZWZ0OiAyMHB4OyIgPjI4JTwvaDE+CjwvZGl2Pgo8L2Rpdj4KPC9oZWFkPgo8L2JvZHk+"
result = "UNSET"
}
After 60 seconds (which is the waiting time pre-defined by the test client),
yet another call to ./run.sh <port> status
should give the following
output:
Using port 8080
wsdl URL is http://localhost:8080/sintef/docker_services/waiter/Waiter?wsdl
Calling getServiceStatus:
(reply){
status_base64 = "COMPLETED"
result = "FINISHED"
}
You are now ready to register the deployed waiter service as a CAxMan asynchronous service.
First, make sure that the webservice's wsdl is reachable from the outside by opening the following URL in a browser:
https://<host><your_context_root>/waiter/Waiter?wsdl
Replace <host>
and <your_context_root>
with the appropriate values. You
should receive an xml file containing a formal description of the webservice.
Now, register the webservice's startWaiter
method as a CAxMan service
using the workflow-editor GUI. See the tutorial on service registration for details on how to do that. Make
sure that you select asynchronous service during the registration.
You can now create a simple workflow which contains only your newly created
waiter service. Make sure to connect the workflow inputs serviceID
and
sessionToken
to the corresponding inputs of the waiter service, and connect
the waiter service's result
output to the workflow output. Hard-code a value
for the secondsToWait
input of the waiter service.
When you save, publish, and run your workflow, you should see a very simple status page updating every few seconds until the service has waited for the time you specified in the workflow.