Commit 8a1c29c6 authored by Gradl, Tobias
Browse files

1327: Separate IndexingService from ResourceEnrichmentService

Task-Url: https://pm.winseda.de/issues/1327
parent f2e19925
......@@ -39,12 +39,16 @@ public class Constants {
public static final Pattern ONLINE_FILE_PATTERN = Pattern.compile("^(https?|ftp)://.*$");
public enum IndexingStages {
public enum ResourceEnrichmentStages {
RESOURCE_COLLECTION, // Collecting resources as provided
RESOURCE_PRESENTATION, // Extending presentation layer to resources as provided
INTEGRATIONS_TRANSFORMATION, // Transforming collected resources to integrations model
INTEGRATIONS_PRESENTATION, // Extending presentation layer to transformed resources
INDEXING // Finally indexing data
INTEGRATIONS_PRESENTATION // Extending presentation layer to transformed resources
}
public enum IndexingStages {
RESOURCE_COLLECTION,
INDEXING
}
public enum ResultElementRelationTypes {
......
package eu.dariah.de.search.es.service;
import de.unibamberg.minf.processing.consumption.MappedResourceConsumptionService;
import de.unibamberg.minf.processing.consumption.ResourceConsumptionService;
import eu.dariah.de.search.es.client.base.BaseEsClient;
public interface IndexingService extends BaseEsClient, MappedResourceConsumptionService {
public interface IndexingService extends BaseEsClient, ResourceConsumptionService {
}
......@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.node.ValueNode;
import de.unibamberg.minf.core.util.json.JsonNodeHelper;
import de.unibamberg.minf.dme.model.base.Nonterminal;
import de.unibamberg.minf.dme.model.datamodel.NonterminalImpl;
import de.unibamberg.minf.processing.exception.ProcessingConfigException;
import de.unibamberg.minf.processing.model.SerializableResource;
import de.unibamberg.minf.processing.model.SerializableRootResource;
import de.unibamberg.minf.processing.model.base.Resource;
......@@ -49,6 +50,7 @@ import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import eu.dariah.de.search.service.MappingService;
import eu.dariah.de.search.transformation.ResourceEnrichmentService;
import eu.dariah.de.search.transformation.ResourceTransformationService;
@Component
......@@ -89,7 +91,7 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
private String presentationsProcessingRoot;
private Queue<IndexingStages> stageQueue;
private List<ResourceContainer> resourceBatch;
private String index;
private String type;
......@@ -113,203 +115,65 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
public boolean isInitialized() { return initialized; }
@Autowired private ResourceEnrichmentService enrichmentService;
@Override
public void init(String schema) {
public void init(String datamodelId) {
try {
enrichmentService.init(datamodelId);
} catch (ProcessingConfigException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// Consequent calls from transformationService.init() need no action
if (!initialized) {
logger.debug("Initializing new indexing client...");
this.setupStages();
this.runStage();
this.initialized=true;
if (integrationModelEntityId==null) {
logger.warn("No integrations model set, indexing source data model only");
} else {
try {
integrationsProcessingRoot = this.findProcessingRootElementId(integrationModelEntityId);
} catch (Exception e) {
// TODO: handle exception
}
}
if (presentationModelEntityId==null) {
logger.warn("No index model set, no presentation layer available");
} else {
try {
presentationsProcessingRoot = this.findProcessingRootElementId(presentationModelEntityId);
} catch (Exception e) {
// TODO: handle exception
}
}
}
}
private void setupStages() {
stageQueue = new LinkedList<IndexingStages>();
stageQueue = new LinkedList<>();
stageQueue.add(IndexingStages.RESOURCE_COLLECTION);
// Collect available, mapping-based indexing stages
try {
if (presentationModelEntityId!=null && !this.getType().equals(presentationModelEntityId) && this.mappingService.getMappingBySourceAndTarget(type, presentationModelEntityId)!=null) {
stageQueue.add(IndexingStages.RESOURCE_PRESENTATION);
}
} catch (Exception e) {
logger.error("Failed to setup RESOURCE_PRESENTATION stage", e);
}
try {
if (integrationModelEntityId!=null && !this.getType().equals(integrationModelEntityId) && mappingService.getMappingBySourceAndTarget(type, integrationModelEntityId)!=null) {
stageQueue.add(IndexingStages.INTEGRATIONS_TRANSFORMATION);
try {
if (presentationModelEntityId!=null && !presentationModelEntityId.equals(integrationModelEntityId) && mappingService.getMappingBySourceAndTarget(integrationModelEntityId, presentationModelEntityId)!=null) {
stageQueue.add(IndexingStages.INTEGRATIONS_PRESENTATION);
}
} catch (Exception e) {
logger.error("Failed to setup INTEGRATIONS_PRESENTATION stage", e);
}
}
} catch (Exception e) {
logger.error("Failed to setup INTEGRATIONS_TRANSFORMATION stage", e);
throw e;
}
stageQueue.add(IndexingStages.INDEXING);
}
private void runStage() {
if (this.isCurrentStage(IndexingStages.RESOURCE_COLLECTION)) {
// Noting else to do -> consume and commit are called externally
this.resourceBatch = new ArrayList<ResourceContainer>();
} else if (this.isCurrentStage(IndexingStages.INDEXING)) {
if (this.isCurrentStage(IndexingStages.INDEXING)) {
this.appendMetadata();
this.indexResources();
this.commit();
} else {
try {
if (this.isCurrentStage(IndexingStages.RESOURCE_PRESENTATION)) {
transformationService.init(mappingService.getMappingBySourceAndTarget(type, presentationModelEntityId), this);
} else if (this.isCurrentStage(IndexingStages.INTEGRATIONS_TRANSFORMATION)) {
transformationService.init(mappingService.getMappingBySourceAndTarget(type, integrationModelEntityId), this);
} else if (this.isCurrentStage(IndexingStages.INTEGRATIONS_PRESENTATION)) {
transformationService.init(mappingService.getMappingBySourceAndTarget(integrationModelEntityId, presentationModelEntityId), this);
}
// To prevent co-modification: presentation stages can create partial resources
List<ResourceContainer> transformBatch = new ArrayList<ResourceContainer>();
transformBatch.addAll(this.resourceBatch);
transformationService.transformResources(transformBatch);
} catch (Exception e) {
logger.error(String.format("Failed to execute indexing stage [%s]", this.getCurrentStage().toString()), e);
}
}
}
}
@Override
public void consume(Resource resource) {
this.consume(resource, null);
}
@Override
public void consume(Resource resource, Resource rTransformed) {
if (resource==null) {
return;
}
if (this.isCurrentStage(IndexingStages.RESOURCE_COLLECTION)) {
ResourceContainer rc = this.collectResource(resource);
rc.setCurrentStage(this.getCurrentStage());
resourceBatch.add(rc);
// Chunk-processing incoming data
if (resourceBatch.size()>=BULK_CHUNK_SIZE) {
this.commit();
}
return;
}
if (rTransformed==null) {
return;
}
this.enrichmentService.consume(resource);
ResourceContainer rc = (ResourceContainer)resource;
rc.setCurrentStage(this.getCurrentStage());
if (this.getCurrentStage()==IndexingStages.INTEGRATIONS_TRANSFORMATION) {
rc.setCurrentResource(this.findProcessingRoot(rTransformed, integrationsProcessingRoot));
} else {
rc.setCurrentResource(this.findProcessingRoot(rTransformed, presentationsProcessingRoot));
}
// For our split resources
if (!resourceBatch.contains(rc)) {
resourceBatch.add(rc);
// Chunk-processing incoming data
if (this.enrichmentService.getResourceBatch().size()>=BULK_CHUNK_SIZE) {
this.commit();
}
}
@Override
public int commit() {
stageQueue.remove(stageQueue.peek());
if (stageQueue.isEmpty()) {
// Restart with further incoming resources -> occurrs on every chunk
this.setupStages();
}
this.runStage();
this.enrichmentService.commit();
return 0;
}
/**
 * Resolves the id of the element that serves as processing root of a data model.
 *
 * @param entityId id of the data model to inspect; may be {@code null}
 * @return the id of the processing root found in the model's element hierarchy, the id of
 *         the hierarchy root when no processing root is found, or {@code null} when
 *         {@code entityId} is {@code null}
 */
private String findProcessingRootElementId(String entityId) {
    if (entityId == null) {
        return null;
    }
    ExtendedDatamodelContainer container = datamodelService.findById(entityId);
    Nonterminal hierarchyRoot = (Nonterminal) container.getOrRenderElementHierarchy();
    Nonterminal processingRoot = NonterminalImpl.findProcessingRoot(hierarchyRoot);
    if (processingRoot != null) {
        return processingRoot.getId();
    }
    // Fall back to the root of the hierarchy
    return hierarchyRoot.getId();
}
/**
 * Wraps an incoming resource into a {@link ResourceContainer}.
 *
 * Resources keyed as OAI-PMH envelope are unwrapped first: the first child below the
 * envelope's content element becomes the wrapped resource. Any other resource is wrapped
 * as-is.
 *
 * @param resource the incoming resource
 * @return the container, or {@code null} when the envelope holds no content or when
 *         wrapping fails (the failure is logged)
 */
private ResourceContainer collectResource(Resource resource) {
    try {
        if (!resource.getKey().equals(OaiPmhHarvestingService.OAI_PMH_ENVELOPE_RESOURCE)) {
            return new ResourceContainer(resource);
        }

        // Locate the content element of the envelope; the last matching child wins
        Resource content = null;
        for (Resource child : resource.getChildResources()) {
            if (child.getKey().equals(RootElementKeys.CONTENT.toString())) {
                content = child;
            }
        }

        if (content == null || content.getChildResources().isEmpty()) {
            logger.debug("Skipping empty resource");
            return null;
        }
        if (content.getChildResources().size() > 1) {
            logger.warn("Muliple resources found under one content element...only first 'root' element is processed");
        }
        return new ResourceContainer(content.getChildResources().get(0));
    } catch (Exception e) {
        logger.warn("Failed to index processed resource", e);
        return null;
    }
}
/**
 * Searches a resource tree depth-first for the resource whose element id equals the given
 * processing root id.
 *
 * @param resource root of the (sub)tree to search
 * @param processingRootId element id to look for
 * @return the first matching resource, or {@code null} when the tree contains none
 */
private Resource findProcessingRoot(Resource resource, String processingRootId) {
    if (resource.getElementId().equals(processingRootId)) {
        return resource;
    }
    if (resource.getChildResources() == null) {
        return null;
    }
    for (Resource child : resource.getChildResources()) {
        Resource match = this.findProcessingRoot(child, processingRootId);
        if (match != null) {
            return match;
        }
    }
    return null;
}
private void appendMetadata() {
// Metadata rendered with the resource
for (ResourceContainer rc : this.resourceBatch) {
rc.setCurrentStage(this.getCurrentStage());
for (ResourceContainer rc : this.enrichmentService.getResourceBatch()) {
Resource rMetaParent = new SerializableResource(Constants.ELEMENT_KEY_META_RESOURCE, null);
Resource rMetaParentMerge;
......@@ -462,13 +326,13 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
private int indexResources() {
if (this.resourceBatch.isEmpty()) {
if (this.enrichmentService.getResourceBatch().isEmpty()) {
return 0;
}
// TODO: Refactor this...
Map<String, Map<String, Object>> sources = new HashMap<>();
for (ResourceContainer rc : this.resourceBatch) {
for (ResourceContainer rc : this.enrichmentService.getResourceBatch()) {
sources.put(rc.getId(), rc.toSource());
}
......
......@@ -10,11 +10,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import de.unibamberg.minf.processing.model.SerializableResource;
import de.unibamberg.minf.processing.model.base.Resource;
import de.unibamberg.minf.processing.model.helper.ResourceHelper;
import eu.dariah.de.search.Constants.IndexingStages;
import eu.dariah.de.search.Constants.ResourceEnrichmentStages;
import eu.dariah.de.search.Constants.RootElementKeys;
public class ResourceContainer implements Resource {
private IndexingStages currentStage;
private ResourceEnrichmentStages currentStage;
private Resource content;
private Resource integrations;
......@@ -22,8 +22,8 @@ public class ResourceContainer implements Resource {
private JsonNode meta;
private String id;
public IndexingStages getCurrentStage() { return currentStage; }
public void setCurrentStage(IndexingStages currentStage) { this.currentStage = currentStage; }
public ResourceEnrichmentStages getCurrentStage() { return currentStage; }
public void setCurrentStage(ResourceEnrichmentStages currentStage) { this.currentStage = currentStage; }
public Resource getContent() { return content; }
public Resource getIntegrations() { return integrations; }
......@@ -43,13 +43,13 @@ public class ResourceContainer implements Resource {
}
public void setCurrentResource(Resource currentResource) {
if (this.currentStage == IndexingStages.RESOURCE_PRESENTATION) {
if (this.currentStage == ResourceEnrichmentStages.RESOURCE_PRESENTATION) {
if (currentResource!=null) {
this.presentation = currentResource;
}
} else if (this.currentStage == IndexingStages.INTEGRATIONS_TRANSFORMATION) {
} else if (this.currentStage == ResourceEnrichmentStages.INTEGRATIONS_TRANSFORMATION) {
this.integrations = currentResource;
} else if (this.currentStage == IndexingStages.INTEGRATIONS_PRESENTATION) {
} else if (this.currentStage == ResourceEnrichmentStages.INTEGRATIONS_PRESENTATION) {
if (this.presentation!=null && currentResource!=null) {
ResourceHelper.mergeResourceTrees(this.presentation, (Resource)currentResource);
} else if (currentResource!=null) {
......@@ -60,7 +60,7 @@ public class ResourceContainer implements Resource {
public Resource getCurrentResource() {
// Resource currently at transformation => now presentation
if (this.currentStage == IndexingStages.INTEGRATIONS_TRANSFORMATION) {
if (this.currentStage == ResourceEnrichmentStages.INTEGRATIONS_TRANSFORMATION) {
return this.integrations;
} else {
return this.content;
......
package eu.dariah.de.search.transformation;
import java.util.List;
import de.unibamberg.minf.processing.consumption.MappedResourceConsumptionService;
import eu.dariah.de.search.indexing.model.ResourceContainer;
/**
 * A {@link MappedResourceConsumptionService} that additionally exposes the batch of
 * {@link ResourceContainer}s it currently holds.
 */
public interface ResourceEnrichmentService extends MappedResourceConsumptionService {

    /**
     * @return the batch of resource containers currently held by this service
     */
    List<ResourceContainer> getResourceBatch();
}
package eu.dariah.de.search.transformation;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import de.unibamberg.minf.dme.model.base.Nonterminal;
import de.unibamberg.minf.dme.model.datamodel.NonterminalImpl;
import de.unibamberg.minf.processing.model.base.Resource;
import de.unibamberg.minf.processing.service.online.OaiPmhHarvestingService;
import eu.dariah.de.search.Constants.ResourceEnrichmentStages;
import eu.dariah.de.search.Constants.RootElementKeys;
import eu.dariah.de.search.indexing.model.ResourceContainer;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import eu.dariah.de.search.service.MappingService;
@Component
@Scope("prototype")
public class ResourceEnrichmentServiceImpl implements ResourceEnrichmentService {
// Logger of this service
protected final Logger logger = LoggerFactory.getLogger(ResourceEnrichmentServiceImpl.class);
// Looks up mappings by source and target model id
@Autowired private MappingService mappingService;
// Loads data model containers by id when resolving processing roots
@Autowired private DatamodelService datamodelService;
// Initialized with the mapping of the current stage and then asked to transform the batch
@Autowired private ResourceTransformationService transformationService;
// Id of the integration model, injected from property 'datamodels.integration'
@Value("${datamodels.integration}")
private String integrationModelEntityId;
// Id of the presentation model, injected from property 'datamodels.presentation'
@Value("${datamodels.presentation}")
private String presentationModelEntityId;
// Defaults to false; not read in the part of the class visible here
@Value("${debugging.indexing.disable_downloads:#{false}}")
private boolean disableDownloads;
// Defaults to true; not read in the part of the class visible here
@Value("${indexing.resources.index_errors:#{true}}")
private boolean indexErrors;
// Element ids of the processing roots of the integration/presentation models, resolved in init()
private String integrationsProcessingRoot;
private String presentationsProcessingRoot;
// Stages still to be run; rebuilt by setupStages(), one entry removed per commit()
private Queue<ResourceEnrichmentStages> stageQueue;
// Containers collected so far; replaced by a fresh list whenever the collection stage is run
private List<ResourceContainer> resourceBatch;
// Id of the source data model as passed to init()
private String sourceModelId;
// Set once the first init() call has performed its setup
private boolean initialized = false;
/**
 * @return {@code true} once the first-time setup of this service has been performed
 */
public boolean isInitialized() {
    return this.initialized;
}
/**
 * @return the id of the source data model this service works on
 */
public String getSourceModelId() {
    return this.sourceModelId;
}
/**
 * @param sourceModelId the id of the source data model this service works on
 */
public void setSourceModelId(String sourceModelId) {
    this.sourceModelId = sourceModelId;
}
/**
 * @return the list of resource containers currently held by this service
 */
@Override
public List<ResourceContainer> getResourceBatch() {
    return this.resourceBatch;
}
/**
 * Performs the first-time setup of this service for the given source data model.
 *
 * The first call stores the source model id, builds the stage queue, runs the first stage
 * and resolves the processing root element ids of the configured integration and
 * presentation models. Any later call returns without doing anything.
 *
 * Failures while resolving a processing root are logged and otherwise tolerated; the
 * corresponding root id then stays unset.
 *
 * @param datamodelId id of the source data model
 */
@Override
public void init(String datamodelId) {
    // Consequent calls from transformationService.init() need no action
    if (initialized) {
        return;
    }

    logger.debug("Initializing new indexing client...");
    this.sourceModelId = datamodelId;
    this.setupStages();
    this.runStage();
    this.initialized = true;

    if (integrationModelEntityId == null) {
        logger.warn("No integrations model set, indexing source data model only");
    } else {
        try {
            integrationsProcessingRoot = this.findProcessingRootElementId(integrationModelEntityId);
        } catch (Exception e) {
            // Best effort: keep going without a root, but do not hide the cause
            logger.error(String.format("Failed to resolve processing root of integration model [%s]", integrationModelEntityId), e);
        }
    }

    if (presentationModelEntityId == null) {
        logger.warn("No index model set, no presentation layer available");
    } else {
        try {
            presentationsProcessingRoot = this.findProcessingRootElementId(presentationModelEntityId);
        } catch (Exception e) {
            // Best effort: keep going without a root, but do not hide the cause
            logger.error(String.format("Failed to resolve processing root of presentation model [%s]", presentationModelEntityId), e);
        }
    }
}
/**
 * Rebuilds the stage queue for one pass over a batch.
 *
 * The queue always starts with RESOURCE_COLLECTION. Further stages are appended only when
 * the target model is configured, differs from the model it is mapped from, and a mapping
 * between the two exists:
 * RESOURCE_PRESENTATION (source to presentation), INTEGRATIONS_TRANSFORMATION (source to
 * integration) and, only together with the latter, INTEGRATIONS_PRESENTATION (integration
 * to presentation).
 *
 * Failures while checking the two presentation stages are logged and ignored; a failure
 * while checking the integration stage is logged and rethrown.
 */
private void setupStages() {
    stageQueue = new LinkedList<>();
    stageQueue.add(ResourceEnrichmentStages.RESOURCE_COLLECTION);

    // Collect available, mapping-based indexing stages
    try {
        boolean sourcePresentable = presentationModelEntityId != null
                && !this.getSourceModelId().equals(presentationModelEntityId)
                && this.mappingService.getMappingBySourceAndTarget(this.getSourceModelId(), presentationModelEntityId) != null;
        if (sourcePresentable) {
            stageQueue.add(ResourceEnrichmentStages.RESOURCE_PRESENTATION);
        }
    } catch (Exception e) {
        logger.error("Failed to setup RESOURCE_PRESENTATION stage", e);
    }

    try {
        boolean integrable = integrationModelEntityId != null
                && !this.getSourceModelId().equals(integrationModelEntityId)
                && mappingService.getMappingBySourceAndTarget(this.getSourceModelId(), integrationModelEntityId) != null;
        if (integrable) {
            stageQueue.add(ResourceEnrichmentStages.INTEGRATIONS_TRANSFORMATION);
            try {
                boolean integrationPresentable = presentationModelEntityId != null
                        && !presentationModelEntityId.equals(integrationModelEntityId)
                        && mappingService.getMappingBySourceAndTarget(integrationModelEntityId, presentationModelEntityId) != null;
                if (integrationPresentable) {
                    stageQueue.add(ResourceEnrichmentStages.INTEGRATIONS_PRESENTATION);
                }
            } catch (Exception e) {
                logger.error("Failed to setup INTEGRATIONS_PRESENTATION stage", e);
            }
        }
    } catch (Exception e) {
        logger.error("Failed to setup INTEGRATIONS_TRANSFORMATION stage", e);
        throw e;
    }
}
/**
 * Executes the stage that is currently active.
 *
 * In the collection stage this only starts a fresh, empty batch. In every other stage the
 * transformation service is initialized with the mapping that belongs to the stage and is
 * then handed a copy of the current batch. Failures are logged and not propagated.
 */
private void runStage() {
    if (this.isCurrentStage(ResourceEnrichmentStages.RESOURCE_COLLECTION)) {
        // Nothing else to do -> consume and commit are called externally
        this.resourceBatch = new ArrayList<>();
        return;
    }

    try {
        String mappingSource = null;
        String mappingTarget = null;
        boolean mappedStage = true;
        if (this.isCurrentStage(ResourceEnrichmentStages.RESOURCE_PRESENTATION)) {
            mappingSource = this.getSourceModelId();
            mappingTarget = presentationModelEntityId;
        } else if (this.isCurrentStage(ResourceEnrichmentStages.INTEGRATIONS_TRANSFORMATION)) {
            mappingSource = this.getSourceModelId();
            mappingTarget = integrationModelEntityId;
        } else if (this.isCurrentStage(ResourceEnrichmentStages.INTEGRATIONS_PRESENTATION)) {
            mappingSource = integrationModelEntityId;
            mappingTarget = presentationModelEntityId;
        } else {
            mappedStage = false;
        }
        if (mappedStage) {
            transformationService.init(mappingService.getMappingBySourceAndTarget(mappingSource, mappingTarget), this);
        }

        // To prevent co-modification: presentation stages can create partial resources
        List<ResourceContainer> snapshot = new ArrayList<>(this.resourceBatch);
        transformationService.transformResources(snapshot);
    } catch (Exception e) {
        logger.error(String.format("Failed to execute indexing stage [%s]", this.getCurrentStage().toString()), e);
    }
}
/**
 * Consumes a resource for which no transformed counterpart is available; delegates to
 * {@link #consume(Resource, Resource)} with {@code null} as transformed resource.
 *
 * @param resource the resource to consume
 */
@Override
public void consume(Resource resource) {
    consume(resource, null);
}
/**
 * Consumes a resource, optionally together with its transformed counterpart.
 *
 * During the collection stage the resource is wrapped into a container and appended to
 * the batch; {@code rTransformed} is ignored. In all other stages {@code resource} is
 * expected to be a {@link ResourceContainer} from the batch: the processing root found in
 * {@code rTransformed} is stored on it for the current stage, and the container is added
 * to the batch if it is not part of it yet.
 *
 * @param resource the resource to consume; {@code null} is ignored
 * @param rTransformed the transformation result; outside of the collection stage a
 *        {@code null} value causes the call to be ignored
 */
@Override
public void consume(Resource resource, Resource rTransformed) {
    if (resource == null) {
        return;
    }
    if (this.isCurrentStage(ResourceEnrichmentStages.RESOURCE_COLLECTION)) {
        ResourceContainer collected = this.collectResource(resource);
        if (collected == null) {
            // collectResource may yield no container (e.g. for resources it skips);
            // without this guard the call below fails with a NullPointerException
            return;
        }
        collected.setCurrentStage(this.getCurrentStage());
        resourceBatch.add(collected);
        return;
    }
    if (rTransformed == null) {
        return;
    }

    ResourceContainer rc = (ResourceContainer) resource;
    rc.setCurrentStage(this.getCurrentStage());
    if (this.getCurrentStage() == ResourceEnrichmentStages.INTEGRATIONS_TRANSFORMATION) {
        rc.setCurrentResource(this.findProcessingRoot(rTransformed, integrationsProcessingRoot));
    } else {
        rc.setCurrentResource(this.findProcessingRoot(rTransformed, presentationsProcessingRoot));
    }

    // For our split resources
    if (!resourceBatch.contains(rc)) {
        resourceBatch.add(rc);
    }
}
/**
 * Finishes the current stage and runs the next one.
 *
 * The head of the stage queue is dropped; when that leaves the queue empty, the stages
 * are set up again before the (new) current stage is run.
 *
 * @return always {@code 0}
 */
@Override
public int commit() {
    // Drop the stage that has just been completed
    stageQueue.poll();
    if (stageQueue.isEmpty()) {
        // Restart with further incoming resources -> occurs on every chunk
        this.setupStages();
    }
    this.runStage();
    return 0;
}
/**
 * Resolves the id of the element that serves as processing root of a data model.
 *
 * @param entityId id of the data model to inspect; may be {@code null}
 * @return the id of the processing root found in the model's element hierarchy, the id of
 *         the hierarchy root when no processing root is found, or {@code null} when
 *         {@code entityId} is {@code null}
 */
private String findProcessingRootElementId(String entityId) {
    if (entityId == null) {
        return null;
    }
    ExtendedDatamodelContainer container = datamodelService.findById(entityId);
    Nonterminal hierarchyRoot = (Nonterminal) container.getOrRenderElementHierarchy();
    Nonterminal processingRoot = NonterminalImpl.findProcessingRoot(hierarchyRoot);
    if (processingRoot != null) {
        return processingRoot.getId();
    }
    // Fall back to the root of the hierarchy
    return hierarchyRoot.getId();
}
private ResourceContainer collectResource(Resource resource) {
try {
Resource r = null;
if (resource.getKey().equals(OaiPmhHarvestingService.OAI_PMH_ENVELOPE_RESOURCE)) {
for (Resource rChild : resource.getChildResources()) {
if (rChild.getKey().equals(RootElementKeys.CONTENT.toString())) {
r = rChild;
}