Created April 23, 2020 02:40
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java | |
index cfedb5777e7..dd5a8fe2968 100644 | |
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java | |
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java | |
@@ -97,7 +97,7 @@ import static java.util.Collections.emptyMap; | |
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> { | |
private static final Logger logger = LogManager.getLogger(TransportBulkAction.class); | |
- | |
+ private static final Logger customLogger = LogManager.getLogger("*********************"); | |
private final ThreadPool threadPool; | |
private final AutoCreateIndex autoCreateIndex; | |
private final ClusterService clusterService; | |
@@ -668,6 +668,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul | |
logger.debug("failed to execute pipeline for a bulk request", exception); | |
listener.onFailure(exception); | |
} else { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Success handler for bulk"); | |
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos); | |
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); | |
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, | |
@@ -692,6 +693,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul | |
@Override | |
protected void doRun() throws Exception { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Before doExecute for bulk"); | |
doExecute(task, bulkRequest, actionListener); | |
} | |
diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java | |
index 9cc414c5a15..12fa430de48 100644 | |
--- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java | |
+++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java | |
@@ -19,6 +19,8 @@ | |
package org.elasticsearch.ingest; | |
+import org.apache.logging.log4j.LogManager; | |
+import org.apache.logging.log4j.Logger; | |
import org.elasticsearch.ElasticsearchException; | |
import org.elasticsearch.common.collect.Tuple; | |
@@ -47,6 +49,7 @@ public class CompoundProcessor implements Processor { | |
private final List<Processor> onFailureProcessors; | |
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics; | |
private final LongSupplier relativeTimeProvider; | |
+ private static final Logger customLogger = LogManager.getLogger("*********************"); | |
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) { | |
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider); | |
@@ -154,8 +157,10 @@ public class CompoundProcessor implements Processor { | |
} | |
} else { | |
if (result != null) { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] innerExecute -> result != null "); | |
innerExecute(currentProcessor + 1, result, handler); | |
} else { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] innerExecute -> result == null "); | |
handler.accept(null, null); | |
} | |
} | |
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java | |
index d57264dc88d..4bbd88b3090 100644 | |
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java | |
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java | |
@@ -647,7 +647,8 @@ public final class IngestDocument { | |
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) { | |
if (executedPipelines.add(pipeline.getId())) { | |
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId()); | |
- pipeline.execute(this, (result, e) -> { | |
+ pipeline. | |
+ execute(this, (result, e) -> { | |
executedPipelines.remove(pipeline.getId()); | |
if (previousPipeline != null) { | |
ingestMetadata.put("pipeline", previousPipeline); | |
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java | |
index 1f9190c58bb..a0cf12245b3 100644 | |
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java | |
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java | |
@@ -77,6 +77,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge | |
public static final String NOOP_PIPELINE_NAME = "_none"; | |
private static final Logger logger = LogManager.getLogger(IngestService.class); | |
+ private static final Logger customLogger = LogManager.getLogger("*********************"); | |
private final ClusterService clusterService; | |
private final ScriptService scriptService; | |
@@ -382,8 +383,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge | |
continue; | |
} | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Before execute pipelines in IngestService.executeBulkRequest"); | |
executePipelines(i, pipelines.iterator(), indexRequest, onDropped, onFailure, counter, onCompletion, originalThread); | |
- | |
i++; | |
} | |
} | |
@@ -401,6 +402,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge | |
final Thread originalThread | |
) { | |
while (it.hasNext()) { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] In IngestService.executePipelines"); | |
final String pipelineId = it.next(); | |
try { | |
PipelineHolder holder = pipelines.get(pipelineId); | |
@@ -414,6 +416,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge | |
} | |
if (it.hasNext()) { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Before execute pipelines in IngestService.executePipelines"); | |
executePipelines(slot, it, indexRequest, onDropped, onFailure, counter, onCompletion, originalThread); | |
} else { | |
if (counter.decrementAndGet() == 0) { | |
diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java | |
index 3d41d991f3e..33ca113b693 100644 | |
--- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java | |
+++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java | |
@@ -19,6 +19,8 @@ | |
package org.elasticsearch.ingest; | |
+import org.apache.logging.log4j.LogManager; | |
+import org.apache.logging.log4j.Logger; | |
import org.elasticsearch.ElasticsearchParseException; | |
import org.elasticsearch.common.Nullable; | |
@@ -29,6 +31,7 @@ import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.BiConsumer; | |
import java.util.function.LongSupplier; | |
+import java.util.stream.Collectors; | |
import org.elasticsearch.script.ScriptService; | |
@@ -50,6 +53,7 @@ public final class Pipeline { | |
private final CompoundProcessor compoundProcessor; | |
private final IngestMetric metrics; | |
private final LongSupplier relativeTimeProvider; | |
+ private static final Logger customLogger = LogManager.getLogger("*********************"); | |
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { | |
this(id, description, version, compoundProcessor, System::nanoTime); | |
@@ -97,6 +101,9 @@ public final class Pipeline { | |
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) { | |
final long startTimeInNanos = relativeTimeProvider.getAsLong(); | |
metrics.preIngest(); | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Processing ingest document"); | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "]" + | |
+ Arrays.stream(Thread.currentThread().getStackTrace()).map(StackTraceElement::toString).collect(Collectors.joining("\n"))); | |
compoundProcessor.execute(ingestDocument, (result, e) -> { | |
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); | |
metrics.postIngest(ingestTimeInMillis); | |
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java | |
index e285c8fef23..d10fdb4687f 100644 | |
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java | |
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java | |
@@ -5,6 +5,8 @@ | |
*/ | |
package org.elasticsearch.xpack.enrich.action; | |
+import org.apache.logging.log4j.LogManager; | |
+import org.apache.logging.log4j.Logger; | |
import org.apache.logging.log4j.util.BiConsumer; | |
import org.elasticsearch.action.ActionListener; | |
import org.elasticsearch.action.ActionType; | |
@@ -47,6 +49,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> { | |
public static final EnrichCoordinatorProxyAction INSTANCE = new EnrichCoordinatorProxyAction(); | |
public static final String NAME = "indices:data/read/xpack/enrich/coordinate_lookups"; | |
+ private static final Logger customLogger = LogManager.getLogger("*********************"); | |
private EnrichCoordinatorProxyAction() { | |
super(NAME, SearchResponse::new); | |
} | |
@@ -130,6 +133,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> { | |
while (queue.isEmpty() == false && remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) { | |
final List<Slot> slots = new ArrayList<>(); | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Draining queue"); | |
queue.drainTo(slots, maxLookupsPerRequest); | |
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); | |
slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest)); | |
@@ -153,6 +157,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> { | |
if (responseItem.isFailure()) { | |
slot.actionListener.onFailure(responseItem.getFailure()); | |
} else { | |
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Calling slot.actionListener"); | |
slot.actionListener.onResponse(responseItem.getResponse()); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
repo case:
logs: