Commit 2810b7ba authored by Viacheslav Pavlov's avatar Viacheslav Pavlov
Browse files

Merge branch '210-scheduled-kpi-runs-and-cleanup' into 'main'

Resolve "Scheduled KPI runs and cleanup"

Closes #210

See merge request grin-global/grin-global-server!265
parents 8d3877d5 93bbba1a
......@@ -126,4 +126,12 @@ public interface KPIService {
}
}
/**
* Remove runs that have the same observations and don't add much value.
*
* @param execution the execution
* @return
*/
long purgeExecutionRuns(Execution execution);
}
......@@ -26,6 +26,9 @@ import javax.persistence.Query;
import javax.persistence.TypedQuery;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
......@@ -626,6 +629,87 @@ public class KPIServiceImpl implements KPIService {
return lazyLoad(executionRunRepository.findById(runId).orElse(null));
}
@Override
@Transactional
@PreAuthorize("hasRole('ADMINISTRATOR')")
public long purgeExecutionRuns(Execution execution) {
Pageable pageable = PageRequest.of(0, 50);
LOG.info("Cleaning runs for id={} {}", execution.getId(), execution.getName());
LOG.debug("Loading runs p={} s={}", pageable.getPageNumber(), pageable.getPageSize());
Page<ExecutionRun> runs = executionRunRepository.findLast(execution, pageable);
ExecutionRun previousRun = null;
List<ExecutionRun> toDelete = new ArrayList<>();
while (runs.hasContent() && toDelete.size() < 20) {
for (ExecutionRun run : runs) {
if (previousRun == null) {
previousRun = run;
continue;
}
// We have a run to compare against
boolean keepThisRun = false;
ZonedDateTime runLocalDate = run.getTimestamp().toInstant().atZone(ZoneOffset.systemDefault());
ZonedDateTime previousRunLocalDate = previousRun.getTimestamp().toInstant().atZone(ZoneOffset.systemDefault());
ZonedDateTime nowDateTime = ZonedDateTime.now();
Duration periodToNow = Duration.between(runLocalDate, nowDateTime).abs();
LOG.trace("Inspecting run id={} date={} against previous={} on={}", run.getId(), run.getTimestamp(), previousRun.getId(), previousRun.getTimestamp());
if (periodToNow.toDays() < 10) {
// Younger than 10 days
keepThisRun = true;
LOG.debug("Keeping run younger than 10 days id={} timestamp={}", run.getId(), runLocalDate);
} else if (runLocalDate.getDayOfMonth() == 1) {
// Keep every 1st of the Month
keepThisRun = true;
LOG.debug("Keeping run on 1st of Month id={} timestamp={}", run.getId(), runLocalDate);
} else if (Duration.between(runLocalDate, previousRunLocalDate).abs().toDays() > 31) {
// Keep if gap to previous kept run is more than 31 days
keepThisRun = true;
LOG.debug("Keeping run with a big gap id={} timestamp={} days={}", run.getId(), runLocalDate, Duration.between(runLocalDate, previousRunLocalDate).abs().toDays());
} else if (! runs.hasNext() && runs.getContent().get(runs.getNumberOfElements() - 1).equals(run)) {
// It's the first ever run! Keep every 1st of the Month
keepThisRun = true;
LOG.debug("Keeping first ever run id={} timestamp={}", run.getId(), runLocalDate);
} else if (Math.abs(run.getTotalValue() / previousRun.getTotalValue()) > 1.10) {
// Keep because totals are 10% off
keepThisRun = true;
LOG.debug("Keeping run because totalValue diff>{} id={} timestamp={}", Math.abs(run.getTotalValue() / previousRun.getTotalValue()), run.getId(), runLocalDate);
} else {
// Don't keep
LOG.debug("Meh run id={} timestamp={}", run.getId(), runLocalDate);
}
if (!keepThisRun) {
LOG.info("Removing run id={} timestamp={}", run.getId(), runLocalDate);
toDelete.add(run);
if (toDelete.size() >= 20) {
LOG.debug("Many runs to delete!");
break;
}
} else {
// KEEP run!
previousRun = run;
}
}
// Load next page
pageable = pageable.next();
LOG.debug("Loading runs p={} s={}", pageable.getPageNumber(), pageable.getPageSize());
runs = executionRunRepository.findLast(execution, pageable);
}
LOG.info("Deleting {} runs", toDelete.size());
toDelete.forEach(deleteRun -> executionRunRepository.delete(deleteRun));
return toDelete.size();
}
private void copyValues(Execution target, Execution source) {
if (source.getExecutionDimensions() != null) {
target.getExecutionDimensions().clear();
......
/*
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.service.worker;
import java.util.List;
import java.util.concurrent.Callable;
import com.google.common.collect.Lists;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.apache.commons.lang3.time.StopWatch;
import org.gringlobal.model.kpi.Execution;
import org.gringlobal.persistence.kpi.ExecutionRepository;
import org.gringlobal.service.KPIService;
import org.gringlobal.service.UserService;
import org.gringlobal.spring.TransactionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* Component to periodically run all KPI Executions
*/
@Component
public class ScheduledKPIExecutor {
/** The Constant LOG. */
public static final Logger LOG = LoggerFactory.getLogger(ScheduledKPIExecutor.class);
/** The kpi service. */
@Autowired
private KPIService kpiService;
@Autowired
private UserService userService;
@Autowired
private ExecutionRepository executionRepository;
/**
* Run executions.
*
* @throws Exception the exception
*/
@Scheduled(cron = "0 0 3 * * *") // nightly at 3:00:am
// @Scheduled(cron = "0 0/1 * * * *") // Test every 1min
@SchedulerLock(name = "org.gringlobal.service.worker.ScheduledKPIExecutor")
public void runExecutions() throws Exception {
LOG.info("Started scheduled executions run");
StopWatch stopWatch = StopWatch.createStarted();
asAdmin(() -> {
Pageable pageable = PageRequest.of(0, 50);
Page<Execution> executions;
do {
executions = kpiService.listExecutions(pageable);
for (Execution execution : executions.getContent()) {
try {
LOG.info("Started execution {} after {}ms", execution.getName(), stopWatch.getTime());
kpiService.executeAndSave(execution);
LOG.info("Execution {} successful after {}ms", execution.getName(), stopWatch.getTime());
} catch (Throwable e) {
LOG.error("Error running KPI Execution {}: {}", execution.getName(), e.getMessage(), e);
}
}
// Load next page
pageable = pageable.next();
} while (executions.hasNext());
if (executions.getTotalElements() > 0) {
LOG.info("Run of {} executions ended successfully after {}ms.", executions.getTotalElements(), stopWatch.getTime());
}
return true;
});
}
@Transactional
@Scheduled(fixedDelayString = "PT2H", initialDelayString = "PT5M")
@SchedulerLock(name = "org.gringlobal.service.worker.ScheduledKPIExecutor")
public void removeOldRuns() throws Exception {
asAdmin(() -> {
LOG.warn("Scheduled removal of old runs across all executions");
executionRepository.findAll().forEach(execution -> {
LOG.warn("Scheduled removal of old runs of execution {} id={}", execution.getTitle(), execution.getId());
kpiService.purgeExecutionRuns(execution);
});
return true;
});
}
private <T> void asAdmin(Callable<T> callable) throws Exception {
UserDetails administrator = userService.loadUserByUsername("administrator");
List<GrantedAuthority> authorities = Lists.newArrayList(new SimpleGrantedAuthority("ROLE_ADMINISTRATOR"));
authorities.addAll(administrator.getAuthorities());
Authentication authentication = new UsernamePasswordAuthenticationToken(administrator, null, authorities);
TransactionHelper.asUser(authentication, callable);
}
}
......@@ -31,6 +31,7 @@ log4j.category.org.gringlobal=warn
#log4j.category.org.gringlobal.custom.elasticsearch=debug
#log4j.category.org.gringlobal.service.impl.ElasticsearchServiceImpl=debug
#log4j.category.org.gringlobal.compatibility.service.impl.SearchServiceImpl=debug
log4j.category.org.gringlobal.service.worker.ScheduledKPIExecutor=info
# Set to debug to log HTTP request info
log4j.category.org.springframework.web.filter.CommonsRequestLoggingFilter=debug
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment