Foreign Keys
Foreign keys can be defined to represent the relationships between datasets where values are required to match for particular columns.
Single column
Define a column in one data source to match against another column.
The example below shows a `postgres` data source with two tables, `accounts` and `transactions`, that have a foreign key on `account_id`.
// Accounts table: the side of the relationship that the values come from.
var accountsTask = postgres("my_postgres", "jdbc:...")
    .table("public.accounts")
    .schema(
        field().name("account_id"),
        field().name("name"),
        ...
    );

// Transactions table, defined on top of the accounts task (same postgres data source).
var transactionsTask = postgres(accountsTask)
    .table("public.transactions")
    .schema(
        field().name("account_id"),
        field().name("full_name"),
        ...
    );

// transactions.account_id is required to match accounts.account_id.
plan().addForeignKeyRelationship(
    accountsTask, "account_id",
    List.of(Map.entry(transactionsTask, "account_id"))
);
// Accounts table: the side of the relationship that the values come from.
val accountsTask = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )

// Transactions table, defined on top of the accounts task (same postgres data source).
val transactionsTask = postgres(accountsTask)
  .table("public.transactions")
  .schema(
    field.name("account_id"),
    field.name("full_name"),
    ...
  )

// transactions.account_id is required to match accounts.account_id.
plan.addForeignKeyRelationship(
  accountsTask, "account_id",
  List(transactionsTask -> "account_id")
)
---
# Task file: one step per table in the postgres data source.
name: "postgres_data"
steps:
  # Table holding the account_id values that are referenced.
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
  # Table whose account_id is linked back to accounts.
  - name: "transactions"
    type: "postgres"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "full_name"
---
# Plan file: binds the task to a data source and declares the foreign key.
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"

sinkOptions:
  foreignKeys:
    # Key format: <dataSourceName>.<step name>.<column>
    # The prefix must be the dataSourceName declared under tasks ("my_postgres").
    "my_postgres.accounts.account_id":
      - "my_postgres.transactions.account_id"
Multiple columns
You may have a scenario where multiple columns need to be aligned. Using the same example, we want `account_id` and `name` from `accounts` to match `account_id` and `full_name` in `transactions`, respectively.
// Accounts table: supplies both account_id and name.
var accountsTask = postgres("my_postgres", "jdbc:...")
    .table("public.accounts")
    .schema(
        field().name("account_id"),
        field().name("name"),
        ...
    );

// Transactions table, defined on top of the accounts task (same postgres data source).
var transactionsTask = postgres(accountsTask)
    .table("public.transactions")
    .schema(
        field().name("account_id"),
        field().name("full_name"),
        ...
    );

// Columns are paired by position: account_id <-> account_id, name <-> full_name.
plan().addForeignKeyRelationship(
    accountsTask, List.of("account_id", "name"),
    List.of(Map.entry(transactionsTask, List.of("account_id", "full_name")))
);
// Accounts table: supplies both account_id and name.
val accountsTask = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )

// Transactions table, defined on top of the accounts task (same postgres data source).
val transactionsTask = postgres(accountsTask)
  .table("public.transactions")
  .schema(
    field.name("account_id"),
    field.name("full_name"),
    ...
  )

// Columns are paired by position: account_id <-> account_id, name <-> full_name.
plan.addForeignKeyRelationship(
  accountsTask, List("account_id", "name"),
  List(transactionsTask -> List("account_id", "full_name"))
)
---
# Task file: one step per table in the postgres data source.
name: "postgres_data"
steps:
  # Table holding the account_id and name values that are referenced.
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
  # Table whose account_id and full_name are linked back to accounts.
  - name: "transactions"
    type: "postgres"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "full_name"
---
# Plan file: binds the task to a data source and declares the foreign key.
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"

sinkOptions:
  foreignKeys:
    # Multiple columns are comma separated and paired by position:
    # account_id <-> account_id, name <-> full_name.
    "my_postgres.accounts.account_id,name":
      - "my_postgres.transactions.account_id,full_name"
Nested column
Your schema structure can have nested fields which can also be referenced as foreign keys. But to do so, you need to create a proxy field that gets omitted from the final saved data.
In the example below, the nested `customer_details.name` field inside the `json` task needs to match `name` from `postgres`. A new field in the `json` task called `_txn_name` is used as a temporary column to facilitate the foreign key definition.
// Accounts table: supplies account_id and name.
var postgresAcc = postgres("my_postgres", "jdbc:...")
    .table("public.accounts")
    .schema(
        field().name("account_id"),
        field().name("name"),
        ...
    );

// JSON task with a nested customer_details structure.
var jsonTask = json("my_json", "/tmp/json")
    .schema(
        field().name("account_id"),
        field().name("customer_details")
            .schema(
                // nested field will get its value from '_txn_name'
                field().name("name").sql("_txn_name"),
                ...
            ),
        // proxy column for the foreign key; value will not be included in output
        field().name("_txn_name").omit(true)
    );

// The foreign key targets the top-level proxy column, not the nested field itself.
plan().addForeignKeyRelationship(
    postgresAcc, List.of("account_id", "name"),
    List.of(Map.entry(jsonTask, List.of("account_id", "_txn_name")))
);
// Accounts table: supplies account_id and name.
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )

// JSON task with a nested customer_details structure.
val jsonTask = json("my_json", "/tmp/json")
  .schema(
    field.name("account_id"),
    field.name("customer_details")
      .schema(
        // nested field will get its value from '_txn_name'
        field.name("name").sql("_txn_name"),
        ...
      ),
    // proxy column for the foreign key; value will not be included in output
    field.name("_txn_name").omit(true)
  )

// The foreign key targets the top-level proxy column, not the nested field itself.
plan.addForeignKeyRelationship(
  postgresAcc, List("account_id", "name"),
  List(jsonTask -> List("account_id", "_txn_name"))
)
---
#postgres task yaml
name: "postgres_data"
steps:
  # Table holding the account_id and name values that are referenced.
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
---
#json task yaml
name: "json_data"
steps:
  - name: "transactions"
    type: "json"
    options:
      # JSON is file based: it takes a path (as in the Java/Scala examples), not a JDBC dbtable
      path: "/tmp/json"
    schema:
      fields:
        - name: "account_id"
        # proxy column used by the foreign key; omitted from the output
        - name: "_txn_name"
          generator:
            options:
              omit: true
        - name: "customer_details"
          schema:
            fields:
              # nested field takes its value from '_txn_name'
              - name: "name"
                generator:
                  type: "sql"
                  options:
                    sql: "_txn_name"
---
#plan yaml
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"
  - name: "json_data"
    dataSourceName: "my_json"

sinkOptions:
  foreignKeys:
    # The json side references the top-level proxy column '_txn_name',
    # which in turn feeds the nested customer_details.name field.
    "my_postgres.accounts.account_id,name":
      - "my_json.transactions.account_id,_txn_name"