Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync
from splitio.models.fallback_config import FallbackTreatmentCalculator
from splitio.events.events_metadata import EventsMetadata, SdkEventType
from splitio.models.notification import SdkInternalEventNotification
from splitio.models.events import SdkInternalEvent

# Storage
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage, LocalhostTelemetryStorage, \
Expand Down Expand Up @@ -166,6 +170,7 @@ def __init__( # pylint: disable=too-many-arguments
storages,
labels_enabled,
recorder,
internal_events_queue,
sync_manager=None,
sdk_ready_flag=None,
telemetry_producer=None,
Expand Down Expand Up @@ -204,6 +209,7 @@ def __init__( # pylint: disable=too-many-arguments
_LOGGER.debug("Running in threading mode")
self._sdk_internal_ready_flag = sdk_ready_flag
self._fallback_treatment_calculator = fallback_treatment_calculator
self._internal_events_queue = internal_events_queue
self._start_status_updater()

def _start_status_updater(self):
Expand All @@ -224,12 +230,15 @@ def _start_status_updater(self):
ready_updater.start()
else:
self._status = Status.READY

self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))

def _update_status_when_ready(self):
"""Wait until the sdk is ready and update the status."""
self._sdk_internal_ready_flag.wait()
self._status = Status.READY
self._sdk_ready_flag.set()
self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))

self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -270,6 +279,7 @@ def block_until_ready(self, timeout=None):

if not ready:
self._telemetry_init_producer.record_bur_time_out()
self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_TIMED_OUT, None))
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)

def destroy(self, destroyed_event=None):
Expand Down Expand Up @@ -548,11 +558,11 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
}

events_queue = queue.Queue()
internal_events_queue = queue.Queue()
storages = {
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorage(events_queue),
'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue),
'splits': InMemorySplitStorage(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorage(internal_events_queue),
'rule_based_segments': InMemoryRuleBasedSegmentStorage(internal_events_queue),
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer),
'events': InMemoryEventStorage(cfg['eventsQueueSize'], telemetry_runtime_producer),
}
Expand Down Expand Up @@ -629,14 +639,14 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
synchronizer._split_synchronizers._segment_sync.shutdown()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
recorder, internal_events_queue, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))

initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
initialization_thread.start()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, sdk_ready_flag,
recorder, internal_events_queue, manager, sdk_ready_flag,
telemetry_producer, telemetry_init_producer,
telemetry_submitter, fallback_treatment_calculator = FallbackTreatmentCalculator(cfg['fallbackTreatments']))

Expand Down Expand Up @@ -826,12 +836,14 @@ def _build_redis_factory(api_key, cfg):
initialization_thread.start()

telemetry_init_producer.record_config(cfg, {}, 0, 0)

internal_events_queue = queue.Queue()

split_factory = SplitFactory(
api_key,
storages,
cfg['labelsEnabled'],
recorder,
internal_events_queue,
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
Expand Down Expand Up @@ -992,12 +1004,14 @@ def _build_pluggable_factory(api_key, cfg):
initialization_thread.start()

telemetry_init_producer.record_config(cfg, {}, 0, 0)
internal_events_queue = queue.Queue()

split_factory = SplitFactory(
api_key,
storages,
cfg['labelsEnabled'],
recorder,
internal_events_queue,
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
Expand Down Expand Up @@ -1152,11 +1166,14 @@ def _build_localhost_factory(cfg):
telemetry_evaluation_producer,
telemetry_runtime_producer
)
internal_events_queue = queue.Queue()

return SplitFactory(
'localhost',
storages,
False,
recorder,
internal_events_queue,
manager,
ready_event,
telemetry_producer=telemetry_producer,
Expand Down
30 changes: 30 additions & 0 deletions tests/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def synchronize_config(*_):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -136,6 +137,7 @@ def test_get_treatment_with_config(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -215,6 +217,7 @@ def test_get_treatments(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -296,6 +299,7 @@ def test_get_treatments_by_flag_set(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -376,6 +380,7 @@ def test_get_treatments_by_flag_sets(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -455,6 +460,7 @@ def test_get_treatments_with_config(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -531,6 +537,7 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
destroyed_property = mocker.PropertyMock()
destroyed_property.return_value = False
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -539,6 +546,7 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -620,6 +628,7 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -708,6 +717,7 @@ def synchronize_config(*_):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -773,6 +783,7 @@ def synchronize_config(*_):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -838,6 +849,7 @@ def synchronize_config(*_):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -874,6 +886,7 @@ def test_destroy(self, mocker):
telemetry_storage = InMemoryTelemetryStorage()
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -882,6 +895,7 @@ def test_destroy(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -911,6 +925,7 @@ def test_track(self, mocker):
telemetry_storage = InMemoryTelemetryStorage()
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -919,6 +934,7 @@ def test_track(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -961,6 +977,7 @@ def test_evaluations_before_running_post_fork(self, mocker):

impmanager = mocker.Mock(spec=ImpressionManager)
recorder = StandardRecorder(impmanager, mocker.Mock(), impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -969,6 +986,7 @@ def test_evaluations_before_running_post_fork(self, mocker):
'events': mocker.Mock()},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1047,6 +1065,7 @@ def test_telemetry_not_ready(self, mocker):
'events': mocker.Mock()},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1092,6 +1111,7 @@ def test_telemetry_record_treatment_exception(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
impmanager,
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1193,6 +1213,7 @@ def test_telemetry_method_latency(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
impmanager,
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1255,6 +1276,7 @@ def test_telemetry_track_exception(self, mocker):
telemetry_storage = InMemoryTelemetryStorage()
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -1263,6 +1285,7 @@ def test_telemetry_track_exception(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
impmanager,
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1316,6 +1339,7 @@ def synchronize_config(*_):
'events': event_storage},
mocker.Mock(),
recorder,
events_queue,
mocker.Mock(),
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1412,6 +1436,7 @@ def test_fallback_treatment_eval_exception(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_producer.get_telemetry_runtime_producer())
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
internal_events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -1420,6 +1445,7 @@ def test_fallback_treatment_eval_exception(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
internal_events_queue,
impmanager,
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1550,6 +1576,7 @@ def test_fallback_treatment_exception(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_producer.get_telemetry_runtime_producer())
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
internal_events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -1558,6 +1585,7 @@ def test_fallback_treatment_exception(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
internal_events_queue,
impmanager,
mocker.Mock(),
telemetry_producer,
Expand Down Expand Up @@ -1618,6 +1646,7 @@ def test_fallback_treatment_not_ready_impressions(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_producer.get_telemetry_runtime_producer())
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
internal_events_queue = queue.Queue()
factory = SplitFactory(mocker.Mock(),
{'splits': split_storage,
'segments': segment_storage,
Expand All @@ -1626,6 +1655,7 @@ def test_fallback_treatment_not_ready_impressions(self, mocker):
'events': event_storage},
mocker.Mock(),
recorder,
internal_events_queue,
impmanager,
mocker.Mock(),
telemetry_producer,
Expand Down
Loading