From c16a5ad68c4f8967a98782cae76e88ae3019b261 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Fri, 16 Jan 2026 14:48:08 -0300 Subject: [PATCH] Add BuilderDecorator for handling CloudEventBuilder Signed-off-by: Matheus Cruz --- .../impl/events/BuilderDecorator.java | 32 +++++++++++++++++++ .../impl/executors/EmitExecutor.java | 17 ++++++++++ 2 files changed, 49 insertions(+) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/events/BuilderDecorator.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/BuilderDecorator.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/BuilderDecorator.java new file mode 100644 index 000000000..3e7bc998b --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/BuilderDecorator.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.serverlessworkflow.impl.ServicePriority; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +public interface BuilderDecorator extends ServicePriority { + + void decorate( + T builder, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel workflowModel); + + boolean accept(Class clazz); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java index 0dad582c7..b0336a570 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -32,19 +32,23 @@ import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.events.BuilderDecorator; import io.serverlessworkflow.impl.events.CloudEventUtils; import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import java.net.URI; import java.time.OffsetDateTime; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.ServiceLoader; import java.util.concurrent.CompletableFuture; public class EmitExecutor extends RegularTaskExecutor { private final EventPropertiesBuilder props; + private final List decorators; public static class EmitExecutorBuilder extends RegularTaskExecutorBuilder { @@ -66,6 +70,11 @@ public EmitExecutor buildInstance() { private EmitExecutor(EmitExecutorBuilder builder) { super(builder); this.props = builder.eventBuilder; + this.decorators = + ServiceLoader.load(BuilderDecorator.class).stream() + .map(ServiceLoader.Provider::get) + .sorted() + .toList(); } @Override @@ -73,6 +82,7 @@ protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { Collection eventPublishers = workflow.definition().application().eventPublishers(); + CloudEvent ce = buildCloudEvent(workflow, taskContext); return CompletableFuture.allOf( eventPublishers.stream() @@ -83,6 +93,13 @@ protected CompletableFuture internalExecute( private CloudEvent buildCloudEvent(WorkflowContext workflow, TaskContext taskContext) { io.cloudevents.core.v1.CloudEventBuilder ceBuilder = CloudEventBuilder.v1(); + + for (BuilderDecorator decorator : decorators) { + if (decorator.accept(io.cloudevents.core.v1.CloudEventBuilder.class)) { + decorator.decorate(ceBuilder, workflow, taskContext, taskContext.input()); + } + } + ceBuilder.withId( props .idFilter()