Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Rhythm ingest path #4314

Merged
merged 7 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,12 @@ func (t *App) initIngester() (services.Service, error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
t.cfg.Ingester.DedicatedColumns = t.cfg.StorageConfig.Trace.Block.DedicatedColumns
t.cfg.Ingester.IngestStorageConfig = t.cfg.Ingest
ingester, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer)

	// In SingleBinary mode don't try to discover the partition from the host name. Always use
	// partition 0. This is for small installs or local/debugging setups.
singlePartition := t.cfg.Target == SingleBinary

ingester, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer, singlePartition)
if err != nil {
return nil, fmt.Errorf("failed to create ingester: %w", err)
}
Expand Down Expand Up @@ -328,6 +333,11 @@ func (t *App) initBlockBuilder() (services.Service, error) {
t.cfg.BlockBuilder.IngestStorageConfig = t.cfg.Ingest
t.cfg.BlockBuilder.IngestStorageConfig.Kafka.ConsumerGroup = blockbuilder.ConsumerGroup

if t.cfg.Target == SingleBinary && len(t.cfg.BlockBuilder.AssignedPartitions) == 0 {
// In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups.
t.cfg.BlockBuilder.AssignedPartitions = append(t.cfg.BlockBuilder.AssignedPartitions, 0)
}

t.blockBuilder = blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store)

return t.blockBuilder, nil
Expand Down
13 changes: 5 additions & 8 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,16 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
}

func (b *BlockBuilder) running(ctx context.Context) error {
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration)
err := b.consumeCycle(ctx, cycleEndTime)
if err != nil {
return fmt.Errorf("failed to consume cycle: %w", err)
}
mdisibio marked this conversation as resolved.
Show resolved Hide resolved

cycleEndTime, waitTime := nextCycleEnd(time.Now(), b.cfg.ConsumeCycleDuration)
for {
select {
case <-time.After(waitTime):
err = b.consumeCycle(ctx, cycleEndTime)
err := b.consumeCycle(ctx, cycleEndTime)
if err != nil {
return fmt.Errorf("failed to consume cycle: %w", err)
b.logger.Log("msg", "consumeCycle failed", "err", err)

// Don't progress cycle forward, keep trying at this timestamp
continue
}

cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeCycleDuration)
Expand Down
10 changes: 10 additions & 0 deletions modules/distributor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,13 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Usage.RegisterFlagsAndApplyDefaults(prefix, f)
}

// Validate checks the distributor configuration. The Kafka configuration is
// only validated when the Kafka write path is enabled; otherwise it is
// ignored and the config is accepted as-is.
func (cfg *Config) Validate() error {
	if !cfg.KafkaWritePathEnabled {
		return nil
	}

	return cfg.KafkaConfig.Validate()
}
19 changes: 12 additions & 7 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func New(
loggingLevel dslog.Level,
reg prometheus.Registerer,
) (*Distributor, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
Expand Down Expand Up @@ -419,20 +423,21 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
return nil, err
}

if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
}

if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil {
_ = level.Warn(d.logger).Log("msg", "failed to forward batches for tenant=%s: %w", userID, err)
}

if d.cfg.KafkaWritePathEnabled {
err := d.sendWriteRequestsToPartitions(ctx, userID, keys, rebatchedTraces)
if d.kafkaProducer != nil {
err := d.sendToKafka(ctx, userID, keys, rebatchedTraces)
if err != nil {
// TODO: Handle error
level.Error(d.logger).Log("msg", "failed to write to kafka", "err", err)
}
} else {
// See if we need to send to the generators
if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
}
}

return nil, nil // PushRequest is ignored, so no reason to create one
Expand Down Expand Up @@ -561,7 +566,7 @@ func (d *Distributor) UsageTrackerHandler() http.Handler {
return nil
}

func (d *Distributor) sendWriteRequestsToPartitions(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error {
func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error {
marshalledTraces := make([][]byte, len(traces))
for i, t := range traces {
b, err := proto.Marshal(t.trace)
Expand Down
14 changes: 10 additions & 4 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type Ingester struct {
}

// New makes a new Ingester.
func New(cfg Config, store storage.Store, overrides overrides.Interface, reg prometheus.Registerer) (*Ingester, error) {
func New(cfg Config, store storage.Store, overrides overrides.Interface, reg prometheus.Registerer, singlePartition bool) (*Ingester, error) {
i := &Ingester{
cfg: cfg,
instances: map[string]*instance{},
Expand All @@ -110,9 +110,15 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
i.lifecycler = lc

if ingestCfg := cfg.IngestStorageConfig; ingestCfg.Enabled {
i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
if singlePartition {
// For single-binary don't require hostname to identify a partition.
// Assume partition 0.
i.ingestPartitionID = 0
} else {
i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
}
}

partitionRingKV := cfg.IngesterPartitionRing.KVStore.Mock
Expand Down
5 changes: 3 additions & 2 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ func TestIngesterStartingReadOnly(t *testing.T) {
defaultIngesterTestConfig(),
defaultIngesterStore(t, t.TempDir()),
limits,
prometheus.NewPedanticRegistry())
prometheus.NewPedanticRegistry(),
false)
require.NoError(t, err)

_, err = ingester.PushBytesV2(ctx, &tempopb.PushBytesRequest{})
Expand Down Expand Up @@ -455,7 +456,7 @@ func defaultIngesterWithOverrides(t testing.TB, tmpDir string, o overrides.Confi

s := defaultIngesterStore(t, tmpDir)

ingester, err := New(ingesterConfig, s, limits, prometheus.NewPedanticRegistry())
ingester, err := New(ingesterConfig, s, limits, prometheus.NewPedanticRegistry(), false)
require.NoError(t, err, "unexpected error creating ingester")
ingester.replayJitter = false

Expand Down
1 change: 1 addition & 0 deletions pkg/ingest/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Encode(partitionID int32, tenantID string, req *tempopb.PushBytesRequest, m
var records []*kgo.Record
batch := encoderPoolGet()
defer encoderPoolPut(batch)

currentSize := 0

for i, entry := range req.Traces {
Expand Down
Loading