Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,359 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.eventsource.ErrorStrategy;
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.eventsource.HttpConnectStrategy;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.StreamClosedByCallerException;
import com.launchdarkly.eventsource.StreamEvent;
import com.launchdarkly.eventsource.StreamException;
import com.launchdarkly.eventsource.StreamHttpErrorException;
import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.logging.LogValues;
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpHelpers;
import com.launchdarkly.sdk.internal.http.HttpProperties;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.SelectorSource;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
import com.launchdarkly.sdk.server.subsystems.SerializationException;
import com.google.gson.stream.JsonReader;
import okhttp3.Headers;

import java.io.Reader;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.launchdarkly.sdk.internal.http.HttpErrors.checkIfErrorIsRecoverableAndLog;

/**
* Implementation of FDv2 streaming synchronizer.
* Maintains a long-running streaming connection and queues results as they arrive.
*/
class StreamingSynchronizerImpl implements Synchronizer {
private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300);

private final HttpProperties httpProperties;
private final SelectorSource selectorSource;
final URI streamUri;
private final LDLogger logger;
private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue<>();
private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>();
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private final FDv2ProtocolHandler protocolHandler = new FDv2ProtocolHandler();
private volatile EventSource eventSource;
private volatile Thread streamThread;

public StreamingSynchronizerImpl(
HttpProperties httpProperties,
URI baseUri,
String requestPath,
LDLogger logger,
SelectorSource selectorSource
) {
this.httpProperties = httpProperties;
this.selectorSource = selectorSource;
this.logger = logger;
this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);

startStream();
}

private void startStream() {
Headers headers = httpProperties.toHeadersBuilder()
.add("Accept", "text/event-stream")
.build();

HttpConnectStrategy connectStrategy = ConnectStrategy.http(streamUri)
.headers(headers)
.clientBuilderActions(clientBuilder -> {
httpProperties.applyToHttpClientBuilder(clientBuilder);
// Add interceptor to inject selector query parameters on each request
clientBuilder.addInterceptor(chain -> {
okhttp3.Request originalRequest = chain.request();
Selector selector = selectorSource.getSelector();

if (selector.isEmpty()) {
return chain.proceed(originalRequest);
}

// Build new URL with selector query parameters
URI currentUri = originalRequest.url().uri();
URI updatedUri = HttpHelpers.addQueryParam(currentUri, "version", String.valueOf(selector.getVersion()));
if (selector.getState() != null && !selector.getState().isEmpty()) {
updatedUri = HttpHelpers.addQueryParam(updatedUri, "state", selector.getState());
}

okhttp3.Request newRequest = originalRequest.newBuilder()
.url(updatedUri.toString())
.build();
return chain.proceed(newRequest);
});
})
.readTimeout(DEAD_CONNECTION_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);

EventSource.Builder builder = new EventSource.Builder(connectStrategy)
.errorStrategy(ErrorStrategy.alwaysContinue())
.logger(logger)
.readBufferSize(5000)
.streamEventData(true)
.expectFields("event");

eventSource = builder.build();

streamThread = new Thread(() -> {
try {
for (StreamEvent event : eventSource.anyEvents()) {
if (shutdownRequested.get()) {
break;
}

if (!handleEvent(event)) {
break;
}
}
} catch (Exception e) {
if (shutdownRequested.get()) {
return;
}
logger.error("Stream thread ended with exception: {}", LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));

DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
e.toString(),
Instant.now()
);
resultQueue.put(FDv2SourceResult.terminalError(errorInfo));
} finally {
try {
if (eventSource != null) {
eventSource.close();
}
} catch (Exception e) {
logger.debug("Error closing event source: {}", LogValues.exceptionSummary(e));
}
}
});
streamThread.setName("LaunchDarkly-FDv2-streaming-synchronizer");
// TODO: Implement thread priority.
//streamThread.setPriority();
streamThread.setDaemon(true);
streamThread.start();
}

@Override
public CompletableFuture<FDv2SourceResult> next() {
return CompletableFuture.anyOf(shutdownFuture, resultQueue.take())
.thenApply(result -> (FDv2SourceResult) result);
}

@Override
public void shutdown() {
if (shutdownRequested.getAndSet(true)) {
return; // already shutdown
}

shutdownFuture.complete(FDv2SourceResult.shutdown());

if (eventSource != null) {
try {
eventSource.close();
} catch (Exception e) {
logger.debug("Error closing event source during shutdown: {}", LogValues.exceptionSummary(e));
}
}
}

private boolean handleEvent(StreamEvent event) {
if (event instanceof MessageEvent) {
handleMessage((MessageEvent) event);
return true;
} else if (event instanceof FaultEvent) {
return handleError(((FaultEvent) event).getCause());
}
return true;
}

private void handleMessage(MessageEvent event) {
String eventName;
try {
eventName = event.getEventName();
FDv2Event fdv2Event = parseFDv2Event(eventName, event.getDataReader());

FDv2ProtocolHandler.IFDv2ProtocolAction action;
try {
action = protocolHandler.handleEvent(fdv2Event);
} catch (Exception e) {
// Protocol handler threw exception processing the event - treat as invalid data
logger.error("FDv2 protocol handler error: {}", LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
0,
e.toString(),
Instant.now()
);
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
if (eventSource != null) {
eventSource.interrupt(); // restart the stream
}
return;
}

FDv2SourceResult result = null;
boolean shouldTerminate = false;

switch (action.getAction()) {
case CHANGESET:
FDv2ProtocolHandler.FDv2ActionChangeset changeset = (FDv2ProtocolHandler.FDv2ActionChangeset) action;
try {
// TODO: Environment ID.
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted =
FDv2ChangeSetTranslator.toChangeSet(changeset.getChangeset(), logger, null);
result = FDv2SourceResult.changeSet(converted);
} catch (Exception e) {
logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));
DataSourceStatusProvider.ErrorInfo conversionError = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
0,
e.toString(),
Instant.now()
);
result = FDv2SourceResult.interrupted(conversionError);
if (eventSource != null) {
eventSource.interrupt(); // restart the stream
}
}
break;

case ERROR:
// In the case of an error, the protocol handler discards the result and we remain connected.
// We log the error to help with debugging.
FDv2ProtocolHandler.FDv2ActionError error = (FDv2ProtocolHandler.FDv2ActionError) action;
logger.error("Received error from server: {} - {}", error.getId(), error.getReason());
break;

case GOODBYE:
FDv2ProtocolHandler.FDv2ActionGoodbye goodbye = (FDv2ProtocolHandler.FDv2ActionGoodbye) action;
result = FDv2SourceResult.goodbye(goodbye.getReason());
shouldTerminate = true;
break;

case INTERNAL_ERROR:
DataSourceStatusProvider.ErrorInfo internalError = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
0,
"Internal error during FDv2 event processing",
Instant.now()
);
result = FDv2SourceResult.interrupted(internalError);
if (eventSource != null) {
eventSource.interrupt(); // restart the stream
}
break;

case NONE:
// Continue processing events, don't queue anything
break;
}

if (result != null) {
resultQueue.put(result);
}

if (shouldTerminate) {
if (eventSource != null) {
eventSource.close();
}
}
} catch (SerializationException e) {
logger.error("Failed to parse FDv2 event: {}", LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
0,
e.toString(),
Instant.now()
);
// Queue as INTERRUPTED, not TERMINAL_ERROR, so we can continue processing other events
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
if (eventSource != null) {
eventSource.interrupt(); // restart the stream
}
} catch (Exception e) {
logger.error("Unexpected error handling stream message: {}", LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
e.toString(),
Instant.now()
);
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
if (eventSource != null) {
eventSource.interrupt(); // restart the stream
}
}
}

private boolean handleError(StreamException e) {
if (e instanceof StreamClosedByCallerException) {
// We closed it ourselves (shutdown was called)
return false;
}

if (e instanceof StreamHttpErrorException) {
int status = ((StreamHttpErrorException) e).getCode();
DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(status);

boolean recoverable = checkIfErrorIsRecoverableAndLog(logger,
"HTTP error " + status,
"in FDv2 streaming connection",
status,
"will retry");

if (!recoverable) {
resultQueue.put(FDv2SourceResult.terminalError(errorInfo));
return false;
} else {
// Queue as INTERRUPTED to indicate temporary failure
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
return true; // allow reconnect
}
}

// Network or other error - queue as INTERRUPTED and allow reconnect
logger.warn("Stream error: {}", LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.NETWORK_ERROR,
0,
e.toString(),
Instant.now()
);
resultQueue.put(FDv2SourceResult.interrupted(errorInfo));
return true; // allow reconnect
}

private FDv2Event parseFDv2Event(String eventName, Reader eventDataReader) throws SerializationException {
try {
JsonReader reader = new JsonReader(eventDataReader);
return new FDv2Event(eventName, com.launchdarkly.sdk.internal.GsonHelpers.gsonInstance().fromJson(reader, com.google.gson.JsonElement.class));
} catch (Exception e) {
throw new SerializationException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@ abstract class Version {
private Version() {}

// This constant is updated automatically by our Gradle script during a release, if the project version has changed
// x-release-please-start-version
static final String SDK_VERSION = "7.10.2";
// x-release-please-end
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.launchdarkly.sdk.server.datasources;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

// Mermaid source for state diagram.
Expand Down
Loading