In this post I’m going to look at distributing Hibernate Search using the Spring framework and the messaging system HornetQ developed by JBoss. Hibernate Search provides an abstraction layer above core Lucene Search and provides functionality to automatically perform updates to the Lucene index and database.
The Lucene index is made up of physical files that make up the index for a specific model / object. Most applications that run on one server can interact directly with a single Lucene index with no performance issues. Today’s large distributed web applications, however, are usually housed within a cluster / cloud architecture so that horizontal scaling can be achieved. Each web server within the cluster / cloud architecture needs access to the Lucene index so that searches can be performed.
Hibernate Search allows for a number of ways to distribute the Lucene index for each server:
- NTFS shares that each server maps to (fairly easy, but performance can be an issue when the Lucene index is locked while being written to).
- RSync, which can be used to distribute the index to each server.
- A JMS solution which allows for a Master / Slave configuration. This is what we’ll look at in more detail.
JMS Solution
The JMS solution has one master Lucene index and a copy of this index on each slave node. The slave nodes are the applications in the cluster / cloud that require access to the Lucene index. Each slave node sends its updates to the Lucene index as JMS messages to a master queue. The master node processes each message from the master queue and updates the master Lucene index. Every x seconds, the master Lucene index is copied to a share, from which each slave node pulls the index updates, thereby distributing the index.
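To make the flow above concrete, here is a minimal in-memory sketch of it. It uses no JMS or Lucene at all: the class name, the method names and the use of plain sets and a queue as stand-ins for the indexes and the master queue are all my own assumptions for illustration, not Hibernate Search code.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// In-memory model of the master/slave flow; no JMS or Lucene involved.
class IndexReplicationSketch {
    // Stand-in for the master queue that slaves send update messages to.
    final Queue<String> masterQueue = new ArrayDeque<>();
    // Stand-ins for the three index copies: master, shared directory, one slave.
    final Set<String> masterIndex = new HashSet<>();
    Set<String> sharedIndex = new HashSet<>();
    Set<String> slaveIndex = new HashSet<>();

    // Slave side: an update is not applied locally, only sent to the master.
    void slaveSendsUpdate(String documentId) {
        masterQueue.add(documentId);
    }

    // Master side: drain the queue and apply every update to the master index.
    void masterProcessesQueue() {
        String documentId;
        while ((documentId = masterQueue.poll()) != null) {
            masterIndex.add(documentId);
        }
    }

    // Master side, once per refresh period: publish a copy to the share.
    void masterPublishes() {
        sharedIndex = new HashSet<>(masterIndex);
    }

    // Slave side, once per refresh period: pull the published copy.
    void slaveRefreshes() {
        slaveIndex = new HashSet<>(sharedIndex);
    }

    // Searches on a slave only ever read the slave's local copy.
    boolean slaveCanFind(String documentId) {
        return slaveIndex.contains(documentId);
    }
}
```

The point to take from the sketch is the delay: a document written on a slave only becomes searchable there after the master has processed the message, published the index and the slave has refreshed.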
Below is a diagram of what the architecture looks like.
The Hibernate Search Master Node
The master node handles processing all JMS messages sent from slave nodes that update the Lucene index. The master index application is a stand-alone application that is essentially a stripped down version of the web application that contains the Hibernate model objects that are indexed in the Lucene index. The master index application updates the master index when it processes JMS messages from slave nodes. After the specified refresh period, the master application will copy the changed Lucene index segment files from the master index to a master shared index that is shared between the master node and all slave nodes.
The Hibernate Search Slave nodes
The slave nodes will contain a local copy of the Search index which the applications will read from when performing search operations. When updates occur, Hibernate will handle updating the database and Hibernate Search will create a Lucene index update job and send this via JMS to the master node. After the specified refresh period, the slave application will copy the changed Lucene index segment files from the master shared index directory to get the latest Lucene index.
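The copy step that both node types perform after each refresh period can be pictured with plain java.nio. This is only a sketch under my own assumptions: the class and method names are hypothetical, and comparing files by size and content is a simplification, not how the Hibernate Search directory providers decide what has changed.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

class SegmentCopySketch {
    // Copies every regular file from source to target that is missing or
    // different in target, and returns how many files were copied.
    static int copyChanged(Path source, Path target) throws IOException {
        Files.createDirectories(target);
        int copied = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(source)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue; // the sketch ignores sub-directories
                }
                Path destination = target.resolve(file.getFileName());
                if (!sameContent(file, destination)) {
                    Files.copy(file, destination, StandardCopyOption.REPLACE_EXISTING);
                    copied++;
                }
            }
        }
        return copied;
    }

    // Two files count as unchanged when both exist with identical bytes.
    private static boolean sameContent(Path a, Path b) throws IOException {
        if (!Files.exists(b) || Files.size(a) != Files.size(b)) {
            return false;
        }
        return Arrays.equals(Files.readAllBytes(a), Files.readAllBytes(b));
    }
}
```

On a slave, source would be the shared directory and target the local index directory; on the master it is the other way round. A second call with nothing changed copies no files, which is why only the changed segment files travel across the share.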
JMS Brokers
The master node and slave nodes contain an instance of a JMS broker. The current JMS Broker application used is HornetQ. The slave instances all have HornetQ Bridges configured to the master HornetQ instance. This configuration means that when slave applications attempt to send the JMS messages, if the master node JMS broker is down, the messages will queue on each slave node and will be delivered when the master JMS broker is back online.
Note: If the slave JMS broker goes offline, any updates performed via that node’s application will still be written to the database, but the JMS message will not be delivered because no JMS broker is available. Logging will show an exception when the application attempts to send the JMS message, and custom logging around the exception can add details of the message (e.g. entity type, database ID and Lucene work type: deletion or addition of documents).
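The two failure cases above (master broker down versus local slave broker down) behave quite differently, and a small in-memory model shows why. Nothing here is HornetQ API: the class, the two boolean flags and the collections are hypothetical stand-ins for the brokers and their queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of a slave broker with a bridge to the master broker.
class BridgeSketch {
    boolean localBrokerUp = true;
    boolean masterBrokerUp = true;
    final Deque<String> slaveQueue = new ArrayDeque<>();
    final List<String> masterQueue = new ArrayList<>();

    // Application side: sending only needs the local broker.
    void send(String message) {
        if (!localBrokerUp) {
            // Mirrors the note above: the send fails and the message is never queued.
            throw new IllegalStateException("local broker unavailable, message lost: " + message);
        }
        slaveQueue.addLast(message);
        forward();
    }

    // Bridge side: move messages to the master, in order, while it is reachable.
    void forward() {
        while (masterBrokerUp && !slaveQueue.isEmpty()) {
            masterQueue.add(slaveQueue.removeFirst());
        }
    }
}
```

With the master down, messages simply accumulate in the slave queue and are forwarded in their original order once forward() runs against a reachable master. With the local broker down there is nowhere to park the message, which is why that case needs the extra logging.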
How do I actually do all this?
So far we’ve looked at the way of distributing Hibernate Search with JMS in detail. But what is needed for this to be actually done?! Let’s start with looking at how the master index application is configured.
Master Index Application Code and Config
The master index application is a stand-alone application made up of a stripped down version of the web application. Its primary purpose is to process update messages for the Lucene index from slave nodes.
Master Index Application Spring Application Context
We need to define JMS connection details for the master application. We are using JBoss HornetQ as the messaging system (we’ll look at the config for this later on). JNDI can be used to configure the connection to the HornetQ queue.
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- JNDI Template -->
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">
org.jnp.interfaces.NamingContextFactory
</prop>
<prop key="java.naming.provider.url">
jnp://localhost:2099
</prop>
<prop key="java.naming.factory.url.pkgs">
org.jboss.naming:org.jnp.interfaces
</prop>
</props>
</property>
</bean>
<!-- JMS Connection Factory -->
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTemplate" />
<property name="jndiName">
<value>/ConnectionFactory</value>
</property>
</bean>
<bean id="userCredentialsConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
<property name="username" value="user"/>
<property name="password" value="pass"/>
</bean>
<!-- JMS Queue -->
<bean id="queue" class="org.hornetq.jms.client.HornetQQueue">
<constructor-arg value="master-queue" />
</bean>
<!-- MDP Listener Container -->
<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="userCredentialsConnectionFactory" />
<property name="destination" ref="queue" />
<property name="messageListener" ref="messageListener" />
</bean>
<!-- Message Driven POJO (MDP) for Hibernate Search Master Updates -->
<bean id="messageListener" class="com.search.messaging.MDPHibernateSearch" />
</beans>
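The JNDI Template bean holds the JNDI environment used to look up HornetQ objects. The “java.naming.provider.url” property is the address of the JNDI server that HornetQ starts (port 2099 in the master node’s hornetq-beans.xml, shown later), not of the queue itself.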
The JMS Connection Factory bean defines the JNDI Template and JNDI Name which is defined in the HornetQ config.
For extra security, I’ve also defined a username and password for the HornetQ queue. Spring requires a ‘UserCredentialsConnectionFactoryAdapter’ bean to be defined to contain this information. It is defined with username, password and the target connection factory bean already defined.
The queue bean is an instance of HornetQQueue. The constructor argument is the name of the queue that is defined in the HornetQ config.
The messageListener bean is a custom bean that is shown below.
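package com.search.messaging;
import javax.jms.MessageListener;
import org.hibernate.Session;
import org.hibernate.search.backend.impl.jms.AbstractJMSHibernateSearchController;
import com.utils.hibernate.HibernateUtils;
// Message Driven POJO: receives index update messages from the slave nodes.
// onMessage() is inherited from AbstractJMSHibernateSearchController.
public class MDPHibernateSearch extends AbstractJMSHibernateSearchController implements MessageListener {
    // Called after a message has been processed; release the session.
    @Override
    protected void cleanSessionIfNeeded(Session session) {
        HibernateUtils.closeSession();
    }
    // Supplies the Hibernate session used to apply the index updates.
    @Override
    protected Session getSession() {
        return HibernateUtils.currentSession();
    }
}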
Hibernate Search requires an implementation of MessageListener which also needs to extend AbstractJMSHibernateSearchController to process messages received from slave nodes and update the master index.
As we are using Spring and not EJBs, I’ve prefixed the class name with MDP to indicate this is a ‘Message Driven POJO’. The queueListenerContainer bean is Spring’s message listener container, which uses the previously defined userCredentialsConnectionFactory, queue and messageListener beans.
Master Index Application Hibernate Config
The Hibernate config also needs various properties set to indicate whether the application uses the master or the slave search settings. The master index application requires the following properties in the hibernate.cfg.xml:
<property name="hibernate.search.default.directory_provider">org.hibernate.search.store.FSMasterDirectoryProvider</property>
<property name="hibernate.search.default.refresh">60</property>
<property name="hibernate.search.default.indexBase">/var/search/base</property>
<property name="hibernate.search.default.sourceBase">/var/search/source</property>
hibernate.search.default.directory_provider
Defines the class that Hibernate Search will use as the Directory Provider.
hibernate.search.default.refresh
The refresh period, in seconds, after which Hibernate Search pushes the index updates to the master share.
hibernate.search.default.indexBase
The local directory where the master index is located.
hibernate.search.default.sourceBase
The shared directory to which the master publishes its copy of the index and from which the slave nodes pull.
Master Node HornetQ Config
HornetQ ships with a number of great example use-case configurations. Take a look at the excellent documentation within the examples and the HornetQ website to get an idea of other uses of the messaging system.
hornetq-beans.xml
<!-- JNDI server. Disable this if you don't want JNDI -->
<bean name="JNDIServer" class="org.jnp.server.Main">
<property name="namingInfo">
<inject bean="Naming" />
</property>
<property name="port">2099</property>
<property name="bindAddress">localhost</property>
<property name="rmiPort">2098</property>
<property name="rmiBindAddress">localhost</property>
</bean>
hornetq-configuration.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<!-- Connectors -->
<connectors>
<connector name="netty-connector">
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
<param key="host" value="localhost" />
<param key="port" value="5446" />
</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="localhost" />
<param key="port" value="5446" />
</acceptor>
</acceptors>
<!--security settings -->
<security-settings>
<security-setting match="jms.queue.#">
<permission type="createDurableQueue" roles="user" />
<permission type="deleteDurableQueue" roles="user" />
<permission type="createTempQueue" roles="user" />
<permission type="deleteTempQueue" roles="user" />
<permission type="consume" roles="user" />
<permission type="send" roles="user" />
</security-setting>
</security-settings>
<!-- cluster security settings -->
<cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>clusterpass</cluster-password>
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector" />
</connectors>
<entries>
<entry name="/ConnectionFactory" />
</entries>
</connection-factory>
<queue name="master-queue">
<entry name="/queue/master-queue" />
</queue>
</configuration>
hornetq-users.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
<!-- the default user. This is used where username is null-->
<defaultuser name="guest" password="guest">
<role name="guest" />
</defaultuser>
<!-- user and role definition -->
<user name="user" password="pass">
<role name="user" />
</user>
</configuration>
Slave Node Application Code and Config
The slave nodes are all the applications that require access to a local copy of the master index. Slave nodes that perform updates to data will update the data storage (database in this case) and then fire off a message via JMS to the master index application queue which will make updates to the master index and subsequently push these to the slave nodes.
Slave Application Spring Application Context
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- JNDI Template -->
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">
org.jnp.interfaces.NamingContextFactory
</prop>
<prop key="java.naming.provider.url">
jnp://localhost:1099
</prop>
<prop key="java.naming.factory.url.pkgs">
org.jboss.naming:org.jnp.interfaces
</prop>
</props>
</property>
</bean>
<!-- JMS Connection Factory -->
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTemplate" />
<property name="jndiName">
<value>/ConnectionFactory</value>
</property>
</bean>
<!-- JMS Queue -->
<bean id="queue" class="org.hornetq.jms.client.HornetQQueue">
<constructor-arg value="slave-queue" />
</bean>
</beans>
Similar to the master application Spring application context, JNDI and JMS connection details are in the config. There are a couple of differences:
- The JMS Queue points to a ‘slave-queue’. Each slave node has an instance of HornetQ running which has a ‘JMS bridge’ configured between the slave-queue and master-queue. This allows for the master node HornetQ instance to go offline and JMS messages to queue on each slave server until the master node is back online.
- The slave node application context doesn’t specify an MDP (Message Driven POJO) as this application will not listen for any JMS messages on the ‘slave-queue’.
Slave Application Hibernate Config
The slave application Hibernate config requires the following properties in the hibernate.cfg.xml:
<property name="hibernate.search.worker.backend">com.search.backend.impl.springjms.SpringJMSBackendQueueProcessorFactory</property>
<property name="hibernate.search.default.directory_provider">org.hibernate.search.store.FSSlaveDirectoryProvider</property>
<property name="hibernate.search.default.refresh">60</property>
<property name="hibernate.search.default.indexBase">/var/search/base</property>
<property name="hibernate.search.default.sourceBase">/var/search/source</property>
hibernate.search.default.directory_provider
Defines the class that Hibernate Search will use as the Directory Provider.
hibernate.search.default.refresh
The refresh period, in seconds, after which Hibernate Search pulls the index updates from the master share.
hibernate.search.default.indexBase
The local directory where the local index is located.
hibernate.search.default.sourceBase
The shared directory where the master share index is located.
hibernate.search.worker.backend
Defines the backend worker class that Hibernate Search uses to make updates. By default it writes to a local index (as the master application does), but slave applications need to be configured to send their updates via JMS. Hibernate Search does come with a JMS backend processor, but it requires the JMS connection details to be specified in the application server. In our case, we want to get these details from the Spring application context.
I have created a custom BackendQueueProcessor and BackendQueueProcessorFactory below:
package com.search.backend.impl.springjms;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import org.hibernate.HibernateException;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.OptimizeLuceneWork;
import org.hibernate.search.util.LoggerFactory;
import org.slf4j.Logger;
public class SpringJMSBackendQueueProcessor implements Runnable {
    private static final Logger log = LoggerFactory.make();
    private final List<LuceneWork> queue;
    private final SpringJMSBackendQueueProcessorFactory factory;
    public SpringJMSBackendQueueProcessor(List<LuceneWork> queue, SpringJMSBackendQueueProcessorFactory jmsBackendQueueProcessorFactory) {
        this.queue = queue;
        this.factory = jmsBackendQueueProcessorFactory;
    }
    public void run() {
        // Work on a copy so the original queue is left untouched.
        List<LuceneWork> filteredQueue = new ArrayList<LuceneWork>(queue);
        for (LuceneWork work : queue) {
            if (work instanceof OptimizeLuceneWork) {
                // we don't want optimization to be propagated
                filteredQueue.remove(work);
            }
        }
        if (filteredQueue.isEmpty()) {
            return;
        }
        QueueConnection cnn = null;
        try {
            // Credentials match the user defined in hornetq-users.xml.
            cnn = factory.getJMSFactory().createQueueConnection("user", "pass");
            QueueSession session = cnn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            // The whole list of work items travels as one serialized message.
            ObjectMessage message = session.createObjectMessage();
            message.setObject((Serializable) filteredQueue);
            QueueSender sender = session.createSender(factory.getJmsQueue());
            sender.send(message);
            session.close();
        } catch (JMSException e) {
            // Record which work items were lost so they can be re-indexed later.
            StringBuilder sb = new StringBuilder();
            sb.append("Unable to send Lucene index updates to local JMS queue: " + factory.getJmsQueueName() + ".\n");
            sb.append("Lucene Work Item affected by JMS Exception: \n");
            for (LuceneWork luceneWork : filteredQueue) {
                String id = luceneWork.getIdInString();
                String entityClass = luceneWork.getEntityClass().toString();
                String luceneWorkItem = luceneWork.toString();
                sb.append(" -ID: " + id);
                sb.append(" Entity: " + entityClass);
                sb.append(" Lucene Work: " + luceneWorkItem);
                sb.append("\n");
            }
            throw new HibernateException(sb.toString(), e);
        } finally {
            try {
                if (cnn != null) {
                    cnn.close();
                }
            } catch (JMSException e) {
                log.warn("Unable to close JMS connection for " + factory.getJmsQueueName(), e);
            }
        }
    }
}
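The filtering step at the top of run() is easy to try in isolation. The sketch below uses two hypothetical stand-in classes (Work and OptimizeWork) in place of LuceneWork and OptimizeLuceneWork so that it runs without Hibernate Search on the classpath, and it uses removeIf, which assumes Java 8 or later rather than the loop used in the class above.

```java
import java.util.ArrayList;
import java.util.List;

class WorkFilterSketch {
    // Hypothetical stand-in for LuceneWork.
    static class Work {
        final String name;
        Work(String name) { this.name = name; }
    }

    // Hypothetical stand-in for OptimizeLuceneWork.
    static class OptimizeWork extends Work {
        OptimizeWork() { super("optimize"); }
    }

    // Returns a copy of the queue without optimize requests; the input is untouched.
    static List<Work> withoutOptimize(List<Work> queue) {
        List<Work> filtered = new ArrayList<>(queue);
        filtered.removeIf(work -> work instanceof OptimizeWork);
        return filtered;
    }
}
```

Filtering on a copy matters because the list handed to the processor belongs to the caller; only the filtered copy is serialized into the JMS message.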
package com.search.backend.impl.springjms;
import java.util.List;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import org.hibernate.HibernateException;
import org.hibernate.search.backend.BackendQueueProcessorFactory;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.ClassPathResource;
public class SpringJMSBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
    private String jmsQueueName;
    private Queue jmsQueue;
    private QueueConnectionFactory queueConnectionFactory;
    public void initialize(Properties props, SearchFactoryImplementor searchFactoryImplementor) {
        // Load the JMS beans from the slave Spring application context.
        ClassPathResource res = new ClassPathResource("slave-applicationContext.xml");
        XmlBeanFactory factory = new XmlBeanFactory(res);
        queueConnectionFactory = (QueueConnectionFactory) factory.getBean("jmsConnectionFactory");
        jmsQueue = (Queue) factory.getBean("queue");
        try {
            // Keep the queue name so error messages can report it.
            jmsQueueName = jmsQueue.getQueueName();
        } catch (JMSException e) {
            throw new HibernateException("Unable to read the JMS queue name", e);
        }
    }
    public Runnable getProcessor(List<LuceneWork> queue) {
        return new SpringJMSBackendQueueProcessor(queue, this);
    }
    public QueueConnectionFactory getJMSFactory() {
        return queueConnectionFactory;
    }
    public Queue getJmsQueue() {
        return jmsQueue;
    }
    public String getJmsQueueName() {
        return jmsQueueName;
    }
    public void close() {
        // no need to release anything
    }
}
In the processor, the factory field is typed as SpringJMSBackendQueueProcessorFactory, whereas Hibernate Search’s own JMS processor uses its JMSBackendQueueProcessorFactory.
I also construct the QueueConnection with ‘user’ and ‘pass’ parameters as these are required for security.
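Hard-coding ‘user’ and ‘pass’ is fine for a demo, but you may prefer to read them from configuration. Here is a minimal sketch of that idea using only java.util.Properties. The class name and the keys jms.username and jms.password are hypothetical; where the Properties object comes from (a file, or perhaps the props argument passed to initialize) is left open and is an assumption on my part.

```java
import java.util.Properties;

// Small holder for JMS credentials read from configuration.
class JmsCredentialsSketch {
    final String username;
    final String password;

    JmsCredentialsSketch(String username, String password) {
        this.username = username;
        this.password = password;
    }

    // Reads the hypothetical keys jms.username and jms.password and
    // fails fast if either one is missing.
    static JmsCredentialsSketch from(Properties props) {
        String user = props.getProperty("jms.username");
        String pass = props.getProperty("jms.password");
        if (user == null || pass == null) {
            throw new IllegalArgumentException("jms.username and jms.password must both be set");
        }
        return new JmsCredentialsSketch(user, pass);
    }
}
```

The resulting username and password would then replace the two string literals passed to createQueueConnection.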
Slave Node HornetQ Config
hornetq-beans.xml
<bean name="JNDIServer" class="org.jnp.server.Main">
<property name="namingInfo">
<inject bean="Naming" />
</property>
<property name="port">1099</property>
<property name="bindAddress">localhost</property>
<property name="rmiPort">1098</property>
<property name="rmiBindAddress">localhost</property>
</bean>
hornetq-configuration.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<!-- Connectors -->
<connectors>
<connector name="netty-connector">
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
<param key="port" value="5445" />
</connector>
<!-- Connector to the master node -->
<connector name="remote-connector">
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
<param key="host" value="localhost" />
<param key="port" value="5446" />
</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
<param key="port" value="5445" />
</acceptor>
</acceptors>
<!--
We need to create a core queue for the JMS queue explicitly because
the bridge will be deployed before the JMS queue is deployed, so the
first time, it otherwise won't find the queue
-->
<queues>
<queue name="jms.queue.slave-queue">
<address>jms.queue.slave-queue</address>
</queue>
</queues>
<bridges>
<bridge name="slave-master-bridge">
<queue-name>jms.queue.slave-queue</queue-name>
<forwarding-address>jms.queue.master-queue
</forwarding-address>
<reconnect-attempts>-1</reconnect-attempts>
<connector-ref connector-name="remote-connector" />
</bridge>
</bridges>
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.#">
<permission type="createDurableQueue" roles="user" />
<permission type="deleteDurableQueue" roles="user" />
<permission type="createTempQueue" roles="user" />
<permission type="deleteTempQueue" roles="user" />
<permission type="consume" roles="user" />
<permission type="send" roles="user" />
</security-setting>
</security-settings>
<cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
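<!-- Should match the master node's cluster-password: the bridge has no user/password of its own, so it connects with the cluster credentials -->
<cluster-password>clusterpass</cluster-password>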
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector" />
</connectors>
<entries>
<entry name="/ConnectionFactory" />
</entries>
</connection-factory>
<queue name="slave-queue">
<entry name="/queue/slave-queue" />
</queue>
</configuration>