Commit 9f880204 authored by Tobias Gradl

Merge branch 'v4.4-crawling-automation' into 'v4.4-dev'

447: Implement configurable scheduler

See merge request !64
parents 8bed7ef9 a78e9a0f
Pipeline #31235 passed with stage in 1 minute and 31 seconds
package eu.dariah.de.search.automation.schedule;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.Optional;

import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronExpression;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CronTrigger implements Trigger {

    private final String triggerName;
    private final CronExpression cronExpression;

    /** False when the configured expression is missing or unparseable; the trigger then never fires. */
    @Getter private final boolean active;

    public CronTrigger(String triggerName, String cronExpression) {
        this.triggerName = triggerName;
        this.cronExpression = this.parseCronExpression(cronExpression);
        this.active = this.cronExpression != null;
    }

    @Override
    public Date nextExecutionTime(TriggerContext context) {
        Optional<Date> lastCompletion = Optional.ofNullable(context.lastCompletionTime());
        Optional<Date> nextExecution = Optional.ofNullable(this.getNextExecution());
        this.logExecutionInfo(lastCompletion, nextExecution);
        // Returning null signals the scheduler not to plan any further executions
        return nextExecution.orElse(null);
    }

    private CronExpression parseCronExpression(String expression) {
        CronExpression cronEx = null;
        if (expression != null) {
            try {
                cronEx = CronExpression.parse(expression);
            } catch (Exception e) {
                log.error("Failed to parse cron expression", e);
            }
        }
        return cronEx;
    }

    public Date getNextExecution() {
        if (cronExpression == null) {
            return null;
        }
        LocalDateTime nextExecutionLocalTime = cronExpression.next(LocalDateTime.now());
        if (nextExecutionLocalTime == null) {
            return null;
        }
        return Date.from(nextExecutionLocalTime.atZone(ZoneId.systemDefault()).toInstant());
    }

    private void logExecutionInfo(Optional<Date> lastCompletion, Optional<Date> nextExecution) {
        StringBuilder infoBldr = new StringBuilder();
        if (triggerName != null) {
            infoBldr.append(triggerName).append(": ");
        }
        infoBldr.append("next execution ");
        if (nextExecution.isPresent()) {
            infoBldr.append(nextExecution.get());
        } else {
            infoBldr.append("never");
        }
        if (lastCompletion.isPresent()) {
            infoBldr.append(", last completion ").append(lastCompletion.get());
        }
        log.info(infoBldr.toString());
    }
}
\ No newline at end of file
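The trigger degrades gracefully: an unparseable expression is logged, cronExpression stays null, isActive() returns false, and nextExecutionTime() yields null, which tells Spring's scheduler to stop planning executions. A minimal usage sketch (the class name CronTriggerDemo and the cron values are illustrative, not part of this merge request):

import org.springframework.scheduling.support.SimpleTriggerContext;

import eu.dariah.de.search.automation.schedule.CronTrigger;

public class CronTriggerDemo {
    public static void main(String[] args) {
        // Valid six-field Spring cron expression: every day at 03:00
        CronTrigger nightly = new CronTrigger("NightlyCrawl", "0 0 3 * * *");
        System.out.println(nightly.isActive());         // true
        System.out.println(nightly.getNextExecution()); // next 03:00 as java.util.Date

        // Unparseable expression: the error is logged and the trigger stays inactive
        CronTrigger broken = new CronTrigger("BrokenCrawl", "not-a-cron");
        System.out.println(broken.isActive());          // false

        // An inactive trigger hands the scheduler null, so the task is never scheduled
        System.out.println(broken.nextExecutionTime(new SimpleTriggerContext())); // null
    }
}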
......@@ -6,5 +6,5 @@ import lombok.Data;
public class CrawlingAutomationConfigProperties {
private boolean online = false;
private boolean offline = false;
private int syncInterval = 300; // every 5 minutes
private String cronSchedule;
}
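The new cronSchedule property replaces the fixed syncInterval of the previous timer-based setup. Since CrawlingConfig (below) binds the prefix crawling and exposes this class as its nested automation property, the expression would arrive via a key like crawling.automation.cron-schedule. A sketch of that binding using Spring Boot's Binder; the demo class name and property values are assumptions:

import java.util.Map;

import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

import eu.dariah.de.search.config.nested.CrawlingAutomationConfigProperties;

public class AutomationBindingDemo {
    public static void main(String[] args) {
        // Relaxed binding maps cron-schedule onto the cronSchedule field
        MapConfigurationPropertySource source = new MapConfigurationPropertySource(Map.of(
                "crawling.automation.online", "true",
                "crawling.automation.cron-schedule", "0 0 3 * * *"));

        CrawlingAutomationConfigProperties automation = new Binder(source)
                .bind("crawling.automation", CrawlingAutomationConfigProperties.class)
                .get();

        System.out.println(automation.isOnline());        // true
        System.out.println(automation.getCronSchedule()); // 0 0 3 * * *
    }
}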
......@@ -22,7 +22,7 @@ import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.CollectionSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.pojo.CollectionPojo;
import eu.dariah.de.search.pojo.DatasetPojo;
import eu.dariah.de.search.pojo.DatasourcePojo;
......@@ -38,7 +38,7 @@ public class CollectionController extends BaseController {
@Autowired private CollectionService collectionService;
@Autowired private CollectionConverter collectionConverter;
@Autowired private CollectionSyncService crSyncService;
@Autowired private TimedCrawlManagerImpl crawlManager;
@Autowired private RunnableCrawlManagerImpl crawlManager;
@Autowired private CrawlingConfigProperties crawlingConfig;
......
......@@ -27,7 +27,7 @@ import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.DmeSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.config.MainConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.pojo.DatamodelPojo;
......@@ -43,7 +43,7 @@ public class DatamodelController extends BaseController {
@Autowired private DatamodelConverter datamodelConverter;
@Autowired private MappingGenerationService mappingGenerationService;
@Autowired private DmeSyncService dmeSyncService;
@Autowired private TimedCrawlManagerImpl crawlManager;
@Autowired private RunnableCrawlManagerImpl crawlManager;
@Autowired private MainConfigProperties config;
@Autowired private CrawlingConfigProperties crawlingConfig;
......
......@@ -3,7 +3,7 @@ package eu.dariah.de.search.crawling;
import org.joda.time.DateTime;
import org.springframework.beans.factory.DisposableBean;
public interface TimedCrawlManager extends CrawlManager, DisposableBean {
public interface RunnableCrawlManager extends CrawlManager, DisposableBean, Runnable {
public DateTime getLastSyncTimestamp();
public void synchronize();
......
......@@ -8,12 +8,11 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Minutes;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
......@@ -29,26 +28,19 @@ import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import eu.dariah.de.search.service.CrawlService.CrawlCompleteFlag;
import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCrawlManager {
protected final Logger logger = LoggerFactory.getLogger(TimedCrawlManagerImpl.class);
@Slf4j
public class RunnableCrawlManagerImpl extends CrawlManagerImpl implements RunnableCrawlManager {
@Autowired private DatamodelService datamodelService;
@Autowired private AdminService adminService;
@Autowired private MappingGenerationService mappingGenerationService;
private Timer syncTimer = null;
private TimerTask timeoutTimerTask;
private ReentrantLock setupLock = new ReentrantLock();
private DateTime lastSyncTimestamp;
private boolean maintenanceMode;
/* To block endpoints from being processed in parallel;
* list sizes should always be 1 unless a manual crawl is triggered by the user */
private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<String, List<String>>();
......@@ -56,58 +48,33 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
/* This keeps track of indexed datamodels, e.g. to determine when an outdated recrawl can happen */
private Map<String, List<String>> datamodelIdCrawlIdMap = new HashMap<String, List<String>>();
/* datamodels contained are blocked for further crawling until refresh has completed */
private Map<String, List<String>> outdatedDatamodelIdCrawlIdMap = new HashMap<String, List<String>>();
// Configuration properties
protected boolean debugging;
private boolean autocrawlOnline;
private boolean autocrawlOffline;
private int syncInterval;
private int timeout;
public boolean isDebugging() { return debugging; }
public void setDebugging(boolean debugging) { this.debugging = debugging; }
public boolean isAutocrawlOnline() { return autocrawlOnline; }
public void setAutocrawlOnline(boolean autocrawlOnline) { this.autocrawlOnline = autocrawlOnline; }
public boolean isAutocrawlOffline() { return autocrawlOffline; }
public void setAutocrawlOffline(boolean autocrawlOffline) { this.autocrawlOffline = autocrawlOffline; }
public int getSyncInterval() { return syncInterval; }
public void setSyncInterval(int syncInterval) { this.syncInterval = syncInterval; }
public int getTimeout() { return timeout; }
public void setTimeout(int timeout) { this.timeout = timeout; }
@Getter @Setter private boolean cronActive;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
@Getter @Setter private boolean autocrawlOnline;
@Getter @Setter private boolean autocrawlOffline;
@Getter @Setter private int timeout;
@Getter private DateTime lastSyncTimestamp;
@Getter private boolean maintenanceMode;
@Override public DateTime getLastSyncTimestamp() { return this.lastSyncTimestamp; }
public boolean isMaintenanceMode() { return maintenanceMode; }
@Override
public void run() {
this.synchronize();
}
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
if (!this.autocrawlOffline && !this.autocrawlOnline) {
logger.info("CrawlManager initialized without reocurring synchronization due to configuration");
return;
}
if (this.autocrawlOnline || this.autocrawlOffline) {
if (!debugging && this.syncInterval < 60) {
this.syncInterval = 60;
logger.warn("Sync interval increased to 60s");
}
if (!debugging && this.timeout < 30) {
this.timeout = 30;
logger.warn("Crawl timeout increased to 30s (which is probably still too short)");
}
this.syncTimer = new Timer();
this.setupSyncTimer();
if (!debugging && this.timeout < 30) {
this.timeout = 30;
log.warn("Crawl manager timeout increased to 30s");
}
}
......@@ -116,7 +83,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
Stopwatch sw = new Stopwatch().start();
if (debugging) {
logger.debug("Executing synchronization");
log.debug("Executing synchronization");
}
try {
statusMapslock.lock();
......@@ -131,10 +98,10 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
if (debugging) {
logger.debug("Synchronization completed in {}ms", sw.getElapsedTime());
log.debug("Synchronization completed in {}ms", sw.getElapsedTime());
}
} catch (Exception e) {
logger.error("Failed to execute crawl synchronization", e);
log.error("Failed to execute crawl synchronization", e);
} finally {
statusMapslock.unlock();
}
......@@ -156,6 +123,41 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
// TODO Also check last online crawl success and duration
for (Collection collection : collectionService.getAll()) {
boolean modified = false;
boolean hasUpdatePolicy = (collection.getUpdatePeriod()!=null && !collection.getUpdatePeriod().isEmpty());
for (Endpoint endpoint : collection.getEndpoints()) {
for (Dataset dataset : endpoint.getDatasets()) {
// Find last online crawl
lastOnlineCrawls = crawlService.findCrawls(endpoint.getId(), dataset.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1);
if (lastOnlineCrawls!=null && !lastOnlineCrawls.isEmpty()) {
lastOnlineCrawl = lastOnlineCrawls.get(0);
} else {
lastOnlineCrawl = null;
}
// Guard: datasets that have never been crawled online have no crawl to measure
if (lastOnlineCrawl != null) {
Days d = Days.daysBetween(lastOnlineCrawl.getModified(), syncTimestamp);
Minutes m = Minutes.minutesBetween(lastOnlineCrawl.getCreated(), lastOnlineCrawl.getModified());
log.debug("{}: hasUpdatePolicy: {}, lastOnlineSucceeded: {}, duration: {}min, age: {}d", collection.getName(null), hasUpdatePolicy, lastOnlineCrawl.isComplete() && !lastOnlineCrawl.isError(), m.getMinutes(), d.getDays());
}
}
}
}
}
public void enqueueNewAndOutdatedDatasetsOld() {
DateTime syncTimestamp = DateTime.now();
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
for (Collection collection : collectionService.getAll()) {
boolean modified = false;
boolean onlineUpdates = (collection.getUpdatePeriod()!=null && !collection.getUpdatePeriod().isEmpty());
......@@ -188,7 +190,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
}
} catch (Exception e) {
logger.error("Failed to process dataset for autocrawling", e);
log.error("Failed to process dataset for autocrawling", e);
dataset.setError(true);
modified = true;
}
......@@ -203,19 +205,19 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
private boolean enqueueOnlineCrawlIfAvailable(Collection collection, Endpoint endpoint, Dataset dataset) {
if (outdatedDatamodelIdCrawlIdMap.containsKey(dataset.getId())) {
if (debugging) {
logger.debug("Cannot enqueue online crawl due to outdated datamodel [{}] being reprocessed", dataset.getId());
log.debug("Cannot enqueue online crawl due to outdated datamodel [{}] being reprocessed", dataset.getId());
}
return false;
}
if (endpointIdCrawlIdMap.containsKey(endpoint.getId())) {
if (debugging) {
logger.debug("Cannot enqueue online crawl due to endpoint [{}] already being busy", endpoint.getId());
log.debug("Cannot enqueue online crawl due to endpoint [{}] already being busy", endpoint.getId());
}
return false;
}
if (debugging) {
logger.debug("Enqueued online crawl for endpoint [{}] and dataset [{}]", endpoint.getId(), dataset.getId());
log.debug("Enqueued online crawl for endpoint [{}] and dataset [{}]", endpoint.getId(), dataset.getId());
}
this.performOnlineCrawl(collection, endpoint, datamodelService.findById(dataset.getId()));
......@@ -233,7 +235,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
if (baseCrawlId!=null) {
this.performOfflineCrawl(collection, endpoint, datamodel, baseCrawlId);
if (this.debugging) {
logger.debug("");
log.debug("");
}
count++;
}
......@@ -241,7 +243,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
}
}
logger.debug("Reindexing {} datasets of datamodel {}", count, datamodel.getId());
log.debug("Reindexing {} datasets of datamodel {}", count, datamodel.getId());
return count;
}
......@@ -261,7 +263,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
return true;
} catch (Exception e) {
logger.error(String.format("Failed to recreate index and mapping for model [%s]", datamodel.getId()));
log.error(String.format("Failed to recreate index and mapping for model [%s]", datamodel.getId()));
return false;
}
}
......@@ -307,7 +309,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
@Override
protected void enqueue(CrawlPipeline pipeline, Crawl crawl) {
if (pipeline==null || crawl==null) {
logger.warn("Nothing to enqueue - either pipeline or crawl are not set");
log.warn("Nothing to enqueue - either pipeline or crawl are not set");
return;
}
try {
......@@ -320,7 +322,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
pipelineExecutor.execute(pipeline);
} catch (Exception e) {
logger.error("Failed to setup processing pipeline", e);
log.error("Failed to setup processing pipeline", e);
this.error(pipeline.getUuid());
} finally {
statusMapslock.unlock();
......@@ -345,14 +347,14 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
pipelineExecutor = Executors.newFixedThreadPool(getMaxPoolSize());
}
logger.info(String.format("Running sychronization with client [%s], interrupt timeout set to %s", syncClient.getClass().getSimpleName(), this.getTimeout()));
log.info(String.format("Running sychronization with client [%s], interrupt timeout set to %s", syncClient.getClass().getSimpleName(), this.getTimeout()));
// For interrupting the crawl thread if necessary
Timer timeoutTimer = new Timer();
timeoutTimerTask = new TimeoutTimerTask(pipelineExecutor.submit(syncClient), pipelineExecutor);
TimerTask timeoutTimerTask = new TimeoutTimerTask(pipelineExecutor.submit(syncClient), pipelineExecutor);
timeoutTimer.schedule(timeoutTimerTask, this.timeout * 1000);
} catch (Exception e) {
logger.error("An error occurred while executing synchronization", e);
log.error("An error occurred while executing synchronization", e);
}
}
......@@ -364,7 +366,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
// Wait until all threads are finished
while (!pipelineExecutor.isTerminated()) {}
} catch (final Exception e) {
logger.error("Error closing sync executor", e);
log.error("Error closing sync executor", e);
}
}
......@@ -398,12 +400,12 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
private void cleanupCompleted(UUID serviceId) {
CrawlPipeline pipeline = this.serviceIdServiceMap.get(serviceId);
if (pipeline==null || pipeline.getCrawlId()==null) {
logger.error("Failed to resolve pipeline for given service ID. Hashed id maps are probably inconsistent.");
log.error("Failed to resolve pipeline for given service ID. Hashed id maps are probably inconsistent.");
return;
}
Crawl c = crawlService.findById(pipeline.getCrawlId());
if (c==null) {
logger.error("Failed to resolve crawl for given service ID. Hashed id maps are probably inconsistent.");
log.error("Failed to resolve crawl for given service ID. Hashed id maps are probably inconsistent.");
return;
}
......@@ -412,7 +414,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
this.removeCrawlId(this.datamodelIdCrawlIdMap, c.getDatamodelId(), c.getId());
this.removeCrawlId(this.endpointIdCrawlIdMap, c.getEndpointId(), c.getId());
} catch (Exception e) {
logger.error("Failed to process completed service. Hashed id maps are probably inconsistent.");
log.error("Failed to process completed service. Hashed id maps are probably inconsistent.");
} finally {
statusMapslock.unlock();
}
......@@ -429,23 +431,4 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
}
}
private void setupSyncTimer() {
TimerTask syncTimerTask = new TimerTask() {
public void run() {
try {
setupLock.lock();
lastSyncTimestamp = DateTime.now();
setupLock.unlock();
synchronize();
} catch (Exception e) {
logger.error("Failed to execute sync task", e);
}
}
};
syncTimer.scheduleAtFixedRate(syncTimerTask, this.getSyncInterval()*1000, this.getSyncInterval()*1000);
logger.info(String.format("Scheduled synchronization every %s seconds", this.getSyncInterval()));
}
}
......@@ -15,7 +15,7 @@ public class BaseNamedEntityImpl extends BaseEntityImpl implements BaseNamedEnti
return names.get(locale);
}
// TODO: Choose a sensible default
if (!names.isEmpty()) {
if (names!=null && !names.isEmpty()) {
return names.get(names.keySet().iterator().next());
}
return null;
......
......@@ -4,14 +4,19 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import de.unibamberg.minf.gtf.GtfElementProcessor;
import de.unibamberg.minf.gtf.GtfMappingProcessor;
import de.unibamberg.minf.mapping.service.MappingExecutionServiceImpl;
......@@ -21,8 +26,9 @@ import de.unibamberg.minf.processing.service.tabular.CsvProcessingService;
import de.unibamberg.minf.processing.service.tabular.TsvProcessingService;
import de.unibamberg.minf.processing.service.text.TextProcessingService;
import de.unibamberg.minf.processing.service.xml.XmlProcessingService;
import eu.dariah.de.search.automation.schedule.CronTrigger;
import eu.dariah.de.search.config.nested.CrawlingAutomationConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.crawling.RunnableCrawlManagerImpl;
import eu.dariah.de.search.crawling.crawler.FileProcessor;
import eu.dariah.de.search.crawling.crawler.GitCrawlerImpl;
import eu.dariah.de.search.crawling.crawler.IndexCleaner;
......@@ -36,13 +42,32 @@ import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper=true)
@EnableScheduling
@Configuration
@ConfigurationProperties(prefix = "crawling")
public class CrawlingConfig extends CrawlingConfigProperties {
public class CrawlingConfig extends CrawlingConfigProperties implements SchedulingConfigurer {
private int maxThreadsPerServiceType = 6;
private int apiAccessPolitenessSpanMs = 200;
@Bean
public Executor crawlingAutomationTaskExecutor() {
return Executors.newSingleThreadScheduledExecutor();
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
CronTrigger crawlManagerTrigger = new CronTrigger("ScheduledCrawlManager", this.getAutomation().getCronSchedule());
// Proxied @Bean call: returns the container-managed singleton; the null argument only matters if the bean has not been created yet
RunnableCrawlManagerImpl crawlManager = this.crawlManager(null);
taskRegistrar.setScheduler(crawlingAutomationTaskExecutor());
taskRegistrar.addTriggerTask(crawlManager, crawlManagerTrigger);
if (crawlManagerTrigger.isActive()) {
crawlManager.setCronActive(true);
crawlManager.setCronExpression(this.getAutomation().getCronSchedule());
}
}
@PostConstruct
public void completeConfiguration() {
if (this.getAutomation()==null) {
......@@ -51,12 +76,11 @@ public class CrawlingConfig extends CrawlingConfigProperties {
}
@Bean
public TimedCrawlManagerImpl crawlManager(MainConfigProperties mainConfig) {
TimedCrawlManagerImpl crawlManager = new TimedCrawlManagerImpl();
public RunnableCrawlManagerImpl crawlManager(MainConfigProperties mainConfig) {
RunnableCrawlManagerImpl crawlManager = new RunnableCrawlManagerImpl();
crawlManager.setDebugging(this.isDebugging());
crawlManager.setAutocrawlOffline(this.getAutomation().isOffline());
crawlManager.setAutocrawlOnline(this.getAutomation().isOnline());
crawlManager.setSyncInterval(this.getAutomation().getSyncInterval());
crawlManager.setMaxPoolSize(this.getMaxThreads());
crawlManager.setBaseDownloadPath(mainConfig.getPaths().getDownloads());
......
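configureTasks above is where the pieces meet: addTriggerTask(Runnable, Trigger) pairs the crawl manager (a Runnable whose run() calls synchronize()) with the CronTrigger, and after every completed run the scheduler asks the trigger for the next execution time, so the schedule is re-evaluated continuously rather than fixed at startup. The registrar can also be driven standalone to observe this; a minimal sketch, with an illustrative class name and cron value:

import java.util.concurrent.Executors;

import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import eu.dariah.de.search.automation.schedule.CronTrigger;

public class RegistrarDemo {
    public static void main(String[] args) {
        ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
        registrar.setScheduler(Executors.newSingleThreadScheduledExecutor());

        // Same pattern as configureTasks: after each run, the scheduler calls
        // trigger.nextExecutionTime(...) to plan the next run; null ends the series
        CronTrigger trigger = new CronTrigger("DemoTrigger", "*/10 * * * * *"); // every 10 seconds
        registrar.addTriggerTask(() -> System.out.println("synchronize() would run here"), trigger);

        // Spring invokes this automatically on context refresh; standalone we call it ourselves
        registrar.afterPropertiesSet();
    }
}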
package eu.dariah.de.search.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@Configuration
@EnableScheduling
@ConfigurationProperties(prefix = "scheduling")