Introduction to Amazon Code Pipeline with Java part 21: the job service

Introduction

In the previous post we looked at the job worker configuration class. It’s not a very complicated implementation. It basically constructs a job poller for each CP region the job agent will monitor. It also constructs an AWS client with the proper AWS credentials.

There are a couple more building blocks of the third party action before we have a functioning system. One of them is the implementation of the JobPoller interface which is the topic of this post. In particular we’ll be looking at one of the dependencies of the job poller, the job service class.

The job poller and the job service

Recall that the role of the job poller is to periodically poll an AWS CP endpoint for new jobs. In our example the job agent is configured to check for new jobs every 10 seconds. The JobPoller interface has a single function called execute. The implementation should show what the job poller will do with any new job that comes from CP.

The JobPoller interface is implemented by the CodePipelineJobPoller class. Recall that one such job poller is constructed for each CP endpoint of the job agent in the job worker configuration class:

public List<JobPoller> jobPollers(LoggerService logger)
    {
        List<JobPoller> pollers = new ArrayList<>();
        
        String[] codePipelineEndpointsArray = this.codePipelineEndpoints.split(";");
        for (String endpoint : codePipelineEndpointsArray)
        {
            pollers.add(new CodePipelineJobPoller(jobService(endpoint), jobProcessor(), 
                    threadPoolExecutor(), pollBatchSize, logger, endpoint, UUID.randomUUID().toString()));
        }
        return pollers;
    }

Let’s start with the dependencies and private fields of the class:

package com.apica.awscodepipelinebuildrunner;

import com.amazonaws.AmazonClientException;
import com.apica.awscodepipelinebuildrunner.logging.LogMessageProperties;
import com.apica.awscodepipelinebuildrunner.model.FailureDetails;
import com.apica.awscodepipelinebuildrunner.model.FailureType;
import com.apica.awscodepipelinebuildrunner.model.JobStatus;
import com.apica.awscodepipelinebuildrunner.model.WorkItem;
import com.apica.awscodepipelinebuildrunner.model.WorkResult;
import com.apica.awscodepipelinebuildrunner.model.WorkResultStatus;
import com.apica.awscodepipelinebuildrunner.monitoring.JobAgentMonitoringSharedService;
import com.apica.awscodepipelinebuildrunner.monitoring.JobAgentWorkerThread;
import com.apica.awscodepipelinebuildrunner.monitoring.WorkerThreadWorkItem;
import com.apica.awscodepipelinebuildrunner.util.CommonUtils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import com.apica.awscodepipelinebuildrunner.logging.LoggerService;

public class CodePipelineJobPoller implements JobPoller
{

    private final JobProcessor jobProcessor;
    private final JobService jobService;
    private final ThreadPoolExecutor executorService;
    private final int pollBatchSize;
    private final LoggerService logger;
    private final String codePipelineEndpoint;
    private final String jobPollerIdentifier;

    
    public CodePipelineJobPoller(final JobService jobService,
            final JobProcessor jobProcessor,
            final ThreadPoolExecutor executorService,
            final int pollBatchSize,
            final LoggerService logger,
            final String codePipelineEndpoint,
            final String jobPollerIdentifier)
    {
        Validator.notNull(jobService);
        Validator.notNull(jobProcessor);
        Validator.notNull(executorService);
        Validator.notNull(logger);
        this.jobService = jobService;
        this.jobProcessor = jobProcessor;
        this.executorService = executorService;
        this.pollBatchSize = pollBatchSize;
        this.logger = logger;
        this.codePipelineEndpoint = codePipelineEndpoint;
        this.jobPollerIdentifier = jobPollerIdentifier;
    }

    @Override
    public void execute()
    {
        
    }    
}

The JobService interface is new and we’ll look at its details below. We are familiar with other parameters by now with the exception of the poller identifier. Don’t worry about it too much, we only use it for logging purposes. We assign an GUID or UUID to each job poller which is then used as an identifier in the logs.

