Introduction to Amazon Code Pipeline with Java part 23: job worker daemon wrap-up

Introduction

In the previous post we finished discussing the job poller implementation. The job poller’s execute function runs every time the job agent polls Code Pipeline for new jobs. The job poller is responsible for pulling the new jobs and handing them over to the job service and job processor that will process them in some way. The poller will also send a signal to CP whether the job was a success or failure. The exact branching depends on the job status.

We’re now ready to return to the job worker daemon implementation. We’ve seen all the classes that the job worker calls upon either directly or indirectly so it’s time to connect the parts. We’ll do that in this post.

The job worker daemon

Recall that the job worker daemon is constructed and started in the servlet initiator:

Daemon jobAgentDaemon = new JobWorkerDaemon(Executors.newScheduledThreadPool(1),
                            properties, new CentralLogger(properties), 
                            new LtpApiClientTokenProvider(codePipelineService), jobProcessor);
                  DaemonContext jobAgentDaemonContext = new DaemonLoader.Context();

                    try
                    {
                        jobAgentDaemon.init(jobAgentDaemonContext);
                        jobAgentDaemon.start();
                    } catch (Exception ex)
                    {
                        //handle exception
                    }

We’ve seen some parts of the daemon but not the whole picture yet:


package com.apica.awscodepipelinebuildrunner;

import com.apica.awscodepipelinebuildrunner.configuration.JobWorkerConfiguration;
import com.apica.awscodepipelinebuildrunner.configuration.ApicaLoadtestJobWorkerConfiguration;
import com.apica.awscodepipelinebuildrunner.logging.LogMessageProperties;
import com.apica.awscodepipelinebuildrunner.plugin.thirdparty.ClientTokenProvider;
import com.apica.awscodepipelinebuildrunner.repository.ConfigKeys;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
import com.apica.awscodepipelinebuildrunner.logging.LoggerService;

/**
 * The daemon schedules the poller at a fixed time rate.
 */
public class JobWorkerDaemon implements Daemon
{

    private final ScheduledExecutorService executorService;
    private List<JobPoller> jobPollers;
    private final Properties properties;
    private long pollingIntervalInMs;
    private final LoggerService logger;
    private final ClientTokenProvider clientTokenProvider;
    private final JobProcessor jobProcessor;

    /**
     * Initializes daemon with a custom scheduled executor service and poller.
     *     
     */
    public JobWorkerDaemon(final ScheduledExecutorService executorService,
            final Properties properties, final LoggerService logger, 
            final ClientTokenProvider clientTokenProvider, JobProcessor jobProcessor)
    {
        Validator.notNull(executorService);
        Validator.notNull(properties);
        Validator.notNull(logger);
        Validator.notNull(clientTokenProvider);
        this.executorService = executorService;
        this.logger = logger;
        this.properties = properties;
        this.clientTokenProvider = clientTokenProvider;
        this.jobProcessor = jobProcessor;
        initConfiguration(this.properties);
    }

    /**
     * Initializes the daemon.
     *     
     */
    @Override
    public void init(final DaemonContext context) throws DaemonInitException
    {

        
    }

    /**
     * Starts the daemon. Initializes the executor service to execute the job
     * poller at a fixed rate.     
     */
    @Override
    public void start() throws Exception
    {
        executorService.scheduleAtFixedRate(jobPollerRunnable(),
                0,
                pollingIntervalInMs,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the daemon. Shuts down the executor service gracefully. Waits until
     * the job poller and job processors finished their work     
     */
    public void stop() throws Exception
    {        

        this.executorService.shutdown();
        try
        {
            if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES))
            {
                this.executorService.shutdownNow();
                if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES))
                {
                    throw new IllegalStateException("Failed graceful shutdown of executor threads");
                }
            }
        } catch (InterruptedException e)
        {
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }        
    }

    /**
     * Destroys the daemon.
     */
    @Override
    public void destroy()
    {
        
    }

    private Runnable jobPollerRunnable()
    {
        return () ->
        {
            try
            {
                jobPollers.stream().forEach((jobPoller) ->
                {
                    jobPoller.execute();
                });
            } catch (final RuntimeException e)
            {
                //log the exception
            }
        };
    }
    
    private void initConfiguration(final Properties properties)
    {
        long pollIntervalMillis = Long.parseLong(properties.getProperty(ConfigKeys.POLL_INTERVAL_MILLIS_KEY, "30000"));
        String delimitedCodePipelineEndpoints = properties.getProperty(ConfigKeys.CODE_PIPELINE_ENDPOINTS_KEY, "https://codepipeline-preview.us-east-1.amazonaws.com");
        JobWorkerConfiguration jobWorkerConfiguration = 
                new ApicaLoadtestJobWorkerConfiguration(properties, delimitedCodePipelineEndpoints, 
                        this.logger, this.clientTokenProvider, this.jobProcessor);
        this.jobPollers = jobWorkerConfiguration.jobPollers(this.logger);
        this.pollingIntervalInMs = pollIntervalMillis;
    }
}

We initialise the building blocks of the job polling mechanism in the initConfiguration function. We’ve gone through the contents of this function in the past couple of posts. The start method schedules the job poller at a fixed rate of 10 seconds or whatever is specified in the pollingIntervalInMs parameter. The scheduleAtFixedRate function requires a Runnable object which is returned by the jobPollerRunnable method.

This was the last element for a basic implementation of a third party job runner in Code Pipeline. We’ll summarise what we’ve gone though in the next post which will then close this series.

View all posts related to Amazon Web Services and Big Data here.

Advertisement

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

Bite-size insight on Cyber Security for the not too technical.

%d bloggers like this: