Streaming
Streaming releases
Bloblang Syntax Highlighting Test
Test page for Bloblang syntax highlighting and interactive features.
| Hover over functions and methods to see documentation. Click "Try It" to execute mappings! |
Pure Bloblang Code Block
root = this
root.id = uuid_v4()
root.timestamp = now()
# Variables
let foo = "bar"
root.message = $foo
# Methods and functions
root.uppercased = this.name.uppercase()
root.replaced = this.text.replace_all("old", "new")
# Boolean and arithmetic
root.is_valid = this.count > 10 && this.status == "active"
root.total = this.price * this.quantity
# Arrays and objects
root.items = this.data.map_each(item -> {
"name": item.name.uppercase(),
"value": item.value * 2
})
# Deletions
root.sensitive = deleted()
# In: {"name": "alice", "text": "hello old world", "count": 15, "status": "active", "price": 10.5, "quantity": 3, "data": [{"name": "item1", "value": 5}, {"name": "item2", "value": 10}], "sensitive": "secret123"}
Bloblang with Alias (blobl)
root.transformed = this.input.uppercase().trim()
root.uuid = uuid_v4()
# In: {"input": " hello world "}
Bloblang in YAML Pipeline
input:
kafka:
addresses: ["localhost:9092"]
topics: ["my-topic"]
pipeline:
processors:
- mapping: |
# Transform the message
root.id = uuid_v4()
root.timestamp = now()
root.data = this.payload.parse_json()
# Uppercase all text fields
root.title = this.payload.title.uppercase()
root.body = this.payload.body
# Add metadata
meta processed_at = now()
meta source = @kafka_topic
output:
http_client:
url: "https://api.example.com/data"
verb: POST
Complex Bloblang Example
root = this
root.id = uuid_v4()
# Conditional transformations
root.status = if this.score > 80 {
"excellent"
} else if this.score > 60 {
"good"
} else {
"needs_improvement"
}
# Array operations
root.filtered_items = this.items.filter(item -> item.active == true)
root.unique_tags = this.tags.unique()
# String operations
root.email = this.email.lowercase().trim()
root.name = this.first_name + " " + this.last_name
# Error handling
root.safe_value = this.risky_field.catch("default_value")
# Delete sensitive fields
root.password = deleted()
# In: {"score": 85, "items": [{"name": "a", "active": true}, {"name": "b", "active": false}, {"name": "c", "active": true}], "tags": ["red", "blue", "red", "green"], "email": " JOHN@EXAMPLE.COM ", "first_name": "John", "last_name": "Doe", "password": "secret"}
Testing All Token Types
# Simple example showing basic transformations
root.message = this.text.uppercase()
root.length = this.text.length()
root.sorted = this.items.sort()
root.unique_nums = this.numbers.unique()
root.id = uuid_v4()
# In: {"text": "hello world", "items": ["c", "a", "b"], "numbers": [1, 2, 2, 3, 3, 3]}
Multi-line Input Format
# This example shows the multi-line # In: format
root.fullName = this.first + " " + this.last
root.age = this.age + 1
# In:
# {
# "first": "Jane",
# "last": "Smith",
# "age": 29
# }
YAML Pipeline with Switch and Check
input:
kafka:
addresses: ["localhost:9092"]
topics: ["events"]
pipeline:
processors:
- switch:
- check: this.type == "order"
processors:
- mapping: |
root = this
root.processed_at = now()
root.order_id = uuid_v4()
- check: this.type == "user"
processors:
- mapping: |
root.user = this
root.user.email = this.email.lowercase()
output:
stdout: {}
YAML with Branch Processor
pipeline:
processors:
- branch:
request_map: |
root.url = "https://api.example.com/enrich"
root.body = this.id
processors:
- http:
url: "${! this.url }"
verb: POST
result_map: |
root.enriched = this
root.enriched.fetched_at = now()
YAML with Interpolation Functions
input:
kafka:
addresses: ["localhost:9092"]
topics: ["events"]
consumer_group: "${! env(\"CONSUMER_GROUP\") }"
pipeline:
processors:
- cache:
resource: my_cache
key: "${! meta(\"kafka_key\") }"
value: "${! content().hash(\"xxhash64\") }"
- dedupe:
key: "${! this.id }-${! meta(\"kafka_partition\") }"
output:
http_client:
url: "${! env(\"TARGET_URL\") }/ingest"
MCP Tool: Weather Service
Real-world example from Redpanda Connect MCP tools.
label: weather-service
processors:
- label: validate_inputs
mutation: |
# Validate and sanitize city input
meta city = this.city.string().
re_replace_all("[^a-zA-Z\\s\\-]", "").
trim()
# Check for empty input
root = if @city == "" {
throw("City name cannot be empty")
} else { "" }
- label: fetch_weather_data
try:
- http:
url: "https://wttr.in/${! @city }?format=j1"
verb: GET
headers:
User-Agent: "redpanda-connect-mcp/1.0"
timeout: "10s"
- mutation: |
root = {
"city": @city,
"temperature_c": this.current_condition.0.temp_C.number(),
"temperature_f": this.current_condition.0.temp_F.number(),
"feels_like_c": this.current_condition.0.FeelsLikeC.number(),
"humidity": this.current_condition.0.humidity.number(),
"description": this.current_condition.0.weatherDesc.0.value,
"wind_speed_kmh": this.current_condition.0.windspeedKmph.number(),
"timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
}
- log:
message: "Weather data fetched for city: ${! @city }"
level: "INFO"
- label: handle_weather_errors
catch:
- mutation: |
root = {
"error": "Failed to fetch weather data",
"city": @city,
"details": error(),
"timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
}
- log:
message: "Weather API error for city ${! @city }: ${! error() }"
level: "ERROR"
meta:
tags: [ weather, example ]
mcp:
enabled: true
description: "Get current weather conditions for any city worldwide"
MCP Tool: Conditional Processing
Switch-based conditional processing with different data formats.
label: parse-data-by-type
processors:
- label: conditional_processing
switch:
- check: this.data_type == "json"
processors:
- mapping: |
root.parsed_data = this.content.parse_json()
root.format = "json"
- check: this.data_type == "csv"
processors:
- mapping: |
root.parsed_data = this.content.parse_csv()
root.format = "csv"
- processors:
- mapping: |
root.error = "Unsupported data type"
root.supported_types = ["json", "csv"]
meta:
mcp:
enabled: true
description: "Parse data based on specified type"
Pipeline: Error Handling with Try/Catch
input:
generate:
count: 1
interval: ""
mapping: |
root = {"request": "data"}
pipeline:
processors:
- try:
- http:
url: "https://api.example.com/data"
verb: GET
- catch:
- mutation: |
root.error = true
root.message = "Request failed: " + error()
output:
stdout: {}
Regular YAML (Kubernetes - Should NOT Be Processed)
This Kubernetes manifest should remain as normal YAML without Bloblang highlighting.
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-app
labels:
app: my-app
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-app
image: my-registry/my-app:latest
ports:
- containerPort: 8080
env:
- name: DATABASE_URL
value: "postgres://localhost:5432/mydb"
Was this helpful?