The job service provides a couple of methods around job handling: polling, acknowledging and status updates. Here’s the interface:

package com.apica.awscodepipelinebuildrunner;

import com.apica.awscodepipelinebuildrunner.model.CurrentRevision;
import com.apica.awscodepipelinebuildrunner.model.ExecutionDetails;
import com.apica.awscodepipelinebuildrunner.model.FailureDetails;
import com.apica.awscodepipelinebuildrunner.model.JobStatus;
import com.apica.awscodepipelinebuildrunner.model.WorkItem;
import java.util.List;

public interface JobService
{
    List<WorkItem> pollForJobs(int maxBatchSize);
    JobStatus acknowledgeJob(String jobId, String clientId, String nonce);

    void putJobSuccess(String jobId,
                       String clientId,
                       ExecutionDetails executionDetails,
                       CurrentRevision currentRevision,
                       String continuationToken);

    void putJobFailure(String jobId,
                       String clientId,
                       FailureDetails failureDetails);
}

The implementation is called ApicaLoadtestCodePipelineJobService and it is returned in the overridden jobService function of ApicaLoadtestJobWorkerConfiguration:

@Override
    public JobService jobService(String codePipelineEndpoint)
    {
        
        return new ApicaLoadtestCodePipelineJobService(codePipelineClient(codePipelineEndpoint), 
                getActionType(), clientTokenProvider(), logger);
    }

Here’s the JobService interface implementation with some unnecessary items, like logging, stripped out:

package com.apica.awscodepipelinebuildrunner.plugin.thirdparty;

import com.amazonaws.services.codepipeline.AWSCodePipelineClient;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.codepipeline.model.AcknowledgeThirdPartyJobRequest;
import com.amazonaws.services.codepipeline.model.AcknowledgeThirdPartyJobResult;
import com.amazonaws.services.codepipeline.model.GetThirdPartyJobDetailsRequest;
import com.amazonaws.services.codepipeline.model.GetThirdPartyJobDetailsResult;
import com.amazonaws.services.codepipeline.model.PollForThirdPartyJobsRequest;
import com.amazonaws.services.codepipeline.model.PollForThirdPartyJobsResult;
import com.amazonaws.services.codepipeline.model.PutThirdPartyJobFailureResultRequest;
import com.amazonaws.services.codepipeline.model.PutThirdPartyJobSuccessResultRequest;
import com.amazonaws.services.codepipeline.model.ThirdPartyJob;
import com.amazonaws.services.codepipeline.model.ThirdPartyJobData;
import com.amazonaws.services.codepipeline.model.ThirdPartyJobDetails;
import com.amazonaws.util.json.Jackson;
import com.apica.awscodepipelinebuildrunner.JobService;
import com.apica.awscodepipelinebuildrunner.Validator;
import com.apica.awscodepipelinebuildrunner.logging.LogMessageProperties;
import com.apica.awscodepipelinebuildrunner.model.ActionTypeId;
import com.apica.awscodepipelinebuildrunner.model.CurrentRevision;
import com.apica.awscodepipelinebuildrunner.model.ExecutionDetails;
import com.apica.awscodepipelinebuildrunner.model.FailureDetails;
import com.apica.awscodepipelinebuildrunner.model.JobStatus;
import com.apica.awscodepipelinebuildrunner.model.WorkItem;
import com.apica.awscodepipelinebuildrunner.plugin.JobConverter;
import java.util.logging.Level;
import com.apica.awscodepipelinebuildrunner.logging.LoggerService;

public class ApicaLoadtestCodePipelineJobService implements JobService
{

    private final AWSCodePipelineClient codePipelineClient;
    private final ActionTypeId actionType;
    private final ClientTokenProvider clientTokenProvider;
    private final LoggerService logger;

