Spock and testing RESTful API services

November 5, 2012

It’s been a while, but here’s my next blog post. Hope you like it.

Spock is a BDD testing framework that makes BDD-style tests easy to write. The framework is an extension of JUnit, which allows for easy IDE integration and reuse of existing JUnit functionality. Spock tests are written in Groovy and can be used for a wide range of tests, from small unit tests to full application integration tests.

Without going into too much detail on how to write Spock-based tests (see below for a few excellent links), let’s go through how we can use the framework to build integration tests for a RESTful API.

Our first RESTful API Test

package com.wolfware.integration

import groovyx.net.http.RESTClient
import spock.lang.Specification

import com.movideo.runtime.extension.custom.APIVersion
import com.movideo.runtime.extension.custom.EnvironmentEndPoint

@APIVersion(minimimApiVersion="1.0.0.0")
class GetAuthenticationToken extends Specification {

	@EnvironmentEndPoint
	protected def environmentHost

	def "Get authentication token XML from API for valid account"() {
		given: "a valid account"
			def authenticationTokenRequestParams = ['key':"AAABBBCCC123", 'user':"myauthemail@bla.com"]

		and: "a client to get the authentication token XML"
			def client = new RESTClient(environmentHost)

		when: "we attempt to retrieve authentication token XML"
			def resp = client.get(path : "/authenticate", query : authenticationTokenRequestParams)

		then: "we should get a valid authentication token XML response"
			assert resp.data.token.isEmpty() == false
			// lots more asserts
	}
}

As you can see, apart from the @APIVersion and @EnvironmentEndPoint annotations (these are Spock extensions as explained later), the spec is a fairly simple Spock test.

This specification has a feature that, as the name suggests, gets an authentication token in XML format and validates it. Let’s look at each step:

Given

  • The URL parameters required to get an authentication token from the RESTful service

When

  • Using the Groovy RESTClient to call the RESTful service for the authentication token details

Then

  • We can assert all the details of the response.

The thing I really like about Spock is the readability of the tests. A feature name is a descriptive sentence rather than shorthand stitched together with underscores to form a valid method name, and it is easy to see where the test is set up and where the expectations and assertions are.

Trying to test any environment RESTful service

I’ve found that when trying to write integration tests, there has either been:

  • Hard-coded environment details, with the code branched for each environment, making it near impossible to keep the code in sync as merge hell becomes the norm.
  • Config files that define the environment details, again checked into a separate branch for each environment.

Trying to follow the principles of continuous delivery, it would be great to be able to use the same code base to test against any environment. This is where Spock Extensions come into play to help us out.

Spock Extensions

In short, Spock allows us to extend it with additional behaviour during the test life-cycle (there is an excellent blog post on writing extensions). I’ve developed two extensions which make it easier to run the same test suite across different environments.

The @EnvironmentEndPoint Extension

The aim of this Spock extension is to have a placeholder variable in the code that is set at run-time to the environment host of the RESTful services we want to test.

package com.movideo.runtime.extension.custom

import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.spockframework.runtime.extension.AbstractAnnotationDrivenExtension
import org.spockframework.runtime.extension.AbstractMethodInterceptor
import org.spockframework.runtime.extension.IMethodInvocation
import org.spockframework.runtime.model.FieldInfo
import org.spockframework.runtime.model.SpecInfo

/**
 * Spock Environment Annotation Extension
 */
class EnvironmentEndPointExtension extends AbstractAnnotationDrivenExtension<EnvironmentEndPoint> {

	// Logger named after this extension class
	private static final Log LOG = LogFactory.getLog(EnvironmentEndPointExtension)
	
	private static def config = new ConfigSlurper().parse(new File('src/test/resources/SpockConfig.groovy').toURI().toURL())

	/**
	 * Environment host, read from the {@code env} system property.
	 * <p>
	 * Defaults to {@code envHost} from SpockConfig.groovy
	 */
	private static final String envString = System.getProperties().getProperty("env", config.envHost);

	static {
		LOG.info("Environment End Point [" + envString + "]")
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	void visitFieldAnnotation(EnvironmentEndPoint annotation, FieldInfo field) {
		def interceptor = new EnvironmentInterceptor(field, envString)

		interceptor.install(field.parent.getTopSpec())
	}
}

/**
 * Environment Interceptor
 */
class EnvironmentInterceptor extends AbstractMethodInterceptor {
	private final FieldInfo field
	private final String envString

	EnvironmentInterceptor(FieldInfo field, String envString) {
		this.field = field
		this.envString = envString
	}

	private void injectEnvironmentHost(target) {
		field.writeValue(target, envString)
	}

	@Override
	void interceptSetupMethod(IMethodInvocation invocation) {
		injectEnvironmentHost(invocation.target)
		invocation.proceed()
	}

	@Override
	void install(SpecInfo spec) {
		spec.setupMethod.addInterceptor this
	}
}

The EnvironmentEndPointExtension class defines the following:

  • config: is a ConfigSlurper that parses a config file ‘SpockConfig.groovy’ that is used to define the default environment host (envHost)
  • envString: gets the value of ‘env’ from all System Properties (these include run-time properties) and defaults to config.envHost

With the environment host able to be accessed by Spock, now we need to inject this into the placeholder variable for Spock tests to access.

An interceptor is created which injects (via the field.writeValue method) the value of the environment host into the placeholder variable. This placeholder is the field that @EnvironmentEndPoint annotates.

When the test is run, the interceptor sets the placeholder variable and the test can then use this value as the host for the RESTClient object.

When running the Spock tests, either the default value from the config file will be used or the JVM argument -Denv=? can be used. This makes running the same test code base against any environment so much easier.
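
To make the lookup order concrete, here is a minimal plain-Java sketch of the same fallback: the ‘env’ system property wins, otherwise the default is used. The class name EndPointResolver is made up for illustration; only the ‘env’ property name and the envHost default come from the extension and config in this post.

```java
import java.util.Properties;

class EndPointResolver {

    /**
     * Returns the "env" property if it is set, otherwise the supplied default.
     * The default stands in for config.envHost from SpockConfig.groovy.
     */
    static String resolve(Properties props, String defaultHost) {
        return props.getProperty("env", defaultHost);
    }

    public static void main(String[] args) {
        // Run with -Denv=<host> to override the default host.
        String host = resolve(System.getProperties(), "http://api.wolfware.com");
        System.out.println("Environment End Point [" + host + "]");
    }
}
```

Running this with no arguments uses the default host, while adding -Denv=? on the command line replaces it, which is the same behaviour the extension relies on.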

A note on Gradle builds.

By default, Gradle will not pass JVM arguments through to forked processes such as those running tests. The code snippet below shows how to achieve this:

/*
 * Required to pass all system properties to Test tasks. 
 * Not default for Gradle to pass system properties through to forked processes.
 */
tasks.withType(Test) {
	def config = new ConfigSlurper().parse(new File('src/test/resources/SpockConfig.groovy').toURL())
	
	systemProperty 'env', System.getProperty('env', config.envHost)
}

This runs some custom configuration for every task of type ‘Test’. In this case, we parse the ‘SpockConfig.groovy’ config file and then use ‘systemProperty’ to set ‘env’ on each Gradle Test task, taking the value either from the JVM argument passed to Gradle or from the config file.

With this code in the build.gradle, we’re able to run all tests via a Gradle test build, which will produce lovely test reports (in Gradle HTML and JUnit XML).

The @APIVersion Extension

Another integration testing problem I’ve found comes from writing tests first (or at least while developing a feature or bug fix). Because the same test code base is used everywhere, running those tests against an environment that doesn’t yet have the new code produces failures that aren’t real failures: the new code simply isn’t there yet.

To help solve this problem, I’ve developed the @APIVersion extension. As newly developed code should be deployed with a new version, a test can declare the minimum API version it requires, and that minimum is compared against the deployed version.

package com.movideo.runtime.extension.custom

import groovyx.net.http.RESTClient

import java.lang.annotation.Annotation
import java.util.regex.Pattern

import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.spockframework.runtime.extension.AbstractAnnotationDrivenExtension
import org.spockframework.runtime.model.FeatureInfo
import org.spockframework.runtime.model.SpecInfo

/**
 * API Version Extension
 *
 */
class APIVersionExtension extends AbstractAnnotationDrivenExtension<APIVersion> {

	/**
	 * Logger
	 */
	private static final Log LOG = LogFactory.getLog(APIVersionExtension)

	/**
	 * Configuration parsed from SpockConfig.groovy
	 */
	private static def config = new ConfigSlurper().parse(new File('src/test/resources/SpockConfig.groovy').toURI().toURL())

	/**
	 * Environment host, read from the {@code env} system property.
	 * <p>
	 * Defaults to {@code envHost} from SpockConfig.groovy
	 */
	private static final String envString = System.getProperties().getProperty("env", config.envHost);

	/**
	 * Literal pattern matching the '.' separator between version components
	 */
	private static final def VERSION_PATTERN = Pattern.compile(".", Pattern.LITERAL);

	/**
	 * Max version length
	 */
	private static final def MAX_VERSION_LENGTH = 4;

	/**
	 * Current API Version
	 */
	private static final def CURRENT_API_VERSION = getDeployedAPIVersion();

	/**
	 * {@inheritDoc}
	 */
	@Override
	void visitFeatureAnnotation(APIVersion annotation, FeatureInfo feature) {
		if(!isApiVersionGreaterThanMinApiVersion(annotation, feature.name)) {
			feature.setSkipped(true)
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void visitSpecAnnotation(APIVersion annotation, SpecInfo spec) {
		if(!isApiVersionGreaterThanMinApiVersion(annotation, spec.name)) {
			spec.setSkipped(true)
		}
	}

	/**
	 * Get the current deployed API version
	 * <p>
	 * Performs a HTTP request to the current deployed API version. Parses the returned data and get the {@code version} node data.
	 * @return current deployed API version
	 */
	private static String getDeployedAPIVersion() {
		def apiVersion = null

		try {
			def client = new RESTClient(envString)
			def resp = client.get(path : config.versionServiceUri)

			apiVersion = resp.data.version

			LOG.info("Current deployed API version [" + apiVersion + "]");
		} catch (ex) {
			APIVersionError apiVersionError = new APIVersionError("Error occurred attempting to get current deployed API version from %s", envString + config.versionServiceUri);
			apiVersionError.setStackTrace(ex.stackTrace);
			
			throw apiVersionError;
		}

		return apiVersion
	}

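	/**
	 * Checks whether the deployed API version satisfies the minimum version required by the annotation.
	 * @param annotation the {@code @APIVersion} annotation holding the minimum version
	 * @param infoName name of the spec or feature, used for logging
	 * @return {@code false} if the deployed API version is lower than the minimum required version
	 */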
	private boolean isApiVersionGreaterThanMinApiVersion(APIVersion annotation, String infoName) {
		def isApiVersionGreaterThanMinApiVersion = true

		def minApiVersionRequired = annotation.minimimApiVersion();

		// normalise both version id's
		def apiVersionNormalised = normaliseVersion(CURRENT_API_VERSION);
		def minApiVersionRequiredNormalised = normaliseVersion(minApiVersionRequired);

		// compare version id's
		int cmp = apiVersionNormalised.compareTo(minApiVersionRequiredNormalised);

		// if the comparison is less than 0, min API version is greater than the deployed API version
		if(cmp < 0) {
			LOG.info("min api version [" + minApiVersionRequired + "] greater than api version [" + CURRENT_API_VERSION + "], skipping [" + infoName + "]")
			isApiVersionGreaterThanMinApiVersion = false
		}

		return isApiVersionGreaterThanMinApiVersion
	}

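	/**
	 * Normalises a dotted version string by left-padding each component to {@code MAX_VERSION_LENGTH} characters so versions can be compared as strings.
	 * @param version dotted version string, e.g. 1.51.1
	 * @return the padded version string
	 */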
	private String normaliseVersion(String version) {
		String[] split = VERSION_PATTERN.split(version);
		StringBuilder sb = new StringBuilder();

		for (String s : split) {
			sb.append(String.format("%" + MAX_VERSION_LENGTH + 's', s));
		}

		return sb.toString();
	}
}

The @APIVersion extension defines the same environment config as the @EnvironmentEndPoint extension does so that the environment can be injected and used purely for accessing the API version endpoint without the need for @EnvironmentEndPoint.

The RESTful API version endpoint needs to be set up and publicly available. The @APIVersion extension will call this service to get details about the version of the RESTful API.

The version response data should be as follows:

<ServiceInformation>
  <serviceName>Media API</serviceName>
  <version>1.51.1</version>
</ServiceInformation>

The @APIVersion extension reads the version element to determine the currently deployed version of the RESTful API. It then compares this against the minimum API version required by the annotation, e.g. @APIVersion(minimimApiVersion="1.0.0.0"). If the required version is greater than the deployed version, the test is skipped.
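
The comparison can be sketched in plain Java as follows. This mirrors the padding approach used by normaliseVersion in the extension; the class and method names (VersionCheck, meetsMinimum) are illustrative only and not part of the extension.

```java
import java.util.regex.Pattern;

class VersionCheck {
    // Literal '.' separator and the padding width, as in the extension.
    private static final Pattern DOT = Pattern.compile(".", Pattern.LITERAL);
    private static final int WIDTH = 4;

    /** Left-pads every version component with spaces to WIDTH characters. */
    static String normalise(String version) {
        StringBuilder sb = new StringBuilder();
        for (String part : DOT.split(version)) {
            sb.append(String.format("%" + WIDTH + "s", part));
        }
        return sb.toString();
    }

    /** True when the deployed version is not lower than the required minimum. */
    static boolean meetsMinimum(String deployed, String minimum) {
        return normalise(deployed).compareTo(normalise(minimum)) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(meetsMinimum("1.51.1", "1.0.0.0")); // true
        System.out.println(meetsMinimum("1.9", "1.10"));       // false
        System.out.println(meetsMinimum("1.0", "1.0.0.0"));    // false
    }
}
```

Because each component is left-padded to four characters and compared as a string, 1.9 sorts below 1.10 as expected. A version with fewer components sorts below a longer one that shares the same prefix (1.0 is treated as lower than 1.0.0.0), so it is safest to use the same number of components on both sides.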

This extension annotation can be placed on Specifications or Features, allowing whole Specs to have a minimum version and / or individual Features to have their own minimum version.

This extension has made writing integration tests with Spock even more portable and allows for a ‘build once’ set of tests that can be run against any environment, with some small changes to allow getting the API version.

The SpockConfig.groovy file

Here is an example of the SpockConfig.groovy config file used to configure defaults for both @EnvironmentEndPoint and @APIVersion extensions.

versionServiceUri="/public/serviceInformation"
envHost="http://api.wolfware.com"

  • The ‘versionServiceUri’ is required for the @APIVersion extension as the URI of the RESTful API version service
  • The ‘envHost’ is required for both the @APIVersion and @EnvironmentEndPoint extensions as the host of the RESTful API

Go and start testing

Hopefully these Spock extensions might help your Spock integration tests. The framework is really easy and fun to use to build essential tests for the whole test stack.

Check out my GitHub projects for the code for both extensions.

Hope this post has been helpful, and hopefully my next post will come sooner.

References and really helpful links

Distributed Hibernate Search with Spring and HornetQ

July 17, 2010

In this post I’m going to look at distributing Hibernate Search using the Spring framework and the messaging system HornetQ developed by JBoss. Hibernate Search provides an abstraction layer above core Lucene Search and provides functionality to automatically perform updates to the Lucene index and database.

A Lucene index is a set of physical files for a specific model / object. Most applications that run on one server can interact directly with a single Lucene index with no performance issues. Today’s large distributed web applications, however, are usually housed within a cluster / cloud architecture so that horizontal scaling can be achieved. Each web server within the cluster / cloud needs access to the Lucene index so that searches can be performed.

Hibernate Search allows for a number of ways to distribute the Lucene index for each server:

  • NTFS shares that each server maps to (fairly easy, but performance can be an issue because the Lucene index is locked while it is being written to).
  • RSync, which can be used to distribute the index to each server.
  • A JMS solution, which allows for a Master / Slave configuration. This is what we’ll look at in more detail.

JMS Solution
The JMS solution basically has one master Lucene index and a copy of this index on each slave node. The slave nodes are the applications in the cluster / cloud that require access to the Lucene index. Each slave node sends its updates to the Lucene index as JMS messages to a master queue. The master node processes each message from the master queue and updates the master Lucene index. Every x seconds, the master Lucene index is copied to a share, from which each slave node pulls the index updates, thereby distributing the index.

Below is a diagram of what the architecture looks like.

The Hibernate Search Master Node
The master node handles processing all JMS messages sent from slave nodes that update the Lucene index. The master index application is a stand-alone application that is essentially a stripped down version of the web application that contains the Hibernate model objects that are indexed in the Lucene index. The master index application updates the master index when it processes JMS messages from slave nodes. After the specified refresh period, the master application will copy the changed Lucene index segment files from the master index to a master shared index that is shared between the master node and all slave nodes.

The Hibernate Search Slave nodes
The slave nodes will contain a local copy of the Search index which the applications will read from when performing search operations. When updates occur, Hibernate will handle updating the database and Hibernate Search will create a Lucene index update job and send this via JMS to the master node. After the specified refresh period, the slave application will copy the changed Lucene index segment files from the master shared index directory to get the latest Lucene index.
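
As a rough illustration of the "copy the changed files" step, the plain-Java sketch below copies any file that is missing from, or newer than, the local copy. This is not how Hibernate Search's directory providers are implemented; the class IndexSync and the method pullChanges are hypothetical, and a real slave would run something like this once per refresh period.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

class IndexSync {

    /**
     * Copies every regular file from source into target when the target copy
     * is missing or older than the source. Returns the number of files copied.
     */
    static int pullChanges(Path source, Path target) throws IOException {
        Files.createDirectories(target);
        int copied = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(source)) {
            for (Path src : files) {
                if (!Files.isRegularFile(src)) {
                    continue; // this sketch ignores sub-directories
                }
                Path dst = target.resolve(src.getFileName());
                boolean stale = !Files.exists(dst)
                        || Files.getLastModifiedTime(src).compareTo(Files.getLastModifiedTime(dst)) > 0;
                if (stale) {
                    Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING,
                            StandardCopyOption.COPY_ATTRIBUTES);
                    copied++;
                }
            }
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java IndexSync <shared index directory> <local index directory>
        int copied = pullChanges(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("Copied " + copied + " file(s)");
    }
}
```

The master side of the process is the mirror image: it would copy from its local index directory to the shared directory on the same schedule.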

JMS Brokers
The master node and slave nodes contain an instance of a JMS broker. The current JMS Broker application used is HornetQ. The slave instances all have HornetQ Bridges configured to the master HornetQ instance. This configuration means that when slave applications attempt to send the JMS messages, if the master node JMS broker is down, the messages will queue on each slave node and will be delivered when the master JMS broker is back online.
Note: If the slave JMS broker goes offline, any updates performed via that node’s application will still be written to the database, but the JMS message will not be delivered because no JMS broker is available. Logging will show an exception when the send is attempted, and custom logging around the exception can add details of the message (e.g. entity type, database ID and Lucene work type: deletion or addition of documents).
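
The store-and-forward behaviour described above can be pictured with a toy in-memory model. It does not use HornetQ or JMS at all; BridgeModel and its methods are invented for illustration and simply show messages waiting on the slave side until the master is reachable again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class BridgeModel {
    private final Queue<String> slaveQueue = new ArrayDeque<>();
    private final List<String> masterQueue = new ArrayList<>();
    private boolean masterOnline = true;

    /** The application always sends to the local (slave) queue. */
    void send(String message) {
        slaveQueue.add(message);
        forward();
    }

    /** Simulates the master broker going down or coming back up. */
    void setMasterOnline(boolean online) {
        masterOnline = online;
        forward();
    }

    /** The "bridge": moves messages, in order, while the master is reachable. */
    private void forward() {
        while (masterOnline && !slaveQueue.isEmpty()) {
            masterQueue.add(slaveQueue.poll());
        }
    }

    int pending() {
        return slaveQueue.size();
    }

    List<String> delivered() {
        return masterQueue;
    }

    public static void main(String[] args) {
        BridgeModel bridge = new BridgeModel();
        bridge.setMasterOnline(false);
        bridge.send("update-1");
        bridge.send("update-2");
        System.out.println("pending while master is down: " + bridge.pending());
        bridge.setMasterOnline(true);
        System.out.println("delivered after recovery: " + bridge.delivered());
    }
}
```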

How do I actually do all this?
So far we’ve looked at the way of distributing Hibernate Search with JMS in detail. But what is needed for this to be actually done?! Let’s start with looking at how the master index application is configured.

Master Index Application Code and Config
The master index application is a stand-alone application made up of a stripped down version of the web application. Its primary purpose is to process update messages for the Lucene index from slave nodes.

Master Index Application Spring Application Context
We need to define JMS connection details for the master application. We are using JBoss HornetQ as the messaging system (we’ll look at the config for this later on). JNDI can be used to configure the connection to the HornetQ queue.

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-3.0.xsd">

	<!-- JNDI Template -->
	<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
	    <property name="environment">
	        <props>
	            <prop key="java.naming.factory.initial">
	                org.jnp.interfaces.NamingContextFactory
	            </prop>
	            <prop key="java.naming.provider.url">
	                 jnp://localhost:2099
	            </prop>
	            <prop key="java.naming.factory.url.pkgs">
	                org.jboss.naming:org.jnp.interfaces
	            </prop>
	        </props>
	    </property>
	</bean>
	
	<!-- JMS Connection Factory -->
	<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate" />
      <property name="jndiName">
          <value>/ConnectionFactory</value>
      </property>
  	</bean>

	<bean id="userCredentialsConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
		<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
		<property name="username" value="user"/>
		<property name="password" value="pass"/>
	</bean>
 
  	<!--  JMS Queue -->
	<bean id="queue" class="org.hornetq.jms.client.HornetQQueue">
		<constructor-arg value="master-queue" />
	</bean>

	<!-- MDP Listener Container -->
	<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="userCredentialsConnectionFactory" />
		<property name="destination" ref="queue" />
		<property name="messageListener" ref="messageListener" />
	</bean>

	<!-- Message Driven POJO (MDP) for Hibernate Search Master Updates -->
	<bean id="messageListener" class="com.search.messaging.MDPHibernateSearch" />

</beans>

The JNDI Template bean is configured for looking up HornetQ resources. The “java.naming.provider.url” is the URL of the JNDI server that the HornetQ instance exposes (port 2099, matching the JNDI server port in the hornetq-beans.xml config further down).

The JMS Connection Factory bean defines the JNDI Template and JNDI Name which is defined in the HornetQ config.

For extra security, I’ve also defined a username and password for the HornetQ queue. Spring requires a ‘UserCredentialsConnectionFactoryAdapter’ bean to be defined to contain this information. It is defined with username, password and the target connection factory bean already defined.

The queue bean is an instance of HornetQQueue. The constructor argument is the name of the queue that is defined in the HornetQ config.

The messageListener bean is a custom bean that is shown below.

package com.search.messaging;

import javax.jms.MessageListener;
import org.hibernate.Session;
import org.hibernate.search.backend.impl.jms.AbstractJMSHibernateSearchController;
import com.utils.hibernate.HibernateUtils;

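public class MDPHibernateSearch extends AbstractJMSHibernateSearchController implements MessageListener{

    // Releases the Hibernate session once the controller no longer needs it.
    @Override
    protected void cleanSessionIfNeeded(Session session) {
        HibernateUtils.closeSession();
    }

    // Supplies the Hibernate session used to apply the index updates.
    @Override
    protected Session getSession() {
        return HibernateUtils.currentSession();
    }
}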

Hibernate Search requires an implementation of MessageListener which also needs to extend AbstractJMSHibernateSearchController to process messages received from slave nodes and update the master index.

As we are using Spring and not EJBs, I’ve prefixed the class name with MDP to indicate this is a ‘Message Driven POJO’. The queueListenerContainer bean is Spring’s message listener container, which uses the previously defined connection factory, queue and messageListener beans.

Master Index Application Hibernate Config
The Hibernate config also needs various properties set to indicate whether the application uses the master or the slave search settings. The master index application requires the following properties in the hibernate.cfg.xml:

<property name="hibernate.search.default.directory_provider">org.hibernate.search.store.FSMasterDirectoryProvider</property>
<property name="hibernate.search.default.refresh">60</property>
<property name="hibernate.search.default.indexBase">/var/search/base</property>
<property name="hibernate.search.default.sourceBase">/var/search/source</property>

hibernate.search.default.directory_provider
Defines the class that hibernate will use as the Directory Provider.

hibernate.search.default.refresh
The refresh period, in seconds, after which Hibernate Search pushes the index updates to the master share.

hibernate.search.default.indexBase
The local directory where the master index is located.

hibernate.search.default.sourceBase
The shared directory where the master copy of the index is published for the slave nodes.

Master Node HornetQ Config
HornetQ ships with a number of great example use-case configurations. Take a look at the excellent documentation within the examples and the HornetQ website to get an idea of other uses of the messaging system.

hornetq-beans.xml

        <!-- JNDI server. Disable this if you don't want JNDI -->
        <bean name="JNDIServer" class="org.jnp.server.Main">
                <property name="namingInfo">
                        <inject bean="Naming" />
                </property>
                <property name="port">2099</property>
                <property name="bindAddress">localhost</property>
                <property name="rmiPort">2098</property>
                <property name="rmiBindAddress">localhost</property>
        </bean>

hornetq-configuration.xml

<configuration xmlns="urn:hornetq"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

        <!-- Connectors -->
        <connectors>
                <connector name="netty-connector">
                        <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
                        <param key="host" value="localhost" />
                        <param key="port" value="5446" />
                </connector>
        </connectors>

        <!-- Acceptors -->
        <acceptors>
                <acceptor name="netty-acceptor">
                        <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
                        <param key="host" value="localhost" />
                        <param key="port" value="5446" />
                </acceptor>
        </acceptors>

        <!--security settings -->
        <security-settings>
                <security-setting match="jms.queue.#">
                        <permission type="createDurableQueue" roles="user" />
                        <permission type="deleteDurableQueue" roles="user" />
                        <permission type="createTempQueue" roles="user" />
                        <permission type="deleteTempQueue" roles="user" />
                        <permission type="consume" roles="user" />
                        <permission type="send" roles="user" />
                </security-setting>
        </security-settings>

        <!-- cluster security settings -->
        <cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
        <cluster-password>clusterpass</cluster-password>

</configuration>

hornetq-jms.xml

<configuration xmlns="urn:hornetq"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

        <connection-factory name="ConnectionFactory">
                <connectors>
                        <connector-ref connector-name="netty-connector" />
                </connectors>
                <entries>
                        <entry name="/ConnectionFactory" />
                </entries>
        </connection-factory>

        <queue name="master-queue">
                <entry name="/queue/master-queue" />
        </queue>
</configuration>

hornetq-users.xml

<configuration xmlns="urn:hornetq"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">

        <!-- the default user. This is used where username is null-->
        <defaultuser name="guest" password="guest">
                <role name="guest" />
        </defaultuser>

        <!-- user and role definition -->
        <user name="user" password="pass">
                <role name="user" />
        </user>
</configuration>

Slave Node Application Code and Config
The slave nodes are all the applications that require access to a local copy of the master index. Slave nodes that perform updates to data will update the data storage (database in this case) and then fire off a message via JMS to the master index application queue which will make updates to the master index and subsequently push these to the slave nodes.

Slave Application Spring Application Context

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-3.0.xsd">

	<!-- JNDI Template -->
	<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
	    <property name="environment">
	        <props>
	            <prop key="java.naming.factory.initial">
	                org.jnp.interfaces.NamingContextFactory
	            </prop>
	            <prop key="java.naming.provider.url">
	                jnp://localhost:1099
	            </prop>
	            <prop key="java.naming.factory.url.pkgs">
	                org.jboss.naming:org.jnp.interfaces
	            </prop>
	        </props>
	    </property>
	</bean>
	
	<!-- JMS Connection Factory -->
	<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate" />
      <property name="jndiName">
          <value>/ConnectionFactory</value>
      </property>
  	</bean>
  	
  	<!--  JMS Queue -->
	<bean id="queue" class="org.hornetq.jms.client.HornetQQueue">
		<constructor-arg value="slave-queue" />
	</bean>
</beans>

Similar to the master application Spring application context, JNDI and JMS connection details are in the config. There are a couple of differences:

  • The JMS Queue points to a ‘slave-queue’. Each slave node has an instance of HornetQ running which has a ‘JMS bridge’ configured between the slave-queue and master-queue. This allows for the master node HornetQ instance to go offline and JMS messages to queue on each slave server until the master node is back online.
  • The slave node application context doesn’t specify an MDP (Message Driven POJO) as this application will not listen for any JMS messages on the ‘slave-queue’.

Slave Application Hibernate Config
The slave application Hibernate config requires the following properties in the hibernate.cfg.xml:

		<property name="hibernate.search.worker.backend">com.search.backend.impl.springjms.SpringJMSBackendQueueProcessorFactory</property>
		<property name="hibernate.search.default.directory_provider">org.hibernate.search.store.FSSlaveDirectoryProvider</property>
		<property name="hibernate.search.default.refresh">60</property> 
		<property name="hibernate.search.default.indexBase">/var/search/base</property>
		<property name="hibernate.search.default.sourceBase">/var/search/source</property>

hibernate.search.default.directory_provider
Defines the class that hibernate will use as the Directory Provider.

hibernate.search.default.refresh
The refresh time that hibernate will use before it will pull the index updates from the master share.

hibernate.search.default.indexBase
The local directory where the local index is located.

hibernate.search.default.sourceBase
The share directory where the master share index is located.

hibernate.search.worker.backend
Defines the backend worker class that Hibernate Search uses to make updates. By default it will write to a local index (as the master application does), but for slave applications this needs to be configured to send updates via JMS. Hibernate Search does come with a JMS backend processor, but it requires that the JMS connection details be specified in the application server. In our case, we want to get these details from the Spring application context.

To do this, I created a custom backend queue processor (SpringJMSBackendQueueProcessor) and a matching factory (SpringJMSBackendQueueProcessorFactory), both shown below:

package com.search.backend.impl.springjms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;

import org.hibernate.HibernateException;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.OptimizeLuceneWork;
import org.hibernate.search.util.LoggerFactory;
import org.slf4j.Logger;

public class SpringJMSBackendQueueProcessor implements Runnable {
	private static final Logger log = LoggerFactory.make();

	private List<LuceneWork> queue;
	private SpringJMSBackendQueueProcessorFactory factory;

	public SpringJMSBackendQueueProcessor(List<LuceneWork> queue, SpringJMSBackendQueueProcessorFactory jmsBackendQueueProcessorFactory) {
		this.queue = queue;
		this.factory = jmsBackendQueueProcessorFactory;
	}

	public void run() {
		// Work on a copy so the caller's list is left untouched
		List<LuceneWork> filteredQueue = new ArrayList<LuceneWork>(queue);
		for (LuceneWork work : queue) {
			if (work instanceof OptimizeLuceneWork) {
				// we don't want optimization to be propagated
				filteredQueue.remove(work);
			}
		}
		// Nothing left to send after filtering
		if (filteredQueue.isEmpty())
			return;
		QueueConnection cnn = null;
		QueueSender sender;
		QueueSession session;
		try {
			cnn = factory.getJMSFactory().createQueueConnection("user", "pass");
			session = cnn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			ObjectMessage message = session.createObjectMessage();
			message.setObject((Serializable) filteredQueue);
			sender = session.createSender(factory.getJmsQueue());
			sender.send(message);
			session.close();
		} catch (JMSException e) {
			// Build a message listing every work item that could not be sent
			StringBuilder sb = new StringBuilder();
			sb.append("Unable to send Lucene index updates to local JMS queue: " + factory.getJmsQueueName() + ".\n");
			sb.append("Lucene Work Item affected by JMS Exception: \n");

			for (LuceneWork luceneWork : filteredQueue) {
				String id = luceneWork.getIdInString();
				String entityClass = luceneWork.getEntityClass().toString();
				String luceneWorkItem = luceneWork.toString();

				sb.append(" -ID: " + id);
				sb.append(" Entity: " + entityClass);
				sb.append(" Lucene Work: " + luceneWorkItem);
				sb.append("\n");
			}

			throw new HibernateException(sb.toString(), e);
		} finally {
			try {
				if (cnn != null)
					cnn.close();
			} catch (JMSException e) {
				log.warn("Unable to close JMS connection for " + factory.getJmsQueueName(), e);
			}
		}
	}
}

package com.search.backend.impl.springjms;

import java.util.List;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;

import org.hibernate.search.backend.BackendQueueProcessorFactory;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.ClassPathResource;

public class SpringJMSBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
	private String jmsQueueName;
	private Queue jmsQueue;
	private QueueConnectionFactory queueConnectionFactory;

	public void initialize(Properties props, SearchFactoryImplementor searchFactoryImplementor) {
	    // Load the JMS connection factory and queue from the Spring context on the classpath
	    ClassPathResource res = new ClassPathResource("slave-applicationContext.xml");
	    XmlBeanFactory factory = new XmlBeanFactory(res);
	    
	    queueConnectionFactory = (QueueConnectionFactory)factory.getBean("jmsConnectionFactory");
	    jmsQueue = (Queue)factory.getBean("queue");
	    try {
	        // Record the queue name so error messages can report it (it was never set before)
	        jmsQueueName = jmsQueue.getQueueName();
	    } catch (JMSException e) {
	        jmsQueueName = jmsQueue.toString();
	    }
	}

	public Runnable getProcessor(List<LuceneWork> queue) {
		return new SpringJMSBackendQueueProcessor( queue, this );
	}

	public QueueConnectionFactory getJMSFactory() {
		return queueConnectionFactory;
	}

	public Queue getJmsQueue() {
		return jmsQueue;
	}

	public String getJmsQueueName() {
		return jmsQueueName;
	}

	public void close() {
		// no need to release anything
	}
}

Note that the processor’s factory field is typed as SpringJMSBackendQueueProcessorFactory, whereas the JMS processor that ships with Hibernate Search holds a JMSBackendQueueProcessorFactory.
I also create the QueueConnection with ‘user’ and ‘pass’ credentials, because the HornetQ security settings require an authenticated user.
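
The processor filters optimize work out of the queue and then casts what is left to Serializable so it can travel in an ObjectMessage. The sketch below illustrates those two steps using only the JDK. Work and OptimizeWork are hypothetical stand-ins for LuceneWork and OptimizeLuceneWork, and the byte-array round trip only approximates what a JMS provider does with the message body.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Sketch of the filter-then-serialize step, with stand-in work classes. */
class PayloadSketch {

    /** Hypothetical stand-in for LuceneWork. */
    static class Work implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        Work(String id) { this.id = id; }
    }

    /** Hypothetical stand-in for OptimizeLuceneWork. */
    static class OptimizeWork extends Work {
        private static final long serialVersionUID = 1L;
        OptimizeWork() { super("optimize"); }
    }

    /** Copy the queue without optimize requests; the input list is not modified. */
    static ArrayList<Work> withoutOptimize(List<Work> queue) {
        ArrayList<Work> filtered = new ArrayList<Work>();
        for (Work work : queue) {
            if (!(work instanceof OptimizeWork)) {
                filtered.add(work);
            }
        }
        return filtered;
    }

    /** Serialize and deserialize the list, which only works if the list and every element are Serializable. */
    static List<Work> roundTrip(ArrayList<Work> payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(payload);
            out.close();
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            @SuppressWarnings("unchecked")
            List<Work> copy = (List<Work>) in.readObject();
            in.close();
            return copy;
        } catch (IOException e) {
            throw new IllegalStateException("Payload is not serializable", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class missing on the receiving side", e);
        }
    }

    public static void main(String[] args) {
        List<Work> queue = new ArrayList<Work>();
        queue.add(new Work("1"));
        queue.add(new OptimizeWork());
        queue.add(new Work("2"));
        List<Work> received = roundTrip(withoutOptimize(queue));
        System.out.println("Work items received: " + received.size());
    }
}
```

Returning an ArrayList from the filter keeps the cast to Serializable safe, since List itself makes no such promise.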

Slave Node HornetQ Config

hornetq-beans.xml

	<bean name="JNDIServer" class="org.jnp.server.Main">
		<property name="namingInfo">
			<inject bean="Naming" />
		</property>
		<property name="port">1099</property>
		<property name="bindAddress">localhost</property>
		<property name="rmiPort">1098</property>
		<property name="rmiBindAddress">localhost</property>
	</bean>

hornetq-configuration.xml

<configuration xmlns="urn:hornetq"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

    <!-- Connectors -->
    <connectors>
        <connector name="netty-connector">
            <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
            <param key="port" value="5445" />
        </connector>

        <!-- Connector to the master node -->
        <connector name="remote-connector">
            <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="localhost" />
            <param key="port" value="5446" />
        </connector>
    </connectors>

    <!-- Acceptors -->
    <acceptors>
        <acceptor name="netty-acceptor">
            <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
            <param key="port" value="5445" />
        </acceptor>
    </acceptors>

    <!--
        We need to create a core queue for the JMS queue explicitly because
        the bridge will be deployed before the JMS queue is deployed, so the
        first time, it otherwise won't find the queue
    -->
    <queues>
        <queue name="jms.queue.slave-queue">
            <address>jms.queue.slave-queue</address>
        </queue>
    </queues>

    <bridges>
        <bridge name="slave-master-bridge">
            <queue-name>jms.queue.slave-queue</queue-name>
            <forwarding-address>jms.queue.master-queue</forwarding-address>
            <reconnect-attempts>-1</reconnect-attempts>
            <connector-ref connector-name="remote-connector" />
        </bridge>
    </bridges>

    <security-settings>
        <!--security for example queue-->
        <security-setting match="jms.queue.#">
            <permission type="createDurableQueue" roles="user" />
            <permission type="deleteDurableQueue" roles="user" />
            <permission type="createTempQueue" roles="user" />
            <permission type="deleteTempQueue" roles="user" />
            <permission type="consume" roles="user" />
            <permission type="send" roles="user" />
        </security-setting>
    </security-settings>

    <cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
    <cluster-password>cluster-password</cluster-password>

</configuration>

hornetq-jms.xml

<configuration xmlns="urn:hornetq"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

    <connection-factory name="ConnectionFactory">
        <connectors>
            <connector-ref connector-name="netty-connector" />
        </connectors>
        <entries>
            <entry name="/ConnectionFactory" />
        </entries>
    </connection-factory>

    <queue name="slave-queue">
        <entry name="/queue/slave-queue" />
    </queue>
</configuration>
