To operate in sched
mode, Runaway needs to spawn a command, the scheduler, from which the main program will get parameters in an online fashion. The communication will occur through the standard input and output of the scheduler, following a small protocol presented here, in which the scheduler will be responding to queries sent by runaway. The scheduler will have to comply with the following protocol:
Request (written by runaway on scheduler stdin) | Details |
---|---|
{"GET_PARAMETERS_REQUEST": {"uuid": "kkkagr23451"}} |
Message sent by runaway to request a parameter to the scheduler. This parameter will in turn be executed on the cluster. |
{"RECORD_OUTPUT_REQUEST": {"uuid": "kkkagr23451", "parameters": "some params", "stdout": "some mess", "stderr": "some mess", "ecode": 0, "path": "/home", "features": "[0.5, 0.5]"} } |
Message sent by runaway for the scheduler to record the output of an execution. This allows the scheduler to incur the result of an execution. |
{"SHUTDOWN_REQUEST": {}} |
Message sent by runaway to allow the scheduler to clean after itself before both of them are closed. |
Response (written by scheduler on scheduler stdout) | Details |
---|---|
{"GET_PARAMETERS_RESPONSE": {"parameters": "some params"}} |
Response sent by the scheduler to reply positively to a parameter request. Should contain a parameter string. |
{"NOT_READY_RESPONSE": {}} |
Response sent by the scheduler, to reply negatively to a parameter request. This can be used if the scheduler needs to record some outputs before issuing more parameters. |
{"RECORD_OUTPUT_RESPONSE": {}} |
Response sent by the scheduler when the output was recorded. |
{"SHUTDOWN_RESPONSE": {}} |
Response sent by the scheduler when all the necessary cleanup was done, and the scheduler can be killed. |
{"ERROR_RESPONSE": {"message": "some error message" }} |
Response that can be sent at any moment, to signal an error in the scheduler. |
For instance here is a stub implementation of this protocol in python:
#!/usr/bin/env python3
import json
import sys
import traceback
class Scheduler(object):
"""
This simple scheduler is initialized with a set of parameters to be executed and
stores executions outputs in a dict.
"""
def __init__(self):
self.parameters = ["1", "2", "3"]
self.output = dict()
def loop(self):
"""
Protocol handling loop. This function dispatches messages to the handling functions.
"""
while True:
inpt = json.loads(input())
if "GET_PARAMETERS_REQUEST" in inpt.keys():
self.handle_parameter_request(**inpt["GET_PARAMETERS_REQUEST"])
elif "RECORD_OUTPUT_REQUEST" in inpt.keys():
self.handle_record_output_request(**inpt["RECORD_OUTPUT_REQUEST"])
elif "SHUTDOWN_REQUEST" in inpt.keys():
break
else:
raise Exception(f"Protocol mismatch. Received unexpected message: {inpt}.")
self.handle_shutdown_request()
@show_traceback
def handle_parameter_request(self, uuid):
next_parameters = self.parameters.pop()
if next_parameters is None:
return_error_response("All executions were processed.")
else:
self.outputs[uuid] = dict()
return_get_parameters_response(next_parameters)
@show_traceback
def handle_record_output_request(self, uuid, parameters, stdout, stderr, ecode, path, features):
out = self.outputs[uuid]
out['parameters'] = parameters
out['stdout'] = stdout
out['stderr'] = stderr
out['ecode'] = ecode
out['path'] = path
out['features'] = features
return_record_output_response()
@show_traceback
def handle_shutdown_request(self):
return_shutdown_response()
def return_error_response(message):
return print(json.dumps({"ERROR_RESPONSE": {"message": message}}))
def return_not_ready_response():
return print(json.dumps({"NOT_READY_RESPONSE": {}}))
def return_record_output_response():
return print(json.dumps({"RECORD_OUTPUT_RESPONSE": {}}))
def return_shutdown_response():
return print(json.dumps({"SHUTDOWN_RESPONSE": {}}))
def return_get_parameters_response(parameters):
return print(json.dumps({"GET_PARAMETERS_RESPONSE": {"parameters": parameters}}))
def show_traceback(func):
def inner(*args):
try:
return func(*args)
except Exception as e:
traceback.print_tb(e.__traceback__)
return_error_response(f"Exception occurred: {e}")
return inner
if __name__ == "__main__":
Scheduler().loop()