Introduction to Amazon Code Pipeline with Java part 17: the JobProcessor implementation

Introduction

In the previous post we continued to explore the job processor interface of the job agent. In particular we looked at the WorkResult object which holds a number of properties that describe the job result. These include a summary message, the failure reason, the progress made and various others.

In this post we’ll discuss a possible implementation of the JobProcessor interface. Note that the exact implementation will depend on your business since this is where the actual Code Pipeline work item is processed. Our implementation the interface is also very tightly coupled with the load testing domain of our business. Hence I won’t present every single detail around that code as much of it irrelevant for Code Pipeline and the job agent.

A job processor implementation

The implemented process function will have a code branch based on the availability of the continuation token. Recall that the continuation token is a means for us to provide our own job identifier for the job being carried out. Very often it will be some kind of internal job ID and in our implementation that is also the case. A load test will get a job ID, such as 123456 and we use that identifier to query the load test job status and its results.

If the continuation token is null then we know that CP has just handed us a brand new job which has not been handled before. In that case we’ll initiate a new load test, get hold of the load test job ID and return it to CP. Here’s an implementation with much of the business specific details stripped out:

public class ApicaLoadtestJobProcessor implements JobProcessor
{

    private final LoadtestJobExecutorService loadtestJobExecutorService;
    private final LoadtestThresholdParser codePipelineLoadtestThresholdParser;

    public ApicaLoadtestJobProcessor(LoadtestJobExecutorService loadtestJobExecutorService,
            LoadtestThresholdParser codePipelineLoadtestThresholdParser)
    {
        this.loadtestJobExecutorService = loadtestJobExecutorService;
        this.codePipelineLoadtestThresholdParser = codePipelineLoadtestThresholdParser;
    }

    @Override
    public WorkResult process(WorkItem workItem)
    {
        final Map<String, String> actionConfiguration = workItem.getJobData().getActionConfiguration();
        String continuationToken = workItem.getJobData().getContinuationToken();
        if (continuationToken == null) //brand new job, about to start
        {
            JobParamsValidationResult validationResult = validateJobParams(actionConfiguration, workItem);
            if (!validationResult.isAllParamsPresent())
            {
                return WorkResult.failure(
                        workItem.getJobId(),
                        new FailureDetails(FailureType.ConfigurationError, validationResult.getExceptionSummary()));
            }

            StartJobByPresetResponse startJobByPresetResponse = transmitJob(actionConfiguration, workItem);
            if (startJobByPresetResponse.getJobId() > 0)
            {
                return WorkResult.success(workItem.getJobId(), new ExecutionDetails("Load test job started",
                        Integer.toString(startJobByPresetResponse.getJobId()), 5),
                        null,
                        Integer.toString(startJobByPresetResponse.getJobId()));
            } else
            {
                return WorkResult.failure(
                        workItem.getJobId(),
                        new FailureDetails(FailureType.JobFailed, startJobByPresetResponse.getException()));
            }
        } else //continuation token != null means that this load test job was already handled at least once
        {
            int apicaJobId = Integer.parseInt(continuationToken);            
            JobStatusRequest jobStatusRequest = new JobStatusRequest();
            //code ignored
            JobStatusResponse jobStatus = loadtestJobExecutorService.checkJobStatus(jobStatusRequest);
            if (jobStatus.isCompleted() && !jobStatus.isJobFaulted()) //job completed without errors
            {        
                
                return WorkResult.success(
                            workItem.getJobId(),
                            new ExecutionDetails("Load test job completed"),
                                    Integer.toString(apicaJobId), 100),
                            new CurrentRevision("revision", "change identifier"));
                
            } else if (jobStatus.isJobFaulted()) //job completed with errors
            {
                return WorkResult.failure(
                        workItem.getJobId(),
                        new FailureDetails(FailureType.JobFailed, jobStatus.getException()));
            } else if (!jobStatus.isAborted() && !jobStatus.isActive()) //job was deleted 
            {
                return WorkResult.failure(
                        workItem.getJobId(),
                        new FailureDetails(FailureType.JobFailed, "This job has been deleted."));
            } else //job ongoing
            {
                return WorkResult.success(workItem.getJobId(), new ExecutionDetails(jobStatus.getStatusMessage(),
                        Integer.toString(apicaJobId), 50),
                        null,
                        //new CurrentRevision("revision", "change identifier"),
                        Integer.toString(apicaJobId));
            }
        }
    }

    private StartJobByPresetResponse transmitJob(Map<String, String> actionConfiguration,
            WorkItem workItem)
    {
        TransmitJobRequestArgs transmitJobRequestArgs = new TransmitJobRequestArgs();
		//code ignored
        StartJobByPresetResponse startByPresetResponse = this.loadtestJobExecutorService.transmitJob(transmitJobRequestArgs);
        return startByPresetResponse;
    }

    private JobParamsValidationResult validateJobParams(Map<String, String> actionConfiguration, WorkItem workItem)
    {
        JobParamsValidationResult res = new JobParamsValidationResult();
		//validation code ignored
        
        return res;
    }
}

