diff --git a/Makefile b/Makefile
index 213ef29792..9ba8f3b7fc 100644
--- a/Makefile
+++ b/Makefile
@@ -150,6 +150,8 @@ aggregator_send_dummy_responses:
 test_go_retries:
 	@cd core/ && \
 	go test -v -timeout 15m
+	@cd operator/pkg/ && \
+	go test -v -run TestRequestBatch

 __OPERATOR__:

diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go
index 8916044a67..cfa3b0e020 100644
--- a/aggregator/pkg/aggregator.go
+++ b/aggregator/pkg/aggregator.go
@@ -397,17 +397,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by

 // |---RETRYABLE---|

-/*
-InitializeNewTaskRetryable
-Initialize a new task in the BLS Aggregation service
- - Errors:
-   Permanent:
-   - TaskAlreadyInitializedError (Permanent): Task is already intialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27).
-   Transient:
-   - All others.
- - Retry times (3 retries): 1 sec, 2 sec, 4 sec
-*/
-func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
+func InitializeNewTaskFunc(agg *Aggregator, batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) func() error {
 	initializeNewTask_func := func() error {
 		err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry)
 		if err != nil {
@@ -418,7 +408,21 @@ func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreated
 		}
 		return err
 	}
-	return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return initializeNewTask_func
+}
+
+/*
+InitializeNewTaskRetryable
+Initialize a new task in the BLS Aggregation service
+ - Errors:
+   Permanent:
+   - TaskAlreadyInitializedError (Permanent): Task is already initialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27).
+   Transient:
+   - All others.
+ - Retry times (3 retries): 1 sec, 2 sec, 4 sec
+*/
+func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
+	return retry.Retry(InitializeNewTaskFunc(agg, batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry), retry.EthCallRetryConfig())
 }

 // Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
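The change above establishes the shape repeated throughout this patch: each retryable operation is split into a plain `...Func` factory that only builds the closure, and a thin `...Retryable` wrapper that hands that closure to the retry helper together with a config. A minimal hedged sketch of that shape, where `Dependency` and `doWork` are hypothetical stand-ins for the real aggregator/writer/subscriber types:

package example

import retry "github.com/yetanotherco/aligned_layer/core"

// Dependency is a hypothetical stand-in for Aggregator / AvsWriter / AvsSubscriber.
type Dependency struct{}

func (d *Dependency) doWork() error { return nil }

// DoWorkFunc builds the closure without executing it, so tests can invoke it
// exactly once while the Retryable wrapper hands it to the retry helper.
func DoWorkFunc(d *Dependency) func() error {
	return func() error {
		return d.doWork()
	}
}

// DoWorkRetryable mirrors the *Retryable wrappers introduced in this diff.
func (d *Dependency) DoWorkRetryable() error {
	return retry.Retry(DoWorkFunc(d), retry.EthCallRetryConfig())
}

This split is what lets the tests later in the patch call the bare closure once instead of sitting through backoff sleeps.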
diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go
index 0c3ee5c8bd..f6ef8f51ca 100644
--- a/aggregator/pkg/server.go
+++ b/aggregator/pkg/server.go
@@ -53,7 +53,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
 		"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))

 	taskIndex := uint32(0)
-	taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
+	taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
 	if err != nil {
 		agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
@@ -72,7 +72,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
 	agg.logger.Info("Starting bls signature process")
 	go func() {
-		err := agg.ProcessNewSignature(
+		err := agg.ProcessNewSignatureRetryable(
 			context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash, &signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
 		)
@@ -114,16 +114,8 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {

 // |---RETRYABLE---|

-/*
- - Errors:
-   Permanent:
-   - SignatureVerificationError: Verification of the sigature within the BLS Aggregation Service failed. (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L42).
-   Transient:
-   - All others.
- - Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
- - NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors as block reorg's may lead to these errors being thrown.
-*/
-func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
+func ProcessNewSignatureFunc(agg *Aggregator, ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) func() error {
+	processNewSignature_func := func() error {
 		err := agg.blsAggregationService.ProcessNewSignature(
 			ctx, taskIndex, taskResponse,
@@ -136,11 +128,25 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32
 		}
 		return err
 	}
+	return processNewSignature_func
+}

-	return retry.Retry(processNewSignature_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
+/*
+ - Errors:
+   Permanent:
+   - SignatureVerificationError: Verification of the signature within the BLS Aggregation Service failed. (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L42).
+   Transient:
+   - All others.
+ - Retry times (3 retries): 12 sec (1 Block), 24 sec (2 Blocks), 48 sec (4 Blocks)
+ - NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors, as block reorgs may lead to these errors being thrown.
+*/
+func (agg *Aggregator) ProcessNewSignatureRetryable(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
+	return retry.Retry(ProcessNewSignatureFunc(agg, ctx, taskIndex, taskResponse, blsSignature, operatorId), retry.ChainRetryConfig())
 }

-func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
+func GetTaskIndexFunc(agg *Aggregator, batchIdentifierHash [32]byte) func() (uint32, error) {
+	getTaskIndex_func := func() (uint32, error) {
 		agg.taskMutex.Lock()
 		agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
@@ -153,6 +159,14 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error
 			return taskIndex, nil
 		}
 	}
+	return getTaskIndex_func
+}

-	return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+// Checks the internal map for a Signed Task Response and returns its TaskIndex.
+/*
+- All errors are considered Transient Errors
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec
+*/
+func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
+	return retry.RetryWithData(GetTaskIndexFunc(agg, batchIdentifierHash), retry.EthCallRetryConfig())
 }

diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go
index ea810da628..26eefe0463 100644
--- a/core/chainio/avs_subscriber.go
+++ b/core/chainio/avs_subscriber.go
@@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
 	// Subscribe to new tasks
 	sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
 	if err != nil {
-		s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
+		s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err)
 		return nil, err
 	}
 	subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
 	if err != nil {
-		s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
+		s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err)
 		return nil, err
 	}
 	s.logger.Info("Subscribed to new AlignedLayer V2 tasks")

diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go
index f80de6627e..e14aeb80b4 100644
--- a/core/chainio/avs_writer.go
+++ b/core/chainio/avs_writer.go
@@ -21,6 +21,11 @@ import (
 	"github.com/yetanotherco/aligned_layer/metrics"
 )

+const (
+	waitForTxConfigMaxInterval = 2 * time.Second
+	waitForTxConfigNumRetries  = 0
+)
+
 type AvsWriter struct {
 	*avsregistry.ChainWriter
 	AvsContractBindings *AvsServiceBindings
@@ -93,6 +98,16 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
 	txOpts.NoSend = false
 	i := 0

+	// Set Retry config for RespondToTaskV2
+	respondToTaskV2Config := retry.EthCallRetryConfig()
+	respondToTaskV2Config.MaxElapsedTime = 0
+	respondToTaskV2Config.NumRetries = 0 // retry indefinitely, as the previous call did (maxTries = 0)
+
+	// Set Retry config for WaitForTxRetryable
+	waitForTxConfig := retry.EthCallRetryConfig()
+	waitForTxConfig.MaxInterval = waitForTxConfigMaxInterval
+	waitForTxConfig.NumRetries = waitForTxConfigNumRetries
+	waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump
+
 	respondToTaskV2Func := func() (*types.Receipt, error) {
 		gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
 		if err != nil {
@@ -126,7 +141,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
 			return nil, err
 		}

-		receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump)
+		receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), waitForTxConfig)
 		if receipt != nil {
 			w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
 			return receipt, nil
@@ -143,7 +158,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
 		return nil, fmt.Errorf("transaction failed")
 	}

-	return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
+	return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config)
 }

 // Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
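The two configs built above are the intended customization point: start from a copy returned by retry.EthCallRetryConfig() and override fields per call site. A hedged sketch of the same idea in isolation (field names are those of the RetryConfig struct added in core/retry.go below; timeToWaitBeforeBump is the caller's deadline):

package example

import (
	"time"

	retry "github.com/yetanotherco/aligned_layer/core"
)

// buildWaitForTxConfig mirrors the override style used in SendAggregatedResponse:
// poll at most every 2s, no retry-count cap (NumRetries = 0 disables the
// WithMaxRetries wrapper), and stop once the gas-bump deadline elapses.
func buildWaitForTxConfig(timeToWaitBeforeBump time.Duration) *retry.RetryConfig {
	cfg := retry.EthCallRetryConfig()
	cfg.MaxInterval = 2 * time.Second
	cfg.NumRetries = 0
	cfg.MaxElapsedTime = timeToWaitBeforeBump
	return cfg
}

Because each factory returns a fresh *RetryConfig, these overrides never leak into other call sites that use the shared presets.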
@@ -183,8 +198,8 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
 	respondToTaskFeeLimit := batchState.RespondToTaskFeeLimit
 	w.logger.Info("Checking balance against Batch RespondToTaskFeeLimit", "RespondToTaskFeeLimit", respondToTaskFeeLimit)
 	// Note: we compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
-	// Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned
-	// Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance
+	// Batcher will pay up to respondToTaskFeeLimit, for this it needs that amount of funds in Aligned
+	// Aggregator will pay any extra cost, for this it needs at least respondToTaskFeeLimit in its balance
 	return w.compareBalances(respondToTaskFeeLimit, aggregatorAddress, senderAddress)
 }

diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go
index 0ac44a59ab..3f8a23003e 100644
--- a/core/chainio/retryable.go
+++ b/core/chainio/retryable.go
@@ -15,14 +15,7 @@ import (

 // |---AVS_WRITER---|

-/*
-RespondToTaskV2Retryable
-Send a transaction to the AVS contract to respond to a task.
-- All errors are considered Transient Errors
-- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
-- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry.
-*/
-func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) {
+func RespondToTaskV2Func(w *AvsWriter, opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) func() (*types.Transaction, error) {
 	respondToTaskV2_func := func() (*types.Transaction, error) {
 		// Try with main connection
 		tx, err := w.AvsContractBindings.ServiceManager.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
@@ -32,21 +25,25 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl
 		}
 		return tx, err
 	}
-	return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
+	return respondToTaskV2_func
 }

 /*
-BatchesStateRetryable
-Get the state of a batch from the AVS contract.
+RespondToTaskV2Retryable
+Send a transaction to the AVS contract to respond to a task.
 - All errors are considered Transient Errors
-- Retry times (3 retries): 1 sec, 2 sec, 4 sec
+- Retry times (3 retries): 12 sec (1 Block), 24 sec (2 Blocks), 48 sec (4 Blocks)
+- NOTE: Contract call reverts are not considered `PermanentError`s, as block reorgs may lead to a contract call revert, in which case the aggregator should retry.
 */
-func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
+func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) {
+	return retry.RetryWithData(RespondToTaskV2Func(w, opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature), retry.ChainRetryConfig())
+}
+
+func BatchesStateFunc(w *AvsWriter, opts *bind.CallOpts, arg0 [32]byte) func() (struct {
 	TaskCreatedBlock      uint32
 	Responded             bool
 	RespondToTaskFeeLimit *big.Int
 }, error) {
-
 	batchesState_func := func() (struct {
 		TaskCreatedBlock      uint32
 		Responded             bool
@@ -60,16 +57,24 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s
 		}
 		return state, err
 	}
-	return retry.RetryWithData(batchesState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return batchesState_func
 }

 /*
-BatcherBalancesRetryable
-Get the balance of a batcher from the AVS contract.
+BatchesStateRetryable
+Get the state of a batch from the AVS contract.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec
 */
-func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) {
+func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
+	TaskCreatedBlock      uint32
+	Responded             bool
+	RespondToTaskFeeLimit *big.Int
+}, error) {
+	return retry.RetryWithData(BatchesStateFunc(w, opts, arg0), retry.EthCallRetryConfig())
+}
+
+func BatcherBalancesFunc(w *AvsWriter, opts *bind.CallOpts, senderAddress common.Address) func() (*big.Int, error) {
 	batcherBalances_func := func() (*big.Int, error) {
 		// Try with main connection
 		batcherBalance, err := w.AvsContractBindings.ServiceManager.BatchersBalances(opts, senderAddress)
@@ -79,18 +84,20 @@ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress
 		}
 		return batcherBalance, err
 	}
-	return retry.RetryWithData(batcherBalances_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return batcherBalances_func
 }

 /*
-BalanceAtRetryable
-Get the balance of aggregatorAddress at blockNumber.
-If blockNumber is nil, it gets the latest balance.
-TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed.
+BatcherBalancesRetryable
+Get the balance of a batcher from the AVS contract.
 - All errors are considered Transient Errors
-- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec
 */
-func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) {
+func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) {
+	return retry.RetryWithData(BatcherBalancesFunc(w, opts, senderAddress), retry.EthCallRetryConfig())
+}
+
+func BalanceAtFunc(w *AvsWriter, ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) func() (*big.Int, error) {
 	balanceAt_func := func() (*big.Int, error) {
 		// Try with main connection
 		aggregatorBalance, err := w.Client.BalanceAt(ctx, aggregatorAddress, blockNumber)
@@ -100,18 +107,24 @@ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress co
 		}
 		return aggregatorBalance, err
 	}
-	return retry.RetryWithData(balanceAt_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return balanceAt_func
 }

-// |---AVS_SUBSCRIBER---|
-
 /*
-BlockNumberRetryable
-Get the latest block number from Ethereum
+BalanceAtRetryable
+Get the balance of aggregatorAddress at blockNumber.
+If blockNumber is nil, it gets the latest balance.
+TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
-func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) {
+func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) {
+	return retry.RetryWithData(BalanceAtFunc(w, ctx, aggregatorAddress, blockNumber), retry.EthCallRetryConfig())
+}
+
+// |---AVS_SUBSCRIBER---|
+
+func BlockNumberFunc(s *AvsSubscriber, ctx context.Context) func() (uint64, error) {
 	latestBlock_func := func() (uint64, error) {
 		// Try with main connection
 		latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(ctx)
@@ -121,7 +134,28 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error
 		}
 		return latestBlock, err
 	}
-	return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return latestBlock_func
+}
+
+/*
+BlockNumberRetryable
+Get the latest block number from Ethereum
+- All errors are considered Transient Errors
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
+*/
+func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) {
+	return retry.RetryWithData(BlockNumberFunc(s, ctx), retry.EthCallRetryConfig())
+}
+
+func FilterBatchV2Func(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
+	filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
+		logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot)
+		if err != nil {
+			logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV2(opts, batchMerkleRoot)
+		}
+		return logs, err
+	}
+	return filterNewBatchV2_func
 }

 /*
@@ -131,10 +165,18 @@ Get NewBatchV2 logs from the AVS contract.
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
 func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
-	filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
-		return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot)
+	return retry.RetryWithData(FilterBatchV2Func(s, opts, batchMerkleRoot), retry.EthCallRetryConfig())
+}
+
+func FilterBatchV3Func(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
+	filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
+		logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
+		if err != nil {
+			logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot)
+		}
+		return logs, err
 	}
-	return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return filterNewBatchV3_func
 }

 /*
@@ -144,19 +186,10 @@ Get NewBatchV3 logs from the AVS contract.
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
 func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
-	filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
-		return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
-	}
-	return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return retry.RetryWithData(FilterBatchV3Func(s, opts, batchMerkleRoot), retry.EthCallRetryConfig())
 }

-/*
-BatchesStateRetryable
-Get the state of a batch from the AVS contract.
-- All errors are considered Transient Errors
-- Retry times (3 retries): 1 sec, 2 sec, 4 sec
-*/
-func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
+func BatchStateFunc(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (struct {
 	TaskCreatedBlock      uint32
 	Responded             bool
 	RespondToTaskFeeLimit *big.Int
@@ -166,19 +199,31 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte
 		Responded             bool
 		RespondToTaskFeeLimit *big.Int
 	}, error) {
-		return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
+		state, err := s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
+		if err != nil {
+			state, err = s.AvsContractBindings.ServiceManagerFallback.BatchesState(opts, arg0)
+		}
+		return state, err
 	}
-
-	return retry.RetryWithData(batchState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return batchState_func
 }

 /*
-SubscribeNewHeadRetryable
-Subscribe to new heads from the Ethereum node.
+BatchesStateRetryable
+Get the state of a batch from the AVS contract.
 - All errors are considered Transient Errors
-- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec
 */
-func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) {
+func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
+	TaskCreatedBlock      uint32
+	Responded             bool
+	RespondToTaskFeeLimit *big.Int
+}, error) {
+	return retry.RetryWithData(BatchStateFunc(s, opts, arg0), retry.EthCallRetryConfig())
+}
+
+func SubscribeNewHeadFunc(s *AvsSubscriber, ctx context.Context, c chan<- *types.Header) func() (ethereum.Subscription, error) {
 	subscribeNewHead_func := func() (ethereum.Subscription, error) {
 		// Try with main connection
 		sub, err := s.AvsContractBindings.ethClient.SubscribeNewHead(ctx, c)
@@ -188,7 +233,29 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<-
 		}
 		return sub, err
 	}
-	return retry.RetryWithData(subscribeNewHead_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return subscribeNewHead_func
+}
+
+/*
+SubscribeNewHeadRetryable
+Subscribe to new heads from the Ethereum node.
+- All errors are considered Transient Errors
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
+*/
+func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) {
+	return retry.RetryWithData(SubscribeNewHeadFunc(s, ctx, c), retry.EthCallRetryConfig())
+}
+
+func SubscribeToNewTasksV2Func(
+	opts *bind.WatchOpts,
+	serviceManager *servicemanager.ContractAlignedLayerServiceManager,
+	newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
+	batchMerkleRoot [][32]byte,
+) func() (event.Subscription, error) {
+	subscribe_func := func() (event.Subscription, error) {
+		return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
+	}
+	return subscribe_func
 }

 /*
@@ -203,10 +270,19 @@ func SubscribeToNewTasksV2Retryable(
 	newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
 	batchMerkleRoot [][32]byte,
 ) (event.Subscription, error) {
+	return retry.RetryWithData(SubscribeToNewTasksV2Func(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.EthCallRetryConfig())
+}
+
+func SubscribeToNewTasksV3Func(
+	opts *bind.WatchOpts,
+	serviceManager *servicemanager.ContractAlignedLayerServiceManager,
+	newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3,
+	batchMerkleRoot [][32]byte,
+) func() (event.Subscription, error) {
 	subscribe_func := func() (event.Subscription, error) {
-		return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
+		return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
 	}
-	return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return subscribe_func
 }

 /*
@@ -221,8 +297,5 @@ func SubscribeToNewTasksV3Retryable(
 	newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3,
 	batchMerkleRoot [][32]byte,
 ) (event.Subscription, error) {
-	subscribe_func := func() (event.Subscription, error) {
-		return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
-	}
-	return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return retry.RetryWithData(SubscribeToNewTasksV3Func(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.EthCallRetryConfig())
 }
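A recurring shape inside the chainio closures above: try the primary binding, and only on error repeat the same call against the fallback binding, before any backoff happens at the wrapper level. A minimal hedged sketch of that primary/fallback shape (clientPair and its fields are placeholders, not types from the repo):

package example

// clientPair is a hypothetical stand-in for the paired bindings
// (ServiceManager / ServiceManagerFallback, ethClient / ethClientFallback).
type clientPair struct {
	primary  func() (uint64, error)
	fallback func() (uint64, error)
}

// call tries the primary endpoint first and falls back on any error,
// mirroring the closures in core/chainio/retryable.go. The surrounding
// retry.RetryWithData loop then only backs off if both endpoints fail.
func (c clientPair) call() (uint64, error) {
	v, err := c.primary()
	if err != nil {
		v, err = c.fallback()
	}
	return v, err
}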
diff --git a/core/retry.go b/core/retry.go
index 551be59c77..a4dfda151b 100644
--- a/core/retry.go
+++ b/core/retry.go
@@ -25,15 +25,47 @@ func (e PermanentError) Is(err error) bool {
 }

 const (
-	MinDelay                 = 1 * time.Second  // Initial delay for retry interval.
-	MaxInterval              = 60 * time.Second // Maximum interval an individual retry may have.
-	MaxElapsedTime           = 0 * time.Second  // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
-	RetryFactor      float64 = 2                // Multiplier factor computed exponential retry interval is scaled by.
-	NumRetries       uint64  = 3                // Total number of retries attempted.
-	MinDelayChain            = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block.
-	MaxIntervalChain         = 2 * time.Minute  // Maximum interval for an individual retry.
+	EthCallInitialInterval             = 1 * time.Second  // Initial delay for retry interval.
+	EthCallMaxInterval                 = 60 * time.Second // Maximum interval an individual retry may have.
+	EthCallMaxElapsedTime              = 0 * time.Second  // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
+	EthCallRandomizationFactor float64 = 0                // Randomization (Jitter) factor used to map the retry interval to a range of values around the computed interval. In precise terms, a random value in the range [1 - RandomizationFactor, 1 + RandomizationFactor]. NOTE: This is set to 0 as we do not use jitter in Aligned.
+	EthCallMultiplier          float64 = 2                // Multiplier factor the computed exponential retry interval is scaled by.
+	EthCallNumRetries          uint64  = 3                // Total number of retries attempted.
+	ChainInitialInterval               = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block.
+	ChainMaxInterval                   = 2 * time.Minute  // Maximum interval for an individual retry.
 )

+type RetryConfig struct {
+	InitialInterval     time.Duration // Initial delay for retry interval.
+	MaxInterval         time.Duration // Maximum interval an individual retry may have.
+	MaxElapsedTime      time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
+	RandomizationFactor float64
+	Multiplier          float64
+	NumRetries          uint64
+}
+
+func EthCallRetryConfig() *RetryConfig {
+	return &RetryConfig{
+		InitialInterval:     EthCallInitialInterval,
+		MaxInterval:         EthCallMaxInterval,
+		MaxElapsedTime:      EthCallMaxElapsedTime,
+		RandomizationFactor: EthCallRandomizationFactor,
+		Multiplier:          EthCallMultiplier,
+		NumRetries:          EthCallNumRetries,
+	}
+}
+
+func ChainRetryConfig() *RetryConfig {
+	return &RetryConfig{
+		InitialInterval:     ChainInitialInterval,
+		MaxInterval:         ChainMaxInterval,
+		MaxElapsedTime:      EthCallMaxElapsedTime,
+		RandomizationFactor: EthCallRandomizationFactor,
+		Multiplier:          EthCallMultiplier,
+		NumRetries:          EthCallNumRetries,
+	}
+}
+
 /*
 Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system.
 They are interfaces for around Cenk Alti (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work.
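The two factory functions above are the only presets this patch defines; every other call site overrides fields on a fresh copy. A hedged usage sketch (the fetchNonce closure is hypothetical):

package example

import (
	"time"

	retry "github.com/yetanotherco/aligned_layer/core"
)

func example() (uint64, error) {
	// Hypothetical fallible closure; any func() (T, error) works with RetryWithData.
	fetchNonce := func() (uint64, error) { return 7, nil }

	// Start from the 1s/2s/4s preset, then give the whole loop a 30s budget.
	cfg := retry.EthCallRetryConfig()
	cfg.MaxElapsedTime = 30 * time.Second

	return retry.RetryWithData(fetchNonce, cfg)
}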
@@ -92,7 +124,8 @@ Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9
 */

 // Same as Retry only that the functionToRetry can return a value upon correct execution
-func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) {
+func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig) (T, error) {
 	f := func() (T, error) {
 		var (
 			val T
@@ -120,15 +153,15 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat

 	randomOption := backoff.WithRandomizationFactor(0)

-	initialRetryOption := backoff.WithInitialInterval(minDelay)
-	multiplierOption := backoff.WithMultiplier(factor)
-	maxIntervalOption := backoff.WithMaxInterval(maxInterval)
-	maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime)
+	initialRetryOption := backoff.WithInitialInterval(config.InitialInterval)
+	multiplierOption := backoff.WithMultiplier(config.Multiplier)
+	maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval)
+	maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime)
 	expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption)
 	var maxRetriesBackoff backoff.BackOff
-	if maxTries > 0 {
-		maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries)
+	if config.NumRetries > 0 {
+		maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries)
 	} else {
 		maxRetriesBackoff = expBackoff
 	}
@@ -142,7 +175,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat
 // from the configuration are reached, or until a `PermanentError` is returned.
 // The function to be retried should return `PermanentError` when the condition for stop retrying
 // is met.
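As that comment says, retries stop early only on a `PermanentError`. A hedged sketch of a closure that classifies one failure mode as permanent; the errTaskPruned sentinel and doLookup are hypothetical, and the sketch assumes the exported Inner field on PermanentError as defined at the top of core/retry.go:

package example

import (
	"errors"

	retry "github.com/yetanotherco/aligned_layer/core"
)

// errTaskPruned stands in for a failure that no amount of retrying can fix.
var errTaskPruned = errors.New("task pruned")

func doLookup() error { return nil }

func lookup() error {
	if err := doLookup(); err != nil {
		if errors.Is(err, errTaskPruned) {
			// Wrapping in PermanentError makes the backoff loop stop immediately.
			return retry.PermanentError{Inner: err}
		}
		return err // transient: Retry will back off and call us again
	}
	return nil
}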
-func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) error {
+func Retry(functionToRetry func() error, config *RetryConfig) error {
 	f := func() error {
 		var err error
 		func() {
@@ -167,15 +200,15 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64,

 	randomOption := backoff.WithRandomizationFactor(0)

-	initialRetryOption := backoff.WithInitialInterval(minDelay)
-	multiplierOption := backoff.WithMultiplier(factor)
-	maxIntervalOption := backoff.WithMaxInterval(maxInterval)
-	maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime)
+	initialRetryOption := backoff.WithInitialInterval(config.InitialInterval)
+	multiplierOption := backoff.WithMultiplier(config.Multiplier)
+	maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval)
+	maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime)
 	expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption)
 	var maxRetriesBackoff backoff.BackOff
-	if maxTries > 0 {
-		maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries)
+	if config.NumRetries > 0 {
+		maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries)
 	} else {
 		maxRetriesBackoff = expBackoff
 	}

diff --git a/core/retry_test.go b/core/retry_test.go
index 4a725fb6ab..b7cca8acd1 100644
--- a/core/retry_test.go
+++ b/core/retry_test.go
@@ -42,7 +42,16 @@ func TestRetryWithData(t *testing.T) {
 		x, err := DummyFunction(43)
 		return &x, err
 	}
-	_, err := retry.RetryWithData(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime)
+
+	config := &retry.RetryConfig{
+		InitialInterval:     1000,
+		MaxInterval:         retry.EthCallMaxInterval,
+		MaxElapsedTime:      retry.EthCallMaxElapsedTime,
+		RandomizationFactor: 0,
+		Multiplier:          retry.EthCallMultiplier,
+		NumRetries:          retry.EthCallNumRetries,
+	}
+	_, err := retry.RetryWithData(function, config)
 	if err != nil {
 		t.Errorf("Retry error!: %s", err)
 	}
@@ -53,7 +62,15 @@ func TestRetry(t *testing.T) {
 		_, err := DummyFunction(43)
 		return err
 	}
-	err := retry.Retry(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime)
+	config := &retry.RetryConfig{
+		InitialInterval:     1000,
+		MaxInterval:         retry.EthCallMaxInterval,
+		MaxElapsedTime:      retry.EthCallMaxElapsedTime,
+		RandomizationFactor: 0,
+		Multiplier:          retry.EthCallMultiplier,
+		NumRetries:          retry.EthCallNumRetries,
+	}
+	err := retry.Retry(function, config)
 	if err != nil {
 		t.Errorf("Retry error!: %s", err)
 	}
@@ -153,7 +170,8 @@ func TestWaitForTransactionReceipt(t *testing.T) {
 	}

 	// Assert Call succeeds when Anvil running
-	_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+	receipt_function := utils.WaitForTransactionReceiptFunc(*client, *client, hash, retry.EthCallRetryConfig())
+	_, err = receipt_function()
 	assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err)
 	if !strings.Contains(err.Error(), "not found") {
 		t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -165,7 +183,8 @@ func TestWaitForTransactionReceipt(t *testing.T) {
 		return
 	}

-	_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+	receipt_function = utils.WaitForTransactionReceiptFunc(*client, *client, hash, retry.EthCallRetryConfig())
+	_, err = receipt_function()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err)
@@ -181,7 +200,8 @@ func TestWaitForTransactionReceipt(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+	receipt_function = utils.WaitForTransactionReceiptFunc(*client, *client, hash, retry.EthCallRetryConfig())
+	_, err = receipt_function()
 	assert.NotNil(t, err)
 	if !strings.Contains(err.Error(), "not found") {
 		t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -194,6 +214,50 @@ func TestWaitForTransactionReceipt(t *testing.T) {
 	}
 }

+func TestGetGasPrice(t *testing.T) {
+	cmd, client, err := SetupAnvil(8545)
+	if err != nil {
+		t.Errorf("Error setting up Anvil: %s\n", err)
+	}
+
+	// Assert Call succeeds when Anvil running
+	gas_function := utils.GetGasPriceFunc(*client, *client)
+	_, err = gas_function()
+	assert.Nil(t, err, "Error getting gas price with Anvil running: %s\n", err)
+
+	if err = cmd.Process.Kill(); err != nil {
+		t.Errorf("Error killing process: %v\n", err)
+		return
+	}
+
+	gas_function = utils.GetGasPriceFunc(*client, *client)
+	_, err = gas_function()
+	assert.NotNil(t, err)
+	if _, ok := err.(retry.PermanentError); ok {
+		t.Errorf("GetGasPrice Emitted non Transient error: %s\n", err)
+		return
+	}
+	if !strings.Contains(err.Error(), "connect: connection refused") {
+		t.Errorf("GetGasPrice Emitted non Transient error: %s\n", err)
+		return
+	}
+
+	cmd, client, err = SetupAnvil(8545)
+	if err != nil {
+		t.Errorf("Error setting up Anvil: %s\n", err)
+	}
+
+	gas_function = utils.GetGasPriceFunc(*client, *client)
+	_, err = gas_function()
+	assert.Nil(t, err)
+
+	if err := cmd.Process.Kill(); err != nil {
+		t.Errorf("Error killing process: %v\n", err)
+		return
+	}
+}
+
 // NOTE: The following tests involving starting the aggregator panic after the connection to anvil is cut crashing the test runner.
 // The originates within the eigen-sdk and as of 8/11/24 is currently working to be fixed.
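A note on the test pattern above and in the rest of this file: instead of invoking the *Retryable wrappers, which would sleep through backoff on the failure legs, the tests build the bare closure and call it exactly once, asserting only on how the failure is classified. A hedged sketch of that pattern, reusing the hypothetical DoWorkFunc and Dependency from the earlier sketch:

package example

import (
	"testing"

	retry "github.com/yetanotherco/aligned_layer/core"
)

// TestDoWorkOnce invokes the closure a single time with no retries, so an
// outage surfaces immediately, and fails only if the error is mis-classified
// as permanent.
func TestDoWorkOnce(t *testing.T) {
	work := DoWorkFunc(&Dependency{})
	if err := work(); err != nil {
		if _, ok := err.(retry.PermanentError); ok {
			t.Errorf("DoWork emitted a non-transient error: %s", err)
		}
	}
}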
@@ -339,7 +403,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) {
 		t.Errorf("Error setting up Avs Service Bindings: %s\n", err)
 	}

-	_, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	sub_func := chainio.SubscribeToNewTasksV3Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	_, err = sub_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -347,13 +412,14 @@ func TestSubscribeToNewTasksV3(t *testing.T) {
 		return
 	}

-	_, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	sub_func = chainio.SubscribeToNewTasksV3Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	_, err = sub_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err)
 		return
 	}
-	if !strings.Contains(err.Error(), "connect: connection refused") {
+	if !strings.Contains(err.Error(), "connection reset") {
 		t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err)
 		return
 	}
@@ -363,7 +429,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	sub_func = chainio.SubscribeToNewTasksV3Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	_, err = sub_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -389,7 +456,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) {
 		t.Errorf("Error setting up Avs Service Bindings: %s\n", err)
 	}

-	_, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	sub_func := chainio.SubscribeToNewTasksV2Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	_, err = sub_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -397,13 +465,15 @@ func TestSubscribeToNewTasksV2(t *testing.T) {
 		return
 	}

-	_, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	sub_func = chainio.SubscribeToNewTasksV2Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	_, err = sub_func()
+	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err)
 		return
 	}
-	if !strings.Contains(err.Error(), "connect: connection refused") {
+	if !strings.Contains(err.Error(), "connection reset") {
 		t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err)
 		return
 	}
@@ -413,7 +483,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	sub_func = chainio.SubscribeToNewTasksV2Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+	_, err = sub_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -434,7 +505,8 @@ func TestBlockNumber(t *testing.T) {
 	if err != nil {
 		return
 	}
-	_, err = sub.BlockNumberRetryable(context.Background())
+	block_func := chainio.BlockNumberFunc(sub, context.Background())
+	_, err = block_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -442,7 +514,8 @@ func TestBlockNumber(t *testing.T) {
 		return
 	}

-	_, err = sub.BlockNumberRetryable(context.Background())
+	block_func = chainio.BlockNumberFunc(sub, context.Background())
+	_, err = block_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BlockNumber Emitted non Transient error: %s\n", err)
@@ -458,7 +531,8 @@ func TestBlockNumber(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = sub.BlockNumberRetryable(context.Background())
+	block_func = chainio.BlockNumberFunc(sub, context.Background())
+	_, err = block_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -478,7 +552,8 @@ func TestFilterBatchV2(t *testing.T) {
 	if err != nil {
 		return
 	}
-	_, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	batch_func := chainio.FilterBatchV2Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	_, err = batch_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -486,13 +561,14 @@ func TestFilterBatchV2(t *testing.T) {
 		return
 	}

-	_, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	batch_func = chainio.FilterBatchV2Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	_, err = batch_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err)
 		return
 	}
-	if !strings.Contains(err.Error(), "connect: connection refused") {
+	if !strings.Contains(err.Error(), "connection refused") {
 		t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err)
 		return
 	}
@@ -502,7 +578,8 @@ func TestFilterBatchV2(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	batch_func = chainio.FilterBatchV2Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	_, err = batch_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -522,7 +599,8 @@ func TestFilterBatchV3(t *testing.T) {
 	if err != nil {
 		return
 	}
-	_, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	batch_func := chainio.FilterBatchV3Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	_, err = batch_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -530,13 +608,14 @@ func TestFilterBatchV3(t *testing.T) {
 		return
 	}

-	_, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	batch_func = chainio.FilterBatchV3Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	_, err = batch_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err)
 		return
 	}
-	if !strings.Contains(err.Error(), "connect: connection refused") {
+	if !strings.Contains(err.Error(), "connection refused") {
 		t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err)
 		return
 	}
@@ -546,7 +625,8 @@ func TestFilterBatchV3(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	batch_func = chainio.FilterBatchV3Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
+	_, err = batch_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -568,7 +648,8 @@ func TestBatchesStateSubscriber(t *testing.T) {
 	}

 	zero_bytes := [32]byte{}
-	_, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes)
+	batch_state_func := chainio.BatchStateFunc(avsSubscriber, nil, zero_bytes)
+	_, err = batch_state_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -576,13 +657,14 @@ func TestBatchesStateSubscriber(t *testing.T) {
 		return
 	}

-	_, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes)
+	batch_state_func = chainio.BatchStateFunc(avsSubscriber, nil, zero_bytes)
+	_, err = batch_state_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err)
 		return
 	}
-	if !strings.Contains(err.Error(), "connect: connection refused") {
+	if !strings.Contains(err.Error(), "connection refused") {
 		t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err)
 		return
 	}
@@ -592,7 +674,8 @@ func TestBatchesStateSubscriber(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes)
+	batch_state_func = chainio.BatchStateFunc(avsSubscriber, nil, zero_bytes)
+	_, err = batch_state_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -614,7 +697,8 @@ func TestSubscribeNewHead(t *testing.T) {
 		return
 	}

-	_, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c)
+	sub_func := chainio.SubscribeNewHeadFunc(avsSubscriber, context.Background(), c)
+	_, err = sub_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -622,7 +706,8 @@ func TestSubscribeNewHead(t *testing.T) {
 		return
 	}

-	_, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c)
+	sub_func = chainio.SubscribeNewHeadFunc(avsSubscriber, context.Background(), c)
+	_, err = sub_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("SubscribeNewHead Emitted non Transient error: %s\n", err)
@@ -638,7 +723,8 @@ func TestSubscribeNewHead(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c)
+	sub_func = chainio.SubscribeNewHeadFunc(avsSubscriber, context.Background(), c)
+	_, err = sub_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -691,7 +777,8 @@ func TestRespondToTaskV2(t *testing.T) {
 	zero_bytes := [32]byte{}

 	// NOTE: With zero bytes the tx reverts
-	_, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	resp_func := chainio.RespondToTaskV2Func(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	_, err = resp_func()
 	assert.NotNil(t, err)
 	if !strings.Contains(err.Error(), "execution reverted") {
 		t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:")
@@ -701,7 +788,8 @@ func TestRespondToTaskV2(t *testing.T) {
 		t.Errorf("Error killing process: %v\n", err)
 	}

-	_, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	resp_func = chainio.RespondToTaskV2Func(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	_, err = resp_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(*backoff.PermanentError); ok {
 		t.Errorf("RespondToTaskV2 Emitted non-Transient error: %s\n", err)
@@ -716,7 +804,8 @@ func TestRespondToTaskV2(t *testing.T) {
 	}

 	// NOTE: With zero bytes the tx reverts
-	_, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	resp_func = chainio.RespondToTaskV2Func(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	_, err = resp_func()
 	assert.NotNil(t, err)
 	if !strings.Contains(err.Error(), "execution reverted") {
 		t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:")
@@ -744,7 +833,8 @@ func TestBatchesStateWriter(t *testing.T) {
 	var bytes [32]byte
 	num.FillBytes(bytes[:])

-	_, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes)
+	state_func := chainio.BatchesStateFunc(avsWriter, &bind.CallOpts{}, bytes)
+	_, err = state_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -752,7 +842,8 @@ func TestBatchesStateWriter(t *testing.T) {
 		return
 	}

-	_, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes)
+	state_func = chainio.BatchesStateFunc(avsWriter, &bind.CallOpts{}, bytes)
+	_, err = state_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BatchesStateWriter Emitted non-Transient error: %s\n", err)
@@ -768,7 +859,8 @@ func TestBatchesStateWriter(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes)
+	state_func = chainio.BatchesStateFunc(avsWriter, &bind.CallOpts{}, bytes)
+	_, err = state_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -778,7 +870,7 @@ func TestBatchesStateWriter(t *testing.T) {
 }

 func TestBalanceAt(t *testing.T) {
-	cmd, _, err := SetupAnvil(8545)
+	cmd, client, err := SetupAnvil(8545)
 	if err != nil {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}
@@ -789,9 +881,15 @@ func TestBalanceAt(t *testing.T) {
 		return
 	}
 	aggregator_address := common.HexToAddress("0x0")
-	blockHeight := big.NewInt(22)
+	// Fetch the latest block number
+	blockNumberUint64, err := client.BlockNumber(context.Background())
+	if err != nil {
+		t.Errorf("Error retrieving Anvil Block Number: %v\n", err)
+	}
+	blockNumber := new(big.Int).SetUint64(blockNumberUint64)

-	_, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight)
+	balance_func := chainio.BalanceAtFunc(avsWriter, context.Background(), aggregator_address, blockNumber)
+	_, err = balance_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -799,7 +897,8 @@ func TestBalanceAt(t *testing.T) {
 		return
 	}

-	_, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight)
+	balance_func = chainio.BalanceAtFunc(avsWriter, context.Background(), aggregator_address, blockNumber)
+	_, err = balance_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BalanceAt Emitted non-Transient error: %s\n", err)
@@ -815,7 +914,8 @@ func TestBalanceAt(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight)
+	balance_func = chainio.BalanceAtFunc(avsWriter, context.Background(), aggregator_address, blockNumber)
+	_, err = balance_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -837,7 +937,8 @@ func TestBatchersBalances(t *testing.T) {
 	}
 	senderAddress := common.HexToAddress("0x0")

-	_, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
+	batcher_func := chainio.BatcherBalancesFunc(avsWriter, &bind.CallOpts{}, senderAddress)
+	_, err = batcher_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {
@@ -845,7 +946,8 @@ func TestBatchersBalances(t *testing.T) {
 		return
 	}

-	_, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
+	batcher_func = chainio.BatcherBalancesFunc(avsWriter, &bind.CallOpts{}, senderAddress)
+	_, err = batcher_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err)
@@ -861,7 +963,8 @@ func TestBatchersBalances(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}

-	_, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
+	batcher_func = chainio.BatcherBalancesFunc(avsWriter, &bind.CallOpts{}, senderAddress)
+	_, err = batcher_func()
 	assert.Nil(t, err)

 	if err := cmd.Process.Kill(); err != nil {

diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go
index 36b28f0363..4da9bedce1 100644
--- a/core/utils/eth_client_utils.go
+++ b/core/utils/eth_client_utils.go
@@ -3,7 +3,6 @@ package utils
 import (
 	"context"
 	"math/big"
-	"time"

 	"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
 	eigentypes "github.com/Layr-Labs/eigensdk-go/types"
@@ -12,15 +11,7 @@ import (
 	retry "github.com/yetanotherco/aligned_layer/core"
 )

-// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash.
-// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached.
-// If the receipt is still unavailable after `waitTimeout`, it will return an error.
-//
-// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block.
-// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt
-// All errors are considered Transient Errors
-// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout
-func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, waitTimeout time.Duration) (*types.Receipt, error) {
+func WaitForTransactionReceiptFunc(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) func() (*types.Receipt, error) {
 	receipt_func := func() (*types.Receipt, error) {
 		receipt, err := client.TransactionReceipt(context.Background(), txHash)
 		if err != nil {
@@ -32,7 +23,19 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC
 		}
 		return receipt, nil
 	}
-	return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, 0, time.Second*2, waitTimeout)
+	return receipt_func
+}
+
+// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash.
+// If the receipt is not found, the function will retry with exponential backoff until the `MaxElapsedTime` of the given config is reached.
+// If the receipt is still unavailable by then, it will return an error.
+//
+// Note: Callers set a max interval of `2 * time.Second` in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block.
+// Setting a higher value would imply doing fewer retries across the wait timeout, and so we might lose the receipt.
+// All errors are considered Transient Errors
+// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches the configured wait timeout
+func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) {
+	return retry.RetryWithData(WaitForTransactionReceiptFunc(client, fallbackClient, txHash, config), config)
 }

 func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums {
@@ -81,14 +84,9 @@ func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, baseBumpPercent
 	return bumpedGasPrice
 }

-/*
-GetGasPriceRetryable
-Get the gas price from the client with retry logic.
-- All errors are considered Transient Errors
-- Retry times: 1 sec, 2 sec, 4 sec
-*/
-func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) (*big.Int, error) {
-	respondToTaskV2_func := func() (*big.Int, error) {
+func GetGasPriceFunc(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) func() (*big.Int, error) {
+	getGasPrice_func := func() (*big.Int, error) {
 		gasPrice, err := client.SuggestGasPrice(context.Background())
 		if err != nil {
 			gasPrice, err = fallbackClient.SuggestGasPrice(context.Background())
@@ -99,5 +97,15 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst
 		return gasPrice, nil
 	}
-	return retry.RetryWithData(respondToTaskV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return getGasPrice_func
+}
+
+/*
+GetGasPriceRetryable
+Get the gas price from the client with retry logic.
+- All errors are considered Transient Errors
+- Retry times: 1 sec, 2 sec, 4 sec
+*/
+func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) (*big.Int, error) {
+	return retry.RetryWithData(GetGasPriceFunc(client, fallbackClient), retry.EthCallRetryConfig())
 }

diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go
index b5be4f9e0c..54f977c05a 100644
--- a/operator/pkg/operator.go
+++ b/operator/pkg/operator.go
@@ -66,10 +66,8 @@ type Operator struct {
 }

 const (
-	BatchDownloadTimeout    = 1 * time.Minute
-	BatchDownloadMaxRetries = 3
-	BatchDownloadRetryDelay = 5 * time.Second
-	UnverifiedBatchOffset   = 100
+	BatchDownloadTimeout  = 1 * time.Minute
+	UnverifiedBatchOffset = 100
 )

 func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, error) {
@@ -361,7 +359,7 @@ func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlig
 	ctx, cancel := context.WithTimeout(context.Background(), BatchDownloadTimeout)
 	defer cancel()

-	verificationDataBatch, err := o.getBatchFromDataService(ctx, newBatchLog.BatchDataPointer, newBatchLog.BatchMerkleRoot, BatchDownloadMaxRetries, BatchDownloadRetryDelay)
+	verificationDataBatch, err := o.getBatchFromDataService(ctx, newBatchLog.BatchDataPointer, newBatchLog.BatchMerkleRoot)
 	if err != nil {
 		o.Logger.Errorf("Could not get proofs from S3 bucket: %v", err)
 		return err
@@ -442,7 +440,7 @@ func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlig
 	ctx, cancel := context.WithTimeout(context.Background(), BatchDownloadTimeout)
 	defer cancel()

-	verificationDataBatch, err := o.getBatchFromDataService(ctx, newBatchLog.BatchDataPointer, newBatchLog.BatchMerkleRoot, BatchDownloadMaxRetries, BatchDownloadRetryDelay)
+	verificationDataBatch, err := o.getBatchFromDataService(ctx, newBatchLog.BatchDataPointer, newBatchLog.BatchMerkleRoot)
 	if err != nil {
 		o.Logger.Errorf("Could not get proofs from S3 bucket: %v", err)
 		return err
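With the receipt-polling signature above, the old waitTimeout argument becomes the MaxElapsedTime of the config, as SendAggregatedResponse does in core/chainio/avs_writer.go. A hedged sketch of a call site under that assumption (waitUpToAMinute and its one-minute budget are illustrative only):

package example

import (
	"time"

	"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
	gethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	retry "github.com/yetanotherco/aligned_layer/core"
	"github.com/yetanotherco/aligned_layer/core/utils"
)

// waitUpToAMinute polls for a receipt for up to one minute, checking at least
// every 2s; NumRetries = 0 leaves only the elapsed-time budget as the bound.
func waitUpToAMinute(client, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash) (*types.Receipt, error) {
	cfg := retry.EthCallRetryConfig()
	cfg.MaxInterval = 2 * time.Second
	cfg.NumRetries = 0
	cfg.MaxElapsedTime = time.Minute
	return utils.WaitForTransactionReceiptRetryable(client, fallbackClient, txHash, cfg)
}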
diff --git a/operator/pkg/s3.go b/operator/pkg/s3.go
index 865115fa7a..7030d1f84d 100644
--- a/operator/pkg/s3.go
+++ b/operator/pkg/s3.go
@@ -7,49 +7,47 @@ import (
 	"net/http"
 	"time"

+	"github.com/Layr-Labs/eigensdk-go/logging"
 	"github.com/ugorji/go/codec"
+	retry "github.com/yetanotherco/aligned_layer/core"
 	"github.com/yetanotherco/aligned_layer/operator/merkle_tree"
 )

-func (o *Operator) getBatchFromDataService(ctx context.Context, batchURL string, expectedMerkleRoot [32]byte, maxRetries int, retryDelay time.Duration) ([]VerificationData, error) {
-	o.Logger.Infof("Getting batch from data service, batchURL: %s", batchURL)
+const BatchDownloadRetryDelay = 5 * time.Second

-	var resp *http.Response
-	var err error
-	var req *http.Request
-
-	for attempt := 0; attempt < maxRetries; attempt++ {
-		if attempt > 0 {
-			o.Logger.Infof("Waiting for %s before retrying data fetch (attempt %d of %d)", retryDelay, attempt+1, maxRetries)
-			select {
-			case <-time.After(retryDelay):
-				// Wait before retrying
-			case <-ctx.Done():
-				return nil, ctx.Err()
-			}
-			retryDelay *= 2 // Exponential backoff. Ex: 5s, 10s, 20s
-		}
+func RequestBatch(req *http.Request, ctx context.Context) func() (*http.Response, error) {
+	req_func := func() (*http.Response, error) {
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			// We close the response body to free the resources before re-trying
+			if resp != nil {
+				err = resp.Body.Close()
+				resp = nil
+			}
+		}
+		return resp, err
+	}
+	return req_func
+}

-		req, err = http.NewRequestWithContext(ctx, "GET", batchURL, nil)
-		if err != nil {
-			return nil, err
-		}
+func RequestBatchRetryable(ctx context.Context, logger logging.Logger, req *http.Request, config *retry.RetryConfig) (*http.Response, error) {
+	return retry.RetryWithData(RequestBatch(req, ctx), config)
+}

-		resp, err = http.DefaultClient.Do(req)
-		if err == nil && resp.StatusCode == http.StatusOK {
-			break // Successful request, exit retry loop
-		}
-
-		if resp != nil {
-			err := resp.Body.Close()
-			if err != nil {
-				return nil, err
-			}
-		}
-
-		o.Logger.Warnf("Error fetching batch from data service - (attempt %d): %v", attempt+1, err)
-	}
+func (o *Operator) getBatchFromDataService(ctx context.Context, batchURL string, expectedMerkleRoot [32]byte) ([]VerificationData, error) {
+	o.Logger.Infof("Getting batch from data service, batchURL: %s", batchURL)

+	req, err := http.NewRequestWithContext(ctx, "GET", batchURL, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	config := retry.EthCallRetryConfig()
+	config.InitialInterval = BatchDownloadRetryDelay
+	resp, err := RequestBatchRetryable(ctx, o.Logger, req, config)
 	if err != nil {
 		return nil, err
 	}
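The hand-rolled loop deleted above backed off 5s, 10s, 20s; the replacement roughly reproduces that schedule by overriding only InitialInterval on the 3-retry preset, since Multiplier 2 yields 5s, 10s, 20s between attempts. A hedged standalone sketch of the equivalent call (fetchBatch and batchURL are illustrative):

package example

import (
	"context"
	"net/http"
	"time"

	retry "github.com/yetanotherco/aligned_layer/core"
)

// fetchBatch mirrors getBatchFromDataService's retry setup: a 5s initial
// interval doubling per attempt across the 3 retries of the preset.
func fetchBatch(ctx context.Context, batchURL string) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", batchURL, nil)
	if err != nil {
		return nil, err
	}

	cfg := retry.EthCallRetryConfig()
	cfg.InitialInterval = 5 * time.Second

	return retry.RetryWithData(func() (*http.Response, error) {
		return http.DefaultClient.Do(req)
	}, cfg)
}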
diff --git a/operator/pkg/s3_test.go b/operator/pkg/s3_test.go
new file mode 100644
index 0000000000..44c13a7d7b
--- /dev/null
+++ b/operator/pkg/s3_test.go
@@ -0,0 +1,92 @@
+package operator
+
+import (
+	"context"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	retry "github.com/yetanotherco/aligned_layer/core"
+)
+
+// EchoHandler reads and writes the request body
+func EchoHandler(w http.ResponseWriter, r *http.Request) {
+	body, err := io.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w, "Error reading body", http.StatusBadRequest)
+		return
+	}
+	defer r.Body.Close()
+
+	w.Header().Set("Content-Type", "text/plain")
+	w.WriteHeader(http.StatusOK)
+	_, err = w.Write(body)
+	if err != nil {
+		http.Error(w, "Error writing body", http.StatusBadRequest)
+		return
+	}
+}
+
+// Note: the httptest API only allows restarting a server whose URL is empty, while our
+// tests need a fixed default address that stays in use across restarts. Abstracting the
+// creation of the test server works around these issues.
+// NOTE: httptest panics if the server is started with a URL that was not set to "" ref: https://cs.opensource.google/go/go/+/refs/tags/go1.23.3:src/net/http/httptest/server.go;l=127
+func CreateTestServer() *httptest.Server {
+	// To simulate retrieving information from S3 we create a mock http server.
+	handler := http.HandlerFunc(EchoHandler)
+
+	// Create a listener with the desired port.
+	l, err := net.Listen("tcp", "127.0.0.1:7878")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	svr := httptest.NewUnstartedServer(handler)
+
+	// NewUnstartedServer creates a listener. Close that listener and replace
+	// it with the one we created.
+	svr.Listener.Close()
+	svr.Listener = l
+
+	return svr
+}
+
+func TestRequestBatch(t *testing.T) {
+	svr := CreateTestServer()
+
+	// Start the server.
+	svr.Start()
+
+	req, err := http.NewRequestWithContext(context.Background(), "GET", svr.URL, nil)
+	if err != nil {
+		t.Errorf("Error creating req: %s\n", err)
+	}
+
+	batcher_func := RequestBatch(req, context.Background())
+	_, err = batcher_func()
+	assert.Nil(t, err)
+
+	svr.Close()
+
+	batcher_func = RequestBatch(req, context.Background())
+	_, err = batcher_func()
+	assert.NotNil(t, err)
+	if _, ok := err.(retry.PermanentError); ok {
+		t.Errorf("RequestBatch Emitted non-Transient error: %s\n", err)
+		return
+	}
+	if !strings.Contains(err.Error(), "connect: connection refused") {
+		t.Errorf("RequestBatch did not return expected error: %s\n", err)
+		return
+	}
+
+	svr = CreateTestServer()
+	svr.Start()
+	defer svr.Close()
+
+	batcher_func = RequestBatch(req, context.Background())
+	_, err = batcher_func()
+	assert.Nil(t, err)
+}