We will borrow the Kafka task that is already defined under the class AdvancedKafkaPlanRun
or AdvancedKafkaJavaPlanRun. You can go through the Kafka guide here for more details.
Schema
Let us set up the corresponding schema for the CSV file where we want to match the values that are generated for the
Kafka messages.
This is a simple schema where we reuse the values and metadata that are already defined in the kafkaTask to
determine what the data will look like for the CSV file. Even if we defined some metadata here, it would be overridden
once we define our foreign key relationships.
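As a rough sketch, a CSV task with such a schema might be defined like this in the Scala API. The task name and file path here are assumptions for illustration, not values from the guide; the field values themselves are later driven by the foreign key relationships:

```scala
// Hypothetical sketch: task name and output path are assumptions.
// Fields are listed without generators since their values will be
// overridden by the foreign key relationships to the Kafka task.
val csvTask = csv("my_csv_account", "/opt/app/data/csv/account")
  .schema(
    field.name("account_number"),
    field.name("year"),
    field.name("name"),
    field.name("payload")
  )
```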
Foreign Keys
From the above CSV schema, we note the following against the Kafka schema:
account_number in CSV needs to match with the account_id in Kafka
We see that account_id is referred to in the key column as field.name("key").sql("content.account_id")
year needs to match with content.year in Kafka, which is a nested field
We can only define foreign key relationships on top-level fields, not nested fields. So we define a new column
called tmp_year which will not appear in the final output for the Kafka messages but is used as an intermediate
step: field.name("tmp_year").sql("content.year").omit(true)
name needs to match with content.details.name in Kafka, also a nested field
Using the same logic as above, we define a temporary column called tmp_name which takes the value of the
nested field but is omitted from the output: field.name("tmp_name").sql("content.details.name").omit(true)
payload represents the whole JSON message sent to Kafka, which matches to value column
Our foreign keys are therefore defined as below. Order is important when defining the list of columns: each index needs
to match with the corresponding column in the other data source.
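A sketch of what this could look like using the plan's foreign key helper in the Scala API (the task variable names kafkaTask and csvTask are assumed to be defined as in the earlier sections):

```scala
// The two column lists are matched positionally:
// key -> account_number, tmp_year -> year, tmp_name -> name, value -> payload.
val myPlan = plan.addForeignKeyRelationship(
  kafkaTask, List("key", "tmp_year", "tmp_name", "value"),
  List(csvTask -> List("account_number", "year", "name", "payload"))
)
```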
```shell
cd ..
./run.sh
#input class MyAdvancedBatchEventJavaPlanRun or MyAdvancedBatchEventPlanRun
#after completing
docker exec docker-kafkaserver-1 kafka-console-consumer --bootstrap-server localhost:9092 --topic account-topic --from-beginning
```
Great! The account, year, name and payload all match up.
Additional Topics
Order of execution
You may notice that the events are generated first, then the CSV file. This is because we passed the kafkaTask in
before the csvTask as part of the execute function. You can change the order of execution by passing
csvTask before kafkaTask into the execute function.