# Tutorial: Kafka to S3 Pipeline
In this tutorial, you'll build a production-ready data pipeline that reads events from Kafka, transforms them, and writes them to S3 in Parquet format.
**Prerequisites:** Complete the Quickstart tutorial.

**Time:** ~30 minutes
## What You'll Learn
- Configure Kafka inputs with consumer groups
- Set up S3 outputs with partitioning
- Add data transformation logic
- Configure lineage tracking
- Deploy to an environment
## Step 1: Create the Package

Initialize a new Transform package:
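The exact scaffolding command depends on your version of the datakit CLI; assuming the CLI is named `dk` (matching the `dk.yaml` manifest used below), it might look like:

```
# Hypothetical dk-style command: scaffold a new Transform package
dk init transform kafka-to-s3-pipeline
cd kafka-to-s3-pipeline
```

Check `dk init --help` (or your CLI's equivalent) for the actual flags.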
## Step 2: Define the Transform Manifest

Edit `dk.yaml` with your Transform configuration:
```yaml
apiVersion: datakit.infoblox.dev/v1alpha1
kind: Transform
metadata:
  name: kafka-to-s3-pipeline
  namespace: tutorials
  version: 1.0.0
  labels:
    team: data-engineering
    domain: events
spec:
  runtime: generic-python
  mode: batch
  image: myorg/kafka-to-s3-pipeline:v1.0.0
  timeout: 1h
  inputs:
    - dataset: user-events
  outputs:
    - dataset: processed-events
  env:
    - name: LOG_LEVEL
      value: info
  resources:
    cpu: "500m"
    memory: "2Gi"
```
## Step 3: Define DataSets and Stores
Create the input DataSet referencing a Kafka Store:
```yaml
apiVersion: datakit.infoblox.dev/v1alpha1
kind: DataSet
metadata:
  name: user-events
  namespace: tutorials
spec:
  store: local-kafka
  topic: user-events
  format: json
```
Create the output DataSet referencing an S3 Store:
```yaml
apiVersion: datakit.infoblox.dev/v1alpha1
kind: DataSet
metadata:
  name: processed-events
  namespace: tutorials
spec:
  store: local-s3
  prefix: processed/events/
  format: parquet
  classification: internal
```
Create the Stores with connection details:
```yaml
apiVersion: datakit.infoblox.dev/v1alpha1
kind: Store
metadata:
  name: local-kafka
spec:
  connector: kafka
  connection:
    bootstrap-servers: localhost:9092
    consumer-group: kafka-to-s3-consumer
    auto-offset-reset: earliest
---
apiVersion: datakit.infoblox.dev/v1alpha1
kind: Store
metadata:
  name: local-s3
spec:
  connector: s3
  connection:
    endpoint: http://localhost:9000
    region: us-east-1
  secrets:
    accessKeyId: ${AWS_ACCESS_KEY_ID}
    secretAccessKey: ${AWS_SECRET_ACCESS_KEY}
```
## Step 4: Write the Pipeline Code

Create the transformation logic in `src/main.py`:
```python
#!/usr/bin/env python3
"""
Kafka to S3 Pipeline

Reads user events from Kafka, transforms them, and writes to S3.
"""
import json
import os
from datetime import datetime
from typing import Iterator

import pyarrow as pa
import pyarrow.parquet as pq
from kafka import KafkaConsumer
import boto3


def get_kafka_consumer() -> KafkaConsumer:
    """Create Kafka consumer from environment."""
    return KafkaConsumer(
        os.environ["INPUT_TOPIC"],
        bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
        group_id=os.environ.get("CONSUMER_GROUP", "kafka-to-s3-consumer"),
        auto_offset_reset="earliest",
        value_deserializer=lambda x: json.loads(x.decode("utf-8")),
        consumer_timeout_ms=30000,  # stop iterating after 30s without messages
    )


def get_s3_client():
    """Create S3 client from environment."""
    return boto3.client(
        "s3",
        endpoint_url=os.environ.get("S3_ENDPOINT"),
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin"),
    )


def transform_event(event: dict) -> dict:
    """Transform a single event."""
    return {
        "event_id": event.get("id"),
        "user_id": event.get("user_id"),
        "event_type": event.get("type"),
        "timestamp": event.get("timestamp"),
        "processed_at": datetime.utcnow().isoformat(),
        "properties": json.dumps(event.get("properties", {})),
    }


def batch_events(consumer: KafkaConsumer, batch_size: int) -> Iterator[list]:
    """Yield batches of transformed events from Kafka."""
    batch = []
    for message in consumer:
        batch.append(transform_event(message.value))
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def write_to_s3(s3_client, bucket: str, prefix: str, events: list):
    """Write events to S3 as Parquet."""
    if not events:
        return
    # Create Arrow table
    table = pa.Table.from_pylist(events)
    # Generate date-partitioned key
    date = datetime.utcnow().strftime("%Y-%m-%d")
    timestamp = datetime.utcnow().strftime("%H%M%S")
    key = f"{prefix}date={date}/events_{timestamp}.parquet"
    # Serialize to an in-memory buffer, then upload
    with pa.BufferOutputStream() as buf:
        pq.write_table(table, buf, compression="snappy")
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=buf.getvalue().to_pybytes(),
        )
    print(f"Wrote {len(events)} events to s3://{bucket}/{key}")


def main():
    """Main pipeline entry point."""
    print("Starting Kafka to S3 pipeline...")
    # Configuration
    batch_size = int(os.environ.get("BATCH_SIZE", "1000"))
    bucket = os.environ["OUTPUT_BUCKET"]
    prefix = os.environ.get("OUTPUT_PREFIX", "processed/events/")
    # Initialize clients
    consumer = get_kafka_consumer()
    s3_client = get_s3_client()
    # Process batches
    total_events = 0
    for batch in batch_events(consumer, batch_size):
        write_to_s3(s3_client, bucket, prefix, batch)
        total_events += len(batch)
    print(f"Pipeline complete. Processed {total_events} events.")


if __name__ == "__main__":
    main()
```
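Because `transform_event` is a pure function, it is easy to sanity-check in isolation, without Kafka or S3 running. A minimal check (the function body is copied from `src/main.py`; the sample event is made up for illustration):

```python
import json
from datetime import datetime


def transform_event(event: dict) -> dict:
    """Same pure transformation as in src/main.py."""
    return {
        "event_id": event.get("id"),
        "user_id": event.get("user_id"),
        "event_type": event.get("type"),
        "timestamp": event.get("timestamp"),
        "processed_at": datetime.utcnow().isoformat(),
        "properties": json.dumps(event.get("properties", {})),
    }


# Hypothetical sample event matching the producer script's shape
sample = {
    "id": "e-1",
    "user_id": "user_1",
    "type": "page_view",
    "timestamp": 1700000000.0,
    "properties": {"page": "/home"},
}
out = transform_event(sample)
assert out["event_id"] == "e-1"
assert out["event_type"] == "page_view"
assert json.loads(out["properties"]) == {"page": "/home"}
print("transform_event OK")
```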
## Step 5: Add Dependencies

Create `requirements.txt` for Python dependencies:
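Based on the imports in `src/main.py`, the file needs at least the Kafka client, Arrow, and the AWS SDK (pin versions as appropriate for your environment):

```
kafka-python
pyarrow
boto3
```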
## Step 6: Start Local Development
Start the local development stack:
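The dev stack used in this tutorial (Kafka, MinIO, Marquez) is typically brought up with one command; assuming a `dk`-style CLI or a bundled Docker Compose file, something like:

```
# Hypothetical: start Kafka, MinIO, and Marquez locally
dk dev up
# or, if the package ships a compose file:
docker compose up -d
```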
Verify all services are running:
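One way to confirm everything is healthy, assuming the stack runs under Docker Compose:

```
docker compose ps
# Kafka (:9092), MinIO (:9000/:9001), and Marquez (:3000) should all be running
```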
## Step 7: Produce Test Data

Create a test data producer script:
```python
#!/usr/bin/env python3
"""Generate test events for the pipeline."""
import json
import time
import uuid

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)

for i in range(100):
    event = {
        "id": str(uuid.uuid4()),
        "user_id": f"user_{i % 10}",
        "type": "page_view",
        "timestamp": time.time(),
        "properties": {
            "page": f"/page/{i}",
            "referrer": "https://example.com",
        },
    }
    producer.send("user-events", event)
    print(f"Sent event {i + 1}")

producer.flush()
print("Done!")
```
Run it:
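Assuming you saved the script as `produce_events.py`:

```
python produce_events.py
```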
## Step 8: Validate and Run
Validate your package:
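Assuming a `dk`-style CLI, validation might look like:

```
# Hypothetical: check dk.yaml and the referenced DataSets/Stores
dk validate
```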
Run the pipeline:
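Your CLI may provide a run command, but the Transform can also be exercised directly with the environment variables `src/main.py` reads (the bucket name here is a placeholder for whatever your local MinIO stack uses):

```
INPUT_TOPIC=user-events \
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
OUTPUT_BUCKET=tutorial-bucket \
S3_ENDPOINT=http://localhost:9000 \
python src/main.py
```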
## Step 9: Check Results
### View Lineage
Open the Marquez UI at http://localhost:3000 to view the lineage graph.
### Check S3 Output
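You can list the Parquet output with the AWS CLI pointed at MinIO (bucket name is a placeholder; substitute your own):

```
aws --endpoint-url http://localhost:9000 s3 ls s3://tutorial-bucket/processed/events/ --recursive
```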
Or open the MinIO console at http://localhost:9001.
## Step 10: Build and Publish
When ready for deployment:
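The image tag below comes from the manifest's `image` field; the publish step is a hypothetical `dk`-style command:

```
docker build -t myorg/kafka-to-s3-pipeline:v1.0.0 .
docker push myorg/kafka-to-s3-pipeline:v1.0.0
# Hypothetical: register the package with the datakit registry
dk publish
```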
## Step 11: Promote to Environment
Deploy to the development environment:
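Assuming a `dk`-style promotion command (the flag names are illustrative, not confirmed):

```
# Hypothetical: promote the published package to the dev environment
dk promote kafka-to-s3-pipeline --env dev
```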
## Summary
You've built a complete Kafka to S3 pipeline with:
- Kafka consumer with consumer groups
- Data transformation logic
- S3 output with Parquet format
- Date-based partitioning
- Automatic lineage tracking
- Environment promotion
## Next Steps
- Local Development - Deep dive into the dev stack
- Promoting Packages - Advanced promotion workflows
- Troubleshooting - Common issues