WIP: Rhythm ingest path (#4314)
* Validate distributor config. Finish encoder/decoder tests

* Repair tests

* Make SingleBinary work out of the box by defaulting to partition 0

* Fix first-time startup where the blockbuilder fails before the ingester can create the topic

* Fix initial startup cycle time and delay
mdisibio authored Nov 19, 2024
1 parent 80c057b commit 2b4e1fb
Showing 7 changed files with 54 additions and 22 deletions.
12 changes: 11 additions & 1 deletion cmd/tempo/app/modules.go
@@ -283,7 +283,12 @@ func (t *App) initIngester() (services.Service, error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
t.cfg.Ingester.DedicatedColumns = t.cfg.StorageConfig.Trace.Block.DedicatedColumns
t.cfg.Ingester.IngestStorageConfig = t.cfg.Ingest
ingester, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer)

// In SingleBinary mode don't try to discover the partition from the hostname. Always use
// partition 0. This is for small installs or local/debugging setups.
singlePartition := t.cfg.Target == SingleBinary

ingester, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer, singlePartition)
if err != nil {
return nil, fmt.Errorf("failed to create ingester: %w", err)
}
@@ -328,6 +333,11 @@ func (t *App) initBlockBuilder() (services.Service, error) {
t.cfg.BlockBuilder.IngestStorageConfig = t.cfg.Ingest
t.cfg.BlockBuilder.IngestStorageConfig.Kafka.ConsumerGroup = blockbuilder.ConsumerGroup

if t.cfg.Target == SingleBinary && len(t.cfg.BlockBuilder.AssignedPartitions) == 0 {
// In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups.
t.cfg.BlockBuilder.AssignedPartitions = append(t.cfg.BlockBuilder.AssignedPartitions, 0)
}

t.blockBuilder = blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store)

return t.blockBuilder, nil
15 changes: 7 additions & 8 deletions modules/blockbuilder/blockbuilder.go
@@ -111,19 +111,18 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
}

func (b *BlockBuilder) running(ctx context.Context) error {
// Initial polling and delay
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration)
err := b.consumeCycle(ctx, cycleEndTime)
if err != nil {
return fmt.Errorf("failed to consume cycle: %w", err)
}

cycleEndTime, waitTime := nextCycleEnd(time.Now(), b.cfg.ConsumeCycleDuration)
waitTime := 2 * time.Second
for {
select {
case <-time.After(waitTime):
err = b.consumeCycle(ctx, cycleEndTime)
err := b.consumeCycle(ctx, cycleEndTime)
if err != nil {
return fmt.Errorf("failed to consume cycle: %w", err)
b.logger.Log("msg", "consumeCycle failed", "err", err)

// Don't progress cycle forward, keep trying at this timestamp
continue
}

cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeCycleDuration)
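The change above replaces the fail-and-exit behavior with a retry: when consumeCycle fails, the loop logs the error and keeps the same cycle end time instead of returning, and only advances the cycle once a consume succeeds. A minimal, self-contained sketch of that pattern, with consumeCycle stubbed out and all timings invented for illustration:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consumeCycle is a stand-in for the real work; here it fails twice, then succeeds.
var attempts int

func consumeCycle(_ context.Context, cycleEnd time.Time) error {
	attempts++
	if attempts < 3 {
		return errors.New("kafka not ready")
	}
	fmt.Println("consumed cycle ending at", cycleEnd.Format(time.RFC3339))
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	cycleDuration := 100 * time.Millisecond
	cycleEnd := time.Now().Truncate(cycleDuration) // stand-in for cycleEndAtStartup
	waitTime := 50 * time.Millisecond

	for {
		select {
		case <-time.After(waitTime):
			if err := consumeCycle(ctx, cycleEnd); err != nil {
				fmt.Println("consumeCycle failed, retrying same cycle:", err)
				continue // keep the same cycleEnd; retry on the next tick
			}
			cycleEnd = cycleEnd.Add(cycleDuration) // advance only after success
		case <-ctx.Done():
			return
		}
	}
}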
10 changes: 10 additions & 0 deletions modules/distributor/config.go
@@ -89,3 +89,13 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Usage.RegisterFlagsAndApplyDefaults(prefix, f)
}

func (cfg *Config) Validate() error {
if cfg.KafkaWritePathEnabled {
if err := cfg.KafkaConfig.Validate(); err != nil {
return err
}
}

return nil
}
19 changes: 12 additions & 7 deletions modules/distributor/distributor.go
@@ -183,6 +183,10 @@ func New(
loggingLevel dslog.Level,
reg prometheus.Registerer,
) (*Distributor, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
@@ -419,20 +423,21 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
return nil, err
}

if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
}

if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil {
_ = level.Warn(d.logger).Log("msg", "failed to forward batches for tenant=%s: %w", userID, err)
}

if d.cfg.KafkaWritePathEnabled {
err := d.sendWriteRequestsToPartitions(ctx, userID, keys, rebatchedTraces)
if d.kafkaProducer != nil {
err := d.sendToKafka(ctx, userID, keys, rebatchedTraces)
if err != nil {
// TODO: Handle error
level.Error(d.logger).Log("msg", "failed to write to kafka", "err", err)
}
} else {
// See if we need to send to the generators
if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
}
}

return nil, nil // PushRequest is ignored, so no reason to create one
@@ -561,7 +566,7 @@ func (d *Distributor) UsageTrackerHandler() http.Handler {
return nil
}

func (d *Distributor) sendWriteRequestsToPartitions(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error {
func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error {
marshalledTraces := make([][]byte, len(traces))
for i, t := range traces {
b, err := proto.Marshal(t.trace)
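For context on the write path above: sendToKafka marshals each rebatched trace with proto.Marshal and produces the bytes through the ingest storage client (the encoding.go hunk further down shows them wrapped in kgo.Record values). A rough, self-contained sketch of producing such payloads with franz-go, where the broker address, topic name, tenant key, and fixed partition 0 are illustrative assumptions rather than Tempo's actual configuration:

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Hypothetical broker and topic names, for illustration only.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("ingest"),
		// Route records to the partition set on each record rather than hashing keys.
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Pretend these are proto-marshalled trace batches for one tenant.
	payloads := [][]byte{[]byte("batch-1"), []byte("batch-2")}

	records := make([]*kgo.Record, 0, len(payloads))
	for _, p := range payloads {
		records = append(records, &kgo.Record{
			Key:       []byte("tenant-1"), // tenant ID as the record key
			Value:     p,
			Partition: 0, // the single-binary default partition
		})
	}

	// Produce synchronously and surface the first error, if any.
	if err := client.ProduceSync(context.Background(), records...).FirstErr(); err != nil {
		log.Fatal(err)
	}
}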
14 changes: 10 additions & 4 deletions modules/ingester/ingester.go
@@ -89,7 +89,7 @@ type Ingester struct {
}

// New makes a new Ingester.
func New(cfg Config, store storage.Store, overrides overrides.Interface, reg prometheus.Registerer) (*Ingester, error) {
func New(cfg Config, store storage.Store, overrides overrides.Interface, reg prometheus.Registerer, singlePartition bool) (*Ingester, error) {
i := &Ingester{
cfg: cfg,
instances: map[string]*instance{},
@@ -110,9 +110,15 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
i.lifecycler = lc

if ingestCfg := cfg.IngestStorageConfig; ingestCfg.Enabled {
i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
if singlePartition {
// For single-binary don't require hostname to identify a partition.
// Assume partition 0.
i.ingestPartitionID = 0
} else {
i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
}
}

partitionRingKV := cfg.IngesterPartitionRing.KVStore.Mock
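The ingester hunk above is the heart of the single-binary fix: in multi-replica deployments the partition ID is derived from the instance ID (typically the pod hostname with an ordinal suffix), and a lone tempo process has no such suffix to parse. A rough sketch of what that kind of derivation looks like; the helper name and parsing rule are assumptions for illustration, not the actual ingest.IngesterPartitionID implementation:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// partitionFromInstanceID is a hypothetical stand-in for hostname-based
// discovery: it takes the trailing integer of an instance ID such as
// "tempo-ingester-3" and uses it as the partition number.
var trailingIndex = regexp.MustCompile(`-(\d+)$`)

func partitionFromInstanceID(id string) (int32, error) {
	m := trailingIndex.FindStringSubmatch(id)
	if m == nil {
		return 0, fmt.Errorf("no numeric suffix in instance ID %q", id)
	}
	n, err := strconv.ParseInt(m[1], 10, 32)
	if err != nil {
		return 0, err
	}
	return int32(n), nil
}

func main() {
	for _, id := range []string{"tempo-ingester-3", "tempo"} {
		p, err := partitionFromInstanceID(id)
		if err != nil {
			// A single-binary hostname like "tempo" has no ordinal, which is
			// why the new code skips discovery and defaults to partition 0.
			fmt.Printf("%s: %v -> falling back to partition 0\n", id, err)
			continue
		}
		fmt.Printf("%s: partition %d\n", id, p)
	}
}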
5 changes: 3 additions & 2 deletions modules/ingester/ingester_test.go
@@ -412,7 +412,8 @@ func TestIngesterStartingReadOnly(t *testing.T) {
defaultIngesterTestConfig(),
defaultIngesterStore(t, t.TempDir()),
limits,
prometheus.NewPedanticRegistry())
prometheus.NewPedanticRegistry(),
false)
require.NoError(t, err)

_, err = ingester.PushBytesV2(ctx, &tempopb.PushBytesRequest{})
@@ -547,7 +548,7 @@ func defaultIngesterWithOverrides(t testing.TB, tmpDir string, o overrides.Confi

s := defaultIngesterStore(t, tmpDir)

ingester, err := New(ingesterConfig, s, limits, prometheus.NewPedanticRegistry())
ingester, err := New(ingesterConfig, s, limits, prometheus.NewPedanticRegistry(), false)
require.NoError(t, err, "unexpected error creating ingester")
ingester.replayJitter = false

1 change: 1 addition & 0 deletions pkg/ingest/encoding.go
@@ -50,6 +50,7 @@ func Encode(partitionID int32, tenantID string, req *tempopb.PushBytesRequest, m
var records []*kgo.Record
batch := encoderPoolGet()
defer encoderPoolPut(batch)

currentSize := 0

for i, entry := range req.Traces {
