Commit 00412c53 authored by Gradl, Tobias

420: Implement configurable crawls -> [GS: Repetitive Crawl Model]


Task-Url: #420
parent 79cdd252
Pipeline #23358 failed in 9 seconds
@@ -22,7 +22,7 @@ allprojects {
gtfVersion = "2.0.0-SNAPSHOT"
processingVersion = "4.1.0-SNAPSHOT"
colregModelVersion = "4.3.4-RELEASE"
dariahSpVersion = "2.1.4-SNAPSHOT"
dariahSpVersion = "2.1.6-RELEASE"
jsonAssertVersion = "1.5.0"
jodaTimeVersion = "2.10.10"
......
@@ -101,12 +101,14 @@ public abstract class BaseSyncService<TModel extends Identifiable, TApi extends
@Override
public void destroy() throws Exception {
try {
syncExecutor.shutdown();
// Wait until all threads are finished
while (!syncExecutor.isTerminated()) {}
} catch (final Exception e) {
logger.error("Error closing sync executor", e);
if (syncExecutor!=null) {
try {
syncExecutor.shutdown();
// Wait until all threads are finished
while (!syncExecutor.isTerminated()) {}
} catch (final Exception e) {
logger.error("Error closing sync executor", e);
}
}
}
......
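Note (not part of the commit): a minimal, self-contained Java sketch of the null-guarded executor shutdown that destroy() now performs; a bounded awaitTermination is shown here as a common alternative to spinning on isTerminated().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SyncExecutorShutdownSketch {
    private ExecutorService syncExecutor = Executors.newFixedThreadPool(2);

    // Null-guarded shutdown, mirroring the destroy() logic above
    public void destroy() {
        if (syncExecutor != null) {
            try {
                syncExecutor.shutdown();
                // Bounded wait instead of busy-waiting on isTerminated()
                if (!syncExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                    syncExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                syncExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}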
@@ -20,21 +20,17 @@ public class CrawlHelper {
private CrawlHelper() {}
public static String renderAccessUrl(Endpoint ep) {
return renderAccessUrl(ep.getUrl(), ep.getParams(), null);
return renderAccessUrl(ep.getUrl(), ep.getParams(), null, null);
}
public static String renderAccessUrl(String url, List<EndpointParam> endpointParams, List<Resource> dynamicParams) {
public static String renderAccessUrl(String url, List<EndpointParam> endpointParams, List<Resource> dynamicParams, List<Resource> removeParams) {
if (url==null) {
return null;
}
try {
URIBuilder b = new URIBuilder(url);
if (endpointParams!=null) {
for (EndpointParam p : endpointParams) {
b.addParameter(p.getParam(), p.getValue());
}
}
b.addParameters(createNameValuePairs(dynamicParams));
b.addParameters(filterNameValuePairs(endpointParams, removeParams));
b.addParameters(createNameValuePairs(dynamicParams, removeParams));
return b.build().toString();
} catch (URISyntaxException e) {
log.error("Failed to build URL", e);
@@ -42,12 +38,43 @@ public class CrawlHelper {
return null;
}
private static List<NameValuePair> createNameValuePairs(List<Resource> dynamicParams) {
private static List<NameValuePair> filterNameValuePairs(List<EndpointParam> endpointParams, List<Resource> removeParams) {
if (endpointParams==null) {
return new ArrayList<>();
}
List<NameValuePair> pairs = new ArrayList<>();
// Both structures are allowed: RemoveParam with value & RemoveParam with child 'Name'
List<String> removeParamNames = new ArrayList<>();
if (removeParams!=null) {
for (Resource r : removeParams) {
if (r.getValue()!=null) {
removeParamNames.add(r.getValue().toString());
}
if (r.getChildResources()!=null && !r.getChildResources().isEmpty()) {
for (Resource name : ResourceHelper.findRecursive(r, "Name")) {
if (name.getValue()!=null) {
removeParamNames.add(name.getValue().toString());
}
}
}
}
}
for (EndpointParam p : endpointParams) {
if (!removeParamNames.contains(p.getParam())) {
pairs.add(new BasicNameValuePair(p.getParam(), p.getValue()));
}
}
return pairs;
}
private static List<NameValuePair> createNameValuePairs(List<Resource> dynamicParams, List<Resource> removeParams) {
if (dynamicParams==null) {
return new ArrayList<>();
}
List<NameValuePair> pairs = new ArrayList<>();
List<Resource> names;
List<Resource> names = null;
List<Resource> values;
for (Resource r : dynamicParams) {
names = ResourceHelper.findRecursive(r, "Name");
......
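Note (not part of the commit): a hypothetical, self-contained sketch of what renderAccessUrl is meant to produce, assuming Apache HttpClient's URIBuilder (which the surrounding code appears to use) and invented parameter names. Endpoint parameters named by a RemoveParam are dropped; dynamic GET.Param values are appended.

import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.http.client.utils.URIBuilder;

public class RenderAccessUrlSketch {
    public static void main(String[] args) throws URISyntaxException {
        Map<String, String> endpointParams = new LinkedHashMap<>();
        endpointParams.put("verb", "ListRecords");
        endpointParams.put("metadataPrefix", "oai_dc");

        List<String> removeParams = Arrays.asList("metadataPrefix");          // from GET.RemoveParam resources
        Map<String, String> dynamicParams = Map.of("resumptionToken", "abc"); // from GET.Param resources

        URIBuilder b = new URIBuilder("https://example.org/oai");
        // Endpoint params survive unless named by a RemoveParam
        endpointParams.forEach((k, v) -> { if (!removeParams.contains(k)) b.addParameter(k, v); });
        // Dynamic params extracted from the previously downloaded file are appended
        dynamicParams.forEach(b::addParameter);

        System.out.println(b.build()); // https://example.org/oai?verb=ListRecords&resumptionToken=abc
    }
}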
@@ -75,8 +75,6 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
@Override
public void afterPropertiesSet() throws Exception {
pipelineExecutor = Executors.newFixedThreadPool(this.getMaxPoolSize());
logger.info("Initializing CrawlManager...checking registered crawlers and processors for accessiblity");
//this.checkProcessingClasses();
logger.info("CrawlManager initialized");
}
@@ -219,25 +217,25 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
String access = null;
String file = null;
for (AccessMethods mAv : AccessMethods.values()) {
if (mAv.toString().toLowerCase().equals(ep.getAccessType().toLowerCase())) {
if (mAv.toString().equalsIgnoreCase(ep.getAccessType())) {
access = mAv.toString();
break;
}
}
for (FileTypes ftv : FileTypes.values()) {
if (ftv.toString().toLowerCase().equals(ep.getFileType().toLowerCase())) {
if (ftv.toString().equalsIgnoreCase(ep.getFileType())) {
file = ftv.toString();
break;
}
}
// Online but no access type detected
if (access==null && c.getBaseCrawlId()==null) {
logger.error(String.format("Unknown access type [%s]; cancelling crawl", ep.getAccessType()));
logger.error("Unknown access type [{}]; cancelling crawl", ep.getAccessType());
this.updateCrawl(c.getId(), ProcessingServiceStates.ERROR);
return null;
}
if (file==null) {
logger.error(String.format("Unknown file type method [%s]; cancelling crawl", ep.getFileType()));
logger.error("Unknown file type method [{}]; cancelling crawl", ep.getFileType());
this.updateCrawl(c.getId(), ProcessingServiceStates.ERROR);
return null;
}
@@ -359,8 +357,8 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
private List<Crawler> getCrawlers(String method, Map<String, String> processingChain) throws ProcessingConfigException {
if (!processingChain.containsKey(method)) {
logger.error(String.format("No processing service implemented/configured for method [%s]", method.toString()));
throw new ProcessingConfigException(String.format("No processing service implemented/configured for method [%s]", method.toString()));
logger.error("No processing service implemented/configured for method [{}]", method);
throw new ProcessingConfigException(String.format("No processing service implemented/configured for method [%s]", method));
}
try {
String[] serviceRefs = processingChain.get(method).split(",");
@@ -375,27 +373,4 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
}
}
/*private void checkProcessingClasses() {
if (this.onlineProcessingChains!=null) {
this.checkProcessingClasses(this.onlineProcessingChains);
}
if (this.offlineProcessingChains!=null) {
this.checkProcessingClasses(this.offlineProcessingChains);
}
}
private void checkProcessingClasses(Map<String, String> processingChains) {
for (String am : processingChains.keySet()) {
String[] serviceRefs = processingChains.get(am).split(",");
Class<?> crawlerClass;
for (int i=0; i<serviceRefs.length; i++) {
try {
crawlerClass = Class.forName(serviceRefs[i].trim());
Crawler.class.cast(appContext.getBean(crawlerClass));
} catch (Exception e) {
logger.warn(String.format("Could not access class or bean [%s] as configured for processing. This will result in errors when attempting to crawl data.", serviceRefs[i].trim()), e);
}
}
}
}*/
}
@@ -6,9 +6,11 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
@@ -44,42 +46,43 @@ import lombok.Setter;
public class RepetitiveFileCrawlerImpl extends FileDownloader implements ApplicationContextAware {
private ApplicationContext appContext;
@Autowired private MainConfigProperties mainConfig;
@Autowired protected DatamodelService datamodelService;
@Autowired private MappingService mappingService;
@Autowired private MappingExecutionService mappingExecutionService;
@Getter @Setter
Map<String, String> fileProcessingServiceMap;
@Getter @Setter private Map<String, String> fileProcessingServiceMap;
@Getter @Setter private int politenessTimespan = 1000; // 1 call per second (to same endpoint)
BaseResourceProcessingServiceImpl processingService;
CollectingResourceConsumptionServiceImpl sourceResCollector = null;
MappingExecGroup mExecGroup;
CollectingResourceConsumptionServiceImpl targetResCollector = null;
private Endpoint endpoint;
private Crawl crawl;
private ExtendedDatamodelContainer sourceDatamodel;
private ExtendedDatamodelContainer targetDatamodel;
private ExtendedMappingContainer mapping;
private Endpoint endpoint;
private Crawl crawl;
private BaseResourceProcessingServiceImpl processingService;
private MappingExecGroup mExecGroup;
private CollectingResourceConsumptionServiceImpl sourceResCollector = null;
private CollectingResourceConsumptionServiceImpl targetResCollector = null;
private ExtendedDatamodelContainer sourceDatamodelContainer;
private ExtendedDatamodelContainer targetDatamodelContainer;
List<String> processedUrls;
Queue<String> downloadUris;
private List<String> processedUrls;
private Queue<String> downloadUris;
Stopwatch swPoliteness = new Stopwatch();
@Override
public String getOutputFilename() {
return super.getOutputFilename() + "." + (processedUrls==null ? 0 : processedUrls.size());
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.appContext = applicationContext;
this.fileName = UUID.randomUUID().toString();
this.outputFilename = UUID.randomUUID().toString();
}
@Override
@@ -94,26 +97,68 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
return;
}
this.initServices();
if (processingService==null || mExecGroup==null) {
return;
}
this.processedUrls = new ArrayList<>();
this.downloadUris = new LinkedList<>();
}
@Override
public void downloadFile() {
public void download() {
swPoliteness.start();
super.downloadFile();
super.download();
if (this.mExecGroup==null) {
logger.debug("Resumptive crawling not applicable -> crawl done");
return;
this.registerFinished();
} else {
this.reset();
this.processDownloadedFile();
this.collectDownloadUris();
this.setupAndDownloadNextFile();
}
}
long currentSize = 0;
long overallSize = 0;
protected void registerFinished() {
if (this.getListener() != null) {
this.getListener().finished(this.getUuid());
}
this.reset();
this.processDownloadedFile();
}
this.collectDownloadUris();
this.setupAndDownloadNextFile();
protected void registerError() {
if (this.getListener() != null) {
this.getListener().error(this.getUuid());
}
}
@Override
protected void updateFileSize(long size) {
currentSize = size;
overallSize += size;
if (this.getListener() != null) {
this.getListener().updateSize(this.getUuid(), overallSize);
}
}
@Override
protected void updateFileProcessed(long position) {
if (this.getListener() != null) {
this.getListener().processed(this.getUuid(), overallSize - currentSize + position);
}
}
@Override
protected void registerFileFinished() {
currentSize = 0;
}
@Override
protected void registerFileError() {}
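Note (not part of the commit, numbers invented): with the bookkeeping above, after a first file of 100 KB has been fully downloaded and a second file of 50 KB has been announced, overallSize is 150 KB and currentSize is 50 KB; when 30 KB of the second file have been processed, the listener receives overallSize - currentSize + position = 150 - 50 + 30 = 130 KB as overall progress.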
private void initCrawlModels(ExtendedDatamodelContainer sc) {
mapping = null;
@@ -127,25 +172,25 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
// Dedicated access model or datamodel?
if (endpoint.getAccessModelId()!=null) {
logger.debug("Dedicated access model configured: {}", endpoint.getAccessModelId());
sourceDatamodelContainer = datamodelService.findById(endpoint.getAccessModelId());
if (sourceDatamodelContainer==null) {
sourceDatamodel = datamodelService.findById(endpoint.getAccessModelId());
if (sourceDatamodel==null) {
logger.warn("Dedicated access model configured but not available (Sync with DME required?)");
return;
}
} else {
logger.debug("No dedicated access model, using datamodel: {}", sc.getModel().getId());
sourceDatamodelContainer = sc;
sourceDatamodel = sc;
}
// Crawl model
targetDatamodelContainer = datamodelService.findById(mainConfig.getDatamodels().getCrawling());
if (targetDatamodelContainer==null) {
targetDatamodel = datamodelService.findById(mainConfig.getDatamodels().getCrawling());
if (targetDatamodel==null) {
logger.warn("Crawl model configured but not available (Sync with DME required?)");
return;
}
// Mapping between source (access or data) model and target crawl model
mapping = mappingService.getMappingBySourceAndTarget(sourceDatamodelContainer.getModel().getId(), mainConfig.getDatamodels().getCrawling());
mapping = mappingService.getMappingBySourceAndTarget(sourceDatamodel.getModel().getId(), mainConfig.getDatamodels().getCrawling());
if (mapping==null) {
logger.info("No mapping to GS: Repetitive Crawl Model modeled; repetitive file crawling not configured");
}
@@ -161,8 +206,8 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
}
try {
processingService = BaseResourceProcessingServiceImpl.class.cast(appContext.getBean(fileProcessingServiceMap.get(endpoint.getFileType())));
processingService.setSchema(sourceDatamodelContainer.getModel());
processingService.setRoot((Nonterminal)sourceDatamodelContainer.getOrRenderElementHierarchy());
processingService.setSchema(sourceDatamodel.getModel());
processingService.setRoot((Nonterminal)sourceDatamodel.getOrRenderElementHierarchy());
sourceResCollector = new CollectingResourceConsumptionServiceImpl();
processingService.addConsumptionService(sourceResCollector);
@@ -173,10 +218,10 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
}
// Mapping execution
mExecGroup = this.buildMappingExecutionGroup(mapping, targetDatamodelContainer);
mExecGroup = this.buildMappingExecutionGroup(mapping, targetDatamodel);
targetResCollector = new CollectingResourceConsumptionServiceImpl();
mappingExecutionService.addConsumptionService(targetResCollector);
if (mExecGroup.getConcepts()==null || mExecGroup.getConcepts().isEmpty()) {
if (mExecGroup!=null && (mExecGroup.getConcepts()==null || mExecGroup.getConcepts().isEmpty())) {
mExecGroup = null;
}
}
@@ -194,21 +239,8 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
if (c==null) {
continue;
}
if (c.getElementGrammarIdsMap()!=null) {
for (String elementid : c.getElementGrammarIdsMap().keySet()) {
String grammarId = c.getElementGrammarIdsMap().get(elementid);
Grammar g;
if (c.getElementGrammarIdsMap().get(elementid)!=null && mc.getGrammars()!=null &&
mc.getGrammars().containsKey(c.getElementGrammarIdsMap().get(elementid))) {
g = mc.getGrammars().get(c.getElementGrammarIdsMap().get(elementid));
} else {
g = new GrammarImpl(mc.getMapping().getId(), grammarId);
g.setId(grammarId);
g.setPassthrough(true);
}
exec.addGrammar(g);
}
}
exec.setGrammarsMap(this.collectGrammars(mc.getId(), c.getElementGrammarIdsMap(), mc.getGrammars()));
FunctionImpl f = new FunctionImpl(mc.getMapping().getId(), c.getFunctionId());
if (mc.getFunctions().containsKey(c.getFunctionId())) {
f.setFunction(mc.getFunctions().get(c.getFunctionId()));
@@ -218,6 +250,26 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
return exec;
}
private Map<String, Grammar> collectGrammars(String mappingId, Map<String, String> elementGrammarIdsMap, Map<String, Grammar> grammarIdMap) {
if (elementGrammarIdsMap==null) {
return null;
}
Map<String, Grammar> result = new HashMap<>();
Grammar g;
for (Entry<String, String> e : elementGrammarIdsMap.entrySet()) {
if (elementGrammarIdsMap.get(e.getKey())!=null && grammarIdMap!=null &&
grammarIdMap.containsKey(elementGrammarIdsMap.get(e.getKey()))) {
g = grammarIdMap.get(elementGrammarIdsMap.get(e.getKey()));
} else {
g = new GrammarImpl(mappingId, e.getValue());
g.setId(e.getValue());
g.setPassthrough(true);
}
result.put(g.getId(), g);
}
return result;
}
private void reset() {
if (sourceResCollector != null) {
sourceResCollector.setResources(null);
@@ -251,12 +303,14 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
}
private void collectDownloadUris() {
List<Resource> res;
List<Resource> resParam;
List<Resource> resRemoveParam;
String newUrl;
if (targetResCollector.getResources()!=null) {
for (Resource r : targetResCollector.getResources()) {
res = ResourceHelper.findRecursive(r, "GET.Param");
newUrl = CrawlHelper.renderAccessUrl(this.endpoint.getUrl(), this.endpoint.getParams(), res);
resParam = ResourceHelper.findRecursive(r, "GET.Param");
resRemoveParam = ResourceHelper.findRecursive(r, "GET.RemoveParam");
newUrl = CrawlHelper.renderAccessUrl(this.endpoint.getUrl(), this.endpoint.getParams(), resParam, resRemoveParam);
if (!processedUrls.contains(newUrl)) {
processedUrls.add(newUrl);
downloadUris.add(newUrl);
@@ -268,6 +322,7 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
private void setupAndDownloadNextFile() {
if (downloadUris.isEmpty()) {
logger.debug("Download queue is empty -> file crawling is complete");
this.registerFinished();
return;
}
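Note (not part of the commit): a hypothetical, self-contained sketch of the de-duplication used above, with the project's Resource handling replaced by plain strings. Every rendered URL is remembered in processedUrls, only previously unseen URLs are enqueued, and the crawl finishes once the queue runs empty.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DownloadQueueSketch {
    private final List<String> processedUrls = new ArrayList<>();
    private final Queue<String> downloadUris = new ArrayDeque<>();

    // Enqueue a rendered access URL only if it has not been seen during this crawl
    public void offer(String renderedUrl) {
        if (renderedUrl != null && !processedUrls.contains(renderedUrl)) {
            processedUrls.add(renderedUrl);
            downloadUris.add(renderedUrl);
        }
    }

    public static void main(String[] args) {
        DownloadQueueSketch sketch = new DownloadQueueSketch();
        sketch.offer("https://example.org/oai?verb=ListRecords&resumptionToken=abc");
        sketch.offer("https://example.org/oai?verb=ListRecords&resumptionToken=abc"); // duplicate, ignored
        System.out.println(sketch.downloadUris.size()); // 1
    }
}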
@@ -275,6 +330,7 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
this.setupPaths(this.crawl);
} catch (MalformedURLException e) {
logger.error("Failed to setup paths for crawl", e);
this.registerError();
return;
}
@@ -285,6 +341,7 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
this.inputURI = new URL(downloadUri).toURI();
} catch (MalformedURLException | URISyntaxException e) {
logger.error("Failed to setup and download next file URI", e);
this.registerError();
}
// If next inputURI is setup -> download, else continue with next download URI
@@ -303,18 +360,16 @@ public class RepetitiveFileCrawlerImpl extends FileDownloader implements Applica
}
try {
Thread.sleep(sleepTs);
this.downloadFile();
this.download();
} catch (InterruptedException e) {
logger.error("Thread.sleep interrupted", e);
this.registerError();
Thread.currentThread().interrupt();
}
} else {
this.downloadFile();
this.download();
}
}
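Note (not part of the commit): a hypothetical, self-contained sketch of the politeness throttling applied above, with the project's Stopwatch replaced by System.currentTimeMillis(); politenessTimespan defaults to 1000 ms, i.e. at most one request per second against the same endpoint.

public class PolitenessThrottleSketch {
    private final long politenessTimespan = 1000; // ms between requests to the same endpoint
    private long lastStart = 0;

    // Sleep just long enough so that successive calls are at least politenessTimespan apart
    public void throttle() throws InterruptedException {
        long elapsed = System.currentTimeMillis() - lastStart;
        if (lastStart != 0 && elapsed < politenessTimespan) {
            Thread.sleep(politenessTimespan - elapsed);
        }
        lastStart = System.currentTimeMillis();
    }

    public static void main(String[] args) throws InterruptedException {
        PolitenessThrottleSketch throttle = new PolitenessThrottleSketch();
        for (int i = 0; i < 3; i++) {
            throttle.throttle();
            System.out.println("request " + i + " at " + System.currentTimeMillis());
        }
    }
}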
@Override
protected String getOutputFilename() {
return fileName + "." + (processedUrls==null ? 0 : processedUrls.size());
}
}
\ No newline at end of file
package eu.dariah.de.search.crawling.files;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
@@ -10,47 +9,30 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import de.unibamberg.minf.processing.exception.ResourceProcessingException;
import eu.dariah.de.search.crawling.CrawlHelper;
import eu.dariah.de.search.crawling.crawler.Crawler;
import eu.dariah.de.search.model.Crawl;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import lombok.Getter;
import lombok.Setter;
public class FileDownloader extends BaseFileStreamCrawler implements Crawler {
private static final int CHUNK_SIZE = 1048576;
protected String fileName;
protected URI inputURI;
private int bufferSize = 1024;
private int chunkSize = 1048576;
protected boolean initialized = false;
@Getter @Setter protected URI inputURI;
@Getter protected String outputFilename;
@Getter protected boolean initialized = false;
public int getBufferSize() { return bufferSize; }
public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; }
@Override protected String getInputFilename() { return null; }
public int getChunkSize() { return chunkSize; }
public void setChunkSize(int chunkSize) { this.chunkSize = chunkSize; }
@Override protected String getInputFilename() { return null; }
@Override protected String getOutputFilename() {
return fileName;
}
@Override
public URI getInputURI() {
return inputURI;
}
@Override public boolean isInitialized() { return this.initialized; }
public void setInputURI(URI inputURI) { this.inputURI = inputURI; }
@Override
public String getUnitMessageCode() {
return "~eu.dariah.de.minfba.search.crawling.file.downloader.unit";
@@ -66,7 +48,7 @@ public class FileDownloader extends BaseFileStreamCrawler implements Crawler {
super.init(endpoint, crawl, sc);
try {
this.setupPaths(crawl);