We get hold of the action configuration and the continuation token from the incoming work item. If the token is null then we know that it’s a brand new load test job and it must be instantiated. The job parameters are validated before a load test starts. This validation process is specific to our load testing business. Your third party action will probably also have some validation process. The parameter validation process returns a JobParamsValidationResult object. Here’s the object code if you’re interested:

package com.apica.awscodepipelinebuildrunner.loadtest;

public class JobParamsValidationResult
{
    private boolean allParamsPresent;
    private int presetTestInstanceId;
    private String exceptionSummary;
    private String authTokenException;
    private String scenarioFileException;
    private String presetNameException;

    public int getPresetTestInstanceId()
    {
        return presetTestInstanceId;
    }

    public void setPresetTestInstanceId(int presetTestInstanceId)
    {
        this.presetTestInstanceId = presetTestInstanceId;
    }
    
    public String getAuthTokenException()
    {
        return authTokenException;
    }

    public void setAuthTokenException(String authTokenException)
    {
        this.authTokenException = authTokenException;
    }

    public String getScenarioFileException()
    {
        return scenarioFileException;
    }

    public void setScenarioFileException(String scenarioFileException)
    {
        this.scenarioFileException = scenarioFileException;
    }

    public String getPresetNameException()
    {
        return presetNameException;
    }

    public void setPresetNameException(String presetNameException)
    {
        this.presetNameException = presetNameException;
    }
    
    public boolean isAllParamsPresent()
    {
        return allParamsPresent;
    }

    public void setAllParamsPresent(boolean allParamsPresent)
    {
        this.allParamsPresent = allParamsPresent;
    }

    public String getExceptionSummary()
    {
        return exceptionSummary;
    }

    public void setExceptionMessage(String exceptionMessage)
    {
        this.exceptionSummary = exceptionMessage;
    }
}

If validation fails then we return a failure action of type ConfigurationError. We also include the exception message summary.

If we pass the validation stage then we can start the load test by a step called “transmitJob”. Again, this is some internal process which is not relevant to Code Pipeline. Your third party action will probably also have a similar job start method. Here’s the StartJobByPresetResponse object which holds our internal load test job ID which will become the continuation token:

package com.apica.awscodepipelinebuildrunner.loadtest;


public class StartJobByPresetResponse
{
    private String exception;
    private int jobId;
    
    public String getException()
    {
        return exception;
    }

    public void setException(String exception)
    {
        this.exception = exception;
    }

    public int getJobId()
    {
        return jobId;
    }

    public void setJobId(int jobId)
    {
        this.jobId = jobId;
    }
}

If the load test job ID is greater than 0 then we know that the load test job has successfully been transmitted. We can then send a success response with the job ID as the last parameter to the WorkResult.success function. We simply set the progress indicator to 5% to indicate to Code Pipeline that it’s an ongoing job and shouldn’t be considered complete. There’s nothing special about this number, it could be 1 or 16 as well, but it should lie on a scale from 1 to 99 to begin with. 100 means a completed job.

If the job transmit phase has failed then we send back a failure result with the relevant exception message.

