Mapping in Airflow
This page focuses only on the overlap between Airflow and mapping authoring.
The main pattern is simple:
- an Airflow task decides run-specific values
- those values are passed into TranslationContext
- the mapping reads them
- the mapping writes them into the output payload or ta_info
- the task reads TranslationResult for downstream decisions
The boundary
In an Airflow-based flow:
- Airflow should decide which file to process, which partners are involved, and what run-specific values apply
- the mapping should turn parsed input plus context values into output records
- the task should handle retries, branching, alerts, and persistence
That means a map is usually not the place to:
- open storage clients
- call external APIs
- fetch Airflow Variables directly
- decide task retries
Those belong in the DAG or task code. The mapping should receive the values it needs explicitly.
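The split above can be sketched in plain Python. This is a minimal illustration, not the bots_airflow API: StubContext is a hypothetical stand-in for TranslationContext, and resolve_run_values and build_order_id are invented names used only to show which side owns which decision.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StubContext:
    """Hypothetical stand-in for TranslationContext (not the real class)."""

    values: dict = field(default_factory=dict)

    def value(self, key: str, default: Any = None) -> Any:
        return self.values.get(key, default)


def resolve_run_values(dag_params: dict) -> dict:
    """Task side: decide run-specific values before translation starts."""
    return {"order_prefix": dag_params.get("order_prefix", "WEB-")}


def build_order_id(raw_order_id: str, context: StubContext) -> str:
    """Mapping side: only reads what the task passed in explicitly."""
    return f"{context.value('order_prefix', '')}{raw_order_id}"


context = StubContext(values=resolve_run_values({"order_prefix": "B2B-"}))
order_id = build_order_id("1001", context)
```

The mapping-side function never touches DAG parameters or Variables; it sees only the values the task chose to hand over.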
Basic Airflow pattern
An Airflow task usually creates TranslationContext from upstream task results
or DAG parameters:
from bots_airflow import TranslationContext, init

translator = init(
    grammar_in=grammar_in,
    grammar_out=grammar_out,
    map=OrdersToCsv,
)
result = translator.translate_text(
    input_text,
    context=TranslationContext(
        frompartner="WEBSTORE",
        topartner="FULFILLMENT",
        reference="batch-001",
        metadata={
            "dag_run_id": dag_run_id,
            "source_key": source_key,
        },
        values={
            "order_prefix": "WEB-",
        },
    ),
)
The mapping then receives that context object automatically.
Example: context value used in output
This is the clearest Airflow-to-mapping example.
Airflow supplies a run-specific prefix:
context = TranslationContext(
    values={"order_prefix": "WEB-"},
)
The mapping uses that value when building the outbound record:
from bots_airflow.context import TranslationContext
from bots_airflow.mapping import BaseMapping

class OrdersToCsv(BaseMapping):
    def translate(self, inn, out, *, context: TranslationContext, **kwargs):
        order_prefix = str(context.value("order_prefix", ""))
        for item in inn.getloop({"BOTSID": "root"}):
            out.putloop(
                {
                    "BOTSID": "root",
                    "order_id": f"{order_prefix}{item.record.get('order_id', '')}",
                    "sku": item.record.get("sku", ""),
                    "quantity": str(item.record.get("quantity", "")),
                }
            )
If the inbound record contains order_id = 1001, sku = SKU-001, and quantity = 2, the outbound row becomes:
WEB-1001,SKU-001,2
That is the core Airflow overlap: Airflow decides the prefix, the mapping places it into the translated output.
Example: fail fast on required Airflow inputs
If a DAG run must supply a value, use the required helpers instead of allowing a blank output to be generated.
Airflow:
context = TranslationContext(
    values={
        "customer_id": "900001",
        "order_prefix": "WEB-",
    },
    partners={
        "WEBSTORE": {
            "warehouse_code": "0645",
        },
    },
    frompartner="WEBSTORE",
)
Mapping:
class OrdersToCsv(BaseMapping):
    def translate(self, inn, out, *, context: TranslationContext, **kwargs):
        customer_id = context.required_value("customer_id")
        order_prefix = context.required_value("order_prefix", allow_blank=False)
        warehouse_code = context.required_partner_value("from", "warehouse_code")
        for item in inn.getloop({"BOTSID": "root"}):
            out.putloop(
                {
                    "BOTSID": "root",
                    "customer_id": customer_id,
                    "warehouse_code": warehouse_code,
                    "order_id": f"{order_prefix}{item.record.get('order_id', '')}",
                }
            )
If Airflow forgets to provide one of those inputs, the mapping raises a clear
ValueError immediately instead of writing a partial or ambiguous output.
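To make the fail-fast behavior concrete, here is a minimal sketch of what such a helper might do. require_value is a hypothetical stand-in written for this page, not the library's implementation, and the allow_blank=True default is an assumption.

```python
from typing import Any, Mapping


def require_value(values: Mapping[str, Any], key: str, *, allow_blank: bool = True) -> Any:
    """Hypothetical stand-in: return values[key] or raise ValueError."""
    if key not in values or values[key] is None:
        raise ValueError(f"required context value {key!r} is missing")
    value = values[key]
    # Assumed semantics: allow_blank=False also rejects empty or whitespace-only values.
    if not allow_blank and str(value).strip() == "":
        raise ValueError(f"required context value {key!r} is blank")
    return value


# The DAG run supplied a customer id but left the prefix empty.
run_values = {"customer_id": "900001", "order_prefix": ""}

customer_id = require_value(run_values, "customer_id")

try:
    require_value(run_values, "order_prefix", allow_blank=False)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```

Because the error is raised before any record is written, the task can fail without leaving a half-built output behind.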
Example: multiple context values shaping X12 output
The same pattern works for outbound EDI segments.
Airflow supplies values:
context = TranslationContext(
    reference="inv-20260318-01",
    values={
        "sender_name": "Pleasant Mattress",
        "reference_prefix": "run-",
        "as_of_date": "20260318",
    },
)
The mapping writes those values into the outbound message:
class InventoryJsonTo846(BaseMapping):
    def translate(self, inn, out, *, context: TranslationContext, **kwargs):
        sender_name = context.value("sender_name", "Pleasant Mattress")
        reference_prefix = context.value("reference_prefix", "ref-")
        as_of_date = context.value("as_of_date", "20260318")
        out.put({"BOTSID": "ST"}, {
            "BOTSID": "BIA",
            "BIA03": f"{reference_prefix}000001",
            "BIA04": as_of_date,
        })
        n1loop = out.putloop({"BOTSID": "ST"}, {"BOTSID": "N1"})
        n1loop.put({
            "BOTSID": "N1",
            "N101": "SU",
            "N102": sender_name,
            "N103": "92",
            "N104": "ASSIGNED_ID",
        })
Here the Airflow task controls the following values, and the mapping keeps only fallback defaults for them:
- the BIA reference prefix
- the effective date
- the outbound sender name
Example: partner-scoped values from context
If Airflow already knows partner-specific fields, pass them in context.partners
and let the mapping read them.
Airflow:
context = TranslationContext(
    frompartner="LIVINGSPC",
    partners={
        "LIVINGSPC": {
            "customer_id": "900001",
            "dc_code": "0645",
        },
    },
)
Mapping:
customer_id = context.partner_value("from", "customer_id", "")
dc_code = context.partner_value("from", "dc_code", "")
Those values can then be written into outbound CSV columns, JSON fields, or EDI segments.
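The lookup in the example above can be pictured as a two-step resolution: the role ("from" or "to") selects a partner id, and that id selects a dictionary of partner fields. The function below is a hypothetical sketch of that idea, assuming this is roughly how the role is resolved; it is not the library's code.

```python
from typing import Any, Optional


def lookup_partner_value(
    partners: dict,
    frompartner: Optional[str],
    topartner: Optional[str],
    role: str,
    key: str,
    default: Any = None,
) -> Any:
    """Hypothetical sketch: resolve role -> partner id -> partner field."""
    partner_id = {"from": frompartner, "to": topartner}.get(role)
    # An unknown role or partner falls through to the default.
    return partners.get(partner_id, {}).get(key, default)


partners = {"LIVINGSPC": {"customer_id": "900001", "dc_code": "0645"}}

customer_id = lookup_partner_value(partners, "LIVINGSPC", None, "from", "customer_id", "")
missing = lookup_partner_value(partners, "LIVINGSPC", None, "to", "dc_code", "")
```

Here customer_id resolves through the sending partner, while missing falls back to the empty default because no receiving partner was set.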
Returning useful data to Airflow
The mapping does more than write the payload: it can also return structured data for the task to use downstream.
Example mapping return:
return {
    "row_count": row_count,
    "customer_id": customer_id,
}
Then the task can read:
result = translator.translate_text(input_text, context=context)
print(result.mapping_result["row_count"])
This is useful for:
- branching decisions
- audit logging
- XCom payloads
- downstream file naming
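As a rough sketch of two of these uses, the functions below take a mapping result dictionary and derive a branch target and an output file name. In Airflow, a branch callable returns the task_id to follow; the task ids and the file name pattern here are hypothetical, chosen only for illustration.

```python
def choose_next_task(mapping_result: dict) -> str:
    """Return the task_id to run next, as a branch callable would."""
    row_count = int(mapping_result.get("row_count", 0))
    # Hypothetical task ids: upload only when the mapping produced rows.
    return "upload_output" if row_count > 0 else "skip_upload"


def output_file_name(mapping_result: dict, run_date: str) -> str:
    """Build a downstream file name from values the mapping returned."""
    customer_id = mapping_result.get("customer_id", "unknown")
    return f"orders_{customer_id}_{run_date}.csv"


# Shape matches the example mapping return shown earlier on this page.
mapping_result = {"row_count": 3, "customer_id": "900001"}

next_task = choose_next_task(mapping_result)
file_name = output_file_name(mapping_result, "20260318")
```

Keeping these decisions in task code means the mapping only has to report what it did, not what should happen next.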
Using ta_info for run metadata
Mappings can also write values to out.ta_info:
out.ta_info["reference"] = context.reference
out.ta_info["botskey"] = customer_id
out.ta_info["divtext"] = "inventory_export"
Then Airflow can read them from the result:
result = translator.translate_text(input_text, context=context)
reference = result.ta_info.get("reference", "")
This is a good fit for:
- human-readable references
- identifiers you want in logs
- lightweight summary metadata
Practical guidance
Good Airflow inputs for mappings:
- prefixes and suffixes
- dates chosen upstream
- partner ids
- customer ids
- routing decisions
- object storage keys
- batch references
Less suitable mapping inputs:
- live database connections
- storage clients
- retry counters
- scheduler state
- alerting logic
If the value is decided per DAG run, it usually belongs in TranslationContext.
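One way to enforce the distinction between the two lists is to check, in the task, that every context value is plain data before building the context. The helper below is a hypothetical guard, not part of bots_airflow; it treats "JSON-serializable" as a proxy for "safe to pass into a mapping".

```python
import json


def non_serializable_keys(values: dict) -> list:
    """Return the keys whose values are not plain JSON-serializable data."""
    bad = []
    for key, value in values.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad.append(key)
    return bad


candidate_values = {
    "order_prefix": "WEB-",
    "as_of_date": "20260318",
    "storage_client": object(),  # stands in for a live client or connection
}

rejected = non_serializable_keys(candidate_values)
```

A task could raise or log when the returned list is not empty, which keeps clients and connections on the Airflow side of the boundary.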