The Developer's Guide to Custom Pipeline Functions in Python

By Dherik Barison
May 2025 | 20 min. read
When it comes to data integration and transformation, flexibility matters. At Datastreamer, we understand that every organization has unique data processing needs. These needs are best addressed with tools that are both adaptable and powerful.
To support this, we created the Custom Function component—an essential part of our data pipelines. This feature empowers developers to write their own Python code to manage specific data transformation tasks, making workflows more tailored and efficient.
What Are Custom Functions?
In short, Custom Functions let you run Python code within your data pipeline. As a result, you gain complete control over how your documents are processed.
For example, you might need to:
- Filter out specific records
- Convert or standardize data formats
- Add new information to documents
- Run advanced calculations
Custom Functions allow you to do all of the above, and more. Ultimately, they give you the flexibility to meet highly specific transformation requirements without depending on out-of-the-box solutions alone.
Why Use Custom Functions?
Although Datastreamer provides many pre-built components for standard data transformation tasks, there are still many situations where a custom approach is essential.
For instance, you might need to:
- Clean data using logic that’s unique to your business
- Filter documents based on highly specific or nested conditions
- Transform formats in ways that go beyond default mappings
- Enrich data through advanced algorithms tailored to your needs
In these cases and more, Custom Functions allow you to go beyond the basics and build exactly what your pipeline requires.
Getting Started with Custom Functions
Understanding the Function Structure
To begin, every Custom Function in Datastreamer follows a simple and consistent structure. This makes it easy to implement and reuse within your pipelines.
def process_batch(documents: list) -> list:
    # Your custom logic here
    return filtered_documents
In this example, the function receives a list of documents—each one represented as a Python dictionary—and returns a list of processed documents. This format ensures consistency while giving you the flexibility to define custom transformation logic.
Alternatively, if you’re working with individual documents rather than batches, you can define a function like this:
def process_document(document):
    # Your custom logic here
    return changed_document
Both approaches are fully supported within the Datastreamer environment, allowing you to choose what works best for your use case.
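To make this concrete, here is a minimal sketch of a batch function that simply filters documents. The 'content' field name is an assumption for illustration, not a required schema:
def process_batch(documents: list) -> list:
    # Keep only documents that have a non-empty 'content' string
    filtered_documents = []
    for doc in documents:
        if isinstance(doc.get('content'), str) and doc['content'].strip():
            filtered_documents.append(doc)
    return filtered_documents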
Example: Calculating Text Readability Scores
Now, let’s walk through a practical example. Although Datastreamer includes components for sentiment analysis and natural language processing (NLP), it doesn’t yet offer a built-in tool for calculating text readability scores (such as Flesch-Kincaid).
This is an ideal opportunity to use a Custom Function.
import re

def calculate_flesch_reading_ease(text):
    """Calculate Flesch Reading Ease score for English text"""
    # Handle empty text
    if not text:
        return 0

    # Count sentences: look for sentence-ending punctuation
    # followed by whitespace or the end of the string
    sentences = len(re.findall(r'[.!?](?:\s|$)', text))
    if sentences == 0:  # Ensure at least one sentence if text exists
        sentences = 1

    # Count words (split by whitespace, filter empty strings)
    words = [word for word in re.split(r'\s+', text) if word]
    word_count = len(words)
    if word_count == 0:
        return 0

    # Count syllables across all words
    syllable_count = 0
    for word in words:
        word = word.lower()
        # Remove non-alphabetic characters
        word = re.sub(r'[^a-z]', '', word)
        if not word:
            continue
        syllable_count += count_syllables(word)

    # Flesch Reading Ease formula
    score = 206.835 - (1.015 * (word_count / sentences)) - (84.6 * (syllable_count / word_count))
    return round(score, 2)

def count_syllables(word):
    """Count syllables in a word with basic English syllable rules"""
    # Special case for empty strings
    if not word:
        return 0

    # Count vowel groups (not individual vowels)
    vowels = "aeiouy"
    count = 0
    prev_is_vowel = False
    for char in word:
        is_vowel = char in vowels
        if is_vowel and not prev_is_vowel:
            count += 1
        prev_is_vowel = is_vowel

    # Adjust for common patterns
    # Silent 'e' at the end of a word
    if word.endswith('e') and len(word) > 2 and word[-2] not in vowels:
        count -= 1
    # Words ending with a consonant + 'le' usually add a syllable
    if len(word) > 2 and word.endswith('le') and word[-3] not in vowels:
        count += 1

    # Ensure at least one syllable for any word
    return max(1, count)

def process_document(document):
    if 'content' in document and isinstance(document['content'], str):
        text = document['content']
        document['readability_score'] = calculate_flesch_reading_ease(text)

        # Add a human-readable interpretation of the score
        score = document['readability_score']
        if score >= 90:
            document['readability_level'] = "Very Easy"
        elif score >= 80:
            document['readability_level'] = "Easy"
        elif score >= 70:
            document['readability_level'] = "Fairly Easy"
        elif score >= 60:
            document['readability_level'] = "Standard"
        elif score >= 50:
            document['readability_level'] = "Fairly Difficult"
        elif score >= 30:
            document['readability_level'] = "Difficult"
        else:
            document['readability_level'] = "Very Difficult"
    return [document]
You could use it to:
- Analyze customer support responses to ensure appropriate language complexity
- Evaluate content marketing materials based on audience reading level
- Monitor social media posts for accessibility and clarity
By creating a Custom Function, you gain direct control over how these metrics are calculated and applied—without relying on external tooling.
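Before wiring the function into a pipeline, you might give it a quick local sanity check with a throwaway document. The sample values below are purely illustrative:
sample = {
    'id': 'doc-1',
    'content': "The cat sat on the mat. It was a sunny day. Everyone was happy."
}
result = process_document(sample)
print(result[0]['readability_score'], result[0]['readability_level'])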
Testing Your Custom Functions
Before deploying your Custom Function to a production pipeline, it’s important to test it thoroughly. Fortunately, one of the most powerful features of the Custom Function component is its built-in testing capability.
To get started, you can:
- Write your Python function directly in the code editor
- Provide sample documents in JSON format
- Run the test to preview exactly how your function processes the data
You’ll see a clear interface showing your code, sample inputs, and the resulting output.

Here’s what the test results reveal:
- Successfully processed documents
- Failed documents, along with detailed error messages
- Console output, which you can use for debugging
- Execution time, helping you understand performance

This built-in functionality removes the guesswork from development. In turn, it ensures your function performs as intended—before it’s ever used in a live pipeline.
Best Practices for Custom Functions
Performance Optimization
When working with Custom Functions that process large volumes of data, performance becomes a top priority. To help your function run efficiently, keep the following best practices in mind:
- Use efficient data structures that suit the task at hand
- Avoid unnecessary iterations through your list of documents
- Monitor memory usage when handling large or nested data
- Leverage Python’s built-in functions whenever possible, as they are typically more optimized
These strategies can significantly reduce processing time and resource consumption.
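As a small illustration of the first two points, swapping a repeated list scan for a set lookup keeps membership checks cheap. The 'author' field and the blocklist values here are assumptions for the sketch:
def process_batch(documents: list) -> list:
    # A set gives O(1) membership checks, versus O(n) for a list,
    # so each document is checked against the blocklist cheaply.
    blocked_authors = {"spam_bot_1", "spam_bot_2"}  # illustrative values
    return [doc for doc in documents if doc.get('author') not in blocked_authors]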
However, performance isn’t just about code structure. If your function isn’t optimized, it can directly affect the speed and efficiency of your entire pipeline.
Because Datastreamer supports native Python, our engineering team encourages you to run your own load tests locally, as sketched below. This is especially important when you’re working with large datasets: testing ahead of deployment helps prevent costly bottlenecks later on.
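A local load test doesn’t need to be elaborate. A rough harness like the following, which times your process_batch over a batch of synthetic documents, is usually enough to surface obvious bottlenecks; the document shape and batch size are arbitrary assumptions:
import time

def make_fake_documents(n: int) -> list:
    # Synthetic documents roughly shaped like your real data
    return [{'id': str(i), 'content': "Sample text. " * 50} for i in range(n)]

docs = make_fake_documents(10_000)
start = time.perf_counter()
process_batch(docs)
elapsed = time.perf_counter() - start
print(f"Processed {len(docs)} documents in {elapsed:.2f}s")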
Error Handling
Robust error handling is essential in any data pipeline. Without it, a single failure can cause the entire batch of documents to stop processing.
Here’s an example of how to handle errors gracefully:
def process_batch(documents: list) -> list:
    processed_docs = []
    for doc in documents:
        try:
            # Process the document
            result = transform_document(doc)
            processed_docs.append(result)
        except Exception as e:
            # Log the error but continue processing other documents
            print(f"Error processing document {doc.get('id', 'unknown')}: {str(e)}")
    return processed_docs
This pattern allows your pipeline to continue running even if one or more documents fail.
Additionally, it’s considered best practice to use safe access methods like .get when retrieving values from dictionaries. In the example above, .get ensures the code still runs smoothly by providing a default fallback value.
By implementing error handling early, you reduce the risk of critical pipeline failures and make debugging much easier down the line.
Documentation
Proper documentation is essential for every Custom Function. It helps define what the function does, what inputs it expects, what outcomes it should produce, and any limitations to be aware of.
Here are a few key benefits of documenting your functions:
- Clarity for Others (and Future You): Clear documentation makes it easier for others to understand how your function works and what to expect. Even if you’re the original author, you might forget the details over time.
- Easier Maintenance: Well-documented functions are easier to update, debug, and extend later on.
- Collaboration: If you’re working on a team, good documentation ensures alignment and helps prevent misunderstandings.
- Testing and Validation: Including examples and expected outputs in your docstrings can guide future testing and help verify that the function behaves correctly, as in the example after this list.
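As an example of what this can look like in practice, a docstring for a batch function might cover purpose, inputs, outputs, and limitations in a few lines. The specifics below are illustrative, not a required template:
def process_batch(documents: list) -> list:
    """Normalize author names on incoming documents.

    Expects: a list of dicts, each ideally containing an 'author' string.
    Returns: the same list, with 'author' lower-cased and stripped.
    Limitations: documents without an 'author' field pass through unchanged.

    Example:
        >>> process_batch([{'author': '  Jane DOE '}])
        [{'author': 'jane doe'}]
    """
    for doc in documents:
        if isinstance(doc.get('author'), str):
            doc['author'] = doc['author'].strip().lower()
    return documents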
Additionally, we strongly recommend backing up your Python functions. If you delete a component—or the entire pipeline—Datastreamer cannot recover the original function on your behalf.
Advanced Use Cases
Custom Functions aren’t just for basic tasks—they also enable advanced capabilities like external data enrichment, document normalization, and schema-aware processing. Below are some common use cases where Custom Functions truly shine.
Enriching Documents with External Data
You can use Custom Functions to enrich your documents by connecting to third-party APIs or databases.
For example, if your document includes an address, you can call an external geocoding service and append the corresponding coordinates:
import requests
def process_batch(documents: list) -> list:
    for doc in documents:
        if 'address' in doc:
            # Call a geocoding service (replace the placeholder URL
            # below with your provider's actual endpoint)
            response = requests.get(
                "https://geocoding.example.com/geocode",
                params={"address": doc['address']}
            )
            location_data = response.json()

            # Add the geo coordinates to the document
            doc['latitude'] = location_data['lat']
            doc['longitude'] = location_data['lng']
    return documents
This approach makes your documents more actionable by embedding relevant metadata directly into them.
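Because external calls can fail or hang, in practice you would likely add a timeout and error handling around the request. A hedged variation on the sketch above, reusing the same placeholder endpoint and response fields, might look like this:
import requests

def process_batch(documents: list) -> list:
    for doc in documents:
        if 'address' not in doc:
            continue
        try:
            response = requests.get(
                "https://geocoding.example.com/geocode",  # placeholder endpoint
                params={"address": doc['address']},
                timeout=5,  # avoid stalling the whole batch on one slow call
            )
            response.raise_for_status()
            location_data = response.json()
            doc['latitude'] = location_data['lat']
            doc['longitude'] = location_data['lng']
        except requests.RequestException as e:
            # Enrichment is best-effort: keep the document, note the failure
            doc['geocoding_error'] = str(e)
    return documents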
Document Transformation and Normalization
In real-world scenarios, data often arrives from multiple sources with inconsistent formats. Custom Functions make it easy to normalize this data.
def process_batch(documents: list) -> list:
    normalized = []
    for doc in documents:
        # Create a standardized structure
        normalized_doc = {
            'id': doc.get('id') or doc.get('ID') or doc.get('_id'),
            'timestamp': standardize_date_format(doc.get('date') or doc.get('timestamp')),
            'content': doc.get('message') or doc.get('text') or doc.get('content'),
            'source': identify_source(doc)
        }
        normalized.append(normalized_doc)
    return normalized
By creating a unified data structure, you simplify downstream processing and analysis.
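The standardize_date_format and identify_source helpers above are placeholders for your own logic. As a rough sketch, a date normalizer built on Python's standard library might look like this; the accepted input formats are assumptions:
from datetime import datetime

def standardize_date_format(value):
    """Convert a handful of common date strings to ISO 8601, or return None."""
    if not value:
        return None
    for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y %H:%M"):
        try:
            return datetime.strptime(str(value), fmt).isoformat()
        except ValueError:
            continue
    # Leave unrecognized values untouched rather than guessing
    return value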
Common Challenges and Solutions
Even with powerful tools, real-world data presents challenges. Here are two common ones—and how Custom Functions can help.
Handling Large Documents
When dealing with large payloads, it’s important to process only what you need. This example shows how to extract key information and optionally discard large text fields to reduce memory usage:
def process_batch(documents: list) -> list:
    for doc in documents:
        if 'large_text_field' in doc and len(doc['large_text_field']) > 10000:
            # Extract just what you need instead of processing everything
            doc['extracted_info'] = extract_key_information(doc['large_text_field'])
            # Optionally remove the large field if no longer needed
            del doc['large_text_field']
    return documents
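What extract_key_information does is entirely up to your use case. An intentionally simple version might just keep the opening of the text plus a few lightweight stats:
def extract_key_information(text: str) -> dict:
    # Keep a short preview and basic metrics instead of the full text
    return {
        'preview': text[:500],
        'char_count': len(text),
        'word_count': len(text.split()),
    }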
Working with Different Document Schemas
If your pipeline ingests data from multiple platforms—such as Twitter, Instagram, or Facebook—you can route documents to the appropriate handler based on their schema:
def process_batch(documents: list) -> list:
    processed = []
    for doc in documents:
        # Determine document type and process accordingly
        if 'twitter_id' in doc:
            processed.append(process_twitter_data(doc))
        elif 'instagram_post_id' in doc:
            processed.append(process_instagram_data(doc))
        elif 'facebook_post_id' in doc:
            processed.append(process_facebook_data(doc))
        else:
            # Handle unknown document types
            processed.append(process_generic_data(doc))
    return processed
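Each handler is just another Python function that maps platform-specific fields into whatever common shape you need. A minimal sketch for the Twitter branch, with assumed field names, could be:
def process_twitter_data(doc: dict) -> dict:
    # Field names here are illustrative; adjust to your actual payloads
    return {
        'id': doc.get('twitter_id'),
        'content': doc.get('text'),
        'author': doc.get('user', {}).get('screen_name'),
        'platform': 'twitter',
    }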
👉 Tip: For more complex use cases, we recommend using our Unify Transformer component before applying a Custom Function. This ensures your documents follow a standard structure.
Wrapping Up
Custom Functions strike the perfect balance between the control of custom code and the reliability of a managed pipeline platform. By embedding Python directly into your Datastreamer pipelines, you unlock the flexibility to meet your specific data transformation needs—without sacrificing performance or scalability.
Whether you’re enriching documents, handling large data volumes, or normalizing formats across multiple sources, Custom Functions empower you to build exactly what your pipeline requires.
Ready to get started?
Explore our complete documentation and start building smarter, more flexible pipelines today.