Commit 8135e574 authored by Gradl, Tobias's avatar Gradl, Tobias

Implement git capabilities of processing-adapters (#406)

parent 657ef77c
......@@ -383,10 +383,14 @@ public class CollectionSyncClient extends BaseApiClientImpl<CollectionApiPojo, E
}
private boolean endpointsAreSame(Endpoint ep1, Endpoint ep2) {
return ep1.getAccessType().equals(ep2.getAccessType()) &&
ep1.getFileType().equals(ep2.getFileType()) &&
ep1.getUrl().equals(ep2.getUrl()) &&
(ep1.getSet()==null && ep2.getSet()==null || ep1.getSet().equals(ep2.getSet()) );
try {
return ep1.getAccessType().equals(ep2.getAccessType()) &&
ep1.getFileType().equals(ep2.getFileType()) &&
ep1.getUrl().equals(ep2.getUrl()) &&
(ep1.getSet()==null && ep2.getSet()==null || ep1.getSet().equals(ep2.getSet()) );
} catch (Exception e) {
return false;
}
}
private boolean datasetsAreSame(Dataset ds1, Dataset ds2) {
......
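As a design note, the broad try/catch above mainly guards against null fields on either endpoint. A null-safe variant using java.util.Objects would avoid the catch entirely; this is only an illustrative sketch using the getter names from the diff, not part of the commit:

import java.util.Objects;

private boolean endpointsAreSame(Endpoint ep1, Endpoint ep2) {
    // Objects.equals is null-safe on both arguments, so no exception handling is needed
    return Objects.equals(ep1.getAccessType(), ep2.getAccessType())
            && Objects.equals(ep1.getFileType(), ep2.getFileType())
            && Objects.equals(ep1.getUrl(), ep2.getUrl())
            && Objects.equals(ep1.getSet(), ep2.getSet());
}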
......@@ -195,8 +195,9 @@ public class CollectionEditorController extends BaseController {
for (EndpointPojo ep : c.getEndpoints()) {
for (DatasetPojo ds : ep.getDatasetPojos()) {
esc = datamodelService.findById(ds.getId());
ds.setDocs(datamodelService.getDocumentCount(esc.getIndexName(), ep.getId()));
if (esc!=null) {
ds.setDocs(datamodelService.getDocumentCount(esc.getIndexName(), ep.getId()));
}
if (dmId==null || ds.getId().equals(dmId)) {
dmId = ds.getId();
model.addAttribute("selectedDsId", ds.getId());
......
......@@ -13,6 +13,7 @@ public interface CrawlManager extends ProcessingListener {
public CrawlState getCrawlState(String crawlId);
public Set<String> getSupportedOnlineAccessMethods();
public Set<String> getSupportedAccessTypes();
public Set<String> getSupportedFileTypes();
public void tryCancelCrawl(String crawlId);
}
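The two new interface methods expose which access types and file types have configured processing chains. A hypothetical caller-side check built on them might look as follows; the helper method and its parameters are illustrative only:

// Hypothetical pre-flight check before scheduling a crawl for an endpoint;
// CrawlManager and Endpoint are the types from this commit, the method itself is made up
boolean isEndpointSupported(CrawlManager crawlManager, Endpoint endpoint) {
    return crawlManager.getSupportedAccessTypes().contains(endpoint.getAccessType())
            && crawlManager.getSupportedFileTypes().contains(endpoint.getFileType());
}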
package eu.dariah.de.search.crawling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
......@@ -55,19 +57,18 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
protected Map<UUID, CrawlPipeline> serviceIdServiceMap = new HashMap<UUID, CrawlPipeline>();
private int maxPoolSize;
private Map<String, String> offlineProcessingChains;
private Map<String, String> onlineProcessingChains;
private Map<String, String> accessChains;
private Map<String, String> fileProcessingChains;
public int getMaxPoolSize() { return maxPoolSize; }
public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; }
public Map<String, String> getOfflineProcessingChains() { return offlineProcessingChains; }
public void setOfflineProcessingChains(Map<String, String> offlineProcessingChains) { this.offlineProcessingChains = offlineProcessingChains; }
public Map<String, String> getOnlineProcessingChains() { return onlineProcessingChains; }
public void setOnlineProcessingChains(Map<String, String> onlineProcessingChains) { this.onlineProcessingChains = onlineProcessingChains; }
public Map<String, String> getAccessChains() { return accessChains; }
public void setAccessChains(Map<String, String> accessChains) { this.accessChains = accessChains; }
public Map<String, String> getFileProcessingChains() { return fileProcessingChains; }
public void setFileProcessingChains(Map<String, String> fileProcessingChains) { this.fileProcessingChains = fileProcessingChains; }
@Override
public void afterPropertiesSet() throws Exception {
......@@ -83,8 +84,13 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
}
@Override
public Set<String> getSupportedOnlineAccessMethods() {
return this.onlineProcessingChains.keySet();
public Set<String> getSupportedAccessTypes() {
return this.getAccessChains().keySet();
}
@Override
public Set<String> getSupportedFileTypes() {
return this.getFileProcessingChains().keySet();
}
@Override
......@@ -208,27 +214,34 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
}
private CrawlPipeline createPipeline(Endpoint ep, ExtendedDatamodelContainer sc, Crawl c) throws ProcessingConfigException, GenericProcessingException, IOException {
String m = null;
String access = null;
String file = null;
for (AccessMethods mAv : AccessMethods.values()) {
if (mAv.equalsName(ep.getAccessType())) {
m = mAv.toString();
access = mAv.toString();
break;
}
}
for (FileTypes ftv : FileTypes.values()) {
if (ftv.toString().equals(ep.getAccessType())) {
m = ftv.toString();
if (ftv.toString().equals(ep.getFileType())) {
file = ftv.toString();
break;
}
}
if (m==null) {
logger.error(String.format("Unknown access method [%s]; cancelling crawl", ep.getAccessType()));
// Online but no access type detected
if (access==null && c.getBaseCrawlId()==null) {
logger.error(String.format("Unknown access tyüe [%s]; cancelling crawl", ep.getAccessType()));
this.updateCrawl(c.getId(), ProcessingServiceStates.ERROR);
return null;
}
if (file==null) {
logger.error(String.format("Unknown file type method [%s]; cancelling crawl", ep.getFileType()));
this.updateCrawl(c.getId(), ProcessingServiceStates.ERROR);
return null;
}
CrawlingExecutionContext ctx = new CrawlingExecutionContext(this.baseDownloadPath, c);
Crawler[] crawlers = this.getCrawlers(m, c.getBaseCrawlId()==null);
List<Crawler> crawlers = this.getCrawlers(access, file, c.getBaseCrawlId()==null);
ResourceIndexingServiceImpl indexer;
for (Crawler crawler : crawlers) {
if (crawler instanceof Processor) {
......@@ -333,24 +346,25 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
return cId;
}
private Crawler[] getCrawlers(String method, boolean online) throws ProcessingConfigException {
private List<Crawler> getCrawlers(String accessType, String fileType, boolean online) throws ProcessingConfigException {
List<Crawler> chain = new ArrayList<>();
if (online) {
return this.getCrawlers(method, onlineProcessingChains);
} else {
return this.getCrawlers(method, offlineProcessingChains);
chain.addAll(this.getCrawlers(accessType, accessChains));
}
chain.addAll(this.getCrawlers(fileType, fileProcessingChains));
return chain;
}
private Crawler[] getCrawlers(String method, Map<String, String> processingChain) throws ProcessingConfigException {
private List<Crawler> getCrawlers(String method, Map<String, String> processingChain) throws ProcessingConfigException {
if (!processingChain.containsKey(method)) {
logger.error(String.format("No processing service implemented/configured for method [%s]", method.toString()));
throw new ProcessingConfigException(String.format("No processing service implemented/configured for method [%s]", method.toString()));
}
try {
String[] serviceRefs = processingChain.get(method).split(",");
Crawler[] result = new Crawler[serviceRefs.length];
List<Crawler> result = new ArrayList<>(serviceRefs.length);
for (int i=0; i<serviceRefs.length; i++) {
result[i] = (Crawler)appContext.getBean(serviceRefs[i].trim());
result.add((Crawler)appContext.getBean(serviceRefs[i].trim()));
}
return result;
} catch (Exception e) {
......
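The refactored getCrawlers resolves each comma-separated token of a chain value as a Spring bean name. A minimal wiring sketch under that assumption; the chain keys and bean names ("gitCrawler", "oaiPmhCrawler", "fileProcessor", "resourceIndexer") are illustrative, not taken from this commit:

import java.util.Map;

// Hypothetical programmatic wiring; in practice these maps are supplied by the Spring context
CrawlManagerImpl crawlManager = new CrawlManagerImpl();
crawlManager.setMaxPoolSize(4);
// access type -> comma-separated crawler bean names (each resolved via appContext.getBean)
crawlManager.setAccessChains(Map.of(
        "Git Repository", "gitCrawler",
        "OAI-PMH", "oaiPmhCrawler"));
// file type -> processing chain appended after the access chain in createPipeline
crawlManager.setFileProcessingChains(Map.of(
        "XML", "fileProcessor,resourceIndexer",
        "TEXT", "fileProcessor,resourceIndexer"));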
package eu.dariah.de.search.crawling;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -48,8 +49,8 @@ public class CrawlPipelineImpl implements CrawlPipeline {
@Override public boolean isCancellationRequested() { return cancellationRequested; }
public CrawlPipelineImpl(String crawlId, Crawler[] runnables) throws GenericProcessingException {
if (runnables==null || runnables.length==0) {
public CrawlPipelineImpl(String crawlId, List<Crawler> runnables) throws GenericProcessingException {
if (runnables==null || runnables.isEmpty()) {
throw new GenericProcessingException("Non-empty array of processing services required to instantiate a processing pipeline");
}
......
package eu.dariah.de.search.crawling.crawler;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joda.time.Period;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContextAware;
import de.unibamberg.minf.dme.model.datamodel.NonterminalImpl;
import de.unibamberg.minf.processing.consumption.ResourceConsumptionService;
import de.unibamberg.minf.processing.exception.ProcessingConfigException;
import de.unibamberg.minf.processing.service.MatchingFileCollector;
import de.unibamberg.minf.processing.service.ParallelFileProcessingService;
import eu.dariah.de.search.model.Crawl;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.CrawlService;
public class FileProcessor extends ParallelFileProcessingService implements Processor, ResourceConsumptionService {
public class FileProcessor extends ParallelFileProcessingService implements Processor, ResourceConsumptionService, InitializingBean {
@Autowired private CrawlService crawlService;
@Autowired private List<String> antiPatterns;
private boolean initialized = false;
......@@ -35,6 +47,12 @@ public class FileProcessor extends ParallelFileProcessingService implements Proc
return super.isInitialized() && initialized;
}
@SuppressWarnings("unchecked")
@Override
public void afterPropertiesSet() throws Exception {
this.antiPatterns = (List<String>)applicationContext.getBean("antiPatterns");
}
@Override
public void run() {
MDC.put("uid", crawlId);
......@@ -50,6 +68,7 @@ public class FileProcessor extends ParallelFileProcessingService implements Proc
this.setSchema(sc.getModel());
this.crawlId = crawl.getId();
// An original offline crawl
if (crawl.getBaseCrawlId()!=null) {
this.setPath(crawlService.getCrawlDirPath(crawl.getBaseCrawlId()));
......@@ -58,12 +77,17 @@ public class FileProcessor extends ParallelFileProcessingService implements Proc
else {
this.setPath(crawlService.getCrawlDirPath(crawl.getId()));
}
this.initialized = true;
try {
if (endpoint.getPatterns()!=null) {
this.setFileCollector(new MatchingFileCollector(Paths.get(this.getPath()), endpoint.getPatterns()));
this.getFileCollector().setAntiPatternStrings(antiPatterns);
}
super.init();
} catch (ProcessingConfigException e) {
logger.error("Failed to initialize XML processing", e);
} catch (ProcessingConfigException | IOException e) {
logger.error("Failed to initialize file processing", e);
}
}
}
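FileProcessor.afterPropertiesSet() now pulls its exclusion patterns from a bean looked up by the name "antiPatterns". A minimal sketch of how such a bean could be declared, assuming Java-based Spring configuration; the concrete patterns are only examples:

import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AntiPatternConfig {
    // FileProcessor resolves this bean by name ("antiPatterns") and passes it to the file collector;
    // the glob patterns below are illustrative
    @Bean(name = "antiPatterns")
    public List<String> antiPatterns() {
        return List.of("**/.git/**", "**/*.tmp");
    }
}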
package eu.dariah.de.search.crawling.crawler;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.slf4j.MDC;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import de.unibamberg.minf.dme.model.datamodel.natures.XmlDatamodelNature;
import de.unibamberg.minf.dme.model.datamodel.natures.xml.XmlTerminal;
import de.unibamberg.minf.processing.service.online.OaiPmhHarvestingService;
import eu.dariah.de.search.api.client.OaiPmhClient;
import eu.dariah.de.search.api.model.oaipmh.OaiPmhMetadataFormat;
import eu.dariah.de.search.api.model.oaipmh.OaiPmhResponseContainer;
import de.unibamberg.minf.processing.git.adapter.GitRepositoryAdapter;
import de.unibamberg.minf.processing.git.service.GitRepositoryProcessingService;
import eu.dariah.de.search.model.Crawl;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.CrawlService;
public class GitCrawlerImpl extends OaiPmhHarvestingService implements Crawler {
public class GitCrawlerImpl extends GitRepositoryProcessingService implements Crawler, ApplicationContextAware {
@Autowired private CrawlService crawlService;
@Autowired private OaiPmhClient oaiPmhClient;
private boolean initialized = false;
private String crawlId;
@Override
public String getUnitMessageCode() {
return "~eu.dariah.de.minfba.search.crawling.oai_crawling.unit";
return "~eu.dariah.de.minfba.search.crawling.git_crawling.unit";
}
@Override
public String getTitleMessageCode() {
return "~eu.dariah.de.minfba.search.crawling.oai_crawling.title";
return "~eu.dariah.de.minfba.search.crawling.git_crawling.title";
}
@Override
......@@ -40,74 +42,29 @@ public class GitCrawlerImpl extends OaiPmhHarvestingService implements Crawler {
@Override
public void run() {
MDC.put("uid", crawlId);
File cDir = new File(this.getCrawlDir());
if (cDir.exists()) {
FileUtils.deleteQuietly(cDir);
}
super.run();
}
@Override
public void init(Endpoint endpoint, Crawl crawl, ExtendedDatamodelContainer sc) {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.setAdapter(applicationContext.getBean(GitRepositoryAdapter.class));
}
@Override
public void init(Endpoint endpoint, Crawl crawl, ExtendedDatamodelContainer sc) {
this.setUrl(endpoint.getUrl());
this.setSet(endpoint.getSet());
this.setBranch(endpoint.getSet());
this.crawlId = crawl.getId();
if (crawl.getPrefix()==null || crawl.getPrefix().trim().isEmpty()) {
String prefix = this.detectMetadataPrefix(endpoint, sc);
if (prefix==null || prefix.trim().isEmpty()) {
logger.warn("Failed to automatically detect metadata prefix for OAI-PMH endpoint");
this.initialized = false;
return;
} else {
logger.warn(String.format("Metadata prefix for OAI-PMH endpoint [%s] automatically detected [%s]", endpoint.getUrl(), prefix));
crawl.setPrefix(prefix);
crawlService.save(crawl);
}
}
this.setPrefix(crawl.getPrefix());
this.setCrawlDir(crawlService.getCrawlDirPath(crawl));
this.initialized = true;
}
private String detectMetadataPrefix(Endpoint ep, ExtendedDatamodelContainer sc) {
String rootNs = null;
XmlDatamodelNature xmlNature = sc.getModel().getNature(XmlDatamodelNature.class);
String rootTerminalId = xmlNature.getTerminalId(sc.getRoot().getId());
for (XmlTerminal t : xmlNature.getTerminals()) {
if (t.getId().equals(rootTerminalId)) {
rootNs = t.getNamespace().trim().toLowerCase();
}
}
String prefix = null;
OaiPmhResponseContainer oaiFormatsResponse = oaiPmhClient.listMetadataFormats(ep.getUrl(), null);
if (oaiFormatsResponse!=null && oaiFormatsResponse.getFormats()!=null) {
for (OaiPmhMetadataFormat format : oaiFormatsResponse.getFormats()) {
if (format.getMetadataNamespace().trim().toLowerCase().equals(rootNs)) {
if (prefix==null) {
prefix = format.getMetadataPrefix();
} else {
logger.warn("Multiple metadata prefixes matched for schema. Using first");
}
}
}
}
if (prefix==null) {
logger.warn("Could not detect metadata prefix from namespaced. Trying schema names");
if (oaiFormatsResponse!=null && oaiFormatsResponse.getFormats()!=null) {
for (OaiPmhMetadataFormat format : oaiFormatsResponse.getFormats()) {
if (format.getMetadataPrefix().trim().toLowerCase().equals(sc.getModel().getName().trim().toLowerCase())) {
if (prefix==null) {
prefix = format.getMetadataPrefix();
} else {
logger.warn("Multiple metadata prefixes matched for schema. Using first");
}
}
}
}
}
return prefix;
}
}
\ No newline at end of file
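In the new init method, the endpoint URL becomes the Git repository URL and the endpoint set is reused as the branch name. A purely illustrative sketch of an endpoint configured for this crawler; the setter names are assumed to mirror the getters used elsewhere in the diff, and all values are made up:

// Hypothetical endpoint for GitCrawlerImpl.init(); values and setters are assumptions
Endpoint gitEndpoint = new Endpoint();
gitEndpoint.setAccessType("Git Repository");              // matched against accessChains
gitEndpoint.setFileType("TEXT");                          // matched against fileProcessingChains
gitEndpoint.setUrl("https://example.org/some/repo.git");  // passed to setUrl(...)
gitEndpoint.setSet("main");                               // reused as the branch via setBranch(...)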
......@@ -117,6 +117,9 @@ public class UpdateServiceImpl implements InitializingBean {
} else if (method.equals("Git Repository")) {
endpointNode.set("accessType", new TextNode("Git Repository"));
endpointNode.set("fileType", new TextNode("TEXT"));
} else if (method.equals("OPAC")) {
endpointNode.set("accessType", new TextNode("OPAC"));
endpointNode.set("fileType", new TextNode("XML"));
}
}
mongoTemplate.save(collectionNode.toString(), "collection");
......
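The migration above splits the legacy "method" field of stored collection endpoints into separate accessType and fileType fields. A self-contained Jackson sketch of the same transformation for a single node, mirroring the "Git Repository" branch; the node layout is simplified and not taken from the actual collection documents:

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;

// Illustrative migration of one endpoint node
ObjectNode endpointNode = JsonNodeFactory.instance.objectNode();
endpointNode.set("method", new TextNode("Git Repository"));

String method = endpointNode.get("method").asText();
if (method.equals("Git Repository")) {
    endpointNode.set("accessType", new TextNode("Git Repository"));
    endpointNode.set("fileType", new TextNode("TEXT"));
}
// endpointNode now carries the split accessType/fileType fields saved back to "collection"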