Commit 18177229 authored by Gradl, Tobias's avatar Gradl, Tobias
Browse files

447: Implement configurable scheduler (OPENED)

Task-Url: #447
parent 9f880204
Pipeline #31240 passed with stage
in 3 minutes and 17 seconds
package eu.dariah.de.search.automation.crawling;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.automation.schedule.ScheduledRunnable;
import eu.dariah.de.search.crawling.CrawlManager;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
@Autowired CrawlManager crawlManager;
@Getter @Setter private boolean active;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
@Getter @Setter private int timeout;
@Getter private Date nextExecution;
public synchronized void setNextExecution(Date nextExecution) {
this.nextExecution = nextExecution;
}
@Getter private Date lastCompletion;
public synchronized void setLastCompletion(Date lastCompletion) {
this.lastCompletion = lastCompletion;
}
@Override
public void run() {
this.executeCrawling();
this.setLastCompletion(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()));
}
@Override
public void destroy() throws Exception {
try {
/*pipelineExecutor.shutdown();
// Wait until all threads are finished
while (!pipelineExecutor.isTerminated()) {}*/
} catch (final Exception e) {
log.error("Error closing sync executor", e);
}
}
protected abstract void executeCrawling();
}
package eu.dariah.de.search.automation.crawling;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@Data
@EqualsAndHashCode(callSuper=false)
@Slf4j
public class OfflineCrawlRunner extends BaseCrawlRunner {
@Override
protected void executeCrawling() {
}
}
package eu.dariah.de.search.automation.crawling;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@Data
@EqualsAndHashCode(callSuper=false)
@Slf4j
public class OnlineCrawlRunner extends BaseCrawlRunner {
@Override
protected void executeCrawling() {
}
}
......@@ -10,6 +10,7 @@ import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronExpression;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
......@@ -17,13 +18,15 @@ public class CronTrigger implements Trigger {
private final String triggerName;
private final CronExpression cronExpression;
@Getter private final ScheduledRunnable runnable;
@Getter private final boolean active;
public CronTrigger(String triggerName, String cronExpression) {
public CronTrigger(String triggerName, String cronExpression, ScheduledRunnable runnable) {
this.triggerName = triggerName;
this.cronExpression = this.parseCronExpression(cronExpression);
this.active = this.cronExpression!=null;
this.runnable = runnable;
this.setRunnableDates(getNextExecution(), null);
}
@Override
......@@ -32,10 +35,18 @@ public class CronTrigger implements Trigger {
Optional<Date> nextExecution = Optional.ofNullable(this.getNextExecution());
this.logExecutionInfo(lastCompletion, nextExecution);
this.setRunnableDates(nextExecution.orElse(null), lastCompletion.orElse(null));
return nextExecution.orElse(null);
}
private void setRunnableDates(Date nextExecution, Date lastCompletion) {
if (this.runnable!=null) {
this.runnable.setNextExecution(nextExecution);
this.runnable.setLastCompletion(lastCompletion);
}
}
private CronExpression parseCronExpression(String expression) {
CronExpression cronEx = null;
if (expression!=null) {
......
package eu.dariah.de.search.automation.schedule;
import java.util.Date;
public interface ScheduledRunnable extends Runnable {
public void setNextExecution(Date nextExecution);
public void setLastCompletion(Date lastCompletion);
}
package eu.dariah.de.search.config.nested;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
public class CrawlingAutomationConfigProperties {
private boolean online = false;
private boolean offline = false;
private String cronSchedule;
private String cronScheduleOnline;
private String cronScheduleOffline;
public void checkSchedules() {
if (cronSchedule!=null) {
if (cronScheduleOnline==null) {
cronScheduleOnline = cronSchedule;
log.debug("cron_schedule_online not explicitly configured: using cron_schedule parameter");
}
if (cronScheduleOffline==null) {
cronScheduleOffline = cronSchedule;
log.debug("cron_schedule_offline not explicitly configured: using cron_schedule parameter");
}
}
if (cronScheduleOnline!=null && cronScheduleOffline!=null) {
log.warn("cron_schedule_offline and cron_schedule_offline are explicitly configured: ignoring cron_schedule parameter");
}
}
}
......@@ -21,8 +21,7 @@ import de.unibamberg.minf.core.web.controller.DataTableList;
import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.CollectionSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.automation.crawling.OnlineCrawlRunner;
import eu.dariah.de.search.pojo.CollectionPojo;
import eu.dariah.de.search.pojo.DatasetPojo;
import eu.dariah.de.search.pojo.DatasourcePojo;
......@@ -38,9 +37,7 @@ public class CollectionController extends BaseController {
@Autowired private CollectionService collectionService;
@Autowired private CollectionConverter collectionConverter;
@Autowired private CollectionSyncService crSyncService;
@Autowired private RunnableCrawlManagerImpl crawlManager;
@Autowired private CrawlingConfigProperties crawlingConfig;
@Autowired private OnlineCrawlRunner crawlRunner;
public CollectionController() {
super("collections");
......@@ -55,9 +52,11 @@ public class CollectionController extends BaseController {
@RequestMapping(method=GET, value="/")
public String listDatasources(Model model, Locale locale) {
model.addAttribute("colregUrl", apiConfig.getColreg().getBaseUrl());
model.addAttribute("autoOnline", crawlingConfig.getAutomation().isOnline());
model.addAttribute("autoOffline", crawlingConfig.getAutomation().isOffline());
model.addAttribute("autoSyncCr", crSyncService.isAutosync());
model.addAttribute("autoOnline", crawlRunner.isActive());
model.addAttribute("cronExpression", crawlRunner.getCronExpression());
model.addAttribute("nextExecution", crawlRunner.getNextExecution());
model.addAttribute("lastCompletion", crawlRunner.getLastCompletion());
return "collections/list";
}
......@@ -93,7 +92,7 @@ public class CollectionController extends BaseController {
@RequestMapping(method=GET, value={"/async/triggerOnline"})
public @ResponseBody ModelActionPojo triggerOnline(Model model, Locale locale) {
crawlManager.enqueueNewAndOutdatedDatasets();
crawlRunner.run();
ModelActionPojo result = new ModelActionPojo();
result.setSuccess(true);
......
......@@ -25,9 +25,8 @@ import de.unibamberg.minf.core.web.controller.DataTableList;
import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.DmeSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.automation.crawling.OfflineCrawlRunner;
import eu.dariah.de.search.config.MainConfigProperties;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.pojo.DatamodelPojo;
......@@ -43,10 +42,9 @@ public class DatamodelController extends BaseController {
@Autowired private DatamodelConverter datamodelConverter;
@Autowired private MappingGenerationService mappingGenerationService;
@Autowired private DmeSyncService dmeSyncService;
@Autowired private RunnableCrawlManagerImpl crawlManager;
@Autowired private OfflineCrawlRunner crawlRunner;
@Autowired private MainConfigProperties config;
@Autowired private CrawlingConfigProperties crawlingConfig;
public DatamodelController() {
super("datamodels");
......@@ -61,10 +59,12 @@ public class DatamodelController extends BaseController {
@RequestMapping(method=GET, value="/")
public String listDatamodels(Model model, Locale locale) {
model.addAttribute("dmeUrl", apiConfig.getDme().getBaseUrl());
model.addAttribute("autoOnline", crawlingConfig.getAutomation().isOnline());
model.addAttribute("autoOffline", crawlingConfig.getAutomation().isOffline());
model.addAttribute("autoSyncDme", dmeSyncService.isAutosync());
model.addAttribute("autoOffline", crawlRunner.isActive());
model.addAttribute("cronExpression", crawlRunner.getCronExpression());
model.addAttribute("nextExecution", crawlRunner.getNextExecution());
model.addAttribute("lastCompletion", crawlRunner.getLastCompletion());
return "datamodels/list";
}
......@@ -123,7 +123,7 @@ public class DatamodelController extends BaseController {
@RequestMapping(method=GET, value={"/async/triggerOffline"})
public @ResponseBody ModelActionPojo triggerOffline(Model model, Locale locale) {
crawlManager.reindexOutdatedData();
crawlRunner.run();
ModelActionPojo result = new ModelActionPojo();
result.setSuccess(true);
......
......@@ -28,15 +28,15 @@ import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import eu.dariah.de.search.service.CrawlService.CrawlCompleteFlag;
import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements RunnableCrawlManager, Runnable {
public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Runnable {
@Autowired private DatamodelService datamodelService;
@Autowired private AdminService adminService;
@Autowired private MappingGenerationService mappingGenerationService;
......@@ -52,7 +52,7 @@ public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements Runnab
private Map<String, List<String>> outdatedDatamodelIdCrawlIdMap = new HashMap<String, List<String>>();
@Getter @Setter private boolean cronActive;
@Getter private boolean cronActive;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
......@@ -64,6 +64,21 @@ public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements Runnab
@Getter private boolean maintenanceMode;
public void setCronActive(boolean cronActive) {
this.cronActive = cronActive;
if (!debugging) {
if (!this.cronActive && this.autocrawlOnline) {
this.autocrawlOnline = false;
log.warn("crawling.automation.online is configured as true, but no valid cronjob provided; overriding crawling.automation.online=false");
}
if (!this.cronActive && this.autocrawlOffline) {
this.autocrawlOffline = false;
log.warn("crawling.automation.offline is configured as true, but no valid cronjob provided; overriding crawling.automation.offline=false");
}
}
}
@Override
public void run() {
this.synchronize();
......@@ -71,14 +86,14 @@ public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements Runnab
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
super.afterPropertiesSet();
if (!debugging && this.timeout < 30) {
this.timeout = 30;
log.warn("Crawl manager timeout increased to 30s");
}
}
@Override
public void synchronize() {
Stopwatch sw = new Stopwatch().start();
......@@ -359,7 +374,7 @@ public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements Runnab
}
@Override
public void destroy() throws Exception {
try {
pipelineExecutor.shutdown();
......
package eu.dariah.de.search.crawling;
import org.joda.time.DateTime;
import org.springframework.beans.factory.DisposableBean;
public interface RunnableCrawlManager extends CrawlManager, DisposableBean, Runnable {
public DateTime getLastSyncTimestamp();
public void synchronize();
}
......@@ -26,9 +26,12 @@ import de.unibamberg.minf.processing.service.tabular.CsvProcessingService;
import de.unibamberg.minf.processing.service.tabular.TsvProcessingService;
import de.unibamberg.minf.processing.service.text.TextProcessingService;
import de.unibamberg.minf.processing.service.xml.XmlProcessingService;
import eu.dariah.de.search.automation.crawling.BaseCrawlRunner;
import eu.dariah.de.search.automation.crawling.OfflineCrawlRunner;
import eu.dariah.de.search.automation.crawling.OnlineCrawlRunner;
import eu.dariah.de.search.automation.schedule.CronTrigger;
import eu.dariah.de.search.config.nested.CrawlingAutomationConfigProperties;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.crawling.CrawlManagerImpl;
import eu.dariah.de.search.crawling.crawler.FileProcessor;
import eu.dariah.de.search.crawling.crawler.GitCrawlerImpl;
import eu.dariah.de.search.crawling.crawler.IndexCleaner;
......@@ -50,37 +53,60 @@ public class CrawlingConfig extends CrawlingConfigProperties implements Scheduli
private int maxThreadsPerServiceType = 6;
private int apiAccessPolitenessSpanMs = 200;
@Bean
public Executor crawlingAutomationTaskExecutor() {
return Executors.newSingleThreadScheduledExecutor();
}
@PostConstruct
public void completeConfiguration() {
if (this.getAutomation()==null) {
this.setAutomation(new CrawlingAutomationConfigProperties());
}
this.getAutomation().checkSchedules();
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
CronTrigger crawlManagerTrigger = new CronTrigger("ScheduledCrawlManager", this.getAutomation().getCronSchedule());
RunnableCrawlManagerImpl crawlManager = this.crawlManager(null);
this.configureRunnerTask(taskRegistrar, this.onlineCrawlRunner(), this.getAutomation().getCronScheduleOnline());
this.configureRunnerTask(taskRegistrar, this.offlineCrawlRunner(), this.getAutomation().getCronScheduleOffline());
}
private void configureRunnerTask(ScheduledTaskRegistrar taskRegistrar, BaseCrawlRunner runner, String cronExpression) {
CronTrigger runnerTrigger = new CronTrigger(runner.getClass().getSimpleName(), cronExpression, runner);
taskRegistrar.setScheduler(crawlingAutomationTaskExecutor());
taskRegistrar.addTriggerTask(crawlManager, crawlManagerTrigger);
if (crawlManagerTrigger.isActive()) {
crawlManager.setCronActive(true);
crawlManager.setCronExpression(this.getAutomation().getCronSchedule());
taskRegistrar.addTriggerTask(runner, runnerTrigger);
if (runnerTrigger.isActive()) {
runner.setActive(true);
runner.setCronExpression(cronExpression);
} else {
runner.setActive(false);
}
}
@Bean
public Executor crawlingAutomationTaskExecutor() {
return Executors.newSingleThreadScheduledExecutor();
}
@PostConstruct
public void completeConfiguration() {
if (this.getAutomation()==null) {
this.setAutomation(new CrawlingAutomationConfigProperties());
}
@Bean
public OfflineCrawlRunner offlineCrawlRunner() {
OfflineCrawlRunner runner = new OfflineCrawlRunner();
return runner;
}
@Bean
public OnlineCrawlRunner onlineCrawlRunner() {
OnlineCrawlRunner runner = new OnlineCrawlRunner();
return runner;
}
@Bean
public RunnableCrawlManagerImpl crawlManager(MainConfigProperties mainConfig) {
RunnableCrawlManagerImpl crawlManager = new RunnableCrawlManagerImpl();
crawlManager.setDebugging(this.isDebugging());
crawlManager.setAutocrawlOffline(this.getAutomation().isOffline());
crawlManager.setAutocrawlOnline(this.getAutomation().isOnline());
public CrawlManagerImpl crawlManager(MainConfigProperties mainConfig) {
CrawlManagerImpl crawlManager = new CrawlManagerImpl();
crawlManager.setMaxPoolSize(this.getMaxThreads());
crawlManager.setBaseDownloadPath(mainConfig.getPaths().getDownloads());
......
Subproject commit 0201f7065c7d782a49ab7faa01876f01aabf3c31
Subproject commit ec8ab37711c59b50419e8454ba24e3c561b63992
Subproject commit bc9348fa600b80863e88dfe43a7d570ec571865e
Subproject commit ab305f3c933dd419464e70e151acd1ad0a2bd2e2
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment