Commit 793eb2c3 authored by Matija Obreza's avatar Matija Obreza

Reworked ES reindexing

- reindexing is delayed by 5s
- not using the distributed reindex queue, too much overhead
parent 8c2a5c9a
......@@ -46,7 +46,7 @@ import java.util.Set;
@Table(name = "partner")
@Cacheable
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "uuid")
@Document(indexName = "catalog")
@Document(indexName = "partner")
public class Partner extends UuidModel implements SelfCleaning, AclAwareModel {
private static final ClassAclOid<Partner> PARENT_OID = ClassAclOid.forClass(Partner.class);
......
......@@ -74,7 +74,7 @@ import com.fasterxml.jackson.annotation.JsonView;
@Table(name = "dataset")
@Cacheable
@Audited
@Document(indexName = "catalog")
@Document(indexName = "dataset")
public class Dataset extends UuidModel implements Publishable, SelfCleaning, PublishValidationInterface, AclAwareModel {
/** The Constant serialVersionUID. */
......
......@@ -46,7 +46,7 @@ import java.util.List;
@Table(name = "descriptor")
@Audited
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "uuid")
@Document(indexName = "catalog")
@Document(indexName = "descriptor")
public class Descriptor extends UuidModel implements SelfCleaning, Publishable, Copyable<Descriptor>, AclAwareModel {
/** The Constant serialVersionUID. */
......
......@@ -48,7 +48,7 @@ import java.util.Map;
@Audited
@Cacheable
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "uuid")
@Document(indexName = "catalog")
@Document(indexName = "descriptorlist")
public class DescriptorList extends UuidModel implements Publishable, SelfCleaning, AclAwareModel {
/** The Constant serialVersionUID. */
......
......@@ -44,7 +44,7 @@ import java.util.List;
@Entity
@Table(name = "vocabulary")
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "uuid")
@Document(indexName = "catalog")
@Document(indexName = "controlledvocabulary")
public class ControlledVocabulary extends UuidModel implements Publishable, SelfCleaning, AclAwareModel {
/** The Constant serialVersionUID. */
......
......@@ -46,7 +46,7 @@ import java.text.MessageFormat;
*/
@Entity
@Table(name = "term")
@Document(indexName = "catalog")
@Document(indexName = "vocabularyterm")
public class VocabularyTerm extends BasicModel implements SelfCleaning {
/** The Constant serialVersionUID. */
......
......@@ -29,7 +29,6 @@ import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.genesys.blocks.model.BasicModel;
import org.genesys2.server.service.ElasticsearchService;
import org.slf4j.Logger;
......@@ -89,8 +88,8 @@ public class ElasticJPAListener {
*
* @param joinPoint the join point
*/
@Before(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(..))")
public void beforeRemove(final JoinPoint joinPoint) {
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(..))")
public void afterRemove(final JoinPoint joinPoint) {
final Object[] args = joinPoint.getArgs();
try {
final Object removed = args[0];
......
......@@ -46,7 +46,7 @@ import org.springframework.data.elasticsearch.annotations.Document;
@Index(name = "IX_accession_lastModifiedDate", columnList = "lastModifiedDate"),
@Index(name = "IX_accession_createdDate", columnList = "createdDate")})
@Audited
@Document(indexName = "genesys")
@Document(indexName = "accession")
public class Accession extends AccessionData {
/**
*
......
......@@ -29,9 +29,11 @@ import javax.validation.constraints.Size;
import org.genesys.blocks.model.AuditedVersionedModel;
import org.hibernate.annotations.Type;
import org.springframework.data.elasticsearch.annotations.Document;
@Entity
@Table(name = "activitypost")
@Document(indexName = "activitypost")
public class ActivityPost extends AuditedVersionedModel {
private static final long serialVersionUID = 8690395020204070378L;
......
......@@ -32,9 +32,11 @@ import javax.validation.constraints.Size;
import org.genesys.blocks.model.AuditedVersionedModel;
import org.genesys.blocks.model.ClassPK;
import org.hibernate.annotations.Type;
import org.springframework.data.elasticsearch.annotations.Document;
@Entity
@Table(name = "article")
@Document(indexName = "article")
public class Article extends AuditedVersionedModel {
private static final long serialVersionUID = 8690395020204070378L;
......
......@@ -39,6 +39,7 @@ import org.genesys.blocks.model.BasicModel;
import org.genesys.blocks.model.JsonViews;
import org.genesys.custom.elasticsearch.IgnoreField;
import org.hibernate.annotations.Type;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
......@@ -50,6 +51,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@Cacheable
@Entity
@Table(name = "country")
@Document(indexName = "country")
public class Country extends BasicModel {
private static final long serialVersionUID = -1688723909298769804L;
......
......@@ -51,6 +51,7 @@ import org.genesys2.server.model.GlobalVersionedAuditedModel;
import org.hibernate.annotations.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.annotations.Document;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonView;
......@@ -60,6 +61,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@Cacheable
@Entity
@Table(name = "crop")
@Document(indexName = "crop")
public class Crop extends GlobalVersionedAuditedModel implements SelfCleaning, AclAwareModel {
private static final long serialVersionUID = -2686341831839109257L;
......
......@@ -38,6 +38,7 @@ import org.genesys.blocks.util.ClassAclOid;
import org.genesys.catalog.model.Partner;
import org.genesys.custom.elasticsearch.IgnoreField;
import org.genesys2.server.model.genesys.PDCIStatistics;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldIndex;
import org.springframework.data.elasticsearch.annotations.FieldType;
......@@ -51,6 +52,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@Cacheable
@Entity
@Table(name = "faoinstitute", uniqueConstraints = @UniqueConstraint(columnNames = { "code" }), indexes = { @Index(columnList = "code", name = "code_FAOINSTITUTE") })
@Document(indexName = "faoinstitute")
public class FaoInstitute extends BasicModel implements GeoReferencedEntity, AclAwareModel, EntityId {
private static final ClassAclOid<FaoInstitute> DEFAULT_PARENT_OID = ClassAclOid.forClass(FaoInstitute.class);
......
......@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
......@@ -88,7 +89,7 @@ import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.data.querydsl.SimpleEntityPathResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.annotation.Transactional;
......@@ -119,7 +120,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
protected static final SourceFilter DEFAULT_SOURCE_FILTER = new FetchSourceFilter(new String[] { "id", "_class", "title", "code", "description" }, new String[] {});
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
private TaskScheduler taskScheduler;
@Autowired
private EntityManager em;
......@@ -336,17 +337,14 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
ensureWriteAlias(clazz);
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
// LOG.debug("Reindexing {} size={}", clazz.getName(), copy.size());
try {
_self.update(clazz, copy);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
taskScheduler.schedule(() -> {
LOG.debug("Running scheduled reindex of {} size={}", clazz.getName(), copy.size());
try {
_self.update(clazz, copy);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
});
}, new Date(System.currentTimeMillis() + 5000));
}
/**
......@@ -424,10 +422,12 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
// }
em.clear();
queries.clear();
for (Long id : notFoundIds) {
LOG.debug("Removing from index {} {}", clazz.getName(), id);
elasticsearchTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
LOG.trace("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);
String res = elasticsearchTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
LOG.trace("Deleted ES document id={}", res);
}
}
......
......@@ -17,6 +17,8 @@
package org.genesys2.spring.config;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.elasticsearch.client.Client;
import org.genesys.blocks.model.JsonViews;
......@@ -58,7 +60,6 @@ import com.fasterxml.jackson.datatype.hibernate4.Hibernate4Module;
import com.fasterxml.jackson.datatype.hibernate4.Hibernate4Module.Feature;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IQueue;
/**
* Elasticsearch config.
......@@ -78,6 +79,14 @@ public class ElasticsearchConfig {
@Value("${elasticsearch.embedded.data}")
private String embeddedDataHome;
/**
* No-limits blocking queue for reindexing with ES
*/
@Bean
public BlockingQueue<ElasticReindex> elasticReindexQueue() {
return new LinkedBlockingQueue<>();
}
/**
* Elastic client.
*
......@@ -115,17 +124,6 @@ public class ElasticsearchConfig {
return lock;
}
/**
* Elastic reindex queue.
*
* @param hazelcast the hazelcast
* @return the i queue
*/
@Bean
public IQueue<ElasticReindex> elasticReindexQueue(final HazelcastInstance hazelcast) {
return hazelcast.getQueue("elasticsearchReindexQueue");
}
/**
* Elasticsearch service.
*
......
......@@ -53,7 +53,6 @@ import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ManagedContext;
import com.hazelcast.hibernate.HazelcastCacheRegionFactory;
import com.hazelcast.spring.cache.HazelcastCacheManager;
......@@ -388,28 +387,6 @@ public abstract class HazelcastConfig {
return x;
}
/**
* Elastic remove queue.
*
* @param hazelcast the hazelcast
* @return the i queue
*/
@Bean
public IQueue<Object> elasticRemoveQueue(final HazelcastInstance hazelcast) {
return hazelcast.getQueue("es-remove");
}
/**
* Elastic update queue.
*
* @param hazelcast the hazelcast
* @return the i queue
*/
@Bean
public IQueue<Object> elasticUpdateQueue(final HazelcastInstance hazelcast) {
return hazelcast.getQueue("es-update");
}
/**
* Distributed executor.
*
......
......@@ -16,28 +16,68 @@
package org.genesys2.spring.config;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
@EnableAsync
@EnableScheduling
@Configuration
public class SchedulerConfig {
public class SchedulerConfig implements SchedulingConfigurer, AsyncConfigurer {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerConfig.class);
@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskExecutor taskExecutor() {
final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(4);
pool.setMaxPoolSize(8);
pool.setMaxPoolSize(16);
pool.setQueueCapacity(100);
pool.setThreadNamePrefix("genesys-background-");
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return pool;
}
@Bean(destroyMethod = "shutdown")
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("genesys-async-");
scheduler.setThreadFactory(taskExecutor());
return scheduler;
}
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
LOG.warn("Unhandled exception in async task {} params={}: {}", method, Arrays.toString(params), ex.getMessage());
}
};
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(taskScheduler());
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment