From b00410d1ca2fc891dacea6e17356a3b46ff33780 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 14 Jan 2026 10:07:35 -0800 Subject: [PATCH] updated segments and rb segments storages --- splitio/client/factory.py | 8 +-- splitio/storage/inmemmory.py | 24 +++++++-- tests/client/test_client.py | 62 +++++++++++----------- tests/engine/test_evaluator.py | 20 +++---- tests/integration/test_client_e2e.py | 20 +++---- tests/push/test_split_worker.py | 2 +- tests/storage/test_inmemory_storage.py | 66 +++++++++++++++++++++--- tests/sync/test_segments_synchronizer.py | 6 ++- tests/sync/test_splits_synchronizer.py | 13 ++--- tests/sync/test_synchronizer.py | 6 +-- tests/sync/test_telemetry.py | 2 +- tests/util/test_storage_helper.py | 4 +- 12 files changed, 152 insertions(+), 81 deletions(-) diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 42fa35a2..da41868c 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -551,8 +551,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl events_queue = queue.Queue() storages = { 'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), - 'segments': InMemorySegmentStorage(), - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(), + 'segments': InMemorySegmentStorage(events_queue), + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), 'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer), 'events': InMemoryEventStorage(cfg['eventsQueueSize'], telemetry_runtime_producer), } @@ -1101,8 +1101,8 @@ def _build_localhost_factory(cfg): events_queue = queue.Queue() storages = { 'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), - 'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors. - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(), + 'segments': InMemorySegmentStorage(events_queue), # not used, just to avoid possible future errors. + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), 'impressions': LocalhostImpressionsStorage(), 'events': LocalhostEventsStorage(), } diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index 02cea19f..75097b14 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -113,11 +113,12 @@ def remove_flag_set(self, flag_sets, feature_flag_name, should_filter): class InMemoryRuleBasedSegmentStorage(RuleBasedSegmentsStorage): """InMemory implementation of a feature flag storage base.""" - def __init__(self): + def __init__(self, internal_event_queue): """Constructor.""" self._lock = threading.RLock() self._rule_based_segments = {} self._change_number = -1 + self._internal_event_queue = internal_event_queue def clear(self): """ @@ -153,6 +154,10 @@ def update(self, to_add, to_delete, new_change_number): [self._put(add_segment) for add_segment in to_add] [self._remove(delete_segment) for delete_segment in to_delete] self._set_change_number(new_change_number) + self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.RB_SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) def _put(self, rule_based_segment): """ @@ -934,11 +939,12 @@ async def is_flag_set_exist(self, flag_set): class InMemorySegmentStorage(SegmentStorage): """In-memory implementation of a segment storage.""" - def __init__(self): + def __init__(self, internal_event_queue): """Constructor.""" self._segments = {} self._change_numbers = {} self._lock = threading.RLock() + self._internal_event_queue = internal_event_queue def get(self, segment_name): """ @@ -968,9 +974,14 @@ def put(self, segment): with self._lock: self._segments[segment.name] = segment + self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + def update(self, segment_name, to_add, to_remove, change_number=None): """ - Update a feature flag. Create it if it doesn't exist. + Update a segment. Create it if it doesn't exist. :param segment_name: Name of the segment to update. :type segment_name: str @@ -988,6 +999,11 @@ def update(self, segment_name, to_add, to_remove, change_number=None): if change_number is not None: self._segments[segment_name].change_number = change_number + self._internal_event_queue.put( + SdkInternalEventNotification( + SdkInternalEvent.SEGMENTS_UPDATED, + EventsMetadata(SdkEventType.SEGMENT_UPDATE, {}))) + def get_change_number(self, segment_name): """ Retrieve latest change number for a segment. @@ -1100,7 +1116,7 @@ async def put(self, segment): async def update(self, segment_name, to_add, to_remove, change_number=None): """ - Update a feature flag. Create it if it doesn't exist. + Update a segment. Create it if it doesn't exist. :param segment_name: Name of the segment to update. :type segment_name: str diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 5846b169..a0226126 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -39,8 +39,8 @@ def test_get_treatment(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -117,8 +117,8 @@ def test_get_treatment_with_config(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -195,8 +195,8 @@ def test_get_treatments(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -276,8 +276,8 @@ def test_get_treatments_by_flag_set(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -356,8 +356,8 @@ def test_get_treatments_by_flag_sets(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -436,8 +436,8 @@ def test_get_treatments_with_config(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -520,8 +520,8 @@ def test_get_treatments_with_config_by_flag_set(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -601,8 +601,8 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) @@ -682,8 +682,8 @@ def test_impression_toggle_optimized(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -747,8 +747,8 @@ def test_impression_toggle_debug(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -812,8 +812,8 @@ def test_impression_toggle_none(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) @@ -953,8 +953,8 @@ def test_evaluations_before_running_post_fork(self, mocker): impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) destroyed_property = mocker.PropertyMock() destroyed_property.return_value = False @@ -1035,8 +1035,8 @@ def test_telemetry_not_ready(self, mocker): impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock(), telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) factory = SplitFactory('localhost', @@ -1071,7 +1071,7 @@ def test_telemetry_record_treatment_exception(self, mocker): split_storage = InMemorySplitStorage(events_queue) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) segment_storage = mocker.Mock(spec=SegmentStorage) - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) destroyed_property = mocker.PropertyMock() @@ -1175,8 +1175,8 @@ def test_telemetry_method_latency(self, mocker): impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) destroyed_property = mocker.PropertyMock() @@ -1288,8 +1288,8 @@ def test_impressions_properties(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) diff --git a/tests/engine/test_evaluator.py b/tests/engine/test_evaluator.py index bccc3f78..dc83cc36 100644 --- a/tests/engine/test_evaluator.py +++ b/tests/engine/test_evaluator.py @@ -264,8 +264,8 @@ def test_evaluate_treatment_with_rbs_in_condition(self): e = evaluator.Evaluator(splitters.Splitter()) events_queue = queue.Queue() splits_storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() - segment_storage = InMemorySegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) + segment_storage = InMemorySegmentStorage(events_queue) evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage) rbs_segments = os.path.join(os.path.dirname(__file__), 'files', 'rule_base_segments.json') @@ -291,8 +291,8 @@ def test_using_segment_in_excluded(self): e = evaluator.Evaluator(splitters.Splitter()) events_queue = queue.Queue() splits_storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() - segment_storage = InMemorySegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) + segment_storage = InMemorySegmentStorage(events_queue) evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage) mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, []) @@ -316,8 +316,8 @@ def test_using_rbs_in_excluded(self): e = evaluator.Evaluator(splitters.Splitter()) events_queue = queue.Queue() splits_storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() - segment_storage = InMemorySegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) + segment_storage = InMemorySegmentStorage(events_queue) evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage) mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, []) @@ -340,8 +340,8 @@ def test_prerequisites(self): e = evaluator.Evaluator(splitters.Splitter()) events_queue = queue.Queue() splits_storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() - segment_storage = InMemorySegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) + segment_storage = InMemorySegmentStorage(events_queue) evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage) rbs = rule_based_segments.from_raw(data["rbs"]["d"][0]) @@ -549,8 +549,8 @@ def test_get_context(self): split2 = Split('split2', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, []) events_queue = queue.Queue() flag_storage = InMemorySplitStorage(events_queue, []) - segment_storage = InMemorySegmentStorage() - rbs_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rbs_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) flag_storage.update([mocked_split, split2], [], -1) rbs = copy.deepcopy(rbs_raw) rbs['conditions'].append( diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index 018f3d42..8789aa3d 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -518,8 +518,8 @@ def setup_method(self): """Prepare storages with test data.""" events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') with open(split_fn, 'r') as flo: @@ -681,8 +681,8 @@ def setup_method(self): """Prepare storages with test data.""" events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() - rb_segment_storage = InMemoryRuleBasedSegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) + rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') with open(split_fn, 'r') as flo: data = json.loads(flo.read()) @@ -1970,7 +1970,7 @@ class InMemoryImpressionsToggleIntegrationTests(object): def test_optimized(self): events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) split_storage.update([splits.from_raw(splits_json['splitChange1_1']['ff']['d'][0]), splits.from_raw(splits_json['splitChange1_1']['ff']['d'][1]), @@ -1985,7 +1985,7 @@ def test_optimized(self): storages = { 'splits': split_storage, 'segments': segment_storage, - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(), + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), } @@ -2029,7 +2029,7 @@ def test_optimized(self): def test_debug(self): events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) split_storage.update([splits.from_raw(splits_json['splitChange1_1']['ff']['d'][0]), splits.from_raw(splits_json['splitChange1_1']['ff']['d'][1]), @@ -2044,7 +2044,7 @@ def test_debug(self): storages = { 'splits': split_storage, 'segments': segment_storage, - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(), + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), } @@ -2088,7 +2088,7 @@ def test_debug(self): def test_none(self): events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) split_storage.update([splits.from_raw(splits_json['splitChange1_1']['ff']['d'][0]), splits.from_raw(splits_json['splitChange1_1']['ff']['d'][1]), @@ -2103,7 +2103,7 @@ def test_none(self): storages = { 'splits': split_storage, 'segments': segment_storage, - 'rule_based_segments': InMemoryRuleBasedSegmentStorage(), + 'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue), 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), } diff --git a/tests/push/test_split_worker.py b/tests/push/test_split_worker.py index 22a146e3..198372a7 100644 --- a/tests/push/test_split_worker.py +++ b/tests/push/test_split_worker.py @@ -264,7 +264,7 @@ def test_fetch_segment(self, mocker): q = queue.Queue() events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - segment_storage = InMemorySegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) self.segment_name = None def segment_handler_sync(segment_name, change_number): diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index 7639bcb7..a37a1a4d 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -713,7 +713,8 @@ class InMemorySegmentStorageTests(object): def test_segment_storage_retrieval(self, mocker): """Test storing and retrieving segments.""" - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) segment = mocker.Mock(spec=Segment) name_property = mocker.PropertyMock() name_property.return_value = 'some_segment' @@ -725,14 +726,16 @@ def test_segment_storage_retrieval(self, mocker): def test_change_number(self, mocker): """Test storing and retrieving segment changeNumber.""" - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) storage.set_change_number('some_segment', 123) # Change number is not updated if segment doesn't exist assert storage.get_change_number('some_segment') is None assert storage.get_change_number('nonexistant-segment') is None # Change number is updated if segment does exist. - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) segment = mocker.Mock(spec=Segment) name_property = mocker.PropertyMock() name_property.return_value = 'some_segment' @@ -743,7 +746,8 @@ def test_change_number(self, mocker): def test_segment_contains(self, mocker): """Test using storage to determine whether a key belongs to a segment.""" - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) segment = mocker.Mock(spec=Segment) name_property = mocker.PropertyMock() name_property.return_value = 'some_segment' @@ -755,7 +759,8 @@ def test_segment_contains(self, mocker): def test_segment_update(self): """Test updating a segment.""" - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) segment = Segment('some_segment', ['key1', 'key2', 'key3'], 123) storage.put(segment) assert storage.get('some_segment') == segment @@ -768,6 +773,22 @@ def test_segment_update(self): assert not storage.segment_contains('some_segment', 'key3') assert storage.get_change_number('some_segment') == 456 + def test_internal_event_notification(self): + """Test updating a segment.""" + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) + segment = Segment('some_segment', ['key1', 'key2', 'key3'], 123) + storage.put(segment) + event = events_queue.get() + assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 + + storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456) + event = events_queue.get() + assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 class InMemorySegmentStorageAsyncTests(object): """In memory segment storage tests.""" @@ -1865,7 +1886,8 @@ class InMemoryRuleBasedSegmentStorageTests(object): def test_storing_retrieving_segments(self, mocker): """Test storing and retrieving splits works.""" - rbs_storage = InMemoryRuleBasedSegmentStorage() + events_queue = queue.Queue() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) segment1 = mocker.Mock(spec=RuleBasedSegment) name_property = mocker.PropertyMock() @@ -1887,7 +1909,8 @@ def test_storing_retrieving_segments(self, mocker): def test_store_get_changenumber(self): """Test that storing and retrieving change numbers works.""" - storage = InMemoryRuleBasedSegmentStorage() + events_queue = queue.Queue() + storage = InMemoryRuleBasedSegmentStorage(events_queue) assert storage.get_change_number() == -1 storage.update([], [], 5) assert storage.get_change_number() == 5 @@ -1911,12 +1934,39 @@ def test_contains(self): raw3 = copy.deepcopy(raw) raw3["name"] = "segment3" segment3 = rule_based_segments.from_raw(raw3) - storage = InMemoryRuleBasedSegmentStorage() + events_queue = queue.Queue() + storage = InMemoryRuleBasedSegmentStorage(events_queue) storage.update([segment1, segment2, segment3], [], -1) assert storage.contains(["segment1"]) assert storage.contains(["segment1", "segment3"]) assert not storage.contains(["segment5"]) + def test_internal_event_notification(self, mocker): + """Test storing and retrieving splits works.""" + events_queue = queue.Queue() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) + + segment1 = mocker.Mock(spec=RuleBasedSegment) + name_property = mocker.PropertyMock() + name_property.return_value = 'some_segment' + type(segment1).name = name_property + + segment2 = mocker.Mock() + name2_prop = mocker.PropertyMock() + name2_prop.return_value = 'segment2' + type(segment2).name = name2_prop + + rbs_storage.update([segment1, segment2], [], -1) + event = events_queue.get() + assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 + + rbs_storage.update([], ['some_segment'], -1) + assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED + assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE + assert len(event.metadata.get_names()) == 0 + class InMemoryRuleBasedSegmentStorageAsyncTests(object): """In memory rule based segment storage test cases.""" diff --git a/tests/sync/test_segments_synchronizer.py b/tests/sync/test_segments_synchronizer.py index e88db2fa..a3657e98 100644 --- a/tests/sync/test_segments_synchronizer.py +++ b/tests/sync/test_segments_synchronizer.py @@ -504,7 +504,8 @@ def test_synchronize_segments(self, mocker): """Test the normal operation flow.""" split_storage = mocker.Mock(spec=InMemorySplitStorage) split_storage.get_segment_names.return_value = ['segmentA', 'segmentB', 'segmentC'] - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) segment_a = {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], 'since': -1, 'till': 123} @@ -585,7 +586,8 @@ def test_reading_json(self, mocker): f.write('{"name": "segmentA", "added": ["key1", "key2", "key3"], "removed": [],"since": -1, "till": 123}') f.close() split_storage = mocker.Mock(spec=InMemorySplitStorage) - storage = InMemorySegmentStorage() + events_queue = queue.Queue() + storage = InMemorySegmentStorage(events_queue) segments_synchronizer = LocalSegmentSynchronizer('.', split_storage, storage) assert segments_synchronizer.synchronize_segments(['segmentA']) diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index d63b5f6a..ca3daa82 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -404,7 +404,8 @@ def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" events_queue = queue.Queue() storage = InMemorySplitStorage(events_queue, ['set1', 'set2']) - rbs_storage = InMemoryRuleBasedSegmentStorage() + events_queue = queue.Queue() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split = copy.deepcopy(self.splits[0]) split['name'] = 'second' @@ -451,7 +452,7 @@ def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" events_queue = queue.Queue() storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split = copy.deepcopy(self.splits[0]) split['name'] = 'second' splits1 = [self.splits[0].copy(), split] @@ -900,7 +901,7 @@ def test_synchronize_splits(self, mocker): """Test split sync.""" events_queue = queue.Queue() storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) def read_splits_from_json_file(*args, **kwargs): return self.payload @@ -945,7 +946,7 @@ def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" events_queue = queue.Queue() storage = InMemorySplitStorage(events_queue, ['set1', 'set2']) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' @@ -988,7 +989,7 @@ def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" events_queue = queue.Queue() storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' @@ -1034,7 +1035,7 @@ def test_reading_json(self, mocker): f.close() events_queue = queue.Queue() storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_synchronizer = LocalSplitSynchronizer("./splits.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer.synchronize_splits() diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 17a4f103..258077d4 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -127,12 +127,12 @@ def run(x, y, c): def test_synchronize_splits(self, mocker): events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_api = mocker.Mock() split_api.fetch_splits.return_value = {'ff': {'d': splits, 's': 123, 't': 123}, 'rbs': {'d': [], 's': -1, 't': -1}} split_sync = SplitSynchronizer(split_api, split_storage, rbs_storage) - segment_storage = InMemorySegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) segment_api = mocker.Mock() segment_api.fetch_segment.return_value = {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], 'since': 123, 'till': 123} @@ -155,7 +155,7 @@ def test_synchronize_splits(self, mocker): def test_synchronize_splits_calling_segment_sync_once(self, mocker): events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) - rbs_storage = InMemoryRuleBasedSegmentStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue) split_api = mocker.Mock() split_api.fetch_splits.return_value = {'ff': {'d': splits, 's': 123, 't': 123}, 'rbs': {'d': [], 's': -1, 't': -1}} diff --git a/tests/sync/test_telemetry.py b/tests/sync/test_telemetry.py index c37251af..5b41b344 100644 --- a/tests/sync/test_telemetry.py +++ b/tests/sync/test_telemetry.py @@ -61,7 +61,7 @@ def test_synchronize_telemetry(self, mocker): events_queue = queue.Queue() split_storage = InMemorySplitStorage(events_queue) split_storage.update([Split('split1', 1234, 1, False, 'user', Status.ACTIVE, 123)], [], -1) - segment_storage = InMemorySegmentStorage() + segment_storage = InMemorySegmentStorage(events_queue) segment_storage.put(Segment('segment1', [], 123)) telemetry_submitter = InMemoryTelemetrySubmitter(telemetry_consumer, split_storage, segment_storage, api) diff --git a/tests/util/test_storage_helper.py b/tests/util/test_storage_helper.py index 5804a6fa..dc75caa0 100644 --- a/tests/util/test_storage_helper.py +++ b/tests/util/test_storage_helper.py @@ -1,5 +1,6 @@ """Storage Helper tests.""" import pytest +import queue from splitio.util.storage_helper import update_feature_flag_storage, get_valid_flag_sets, combine_valid_flag_sets, \ update_rule_based_segment_storage, update_rule_based_segment_storage_async, update_feature_flag_storage_async, \ @@ -193,7 +194,8 @@ def clear(): assert self.clear == 1 def test_get_standard_segment_in_rbs_storage(self, mocker): - storage = InMemoryRuleBasedSegmentStorage() + events_queue = queue.Queue() + storage = InMemoryRuleBasedSegmentStorage(events_queue) segments = update_rule_based_segment_storage(storage, [self.rbs], 123) assert get_standard_segment_names_in_rbs_storage(storage) == {'excluded_segment', 'employees'}