    public ApicaLoadtestCodePipelineJobService(final AWSCodePipelineClient codePipelineClient,
            final ActionTypeId actionType, final ClientTokenProvider clientTokenProvider,
            final LoggerService logger)
    {
        Validator.notNull(codePipelineClient);
        Validator.notNull(actionType);
        Validator.notNull(clientTokenProvider);
        Validator.notNull(logger);
        this.codePipelineClient = codePipelineClient;
        this.actionType = actionType;
        this.clientTokenProvider = clientTokenProvider;
        this.logger = logger;
    }

    /**
     * Polls for jobs for the configured action type of the job worker.
     *
     * @param maxBatchSize maximum number of jobs to be returned by the poll
     * api.
     * @return List of work items.
     */
    @Override
    public List<WorkItem> pollForJobs(final int maxBatchSize)
    {
        final List<WorkItem> result = new ArrayList<>();

        final PollForThirdPartyJobsRequest pollForJobsRequest = new PollForThirdPartyJobsRequest();
        pollForJobsRequest.setActionTypeId(getActionType());
        pollForJobsRequest.setMaxBatchSize(maxBatchSize);

        final PollForThirdPartyJobsResult pollForJobsResult = codePipelineClient.pollForThirdPartyJobs(pollForJobsRequest);
        
        for (final ThirdPartyJob job : pollForJobsResult.getJobs())
        {
            try
            {
                ThirdPartyJobDetails jobDetails = getJobDetails(job.getJobId(), job.getClientId());
                ThirdPartyJobData thirdPartyJobData = jobDetails.getData();
                WorkItem workItem = JobConverter.convert(job.getClientId(), jobDetails);
                workItem.setWorkItemRetrievalSuccess(true);                
                result.add(workItem);
            } catch (Exception ex)
            {
                //handle exception
            }

        }
        return result;
    }
    
    @Override
    public JobStatus acknowledgeJob(final String jobId, final String clientId, final String nonce)
    {        
        final AcknowledgeThirdPartyJobRequest request = new AcknowledgeThirdPartyJobRequest();
        request.setJobId(jobId);
        request.setNonce(nonce);
        request.setClientToken(clientTokenProvider.lookupClientSecret(clientId));
        final AcknowledgeThirdPartyJobResult result = codePipelineClient.acknowledgeThirdPartyJob(request);
        return JobStatus.valueOf(result.getStatus());
    }
    
    @Override
    public void putJobSuccess(final String jobId,
            final String clientId,
            final ExecutionDetails executionDetails,
            final CurrentRevision currentRevision,
            final String continuationToken)
    {        
        final PutThirdPartyJobSuccessResultRequest request = new PutThirdPartyJobSuccessResultRequest();
        request.setJobId(jobId);
        request.setClientToken(clientTokenProvider.lookupClientSecret(clientId));
        request.setExecutionDetails(JobConverter.convert(executionDetails));
        request.setCurrentRevision(JobConverter.convert(currentRevision));
        request.setContinuationToken(continuationToken);
        codePipelineClient.putThirdPartyJobSuccessResult(request);
    }
    
    @Override
    public void putJobFailure(final String jobId, final String clientId, final FailureDetails failureDetails)
    {        
        final PutThirdPartyJobFailureResultRequest request = new PutThirdPartyJobFailureResultRequest();
        request.setJobId(jobId);
        request.setClientToken(clientTokenProvider.lookupClientSecret(clientId));
        request.setFailureDetails(JobConverter.convert(failureDetails));
        codePipelineClient.putThirdPartyJobFailureResult(request);
    }

    private ThirdPartyJobDetails getJobDetails(final String jobId, final String clientId)
    {
        final GetThirdPartyJobDetailsRequest getJobDetailsRequest = new GetThirdPartyJobDetailsRequest();
        getJobDetailsRequest.setJobId(jobId);
        String clientSecret = clientTokenProvider.lookupClientSecret(clientId);
        getJobDetailsRequest.setClientToken(clientSecret);
        final GetThirdPartyJobDetailsResult getJobDetailsResult
                = codePipelineClient.getThirdPartyJobDetails(getJobDetailsRequest);

        return getJobDetailsResult.getJobDetails();
    }

