Writing schedulers

The `sched` subcommand lets a program that you write drive Runaway automatically. This page explains how to write such a scheduler.

Protocol specification

To operate in sched mode, Runaway spawns a command, the scheduler, from which it obtains parameters in an online fashion. Communication goes through the standard input and output of the scheduler: runaway writes queries on the scheduler's stdin, and the scheduler writes its replies on its stdout. The scheduler must comply with the following protocol:

Request (written by runaway on scheduler stdin) | Details
{"GET_PARAMETERS_REQUEST": {"uuid": "kkkagr23451"}} | Message sent by runaway to request a parameter from the scheduler. This parameter will in turn be executed on the cluster.
{"RECORD_OUTPUT_REQUEST": {"uuid": "kkkagr23451", "parameters": "some params", "stdout": "some mess", "stderr": "some mess", "ecode": 0, "path": "/home", "features": "[0.5, 0.5]"}} | Message sent by runaway for the scheduler to record the output of an execution. This allows the scheduler to take the result of an execution into account.
{"SHUTDOWN_REQUEST": {}} | Message sent by runaway to allow the scheduler to clean up after itself before both of them are closed.

Response (written by scheduler on scheduler stdout) | Details
{"GET_PARAMETERS_RESPONSE": {"parameters": "some params"}} | Response sent by the scheduler to reply positively to a parameter request. Should contain a parameter string.
{"NOT_READY_RESPONSE": {}} | Response sent by the scheduler to reply negatively to a parameter request. This can be used if the scheduler needs to record some outputs before issuing more parameters.
{"RECORD_OUTPUT_RESPONSE": {}} | Response sent by the scheduler once the output has been recorded.
{"SHUTDOWN_RESPONSE": {}} | Response sent by the scheduler once all the necessary cleanup is done and the scheduler can be killed.
{"ERROR_RESPONSE": {"message": "some error message"}} | Response that can be sent at any moment to signal an error in the scheduler.
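To make the request/response pairing more concrete, here is a minimal sketch of the decision logic of a scheduler that uses NOT_READY_RESPONSE. It hands out parameters in batches and refuses to issue more until every execution of the current batch has been recorded. The class name `BatchScheduler`, its `respond` method and the `batch_size` argument are hypothetical names chosen for this illustration; they are not part of Runaway. The sketch works on already-decoded messages and returns plain dicts, which a real scheduler would still have to serialize (for example with `json.dumps`) and write on its stdout.

```python
class BatchScheduler:
    """Hypothetical scheduler that issues parameters in batches of `batch_size`."""

    def __init__(self, parameters, batch_size):
        self.parameters = list(parameters)
        self.batch_size = batch_size
        self.pending = set()       # uuids issued but not yet recorded
        self.issued_in_batch = 0   # parameters issued in the current batch

    def respond(self, request):
        """Map one decoded request (a dict) to one response (a dict)."""
        if "GET_PARAMETERS_REQUEST" in request:
            return self._get_parameters(request["GET_PARAMETERS_REQUEST"]["uuid"])
        if "RECORD_OUTPUT_REQUEST" in request:
            # Only the uuid is used here; a real scheduler would store the output.
            self.pending.discard(request["RECORD_OUTPUT_REQUEST"]["uuid"])
            return {"RECORD_OUTPUT_RESPONSE": {}}
        if "SHUTDOWN_REQUEST" in request:
            return {"SHUTDOWN_RESPONSE": {}}
        return {"ERROR_RESPONSE": {"message": f"Unexpected message: {request}"}}

    def _get_parameters(self, uuid):
        if self.issued_in_batch == self.batch_size:
            if self.pending:
                # The batch is full and some outputs are still missing.
                return {"NOT_READY_RESPONSE": {}}
            # Every output of the batch was recorded: start a new batch.
            self.issued_in_batch = 0
        if not self.parameters:
            return {"ERROR_RESPONSE": {"message": "All executions were processed."}}
        self.pending.add(uuid)
        self.issued_in_batch += 1
        return {"GET_PARAMETERS_RESPONSE": {"parameters": self.parameters.pop(0)}}
```

With `batch_size=2`, the third parameter request is answered with NOT_READY_RESPONSE until both RECORD_OUTPUT_REQUEST messages of the first batch have been received.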

Example implementation

For instance, here is a stub implementation of this protocol in Python:

#!/usr/bin/env python3
import json 
import sys
import traceback


def show_traceback(func):
    """
    Decorator that reports any exception raised by a handler as an ERROR_RESPONSE.
    It is defined before the class because decorators are applied when the class
    body is executed.
    """
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # print_tb writes to stderr, so the protocol on stdout stays clean.
            traceback.print_tb(e.__traceback__)
            return_error_response(f"Exception occurred: {e}")
    return inner


class Scheduler(object):
    """
    This simple scheduler is initialized with a set of parameters to be executed and
    stores execution outputs in a dict.
    """

    def __init__(self):
        self.parameters = ["1", "2", "3"]
        self.outputs = dict()

    def loop(self):
        """
        Protocol handling loop. This function dispatches messages to the handling functions.
        """
        while True:
            # Each message is read as one line of JSON on stdin.
            inpt = json.loads(input())
            if "GET_PARAMETERS_REQUEST" in inpt:
                self.handle_parameter_request(**inpt["GET_PARAMETERS_REQUEST"])
            elif "RECORD_OUTPUT_REQUEST" in inpt:
                self.handle_record_output_request(**inpt["RECORD_OUTPUT_REQUEST"])
            elif "SHUTDOWN_REQUEST" in inpt:
                break
            else:
                raise Exception(f"Protocol mismatch. Received unexpected message: {inpt}.")
        self.handle_shutdown_request()

    @show_traceback
    def handle_parameter_request(self, uuid):
        # list.pop() raises IndexError on an empty list, so test for emptiness first.
        if not self.parameters:
            return_error_response("All executions were processed.")
        else:
            # pop() takes parameters from the end of the list.
            next_parameters = self.parameters.pop()
            self.outputs[uuid] = dict()
            return_get_parameters_response(next_parameters)

    @show_traceback
    def handle_record_output_request(self, uuid, parameters, stdout, stderr, ecode, path, features):
        out = self.outputs[uuid]
        out['parameters'] = parameters
        out['stdout'] = stdout
        out['stderr'] = stderr
        out['ecode'] = ecode
        out['path'] = path
        out['features'] = features
        return_record_output_response()

    @show_traceback
    def handle_shutdown_request(self):
        return_shutdown_response()


# Responses are flushed immediately: stdout is a pipe here, and Python would
# otherwise buffer the message while the other side waits for it.

def return_error_response(message):
    print(json.dumps({"ERROR_RESPONSE": {"message": message}}), flush=True)

def return_not_ready_response():
    print(json.dumps({"NOT_READY_RESPONSE": {}}), flush=True)

def return_record_output_response():
    print(json.dumps({"RECORD_OUTPUT_RESPONSE": {}}), flush=True)

def return_shutdown_response():
    print(json.dumps({"SHUTDOWN_RESPONSE": {}}), flush=True)

def return_get_parameters_response(parameters):
    print(json.dumps({"GET_PARAMETERS_RESPONSE": {"parameters": parameters}}), flush=True)


if __name__ == "__main__":

    Scheduler().loop()