Commit 0662c990 authored by Gradl, Tobias's avatar Gradl, Tobias
Browse files

1327: Separate IndexingService from ResourceEnrichmentService

Task-Url: https://pm.winseda.de/issues/1327
parent 29190903
......@@ -4,19 +4,14 @@ import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
import java.util.regex.Matcher;
import org.apache.commons.codec.digest.DigestUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.mapper.StrictDynamicMappingException;
import org.jsoup.select.Evaluator.IsEmpty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
......@@ -27,47 +22,32 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.MissingNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.node.ValueNode;
import de.unibamberg.minf.core.util.json.JsonNodeHelper;
import de.unibamberg.minf.dme.model.base.Nonterminal;
import de.unibamberg.minf.dme.model.datamodel.NonterminalImpl;
import de.unibamberg.minf.processing.exception.ProcessingConfigException;
import de.unibamberg.minf.processing.model.SerializableResource;
import de.unibamberg.minf.processing.model.SerializableRootResource;
import de.unibamberg.minf.processing.model.base.Resource;
import de.unibamberg.minf.processing.model.helper.ResourceHelper;
import de.unibamberg.minf.processing.service.online.OaiPmhHarvestingService;
import eu.dariah.de.search.Constants;
import eu.dariah.de.search.Constants.IndexingStages;
import eu.dariah.de.search.Constants.RootElementKeys;
import eu.dariah.de.search.data.service.CachedImageService;
import eu.dariah.de.search.es.client.IndexingClient;
import eu.dariah.de.search.es.service.base.BaseEsServiceImpl;
import eu.dariah.de.search.indexing.model.ResourceContainer;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import eu.dariah.de.search.service.MappingService;
import eu.dariah.de.search.transformation.ResourceEnrichmentService;
import eu.dariah.de.search.transformation.ResourceTransformationService;
@Component
@Scope("prototype")
public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingService {
public static String GSMODEL_IDENTIFIER = "Indexing.Identifier";
public static String GSMODEL_IMAGE = "Indexing.Image";
public static int BULK_CHUNK_SIZE = 10;
public static final String GSMODEL_IDENTIFIER = "Indexing.Identifier";
public static final String GSMODEL_IMAGE = "Indexing.Image";
public static final int BULK_CHUNK_SIZE = 10;
@Autowired private IndexingClient indexingClient;
@Autowired private MappingService mappingService;
@Autowired private DatamodelService datamodelService;
@Autowired private MappingGenerationService mappingGenerationService;
@Autowired private AdminService adminService;
@Autowired private ResourceTransformationService transformationService;
@Autowired private CachedImageService cachedImageService;
@Autowired private JsonNodeHelper jsonNodeHelper;
......@@ -85,14 +65,7 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
@Value("${indexing.resources.index_errors:#{true}}")
private boolean indexErrors;
private String integrationsProcessingRoot;
private String presentationsProcessingRoot;
private Queue<IndexingStages> stageQueue;
private String index;
private String type;
private String datasourceId;
......@@ -119,32 +92,13 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
@Override
public void init(String datamodelId) {
try {
enrichmentService.init(datamodelId);
} catch (ProcessingConfigException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// Consequent calls from transformationService.init() need no action
if (!initialized) {
logger.debug("Initializing new indexing client...");
this.setupStages();
this.runStage();
}
}
private void setupStages() {
stageQueue = new LinkedList<>();
stageQueue.add(IndexingStages.RESOURCE_COLLECTION);
stageQueue.add(IndexingStages.INDEXING);
}
private void runStage() {
if (this.isCurrentStage(IndexingStages.INDEXING)) {
this.appendMetadata();
this.indexResources();
this.commit();
try {
logger.debug("Initializing new indexing client...");
enrichmentService.init(datamodelId);
} catch (ProcessingConfigException e) {
logger.error("Failed to initialize indexing client", e);
}
}
}
......@@ -160,21 +114,19 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
@Override
public int commit() {
stageQueue.remove(stageQueue.peek());
if (stageQueue.isEmpty()) {
// Restart with further incoming resources -> occurrs on every chunk
this.setupStages();
}
this.runStage();
this.enrichmentService.commit();
this.appendMetadata();
this.indexResources();
this.enrichmentService.reset();
return 0;
}
private void appendMetadata() {
// Metadata rendered with the resource
for (ResourceContainer rc : this.enrichmentService.getResourceBatch()) {
Resource rMetaParent = new SerializableResource(Constants.ELEMENT_KEY_META_RESOURCE, null);
Resource rMetaParentMerge;
if (SerializableRootResource.class.isAssignableFrom(rc.getContent().getClass())) {
......@@ -189,8 +141,6 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
}
}
/*Resource rResource*/
JsonNode n = resourceToSimpleNode(rMetaParent);
......@@ -205,7 +155,6 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
jsonNodeHelper.replaceRecursive(metaNode, Constants.ELEMENT_KEY_ERRORS.split("\\."), errNode);
}
List<Resource> idRes = ResourceHelper.findRecursive(rMetaParent, "_id");
......@@ -224,13 +173,8 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
rc.setId(DigestUtils.md2Hex(UUID.randomUUID().toString()));
}
}
rc.setMeta(metaNode.get(Constants.ELEMENT_KEY_META));
}
}
private JsonNode resourceToSimpleNode(Resource r) {
......@@ -395,6 +339,8 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
}
logger.debug("Bulk indexed {} documents", succSources.size());
for (Entry<String, Map<String, Object>> succSource : succSources.entrySet()) {
if (succSource.getValue().containsKey(RootElementKeys.PRESENTATION.toString())) {
this.handleDownloads(succSource.getKey(), (Resource)succSource.getValue().get(RootElementKeys.PRESENTATION.toString()));
......@@ -418,12 +364,4 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
return true;
}
private IndexingStages getCurrentStage() {
return this.stageQueue.peek();
}
private boolean isCurrentStage(IndexingStages compareStage) {
return this.getCurrentStage()==compareStage;
}
}
......@@ -37,6 +37,7 @@ import eu.dariah.de.search.query.results.QueryResult;
import eu.dariah.de.search.service.CollectionService;
import eu.dariah.de.search.service.DatamodelService;
import eu.dariah.de.search.service.MappingService;
import eu.dariah.de.search.transformation.ResourceEnrichmentService;
@Component
public class SruQueryExecutionServiceImpl implements QueryExecutionService {
......@@ -50,7 +51,7 @@ public class SruQueryExecutionServiceImpl implements QueryExecutionService {
@Autowired protected RestTemplate restTemplate;
@Autowired private DatamodelService datamodelService;
@Autowired private MappingService mappingService;
@Value("${datamodels.integration}")
......@@ -116,11 +117,10 @@ public class SruQueryExecutionServiceImpl implements QueryExecutionService {
Element r = s.getOrRenderElementHierarchy();
XmlProcessingService processingSvc = appContext.getBean(XmlProcessingService.class);
CollectingResourceConsumptionServiceImpl consumptionService = new CollectingResourceConsumptionServiceImpl();
ResourceEnrichmentService enrichmentService = appContext.getBean(ResourceEnrichmentService.class);
processingSvc.setSchema(s.getModel());
processingSvc.addConsumptionService(consumptionService);
processingSvc.addConsumptionService(enrichmentService);
try {
//processingSvc.setExecutionContext(new SessionExecutionContext(sessionsPath, session.getId()));
processingSvc.setRoot((Nonterminal)r);
......@@ -138,11 +138,13 @@ public class SruQueryExecutionServiceImpl implements QueryExecutionService {
processingSvc.setInputStream(IOUtils.toInputStream(sss, Charset.defaultCharset()));
processingSvc.run();
logger.debug("Done");
} catch (Exception e) {
logger.error("Error", e);
}
System.out.print(false);
}
}
return null;
......
......@@ -7,4 +7,6 @@ import eu.dariah.de.search.indexing.model.ResourceContainer;
public interface ResourceEnrichmentService extends MappedResourceConsumptionService {
public List<ResourceContainer> getResourceBatch();
public void reset();
}
......@@ -69,37 +69,28 @@ public class ResourceEnrichmentServiceImpl implements ResourceEnrichmentService
public void init(String datamodelId) {
// Consequent calls from transformationService.init() need no action
if (!initialized) {
logger.debug("Initializing new indexing client...");
logger.debug("Initializing new resource enrichtment service...");
this.sourceModelId = datamodelId;
this.setupStages();
this.runStage();
this.initialized=true;
this.resourceBatch = new ArrayList<>();
if (integrationModelEntityId==null) {
logger.warn("No integrations model set, indexing source data model only");
} else {
try {
integrationsProcessingRoot = this.findProcessingRootElementId(integrationModelEntityId);
} catch (Exception e) {
// TODO: handle exception
}
integrationsProcessingRoot = this.findProcessingRootElementId(integrationModelEntityId);
}
if (presentationModelEntityId==null) {
logger.warn("No index model set, no presentation layer available");
} else {
try {
presentationsProcessingRoot = this.findProcessingRootElementId(presentationModelEntityId);
} catch (Exception e) {
// TODO: handle exception
}
presentationsProcessingRoot = this.findProcessingRootElementId(presentationModelEntityId);
}
this.runStage();
}
}
private void setupStages() {
stageQueue = new LinkedList<ResourceEnrichmentStages>();
stageQueue = new LinkedList<>();
stageQueue.add(ResourceEnrichmentStages.RESOURCE_COLLECTION);
// Collect available, mapping-based indexing stages
......@@ -129,10 +120,7 @@ public class ResourceEnrichmentServiceImpl implements ResourceEnrichmentService
}
private void runStage() {
if (this.isCurrentStage(ResourceEnrichmentStages.RESOURCE_COLLECTION)) {
// Noting else to do -> consume and commit are called externally
this.resourceBatch = new ArrayList<>();
} else {
if (!this.isCurrentStage(ResourceEnrichmentStages.RESOURCE_COLLECTION)) {
try {
if (this.isCurrentStage(ResourceEnrichmentStages.RESOURCE_PRESENTATION)) {
transformationService.init(mappingService.getMappingBySourceAndTarget(this.getSourceModelId(), presentationModelEntityId), this);
......@@ -257,4 +245,10 @@ public class ResourceEnrichmentServiceImpl implements ResourceEnrichmentService
private boolean isCurrentStage(ResourceEnrichmentStages compareStage) {
return this.getCurrentStage()==compareStage;
}
@Override
public void reset() {
this.resourceBatch.clear();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment