diff --git a/.gitignore b/.gitignore index 08d1d2bfd..011218ddf 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ types/pb/tendermint .vscode/launch.json .vscode/settings.json vendor +*.test */**.html *.idea *.env diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go new file mode 100644 index 000000000..45fae2e86 --- /dev/null +++ b/block/internal/da/tracing.go @@ -0,0 +1,137 @@ +package da + +import ( + "context" + "encoding/hex" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// tracedClient decorates a FullClient with OpenTelemetry spans. +type tracedClient struct { + inner FullClient + tracer trace.Tracer +} + +// WithTracingClient decorates the provided client with tracing spans. +func WithTracingClient(inner FullClient) FullClient { + return &tracedClient{inner: inner, tracer: otel.Tracer("ev-node/da")} +} + +func (t *tracedClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { + total := 0 + for _, b := range data { + total += len(b) + } + ctx, span := t.tracer.Start(ctx, "DA.Submit", + trace.WithAttributes( + attribute.Int("blob.count", len(data)), + attribute.Int("blob.total_size_bytes", total), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + res := t.inner.Submit(ctx, data, gasPrice, namespace, options) + if res.Code != datypes.StatusSuccess { + span.RecordError(&submitError{msg: res.Message}) + span.SetStatus(codes.Error, res.Message) + } else { + span.SetAttributes(attribute.Int64("da.height", int64(res.Height))) + } + return res +} + +func (t *tracedClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + ctx, span := t.tracer.Start(ctx, "DA.Retrieve", + trace.WithAttributes( + attribute.Int("ns.length", len(namespace)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + res := t.inner.Retrieve(ctx, height, namespace) + + if res.Code != datypes.StatusSuccess && res.Code != datypes.StatusNotFound { + span.RecordError(&submitError{msg: res.Message}) + span.SetStatus(codes.Error, res.Message) + } else { + span.SetAttributes(attribute.Int("blob.count", len(res.Data))) + } + return res +} + +func (t *tracedClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { + ctx, span := t.tracer.Start(ctx, "DA.Get", + trace.WithAttributes( + attribute.Int("id.count", len(ids)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + blobs, err := t.inner.Get(ctx, ids, namespace) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetAttributes(attribute.Int("blob.count", len(blobs))) + return blobs, nil +} + +func (t *tracedClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) { + ctx, span := t.tracer.Start(ctx, "DA.GetProofs", + trace.WithAttributes( + attribute.Int("id.count", len(ids)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + proofs, err := t.inner.GetProofs(ctx, ids, namespace) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetAttributes(attribute.Int("proof.count", len(proofs))) + return proofs, nil +} + +func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) { + ctx, span := t.tracer.Start(ctx, "DA.Validate", + trace.WithAttributes( + attribute.Int("id.count", len(ids)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + res, err := t.inner.Validate(ctx, ids, proofs, namespace) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetAttributes(attribute.Int("result.count", len(res))) + return res, nil +} + +func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } +func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } +func (t *tracedClient) GetForcedInclusionNamespace() []byte { + return t.inner.GetForcedInclusionNamespace() +} +func (t *tracedClient) HasForcedInclusionNamespace() bool { + return t.inner.HasForcedInclusionNamespace() +} + +type submitError struct{ msg string } + +func (e *submitError) Error() string { return e.msg } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go new file mode 100644 index 000000000..ca288770c --- /dev/null +++ b/block/internal/da/tracing_test.go @@ -0,0 +1,222 @@ +package da + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// mockFullClient provides function hooks for testing the tracing decorator. +type mockFullClient struct { + submitFn func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit + retrieveFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve + getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) + validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) +} + +func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { + if m.submitFn != nil { + return m.submitFn(ctx, data, gasPrice, namespace, options) + } + return datypes.ResultSubmit{} +} +func (m *mockFullClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + if m.retrieveFn != nil { + return m.retrieveFn(ctx, height, namespace) + } + return datypes.ResultRetrieve{} +} +func (m *mockFullClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { + if m.getFn != nil { + return m.getFn(ctx, ids, namespace) + } + return nil, nil +} +func (m *mockFullClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) { + if m.getProofsFn != nil { + return m.getProofsFn(ctx, ids, namespace) + } + return nil, nil +} +func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) { + if m.validateFn != nil { + return m.validateFn(ctx, ids, proofs, namespace) + } + return nil, nil +} +func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } +func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } +func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } +func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } + +// setup a tracer provider + span recorder +func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingClient(inner), sr +} + +func TestTracedDA_Submit_Success(t *testing.T) { + mock := &mockFullClient{ + submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit { + return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: 123}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Submit(ctx, [][]byte{[]byte("a"), []byte("bc")}, -1.0, []byte{0xaa, 0xbb}, nil) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.Submit", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "blob.count", 2) + requireAttribute(t, attrs, "blob.total_size_bytes", 3) + // namespace hex string length assertion + // 2 bytes = 4 hex characters + foundNS := false + for _, a := range attrs { + if string(a.Key) == "da.namespace" { + foundNS = true + require.Equal(t, 4, len(a.Value.AsString())) + } + } + require.True(t, foundNS, "attribute da.namespace not found") +} + +func TestTracedDA_Submit_Error(t *testing.T) { + mock := &mockFullClient{ + submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit { + return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Submit(ctx, [][]byte{[]byte("a")}, -1.0, []byte{0xaa}, nil) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "boom", span.Status().Description) +} + +func TestTracedDA_Retrieve_Success(t *testing.T) { + mock := &mockFullClient{ + retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { + return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: height}, Data: []datypes.Blob{{}, {}}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Retrieve(ctx, 42, []byte{0x01}) + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.Retrieve", span.Name()) + attrs := span.Attributes() + requireAttribute(t, attrs, "ns.length", 1) + requireAttribute(t, attrs, "blob.count", 2) +} + +func TestTracedDA_Retrieve_Error(t *testing.T) { + mock := &mockFullClient{ + retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { + return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "oops"}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Retrieve(ctx, 7, []byte{0x02}) + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "oops", span.Status().Description) +} + +func TestTracedDA_Get_Success(t *testing.T) { + mock := &mockFullClient{ + getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + return []datypes.Blob{{}, {}}, nil + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + ids := []datypes.ID{[]byte{0x01}, []byte{0x02}} + + blobs, err := client.Get(ctx, ids, []byte{0x01}) + require.NoError(t, err) + require.Len(t, blobs, 2) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.Get", span.Name()) + attrs := span.Attributes() + requireAttribute(t, attrs, "id.count", 2) + requireAttribute(t, attrs, "blob.count", 2) +} + +func TestTracedDA_Get_Error(t *testing.T) { + mock := &mockFullClient{ + getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + return nil, errors.New("get failed") + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + ids := []datypes.ID{[]byte{0x01}} + + _, err := client.Get(ctx, ids, []byte{0x01}) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "get failed", span.Status().Description) +} + +// helper copied from eth tracing tests +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} diff --git a/block/public.go b/block/public.go index 1e3e0de07..54bba68c7 100644 --- a/block/public.go +++ b/block/public.go @@ -48,7 +48,7 @@ func NewDAClient( config config.Config, logger zerolog.Logger, ) FullDAClient { - return da.NewClient(da.Config{ + base := da.NewClient(da.Config{ DA: blobRPC, Logger: logger, Namespace: config.DA.GetNamespace(), @@ -56,6 +56,10 @@ func NewDAClient( DataNamespace: config.DA.GetDataNamespace(), ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), }) + if config.Instrumentation.IsTracingEnabled() { + return da.WithTracingClient(base) + } + return base } // ErrForceInclusionNotConfigured is returned when force inclusion is not configured.