-
Notifications
You must be signed in to change notification settings - Fork 11
chore: Add FDv2 data source interfaces. #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
19d9c02
fbea872
adcaa0e
f2b209d
8c115cc
de2fded
98d3b39
da3c639
8fb88ed
da27015
aba46ef
7401331
bba0cdc
89bd017
228f3e6
9469b23
9a450e6
4b8313b
31eb13e
4a2fe3b
3428591
ff60216
e985f80
a956484
ff2376e
194c30c
707fe0e
cb79f5e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.logging.LDLogger; | ||
| import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.Selector; | ||
| import com.launchdarkly.sdk.internal.http.HttpHelpers; | ||
| import com.launchdarkly.sdk.internal.http.HttpProperties; | ||
| import com.launchdarkly.sdk.json.SerializationException; | ||
|
|
||
| import okhttp3.Call; | ||
| import okhttp3.Callback; | ||
| import okhttp3.Headers; | ||
| import okhttp3.OkHttpClient; | ||
| import okhttp3.Request; | ||
| import okhttp3.Response; | ||
|
|
||
| import javax.annotation.Nonnull; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.net.SocketTimeoutException; | ||
| import java.net.URI; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| /** | ||
| * Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol. | ||
| */ | ||
| public class DefaultFDv2Requestor implements FDv2Requestor, Closeable { | ||
| private static final String VERSION_QUERY_PARAM = "version"; | ||
| private static final String STATE_QUERY_PARAM = "state"; | ||
|
|
||
| private final OkHttpClient httpClient; | ||
| private final URI pollingUri; | ||
| private final Headers headers; | ||
| private final LDLogger logger; | ||
| private final Map<URI, String> etags; | ||
|
|
||
| /** | ||
| * Creates a DefaultFDv2Requestor. | ||
| * | ||
| * @param httpProperties HTTP configuration properties | ||
| * @param baseUri base URI for the FDv2 polling endpoint | ||
| * @param requestPath the request path to append to the base URI (e.g., "/sdk/poll") | ||
| * @param logger logger for diagnostic output | ||
| */ | ||
| public DefaultFDv2Requestor(HttpProperties httpProperties, URI baseUri, String requestPath, LDLogger logger) { | ||
| this.logger = logger; | ||
| this.pollingUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); | ||
| this.etags = new HashMap<>(); | ||
|
|
||
| OkHttpClient.Builder httpBuilder = httpProperties.toHttpClientBuilder(); | ||
| this.headers = httpProperties.toHeadersBuilder().build(); | ||
| this.httpClient = httpBuilder.build(); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<FDv2PayloadResponse> Poll(Selector selector) { | ||
| CompletableFuture<FDv2PayloadResponse> future = new CompletableFuture<>(); | ||
|
|
||
| try { | ||
| // Build the request URI with query parameters | ||
| URI requestUri = pollingUri; | ||
|
|
||
| if (!selector.isEmpty()) { | ||
| requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion())); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see this version constraint on the Dotnet impl. I'm wondering why this ever was imposed? What if 0 is valid?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a couple reasons. 0 and not present would be the same, and 0 is the "zero" value for the go implementation. That said this should use |
||
|
|
||
| if (selector.getState() != null && !selector.getState().isEmpty()) { | ||
| requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState()); | ||
| } | ||
|
|
||
| logger.debug("Making FDv2 polling request to: {}", requestUri); | ||
|
|
||
| // Build the HTTP request | ||
| Request.Builder requestBuilder = new Request.Builder() | ||
| .url(requestUri.toURL()) | ||
| .headers(headers) | ||
| .get(); | ||
|
|
||
| // Add ETag if we have one cached for this URI | ||
| synchronized (etags) { | ||
| String etag = etags.get(requestUri); | ||
| if (etag != null) { | ||
| requestBuilder.header("If-None-Match", etag); | ||
| } | ||
| } | ||
|
|
||
| Request request = requestBuilder.build(); | ||
| final URI finalRequestUri = requestUri; | ||
|
|
||
| // Make asynchronous HTTP call | ||
| httpClient.newCall(request).enqueue(new Callback() { | ||
| @Override | ||
| public void onFailure(@Nonnull Call call, @Nonnull IOException e) { | ||
| if (e instanceof SocketTimeoutException) { | ||
| future.completeExceptionally( | ||
| new IOException("FDv2 polling request timed out: " + finalRequestUri, e) | ||
| ); | ||
| } else { | ||
| future.completeExceptionally(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onResponse(@Nonnull Call call, @Nonnull Response response) { | ||
| try { | ||
| // Handle 304 Not Modified - no new data | ||
| if (response.code() == 304) { | ||
| logger.debug("FDv2 polling request returned 304: not modified"); | ||
| future.complete(null); | ||
| return; | ||
| } | ||
|
|
||
| if (!response.isSuccessful()) { | ||
| future.completeExceptionally( | ||
| new IOException("FDv2 polling request failed with status code: " + response.code()) | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| // Update ETag cache | ||
| String newEtag = response.header("ETag"); | ||
| synchronized (etags) { | ||
| if (newEtag != null) { | ||
| etags.put(finalRequestUri, newEtag); | ||
| } else { | ||
| etags.remove(finalRequestUri); | ||
| } | ||
| } | ||
|
|
||
| // The documentation indicates that the body will not be null for a response passed to the | ||
| // onResponse callback. | ||
| String responseBody = Objects.requireNonNull(response.body()).string(); | ||
| logger.debug("Received FDv2 polling response"); | ||
|
|
||
| List<FDv2Event> events = FDv2Event.parseEventsArray(responseBody); | ||
|
|
||
| // Create and return the response | ||
| FDv2PayloadResponse pollingResponse = new FDv2PayloadResponse(events, response.headers()); | ||
| future.complete(pollingResponse); | ||
|
|
||
| } catch (IOException | SerializationException e) { | ||
| future.completeExceptionally(e); | ||
| } finally { | ||
| response.close(); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| } catch (Exception e) { | ||
| future.completeExceptionally(e); | ||
| } | ||
|
|
||
| return future; | ||
| } | ||
|
|
||
| /** | ||
| * Closes the HTTP client and releases resources. | ||
| */ | ||
| public void close() { | ||
| HttpProperties.shutdownHttpClient(httpClient); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import com.launchdarkly.logging.LDLogger; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2Change; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet.FDv2ChangeType; | ||
| import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; | ||
| import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; | ||
| import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; | ||
| import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; | ||
| import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; | ||
|
|
||
| import java.util.AbstractMap; | ||
| import java.util.ArrayList; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * Translates FDv2 changesets into data store formats. | ||
| */ | ||
| final class FDv2ChangeSetTranslator { | ||
| private FDv2ChangeSetTranslator() { | ||
| } | ||
|
|
||
| /** | ||
| * Converts an FDv2ChangeSet to a DataStoreTypes.ChangeSet. | ||
| * | ||
| * @param changeset the FDv2 changeset to convert | ||
| * @param logger logger for diagnostic messages | ||
| * @param environmentId the environment ID to include in the changeset (may be null) | ||
| * @return a DataStoreTypes.ChangeSet containing the converted data | ||
| * @throws IllegalArgumentException if the changeset type is unknown | ||
| */ | ||
| public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet( | ||
| FDv2ChangeSet changeset, | ||
| LDLogger logger, | ||
| String environmentId) { | ||
| ChangeSetType changeSetType; | ||
| switch (changeset.getType()) { | ||
| case FULL: | ||
| changeSetType = ChangeSetType.Full; | ||
| break; | ||
| case PARTIAL: | ||
| changeSetType = ChangeSetType.Partial; | ||
| break; | ||
| case NONE: | ||
| changeSetType = ChangeSetType.None; | ||
| break; | ||
| default: | ||
| throw new IllegalArgumentException( | ||
| "Unknown FDv2ChangeSetType: " + changeset.getType() + ". This is an implementation error."); | ||
| } | ||
|
|
||
| // Use a LinkedHashMap to group items by DataKind in a single pass while preserving order | ||
| Map<DataKind, List<Map.Entry<String, ItemDescriptor>>> kindToItems = new LinkedHashMap<>(); | ||
|
|
||
| for (FDv2Change change : changeset.getChanges()) { | ||
| DataKind dataKind = getDataKind(change.getKind()); | ||
|
|
||
| if (dataKind == null) { | ||
| logger.warn("Unknown data kind '{}' in changeset, skipping", change.getKind()); | ||
| continue; | ||
| } | ||
|
|
||
| ItemDescriptor item; | ||
|
|
||
| if (change.getType() == FDv2ChangeType.PUT) { | ||
| if (change.getObject() == null) { | ||
| logger.warn( | ||
| "Put operation for {}/{} missing object data, skipping", | ||
| change.getKind(), | ||
| change.getKey()); | ||
| continue; | ||
| } | ||
| item = dataKind.deserialize(change.getObject().toString()); | ||
| } else if (change.getType() == FDv2ChangeType.DELETE) { | ||
| item = ItemDescriptor.deletedItem(change.getVersion()); | ||
| } else { | ||
| throw new IllegalArgumentException( | ||
| "Unknown FDv2ChangeType: " + change.getType() + ". This is an implementation error."); | ||
| } | ||
|
|
||
| List<Map.Entry<String, ItemDescriptor>> itemsList = | ||
| kindToItems.computeIfAbsent(dataKind, k -> new ArrayList<>()); | ||
|
|
||
| itemsList.add(new AbstractMap.SimpleImmutableEntry<>(change.getKey(), item)); | ||
| } | ||
|
|
||
| ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> dataBuilder = | ||
| ImmutableList.builder(); | ||
|
|
||
| for (Map.Entry<DataKind, List<Map.Entry<String, ItemDescriptor>>> entry : kindToItems.entrySet()) { | ||
| dataBuilder.add( | ||
| new AbstractMap.SimpleImmutableEntry<>( | ||
| entry.getKey(), | ||
| new KeyedItems<>(entry.getValue()) | ||
| )); | ||
| } | ||
|
|
||
| return new DataStoreTypes.ChangeSet<>( | ||
| changeSetType, | ||
| changeset.getSelector(), | ||
| dataBuilder.build(), | ||
| environmentId); | ||
| } | ||
|
|
||
| /** | ||
| * Maps an FDv2 object kind to the corresponding DataKind. | ||
| * | ||
| * @param kind the kind string from the FDv2 change | ||
| * @return the corresponding DataKind, or null if the kind is not recognized | ||
| */ | ||
| private static DataKind getDataKind(String kind) { | ||
| switch (kind) { | ||
| case "flag": | ||
| return DataModel.FEATURES; | ||
| case "segment": | ||
| return DataModel.SEGMENTS; | ||
| default: | ||
| return null; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some usage of "poll / polling" in this file, not sure if you want to eliminate that so if this requestor is used elsewhere, the concept of polling doesn't come with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change the types in the interface level, but this implementation is specific to our polling implementation and the format of data it provides as well as the cycle it operates on.