Then we have the code block where the continuation token is not null. The token is converted into an integer which is the load test job ID. We know at this stage that this is an ongoing job and we need to check its status. We check the job status using the following request and response objects:

package com.apica.awscodepipelinebuildrunner.loadtest;

public class JobStatusRequest
{
    private String awsClientId;
    private String awsClientToken;
    private int jobId;

    public String getAwsClientId()
    {
        return awsClientId;
    }

    public void setAwsClientId(String awsClientId)
    {
        this.awsClientId = awsClientId;
    }

    public String getAwsClientToken()
    {
        return awsClientToken;
    }

    public void setAwsClientToken(String awsClientToken)
    {
        this.awsClientToken = awsClientToken;
    }
    
    public int getJobId()
    {
        return jobId;
    }

    public void setJobId(int jobId)
    {
        this.jobId = jobId;
    }
}

package com.apica.awscodepipelinebuildrunner.loadtest;

import java.util.Date;

public class JobStatusResponse
{
    private int jobId;
    private String statusMessage;
    private boolean completed;
    private boolean jobFaulted;
    private String exception;
    private boolean aborted;
    private boolean active;

    public boolean isAborted()
    {
        return aborted;
    }

    public void setAborted(boolean aborted)
    {
        this.aborted = aborted;
    }

    public boolean isActive()
    {
        return active;
    }

    public void setActive(boolean active)
    {
        this.active = active;
    }
    
    public int getJobId()
    {
        return jobId;
    }

    public void setJobId(int jobId)
    {
        this.jobId = jobId;
    }

    public String getStatusMessage()
    {
        return statusMessage;
    }

    public void setStatusMessage(String statusMessage)
    {
        this.statusMessage = statusMessage;
    }

    public boolean isCompleted()
    {
        return completed;
    }

    public void setCompleted(boolean completed)
    {
        this.completed = completed;
    }

    public boolean isJobFaulted()
    {
        return jobFaulted;
    }

    public void setJobFaulted(boolean jobFaulted)
    {
        this.jobFaulted = jobFaulted;
    }

    public String getException()
    {
        return exception;
    }

    public void setException(String exception)
    {
        this.exception = exception;
    }

    public boolean isJobCompleted()
    {
        return jobFaulted || completed;
    }
}

If the load test has run complete in our backend system then we put the status according to the actual job outcome. The load test job may have run with or without exceptions. Note how set the progress indicator to 100 in the ExecutionDetails object. The CurrentRevision object has in fact no relevance in our third party action. We only pass in some irrelevant string values.

If the job is still ongoing in our backend then we let CP know that it still hasn’t finished by setting the progress indicator to the arbitrary value of 50. CP doesn’t actually show any progress bar in the UI so this could be any positive integer from 1 to 99. If CP is extended in the future so that the user can view the progress then this progress indicator integer will require some more sophisticated calculation of course.

As you see our implementation of the job processor doesn’t really do much work on its own, e.g. it is not responsible for executing a load test autonomously in a self-contained manner. It communicates with our backend using our public API. Again, that is an implementation detail that may well differ from your job agent.

That’s it really for the model implementation of the JobProcessor interface.

In the next post we’ll start investigating how the long running job polling thread is started.

View all posts related to Amazon Web Services and Big Data here.

Advertisements

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

ultimatemindsettoday

A great WordPress.com site

Elliot Balynn's Blog

A directory of wonderful thoughts

Robin Sedlaczek's Blog

Developer on Microsoft Technologies

HarsH ReaLiTy

A Good Blog is Hard to Find

Softwarearchitektur in der Praxis

Wissenswertes zu Webentwicklung, Domain-Driven Design und Microservices

the software architecture

thoughts, ideas, diagrams,enterprise code, design pattern , solution designs

Technology Talks

on Microsoft technologies, Web, Android and others

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

Anything around ASP.NET MVC,WEB API, WCF, Entity Framework & AngularJS

Cyber Matters

Bite-size insight on Cyber Security for the not too technical.

Guru N Guns's

OneSolution To dOTnET.

Johnny Zraiby

Measuring programming progress by lines of code is like measuring aircraft building progress by weight.

%d bloggers like this: