KafkaStreams doesn't accept my serde in StreamsConfig

I am trying to merge data by id (the id is a String in the objects) from two topics and write the merged data to a third topic. My topology has no Properties configuration, since I am not running it remotely yet. I am only trying to test it in a unit test with Mockito and JUnit 5.

@BeforeEach
void setUp() {
    testDriver = new TopologyTestDriver(myTopology.buildTopology());

    Serde<MyObject1> myObject1Serde = new JsonSerde<>(MyObject1.class);
    Serde<MyObject2> myObject2Serde = new JsonSerde<>(MyObject2.class);
    Serde<MyObject3> myObject3Serde = new JsonSerde<>(MyObject3.class);

    // Create the input topics with serializers and the output topic with deserializers, like this:
    myObject1Topic = testDriver.createInputTopic(
        MyTopology.FIRST_INPUT_TOPIC,
        Serdes.String().serializer(),
        myObject1Serde.serializer()
    );

    // ... same for the other 2 objects
}

When I run that, it gives me this error:

16:00:02.424 [main] WARN org.apache.kafka.streams.processor.internals.StateDirectory -- Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [/var/folders/dn/f006xw_d4_7f9p5bbywpdxyh0000gp/T//kafka-streams]

org.apache.kafka.common.config.ConfigException: Please specify a value serde or set one through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:1769)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.valueSerde(AbstractProcessorContext.java:100)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.valueSerde(SerdeGetter.java:51)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde(WrappingNullableUtils.java:94)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.prepareValueSerde(MeteredWindowStore.java:145)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.initStoreSerde(MeteredWindowStore.java:171)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:219)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:99)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:227)
    at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:525)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:372)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:299)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:275)
    at com.overspentblocker.overspentblocker.topology.OverspentTopologyTest.setUp(MyTopologyTest.java:46)

I assume this is related to my topology. I am still trying to understand serdes, so thanks in advance for your help.

I was trying to test my topology, expecting it to merge the data so that I could assert on the result.

>Solution :

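It looks like no default value serde is configured and some part of your topology relies on it. The serdes you create for MyObject1, MyObject2, and MyObject3 in the test are only used by the test input and output topics. Inside the topology, any operator or state store that is not given a serde explicitly falls back to the default from StreamsConfig, and the stack trace shows a window store (MeteredWindowStore) failing during exactly that lookup. Because the TopologyTestDriver constructor you call takes no Properties, there is no default to fall back on. Even though nothing runs remotely, you can still pass local properties to the test driver, so something like this might help: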

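Properties props = new Properties();
// Both values are placeholders in a test; TopologyTestDriver never contacts a broker.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Use a serde class that matches your value type; a String serde only fits String values.
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Pass the properties to the test driver so the defaults take effect.
testDriver = new TopologyTestDriver(myTopology.buildTopology(), props);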
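
An alternative that avoids relying on defaults is to hand the serdes to every operator that needs them. The trace goes through MeteredWindowStore, so the failing store probably belongs to a windowed operation such as a stream-stream join; that is a guess, since the topology is not shown. Below is a minimal sketch assuming Kafka Streams 3.0 or newer, with hypothetical topic names, a hypothetical 5-minute join window, and String values standing in for your JSON objects. In your code you would pass your JsonSerde instances instead.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class ExplicitSerdeJoinSketch {

    // Hypothetical topic names, for illustration only.
    static final String FIRST_INPUT_TOPIC = "first-input";
    static final String SECOND_INPUT_TOPIC = "second-input";
    static final String OUTPUT_TOPIC = "merged-output";

    static Topology buildTopology() {
        // String serdes stand in for the JSON serdes of the real value classes.
        Serde<String> keySerde = Serdes.String();
        Serde<String> leftSerde = Serdes.String();
        Serde<String> rightSerde = Serdes.String();
        Serde<String> mergedSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        // Serdes for reading each input topic.
        KStream<String, String> first =
            builder.stream(FIRST_INPUT_TOPIC, Consumed.with(keySerde, leftSerde));
        KStream<String, String> second =
            builder.stream(SECOND_INPUT_TOPIC, Consumed.with(keySerde, rightSerde));

        first.join(
                second,
                // Placeholder merge logic: concatenate the two values.
                (left, right) -> left + "|" + right,
                // Assumed window size; choose one that fits your data.
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                // Serdes for the window stores that back the join.
                StreamJoined.with(keySerde, leftSerde, rightSerde))
            // Serdes for writing the merged records.
            .to(OUTPUT_TOPIC, Produced.with(keySerde, mergedSerde));

        return builder.build();
    }
}
```

With the serdes given at each step, the join's stores do not need to look up a default, so the topology should also initialize with the single-argument TopologyTestDriver constructor.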

PS: I noticed you’ve named both myObject2Serde and myObject3Serde as myObject2Serde, which might be a typo.
