Commit a78e9a0f authored by Gradl, Tobias's avatar Gradl, Tobias
Browse files

447: Implement configurable scheduler (OPENED)

Task-Url: #447
parent 87d2423c
Pipeline #31234 passed with stage
in 1 minute and 27 seconds
package eu.dariah.de.search.automation.schedule;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.Optional;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronExpression;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CronTrigger implements Trigger {
private final String triggerName;
private final CronExpression cronExpression;
@Getter private final boolean active;
public CronTrigger(String triggerName, String cronExpression) {
this.triggerName = triggerName;
this.cronExpression = this.parseCronExpression(cronExpression);
this.active = this.cronExpression!=null;
}
@Override
public Date nextExecutionTime(TriggerContext context) {
Optional<Date> lastCompletion = Optional.ofNullable(context.lastCompletionTime());
Optional<Date> nextExecution = Optional.ofNullable(this.getNextExecution());
this.logExecutionInfo(lastCompletion, nextExecution);
return nextExecution.orElse(null);
}
private CronExpression parseCronExpression(String expression) {
CronExpression cronEx = null;
if (expression!=null) {
try {
cronEx = CronExpression.parse(expression);
} catch (Exception e) {
log.error("Failed to parse cron expression", e);
}
}
return cronEx;
}
public Date getNextExecution() {
if (cronExpression==null) {
return null;
}
LocalDateTime nextExecutionLocalTime = cronExpression.next(LocalDateTime.now());
if (nextExecutionLocalTime==null) {
return null;
}
return Date.from(nextExecutionLocalTime.atZone(ZoneId.systemDefault()).toInstant());
}
private void logExecutionInfo(Optional<Date> lastCompletion, Optional<Date> nextExecution) {
StringBuilder infoBldr = new StringBuilder();
if (triggerName!=null) {
infoBldr.append(triggerName + ": ");
}
infoBldr.append("next execution ");
if (nextExecution.isPresent()) {
infoBldr.append(nextExecution.get().toString());
} else {
infoBldr.append("never");
}
if (lastCompletion.isPresent()) {
infoBldr.append(", last completion ").append(lastCompletion.get());
}
log.info(infoBldr.toString());
}
}
\ No newline at end of file
......@@ -6,6 +6,5 @@ import lombok.Data;
public class CrawlingAutomationConfigProperties {
private boolean online = false;
private boolean offline = false;
private int syncInterval = 300; // every 5 minutes
private String cronSchedule;
}
......@@ -22,7 +22,7 @@ import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.CollectionSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.pojo.CollectionPojo;
import eu.dariah.de.search.pojo.DatasetPojo;
import eu.dariah.de.search.pojo.DatasourcePojo;
......@@ -38,7 +38,7 @@ public class CollectionController extends BaseController {
@Autowired private CollectionService collectionService;
@Autowired private CollectionConverter collectionConverter;
@Autowired private CollectionSyncService crSyncService;
@Autowired private TimedCrawlManagerImpl crawlManager;
@Autowired private RunnableCrawlManagerImpl crawlManager;
@Autowired private CrawlingConfigProperties crawlingConfig;
......
......@@ -27,7 +27,7 @@ import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.DmeSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.config.MainConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.pojo.DatamodelPojo;
......@@ -43,7 +43,7 @@ public class DatamodelController extends BaseController {
@Autowired private DatamodelConverter datamodelConverter;
@Autowired private MappingGenerationService mappingGenerationService;
@Autowired private DmeSyncService dmeSyncService;
@Autowired private TimedCrawlManagerImpl crawlManager;
@Autowired private RunnableCrawlManagerImpl crawlManager;
@Autowired private MainConfigProperties config;
@Autowired private CrawlingConfigProperties crawlingConfig;
......
......@@ -3,7 +3,7 @@ package eu.dariah.de.search.crawling;
import org.joda.time.DateTime;
import org.springframework.beans.factory.DisposableBean;
public interface TimedCrawlManager extends CrawlManager, DisposableBean {
public interface RunnableCrawlManager extends CrawlManager, DisposableBean, Runnable {
public DateTime getLastSyncTimestamp();
public void synchronize();
......
......@@ -8,7 +8,6 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import org.joda.time.DateTime;
import org.joda.time.Days;
......@@ -29,25 +28,19 @@ import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import eu.dariah.de.search.service.CrawlService.CrawlCompleteFlag;
import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
@Slf4j
public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCrawlManager {
public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements RunnableCrawlManager, Runnable {
@Autowired private DatamodelService datamodelService;
@Autowired private AdminService adminService;
@Autowired private MappingGenerationService mappingGenerationService;
private Timer syncTimer = null;
private TimerTask timeoutTimerTask;
private ReentrantLock setupLock = new ReentrantLock();
private DateTime lastSyncTimestamp;
private boolean maintenanceMode;
/* To block endpoints from being processed in parallel
* list sizes should always be 1 unless manual crawl is triggered by user */
private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<String, List<String>>();
......@@ -55,58 +48,33 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
/* This is to keep track about indexed datamodels e.g. to determine, when outdated recrawl can happen */
private Map<String, List<String>> datamodelIdCrawlIdMap = new HashMap<String, List<String>>();
/* datamodels contained are blocked for further crawling until refresh has completed */
private Map<String, List<String>> outdatedDatamodelIdCrawlIdMap = new HashMap<String, List<String>>();
// Configuration properties
protected boolean debugging;
private boolean autocrawlOnline;
private boolean autocrawlOffline;
private int syncInterval;
private int timeout;
public boolean isDebugging() { return debugging; }
public void setDebugging(boolean debugging) { this.debugging = debugging; }
public boolean isAutocrawlOnline() { return autocrawlOnline; }
public void setAutocrawlOnline(boolean autocrawlOnline) { this.autocrawlOnline = autocrawlOnline; }
public boolean isAutocrawlOffline() { return autocrawlOffline; }
public void setAutocrawlOffline(boolean autocrawlOffline) { this.autocrawlOffline = autocrawlOffline; }
public int getSyncInterval() { return syncInterval; }
public void setSyncInterval(int syncInterval) { this.syncInterval = syncInterval; }
public int getTimeout() { return timeout; }
public void setTimeout(int timeout) { this.timeout = timeout; }
@Getter @Setter private boolean cronActive;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
@Getter @Setter private boolean autocrawlOnline;
@Getter @Setter private boolean autocrawlOffline;
@Getter @Setter private int timeout;
@Getter private DateTime lastSyncTimestamp;
@Getter private boolean maintenanceMode;
@Override public DateTime getLastSyncTimestamp() { return this.lastSyncTimestamp; }
public boolean isMaintenanceMode() { return maintenanceMode; }
@Override
public void run() {
this.synchronize();
}
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
if (!this.autocrawlOffline && !this.autocrawlOnline) {
log.info("CrawlManager initialized without reocurring synchronization due to configuration");
return;
}
if (this.autocrawlOnline || this.autocrawlOffline) {
if (!debugging && this.syncInterval < 60) {
this.syncInterval = 60;
log.warn("Sync interval increased to 60s");
}
if (!debugging && this.timeout < 30) {
this.timeout = 30;
log.warn("Crawl timeout increased to 30s (which is probably still too short)");
}
this.syncTimer = new Timer();
this.setupSyncTimer();
if (!debugging && this.timeout < 30) {
this.timeout = 30;
log.warn("Crawl manager timeout increased to 30s");
}
}
......@@ -383,7 +351,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
// For interrupting the crawl thread if necessary
Timer timeoutTimer = new Timer();
timeoutTimerTask = new TimeoutTimerTask(pipelineExecutor.submit(syncClient), pipelineExecutor);
TimerTask timeoutTimerTask = new TimeoutTimerTask(pipelineExecutor.submit(syncClient), pipelineExecutor);
timeoutTimer.schedule(timeoutTimerTask, this.timeout * 1000);
} catch (Exception e) {
log.error("An error occurred while executing synchronization", e);
......@@ -463,23 +431,4 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
}
}
private void setupSyncTimer() {
TimerTask syncTimerTask = new TimerTask() {
public void run() {
try {
setupLock.lock();
lastSyncTimestamp = DateTime.now();
setupLock.unlock();
synchronize();
} catch (Exception e) {
log.error("Failed to execute sync task", e);
}
}
};
syncTimer.scheduleAtFixedRate(syncTimerTask, this.getSyncInterval()*1000, this.getSyncInterval()*1000);
log.info(String.format("Scheduled synchronization every %s seconds", this.getSyncInterval()));
}
}
package eu.dariah.de.search.config;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronExpression;
import de.unibamberg.minf.gtf.GtfElementProcessor;
import de.unibamberg.minf.gtf.GtfMappingProcessor;
import de.unibamberg.minf.mapping.service.MappingExecutionServiceImpl;
......@@ -37,8 +26,9 @@ import de.unibamberg.minf.processing.service.tabular.CsvProcessingService;
import de.unibamberg.minf.processing.service.tabular.TsvProcessingService;
import de.unibamberg.minf.processing.service.text.TextProcessingService;
import de.unibamberg.minf.processing.service.xml.XmlProcessingService;
import eu.dariah.de.search.automation.schedule.CronTrigger;
import eu.dariah.de.search.config.nested.CrawlingAutomationConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.crawling.crawler.FileProcessor;
import eu.dariah.de.search.crawling.crawler.GitCrawlerImpl;
import eu.dariah.de.search.crawling.crawler.IndexCleaner;
......@@ -49,9 +39,7 @@ import eu.dariah.de.search.crawling.files.FileUnpacker;
import eu.dariah.de.search.crawling.files.XmlChunker;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@EqualsAndHashCode(callSuper=true)
@EnableScheduling
......@@ -62,7 +50,6 @@ public class CrawlingConfig extends CrawlingConfigProperties implements Scheduli
private int maxThreadsPerServiceType = 6;
private int apiAccessPolitenessSpanMs = 200;
@Bean
public Executor crawlingAutomationTaskExecutor() {
return Executors.newSingleThreadScheduledExecutor();
......@@ -70,29 +57,14 @@ public class CrawlingConfig extends CrawlingConfigProperties implements Scheduli
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
CronExpression cronExpression = CronExpression.parse(this.getAutomation().getCronSchedule());
CronTrigger crawlManagerTrigger = new CronTrigger("ScheduledCrawlManager", this.getAutomation().getCronSchedule());
RunnableCrawlManagerImpl crawlManager = this.crawlManager(null);
taskRegistrar.setScheduler(crawlingAutomationTaskExecutor());
taskRegistrar.addTriggerTask(new Runnable() {
@Override
public void run() {
log.debug("tick");
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext context) {
Optional<Date> lastCompletionTime = Optional.ofNullable(context.lastCompletionTime());
//Instant nextExecutionTime = lastCompletionTime.orElseGet(Date::new).toInstant().plusMillis(tickService.getDelay());
LocalDateTime nextExecutionTime = cronExpression.next(LocalDateTime.now());
log.debug("nextExecutionTime: {}", nextExecutionTime.toString());
log.debug("nextExecutionDate: {}", Date.from(nextExecutionTime.atZone(ZoneId.systemDefault()).toInstant()));
return Date.from(nextExecutionTime.atZone(ZoneId.systemDefault()).toInstant());
}
});
taskRegistrar.addTriggerTask(crawlManager, crawlManagerTrigger);
if (crawlManagerTrigger.isActive()) {
crawlManager.setCronActive(true);
crawlManager.setCronExpression(this.getAutomation().getCronSchedule());
}
}
......@@ -104,12 +76,11 @@ public class CrawlingConfig extends CrawlingConfigProperties implements Scheduli
}
@Bean
public TimedCrawlManagerImpl crawlManager(MainConfigProperties mainConfig) {
TimedCrawlManagerImpl crawlManager = new TimedCrawlManagerImpl();
public RunnableCrawlManagerImpl crawlManager(MainConfigProperties mainConfig) {
RunnableCrawlManagerImpl crawlManager = new RunnableCrawlManagerImpl();
crawlManager.setDebugging(this.isDebugging());
crawlManager.setAutocrawlOffline(this.getAutomation().isOffline());
crawlManager.setAutocrawlOnline(this.getAutomation().isOnline());
crawlManager.setSyncInterval(this.getAutomation().getSyncInterval());
crawlManager.setMaxPoolSize(this.getMaxThreads());
crawlManager.setBaseDownloadPath(mainConfig.getPaths().getDownloads());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment