Commit a27ea291 authored by Matija Obreza's avatar Matija Obreza

ES indices refresh automatically

parent b7276698
......@@ -56,7 +56,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
@Override
public void run() {
LOG.info("Started.");
Set<String> updatedIndices = new HashSet<String>();
// Set<String> updatedIndices = new HashSet<String>();
while (running) {
try {
......@@ -71,18 +71,15 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
if (toRemove != null) {
removeNode(toRemove);
updatedIndices.add(toRemove.getClassName());
// updatedIndices.add(toRemove.getClassName());
}
if (i % 100 == 0) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
if (!updatedIndices.isEmpty()) {
// for (String className : updatedIndices) {
// elasticService.refreshIndex(className);
// }
updatedIndices.clear();
}
// if (!updatedIndices.isEmpty()) {
// updatedIndices.clear();
// }
}
} while (running && toRemove != null);
......@@ -98,23 +95,33 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
if (toUpdate != null) {
updateNode(toUpdate);
updatedIndices.add(toUpdate.getClassName());
// updatedIndices.add(toUpdate.getClassName());
}
if (i % 100 == 0) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
if (!updatedIndices.isEmpty()) {
// for (String className : updatedIndices) {
// elasticService.refreshIndex(className);
// }
updatedIndices.clear();
}
// if (!updatedIndices.isEmpty()) {
// updatedIndices.clear();
// }
}
} while (running && toUpdate != null);
}
if (!buckets.isEmpty()) {
for (String className : buckets.keySet()) {
Set<Long> bucket = buckets.get(className);
elasticService.updateAll(className, bucket);
bucket.clear();
}
buckets.clear();
}
// if (!updatedIndices.isEmpty()) {
// updatedIndices.clear();
// }
} catch (HazelcastInstanceNotActiveException e) {
LOG.warn("Hazelcast not active.");
}
......@@ -123,20 +130,6 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
LOG.debug("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
}
for (String className : buckets.keySet()) {
Set<Long> bucket = buckets.get(className);
elasticService.updateAll(className, bucket);
bucket.clear();
}
buckets.clear();
if (!updatedIndices.isEmpty()) {
// for (String className : updatedIndices) {
// elasticService.refreshIndex(className);
// }
updatedIndices.clear();
}
try {
// LOG.info("ES updater sleeping");
Thread.sleep(2000);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment