Commit 15658dc4 authored by Matija Obreza
Browse files

ElasticUpdaterProcessor uses @Scheduled annotation instead of own background thread

parent fb4b90f3
......@@ -11,29 +11,24 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.worker.ElasticUpdater.ElasticNode;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IQueue;
/**
 * ES Processor component uses Spring's @Scheduled annotation to scan queues
 * with 2000ms delay measured from the completion time of each preceding
 * invocation.
 *
 * @author matijaobreza
 */
@Component
class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableBean {
class ElasticUpdaterProcessor {
public static final Log LOG = LogFactory.getLog(ElasticUpdaterProcessor.class);
private static final int BATCH_SIZE = 100;
private Thread worker;
private boolean running;
@Autowired
private ElasticService elasticService;
......@@ -48,78 +43,69 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
private HashMap<String, Set<Long>> buckets = new HashMap<String, Set<Long>>();
@Override
public void run() {
LOG.info("Started.");
// Set<String> updatedIndices = new HashSet<String>();
@Scheduled(fixedDelay = 2000)
public void processQueues() {
// First remove
{
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning ES remove queue");
}
while (running) {
try {
int i = 0;
ElasticNode toRemove = null;
do {
toRemove = elasticRemoveQueue.poll();
i++;
// First remove
{
int i = 0;
ElasticNode toRemove = null;
do {
toRemove = elasticRemoveQueue.poll();
i++;
if (toRemove != null) {
removeNode(toRemove);
}
if (toRemove != null) {
removeNode(toRemove);
}
if (LOG.isInfoEnabled() && (i % 100 == 0)) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " deleted=" + i);
}
if (LOG.isInfoEnabled() && (i % 100 == 0)) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " deleted=" + i);
}
} while (toRemove != null);
}
} while (running && toRemove != null);
}
// Then update
{
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning ES update queue");
}
// Then update
{
int i = 0;
StopWatch stopWatch=new StopWatch();
stopWatch.start();
ElasticNode toUpdate = null;
do {
toUpdate = elasticUpdateQueue.poll();
i++;
if (toUpdate != null) {
updateNode(toUpdate);
}
if (LOG.isInfoEnabled() && (i % 500 == 0)) {
LOG.info("Queue size update=" + elasticUpdateQueue.size() + " indexed=" + i + " rate=" + (1000.0 * i/(stopWatch.getTime())) + " records/s");
}
} while (running && toUpdate != null);
}
int i = 0;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ElasticNode toUpdate = null;
do {
toUpdate = elasticUpdateQueue.poll();
i++;
if (!buckets.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
}
if (toUpdate != null) {
updateNode(toUpdate);
}
for (String className : buckets.keySet()) {
Set<Long> bucket = buckets.get(className);
executeUpdate(className, bucket);
}
buckets.clear();
if (LOG.isInfoEnabled() && (i % 500 == 0)) {
LOG.info("Queue size update=" + elasticUpdateQueue.size() + " indexed=" + i + " rate=" + (1000.0 * i / (stopWatch.getTime()))
+ " records/s");
}
} catch (HazelcastInstanceNotActiveException e) {
LOG.warn("Hazelcast not active.");
} while (toUpdate != null);
}
if (!buckets.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
}
try {
// LOG.info("ES updater sleeping");
Thread.sleep(2000);
} catch (InterruptedException e) {
for (String className : buckets.keySet()) {
Set<Long> bucket = buckets.get(className);
executeUpdate(className, bucket);
}
buckets.clear();
}
LOG.info("Finished");
}
/**
......@@ -137,7 +123,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
if (LOG.isTraceEnabled()) {
LOG.trace("Reindexing " + className + " size=" + copy.size());
}
elasticService.updateAll(className, copy);
if (LOG.isTraceEnabled()) {
......@@ -169,19 +155,4 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
elasticService.remove(toRemove.getClassName(), toRemove.getId());
}
// Lifecycle callback (InitializingBean): starts the dedicated background
// worker thread once the bean's dependencies are injected.
// NOTE(review): 'running' must be set before the thread starts so the
// worker loop in run() does not exit immediately — order is significant.
@Override
public void afterPropertiesSet() throws Exception {
this.running = true;
// Named thread makes the ES processor easy to spot in thread dumps.
this.worker = new Thread(this, "es-processor");
this.worker.start();
}
// Lifecycle callback (DisposableBean): signals the worker loop to stop and
// interrupts the thread so it wakes from Thread.sleep(..) promptly instead
// of waiting out the full sleep interval.
@Override
public void destroy() throws Exception {
LOG.info("Stopping worker");
// Flag first, then interrupt: run() checks 'running' after the sleep ends.
this.running = false;
this.worker.interrupt();
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment