Introduction to Amazon Code Pipeline with Java part 22: the job poller implementation
August 20, 2016
Introduction
In the previous post we looked at the job service interface and its implementation. The job service is responsible for polling for new jobs, acknowledging them and setting their final statuses to either success or failure. Much of the implementation uses the AWS client library to execute the work. We also know that each job polling thread will get a job service. The functions in the job service will be called by the job poller at a fixed interval.
We also started looking into the job poller implementation briefly but the job service took up most of the discussion. In this post we’ll finish the job poller class.
The job poller
In the previous post we only got as far as discussing the private fields and the constructor dependencies of the CodePipelineJobPoller class. The execute method implementation and other private helper methods are still unknown at this stage. We’ll discuss those now.
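As a quick recap of the previous post, the poller's fields and constructor can be sketched roughly as follows. This is a simplified stand-in, not the exact code from the series: the `JobService` and `JobProcessor` interfaces are reduced to empty placeholders, and the field names are assumptions based on how they are used in the methods below:

```java
import java.util.concurrent.ThreadPoolExecutor;

// Simplified stand-ins for the interfaces covered in the previous posts.
interface JobService { /* pollForJobs, acknowledgeJob, putJobSuccess, putJobFailure */ }
interface JobProcessor { /* process(WorkItem) */ }

class CodePipelineJobPoller
{
    // Dependencies injected through the constructor, as used by execute():
    // a ThreadPoolExecutor is assumed so that getActiveCount() is available
    private final JobService jobService;
    private final JobProcessor jobProcessor;
    private final ThreadPoolExecutor executorService;
    private final int pollBatchSize;

    CodePipelineJobPoller(JobService jobService, JobProcessor jobProcessor,
            ThreadPoolExecutor executorService, int pollBatchSize)
    {
        this.jobService = jobService;
        this.jobProcessor = jobProcessor;
        this.executorService = executorService;
        this.pollBatchSize = pollBatchSize;
    }
}
```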
We’ll start with the private methods. reportJobStatus calls upon the job service to set the status of the job to either success or failure:
private void reportJobStatus(final WorkItem workItem, final WorkResult result)
{
    Validator.notNull(workItem);
    Validator.notNull(result);
    if (WorkResultStatus.Success.equals(result.getStatus()))
    {
        jobService.putJobSuccess(workItem.getJobId(), workItem.getClientId(),
                result.getExecutionDetails(), result.getCurrentRevision(),
                result.getContinuationToken());
    }
    else
    {
        jobService.putJobFailure(workItem.getJobId(), workItem.getClientId(),
                result.getFailureDetails());
    }
}
The branching depends on the status returned by WorkResult. Recall that WorkResultStatus is an enumeration with two members:
package com.apica.awscodepipelinebuildrunner.model;

public enum WorkResultStatus
{
    Success,
    Failure
}
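The WorkResult class itself wraps the outcome of processing a job; the poller relies on its static failure factory method in the code below. A minimal self-contained sketch of what such a class could look like follows. The exact members are assumptions: the real class in the series also carries execution details, a current revision and a continuation token, and FailureDetails uses a FailureType enum rather than a string:

```java
// Repeated here only to keep the sketch self-contained
enum WorkResultStatus { Success, Failure }

// Simplified: the series' real class takes a FailureType enum, not a string
class FailureDetails
{
    private final String type;
    private final String message;

    FailureDetails(String type, String message)
    {
        this.type = type;
        this.message = message;
    }

    String getMessage() { return message; }
}

class WorkResult
{
    private final String jobId;
    private final WorkResultStatus status;
    private final FailureDetails failureDetails;

    private WorkResult(String jobId, WorkResultStatus status, FailureDetails failureDetails)
    {
        this.jobId = jobId;
        this.status = status;
        this.failureDetails = failureDetails;
    }

    // Factory methods so callers cannot build an inconsistent result,
    // e.g. a success that carries failure details
    static WorkResult success(String jobId)
    {
        return new WorkResult(jobId, WorkResultStatus.Success, null);
    }

    static WorkResult failure(String jobId, FailureDetails failureDetails)
    {
        return new WorkResult(jobId, WorkResultStatus.Failure, failureDetails);
    }

    WorkResultStatus getStatus() { return status; }
    FailureDetails getFailureDetails() { return failureDetails; }
}
```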
The other private function in CodePipelineJobPoller returns a Runnable, i.e. it is meant to be executed on a parallel thread. The function is responsible for handing the CodePipeline job over to the job processor. We looked at the job processor interface and its implementation before. Here’s the newProcessWorkItemRunnable function that returns the Runnable object:
private Runnable newProcessWorkItemRunnable(final WorkItem workItem)
{
    return () ->
    {
        try
        {
            JobStatus jobStatus = jobService.acknowledgeJob(workItem.getJobId(),
                    workItem.getClientId(), workItem.getJobNonce());
            if (JobStatus.InProgress.equals(jobStatus))
            {
                final WorkResult result = jobProcessor.process(workItem);
                reportJobStatus(workItem, result);
            }
        }
        catch (RuntimeException e)
        {
            //log the exception
            WorkResult failure = WorkResult.failure(workItem.getJobId(),
                    new FailureDetails(FailureType.JobFailed, e.getMessage()));
            reportJobStatus(workItem, failure);
        }
    };
}
JobStatus is also an enumeration which holds a number of possible job statuses:
package com.apica.awscodepipelinebuildrunner.model;

public enum JobStatus
{
    /**
     * Job has been created but not enqueued yet. Is not available for polling.
     */
    Created,
    /**
     * Job has been enqueued and is available for polling.
     */
    Queued,
    /**
     * Job has been handed out by poll for jobs.
     */
    Dispatched,
    /**
     * Job worker acknowledged the job.
     */
    InProgress,
    /**
     * Job timed out because it has not been processed in time.
     */
    TimedOut,
    /**
     * Job worker reported success.
     */
    Succeeded,
    /**
     * Job worker reported failure.
     */
    Failed
}
We can finally look at the implemented execute method:
@Override
public void execute()
{
    List<WorkItem> workItems = null;
    final int batchSize = pollBatchSize - executorService.getActiveCount();
    try
    {
        if (batchSize > 0)
        {
            int pollingBatchSize = Math.min(batchSize, pollBatchSize);
            workItems = jobService.pollForJobs(pollingBatchSize);
            for (final WorkItem workItem : workItems)
            {
                if (workItem.isWorkItemRetrievalSuccess())
                {
                    try
                    {
                        executorService.submit(newProcessWorkItemRunnable(workItem));
                    }
                    catch (final RejectedExecutionException e)
                    {
                        //log the exception
                        WorkResult failure = WorkResult.failure(workItem.getJobId(),
                                new FailureDetails(FailureType.JobFailed, e.getMessage()));
                        reportJobStatus(workItem, failure);
                    }
                }
                else
                {
                    WorkResult failure = WorkResult.failure(workItem.getJobId(),
                            new FailureDetails(FailureType.JobFailed, workItem.getExceptionMessage()));
                    try
                    {
                        reportJobStatus(workItem, failure);
                    }
                    catch (AmazonClientException ace)
                    {
                        //log the exception
                    }
                }
            }
        }
    }
    catch (AmazonClientException ace)
    {
        //log the exception
    }
    catch (Exception ex)
    {
        //put each job to failed otherwise it will stay "in progress" in CodePipeline
        if (workItems != null && !workItems.isEmpty())
        {
            for (final WorkItem workItem : workItems)
            {
                WorkResult failure = WorkResult.failure(workItem.getJobId(),
                        new FailureDetails(FailureType.JobFailed, ex.getMessage()));
                reportJobStatus(workItem, failure);
            }
        }
    }
}
The execute method calls upon the job service to poll for new jobs. The maximum number of jobs requested, pollingBatchSize, is the configured pollBatchSize reduced by the number of jobs the thread executor service is already processing, so the poller never asks for more work than it has free threads to handle. If there are any new jobs then each of them is handed over to the thread executor service.
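Note that execute runs a single polling round per invocation; the job worker daemon discussed in the next part is what calls it at a fixed interval. As a rough, illustrative sketch of that wiring, assuming nothing about the actual daemon class, a ScheduledExecutorService could drive any execute-style poller like this (the Poller interface name and the interval are made up for the example):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical single-method interface standing in for the job poller
interface Poller
{
    void execute();
}

class PollerScheduler
{
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(Poller poller, long intervalMillis)
    {
        // scheduleWithFixedDelay waits for one run to finish before timing
        // the next delay, so slow polling rounds cannot pile up on each other
        scheduler.scheduleWithFixedDelay(poller::execute, 0, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    void stop()
    {
        scheduler.shutdown();
    }
}
```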
There you are, that’s a minimal job poller implementation.
In the next part we’ll wrap up the job worker daemon class.
View all posts related to Amazon Web Services and Big Data here.