Commit 37308a84 authored by Matija Obreza's avatar Matija Obreza

Use hazelcast for ES updates

parent 7137c510
......@@ -29,10 +29,10 @@ public interface ElasticService {
Page<AccessionDetails> search(String query, Pageable pageable) throws SearchException;
void remove(Class<?> clazz, long id);
void remove(String className, long id);
void update(Class<?> clazz, long id);
void update(String className, long id);
void refreshIndex(Class<?> clazz);
void refreshIndex(String className);
}
......@@ -52,11 +52,11 @@ public class ElasticsearchSearchServiceImpl implements ElasticService {
@Autowired
private CropService cropService;
private final Map<Class<?>, Class<?>> clazzMap;
private final Map<String, Class<?>> clazzMap;
public ElasticsearchSearchServiceImpl() {
clazzMap = new HashMap<Class<?>, Class<?>>();
clazzMap.put(Accession.class, AccessionDetails.class);
clazzMap = new HashMap<String, Class<?>>();
clazzMap.put(Accession.class.getName(), AccessionDetails.class);
}
@Override
......@@ -130,12 +130,12 @@ public class ElasticsearchSearchServiceImpl implements ElasticService {
}
@Override
public void update(Class<?> clazz, long id) {
if (!clazzMap.containsKey(clazz)) {
public void update(String className, long id) {
if (!clazzMap.containsKey(className)) {
return;
}
Object eo = toElasticObject(clazz, id);
Object eo = toElasticObject(className, id);
if (eo == null)
return;
......@@ -143,13 +143,13 @@ public class ElasticsearchSearchServiceImpl implements ElasticService {
iq.setId(String.valueOf(id));
iq.setObject(eo);
LOG.info("Indexing " + clazz + " id=" + id);
LOG.info("Indexing " + className + " id=" + id);
elasticsearchTemplate.index(iq);
}
@Override
public void remove(Class<?> clazz, long id) {
Class<?> clazz2 = clazzMap.get(clazz);
public void remove(String className, long id) {
Class<?> clazz2 = clazzMap.get(className);
if (clazz2 == null) {
return;
......@@ -158,18 +158,18 @@ public class ElasticsearchSearchServiceImpl implements ElasticService {
elasticsearchTemplate.delete(clazz2, String.valueOf(id));
}
private Object toElasticObject(Class<?> clazz, long id) {
if (clazz == Accession.class) {
private Object toElasticObject(String className, long id) {
if (Accession.class.getName().equals(className)) {
return genesysService.getAccessionDetails(id);
}
LOG.warn("Unsupported class " + clazz);
LOG.warn("Unsupported class " + className);
return null;
}
@Override
public void refreshIndex(Class<?> clazz) {
Class<?> clazz2 = clazzMap.get(clazz);
public void refreshIndex(String className) {
Class<?> clazz2 = clazzMap.get(className);
if (clazz2 == null) {
return;
......
......@@ -16,19 +16,21 @@
package org.genesys2.server.service.worker;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.service.GenesysService;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.stereotype.Component;
import com.hazelcast.core.IQueue;
/**
* Component that receives updated or deleted accession IDs and uses a
* background thread to refresh ES
......@@ -36,7 +38,7 @@ import org.springframework.stereotype.Component;
* @author matijaobreza
*/
@Component
public class ElasticUpdater implements DisposableBean, InitializingBean {
public class ElasticUpdater {
public static final Log LOG = LogFactory.getLog(ElasticUpdater.class);
@Autowired
......@@ -45,21 +47,10 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
@Autowired
private GenesysService genesysService;
@Autowired
private ElasticUpdaterProcessor processor;
private Thread workerThread;
private ConcurrentLinkedQueue<ElasticNode> toRemove;
private ConcurrentLinkedQueue<ElasticNode> toUpdate;
private Object lock = new Object();
private Object semaphore = new Object();
public ElasticUpdater() {
toRemove = new ConcurrentLinkedQueue<ElasticNode>();
toUpdate = new ConcurrentLinkedQueue<ElasticNode>();
}
@Resource
private IQueue<ElasticNode> elasticRemoveQueue;
@Resource
private IQueue<ElasticNode> elasticUpdateQueue;
/**
* Schedule objects for removal
......@@ -70,13 +61,8 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
public void remove(Class<?> clazz, Long id) {
LOG.info("Removing " + clazz + " " + id);
ElasticNode node = new ElasticNode(clazz, id);
synchronized (lock) {
toUpdate.remove(node);
toRemove.add(node);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
elasticUpdateQueue.remove(node);
elasticRemoveQueue.add(node);
}
/**
......@@ -90,13 +76,8 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
if (nodes.isEmpty())
return;
synchronized (lock) {
toUpdate.removeAll(nodes);
toRemove.addAll(nodes);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
elasticUpdateQueue.removeAll(nodes);
elasticRemoveQueue.addAll(nodes);
}
/**
......@@ -109,15 +90,10 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
LOG.info("Updating " + clazz + " " + id);
ElasticNode node = new ElasticNode(clazz, id);
synchronized (lock) {
toRemove.remove(node);
// reinsert all to end of queue
toUpdate.remove(node);
toUpdate.add(node);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
elasticRemoveQueue.remove(node);
// reinsert all to end of queue
elasticUpdateQueue.remove(node);
elasticUpdateQueue.add(node);
}
/**
......@@ -131,15 +107,10 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
if (nodes.isEmpty())
return;
synchronized (lock) {
toRemove.removeAll(nodes);
// reinsert all to end of queue
toUpdate.removeAll(nodes);
toUpdate.addAll(nodes);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
elasticRemoveQueue.removeAll(nodes);
// reinsert all to end of queue
elasticUpdateQueue.removeAll(nodes);
elasticUpdateQueue.addAll(nodes);
}
/**
......@@ -157,12 +128,12 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
return set;
}
public static class ElasticNode {
private final Class<?> clazz;
public static class ElasticNode implements Serializable {
private final String className;
private final long id;
public ElasticNode(final Class<?> clazz, final Long id) {
this.clazz = clazz;
this.className = clazz.getName();
this.id = id;
}
......@@ -170,52 +141,8 @@ public class ElasticUpdater implements DisposableBean, InitializingBean {
return id;
}
public Class<?> getClazz() {
return clazz;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((clazz == null) ? 0 : clazz.hashCode());
result = prime * result + (int) (id ^ (id >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ElasticNode other = (ElasticNode) obj;
if (clazz == null) {
if (other.clazz != null)
return false;
} else if (!clazz.equals(other.clazz))
return false;
if (id != other.id)
return false;
return true;
public String getClassName() {
return className;
}
}
@Override
public void destroy() throws Exception {
LOG.info("Shutting down processor");
processor.shutdown();
workerThread.join(2000);
}
@Override
public void afterPropertiesSet() throws Exception {
this.processor.init(lock, semaphore, toRemove, toUpdate);
LOG.info("Starting thread!");
workerThread = new Thread(processor, "genesys-ES-processor");
workerThread.setPriority(3);
workerThread.start();
}
}
......@@ -2,41 +2,43 @@ package org.genesys2.server.service.worker;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.worker.ElasticUpdater.ElasticNode;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IQueue;
/**
* Processor
*
* @author matijaobreza
*/
@Component
class ElasticUpdaterProcessor implements Runnable {
class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableBean {
public static final Log LOG = LogFactory.getLog(ElasticUpdaterProcessor.class);
private Thread worker;
private boolean running;
@Autowired
private ElasticService elasticService;
private boolean running = true;
private Object semaphore;
private Object lock;
private ConcurrentLinkedQueue<ElasticNode> removeQueue;
private ConcurrentLinkedQueue<ElasticNode> updateQueue;
@Resource
private IQueue<ElasticNode> elasticRemoveQueue;
private long indexDelay = 5000;
@Resource
private IQueue<ElasticNode> elasticUpdateQueue;
public void init(Object lock, Object semaphore, ConcurrentLinkedQueue<ElasticNode> toRemove, ConcurrentLinkedQueue<ElasticNode> toUpdate) {
this.lock = lock;
this.semaphore = semaphore;
this.removeQueue = toRemove;
this.updateQueue = toUpdate;
}
private long indexDelay = 5000;
public void setIndexDelay(long indexDelay) {
this.indexDelay = indexDelay;
......@@ -49,50 +51,42 @@ class ElasticUpdaterProcessor implements Runnable {
@Override
public void run() {
LOG.info("Started.");
Set<Class<?>> updatedIndices = new HashSet<Class<?>>();
Set<String> updatedIndices = new HashSet<String>();
while (running) {
try {
LOG.info("Waiting for semaphore");
synchronized (semaphore) {
semaphore.wait();
}
if (!running)
return;
LOG.info("Semaphore triggered, doing some sleeepy time");
Thread.sleep(this.indexDelay);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
updatedIndices.clear();
ElasticNode toRemove = null, toUpdate = null;
do {
synchronized (lock) {
toRemove = removeQueue.peek();
toUpdate = updateQueue.peek();
ElasticNode toRemove = null, toUpdate = null;
do {
toRemove = elasticRemoveQueue.poll();
toUpdate = elasticUpdateQueue.poll();
if (toRemove != null) {
removeNode(toRemove);
updatedIndices.add(toRemove.getClassName());
}
if (toUpdate != null) {
updateNode(toUpdate);
updatedIndices.add(toUpdate.getClassName());
}
} while (running && (toRemove != null || toUpdate != null));
LOG.info("Updating indices");
for (String className : updatedIndices) {
elasticService.refreshIndex(className);
}
removeNode(toRemove);
updateNode(toUpdate);
if (toRemove != null) {
updatedIndices.add(toRemove.getClazz());
}
if (toUpdate != null) {
updatedIndices.add(toUpdate.getClazz());
updatedIndices.clear();
LOG.info("Done loop");
} catch (HazelcastInstanceNotActiveException e) {
try {
if (LOG.isDebugEnabled())
LOG.debug("Hazelcast not active.");
Thread.sleep(500);
} catch (InterruptedException e1) {
}
} while (running && (toRemove != null || toUpdate != null));
LOG.info("Updating indices");
for (Class<?> clazz : updatedIndices) {
elasticService.refreshIndex(clazz);
}
updatedIndices.clear();
LOG.info("Done loop");
}
}
......@@ -100,29 +94,27 @@ class ElasticUpdaterProcessor implements Runnable {
if (toUpdate == null)
return;
elasticService.update(toUpdate.getClazz(), toUpdate.getId());
synchronized (lock) {
updateQueue.remove(toUpdate);
}
elasticService.update(toUpdate.getClassName(), toUpdate.getId());
}
private void removeNode(ElasticNode toRemove) {
if (toRemove == null)
return;
elasticService.remove(toRemove.getClazz(), toRemove.getId());
synchronized (lock) {
removeQueue.remove(toRemove);
}
elasticService.remove(toRemove.getClassName(), toRemove.getId());
}
public void shutdown() {
@Override
public void afterPropertiesSet() throws Exception {
this.running = true;
this.worker = new Thread(this, "es-processor");
this.worker.start();
}
@Override
public void destroy() throws Exception {
this.running = false;
if (semaphore != null) {
synchronized (semaphore) {
semaphore.notifyAll();
}
}
this.worker.interrupt();
}
}
\ No newline at end of file
......@@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import com.hazelcast.config.MapConfig.EvictionPolicy;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ManagedContext;
import com.hazelcast.spring.context.SpringManagedContext;
......
......@@ -27,9 +27,8 @@ import org.springframework.context.annotation.Import;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ManagedContext;
import com.hazelcast.core.IQueue;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import com.hazelcast.spring.context.SpringManagedContext;
import com.hazelcast.web.WebFilter;
@Configuration
......@@ -47,7 +46,7 @@ public class SpringCacheConfig {
@Value("${base.cookie-http-only}")
private String cookieHttpOnly;
@Bean
public HazelcastCacheManager cacheManager(HazelcastInstance hazelcastInstance) {
HazelcastCacheManager cm = new HazelcastCacheManager(hazelcastInstance);
......@@ -60,6 +59,16 @@ public class SpringCacheConfig {
return x;
}
@Bean
public IQueue<Object> elasticRemoveQueue(HazelcastInstance hazelcast) {
return hazelcast.getQueue("es-remove");
}
@Bean
public IQueue<Object> elasticUpdateQueue(HazelcastInstance hazelcast) {
return hazelcast.getQueue("es-update");
}
@Bean
public IExecutorService distributedExecutor(HazelcastInstance hazelcast) {
IExecutorService executorService = hazelcast.getExecutorService("hazel-exec");
......
......@@ -5,8 +5,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
......@@ -25,7 +23,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
......@@ -35,7 +32,6 @@ import com.hazelcast.config.GroupConfig;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.config.QueueStoreConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.Hazelcast;
......@@ -44,13 +40,11 @@ import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ISemaphore;
import com.hazelcast.core.ManagedContext;
import com.hazelcast.core.Member;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import com.hazelcast.spring.context.SpringAware;
import com.hazelcast.spring.context.SpringManagedContext;
import com.hazelcast.util.ThreadUtil;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HazelcastDistributedExecutorTest.TestConfig.class, initializers = PropertyPlacholderInitializer.class)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment