diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java new file mode 100644 index 0000000..c4a02cb --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -0,0 +1,359 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.eventsource.ConnectStrategy; +import com.launchdarkly.eventsource.ErrorStrategy; +import com.launchdarkly.eventsource.EventSource; +import com.launchdarkly.eventsource.FaultEvent; +import com.launchdarkly.eventsource.HttpConnectStrategy; +import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.StreamClosedByCallerException; +import com.launchdarkly.eventsource.StreamEvent; +import com.launchdarkly.eventsource.StreamException; +import com.launchdarkly.eventsource.StreamHttpErrorException; +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.logging.LogValues; +import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpHelpers; +import com.launchdarkly.sdk.internal.http.HttpProperties; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.SelectorSource; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; +import com.launchdarkly.sdk.server.subsystems.SerializationException; +import com.google.gson.stream.JsonReader; +import okhttp3.Headers; + +import java.io.Reader; +import java.net.URI; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.launchdarkly.sdk.internal.http.HttpErrors.checkIfErrorIsRecoverableAndLog; + +/** + * Implementation of FDv2 streaming synchronizer. + * Maintains a long-running streaming connection and queues results as they arrive. + */ +class StreamingSynchronizerImpl implements Synchronizer { + private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300); + + private final HttpProperties httpProperties; + private final SelectorSource selectorSource; + final URI streamUri; + private final LDLogger logger; + private final IterableAsyncQueue resultQueue = new IterableAsyncQueue<>(); + private final CompletableFuture shutdownFuture = new CompletableFuture<>(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final FDv2ProtocolHandler protocolHandler = new FDv2ProtocolHandler(); + private volatile EventSource eventSource; + private volatile Thread streamThread; + + public StreamingSynchronizerImpl( + HttpProperties httpProperties, + URI baseUri, + String requestPath, + LDLogger logger, + SelectorSource selectorSource + ) { + this.httpProperties = httpProperties; + this.selectorSource = selectorSource; + this.logger = logger; + this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); + + startStream(); + } + + private void startStream() { + Headers headers = httpProperties.toHeadersBuilder() + .add("Accept", "text/event-stream") + .build(); + + HttpConnectStrategy connectStrategy = ConnectStrategy.http(streamUri) + .headers(headers) + .clientBuilderActions(clientBuilder -> { + httpProperties.applyToHttpClientBuilder(clientBuilder); + // Add interceptor to inject selector query parameters on each request + clientBuilder.addInterceptor(chain -> { + okhttp3.Request originalRequest = chain.request(); + Selector selector = selectorSource.getSelector(); + + if (selector.isEmpty()) { + return chain.proceed(originalRequest); + } + + // Build new URL with selector query parameters + URI currentUri = originalRequest.url().uri(); + URI updatedUri = HttpHelpers.addQueryParam(currentUri, "version", String.valueOf(selector.getVersion())); + if (selector.getState() != null && !selector.getState().isEmpty()) { + updatedUri = HttpHelpers.addQueryParam(updatedUri, "state", selector.getState()); + } + + okhttp3.Request newRequest = originalRequest.newBuilder() + .url(updatedUri.toString()) + .build(); + return chain.proceed(newRequest); + }); + }) + .readTimeout(DEAD_CONNECTION_INTERVAL.toMillis(), TimeUnit.MILLISECONDS); + + EventSource.Builder builder = new EventSource.Builder(connectStrategy) + .errorStrategy(ErrorStrategy.alwaysContinue()) + .logger(logger) + .readBufferSize(5000) + .streamEventData(true) + .expectFields("event"); + + eventSource = builder.build(); + + streamThread = new Thread(() -> { + try { + for (StreamEvent event : eventSource.anyEvents()) { + if (shutdownRequested.get()) { + break; + } + + if (!handleEvent(event)) { + break; + } + } + } catch (Exception e) { + if (shutdownRequested.get()) { + return; + } + logger.error("Stream thread ended with exception: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + e.toString(), + Instant.now() + ); + resultQueue.put(FDv2SourceResult.terminalError(errorInfo)); + } finally { + try { + if (eventSource != null) { + eventSource.close(); + } + } catch (Exception e) { + logger.debug("Error closing event source: {}", LogValues.exceptionSummary(e)); + } + } + }); + streamThread.setName("LaunchDarkly-FDv2-streaming-synchronizer"); + // TODO: Implement thread priority. + //streamThread.setPriority(); + streamThread.setDaemon(true); + streamThread.start(); + } + + @Override + public CompletableFuture next() { + return CompletableFuture.anyOf(shutdownFuture, resultQueue.take()) + .thenApply(result -> (FDv2SourceResult) result); + } + + @Override + public void shutdown() { + if (shutdownRequested.getAndSet(true)) { + return; // already shutdown + } + + shutdownFuture.complete(FDv2SourceResult.shutdown()); + + if (eventSource != null) { + try { + eventSource.close(); + } catch (Exception e) { + logger.debug("Error closing event source during shutdown: {}", LogValues.exceptionSummary(e)); + } + } + } + + private boolean handleEvent(StreamEvent event) { + if (event instanceof MessageEvent) { + handleMessage((MessageEvent) event); + return true; + } else if (event instanceof FaultEvent) { + return handleError(((FaultEvent) event).getCause()); + } + return true; + } + + private void handleMessage(MessageEvent event) { + String eventName; + try { + eventName = event.getEventName(); + FDv2Event fdv2Event = parseFDv2Event(eventName, event.getDataReader()); + + FDv2ProtocolHandler.IFDv2ProtocolAction action; + try { + action = protocolHandler.handleEvent(fdv2Event); + } catch (Exception e) { + // Protocol handler threw exception processing the event - treat as invalid data + logger.error("FDv2 protocol handler error: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.INVALID_DATA, + 0, + e.toString(), + Instant.now() + ); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + if (eventSource != null) { + eventSource.interrupt(); // restart the stream + } + return; + } + + FDv2SourceResult result = null; + boolean shouldTerminate = false; + + switch (action.getAction()) { + case CHANGESET: + FDv2ProtocolHandler.FDv2ActionChangeset changeset = (FDv2ProtocolHandler.FDv2ActionChangeset) action; + try { + // TODO: Environment ID. + DataStoreTypes.ChangeSet converted = + FDv2ChangeSetTranslator.toChangeSet(changeset.getChangeset(), logger, null); + result = FDv2SourceResult.changeSet(converted); + } catch (Exception e) { + logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + DataSourceStatusProvider.ErrorInfo conversionError = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.INVALID_DATA, + 0, + e.toString(), + Instant.now() + ); + result = FDv2SourceResult.interrupted(conversionError); + if (eventSource != null) { + eventSource.interrupt(); // restart the stream + } + } + break; + + case ERROR: + // In the case of an error, the protocol handler discards the result and we remain connected. + // We log the error to help with debugging. + FDv2ProtocolHandler.FDv2ActionError error = (FDv2ProtocolHandler.FDv2ActionError) action; + logger.error("Received error from server: {} - {}", error.getId(), error.getReason()); + break; + + case GOODBYE: + FDv2ProtocolHandler.FDv2ActionGoodbye goodbye = (FDv2ProtocolHandler.FDv2ActionGoodbye) action; + result = FDv2SourceResult.goodbye(goodbye.getReason()); + shouldTerminate = true; + break; + + case INTERNAL_ERROR: + DataSourceStatusProvider.ErrorInfo internalError = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.INVALID_DATA, + 0, + "Internal error during FDv2 event processing", + Instant.now() + ); + result = FDv2SourceResult.interrupted(internalError); + if (eventSource != null) { + eventSource.interrupt(); // restart the stream + } + break; + + case NONE: + // Continue processing events, don't queue anything + break; + } + + if (result != null) { + resultQueue.put(result); + } + + if (shouldTerminate) { + if (eventSource != null) { + eventSource.close(); + } + } + } catch (SerializationException e) { + logger.error("Failed to parse FDv2 event: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.INVALID_DATA, + 0, + e.toString(), + Instant.now() + ); + // Queue as INTERRUPTED, not TERMINAL_ERROR, so we can continue processing other events + resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + if (eventSource != null) { + eventSource.interrupt(); // restart the stream + } + } catch (Exception e) { + logger.error("Unexpected error handling stream message: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + e.toString(), + Instant.now() + ); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + if (eventSource != null) { + eventSource.interrupt(); // restart the stream + } + } + } + + private boolean handleError(StreamException e) { + if (e instanceof StreamClosedByCallerException) { + // We closed it ourselves (shutdown was called) + return false; + } + + if (e instanceof StreamHttpErrorException) { + int status = ((StreamHttpErrorException) e).getCode(); + DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(status); + + boolean recoverable = checkIfErrorIsRecoverableAndLog(logger, + "HTTP error " + status, + "in FDv2 streaming connection", + status, + "will retry"); + + if (!recoverable) { + resultQueue.put(FDv2SourceResult.terminalError(errorInfo)); + return false; + } else { + // Queue as INTERRUPTED to indicate temporary failure + resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + return true; // allow reconnect + } + } + + // Network or other error - queue as INTERRUPTED and allow reconnect + logger.warn("Stream error: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, + 0, + e.toString(), + Instant.now() + ); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + return true; // allow reconnect + } + + private FDv2Event parseFDv2Event(String eventName, Reader eventDataReader) throws SerializationException { + try { + JsonReader reader = new JsonReader(eventDataReader); + return new FDv2Event(eventName, com.launchdarkly.sdk.internal.GsonHelpers.gsonInstance().fromJson(reader, com.google.gson.JsonElement.class)); + } catch (Exception e) { + throw new SerializationException(e); + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java index 85a5238..c92affa 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java @@ -4,7 +4,5 @@ abstract class Version { private Version() {} // This constant is updated automatically by our Gradle script during a release, if the project version has changed - // x-release-please-start-version static final String SDK_VERSION = "7.10.2"; - // x-release-please-end } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java index bf44a30..be3f0ed 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java @@ -1,6 +1,5 @@ package com.launchdarkly.sdk.server.datasources; -import java.io.Closeable; import java.util.concurrent.CompletableFuture; // Mermaid source for state diagram. diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java new file mode 100644 index 0000000..c5c2247 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java @@ -0,0 +1,442 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpProperties; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.SelectorSource; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; +import org.junit.Test; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static com.launchdarkly.sdk.server.ComponentsImpl.toHttpProperties; +import static com.launchdarkly.sdk.server.TestComponents.clientContext; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("javadoc") +public class StreamingSynchronizerImplTest extends BaseTest { + + private SelectorSource mockSelectorSource() { + SelectorSource source = mock(SelectorSource.class); + when(source.getSelector()).thenReturn(Selector.EMPTY); + return source; + } + + private static String makeEvent(String type, String data) { + return "event: " + type + "\ndata: " + data; + } + + @Test + public void receivesMultipleChangesets() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred1 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + String putObject1 = makeEvent("put-object", "{\"kind\":\"flag\",\"key\":\"flag1\",\"version\":1,\"object\":{}}"); + String payloadTransferred2 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:101)\",\"version\":101}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred1), + Handlers.SSE.event(putObject1), + Handlers.SSE.event(payloadTransferred2), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + // First changeset + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + + assertNotNull(result1); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result1.getResultType()); + assertNotNull(result1.getChangeSet()); + + // Second changeset + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + + assertNotNull(result2); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType()); + assertNotNull(result2.getChangeSet()); + + synchronizer.shutdown(); + } + } + + @Test + public void httpNonRecoverableError() throws Exception { + try (HttpServer server = HttpServer.start(Handlers.status(401))) { + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind()); + + synchronizer.shutdown(); + } + } + + @Test + public void httpRecoverableError() throws Exception { + try (HttpServer server = HttpServer.start(Handlers.status(503))) { + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind()); + + synchronizer.shutdown(); + } + } + + @Test + public void networkError() throws Exception { + // Use an invalid port to simulate network error + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + URI.create("http://localhost:1"), // invalid port + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, result.getStatus().getErrorInfo().getKind()); + + synchronizer.shutdown(); + } + + @Test + public void invalidEventData() throws Exception { + String badEvent = makeEvent("server-intent", "invalid json"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(badEvent), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.INVALID_DATA, result.getStatus().getErrorInfo().getKind()); + + synchronizer.shutdown(); + } + } + + @Test + public void shutdownBeforeEventReceived() throws Exception { + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.hang()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture nextFuture = synchronizer.next(); + + // Wait a bit then shutdown + Thread.sleep(100); + synchronizer.shutdown(); + + FDv2SourceResult result = nextFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + assertNull(result.getStatus().getErrorInfo()); + } + } + + @Test + public void shutdownAfterEventReceived() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + // Shutdown after receiving event should still work + synchronizer.shutdown(); + } + } + + @Test + public void goodbyeEventInResponse() throws Exception { + String goodbyeEvent = makeEvent("goodbye", "{\"reason\":\"service-unavailable\"}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(goodbyeEvent), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.GOODBYE, result.getStatus().getState()); + + synchronizer.shutdown(); + } + } + + @Test + public void heartbeatEvent() throws Exception { + String heartbeatEvent = makeEvent("heartbeat", "{}"); + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(heartbeatEvent), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Heartbeat should be ignored, and we should get the changeset + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + + synchronizer.shutdown(); + } + } + + @Test + public void selectorWithVersionAndState() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + + SelectorSource selectorSource = mock(SelectorSource.class); + when(selectorSource.getSelector()).thenReturn(Selector.make(50, "(p:old:50)")); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + // Verify selector was fetched when connecting + verify(selectorSource, atLeastOnce()).getSelector(); + + // Verify the request had the correct query parameters + assertEquals(1, server.getRecorder().count()); + RequestInfo request = server.getRecorder().requireRequest(); + assertThat(request.getQuery(), containsString("version=50")); + assertThat(request.getQuery(), containsString("state=")); + + synchronizer.shutdown(); + } + } + + @Test + public void selectorRefetchedOnReconnection() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + // Test reconnection with a 503 error followed by successful connection + // Add multiple successful handlers in case EventSource reconnects multiple times + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.status(503), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + + SelectorSource selectorSource = mock(SelectorSource.class); + when(selectorSource.getSelector()) + .thenReturn(Selector.make(50, "(p:old:50)")) + .thenReturn(Selector.make(100, "(p:new:100)")); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource + ); + + // First result should be an error from the 503 + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertNotNull(result1); + assertEquals(FDv2SourceResult.ResultType.STATUS, result1.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result1.getStatus().getState()); + + // Keep getting results until we get a CHANGE_SET (reconnection successful) + // There may be multiple STATUS results if reconnection takes multiple attempts + FDv2SourceResult changesetResult = null; + for (int i = 0; i < 5; i++) { // Try up to 5 times + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(15, TimeUnit.SECONDS); + assertNotNull(result); + if (result.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { + changesetResult = result; + break; + } + // If it's another STATUS, that's fine, just keep waiting for the changeset + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + } + + assertNotNull("Should eventually get a CHANGE_SET after reconnection", changesetResult); + + // Verify selector was fetched at least twice (initial failed connect + successful reconnect) + verify(selectorSource, atLeast(2)).getSelector(); + + // Verify we made at least 2 requests + assertTrue("Should have made at least 2 requests", server.getRecorder().count() >= 2); + + synchronizer.shutdown(); + } + } +}