diff --git a/da.test b/da.test new file mode 100755 index 000000000..b8a69e2ef Binary files /dev/null and b/da.test differ diff --git a/execution/grpc/go.mod b/execution/grpc/go.mod index 01c9eecd1..21f02396d 100644 --- a/execution/grpc/go.mod +++ b/execution/grpc/go.mod @@ -7,8 +7,8 @@ require ( connectrpc.com/grpcreflect v1.3.0 github.com/evstack/ev-node v1.0.0-beta.11 github.com/evstack/ev-node/core v1.0.0-beta.5 - golang.org/x/net v0.49.0 - google.golang.org/protobuf v1.36.11 + golang.org/x/net v0.47.0 + google.golang.org/protobuf v1.36.10 ) -require golang.org/x/text v0.33.0 // indirect +require golang.org/x/text v0.31.0 // indirect diff --git a/execution/grpc/go.sum b/execution/grpc/go.sum index 1ec876fd3..4eb05c9c8 100644 --- a/execution/grpc/go.sum +++ b/execution/grpc/go.sum @@ -8,9 +8,9 @@ github.com/evstack/ev-node/core v1.0.0-beta.5 h1:lgxE8XiF3U9pcFgh7xuKMgsOGvLBGRy github.com/evstack/ev-node/core v1.0.0-beta.5/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= -google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= -google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 817ca1f37..fb07058c9 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -378,9 +378,15 @@ func NewServiceHandler( config config.Config, bestKnown BestKnownHeightProvider, ) (http.Handler, error) { - storeServer := NewStoreServer(store, headerStore, dataStore, logger) - p2pServer := NewP2PServer(peerManager) - configServer := NewConfigServer(config, proposerAddress, logger) + var storeServer rpc.StoreServiceHandler = NewStoreServer(store, headerStore, dataStore, logger) + var p2pServer rpc.P2PServiceHandler = NewP2PServer(peerManager) + var configServer rpc.ConfigServiceHandler = NewConfigServer(config, proposerAddress, logger) + + if config.Instrumentation.IsTracingEnabled() { + storeServer = WithTracingStoreServer(storeServer) + p2pServer = WithTracingP2PServer(p2pServer) + configServer = WithTracingConfigServer(configServer) + } mux := http.NewServeMux() diff --git a/pkg/rpc/server/tracing.go b/pkg/rpc/server/tracing.go new file mode 100644 index 000000000..8448285e0 --- /dev/null +++ b/pkg/rpc/server/tracing.go @@ -0,0 +1,270 @@ +package server + +import ( + "context" + "encoding/hex" + + "connectrpc.com/connect" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/types/known/emptypb" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" + "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" +) + +// tracedStoreServer decorates a StoreServiceHandler with OpenTelemetry spans. +type tracedStoreServer struct { + inner v1connect.StoreServiceHandler + tracer trace.Tracer +} + +// WithTracingStoreServer decorates the provided store service handler with tracing spans. +func WithTracingStoreServer(inner v1connect.StoreServiceHandler) v1connect.StoreServiceHandler { + return &tracedStoreServer{ + inner: inner, + tracer: otel.Tracer("ev-node/store-service"), + } +} + +func (t *tracedStoreServer) GetBlock( + ctx context.Context, + req *connect.Request[pb.GetBlockRequest], +) (*connect.Response[pb.GetBlockResponse], error) { + var attrs []attribute.KeyValue + switch identifier := req.Msg.Identifier.(type) { + case *pb.GetBlockRequest_Height: + attrs = append(attrs, attribute.Int64("height", int64(identifier.Height))) + case *pb.GetBlockRequest_Hash: + if identifier.Hash != nil { + attrs = append(attrs, attribute.String("hash", hex.EncodeToString(identifier.Hash))) + } + } + + ctx, span := t.tracer.Start(ctx, "StoreService.GetBlock", trace.WithAttributes(attrs...)) + defer span.End() + + res, err := t.inner.GetBlock(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Bool("found", res.Msg.Block != nil), + ) + if res.Msg.Block != nil && res.Msg.Block.Data != nil { + totalSize := 0 + for _, tx := range res.Msg.Block.Data.Txs { + totalSize += len(tx) + } + span.SetAttributes( + attribute.Int("block_size_bytes", totalSize), + attribute.Int("tx_count", len(res.Msg.Block.Data.Txs)), + ) + } + return res, nil +} + +func (t *tracedStoreServer) GetState( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetStateResponse], error) { + ctx, span := t.tracer.Start(ctx, "StoreService.GetState") + defer span.End() + + res, err := t.inner.GetState(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + if res.Msg.State != nil { + span.SetAttributes( + attribute.Int64("height", int64(res.Msg.State.LastBlockHeight)), + attribute.String("app_hash", hex.EncodeToString(res.Msg.State.AppHash)), + attribute.Int64("da_height", int64(res.Msg.State.DaHeight)), + ) + } + return res, nil +} + +func (t *tracedStoreServer) GetMetadata( + ctx context.Context, + req *connect.Request[pb.GetMetadataRequest], +) (*connect.Response[pb.GetMetadataResponse], error) { + ctx, span := t.tracer.Start(ctx, "StoreService.GetMetadata", + trace.WithAttributes( + attribute.String("key", req.Msg.Key), + ), + ) + defer span.End() + + res, err := t.inner.GetMetadata(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int("value_size_bytes", len(res.Msg.Value)), + ) + return res, nil +} + +func (t *tracedStoreServer) GetGenesisDaHeight( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetGenesisDaHeightResponse], error) { + ctx, span := t.tracer.Start(ctx, "StoreService.GetGenesisDaHeight") + defer span.End() + + res, err := t.inner.GetGenesisDaHeight(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int64("genesis_da_height", int64(res.Msg.Height)), + ) + return res, nil +} + +func (t *tracedStoreServer) GetP2PStoreInfo( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetP2PStoreInfoResponse], error) { + ctx, span := t.tracer.Start(ctx, "StoreService.GetP2PStoreInfo") + defer span.End() + + res, err := t.inner.GetP2PStoreInfo(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int("store_count", len(res.Msg.Stores)), + ) + return res, nil +} + +// tracedP2PServer decorates a P2PServiceHandler with OpenTelemetry spans. +type tracedP2PServer struct { + inner v1connect.P2PServiceHandler + tracer trace.Tracer +} + +// WithTracingP2PServer decorates the provided P2P service handler with tracing spans. +func WithTracingP2PServer(inner v1connect.P2PServiceHandler) v1connect.P2PServiceHandler { + return &tracedP2PServer{ + inner: inner, + tracer: otel.Tracer("ev-node/p2p-service"), + } +} + +func (t *tracedP2PServer) GetPeerInfo( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetPeerInfoResponse], error) { + ctx, span := t.tracer.Start(ctx, "P2PService.GetPeerInfo") + defer span.End() + + res, err := t.inner.GetPeerInfo(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int("peer_count", len(res.Msg.Peers)), + ) + return res, nil +} + +func (t *tracedP2PServer) GetNetInfo( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetNetInfoResponse], error) { + ctx, span := t.tracer.Start(ctx, "P2PService.GetNetInfo") + defer span.End() + + res, err := t.inner.GetNetInfo(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + if res.Msg.NetInfo != nil { + span.SetAttributes( + attribute.String("node_id", res.Msg.NetInfo.Id), + attribute.Int("listen_address_count", len(res.Msg.NetInfo.ListenAddresses)), + ) + } + return res, nil +} + +// tracedConfigServer decorates a ConfigServiceHandler with OpenTelemetry spans. +type tracedConfigServer struct { + inner v1connect.ConfigServiceHandler + tracer trace.Tracer +} + +// WithTracingConfigServer decorates the provided config service handler with tracing spans. +func WithTracingConfigServer(inner v1connect.ConfigServiceHandler) v1connect.ConfigServiceHandler { + return &tracedConfigServer{ + inner: inner, + tracer: otel.Tracer("ev-node/config-service"), + } +} + +func (t *tracedConfigServer) GetNamespace( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetNamespaceResponse], error) { + ctx, span := t.tracer.Start(ctx, "ConfigService.GetNamespace") + defer span.End() + + res, err := t.inner.GetNamespace(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.String("header_namespace", res.Msg.HeaderNamespace), + attribute.String("data_namespace", res.Msg.DataNamespace), + ) + return res, nil +} + +func (t *tracedConfigServer) GetSignerInfo( + ctx context.Context, + req *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetSignerInfoResponse], error) { + ctx, span := t.tracer.Start(ctx, "ConfigService.GetSignerInfo") + defer span.End() + + res, err := t.inner.GetSignerInfo(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.String("signer_address", hex.EncodeToString(res.Msg.Address)), + ) + return res, nil +} diff --git a/pkg/rpc/server/tracing_test.go b/pkg/rpc/server/tracing_test.go new file mode 100644 index 000000000..f950f9e28 --- /dev/null +++ b/pkg/rpc/server/tracing_test.go @@ -0,0 +1,510 @@ +package server + +import ( + "context" + "errors" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" + "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" +) + +// mock implementations for StoreService +type mockStoreServiceHandler struct { + getBlockFn func(context.Context, *connect.Request[pb.GetBlockRequest]) (*connect.Response[pb.GetBlockResponse], error) + getStateFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetStateResponse], error) + getMetadataFn func(context.Context, *connect.Request[pb.GetMetadataRequest]) (*connect.Response[pb.GetMetadataResponse], error) + getGenesisDaHeightFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetGenesisDaHeightResponse], error) + getP2PStoreInfoFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetP2PStoreInfoResponse], error) +} + +func (m *mockStoreServiceHandler) GetBlock(ctx context.Context, req *connect.Request[pb.GetBlockRequest]) (*connect.Response[pb.GetBlockResponse], error) { + if m.getBlockFn != nil { + return m.getBlockFn(ctx, req) + } + return connect.NewResponse(&pb.GetBlockResponse{}), nil +} + +func (m *mockStoreServiceHandler) GetState(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetStateResponse], error) { + if m.getStateFn != nil { + return m.getStateFn(ctx, req) + } + return connect.NewResponse(&pb.GetStateResponse{}), nil +} + +func (m *mockStoreServiceHandler) GetMetadata(ctx context.Context, req *connect.Request[pb.GetMetadataRequest]) (*connect.Response[pb.GetMetadataResponse], error) { + if m.getMetadataFn != nil { + return m.getMetadataFn(ctx, req) + } + return connect.NewResponse(&pb.GetMetadataResponse{}), nil +} + +func (m *mockStoreServiceHandler) GetGenesisDaHeight(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetGenesisDaHeightResponse], error) { + if m.getGenesisDaHeightFn != nil { + return m.getGenesisDaHeightFn(ctx, req) + } + return connect.NewResponse(&pb.GetGenesisDaHeightResponse{}), nil +} + +func (m *mockStoreServiceHandler) GetP2PStoreInfo(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetP2PStoreInfoResponse], error) { + if m.getP2PStoreInfoFn != nil { + return m.getP2PStoreInfoFn(ctx, req) + } + return connect.NewResponse(&pb.GetP2PStoreInfoResponse{}), nil +} + +var _ v1connect.StoreServiceHandler = (*mockStoreServiceHandler)(nil) + +// mock implementations for P2PService +type mockP2PServiceHandler struct { + getPeerInfoFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetPeerInfoResponse], error) + getNetInfoFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNetInfoResponse], error) +} + +func (m *mockP2PServiceHandler) GetPeerInfo(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetPeerInfoResponse], error) { + if m.getPeerInfoFn != nil { + return m.getPeerInfoFn(ctx, req) + } + return connect.NewResponse(&pb.GetPeerInfoResponse{}), nil +} + +func (m *mockP2PServiceHandler) GetNetInfo(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNetInfoResponse], error) { + if m.getNetInfoFn != nil { + return m.getNetInfoFn(ctx, req) + } + return connect.NewResponse(&pb.GetNetInfoResponse{}), nil +} + +var _ v1connect.P2PServiceHandler = (*mockP2PServiceHandler)(nil) + +// mock implementations for ConfigService +type mockConfigServiceHandler struct { + getNamespaceFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNamespaceResponse], error) + getSignerInfoFn func(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetSignerInfoResponse], error) +} + +func (m *mockConfigServiceHandler) GetNamespace(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNamespaceResponse], error) { + if m.getNamespaceFn != nil { + return m.getNamespaceFn(ctx, req) + } + return connect.NewResponse(&pb.GetNamespaceResponse{}), nil +} + +func (m *mockConfigServiceHandler) GetSignerInfo(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetSignerInfoResponse], error) { + if m.getSignerInfoFn != nil { + return m.getSignerInfoFn(ctx, req) + } + return connect.NewResponse(&pb.GetSignerInfoResponse{}), nil +} + +var _ v1connect.ConfigServiceHandler = (*mockConfigServiceHandler)(nil) + +func setupStoreTrace(t *testing.T, inner v1connect.StoreServiceHandler) (v1connect.StoreServiceHandler, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingStoreServer(inner), sr +} + +func setupP2PTrace(t *testing.T, inner v1connect.P2PServiceHandler) (v1connect.P2PServiceHandler, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingP2PServer(inner), sr +} + +func setupConfigTrace(t *testing.T, inner v1connect.ConfigServiceHandler) (v1connect.ConfigServiceHandler, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingConfigServer(inner), sr +} + +// StoreService tests + +func TestTracedStoreService_GetBlock_Success(t *testing.T) { + mock := &mockStoreServiceHandler{ + getBlockFn: func(ctx context.Context, req *connect.Request[pb.GetBlockRequest]) (*connect.Response[pb.GetBlockResponse], error) { + return connect.NewResponse(&pb.GetBlockResponse{ + Block: &pb.Block{ + Header: &pb.SignedHeader{ + Header: &pb.Header{ + Height: 10, + }, + }, + Data: &pb.Data{ + Txs: [][]byte{[]byte("tx1"), []byte("tx2")}, + }, + }, + }), nil + }, + } + handler, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&pb.GetBlockRequest{ + Identifier: &pb.GetBlockRequest_Height{Height: 10}, + }) + + res, err := handler.GetBlock(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "StoreService.GetBlock", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "height", int64(10)) + requireAttribute(t, attrs, "found", true) + requireAttribute(t, attrs, "tx_count", 2) +} + +func TestTracedStoreService_GetBlock_Error(t *testing.T) { + mock := &mockStoreServiceHandler{ + getBlockFn: func(ctx context.Context, req *connect.Request[pb.GetBlockRequest]) (*connect.Response[pb.GetBlockResponse], error) { + return nil, connect.NewError(connect.CodeNotFound, errors.New("block not found")) + }, + } + handler, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&pb.GetBlockRequest{ + Identifier: &pb.GetBlockRequest_Height{Height: 999}, + }) + + _, err := handler.GetBlock(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedStoreService_GetState_Success(t *testing.T) { + mock := &mockStoreServiceHandler{ + getStateFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetStateResponse], error) { + return connect.NewResponse(&pb.GetStateResponse{ + State: &pb.State{ + LastBlockHeight: 100, + AppHash: []byte{0xaa, 0xbb}, + DaHeight: 50, + LastBlockTime: timestamppb.New(time.Now()), + }, + }), nil + }, + } + handler, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetState(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "StoreService.GetState", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "height", int64(100)) + requireAttribute(t, attrs, "app_hash", "aabb") + requireAttribute(t, attrs, "da_height", int64(50)) +} + +func TestTracedStoreService_GetMetadata_Success(t *testing.T) { + mock := &mockStoreServiceHandler{ + getMetadataFn: func(ctx context.Context, req *connect.Request[pb.GetMetadataRequest]) (*connect.Response[pb.GetMetadataResponse], error) { + return connect.NewResponse(&pb.GetMetadataResponse{ + Value: []byte("metadata_value"), + }), nil + }, + } + handler, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&pb.GetMetadataRequest{ + Key: "test_key", + }) + + res, err := handler.GetMetadata(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "StoreService.GetMetadata", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "key", "test_key") + requireAttribute(t, attrs, "value_size_bytes", 14) +} + +func TestTracedStoreService_GetGenesisDaHeight_Success(t *testing.T) { + mock := &mockStoreServiceHandler{ + getGenesisDaHeightFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetGenesisDaHeightResponse], error) { + return connect.NewResponse(&pb.GetGenesisDaHeightResponse{ + Height: 1000, + }), nil + }, + } + handler, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetGenesisDaHeight(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "StoreService.GetGenesisDaHeight", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "genesis_da_height", int64(1000)) +} + +func TestTracedStoreService_GetP2PStoreInfo_Success(t *testing.T) { + mock := &mockStoreServiceHandler{ + getP2PStoreInfoFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetP2PStoreInfoResponse], error) { + return connect.NewResponse(&pb.GetP2PStoreInfoResponse{ + Stores: []*pb.P2PStoreSnapshot{ + {Label: "Header Store", Height: 100}, + {Label: "Data Store", Height: 99}, + }, + }), nil + }, + } + handler, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetP2PStoreInfo(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "StoreService.GetP2PStoreInfo", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "store_count", 2) +} + +// P2PService tests + +func TestTracedP2PService_GetPeerInfo_Success(t *testing.T) { + mock := &mockP2PServiceHandler{ + getPeerInfoFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetPeerInfoResponse], error) { + return connect.NewResponse(&pb.GetPeerInfoResponse{ + Peers: []*pb.PeerInfo{ + {Id: "peer1", Address: "addr1"}, + {Id: "peer2", Address: "addr2"}, + }, + }), nil + }, + } + handler, sr := setupP2PTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetPeerInfo(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "P2PService.GetPeerInfo", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "peer_count", 2) +} + +func TestTracedP2PService_GetPeerInfo_Error(t *testing.T) { + mock := &mockP2PServiceHandler{ + getPeerInfoFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetPeerInfoResponse], error) { + return nil, connect.NewError(connect.CodeInternal, errors.New("failed to get peers")) + }, + } + handler, sr := setupP2PTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + _, err := handler.GetPeerInfo(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedP2PService_GetNetInfo_Success(t *testing.T) { + mock := &mockP2PServiceHandler{ + getNetInfoFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNetInfoResponse], error) { + return connect.NewResponse(&pb.GetNetInfoResponse{ + NetInfo: &pb.NetInfo{ + Id: "node123", + ListenAddresses: []string{"/ip4/127.0.0.1/tcp/26656"}, + }, + }), nil + }, + } + handler, sr := setupP2PTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetNetInfo(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "P2PService.GetNetInfo", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "node_id", "node123") + requireAttribute(t, attrs, "listen_address_count", 1) +} + +// ConfigService tests + +func TestTracedConfigService_GetNamespace_Success(t *testing.T) { + mock := &mockConfigServiceHandler{ + getNamespaceFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNamespaceResponse], error) { + return connect.NewResponse(&pb.GetNamespaceResponse{ + HeaderNamespace: "0x0001020304050607", + DataNamespace: "0x08090a0b0c0d0e0f", + }), nil + }, + } + handler, sr := setupConfigTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetNamespace(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "ConfigService.GetNamespace", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "header_namespace", "0x0001020304050607") + requireAttribute(t, attrs, "data_namespace", "0x08090a0b0c0d0e0f") +} + +func TestTracedConfigService_GetNamespace_Error(t *testing.T) { + mock := &mockConfigServiceHandler{ + getNamespaceFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetNamespaceResponse], error) { + return nil, connect.NewError(connect.CodeInternal, errors.New("failed to get namespace")) + }, + } + handler, sr := setupConfigTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + _, err := handler.GetNamespace(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedConfigService_GetSignerInfo_Success(t *testing.T) { + mock := &mockConfigServiceHandler{ + getSignerInfoFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetSignerInfoResponse], error) { + return connect.NewResponse(&pb.GetSignerInfoResponse{ + Address: []byte{0x01, 0x02, 0x03, 0x04}, + }), nil + }, + } + handler, sr := setupConfigTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + res, err := handler.GetSignerInfo(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "ConfigService.GetSignerInfo", span.Name()) + + attrs := span.Attributes() + requireAttribute(t, attrs, "signer_address", "01020304") +} + +func TestTracedConfigService_GetSignerInfo_Error(t *testing.T) { + mock := &mockConfigServiceHandler{ + getSignerInfoFn: func(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[pb.GetSignerInfoResponse], error) { + return nil, connect.NewError(connect.CodeUnavailable, errors.New("signer not available")) + }, + } + handler, sr := setupConfigTrace(t, mock) + ctx := context.Background() + + req := connect.NewRequest(&emptypb.Empty{}) + _, err := handler.GetSignerInfo(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + case bool: + require.Equal(t, v, attr.Value.AsBool()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +}