Commit 8c10b46f authored by Matija Obreza's avatar Matija Obreza

Regenerate ES index in batch mode

parent 6fafef63
......@@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.index.IndexRequest;
......@@ -75,9 +76,6 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
private static final String INDEXALIAS_PASSPORT_READ = "passport";
private static final String INDEXALIAS_PASSPORT_WRITE = "passportWrite";
private static final String INDEXALIAS_ARCHIVE_READ = "archive";
private static final String INDEXALIAS_ARCHIVE_WRITE = "archiveWrite";
@Autowired
private Client client;
......@@ -356,8 +354,10 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
queries.add(iq);
}
if (LOG.isInfoEnabled() && !queries.isEmpty()) {
LOG.info("Indexing " + className + " count=" + queries.size() + " of provided objects count=" + ids.size());
if (!queries.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Indexing " + className + " count=" + queries.size() + " of provided objects count=" + ids.size());
}
elasticsearchTemplate.bulkIndex(queries);
}
}
......@@ -419,13 +419,6 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
if (!aliasExists(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE)) {
realias(INDEXALIAS_PASSPORT_WRITE, "genesys");
}
if (!aliasExists(INDEXALIAS_ARCHIVE_READ)) {
realias(INDEXALIAS_ARCHIVE_READ, "genesysarchive");
}
if (!aliasExists(INDEXALIAS_ARCHIVE_WRITE)) {
realias(INDEXALIAS_ARCHIVE_WRITE, "genesysarchive");
}
}
/**
......@@ -451,16 +444,22 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
}
/**
* Create a new index based on current timestamp, reindex it, change alias
* to point to index
* Create new ES indexes
*/
@Override
public void regenerateIndexes() throws ElasticsearchException {
regeneratePassportIndex();
}
/**
* Create a new passport data index based on current timestamp, reindex it,
* change alias to point to index
*/
private void regeneratePassportIndex() throws ElasticsearchException {
try {
if (elasticsearchAdminLock.tryLock(10, TimeUnit.SECONDS)) {
long time = System.currentTimeMillis();
String passportIndexName = ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ + time;
// String archiveIndexName = "archive" + time;
Map<?, ?> indexMapping = elasticsearchTemplate.getMapping(AccessionDetails.class);
Map<?, ?> settings = elasticsearchTemplate.getSetting(AccessionDetails.class);
......@@ -468,22 +467,12 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
if (elasticsearchTemplate.indexExists(passportIndexName)) {
throw new ElasticsearchException("Index already exists with name " + passportIndexName);
}
// if (elasticsearchTemplate.indexExists(archiveIndexName)) {
// throw new ElasticsearchException("Index already exists with
// name " + passportIndexName);
// }
createIndex(passportIndexName, indexMapping, settings);
realias(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE, passportIndexName);
reindex(new AppliedFilters());
realias(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ, passportIndexName);
// createIndex(archiveIndexName, indexMapping, settings);
// realias(ElasticsearchSearchServiceImpl.INDEXALIAS_ARCHIVE_WRITE,
// passportIndexName);
// reindexArchive();
// realias(INDEXALIAS_ARCHIVE_READ, archiveIndexName);
} else {
throw new ElasticsearchException("Could not acquire elasticsearchAdminLock lock");
}
......@@ -610,18 +599,34 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
@Override
public void reindex(AppliedFilters filters) {
final List<Long> ids = new ArrayList<>(100);
genesysLowlevelRepository.listAccessionIds(filters, null, new RowCallbackHandler() {
private long count = 0;
@Override
public void processRow(ResultSet rs) throws SQLException {
long accessionId = rs.getLong(1);
count++;
add(rs.getLong(1));
}
elasticUpdater.update(Accession.class, accessionId);
private void add(long accessionId) {
if (ids.size() >= 100) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling reindex of batch " + (count / 100) + " size=" + ids.size());
}
elasticUpdater.updateAll(Accession.class, ids.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
ids.clear();
}
ids.add(accessionId);
}
});
if (ids.size() >= 100) {
// Final kick
elasticUpdater.updateAll(Accession.class, ids.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
}
LOG.info("Done.");
}
}
......@@ -51,6 +51,7 @@ public class ElasticUpdater {
@Resource
private IQueue<ElasticNode> elasticRemoveQueue;
@Resource
private IQueue<ElasticNode> elasticUpdateQueue;
......@@ -110,6 +111,15 @@ public class ElasticUpdater {
elasticRemoveQueue.removeAll(nodes);
// reinsert to end of queue
elasticUpdateQueue.removeAll(nodes);
while (elasticUpdateQueue.size()>5000) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
elasticUpdateQueue.addAll(nodes);
}
......
......@@ -6,6 +6,7 @@ import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.service.ElasticService;
......@@ -13,6 +14,7 @@ import org.genesys2.server.service.worker.ElasticUpdater.ElasticNode;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
......@@ -35,6 +37,9 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
@Autowired
private ElasticService elasticService;
@Autowired
private ThreadPoolTaskExecutor executor;
@Resource
private IQueue<ElasticNode> elasticRemoveQueue;
......@@ -63,8 +68,8 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
removeNode(toRemove);
}
if (i % 100 == 0) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size() + " deleted=" + i);
if (LOG.isInfoEnabled() && (i % 100 == 0)) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " deleted=" + i);
}
} while (running && toRemove != null);
......@@ -73,6 +78,9 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
// Then update
{
int i = 0;
StopWatch stopWatch=new StopWatch();
stopWatch.start();
ElasticNode toUpdate = null;
do {
toUpdate = elasticUpdateQueue.poll();
......@@ -82,8 +90,8 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
updateNode(toUpdate);
}
if (i % 100 == 0) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size() + " indexed=" + i);
if (LOG.isInfoEnabled() && (i % 500 == 0)) {
LOG.info("Queue size update=" + elasticUpdateQueue.size() + " indexed=" + i + " rate=" + (1000.0 * i/(stopWatch.getTime())) + " records/s");
}
} while (running && toUpdate != null);
......@@ -96,8 +104,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
for (String className : buckets.keySet()) {
Set<Long> bucket = buckets.get(className);
elasticService.updateAll(className, bucket);
bucket.clear();
executeUpdate(className, bucket);
}
buckets.clear();
}
......@@ -108,13 +115,38 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
try {
// LOG.info("ES updater sleeping");
Thread.sleep(5000);
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
LOG.info("Finished");
}
/**
* Schedule a parallel update
*
* @param className
* @param bucket
*/
private void executeUpdate(final String className, final Set<Long> bucket) {
final HashSet<Long> copy = new HashSet<>(bucket);
bucket.clear();
executor.execute(new Runnable() {
@Override
public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace("Reindexing " + className + " size=" + copy.size());
}
elasticService.updateAll(className, copy);
if (LOG.isTraceEnabled()) {
LOG.trace("Reindexing done.");
}
}
});
}
private void updateNode(ElasticNode toUpdate) {
if (toUpdate == null)
return;
......@@ -127,8 +159,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
}
bucket.add(toUpdate.getId());
if (bucket.size() >= BATCH_SIZE) {
elasticService.updateAll(className, bucket);
bucket.clear();
executeUpdate(className, bucket);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment