diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java new file mode 100644 index 0000000..6cb391d --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java @@ -0,0 +1,167 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpHelpers; +import com.launchdarkly.sdk.internal.http.HttpProperties; +import com.launchdarkly.sdk.json.SerializationException; + +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol. + */ +public class DefaultFDv2Requestor implements FDv2Requestor, Closeable { + private static final String VERSION_QUERY_PARAM = "version"; + private static final String STATE_QUERY_PARAM = "state"; + + private final OkHttpClient httpClient; + private final URI pollingUri; + private final Headers headers; + private final LDLogger logger; + private final Map etags; + + /** + * Creates a DefaultFDv2Requestor. + * + * @param httpProperties HTTP configuration properties + * @param baseUri base URI for the FDv2 polling endpoint + * @param requestPath the request path to append to the base URI (e.g., "/sdk/poll") + * @param logger logger for diagnostic output + */ + public DefaultFDv2Requestor(HttpProperties httpProperties, URI baseUri, String requestPath, LDLogger logger) { + this.logger = logger; + this.pollingUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); + this.etags = new HashMap<>(); + + OkHttpClient.Builder httpBuilder = httpProperties.toHttpClientBuilder(); + this.headers = httpProperties.toHeadersBuilder().build(); + this.httpClient = httpBuilder.build(); + } + + @Override + public CompletableFuture Poll(Selector selector) { + CompletableFuture future = new CompletableFuture<>(); + + try { + // Build the request URI with query parameters + URI requestUri = pollingUri; + + if (!selector.isEmpty()) { + requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion())); + } + + if (selector.getState() != null && !selector.getState().isEmpty()) { + requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState()); + } + + logger.debug("Making FDv2 polling request to: {}", requestUri); + + // Build the HTTP request + Request.Builder requestBuilder = new Request.Builder() + .url(requestUri.toURL()) + .headers(headers) + .get(); + + // Add ETag if we have one cached for this URI + synchronized (etags) { + String etag = etags.get(requestUri); + if (etag != null) { + requestBuilder.header("If-None-Match", etag); + } + } + + Request request = requestBuilder.build(); + final URI finalRequestUri = requestUri; + + // Make asynchronous HTTP call + httpClient.newCall(request).enqueue(new Callback() { + @Override + public void onFailure(@Nonnull Call call, @Nonnull IOException e) { + if (e instanceof SocketTimeoutException) { + future.completeExceptionally( + new IOException("FDv2 polling request timed out: " + finalRequestUri, e) + ); + } else { + future.completeExceptionally(e); + } + } + + @Override + public void onResponse(@Nonnull Call call, @Nonnull Response response) { + try { + // Handle 304 Not Modified - no new data + if (response.code() == 304) { + logger.debug("FDv2 polling request returned 304: not modified"); + future.complete(null); + return; + } + + if (!response.isSuccessful()) { + future.completeExceptionally( + new IOException("FDv2 polling request failed with status code: " + response.code()) + ); + return; + } + + // Update ETag cache + String newEtag = response.header("ETag"); + synchronized (etags) { + if (newEtag != null) { + etags.put(finalRequestUri, newEtag); + } else { + etags.remove(finalRequestUri); + } + } + + // The documentation indicates that the body will not be null for a response passed to the + // onResponse callback. + String responseBody = Objects.requireNonNull(response.body()).string(); + logger.debug("Received FDv2 polling response"); + + List events = FDv2Event.parseEventsArray(responseBody); + + // Create and return the response + FDv2PayloadResponse pollingResponse = new FDv2PayloadResponse(events, response.headers()); + future.complete(pollingResponse); + + } catch (IOException | SerializationException e) { + future.completeExceptionally(e); + } finally { + response.close(); + } + } + }); + + } catch (Exception e) { + future.completeExceptionally(e); + } + + return future; + } + + /** + * Closes the HTTP client and releases resources. + */ + public void close() { + HttpProperties.shutdownHttpClient(httpClient); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2ChangeSetTranslator.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2ChangeSetTranslator.java new file mode 100644 index 0000000..e3ff539 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2ChangeSetTranslator.java @@ -0,0 +1,125 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2Change; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2ChangeType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Translates FDv2 changesets into data store formats. + */ +final class FDv2ChangeSetTranslator { + private FDv2ChangeSetTranslator() { + } + + /** + * Converts an FDv2ChangeSet to a DataStoreTypes.ChangeSet. + * + * @param changeset the FDv2 changeset to convert + * @param logger logger for diagnostic messages + * @param environmentId the environment ID to include in the changeset (may be null) + * @return a DataStoreTypes.ChangeSet containing the converted data + * @throws IllegalArgumentException if the changeset type is unknown + */ + public static DataStoreTypes.ChangeSet toChangeSet( + FDv2ChangeSet changeset, + LDLogger logger, + String environmentId) { + ChangeSetType changeSetType; + switch (changeset.getType()) { + case FULL: + changeSetType = ChangeSetType.Full; + break; + case PARTIAL: + changeSetType = ChangeSetType.Partial; + break; + case NONE: + changeSetType = ChangeSetType.None; + break; + default: + throw new IllegalArgumentException( + "Unknown FDv2ChangeSetType: " + changeset.getType() + ". This is an implementation error."); + } + + // Use a LinkedHashMap to group items by DataKind in a single pass while preserving order + Map>> kindToItems = new LinkedHashMap<>(); + + for (FDv2Change change : changeset.getChanges()) { + DataKind dataKind = getDataKind(change.getKind()); + + if (dataKind == null) { + logger.warn("Unknown data kind '{}' in changeset, skipping", change.getKind()); + continue; + } + + ItemDescriptor item; + + if (change.getType() == FDv2ChangeType.PUT) { + if (change.getObject() == null) { + logger.warn( + "Put operation for {}/{} missing object data, skipping", + change.getKind(), + change.getKey()); + continue; + } + item = dataKind.deserialize(change.getObject().toString()); + } else if (change.getType() == FDv2ChangeType.DELETE) { + item = ItemDescriptor.deletedItem(change.getVersion()); + } else { + throw new IllegalArgumentException( + "Unknown FDv2ChangeType: " + change.getType() + ". This is an implementation error."); + } + + List> itemsList = + kindToItems.computeIfAbsent(dataKind, k -> new ArrayList<>()); + + itemsList.add(new AbstractMap.SimpleImmutableEntry<>(change.getKey(), item)); + } + + ImmutableList.Builder>> dataBuilder = + ImmutableList.builder(); + + for (Map.Entry>> entry : kindToItems.entrySet()) { + dataBuilder.add( + new AbstractMap.SimpleImmutableEntry<>( + entry.getKey(), + new KeyedItems<>(entry.getValue()) + )); + } + + return new DataStoreTypes.ChangeSet<>( + changeSetType, + changeset.getSelector(), + dataBuilder.build(), + environmentId); + } + + /** + * Maps an FDv2 object kind to the corresponding DataKind. + * + * @param kind the kind string from the FDv2 change + * @return the corresponding DataKind, or null if the kind is not recognized + */ + private static DataKind getDataKind(String kind) { + switch (kind) { + case "flag": + return DataModel.FEATURES; + case "segment": + return DataModel.SEGMENTS; + default: + return null; + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java new file mode 100644 index 0000000..2e124a4 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -0,0 +1,255 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.DataSourceShutdown; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataSource; +import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +class FDv2DataSource implements DataSource { + private final List initializers; + private final List synchronizers; + + private final DataSourceUpdateSink dataSourceUpdates; + + private final CompletableFuture startFuture = new CompletableFuture<>(); + private final AtomicBoolean started = new AtomicBoolean(false); + + /** + * Lock for active sources and shutdown state. + */ + private final Object activeSourceLock = new Object(); + private DataSourceShutdown activeSource; + private boolean isShutdown = false; + + private static class SynchronizerFactoryWithState { + public enum State { + /** + * This synchronizer is available to use. + */ + Available, + + /** + * This synchronizer is no longer available to use. + */ + Blocked, + + /** + * This synchronizer is recovering from a previous failure and will be available to use + * after a delay. + */ + Recovering + } + + private final SynchronizerFactory factory; + + private State state = State.Available; + + + public SynchronizerFactoryWithState(SynchronizerFactory factory) { + this.factory = factory; + } + + public State getState() { + return state; + } + + public void block() { + state = State.Blocked; + } + + public void setRecovering(Duration delay) { + state = State.Recovering; + // TODO: Determine how/when to recover. + } + + public Synchronizer build() { + return factory.build(); + } + } + + public interface InitializerFactory { + Initializer build(); + } + + public interface SynchronizerFactory { + Synchronizer build(); + } + + + public FDv2DataSource( + List initializers, + List synchronizers, + DataSourceUpdateSink dataSourceUpdates + ) { + this.initializers = initializers; + this.synchronizers = synchronizers + .stream() + .map(SynchronizerFactoryWithState::new) + .collect(Collectors.toList()); + this.dataSourceUpdates = dataSourceUpdates; + } + + private void run() { + Thread runThread = new Thread(() -> { + if (!initializers.isEmpty()) { + runInitializers(); + } + runSynchronizers(); + // TODO: Handle. We have ran out of sources or we are shutting down. + }); + runThread.setDaemon(true); + // TODO: Thread priority. + //thread.setPriority(threadPriority); + runThread.start(); + } + + private SynchronizerFactoryWithState getFirstAvailableSynchronizer() { + synchronized (synchronizers) { + for (SynchronizerFactoryWithState synchronizer : synchronizers) { + if (synchronizer.getState() == SynchronizerFactoryWithState.State.Available) { + return synchronizer; + } + } + + return null; + } + } + + private void runSynchronizers() { + SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer(); + // TODO: Add recovery handling. If there are no available synchronizers, but there are + // recovering ones, then we likely will want to wait for them to be available (or bypass recovery). + while (availableSynchronizer != null) { + Synchronizer synchronizer = availableSynchronizer.build(); + synchronized (activeSourceLock) { + if (isShutdown) { + return; + } + activeSource = synchronizer; + } + try { + boolean running = true; + while (running) { + FDv2SourceResult result = synchronizer.next().get(); + switch (result.getResultType()) { + case CHANGE_SET: + // TODO: Apply to the store. + // This could have been completed by any data source. But if it has not been completed before + // now, then we complete it. + startFuture.complete(true); + break; + case STATUS: + FDv2SourceResult.Status status = result.getStatus(); + switch (status.getState()) { + case INTERRUPTED: + // TODO: Track how long we are interrupted. + break; + case SHUTDOWN: + // We should be overall shutting down. + // TODO: We may need logging or to do a little more. + return; + case TERMINAL_ERROR: + case GOODBYE: + running = false; + break; + } + break; + } + } + } catch (ExecutionException | InterruptedException | CancellationException e) { + // TODO: Log. + // Move to next synchronizer. + } + availableSynchronizer = getFirstAvailableSynchronizer(); + } + } + + private void runInitializers() { + boolean anyDataReceived = false; + for (InitializerFactory factory : initializers) { + try { + Initializer initializer = factory.build(); + synchronized (activeSourceLock) { + if (isShutdown) { + return; + } + activeSource = initializer; + } + FDv2SourceResult res = initializer.run().get(); + switch (res.getResultType()) { + case CHANGE_SET: + // TODO: Apply to the store. + anyDataReceived = true; + if (!res.getChangeSet().getSelector().isEmpty()) { + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); + startFuture.complete(true); + return; + } + return; + case STATUS: + // TODO: Implement. + break; + } + } catch (ExecutionException | InterruptedException | CancellationException e) { + // TODO: Log. + } + } + // We received data without a selector, and we have exhausted initializers, so we are going to + // consider ourselves initialized. + if (anyDataReceived) { + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); + startFuture.complete(true); + } + } + + @Override + public Future start() { + if (!started.getAndSet(true)) { + run(); + } + return startFuture.thenApply(x -> null); + } + + @Override + public boolean isInitialized() { + try { + return startFuture.isDone() && startFuture.get(); + } catch (Exception e) { + return false; + } + } + + @Override + public void close() throws IOException { + // If this is already set, then this has no impact. + startFuture.complete(false); + synchronized (synchronizers) { + for (SynchronizerFactoryWithState synchronizer : synchronizers) { + synchronizer.block(); + } + } + // If there is an active source, we will shut it down, and that will result in the loop handling that source + // exiting. + // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When + // it detects shutdown it will exit the loop. + synchronized (activeSourceLock) { + isShutdown = true; + if (activeSource != null) { + activeSource.shutdown(); + } + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java new file mode 100644 index 0000000..8a2297e --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java @@ -0,0 +1,41 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import okhttp3.Headers; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This type is currently experimental and not subject to semantic versioning. + *

+ * Interface for making FDv2 polling requests. + */ +interface FDv2Requestor { + /** + * Response for a set of FDv2 events that result in a payload. Either a full payload or the events required + * to get from one payload version to another. + * This isn't intended for use for implementations which may require multiple executions to get an entire payload. + */ + public static class FDv2PayloadResponse { + private final List events; + private final Headers headers; + + public FDv2PayloadResponse(List events, Headers headers) { + this.events = events; + this.headers = headers; + } + + public List getEvents() { + return events; + } + + public Headers getHeaders() { + return headers; + } + } + CompletableFuture Poll(Selector selector); + + void close(); +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java new file mode 100644 index 0000000..2212354 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java @@ -0,0 +1,32 @@ +package com.launchdarkly.sdk.server; + +import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; + +class IterableAsyncQueue { + private final Object lock = new Object(); + private final LinkedList queue = new LinkedList<>(); + + private CompletableFuture nextFuture = null; + + public void put(T item) { + synchronized (lock) { + if(nextFuture != null) { + nextFuture.complete(item); + nextFuture = null; + } + queue.addLast(item); + } + } + public CompletableFuture take() { + synchronized (lock) { + if(!queue.isEmpty()) { + return CompletableFuture.completedFuture(queue.removeFirst()); + } + if (nextFuture == null) { + nextFuture = new CompletableFuture<>(); + } + return nextFuture; + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java new file mode 100644 index 0000000..f871791 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java @@ -0,0 +1,132 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpErrors; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; +import com.launchdarkly.sdk.server.subsystems.SerializationException; + +import java.io.IOException; +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +import static com.launchdarkly.sdk.internal.http.HttpErrors.*; + +class PollingBase { + private final FDv2Requestor requestor; + private final LDLogger logger; + + public PollingBase(FDv2Requestor requestor, LDLogger logger) { + this.requestor = requestor; + this.logger = logger; + } + + protected void internalShutdown() { + requestor.close(); + } + + protected CompletableFuture poll(Selector selector, boolean oneShot) { + return requestor.Poll(selector).handle(((pollingResponse, ex) -> { + if (ex != null) { + if (ex instanceof HttpErrors.HttpErrorException) { + HttpErrors.HttpErrorException e = (HttpErrors.HttpErrorException) ex; + DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(e.getStatus()); + boolean recoverable = e.getStatus() > 0 && !isHttpErrorRecoverable(e.getStatus()); + logger.error("Polling request failed with HTTP error: {}", e.getStatus()); + // For a one-shot request all errors are terminal. + if (oneShot) { + return FDv2SourceResult.terminalError(errorInfo); + } else { + return recoverable ? FDv2SourceResult.interrupted(errorInfo) : FDv2SourceResult.terminalError(errorInfo); + } + } else if (ex instanceof IOException) { + IOException e = (IOException) ex; + logger.error("Polling request failed with network error: {}", e.toString()); + DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, + 0, + e.toString(), + new Date().toInstant() + ); + return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + } else if (ex instanceof SerializationException) { + SerializationException e = (SerializationException) ex; + logger.error("Polling request received malformed data: {}", e.toString()); + DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.INVALID_DATA, + 0, + e.toString(), + new Date().toInstant() + ); + return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + } + Exception e = (Exception) ex; + logger.error("Polling request failed with an unknown error: {}", e.toString()); + DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + e.toString(), + new Date().toInstant() + ); + return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + } + FDv2ProtocolHandler handler = new FDv2ProtocolHandler(); + for (FDv2Event event : pollingResponse.getEvents()) { + FDv2ProtocolHandler.IFDv2ProtocolAction res = handler.handleEvent(event); + switch (res.getAction()) { + case CHANGESET: + try { + + DataStoreTypes.ChangeSet converted = FDv2ChangeSetTranslator.toChangeSet( + ((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(), + logger, + // TODO: Implement environment ID support. + null + ); + return FDv2SourceResult.changeSet(converted); + } catch (Exception e) { + // TODO: Do we need to be more specific about the exception type here? + DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.INVALID_DATA, + 0, + e.toString(), + new Date().toInstant() + ); + return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + } + case ERROR: { + FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res); + DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + error.getReason(), + new Date().toInstant()); + return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + } + case GOODBYE: + return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason()); + case NONE: + break; + case INTERNAL_ERROR: { + DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + "Internal error occurred during polling", + new Date().toInstant()); + return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + } + } + } + return FDv2SourceResult.terminalError(new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.UNKNOWN, + 0, + "Unexpected end of polling response", + new Date().toInstant() + )); + })); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java new file mode 100644 index 0000000..3040aee --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java @@ -0,0 +1,32 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.datasources.SelectorSource; + +import java.util.concurrent.CompletableFuture; + +class PollingInitializerImpl extends PollingBase implements Initializer { + private final CompletableFuture shutdownFuture = new CompletableFuture<>(); + private final SelectorSource selectorSource; + + public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) { + super(requestor, logger); + this.selectorSource = selectorSource; + } + + @Override + public CompletableFuture run() { + CompletableFuture pollResult = poll(selectorSource.getSelector(), true); + return CompletableFuture.anyOf(shutdownFuture, pollResult) + .thenApply(result -> (FDv2SourceResult) result); + } + + @Override + public void shutdown() { + shutdownFuture.complete(FDv2SourceResult.shutdown()); + internalShutdown(); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java new file mode 100644 index 0000000..e1b0eae --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java @@ -0,0 +1,82 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.SelectorSource; +import com.launchdarkly.sdk.server.datasources.Synchronizer; + +import java.time.Duration; +import java.util.concurrent.*; + +class PollingSynchronizerImpl extends PollingBase implements Synchronizer { + private final CompletableFuture shutdownFuture = new CompletableFuture<>(); + private final SelectorSource selectorSource; + + private final ScheduledFuture task; + + private final IterableAsyncQueue resultQueue = new IterableAsyncQueue<>(); + + public PollingSynchronizerImpl( + FDv2Requestor requestor, + LDLogger logger, + SelectorSource selectorSource, + ScheduledExecutorService sharedExecutor, + Duration pollInterval + ) { + super(requestor, logger); + this.selectorSource = selectorSource; + + synchronized (this) { + task = sharedExecutor.scheduleAtFixedRate( + this::doPoll, + 0L, + pollInterval.toMillis(), + TimeUnit.MILLISECONDS); + } + } + + private void doPoll() { + try { + FDv2SourceResult res = poll(selectorSource.getSelector(), false).get(); + switch(res.getResultType()) { + case CHANGE_SET: + break; + case STATUS: + switch(res.getStatus().getState()) { + case INTERRUPTED: + break; + case SHUTDOWN: + // The base poller doesn't emit shutdown, we instead handle it at this level. + // So when shutdown is called, we return shutdown on subsequent calls to next. + break; + case TERMINAL_ERROR: + case GOODBYE: + synchronized (this) { + task.cancel(true); + } + internalShutdown(); + break; + } + break; + } + resultQueue.put(res); + } catch (InterruptedException | ExecutionException e) { + // TODO: Determine if handling is needed. + } + } + + @Override + public CompletableFuture next() { + return CompletableFuture.anyOf(shutdownFuture, resultQueue.take()) + .thenApply(result -> (FDv2SourceResult) result); + } + + @Override + public void shutdown() { + shutdownFuture.complete(FDv2SourceResult.shutdown()); + synchronized (this) { + task.cancel(true); + } + internalShutdown(); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StandardEndpoints.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StandardEndpoints.java index e29a7e7..16afc99 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StandardEndpoints.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StandardEndpoints.java @@ -22,6 +22,9 @@ private StandardEndpoints() {} public static final String STREAMING_REQUEST_PATH = "/all"; public static final String POLLING_REQUEST_PATH = "/sdk/latest-all"; + public static final String FDV2_POLLING_REQUEST_PATH = "/sdk/poll"; + public static final String FDV2_STREAMING_REQUEST_PATH = "/sdk/stream"; + /** * Internal method to decide which URI a given component should connect to. *

diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/DataSourceShutdown.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/DataSourceShutdown.java new file mode 100644 index 0000000..63829b1 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/DataSourceShutdown.java @@ -0,0 +1,14 @@ +package com.launchdarkly.sdk.server.datasources; + +/** + * This type is currently experimental and not subject to semantic versioning. + *

+ * Interface used to shut down a data source. + */ +public interface DataSourceShutdown { + /** + * Shutdown the data source. The data source should emit a status event with a SHUTDOWN state as soon as possible. + * If the data source has already completed, or is in the process of completing, this method should have no effect. + */ + void shutdown(); +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java new file mode 100644 index 0000000..3f7ad16 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java @@ -0,0 +1,109 @@ +package com.launchdarkly.sdk.server.datasources; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; + +/** + * This type is currently experimental and not subject to semantic versioning. + *

+ * The result type for FDv2 initializers and synchronizers. An FDv2 initializer produces a single result, while + * an FDv2 synchronizer produces a stream of results. + */ +public class FDv2SourceResult { + public enum State { + /** + * The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used + * with an initializer, and instead TERMINAL_ERROR should be used. When this status is used with an initializer + * it will still be a terminal state. + */ + INTERRUPTED, + /** + * The data source has been shut down and will not produce any further results. + */ + SHUTDOWN, + /** + * The data source has encountered a terminal error and will not produce any further results. + */ + TERMINAL_ERROR, + /** + * The data source has been instructed to disconnect and will not produce any further results. + */ + GOODBYE, + } + + public enum ResultType { + /** + * The source has emitted a change set. This implies that the source is valid. + */ + CHANGE_SET, + /** + * The source is emitting a status which indicates a transition from being valid to being in some kind + * of error state. The source will emit a CHANGE_SET if it becomes valid again. + */ + STATUS, + } + + /** + * Represents a change in the status of the source. + */ + public static class Status { + private final State state; + private final DataSourceStatusProvider.ErrorInfo errorInfo; + + public State getState() { + return state; + } + + public DataSourceStatusProvider.ErrorInfo getErrorInfo() { + return errorInfo; + } + + public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) { + this.state = state; + this.errorInfo = errorInfo; + } + } + + private final DataStoreTypes.ChangeSet changeSet; + private final Status status; + + private final ResultType resultType; + + private FDv2SourceResult(DataStoreTypes.ChangeSet changeSet, Status status, ResultType resultType) { + this.changeSet = changeSet; + this.status = status; + this.resultType = resultType; + } + + public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo) { + return new FDv2SourceResult(null, new Status(State.INTERRUPTED, errorInfo), ResultType.STATUS); + } + + public static FDv2SourceResult shutdown() { + return new FDv2SourceResult(null, new Status(State.SHUTDOWN, null), ResultType.STATUS); + } + + public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo) { + return new FDv2SourceResult(null, new Status(State.TERMINAL_ERROR, errorInfo), ResultType.STATUS); + } + + public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet) { + return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET); + } + + public static FDv2SourceResult goodbye(String reason) { + // TODO: Goodbye reason. + return new FDv2SourceResult(null, new Status(State.GOODBYE, null), ResultType.STATUS); + } + + public ResultType getResultType() { + return resultType; + } + + public Status getStatus() { + return status; + } + + public DataStoreTypes.ChangeSet getChangeSet() { + return changeSet; + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java new file mode 100644 index 0000000..bf44a30 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Initializer.java @@ -0,0 +1,52 @@ +package com.launchdarkly.sdk.server.datasources; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; + +// Mermaid source for state diagram. +// stateDiagram-v2 +// [*] --> RUNNING +// RUNNING --> SHUTDOWN +// RUNNING --> INTERRUPTED +// RUNNING --> CHANGESET +// RUNNING --> TERMINAL_ERROR +// RUNNING --> GOODBYE +// SHUTDOWN --> [*] +// INTERRUPTED --> [*] +// CHANGESET --> [*] +// TERMINAL_ERROR --> [*] +// GOODBYE --> [*] + +/** + * This type is currently experimental and not subject to semantic versioning. + *

+ * Interface for an asynchronous data source initializer. + *

+ * An initializer will run and produce a single result. If the initializer is successful, then it should emit a result + * containing a change set. If the initializer fails, then it should emit a status result describing the error. + *

+ * [START] + * │ + * ▼ + * ┌─────────────┐ + * │ RUNNING │──┐ + * └─────────────┘ │ + * │ │ │ │ │ + * │ │ │ │ └──► SHUTDOWN ───► [END] + * │ │ │ │ + * │ │ │ └─────► INTERRUPTED ───► [END] + * │ │ │ + * │ │ └─────────► CHANGESET ───► [END] + * │ │ + * │ └─────────────► TERMINAL_ERROR ───► [END] + * │ + * └─────────────────► GOODBYE ───► [END] + * + */ +public interface Initializer extends DataSourceShutdown { + /** + * Run the initializer to completion. + * @return The result of the initializer. + */ + CompletableFuture run(); +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/SelectorSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/SelectorSource.java new file mode 100644 index 0000000..163c384 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/SelectorSource.java @@ -0,0 +1,16 @@ +package com.launchdarkly.sdk.server.datasources; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; + +/** + * This type is currently experimental and not subject to semantic versioning. + *

+ * Source of selectors for FDv2 implementations. + */ +public interface SelectorSource { + /** + * Get the current selector. + * @return The current selector. + */ + Selector getSelector(); +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Synchronizer.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Synchronizer.java new file mode 100644 index 0000000..ee86f23 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/Synchronizer.java @@ -0,0 +1,57 @@ +package com.launchdarkly.sdk.server.datasources; + +import java.util.concurrent.CompletableFuture; + +// Mermaid source for state diagram. +// stateDiagram-v2 +// [*] --> RUNNING +// RUNNING --> SHUTDOWN +// SHUTDOWN --> [*] +// RUNNING --> TERMINAL_ERROR +// TERMINAL_ERROR --> [*] +// RUNNING --> GOODBYE +// GOODBYE --> [*] +// RUNNING --> CHANGE_SET +// CHANGE_SET --> RUNNING +// RUNNING --> INTERRUPTED +// INTERRUPTED --> RUNNING + +/** + * This type is currently experimental and not subject to semantic versioning. + *

+ * Interface for an asynchronous data source synchronizer. + *

+ * A synchronizer will run and produce a stream of results. When it experiences a temporary failure, it will emit a + * status event indicating that it is INTERRUPTED, while it attempts to resolve its failure. When it receives data, + * it should emit a result containing a change set. When the data source is shut down gracefully, it should emit a + * status event indicating that it is SHUTDOWN. + *

+ * [START] + * │ + * ▼ + * ┌─────────────┐ + * ┌─►│ RUNNING │──┐ + * │ └─────────────┘ │ + * │ │ │ │ │ │ + * │ │ │ │ │ └──► SHUTDOWN ───► [END] + * │ │ │ │ │ + * │ │ │ │ └──────► TERMINAL_ERROR ───► [END] + * │ │ │ │ + * │ │ │ └──────────► GOODBYE ───► [END] + * │ │ │ + * │ │ └──────────────► CHANGE_SET ───┐ + * │ │ │ + * │ └──────────────────► INTERRUPTED ──┤ + * │ │ + * └──────────────────────────────────────┘ + */ +public interface Synchronizer extends DataSourceShutdown { + /** + * Get the next result from the stream. + *

+ * This method is intended to be driven by a single thread, and for there to be a single outstanding call + * at any given time. + * @return a future that will complete when the next result is available + */ + CompletableFuture next(); +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/package-info.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/package-info.java new file mode 100644 index 0000000..ece5caa --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/package-info.java @@ -0,0 +1,6 @@ +/** + * Internal data source components for FDv2 protocol support. + *

+ * This package is currently experimental and not subject to semantic versioning. + */ +package com.launchdarkly.sdk.server.datasources; diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java new file mode 100644 index 0000000..ea3a77a --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java @@ -0,0 +1,447 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpProperties; +import com.launchdarkly.sdk.server.subsystems.ClientContext; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; + +import org.junit.Test; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings("javadoc") +public class DefaultFDv2RequestorTest extends BaseTest { + private static final String SDK_KEY = "sdk-key"; + private static final String REQUEST_PATH = "/sdk/poll"; + + // Valid FDv2 polling response with multiple events + private static final String VALID_EVENTS_JSON = "{\n" + + " \"events\": [\n" + + " {\n" + + " \"event\": \"server-intent\",\n" + + " \"data\": {\n" + + " \"payloads\": [{\n" + + " \"id\": \"payload-1\",\n" + + " \"target\": 100,\n" + + " \"intentCode\": \"xfer-full\",\n" + + " \"reason\": \"payload-missing\"\n" + + " }]\n" + + " }\n" + + " },\n" + + " {\n" + + " \"event\": \"put-object\",\n" + + " \"data\": {\n" + + " \"version\": 150,\n" + + " \"kind\": \"flag\",\n" + + " \"key\": \"test-flag\",\n" + + " \"object\": {\n" + + " \"key\": \"test-flag\",\n" + + " \"version\": 1,\n" + + " \"on\": true,\n" + + " \"fallthrough\": { \"variation\": 0 },\n" + + " \"offVariation\": 1,\n" + + " \"variations\": [true, false],\n" + + " \"salt\": \"test-salt\",\n" + + " \"trackEvents\": false,\n" + + " \"trackEventsFallthrough\": false,\n" + + " \"debugEventsUntilDate\": null,\n" + + " \"clientSide\": false,\n" + + " \"deleted\": false\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"event\": \"payload-transferred\",\n" + + " \"data\": {\n" + + " \"state\": \"(p:payload-1:100)\",\n" + + " \"version\": 100\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + + // Empty events array + private static final String EMPTY_EVENTS_JSON = "{\"events\": []}"; + + private DefaultFDv2Requestor makeRequestor(HttpServer server) { + return makeRequestor(server, LDConfig.DEFAULT); + } + + private DefaultFDv2Requestor makeRequestor(HttpServer server, LDConfig config) { + return new DefaultFDv2Requestor(makeHttpConfig(config), server.getUri(), REQUEST_PATH, testLogger); + } + + private HttpProperties makeHttpConfig(LDConfig config) { + return ComponentsImpl.toHttpProperties(config.http.build(new ClientContext(SDK_KEY))); + } + + @Test + public void successfulRequestWithEvents() throws Exception { + Handler resp = Handlers.bodyJson(VALID_EVENTS_JSON); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + FDv2Requestor.FDv2PayloadResponse response = future.get(5, TimeUnit.SECONDS); + + assertNotNull(response); + assertNotNull(response.getEvents()); + assertEquals(3, response.getEvents().size()); + + List events = response.getEvents(); + assertEquals("server-intent", events.get(0).getEventType()); + assertEquals("put-object", events.get(1).getEventType()); + assertEquals("payload-transferred", events.get(2).getEventType()); + + RequestInfo req = server.getRecorder().requireRequest(); + assertEquals(REQUEST_PATH, req.getPath()); + } + } + } + + @Test + public void emptyEventsArray() throws Exception { + Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + FDv2Requestor.FDv2PayloadResponse response = future.get(5, TimeUnit.SECONDS); + + assertNotNull(response); + assertNotNull(response.getEvents()); + assertTrue(response.getEvents().isEmpty()); + } + } + } + + @Test + public void requestWithVersionQueryParameter() throws Exception { + Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + Selector selector = Selector.make(42, null); + + CompletableFuture future = + requestor.Poll(selector); + + future.get(5, TimeUnit.SECONDS); + + RequestInfo req = server.getRecorder().requireRequest(); + assertEquals(REQUEST_PATH, req.getPath()); + assertThat(req.getQuery(), containsString("version=42")); + } + } + } + + @Test + public void requestWithStateQueryParameter() throws Exception { + Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + Selector selector = Selector.make(0, "test-state"); + + CompletableFuture future = + requestor.Poll(selector); + + future.get(5, TimeUnit.SECONDS); + + RequestInfo req = server.getRecorder().requireRequest(); + assertEquals(REQUEST_PATH, req.getPath()); + assertThat(req.getQuery(), containsString("state=test-state")); + } + } + } + + @Test + public void requestWithBothQueryParameters() throws Exception { + Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + Selector selector = Selector.make(100, "my-state"); + + CompletableFuture future = + requestor.Poll(selector); + + future.get(5, TimeUnit.SECONDS); + + RequestInfo req = server.getRecorder().requireRequest(); + assertEquals(REQUEST_PATH, req.getPath()); + assertThat(req.getQuery(), containsString("version=100")); + assertThat(req.getQuery(), containsString("state=my-state")); + } + } + } + + @Test + public void etagCachingWith304NotModified() throws Exception { + Handler cacheableResp = Handlers.all( + Handlers.header("ETag", "my-etag-value"), + Handlers.bodyJson(VALID_EVENTS_JSON) + ); + Handler cachedResp = Handlers.status(304); + Handler sequence = Handlers.sequential(cacheableResp, cachedResp); + + try (HttpServer server = HttpServer.start(sequence)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + // First request should succeed and cache the ETag + CompletableFuture future1 = + requestor.Poll(Selector.EMPTY); + + FDv2Requestor.FDv2PayloadResponse response1 = future1.get(5, TimeUnit.SECONDS); + assertNotNull(response1); + assertEquals(3, response1.getEvents().size()); + + RequestInfo req1 = server.getRecorder().requireRequest(); + assertEquals(REQUEST_PATH, req1.getPath()); + assertEquals(null, req1.getHeader("If-None-Match")); + + // Second request should send If-None-Match and receive 304 + CompletableFuture future2 = + requestor.Poll(Selector.EMPTY); + + FDv2Requestor.FDv2PayloadResponse response2 = future2.get(5, TimeUnit.SECONDS); + assertEquals(null, response2); + + RequestInfo req2 = server.getRecorder().requireRequest(); + assertEquals(REQUEST_PATH, req2.getPath()); + assertEquals("my-etag-value", req2.getHeader("If-None-Match")); + } + } + } + + @Test + public void etagUpdatedOnNewResponse() throws Exception { + Handler resp1 = Handlers.all( + Handlers.header("ETag", "etag-1"), + Handlers.bodyJson(VALID_EVENTS_JSON) + ); + Handler resp2 = Handlers.all( + Handlers.header("ETag", "etag-2"), + Handlers.bodyJson(EMPTY_EVENTS_JSON) + ); + Handler resp3 = Handlers.status(304); + Handler sequence = Handlers.sequential(resp1, resp2, resp3); + + try (HttpServer server = HttpServer.start(sequence)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + // First request + requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); + RequestInfo req1 = server.getRecorder().requireRequest(); + assertEquals(null, req1.getHeader("If-None-Match")); + + // Second request should use etag-1 + requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); + RequestInfo req2 = server.getRecorder().requireRequest(); + assertEquals("etag-1", req2.getHeader("If-None-Match")); + + // Third request should use etag-2 + requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); + RequestInfo req3 = server.getRecorder().requireRequest(); + assertEquals("etag-2", req3.getHeader("If-None-Match")); + } + } + } + + @Test + public void etagRemovedWhenNotInResponse() throws Exception { + Handler resp1 = Handlers.all( + Handlers.header("ETag", "etag-1"), + Handlers.bodyJson(VALID_EVENTS_JSON) + ); + Handler resp2 = Handlers.bodyJson(EMPTY_EVENTS_JSON); // No ETag + Handler resp3 = Handlers.bodyJson(EMPTY_EVENTS_JSON); // Third request + Handler sequence = Handlers.sequential(resp1, resp2, resp3); + + try (HttpServer server = HttpServer.start(sequence)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + // First request with ETag + requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); + server.getRecorder().requireRequest(); + + // Second request should use etag-1 + requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); + RequestInfo req2 = server.getRecorder().requireRequest(); + assertEquals("etag-1", req2.getHeader("If-None-Match")); + + // Third request should not send ETag (was removed) + requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); + RequestInfo req3 = server.getRecorder().requireRequest(); + assertEquals(null, req3.getHeader("If-None-Match")); + } + } + } + + @Test + public void httpErrorCodeThrowsException() throws Exception { + Handler resp = Handlers.status(500); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause().getMessage(), containsString("500")); + } + } + } + } + + @Test + public void http404ThrowsException() throws Exception { + Handler resp = Handlers.status(404); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), notNullValue()); + assertThat(e.getCause().getMessage(), containsString("404")); + } + } + } + } + + @Test + public void invalidJsonThrowsException() throws Exception { + Handler resp = Handlers.bodyJson("{ invalid json }"); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), notNullValue()); + } + } + } + } + + @Test + public void missingEventsPropertyThrowsException() throws Exception { + Handler resp = Handlers.bodyJson("{}"); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), notNullValue()); + } + } + } + } + + @Test + public void baseUriCanHaveContextPath() throws Exception { + Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON); + + try (HttpServer server = HttpServer.start(resp)) { + URI uri = server.getUri().resolve("/context/path"); + + try (DefaultFDv2Requestor requestor = new DefaultFDv2Requestor( + makeHttpConfig(LDConfig.DEFAULT), uri, REQUEST_PATH, testLogger)) { + + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + future.get(5, TimeUnit.SECONDS); + + RequestInfo req = server.getRecorder().requireRequest(); + assertEquals("/context/path" + REQUEST_PATH, req.getPath()); + } + } + } + + @Test + public void differentSelectorsUseDifferentEtags() throws Exception { + Handler resp = Handlers.all( + Handlers.header("ETag", "etag-for-request"), + Handlers.bodyJson(EMPTY_EVENTS_JSON) + ); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + Selector selector1 = Selector.make(100, "state1"); + Selector selector2 = Selector.make(200, "state2"); + + // First request with selector1 + requestor.Poll(selector1).get(5, TimeUnit.SECONDS); + RequestInfo req1 = server.getRecorder().requireRequest(); + assertEquals(null, req1.getHeader("If-None-Match")); + + // Second request with selector1 should use cached ETag + requestor.Poll(selector1).get(5, TimeUnit.SECONDS); + RequestInfo req2 = server.getRecorder().requireRequest(); + assertEquals("etag-for-request", req2.getHeader("If-None-Match")); + + // Request with selector2 should not have ETag (different URI) + requestor.Poll(selector2).get(5, TimeUnit.SECONDS); + RequestInfo req3 = server.getRecorder().requireRequest(); + assertEquals(null, req3.getHeader("If-None-Match")); + } + } + } + + @Test + public void responseHeadersAreIncluded() throws Exception { + Handler resp = Handlers.all( + Handlers.header("X-Custom-Header", "custom-value"), + Handlers.bodyJson(EMPTY_EVENTS_JSON) + ); + + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFDv2Requestor requestor = makeRequestor(server)) { + CompletableFuture future = + requestor.Poll(Selector.EMPTY); + + FDv2Requestor.FDv2PayloadResponse response = future.get(5, TimeUnit.SECONDS); + + assertNotNull(response); + assertNotNull(response.getHeaders()); + assertEquals("custom-value", response.getHeaders().get("X-Custom-Header")); + } + } + } +} \ No newline at end of file diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2ChangeSetTranslatorTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2ChangeSetTranslatorTest.java new file mode 100644 index 0000000..522e94f --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2ChangeSetTranslatorTest.java @@ -0,0 +1,350 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import com.launchdarkly.logging.LDLogLevel; +import com.launchdarkly.logging.LogCapture; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2Change; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2ChangeSetType; +import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2ChangeType; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class FDv2ChangeSetTranslatorTest extends BaseTest { + + private static JsonElement createFlagJsonElement(String key, int version) { + String json = String.format( + "{\n" + + " \"key\": \"%s\",\n" + + " \"version\": %d,\n" + + " \"on\": true,\n" + + " \"fallthrough\": {\"variation\": 0},\n" + + " \"variations\": [true, false]\n" + + "}", + key, version); + return JsonParser.parseString(json); + } + + private static JsonElement createSegmentJsonElement(String key, int version) { + String json = String.format( + "{\n" + + " \"key\": \"%s\",\n" + + " \"version\": %d\n" + + "}", + key, version); + return JsonParser.parseString(json); + } + + @Test + public void toChangeSet_withFullChangeset_returnsFullChangeSetType() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(ChangeSetType.Full, result.getType()); + } + + @Test + public void toChangeSet_withPartialChangeset_returnsPartialChangeSetType() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.PARTIAL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(ChangeSetType.Partial, result.getType()); + } + + @Test + public void toChangeSet_withNoneChangeset_returnsNoneChangeSetType() { + List changes = ImmutableList.of(); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.NONE, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(ChangeSetType.None, result.getType()); + } + + @Test + public void toChangeSet_includesSelector() { + List changes = ImmutableList.of(); + Selector selector = Selector.make(42, "test-state"); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, selector); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(selector.getVersion(), result.getSelector().getVersion()); + assertEquals(selector.getState(), result.getSelector().getState()); + } + + @Test + public void toChangeSet_includesEnvironmentId() { + List changes = ImmutableList.of(); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, "test-env-id"); + + assertEquals("test-env-id", result.getEnvironmentId()); + } + + @Test + public void toChangeSet_withNullEnvironmentId_returnsNullEnvironmentId() { + List changes = ImmutableList.of(); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertNull(result.getEnvironmentId()); + } + + @Test + public void toChangeSet_withPutOperation_deserializesItem() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + Map.Entry> flagData = findDataKind(result, "features"); + assertNotNull(flagData); + Map.Entry item = getFirstItem(flagData.getValue()); + assertEquals("flag1", item.getKey()); + assertNotNull(item.getValue().getItem()); + assertEquals(1, item.getValue().getVersion()); + } + + @Test + public void toChangeSet_withDeleteOperation_createsDeletedDescriptor() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.DELETE, "flag", "flag1", 5, null) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.PARTIAL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + Map.Entry> flagData = findDataKind(result, "features"); + assertNotNull(flagData); + Map.Entry item = getFirstItem(flagData.getValue()); + assertEquals("flag1", item.getKey()); + assertNull(item.getValue().getItem()); + assertEquals(5, item.getValue().getVersion()); + } + + @Test + public void toChangeSet_withMultipleFlags_groupsByKind() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)), + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag2", 2, createFlagJsonElement("flag2", 2)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + Map.Entry> flagData = findDataKind(result, "features"); + assertNotNull(flagData); + assertEquals(2, countItems(flagData.getValue())); + } + + @Test + public void toChangeSet_withFlagsAndSegments_createsMultipleDataKinds() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)), + new FDv2Change(FDv2ChangeType.PUT, "segment", "seg1", 1, createSegmentJsonElement("seg1", 1)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(2, countDataKinds(result)); + assertNotNull(findDataKind(result, "features")); + assertNotNull(findDataKind(result, "segments")); + } + + @Test + public void toChangeSet_withUnknownKind_skipsItemAndLogsWarning() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "unknown-kind", "item1", 1, createFlagJsonElement("item1", 1)), + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(1, countDataKinds(result)); + assertNotNull(findDataKind(result, "features")); + assertLogMessageContains(LDLogLevel.WARN, "Unknown data kind 'unknown-kind' in changeset, skipping"); + } + + @Test + public void toChangeSet_withPutOperationMissingObject_skipsItemAndLogsWarning() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, null), + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag2", 2, createFlagJsonElement("flag2", 2)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + Map.Entry> flagData = findDataKind(result, "features"); + assertNotNull(flagData); + assertEquals(1, countItems(flagData.getValue())); + assertEquals("flag2", getFirstItem(flagData.getValue()).getKey()); + assertLogMessageContains(LDLogLevel.WARN, "Put operation for flag/flag1 missing object data, skipping"); + } + + @Test + public void toChangeSet_withEmptyChanges_returnsEmptyData() { + List changes = ImmutableList.of(); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(0, countDataKinds(result)); + } + + @Test + public void toChangeSet_withMixedPutAndDelete_handlesAllOperations() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)), + new FDv2Change(FDv2ChangeType.DELETE, "flag", "flag2", 2, null), + new FDv2Change(FDv2ChangeType.PUT, "segment", "seg1", 1, createSegmentJsonElement("seg1", 1)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.PARTIAL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + assertEquals(2, countDataKinds(result)); + + Map.Entry> flagData = findDataKind(result, "features"); + assertNotNull(flagData); + assertEquals(2, countItems(flagData.getValue())); + + Map.Entry flag1 = findItem(flagData.getValue(), "flag1"); + assertNotNull(flag1.getValue().getItem()); + assertEquals(1, flag1.getValue().getVersion()); + + Map.Entry flag2 = findItem(flagData.getValue(), "flag2"); + assertNull(flag2.getValue().getItem()); + assertEquals(2, flag2.getValue().getVersion()); + + Map.Entry> segmentData = findDataKind(result, "segments"); + assertNotNull(segmentData); + assertEquals(1, countItems(segmentData.getValue())); + } + + @Test + public void toChangeSet_preservesOrderOfChangesWithinKind() { + List changes = ImmutableList.of( + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag3", 3, createFlagJsonElement("flag3", 3)), + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag1", 1, createFlagJsonElement("flag1", 1)), + new FDv2Change(FDv2ChangeType.PUT, "flag", "flag2", 2, createFlagJsonElement("flag2", 2)) + ); + FDv2ChangeSet fdv2ChangeSet = new FDv2ChangeSet(FDv2ChangeSetType.FULL, changes, Selector.make(1, "state1")); + + DataStoreTypes.ChangeSet result = + FDv2ChangeSetTranslator.toChangeSet(fdv2ChangeSet, testLogger, null); + + Map.Entry> flagData = findDataKind(result, "features"); + assertNotNull(flagData); + List> items = toList(flagData.getValue().getItems()); + assertEquals("flag3", items.get(0).getKey()); + assertEquals("flag1", items.get(1).getKey()); + assertEquals("flag2", items.get(2).getKey()); + } + + // Helper methods + + private Map.Entry> findDataKind( + DataStoreTypes.ChangeSet changeSet, String kindName) { + for (Map.Entry> entry : changeSet.getData()) { + if (entry.getKey().getName().equals(kindName)) { + return entry; + } + } + return null; + } + + private Map.Entry getFirstItem( + KeyedItems keyedItems) { + return keyedItems.getItems().iterator().next(); + } + + private Map.Entry findItem( + KeyedItems keyedItems, String key) { + for (Map.Entry entry : keyedItems.getItems()) { + if (entry.getKey().equals(key)) { + return entry; + } + } + return null; + } + + private int countItems(KeyedItems keyedItems) { + int count = 0; + for (@SuppressWarnings("unused") Map.Entry entry : keyedItems.getItems()) { + count++; + } + return count; + } + + private int countDataKinds(DataStoreTypes.ChangeSet changeSet) { + int count = 0; + for (@SuppressWarnings("unused") Map.Entry> entry : changeSet.getData()) { + count++; + } + return count; + } + + private List> toList( + Iterable> items) { + List> list = new ArrayList<>(); + for (Map.Entry item : items) { + list.add(item); + } + return list; + } + + private void assertLogMessageContains(LDLogLevel level, String expectedMessageSubstring) { + for (LogCapture.Message message : logCapture.getMessages()) { + if (message.getLevel() == level && message.getText().contains(expectedMessageSubstring)) { + return; + } + } + throw new AssertionError("Expected log message at level " + level + " containing: " + expectedMessageSubstring); + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java new file mode 100644 index 0000000..42e4b4b --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java @@ -0,0 +1,334 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpErrors; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.SelectorSource; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.SerializationException; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("javadoc") +public class PollingInitializerImplTest extends BaseTest { + + private FDv2Requestor mockRequestor() { + return mock(FDv2Requestor.class); + } + + private SelectorSource mockSelectorSource() { + SelectorSource source = mock(SelectorSource.class); + when(source.getSelector()).thenReturn(Selector.EMPTY); + return source; + } + + // Helper for Java 8 compatibility - failedFuture() is Java 9+ + private CompletableFuture failedFuture(Throwable ex) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(ex); + return future; + } + + private FDv2Requestor.FDv2PayloadResponse makeSuccessResponse() { + String json = "{\n" + + " \"events\": [\n" + + " {\n" + + " \"event\": \"server-intent\",\n" + + " \"data\": {\n" + + " \"payloads\": [{\n" + + " \"id\": \"payload-1\",\n" + + " \"target\": 100,\n" + + " \"intentCode\": \"xfer-full\",\n" + + " \"reason\": \"payload-missing\"\n" + + " }]\n" + + " }\n" + + " },\n" + + " {\n" + + " \"event\": \"payload-transferred\",\n" + + " \"data\": {\n" + + " \"state\": \"(p:payload-1:100)\",\n" + + " \"version\": 100\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + + try { + return new FDv2Requestor.FDv2PayloadResponse( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(json), + okhttp3.Headers.of() + ); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void successfulInitialization() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + + verify(requestor, times(1)).Poll(any(Selector.class)); + } + + @Test + public void httpRecoverableError() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(failedFuture(new HttpErrors.HttpErrorException(503))); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + assertNotNull(result.getStatus().getErrorInfo()); + assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind()); + + + } + + @Test + public void httpNonRecoverableError() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(failedFuture(new HttpErrors.HttpErrorException(401))); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind()); + + + } + + @Test + public void networkError() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(failedFuture(new IOException("Connection refused"))); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, result.getStatus().getErrorInfo().getKind()); + + + } + + @Test + public void serializationError() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(failedFuture(new SerializationException(new Exception("Invalid JSON")))); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.INVALID_DATA, result.getStatus().getErrorInfo().getKind()); + + + } + + @Test + public void shutdownBeforePollCompletes() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + CompletableFuture delayedResponse = new CompletableFuture<>(); + when(requestor.Poll(any(Selector.class))).thenReturn(delayedResponse); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + + // Shutdown before poll completes + Thread.sleep(100); + initializer.shutdown(); + + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + assertNull(result.getStatus().getErrorInfo()); + + + } + + @Test + public void shutdownAfterPollCompletes() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + // Shutdown after completion should still work + initializer.shutdown(); + + + } + + @Test + public void errorEventInResponse() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + String errorJson = "{\n" + + " \"events\": [\n" + + " {\n" + + " \"event\": \"error\",\n" + + " \"data\": {\n" + + " \"error\": \"invalid-request\",\n" + + " \"reason\": \"bad request\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + + FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(errorJson), + okhttp3.Headers.of() + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + + + } + + @Test + public void goodbyeEventInResponse() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + String goodbyeJson = "{\n" + + " \"events\": [\n" + + " {\n" + + " \"event\": \"goodbye\",\n" + + " \"data\": {\n" + + " \"reason\": \"service-unavailable\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + + FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(goodbyeJson), + okhttp3.Headers.of() + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.GOODBYE, result.getStatus().getState()); + + + } + + @Test + public void emptyEventsArray() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + String emptyJson = "{\"events\": []}"; + + FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(emptyJson), + okhttp3.Headers.of() + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Empty events array should result in terminal error + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + + + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java new file mode 100644 index 0000000..ce8fa6f --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java @@ -0,0 +1,526 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.SelectorSource; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; + +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@SuppressWarnings("javadoc") +public class PollingSynchronizerImplTest extends BaseTest { + + private FDv2Requestor mockRequestor() { + return mock(FDv2Requestor.class); + } + + private SelectorSource mockSelectorSource() { + SelectorSource source = mock(SelectorSource.class); + when(source.getSelector()).thenReturn(Selector.EMPTY); + return source; + } + + // Helper for Java 8 compatibility - CompletableFuture.failedFuture() is Java 9+ + private CompletableFuture failedFuture(Throwable ex) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(ex); + return future; + } + + private FDv2Requestor.FDv2PayloadResponse makeSuccessResponse() { + String json = "{\n" + + " \"events\": [\n" + + " {\n" + + " \"event\": \"server-intent\",\n" + + " \"data\": {\n" + + " \"payloads\": [{\n" + + " \"id\": \"payload-1\",\n" + + " \"target\": 100,\n" + + " \"intentCode\": \"xfer-full\",\n" + + " \"reason\": \"payload-missing\"\n" + + " }]\n" + + " }\n" + + " },\n" + + " {\n" + + " \"event\": \"payload-transferred\",\n" + + " \"data\": {\n" + + " \"state\": \"(p:payload-1:100)\",\n" + + " \"version\": 100\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + + try { + return new FDv2Requestor.FDv2PayloadResponse( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(json), + okhttp3.Headers.of() + ); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void nextWaitsWhenQueueEmpty() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + // Delay the response so queue is initially empty + CompletableFuture delayedResponse = new CompletableFuture<>(); + when(requestor.Poll(any(Selector.class))).thenReturn(delayedResponse); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + CompletableFuture nextFuture = synchronizer.next(); + + // Verify future is not complete yet + Thread.sleep(50); + assertEquals(false, nextFuture.isDone()); + + // Complete the delayed response + delayedResponse.complete(makeSuccessResponse()); + + // Now the future should complete + FDv2SourceResult result = nextFuture.get(5, TimeUnit.SECONDS); + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } + + @Test + public void nextReturnsImmediatelyWhenResultQueued() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for first poll to complete and queue result + Thread.sleep(150); + + // Now next() should return immediately + CompletableFuture nextFuture = synchronizer.next(); + assertTrue(nextFuture.isDone()); + + FDv2SourceResult result = nextFuture.get(1, TimeUnit.SECONDS); + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } + + @Test + public void multipleItemsQueuedReturnedInOrder() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for multiple polls to complete and queue results + Thread.sleep(250); + + // Should have at least 3-4 results queued + FDv2SourceResult result1 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result1); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result1.getResultType()); + + FDv2SourceResult result2 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result2); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType()); + + FDv2SourceResult result3 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result3); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result3.getResultType()); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } + + @Test + public void shutdownBeforeNextCalled() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + // Shutdown immediately + synchronizer.shutdown(); + + // next() should return shutdown result + FDv2SourceResult result = synchronizer.next().get(5, TimeUnit.SECONDS); + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + } finally { + executor.shutdown(); + } + } + + @Test + public void shutdownWhileNextWaiting() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + // Delay the response so next() will be waiting + CompletableFuture delayedResponse = new CompletableFuture<>(); + when(requestor.Poll(any(Selector.class))).thenReturn(delayedResponse); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + CompletableFuture nextFuture = synchronizer.next(); + + // Verify next() is waiting + Thread.sleep(50); + assertEquals(false, nextFuture.isDone()); + + // Shutdown while waiting + synchronizer.shutdown(); + + // next() should complete with shutdown result + FDv2SourceResult result = nextFuture.get(5, TimeUnit.SECONDS); + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.SHUTDOWN, result.getStatus().getState()); + } finally { + executor.shutdown(); + } + } + + @Test + public void shutdownAfterMultipleItemsQueued() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for multiple polls to complete + Thread.sleep(250); + + // Consume one result + FDv2SourceResult result1 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result1); + + // Shutdown with items still in queue + synchronizer.shutdown(); + + // next() can return either queued items or shutdown + // Just verify we get valid results and eventually shutdown + boolean gotShutdown = false; + for (int i = 0; i < 10; i++) { + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result); + if (result.getResultType() == FDv2SourceResult.ResultType.STATUS && + result.getStatus().getState() == FDv2SourceResult.State.SHUTDOWN) { + gotShutdown = true; + break; + } + } + assertTrue("Should eventually receive shutdown result", gotShutdown); + } finally { + executor.shutdown(); + } + } + + @Test + public void pollingContinuesInBackground() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + AtomicInteger pollCount = new AtomicInteger(0); + when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> { + pollCount.incrementAndGet(); + return CompletableFuture.completedFuture(makeSuccessResponse()); + }); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for several poll intervals + Thread.sleep(250); + + // Should have polled multiple times + int count = pollCount.get(); + assertTrue("Expected multiple polls, got " + count, count >= 3); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } + + @Test + public void errorsInPollingAreSwallowed() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + AtomicInteger callCount = new AtomicInteger(0); + AtomicInteger successCount = new AtomicInteger(0); + when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> { + int count = callCount.incrementAndGet(); + // First and third calls succeed, second fails + if (count == 2) { + return failedFuture(new IOException("Network error")); + } else { + successCount.incrementAndGet(); + return CompletableFuture.completedFuture(makeSuccessResponse()); + } + }); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for multiple polls including the failed one + Thread.sleep(250); + + // First result should be success + FDv2SourceResult result1 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result1); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result1.getResultType()); + + // Second result should be the error (INTERRUPTED status) + FDv2SourceResult result2 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result2); + assertEquals(FDv2SourceResult.ResultType.STATUS, result2.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result2.getStatus().getState()); + assertEquals(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, result2.getStatus().getErrorInfo().getKind()); + + // Third result should be success again + FDv2SourceResult result3 = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result3); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result3.getResultType()); + + // Verify polling continued after error + assertTrue("Should have at least 2 successful polls", successCount.get() >= 2); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } + + @Test + public void taskCancelledOnShutdown() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + AtomicInteger pollCount = new AtomicInteger(0); + when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> { + pollCount.incrementAndGet(); + return CompletableFuture.completedFuture(makeSuccessResponse()); + }); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + Thread.sleep(100); + int countBeforeShutdown = pollCount.get(); + + synchronizer.shutdown(); + + // Wait and verify no more polls occur + Thread.sleep(200); + int countAfterShutdown = pollCount.get(); + + // Count should not increase significantly after shutdown + assertTrue("Polling should stop after shutdown", + countAfterShutdown <= countBeforeShutdown + 1); // Allow for 1 in-flight poll + } finally { + executor.shutdown(); + } + } + + @Test + public void nullResponseSwallowedInPolling() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + AtomicInteger callCount = new AtomicInteger(0); + AtomicInteger successCount = new AtomicInteger(0); + when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> { + int count = callCount.incrementAndGet(); + // First call returns null (304 Not Modified), subsequent return success + if (count == 1) { + return CompletableFuture.completedFuture(null); + } else { + successCount.incrementAndGet(); + return CompletableFuture.completedFuture(makeSuccessResponse()); + } + }); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for multiple polls + Thread.sleep(250); + + // Should get success results - null responses cause exceptions that are swallowed + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + // Verify polling continued after null response + assertTrue("Should have successful polls after null", successCount.get() >= 1); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } + + @Test + public void multipleConsumersCanCallNext() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(50) + ); + + // Wait for some results to queue + Thread.sleep(200); + + // Multiple consumers get results + CompletableFuture future1 = synchronizer.next(); + CompletableFuture future2 = synchronizer.next(); + CompletableFuture future3 = synchronizer.next(); + + FDv2SourceResult result1 = future1.get(5, TimeUnit.SECONDS); + FDv2SourceResult result2 = future2.get(5, TimeUnit.SECONDS); + FDv2SourceResult result3 = future3.get(5, TimeUnit.SECONDS); + + assertNotNull(result1); + assertNotNull(result2); + assertNotNull(result3); + + synchronizer.shutdown(); + } finally { + executor.shutdown(); + } + } +}