
@jakelandis
Created April 23, 2020 02:40
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index cfedb5777e7..dd5a8fe2968 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -97,7 +97,7 @@ import static java.util.Collections.emptyMap;
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private static final Logger logger = LogManager.getLogger(TransportBulkAction.class);
-
+ private static final Logger customLogger = LogManager.getLogger("*********************");
private final ThreadPool threadPool;
private final AutoCreateIndex autoCreateIndex;
private final ClusterService clusterService;
@@ -668,6 +668,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Success handler for bulk");
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis,
@@ -692,6 +693,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override
protected void doRun() throws Exception {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Before doExecute for bulk");
doExecute(task, bulkRequest, actionListener);
}
diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
index 9cc414c5a15..12fa430de48 100644
--- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
+++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
@@ -19,6 +19,8 @@
package org.elasticsearch.ingest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
@@ -47,6 +49,7 @@ public class CompoundProcessor implements Processor {
private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final LongSupplier relativeTimeProvider;
+ private static final Logger customLogger = LogManager.getLogger("*********************");
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
@@ -154,8 +157,10 @@ public class CompoundProcessor implements Processor {
}
} else {
if (result != null) {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] innerExecute -> result != null ");
innerExecute(currentProcessor + 1, result, handler);
} else {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] innerExecute -> result == null ");
handler.accept(null, null);
}
}
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index d57264dc88d..4bbd88b3090 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -647,7 +647,8 @@ public final class IngestDocument {
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
- pipeline.execute(this, (result, e) -> {
+ pipeline.
+ execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 1f9190c58bb..a0cf12245b3 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -77,6 +77,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
public static final String NOOP_PIPELINE_NAME = "_none";
private static final Logger logger = LogManager.getLogger(IngestService.class);
+ private static final Logger customLogger = LogManager.getLogger("*********************");
private final ClusterService clusterService;
private final ScriptService scriptService;
@@ -382,8 +383,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
continue;
}
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Before execute pipelines in IngestService.executeBulkRequest");
executePipelines(i, pipelines.iterator(), indexRequest, onDropped, onFailure, counter, onCompletion, originalThread);
-
i++;
}
}
@@ -401,6 +402,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
final Thread originalThread
) {
while (it.hasNext()) {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] In IngestService.executePipelines");
final String pipelineId = it.next();
try {
PipelineHolder holder = pipelines.get(pipelineId);
@@ -414,6 +416,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
}
if (it.hasNext()) {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Before execute pipelines in IngestService.executePipelines");
executePipelines(slot, it, indexRequest, onDropped, onFailure, counter, onCompletion, originalThread);
} else {
if (counter.decrementAndGet() == 0) {
diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
index 3d41d991f3e..33ca113b693 100644
--- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
+++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -19,6 +19,8 @@
package org.elasticsearch.ingest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
@@ -29,6 +31,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
import org.elasticsearch.script.ScriptService;
@@ -50,6 +53,7 @@ public final class Pipeline {
private final CompoundProcessor compoundProcessor;
private final IngestMetric metrics;
private final LongSupplier relativeTimeProvider;
+ private static final Logger customLogger = LogManager.getLogger("*********************");
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this(id, description, version, compoundProcessor, System::nanoTime);
@@ -97,6 +101,9 @@ public final class Pipeline {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metrics.preIngest();
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Processing ingest document");
+ customLogger.warn("[" + Thread.currentThread().getName() + "]" +
+ Arrays.stream(Thread.currentThread().getStackTrace()).map(StackTraceElement::toString).collect(Collectors.joining("\n")));
compoundProcessor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metrics.postIngest(ingestTimeInMillis);
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
index e285c8fef23..d10fdb4687f 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
@@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.enrich.action;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.BiConsumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
@@ -47,6 +49,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
public static final EnrichCoordinatorProxyAction INSTANCE = new EnrichCoordinatorProxyAction();
public static final String NAME = "indices:data/read/xpack/enrich/coordinate_lookups";
+ private static final Logger customLogger = LogManager.getLogger("*********************");
private EnrichCoordinatorProxyAction() {
super(NAME, SearchResponse::new);
}
@@ -130,6 +133,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
while (queue.isEmpty() == false && remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) {
final List<Slot> slots = new ArrayList<>();
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Draining queue");
queue.drainTo(slots, maxLookupsPerRequest);
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest));
@@ -153,6 +157,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
if (responseItem.isFailure()) {
slot.actionListener.onFailure(responseItem.getFailure());
} else {
+ customLogger.warn("[" + Thread.currentThread().getName() + "] Calling slot.actionListener");
slot.actionListener.onResponse(responseItem.getResponse());
}
}
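
For reference (not part of the diff itself): the instrumentation added throughout the patch is a shared Log4j logger that prefixes every message with the name of the executing thread, plus, in Pipeline.execute, a dump of the current call stack. A minimal standalone sketch of that pattern, using the same placeholder logger name as the patch:

import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ThreadTraceLogging {

    // Same idea as the customLogger in the patch; the name is just an arbitrary
    // marker that is easy to grep for in the node logs.
    private static final Logger customLogger = LogManager.getLogger("*********************");

    static void logCheckpoint(String message) {
        // Tag the message with the thread that is executing this code path.
        customLogger.warn("[" + Thread.currentThread().getName() + "] " + message);
    }

    static void logStack() {
        // Dump the current call stack, one frame per line, to see who invoked this code.
        customLogger.warn("[" + Thread.currentThread().getName() + "]" +
            Arrays.stream(Thread.currentThread().getStackTrace())
                .map(StackTraceElement::toString)
                .collect(Collectors.joining("\n")));
    }
}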

@jakelandis (Author) commented:
repro case:

POST _bulk
{ "index" : { "_index" : "mysource"} }
{ "my_number" : 1, "my_value" : "a" }

PUT _enrich/policy/myenrich_policy
{
  "match": {
    "indices": "mysource",
    "match_field": "my_number",
    "enrich_fields": ["my_value"]
  }
}

POST _enrich/policy/myenrich_policy/_execute

PUT /_ingest/pipeline/myset_pipeline
{
  "processors": [
    {
      "set": {
        "field": "my_set_field",
        "value": "foobar"
      }
    }
  ]
}

PUT /_ingest/pipeline/myenrich_pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "myenrich_policy",
        "field": "custom_id",
        "target_field": "enrich_value"
      }
    },
    {
      "pipeline": {
        "name": "myset_pipeline"
      }
    }
  ]
}


POST myindex/_doc/1?pipeline=myenrich_pipeline
{
  "custom_id" : 1
}
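
Not part of the original repro, but to verify that the document went through both the enrich processor and the nested myset_pipeline, the indexed document can be fetched; its source should contain both the enrich_value object and my_set_field:

GET myindex/_doc/1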

logs (note how execution moves from the write thread pool, [write][T#6], onto the search thread pool, [search][T#16], once the enrich lookup response is handled):

[2020-04-22T21:44:53,700][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#6]] Before execute pipelines in IngestService.executeBulkRequest
[2020-04-22T21:44:53,700][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#6]] In IngestService.executePipelines
[2020-04-22T21:44:53,700][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#6]] Processing ingest document
[2020-04-22T21:44:53,701][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#6]]java.base/java.lang.Thread.getStackTrace(Thread.java:1598)
org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:106)
org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:651)
org.elasticsearch.ingest.IngestService.innerExecute(IngestService.java:508)
org.elasticsearch.ingest.IngestService.executePipelines(IngestService.java:413)
org.elasticsearch.ingest.IngestService$3.doRun(IngestService.java:387)
org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:691)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
java.base/java.lang.Thread.run(Thread.java:832)
[2020-04-22T21:44:53,702][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#6]] Draining queue
[2020-04-22T21:44:53,717][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] Calling slot.actionListener
[2020-04-22T21:44:53,718][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] innerExecute -> result != null
[2020-04-22T21:44:53,718][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] Processing ingest document
[2020-04-22T21:44:53,719][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]]java.base/java.lang.Thread.getStackTrace(Thread.java:1598)
org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:106)
org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:651)
org.elasticsearch.ingest.PipelineProcessor.execute(PipelineProcessor.java:45)
org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:141)
org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:161)
org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$execute$0(AbstractEnrichProcessor.java:130)
org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$createSearchRunner$1(AbstractEnrichProcessor.java:182)
org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
org.elasticsearch.client.node.NodeClient.lambda$executeLocally$0(NodeClient.java:91)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:158)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:151)
org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.handleResponse(EnrichCoordinatorProxyAction.java:161)
org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.lambda$coordinateLookups$1(EnrichCoordinatorProxyAction.java:143)
org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.lambda$lookupFunction$4(EnrichCoordinatorProxyAction.java:206)
org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
org.elasticsearch.client.node.NodeClient.lambda$executeLocally$0(NodeClient.java:91)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:158)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:151)
org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleResponse(TransportSingleShardAction.java:261)
org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleResponse(TransportSingleShardAction.java:247)
org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1101)
org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1179)
org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1159)
org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:54)
org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:47)
org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:30)
org.elasticsearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:58)
org.elasticsearch.action.ActionRunnable$2.doRun(ActionRunnable.java:73)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:44)
org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:691)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
java.base/java.lang.Thread.run(Thread.java:832)
[2020-04-22T21:44:53,719][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] innerExecute -> result != null
[2020-04-22T21:44:53,719][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] innerExecute -> result != null
[2020-04-22T21:44:53,719][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] Success handler for bulk
[2020-04-22T21:44:53,720][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#1]] Before doExecute for bulk
[2020-04-22T21:44:53,720][INFO ][o.e.c.m.MetadataCreateIndexService] [runTask-0] applying create index request using v1 templates []
[2020-04-22T21:44:53,724][INFO ][o.e.c.m.MetadataCreateIndexService] [runTask-0] [myindex] creating index, cause [auto(bulk api)], templates [], shards [1]/[1], mappings []
[2020-04-22T21:44:54,025][INFO ][o.e.c.m.MetadataMappingService] [runTask-0] [myindex/2_YotTEPRuiX9gtuXPKqPw] create_mapping
