Introduction to Amazon Code Pipeline with Java part 22: the job poller implementation

Introduction

In the previous post we looked at the job service interface and its implementation. The job service is responsible for polling for new jobs, acknowledging them and setting their final statuses to either success or failure. Much of the implementation uses the AWS client library to execute the work. We also know that each job polling thread will get a job service. The functions in the job service will be called by the job poller at a fixed interval.

We also started looking into the job poller implementation briefly but the job service took up most of the discussion. In this post we’ll finish the job poller class.

The job poller

In the previous post we only got as far as discussing the private fields and the constructor dependencies of the CodePipelineJobPoller class. The execute method implementation and other private helper methods are still unknown at this stage. We’ll discuss those now.

We’ll start with the private methods. reportJobStatus calls upon the job service to set the status of the job to either success or failure:

private void reportJobStatus(final WorkItem workItem, final WorkResult result)
    {
        Validator.notNull(workItem);
        Validator.notNull(result);

        if (WorkResultStatus.Success.equals(result.getStatus()))
        {           

            jobService.putJobSuccess(workItem.getJobId(),
                    workItem.getClientId(),
                    result.getExecutionDetails(),
                    result.getCurrentRevision(),
                    result.getContinuationToken());
        } else
        {            
            jobService.putJobFailure(workItem.getJobId(),
                    workItem.getClientId(),
                    result.getFailureDetails());
        }
    }

The branching depends on the status returned by WorkResult. Recall that WorkResultStatus is an enumeration with two members:

package com.apica.awscodepipelinebuildrunner.model;

public enum WorkResultStatus
{
    Success,
    Failure
}

The other private function in CodePipelineJobPoller returns a Runnable, i.e. it is meant to be executed on a parallel thread. The function is responsible for handing the CP job over to the job processor. We looked at the job processor interface and its implementation before. Here’s the newProcessWorkItemRunnable function that returns the Runnable object:

private Runnable newProcessWorkItemRunnable(final WorkItem workItem)
    {
        return () ->
        {
            try
            {
                JobStatus jobStatus = jobService.acknowledgeJob(workItem.getJobId(),
                        workItem.getClientId(), workItem.getJobNonce());
                if (JobStatus.InProgress.equals(jobStatus))
                {
                    final WorkResult result = jobProcessor.process(workItem);
                    reportJobStatus(workItem, result);
                }
            } catch (RuntimeException e)
            {
                //log the exception
                WorkResult failure = WorkResult.failure(
                        workItem.getJobId(),
                        new FailureDetails(FailureType.JobFailed, e.getMessage()));
                reportJobStatus(workItem, failure);
            }
        };
    }

JobStatus is also en enumeration which holds a number of possible job statuses:

package com.apica.awscodepipelinebuildrunner.model;

public enum JobStatus
{
    /**
     * Job has been created but not enqueued yet. Is not available for polling.
     */
    Created,
    /**
     * Job has been enqueued and is available for polling.
     */
    Queued,
    /**
     * Job has been handed out by poll for jobs.
     */
    Dispatched,
    /**
     * Job worker acknowledged the job.
     */
    InProgress,
    /**
     * Job timed out because it has not been processed in time.
     */
    TimedOut,
    /**
     * Job worker reported success.
     */
    Succeeded,
    /**
     * Job worker reported failure.
     */
    Failed
}

We can finally look at the implemented execute method:

@Override
public void execute()
    {
        List<WorkItem> workItems = null;
        final int batchSize = pollBatchSize - executorService.getActiveCount();
        
        try
        {            
            if (batchSize > 0)
            {
                int pollingBatchSize = Math.min(batchSize, pollBatchSize);                
                workItems = jobService.pollForJobs(pollingBatchSize);

                for (final WorkItem workItem : workItems)
                {
                    if (workItem.isWorkItemRetrievalSuccess())
                    {                        
                        try
                        {
                            executorService.submit(newProcessWorkItemRunnable(workItem));
                        } catch (final RejectedExecutionException e)
                        {
                            //log the exception
                            WorkResult failure = WorkResult.failure(
                                    workItem.getJobId(),
                                    new FailureDetails(FailureType.JobFailed, e.getMessage()));
                            reportJobStatus(workItem, failure);
                        }
                    } else
                    {
                        WorkResult failure = WorkResult.failure(
                                workItem.getJobId(),
                                new FailureDetails(FailureType.JobFailed, workItem.getExceptionMessage()));
                        try
                        {
                            reportJobStatus(workItem, failure);
                        } catch (AmazonClientException ace)
                        {
                            //log the exception
                        }
                    }
                }
            }

        } catch (AmazonClientException ace)
        {
            //log the exception
        } catch (Exception ex)
        {            
            //put each job to failed otherwise it will stay "in progress" in codepipeline
            if (workItems != null && !workItems.isEmpty())
            {
                for (final WorkItem workItem : workItems)
                {
                    WorkResult failure = WorkResult.failure(
                            workItem.getJobId(),
                            new FailureDetails(FailureType.JobFailed, ex.getMessage()));
                    reportJobStatus(workItem, failure);
                }
            }
        }
        
    }

The execute method calls upon the job service to poll for new jobs. The maximum number of jobs returned is specified by the pollingBatchSize variable. If there are any new jobs then each of them is handed over to the thread executor service.

There you are, that’s a minimal job poller implementation.

In the next part we’ll wrap up the job worker daemon class.

View all posts related to Amazon Web Services and Big Data here.

Advertisement

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

Bite-size insight on Cyber Security for the not too technical.

%d bloggers like this: