Introduction to Amazon Code Pipeline with Java part 20: the job worker configuration

Introduction

In the previous part we started looking at the job worker initialisation details. The job worker takes the form of a daemon thread and implements the org.apache.commons.daemon.Daemon interface. The job worker daemon implementation introduces a range of new classes that we need to look at in order to get the full picture. One of them was the job poller interface, which has a single method called execute.
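
Here is a minimal sketch of that poller interface. The package name is visible in the imports later in this post, while the parameterless void signature is an assumption based on the description above:

package com.apica.awscodepipelinebuildrunner;

public interface JobPoller
{
    // Polls CodePipeline for new jobs and hands them over for processing.
    void execute();
}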

Another such element is the job worker configuration class which we’ll discuss in this post.

The job worker configuration

The job worker configuration class implements the JobWorkerConfiguration interface, which we have seen already:

package com.apica.awscodepipelinebuildrunner.configuration;

import com.apica.awscodepipelinebuildrunner.JobPoller;
import java.util.List;
import com.apica.awscodepipelinebuildrunner.logging.LoggerService;

public interface JobWorkerConfiguration
{
    
    public List<JobPoller> jobPollers(LoggerService logger);
    public long getPollingIntervalInMs();
}
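
To see how these two methods fit together, here is an illustrative sketch of how the daemon from the previous post might consume a configuration instance. The scheduling code below is an assumption for demonstration purposes only, not the actual daemon implementation:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Given a JobWorkerConfiguration 'configuration' and a LoggerService 'logger'
// built at start-up, each poller is run repeatedly at the configured interval:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
for (JobPoller poller : configuration.jobPollers(logger))
{
    scheduler.scheduleAtFixedRate(poller::execute, 0,
            configuration.getPollingIntervalInMs(), TimeUnit.MILLISECONDS);
}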

The skeleton implementation of the interface is provided by the abstract DefaultJobWorkerConfiguration class:

package com.apica.awscodepipelinebuildrunner.configuration;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSSessionCredentialsProvider;
import com.amazonaws.services.codepipeline.AWSCodePipelineClient;
import com.apica.awscodepipelinebuildrunner.CodePipelineJobPoller;
import com.apica.awscodepipelinebuildrunner.CodePipelineJobProcessor;
import com.apica.awscodepipelinebuildrunner.JobPoller;
import com.apica.awscodepipelinebuildrunner.JobProcessor;
import com.apica.awscodepipelinebuildrunner.JobService;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import com.apica.awscodepipelinebuildrunner.logging.LoggerService;


public abstract class DefaultJobWorkerConfiguration implements JobWorkerConfiguration
{
    private final long pollIntervalMs;
    private final int workerThreads;
    private final int pollBatchSize;
    private final String codePipelineEndpoints;

    public DefaultJobWorkerConfiguration(long pollIntervalMs, int workerThreads, int pollBatchSize, 
            String codePipelineEndpoints)
    {
        this.pollIntervalMs = pollIntervalMs;
        this.workerThreads = workerThreads;
        this.pollBatchSize = pollBatchSize;
        this.codePipelineEndpoints = codePipelineEndpoints;
    }
    
    @Override
    public long getPollingIntervalInMs()
    {
        return this.pollIntervalMs;
    }

    @Override
    public List<JobPoller> jobPollers(LoggerService logger)
    {
        List<JobPoller> pollers = new ArrayList<>();
        
        String[] codePipelineEndpointsArray = this.codePipelineEndpoints.split(";");
        for (String endpoint : codePipelineEndpointsArray)
        {
            pollers.add(new CodePipelineJobPoller(jobService(endpoint), jobProcessor(), 
                    threadPoolExecutor(), pollBatchSize, logger, endpoint, UUID.randomUUID().toString()));
        }
        return pollers;
    }

    protected JobProcessor jobProcessor()
    {
        return new CodePipelineJobProcessor();
    }

    protected AWSCodePipelineClient codePipelineClient(String codePipelineEndpoint)
    {        
        // NB: the credentials are hard-coded here for demo purposes only
        AWSCredentials awsCredentials = new BasicAWSCredentials("your AWS access key ID", "your AWS secret access key");
        AWSCredentialsProvider temporaryCredentialsProvider = new STSSessionCredentialsProvider(awsCredentials);
        AWSCodePipelineClient codePipelineClient = new AWSCodePipelineClient(temporaryCredentialsProvider);
        codePipelineClient.setEndpoint(codePipelineEndpoint);  
        return codePipelineClient;
    }
    
    protected ThreadPoolExecutor threadPoolExecutor()
    {
        return (ThreadPoolExecutor) Executors.newFixedThreadPool(this.workerThreads);
    }
    
    protected abstract JobService jobService(String codePipelineEndpoint);
}

The jobPollers function reads the endpoint addresses from the code pipeline endpoints string, which holds the CP endpoints delimited by semicolons. For each endpoint to be monitored the function constructs a new CodePipelineJobPoller object, whose implementation we saw earlier in this series.
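
As an illustration, a worker monitoring two regions could be configured with an endpoints string like the following; the concrete regions are just an example:

// Two regional CodePipeline endpoints, delimited by a semicolon:
String codePipelineEndpoints =
        "https://codepipeline.us-east-1.amazonaws.com;https://codepipeline.eu-west-1.amazonaws.com";

// split(";") yields one element per endpoint, so jobPollers will build
// one CodePipelineJobPoller per monitored region:
String[] endpoints = codePipelineEndpoints.split(";");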

jobProcessor() returns a very basic implementation of the JobProcessor interface:

package com.apica.awscodepipelinebuildrunner;

import com.apica.awscodepipelinebuildrunner.model.CurrentRevision;
import com.apica.awscodepipelinebuildrunner.model.ExecutionDetails;
import com.apica.awscodepipelinebuildrunner.model.FailureDetails;
import com.apica.awscodepipelinebuildrunner.model.FailureType;
import com.apica.awscodepipelinebuildrunner.model.JobStatus;
import com.apica.awscodepipelinebuildrunner.model.WorkItem;
import com.apica.awscodepipelinebuildrunner.model.WorkResult;
import java.util.Map;
import java.util.UUID;

public class CodePipelineJobProcessor implements JobProcessor
{

    private static final String JOB_STATUS = "JobStatus";
    
    @Override
    public WorkResult process(final WorkItem workItem)
    {
        
        final Map<String, String> actionConfiguration = workItem.getJobData().getActionConfiguration();

        if (actionConfiguration.containsKey(JOB_STATUS))
        {
            if (actionConfiguration.get(JOB_STATUS).equals(JobStatus.Failed.toString()))
            {

                return WorkResult.failure(
                        workItem.getJobId(),
                        new FailureDetails(FailureType.JobFailed, "job failed"));
            }
        }

        return WorkResult.success(
                workItem.getJobId(),
                new ExecutionDetails("test summary", UUID.randomUUID().toString(), 100),
                new CurrentRevision("test revision", "test change identifier"));
    }

}

The above stub is only used for testing. The “real” implementation of the interface is the ApicaLoadtestJobProcessor class, which we looked at in a previous post.
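
The JobProcessor interface itself is not shown in this post, but based on the stub above it can be sketched as follows; treat this as an inferred outline rather than the exact source:

package com.apica.awscodepipelinebuildrunner;

import com.apica.awscodepipelinebuildrunner.model.WorkItem;
import com.apica.awscodepipelinebuildrunner.model.WorkResult;

public interface JobProcessor
{
    // Carries out the actual work for a polled CodePipeline job and reports the outcome.
    WorkResult process(WorkItem workItem);
}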

The codePipelineClient() function sets up the AWS CP client using the AWS SDK and the AWS credentials. threadPoolExecutor() returns a fixed-size thread pool, sized according to the configured number of worker threads. The jobService function is abstract and is implemented by the ApicaLoadtestJobWorkerConfiguration class, which therefore derives from DefaultJobWorkerConfiguration:

package com.apica.awscodepipelinebuildrunner.configuration;

import com.apica.awscodepipelinebuildrunner.JobProcessor;
import com.apica.awscodepipelinebuildrunner.JobService;
import com.apica.awscodepipelinebuildrunner.model.ActionTypeId;
import com.apica.awscodepipelinebuildrunner.plugin.thirdparty.ClientTokenProvider;
import com.apica.awscodepipelinebuildrunner.plugin.thirdparty.ApicaLoadtestCodePipelineJobService;
import com.apica.awscodepipelinebuildrunner.repository.ConfigKeys;
import java.util.Properties;
import com.apica.awscodepipelinebuildrunner.logging.LoggerService;

public class ApicaLoadtestJobWorkerConfiguration extends DefaultJobWorkerConfiguration
{
    private final LoggerService logger;
    private final ClientTokenProvider clientTokenProvider;
    private final JobProcessor jobProcessor;
    private final String codePipelineActionProviderName;
    
    public ApicaLoadtestJobWorkerConfiguration(Properties properties, String codePipelineEndpoints, LoggerService logger
            , ClientTokenProvider clientTokenProvider, JobProcessor jobProcessor)
    {        
        super(Long.parseLong(properties.getProperty(ConfigKeys.POLL_INTERVAL_MILLIS_KEY, "30000")),
                Integer.parseInt(properties.getProperty(ConfigKeys.WORKER_THREADS_KEY, "1")),
                Integer.parseInt(properties.getProperty(ConfigKeys.POLL_BATCH_SIZE_KEY, "1")), 
                codePipelineEndpoints);
        this.logger = logger;
        this.clientTokenProvider = clientTokenProvider;
        this.jobProcessor = jobProcessor;
        this.codePipelineActionProviderName = properties.getProperty(ConfigKeys.CODE_PIPELINE_ACTION_PROVIDER_NAME_KEY, "Apica");
    }    
    
    public ActionTypeId getActionType()
    {
        return new ActionTypeId("Test", "ThirdParty", codePipelineActionProviderName, "1");
    }

    @Override
    public JobService jobService(String codePipelineEndpoint)
    {
        
        return new ApicaLoadtestCodePipelineJobService(codePipelineClient(codePipelineEndpoint), 
                getActionType(), clientTokenProvider(), logger);
    }

    public ClientTokenProvider clientTokenProvider()
    {
        return this.clientTokenProvider;
    }
    
    @Override
    protected JobProcessor jobProcessor()
    {
        return this.jobProcessor;
    }
}
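
Putting it together, the daemon start-up code could wire up the configuration roughly like this. The property file name and the pre-built collaborators are assumptions for illustration:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Assumes 'logger', 'clientTokenProvider' and 'jobProcessor' were constructed earlier.
Properties properties = new Properties();
try (InputStream input = Files.newInputStream(Paths.get("jobworker.properties")))
{
    properties.load(input);
}
JobWorkerConfiguration configuration = new ApicaLoadtestJobWorkerConfiguration(
        properties,
        "https://codepipeline.us-east-1.amazonaws.com",
        logger,
        clientTokenProvider,
        jobProcessor);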

ActionTypeId tells CP what type of pipeline runner your third-party action is:

package com.apica.awscodepipelinebuildrunner.model;

import com.apica.awscodepipelinebuildrunner.Validator;

public class ActionTypeId
{

    private final String category;
    private final String owner;
    private final String provider;
    private final String version;
    
    public ActionTypeId(final String category, final String owner, final String provider, final String version)
    {
        Validator.notNull(category);
        Validator.notNull(owner);
        Validator.notNull(provider);
        Validator.notNull(version);
        this.category = category;
        this.owner = owner;
        this.provider = provider;
        this.version = version;
    }

    public String getCategory()
    {
        return category;
    }

    public String getOwner()
    {
        return owner;
    }

    public String getProvider()
    {
        return provider;
    }

    public String getVersion()
    {
        return version;
    }

    @Override
    public String toString()
    {
        return "[category=" + category + ", owner=" + owner + ", provider=" + provider + ", version=" + version + "]";
    }
}

The category, owner, provider and version values ensure that the Apica third-party action shows up among the available test actions in the CP GUI.

We’re done with the configuration class. The next post will discuss the job poller implementation.

View all posts related to Amazon Web Services and Big Data here.
