The Developer's Guide to Custom Pipeline Functions in Python

By Dherik Barison

May 2025 | 20 min. read

When it comes to data integration and transformation, flexibility matters. At Datastreamer, we understand that every organization has unique data processing needs. These needs are best addressed with tools that are both adaptable and powerful.

To support this, we created the Custom Function component—an essential part of our data pipelines. This feature empowers developers to write their own Python code to manage specific data transformation tasks, making workflows more tailored and efficient.

What Are Custom Functions?

In short, Custom Functions let you run Python code within your data pipeline. As a result, you gain complete control over how your documents are processed.

For example, you might need to:

  • Filter out specific records

  • Convert or standardize data formats

  • Add new information to documents

  • Run advanced calculations

Custom Functions allow you to do all of the above, and more. Ultimately, they give you the flexibility to meet highly specific transformation requirements without depending on out-of-the-box solutions alone.

Why Use Custom Functions?

Although Datastreamer provides many pre-built components for standard data transformation tasks, there are still many situations where a custom approach is essential.

For instance, you might need to:

  • Clean data using logic that’s unique to your business

  • Filter documents based on highly specific or nested conditions

  • Transform formats in ways that go beyond default mappings

  • Enrich data through advanced algorithms tailored to your needs

In these cases and more, Custom Functions allow you to go beyond the basics and build exactly what your pipeline requires.

Getting Started with Custom Functions

Understanding the Function Structure

To begin, every Custom Function in Datastreamer follows a simple and consistent structure. This makes it easy to implement and reuse within your pipelines.

def process_batch(documents: list) -> list:
    # Your custom logic here
    return filtered_documents

In this example, the function receives a list of documents—each one represented as a Python dictionary—and returns a list of processed documents. This format ensures consistency while giving you the flexibility to define custom transformation logic.
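
For instance, a filled-in version of this skeleton that simply drops incomplete documents might look like the sketch below (the 'content' field name is only an illustration, not a required schema):

def process_batch(documents: list) -> list:
    # Keep only documents that carry a non-empty 'content' field
    # ('content' is an illustrative field name; use whatever your schema requires)
    filtered_documents = [doc for doc in documents if doc.get('content')]
    return filtered_documents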

Alternatively, if you’re working with individual documents rather than batches, you can define a function like this:

def process_document(document):
    # Your custom logic here
    return changed_document

Both approaches are fully supported within the Datastreamer environment, allowing you to choose what works best for your use case.

Example: Calculating Text Readability Scores

Now, let’s walk through a practical example. Although Datastreamer includes components for sentiment analysis and natural language processing (NLP), it doesn’t yet offer a built-in tool for calculating text readability scores (such as Flesch-Kincaid).

This is an ideal opportunity to use a Custom Function.

import re

def calculate_flesch_reading_ease(text):
    """Calculate Flesch Reading Ease score for English text"""
    # Handle empty text
    if not text:
        return 0
    
    # Count sentences more accurately
    # Look for sentence-ending punctuation followed by whitespace
    # (a space is appended below so punctuation at the very end of the text is counted)
    sentences = len(re.findall(r'[.!?]\s', text + ' '))
    if sentences == 0:  # Ensure at least one sentence if text exists
        sentences = 1
    
    # Count words (split by whitespace, filter empty strings)
    words = [word for word in re.split(r'\s+', text) if word]
    word_count = len(words)
    
    if word_count == 0:
        return 0
    
    # Better syllable counting
    syllable_count = 0
    for word in words:
        word = word.lower()
        # Remove non-alphabetic characters
        word = re.sub(r'[^a-z]', '', word)
        if not word:
            continue
            
        # Count syllables with improved method
        syllable_count += count_syllables(word)
    
    # Calculate score
    score = 206.835 - (1.015 * (word_count / sentences)) - (84.6 * (syllable_count / word_count))
    return round(score, 2)

def count_syllables(word):
    """Count syllables in a word with basic English syllable rules"""
    # Special case for empty strings
    if not word:
        return 0
        
    # Count vowel groups
    vowels = "aeiouy"
    count = 0
    prev_is_vowel = False
    
    for char in word:
        is_vowel = char in vowels
        
        # Count vowel groups (not individual vowels)
        if is_vowel and not prev_is_vowel:
            count += 1
            
        prev_is_vowel = is_vowel
    
    # Adjust for common patterns
    # Silent 'e' at end of word
    if word.endswith('e') and len(word) > 2 and word[-2] not in vowels:
        count -= 1
    
    # Words ending with 'le' usually add a syllable
    if len(word) > 2 and word.endswith('le') and word[-3] not in vowels:
        count += 1
        
    # Ensure at least one syllable for any word
    return max(1, count)

def process_document(document):
    if 'content' in document and isinstance(document['content'], str):
        text = document['content']
        document['readability_score'] = calculate_flesch_reading_ease(text)
        
        # Add interpretation
        score = document['readability_score']
        if score >= 90:
            document['readability_level'] = "Very Easy"
        elif score >= 80:
            document['readability_level'] = "Easy"
        elif score >= 70:
            document['readability_level'] = "Fairly Easy"
        elif score >= 60:
            document['readability_level'] = "Standard"
        elif score >= 50:
            document['readability_level'] = "Fairly Difficult"
        elif score >= 30:
            document['readability_level'] = "Difficult"
        else:
            document['readability_level'] = "Very Difficult"
            
    return [document]

You could use it to:

  • Analyze customer support responses to ensure appropriate language complexity

  • Evaluate content marketing materials based on audience reading level

  • Monitor social media posts for accessibility and clarity

By creating a Custom Function, you gain direct control over how these metrics are calculated and applied—without relying on external tooling.

Testing Your Custom Functions

Before deploying your Custom Function to a production pipeline, it’s important to test it thoroughly. Fortunately, one of the most powerful features of the Custom Function component is its built-in testing capability.

To get started, you can:

  1. Write your Python function directly in the code editor

  2. Provide sample documents in JSON format

  3. Run the test to preview exactly how your function processes the data

You’ll see a clear interface showing your code, sample inputs, and the resulting output.
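
If you prefer, you can also sanity-check your logic locally before pasting it into the editor. A minimal script along these lines, using made-up sample documents and the process_document function from the readability example above, is usually enough:

# Hypothetical local test: assumes the process_document function from the
# readability example is defined in the same file.
sample_documents = [
    {"id": "doc-1", "content": "The cat sat on the mat. It was happy."},
    {"id": "doc-2", "content": "Notwithstanding the aforementioned considerations, the committee deliberated extensively before reaching a conclusion."},
    {"id": "doc-3", "title": "No content field on this one"},
]

for doc in sample_documents:
    print(process_document(doc))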

Example of the Custom Function testing interface in Datastreamer, showing the Python code editor and sample JSON input used to validate function behavior.

Here’s what the test results reveal:

  • Successfully processed documents

  • Failed documents, along with detailed error messages

  • Console output you can use for debugging

  • Execution time, to help you gauge performance

Output view of the Custom Function testing tool in Datastreamer, displaying processed and failed documents, console output, and execution time.

This built-in functionality removes the guesswork from development. In turn, it ensures your function performs as intended—before it’s ever used in a live pipeline.

Best Practices for Custom Functions

Performance Optimization

When working with Custom Functions that process large volumes of data, performance becomes a top priority. To help your function run efficiently, keep the following best practices in mind:

  • Use efficient data structures that suit the task at hand

  • Avoid unnecessary iterations through your list of documents

  • Monitor memory usage when handling large or nested data

  • Leverage Python’s built-in functions whenever possible, as they are typically more optimized

These strategies can significantly reduce processing time and resource consumption.

However, performance isn’t just about code structure. If your function isn’t optimized, it can directly affect the speed and efficiency of your entire pipeline.

That’s why, since Datastreamer supports native Python, our engineering team encourages you to run your own load tests locally. This is especially important if you’re working with large datasets: testing ahead of deployment can help prevent costly bottlenecks later on.
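
A rough local load test doesn’t need to be elaborate; timing your function against a synthetic batch is often enough to spot problems early. The sketch below assumes the process_batch you plan to deploy is defined in the same file, and uses placeholder documents, so adjust the shape to match your real data:

import time

def make_fake_documents(count: int) -> list:
    # Placeholder documents; mirror your real schema as closely as possible
    return [{"id": i, "content": "Sample text for load testing. " * 50} for i in range(count)]

documents = make_fake_documents(10_000)

start = time.perf_counter()
result = process_batch(documents)  # the function you plan to deploy
elapsed = time.perf_counter() - start

print(f"Processed {len(result)} documents in {elapsed:.2f}s ({len(result) / elapsed:.0f} docs/sec)")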

Error Handling

Robust error handling is essential in any data pipeline. Without it, a single failure can cause the entire batch of documents to stop processing.

Here’s an example of how to handle errors gracefully:

def process_batch(documents: list) -> list:
    processed_docs = []
    for doc in documents:
        try:
            # Process the document
            result = transform_document(doc)
            processed_docs.append(result)
        except Exception as e:
            # Log the error but continue processing other documents
            print(f"Error processing document {doc.get('id', 'unknown')}: {str(e)}")
    return processed_docs


This pattern allows your pipeline to continue running even if one or more documents fail.

Additionally, it’s considered best practice to use safe access methods like .get when retrieving values from dictionaries. In the example above, doc.get('id', 'unknown') returns the fallback value 'unknown' instead of raising a KeyError when a document has no 'id' field, so even the error message can’t crash the handler.
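
The same idea extends to nested fields. Chaining .get with a sensible default (the field names below are purely illustrative) avoids a KeyError several levels deep:

doc = {"id": "42", "author": None}  # illustrative document

# The 'or {}' guard also covers the case where the key exists but holds None
author_name = (doc.get("author") or {}).get("name", "unknown")
print(author_name)  # -> "unknown"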

By implementing error handling early, you reduce the risk of critical pipeline failures and make debugging much easier down the line.

Documentation

Proper documentation is essential for every Custom Function. It helps define what the function does, what inputs it expects, what outcomes it should produce, and any limitations to be aware of.

Here are a few key benefits of documenting your functions:

  • Clarity for Others (and Future You): Clear documentation makes it easier for others to understand how your function works and what to expect. Even if you’re the original author, you might forget the details over time.

  • Easier Maintenance: Well-documented functions are easier to update, debug, and extend later on.

  • Collaboration: If you’re working on a team, good documentation ensures alignment and helps prevent misunderstandings.

  • Testing and Validation: Including examples and expected outputs in your docstrings can guide future testing and help verify that the function behaves correctly.
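
As a rough illustration of these points, a documented Custom Function might look something like this (the filtering behavior described is just an example):

def process_batch(documents: list) -> list:
    """Drop documents whose 'language' field is not English.

    Args:
        documents: A list of document dictionaries. Each document may
            carry an optional 'language' field (e.g. "en").

    Returns:
        The subset of documents whose 'language' is "en". Documents
        without a 'language' field are kept, since their language is unknown.

    Example:
        >>> process_batch([{"id": 1, "language": "en"}, {"id": 2, "language": "fr"}])
        [{'id': 1, 'language': 'en'}]
    """
    return [doc for doc in documents if doc.get("language", "en") == "en"]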

Additionally, we strongly recommend backing up your Python functions. If you delete a component—or the entire pipeline—Datastreamer cannot recover the original function on your behalf.

Advanced Use Cases

Custom Functions aren’t just for basic tasks—they also enable advanced capabilities like external data enrichment, document normalization, and schema-aware processing. Below are some common use cases where Custom Functions truly shine.

Enriching Documents with External Data

You can use Custom Functions to enrich your documents by connecting to third-party APIs or databases.

For example, if your document includes an address, you can call an external geocoding service and append the corresponding coordinates:

import requests

def process_batch(documents: list) -> list:
    for doc in documents:
        if 'address' in doc:
            # Call a geocoding service
            response = requests.get(f"https://geocode.example.com/api?address={doc['address']}")
            location_data = response.json()

            # Add the geo coordinates to the document
            doc['latitude'] = location_data['lat']
            doc['longitude'] = location_data['lng']

    return documents

This approach makes your documents more actionable by embedding relevant metadata directly into them.
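
One caveat: the snippet above will stall or fail the batch if the external service is slow or unavailable. A slightly more defensive variation (still using the placeholder geocoding endpoint) adds a timeout and treats enrichment as best-effort:

import requests

def enrich_with_coordinates(doc: dict) -> dict:
    """Best-effort geocoding; leaves the document unchanged on failure."""
    try:
        response = requests.get(
            "https://geocode.example.com/api",  # placeholder endpoint
            params={"address": doc["address"]},
            timeout=5,                          # avoid hanging the whole batch
        )
        response.raise_for_status()
        location_data = response.json()
        doc["latitude"] = location_data.get("lat")
        doc["longitude"] = location_data.get("lng")
    except requests.RequestException as e:
        print(f"Geocoding failed for document {doc.get('id', 'unknown')}: {e}")
    return doc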

Document Transformation and Normalization

In real-world scenarios, data often arrives from multiple sources with inconsistent formats. Custom Functions make it easy to normalize this data.

def process_batch(documents: list) -> list:
    normalized = []
    for doc in documents:
        # Create a standardized structure
        normalized_doc = {
            'id': doc.get('id') or doc.get('ID') or doc.get('_id'),
            'timestamp': standardize_date_format(doc.get('date') or doc.get('timestamp')),
            'content': doc.get('message') or doc.get('text') or doc.get('content'),
            'source': identify_source(doc)
        }
        normalized.append(normalized_doc)

    return normalized

By creating a unified data structure, you simplify downstream processing and analysis.
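
The standardize_date_format and identify_source helpers referenced above are yours to define. As one possible sketch, assuming your sources emit a handful of known date formats, standardize_date_format could look like this:

from datetime import datetime

def standardize_date_format(value):
    """Convert a date string to ISO 8601, trying a few common formats.

    Returns None for missing input and the original value if no format matches.
    """
    if not value:
        return None
    for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y %H:%M"):
        try:
            return datetime.strptime(value, fmt).isoformat()
        except ValueError:
            continue
    return value  # unknown format; keep the original so no data is lost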

Common Challenges and Solutions

Even with powerful tools, real-world data presents challenges. Here are two common ones—and how Custom Functions can help.

Handling Large Documents

When dealing with large payloads, it’s important to process only what you need. This example shows how to extract key information and optionally discard large text fields to reduce memory usage:

def process_batch(documents: list) -> list:
    for doc in documents:
        if 'large_text_field' in doc and len(doc['large_text_field']) > 10000:
            # Extract just what you need instead of processing everything
            doc['extracted_info'] = extract_key_information(doc['large_text_field'])
            # Optionally remove the large field if no longer needed
            del doc['large_text_field']

    return documents
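
What counts as “key information” depends entirely on your data, so extract_key_information is yours to define. As a purely illustrative placeholder, it might keep the opening sentences plus a few basic statistics:

import re

def extract_key_information(text: str, max_sentences: int = 3) -> dict:
    """Illustrative helper: keep the first few sentences plus basic stats."""
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    return {
        "summary": " ".join(sentences[:max_sentences]),
        "original_length": len(text),
        "sentence_count": len(sentences),
    }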

Working with Different Document Schemas

If your pipeline ingests data from multiple platforms—such as Twitter, Instagram, or Facebook—you can route documents to the appropriate handler based on their schema:

def process_batch(documents: list) -> list:
    processed = []

    for doc in documents:
        # Determine document type and process accordingly
        if 'twitter_id' in doc:
            processed.append(process_twitter_data(doc))
        elif 'instagram_post_id' in doc:
            processed.append(process_instagram_data(doc))
        elif 'facebook_post_id' in doc:
            processed.append(process_facebook_data(doc))
        else:
            # Handle unknown document types
            processed.append(process_generic_data(doc))

    return processed
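
Each platform handler is likewise up to you. As an illustration (the Twitter-style field names here are assumptions, not a guaranteed schema), process_twitter_data might simply map platform-specific fields onto the names the rest of your pipeline expects:

def process_twitter_data(doc: dict) -> dict:
    """Illustrative handler: map assumed Twitter-style fields to common names."""
    return {
        "id": doc.get("twitter_id"),
        "content": doc.get("text") or doc.get("tweet_text"),
        "author": (doc.get("user") or {}).get("screen_name"),
        "source": "twitter",
    }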

👉 Tip: For more complex use cases, we recommend using our Unify Transformer component before applying a Custom Function. This ensures your documents follow a standard structure.

Wrapping Up

Custom Functions strike the perfect balance between the control of custom code and the reliability of a managed pipeline platform. By embedding Python directly into your Datastreamer pipelines, you unlock the flexibility to meet your specific data transformation needs—without sacrificing performance or scalability.

Whether you’re enriching documents, handling large data volumes, or normalizing formats across multiple sources, Custom Functions empower you to build exactly what your pipeline requires.

Ready to get started?

Explore our complete documentation and start building smarter, more flexible pipelines today. 

We look forward to connecting with you.

Let us know if you're an existing customer or a new user, so we can help you get started!