Need to restart Cassandra node if drop + create table with different primary key fields #77

Open
nicoloboschi opened this issue Jul 8, 2022 · 1 comment

nicoloboschi commented Jul 8, 2022

Suppose you create a table table1 as (key text, value text, primary key (key)) with cdc=true. The agent then caches the primary key schema in memory. If the user drops the table and recreates it with the same name in the same keyspace but with different primary key fields, you'll get an error like:

2022-07-08 08:25:13,561 CommitLogReadHandlerImpl.java:468 - Invalid primary key schema:
com.datastax.oss.cdc.agent.exceptions.CassandraConnectorSchemaException: Not a valid schema field: key
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.buildAvroKey(AbstractPulsarMutationSender.java:237)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.sendMutationAsync(AbstractPulsarMutationSender.java:256)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.sendAsync(CommitLogReadHandlerImpl.java:461)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:32)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:24)
        at com.datastax.oss.cdc.agent.AbstractMutationMaker.insert(AbstractMutationMaker.java:28)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleRowModifications(CommitLogReadHandlerImpl.java:344)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.process(CommitLogReadHandlerImpl.java:303)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleMutation(CommitLogReadHandlerImpl.java:238)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readMutation(CommitLogReader.java:477)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readSection(CommitLogReader.java:396)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:243)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:146)
        at com.datastax.oss.cdc.agent.CommitLogReaderServiceImpl$1.run(CommitLogReaderServiceImpl.java:72)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Unknown Source) 

After that, the agent cannot make progress on this table, and you have to restart the agent (and therefore the node) to make it work again.
Note that mutations for other tables continue to be sent to Pulsar.
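
The behaviour above is consistent with a schema cache keyed only by keyspace and table name. The following is a minimal sketch of that pattern, not the agent's actual code: `StaleSchemaCacheSketch`, `TableMeta`, and both lookup methods are hypothetical names. It assumes that recreating a table yields a new table id (Cassandra's default behaviour), so including the id in the cache key makes a recreated table miss the stale entry.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these names come from the CDC agent.
final class StaleSchemaCacheSketch {

    // Minimal stand-in for table metadata: identity plus primary key columns.
    static final class TableMeta {
        final String keyspace;
        final String name;
        final UUID id;
        final List<String> primaryKey;

        TableMeta(String keyspace, String name, UUID id, List<String> primaryKey) {
            this.keyspace = keyspace;
            this.name = name;
            this.id = id;
            this.primaryKey = primaryKey;
        }
    }

    private final Map<String, List<String>> byName = new ConcurrentHashMap<>();
    private final Map<String, List<String>> byNameAndId = new ConcurrentHashMap<>();

    // Keyed by name only: a recreated table gets the old primary key back.
    List<String> keyFieldsByName(TableMeta t) {
        return byName.computeIfAbsent(t.keyspace + "." + t.name, k -> t.primaryKey);
    }

    // Keyed by name and id: a recreated table has a new id, so the entry is rebuilt.
    List<String> keyFieldsByNameAndId(TableMeta t) {
        return byNameAndId.computeIfAbsent(
                t.keyspace + "." + t.name + "/" + t.id, k -> t.primaryKey);
    }

    public static void main(String[] args) {
        StaleSchemaCacheSketch cache = new StaleSchemaCacheSketch();
        TableMeta original = new TableMeta("ks1", "table1", UUID.randomUUID(), List.of("key"));
        TableMeta recreated = new TableMeta("ks1", "table1", UUID.randomUUID(), List.of("a"));

        // Populate both caches with the original table definition.
        cache.keyFieldsByName(original);
        cache.keyFieldsByNameAndId(original);

        // Look up the recreated table in both caches.
        System.out.println("by name:        " + cache.keyFieldsByName(recreated));
        System.out.println("by name and id: " + cache.keyFieldsByNameAndId(recreated));
    }
}
```

In this sketch the name-only lookup still returns `[key]` for the recreated table, which mirrors the "Not a valid schema field: key" error, while the lookup that includes the table id returns `[a]`.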

nicoloboschi commented

Even after the node restarts, the stale key schema on the topic must be updated; otherwise you'll get:

ERROR [CdcCommitlogProcessor:3] 2022-07-08 08:35:54,658 AbstractPulsarMutationSender.java:221 - Failed to get a pulsar producer
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema: 
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "key",
    "type" : "string"
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "a",
    "type" : "string"
  } ]
} caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema: 
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "key",
    "type" : "string"
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "a",
    "type" : "string"
  } ]
}
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:967)
        at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.lambda$getProducer$1(AbstractPulsarMutationSender.java:219)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.getProducer(AbstractPulsarMutationSender.java:187)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.sendMutationAsync(AbstractPulsarMutationSender.java:251)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.sendAsync(CommitLogReadHandlerImpl.java:461)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:32)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:24)
        at com.datastax.oss.cdc.agent.AbstractMutationMaker.delete(AbstractMutationMaker.java:40)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handlePartitionDeletion(CommitLogReadHandlerImpl.java:322)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.process(CommitLogReadHandlerImpl.java:280)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleMutation(CommitLogReadHandlerImpl.java:238)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readMutation(CommitLogReader.java:477)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readSection(CommitLogReader.java:396)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:243)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:146)
        at com.datastax.oss.cdc.agent.CommitLogReaderServiceImpl$1.run(CommitLogReaderServiceImpl.java:72)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Unknown Source)
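
The "Unable to read schema ... using schema ..." message reflects Avro's schema resolution rule: a reader schema field that is missing from the writer schema and declares no default cannot be resolved. The sketch below models only that one rule with plain collections; it is not the Avro or Pulsar API, and `KeySchemaReadCheckSketch` and `firstUnreadableField` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of one Avro schema-resolution rule; not the Avro or Pulsar API.
final class KeySchemaReadCheckSketch {

    // readerFields maps field name -> whether the field declares a default value.
    // Returns the first reader field that cannot be resolved, or null if all can.
    static String firstUnreadableField(Set<String> writerFields,
                                       Map<String, Boolean> readerFields) {
        for (Map.Entry<String, Boolean> field : readerFields.entrySet()) {
            boolean presentInWriter = writerFields.contains(field.getKey());
            boolean hasDefault = field.getValue();
            if (!presentInWriter && !hasDefault) {
                return field.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Key schema already registered on the topic (old primary key).
        Set<String> oldKeySchema = Set.of("key");

        // Key schema of the recreated table: field "a", no default value.
        Map<String, Boolean> newKeySchema = new LinkedHashMap<>();
        newKeySchema.put("a", false);

        System.out.println(firstUnreadableField(oldKeySchema, newKeySchema));
    }
}
```

With the two key schemas from the log, the check reports field `a`: the new schema cannot read records written with the old one, which is why the broker refuses to create the producer until the topic's schema is replaced.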