    private com.amazonaws.services.codepipeline.model.ActionTypeId getActionType()
    {
        return JobConverter.convert(actionType);
    }
}

The implemented methods use the AWS CP client to find new jobs, acknowledge them and update their final statuses. Classes like PollForThirdPartyJobsRequest and GetThirdPartyJobDetailsRequest are from the AWS CP library.

The JobConverter class provides a number of conversion functions that map between AWS specific objects and their Apica specific counterparts:

package com.apica.awscodepipelinebuildrunner.plugin;

import com.amazonaws.services.codepipeline.model.Job;
import com.amazonaws.services.codepipeline.model.S3ArtifactLocation;
import com.amazonaws.services.codepipeline.model.ThirdPartyJobDetails;
import com.apica.awscodepipelinebuildrunner.model.AWSSessionCredentials;
import com.apica.awscodepipelinebuildrunner.model.ActionTypeId;
import com.apica.awscodepipelinebuildrunner.model.Artifact;
import com.apica.awscodepipelinebuildrunner.model.CurrentRevision;
import com.apica.awscodepipelinebuildrunner.model.ExecutionDetails;
import com.apica.awscodepipelinebuildrunner.model.FailureDetails;
import com.apica.awscodepipelinebuildrunner.model.JobData;
import com.apica.awscodepipelinebuildrunner.model.WorkItem;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JobConverter
{

    /**
     * Converts the custom action job details into a work item which can be
     * processed by the job worker.
     *
     */
    public final static WorkItem convert(final Job job)
    {
        return new WorkItem(job.getId(),
                job.getNonce(),
                JobConverter.convert(job.getData()),
                job.getAccountId());
    }

    /**
     * Converts the third party job details into a work item which can be
     * processed by the job worker.
     */
    public final static WorkItem convert(final String clientId, final ThirdPartyJobDetails jobDetails)
    {
        return new WorkItem(jobDetails.getId(),
                jobDetails.getNonce(),
                convert(jobDetails.getData()),
                clientId);
    }

    private final static JobData convert(final com.amazonaws.services.codepipeline.model.JobData jobData)
    {
        Map<String, String> actionConfiguration = null;
        if (jobData.getActionConfiguration() != null)
        {
            actionConfiguration = jobData.getActionConfiguration().getConfiguration();
        }
        return new JobData(actionConfiguration,
                convert(jobData.getInputArtifacts()),
                convert(jobData.getOutputArtifacts()),
                convert(jobData.getArtifactCredentials()),
                jobData.getContinuationToken());
    }

    private final static JobData convert(final com.amazonaws.services.codepipeline.model.ThirdPartyJobData jobData)
    {
        Map<String, String> actionConfiguration = null;
        if (jobData.getActionConfiguration() != null)
        {
            actionConfiguration = jobData.getActionConfiguration().getConfiguration();
        }
        return new JobData(actionConfiguration,
                convert(jobData.getInputArtifacts()),
                convert(jobData.getOutputArtifacts()),
                convert(jobData.getArtifactCredentials()),
                jobData.getContinuationToken());
    }

    private final static AWSSessionCredentials convert(final com.amazonaws.services.codepipeline.model.AWSSessionCredentials actionCredentials)
    {
        if (actionCredentials == null)
        {
            return null;
        }
        return new AWSSessionCredentials(actionCredentials.getAccessKeyId(),
                actionCredentials.getSecretAccessKey(),
                actionCredentials.getSessionToken());
    }

    private final static List<Artifact> convert(final List<com.amazonaws.services.codepipeline.model.Artifact> artifacts)
    {
        if (artifacts == null)
        {
            return null;
        }
        return artifacts.stream()
                .map(a -> convert(a))
                .collect(Collectors.toList());
    }

    private final static Artifact convert(final com.amazonaws.services.codepipeline.model.Artifact artifact)
    {
        String bucketName = null;
        String objectKey = null;
        if (artifact.getLocation() != null && artifact.getLocation().getS3Location() != null)
        {
            S3ArtifactLocation s3ArtifactLocation = artifact.getLocation().getS3Location();
            bucketName = s3ArtifactLocation.getBucketName();
            objectKey = s3ArtifactLocation.getObjectKey();
        }
        return new Artifact(artifact.getName(),
                artifact.getRevision(),
                bucketName,
                objectKey);
    }

    /**
     * Converts the current revision structure to the third party model.
     *
     */
    public final static com.amazonaws.services.codepipeline.model.CurrentRevision convert(final CurrentRevision currentRevision)
    {
        if (currentRevision == null)
        {
            return null;
        }
        com.amazonaws.services.codepipeline.model.CurrentRevision result
                = new com.amazonaws.services.codepipeline.model.CurrentRevision();
        result.setChangeIdentifier(currentRevision.getChangeIdentifier());
        result.setRevision(currentRevision.getRevision());
        return result;
    }

    /**
     * Converts the execution details structure to the third party model.
     *
     */
    public final static com.amazonaws.services.codepipeline.model.ExecutionDetails convert(final ExecutionDetails executionDetails)
    {
        if (executionDetails == null)
        {
            return null;
        }
        com.amazonaws.services.codepipeline.model.ExecutionDetails result = new com.amazonaws.services.codepipeline.model.ExecutionDetails();
        result.setExternalExecutionId(executionDetails.getExternalExecutionId());
        result.setSummary(executionDetails.getSummary());
        result.setPercentComplete(executionDetails.getPercentComplete());
        return result;
    }

    /**
     * Converts the failure details structure to the third party model.
     *
     */
    public final static com.amazonaws.services.codepipeline.model.FailureDetails convert(final FailureDetails failureDetails)
    {
        com.amazonaws.services.codepipeline.model.FailureDetails result = new com.amazonaws.services.codepipeline.model.FailureDetails();
        result.setType(failureDetails.getType().toString());
        result.setMessage(failureDetails.getMessage());
        return result;
    }

    /**
     * Converts the third party action type structure into the internal job
     * worker structure.
     */
    public final static com.amazonaws.services.codepipeline.model.ActionTypeId convert(final ActionTypeId actionTypeId)
    {
        final com.amazonaws.services.codepipeline.model.ActionTypeId result = new com.amazonaws.services.codepipeline.model.ActionTypeId();
        result.setCategory(actionTypeId.getCategory());
        result.setOwner(actionTypeId.getOwner());
        result.setProvider(actionTypeId.getProvider());
        result.setVersion(actionTypeId.getVersion());
        return result;
    }
}

We’re done with the job service. In the next post we’ll continue with the job poller implementation.

View all posts related to Amazon Web Services and Big Data here.

Advertisements

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

ultimatemindsettoday

A great WordPress.com site

Elliot Balynn's Blog

A directory of wonderful thoughts

Robin Sedlaczek's Blog

Developer on Microsoft Technologies

HarsH ReaLiTy

A Good Blog is Hard to Find

Softwarearchitektur in der Praxis

Wissenswertes zu Webentwicklung, Domain-Driven Design und Microservices

the software architecture

thoughts, ideas, diagrams,enterprise code, design pattern , solution designs

Technology Talks

on Microsoft technologies, Web, Android and others

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

Anything around ASP.NET MVC,WEB API, WCF, Entity Framework & AngularJS

Cyber Matters

Bite-size insight on Cyber Security for the not too technical.

Guru N Guns's

OneSolution To dOTnET.

Johnny Zraiby

Measuring programming progress by lines of code is like measuring aircraft building progress by weight.

%d bloggers like this: