Build Smarter AI Workflows with Datastreamer Pipelines

By Marvan Boff

May 2025 | 20 min. read


In the world of data-driven decision making, turning raw data into actionable insights is essential, but the journey from ingestion to analysis is rarely simple. For teams focused on machine learning and innovation, building an AI data pipeline that supports this journey can be a costly distraction from their core mission.

Rather than reinventing the wheel, we’ll demonstrate how Datastreamer, a data pipeline platform, can be used to offload infrastructure concerns and provide the framework needed to achieve that goal.

👉 New to Datastreamer? Check out our Getting Started Guide.

The Problem: Infrastructure Slows AI Teams Down

To demonstrate the challenge, we’ll use a fictional company called SocialMaple, which develops machine learning models to predict emerging trends.

The SocialMaple team is made up of data scientists focused on model development and accuracy. Their customers provide large datasets, often stored in cloud platforms like Amazon S3, Google Cloud Storage, or similar external sources.

Before any predictions can be made, this raw data must be ingested, transformed, enriched, and analyzed.

Managing this end-to-end data pipeline requires a significant infrastructure investment. As a result, SocialMaple is forced to divert time and resources away from its primary goal (delivering reliable, real-time trend forecasts) into maintaining a complex backend system.

The Solution: Build a Complete AI Pipeline Without the Overhead

To solve SocialMaple’s infrastructure challenge, we use the Datastreamer platform to build a pipeline that ingests, transforms, enriches, and analyzes data in just a few steps.

This setup allows SocialMaple to focus on delivering insights instead of managing backend systems.

Code Example and Demo Pipeline

Here’s how to build a fully functioning pipeline using Datastreamer. 

Create Your Pipeline

Start by visiting the Datastreamer Portal and creating a new pipeline. 

Your pipeline will include the following components:

  • Google Cloud Storage Ingress
    Ingests JSON files from a specific GCS bucket and forwards them in batches to the next stage in the pipeline.
  • Custom Function
    Uses SocialMaple’s prediction API to process each record. The predictions (sentiment and named entities) are merged into the original JSON data.
  • Datastreamer Searchable Storage
    Stores enriched data in an indexed, searchable format for fast querying and aggregation.
Example Datastreamer pipeline setup showing data ingestion from Google Cloud Storage, enrichment using a custom function, and storage in searchable format.

Step-by-Step Pipeline Configuration 

Once your pipeline is created, it’s time to configure the individual components to connect your data and apply transformations.

Step 1: Configure Google Cloud Storage Ingress
1. Add your Google service account key as an API secret. Secrets are encrypted and stored in secure storage.
The Datastreamer Keys & Secrets dashboard with an active API key and a configured Google Cloud service account used for secure data access.
2. To simulate real-world usage, we’ll use a set of fictional social media comments related to non-alcoholic beer. This sample dataset will be ingested into the pipeline and enriched using AI.
Sample dataset containing fictional social media comments about non-alcoholic beer, formatted as JSON and used to demonstrate pipeline ingestion and enrichment.
The Google Cloud Storage bucket containing the posts.json file, used as input for the Datastreamer pipeline.
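The exact records don’t matter, but each JSON document needs a text body for the enrichment step to read. Here is an invented example of what a single record in posts.json might look like (only the content.body path is assumed, since the custom function later in this guide reads it):

<pre><code class="language-python">
# Illustrative record from posts.json. Only the "content" -> "body" path is
# assumed; the enrichment function later reads doc["content"]["body"].
sample_post = {
    "id": "post-0001",  # hypothetical identifier
    "content": {
        "body": "Tried a non-alcoholic beer at the BBQ and honestly loved it."
    },
}
</code></pre>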
3. Connect and configure the Google Cloud Storage Ingress to pull JSON from your GCS bucket.
The Google Cloud Storage Ingress component in Datastreamer configured to read JSON files from the socialmapple bucket using a defined service account and folder structure.
4. Add a new job to monitor the bucket for incoming data.
 

In this example, we’ll use the folder scan option to continuously check for new files in the designated cloud storage folder.

📘 For more details on supported ingestion types, refer to our cloud storage documentation.

*Note that jobs can be added either before or after pipeline deployment.

Datastreamer pipeline interface showing the Google Cloud Storage Ingress configuration for a new job using the “Monitor Folder” ingestion type with JSON documents.
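To simulate new data arriving, you can drop additional JSON files into the monitored folder yourself. Below is a minimal sketch using the google-cloud-storage client library; the bucket and folder names are assumptions based on the setup above.

<pre><code class="language-python">
# Upload a new JSON file into the folder the ingress job is monitoring.
# The bucket and folder names here are assumptions based on the setup above.
import json
from google.cloud import storage  # pip install google-cloud-storage

posts = [{"content": {"body": "Anyone else switching to non-alcoholic beer this summer?"}}]

client = storage.Client()  # authenticates with the same service account
bucket = client.bucket("socialmapple")
blob = bucket.blob("incoming/posts_002.json")
blob.upload_from_string(json.dumps(posts), content_type="application/json")
</code></pre>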
Step 2: Enrich Posts with Custom Python Function
The next step in building an AI data pipeline enriches each incoming post using SocialMaple’s machine learning model. We’ll use a Python-based custom function to send the post content to an API and merge the enriched data back into the original JSON.
 
The SocialMaple API accepts plain text input and returns two important outputs:
  • Named Entity Recognition (NER) results

  • Sentiment labels for classification

📘 For implementation details, see the documentation on
Datastreamer Sentiment Operations and Named Entity Recognition.

To understand how to implement custom logic such as this within your AI data pipeline, check the guide on Custom Functions.
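Conceptually, the contract with the prediction API looks like the sketch below. The exact field names and values are assumptions inferred from how the custom function in the next step consumes the response.

<pre><code class="language-python">
# Assumed request/response contract for the SocialMaple prediction API,
# inferred from how the custom function below consumes it.
request_payload = [
    "Loving this new non-alcoholic beer!",
    "Not a fan of the aftertaste, to be honest.",
]

# One prediction per input string, carrying a sentiment label and the
# recognized entities (values are illustrative).
example_response = [
    {"sentiment": "positive", "entity": ["non-alcoholic beer"]},
    {"sentiment": "negative", "entity": ["aftertaste"]},
]
</code></pre>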

Screenshot of Datastreamer’s Custom Function configuration page showing a Python script used to call an AI prediction API and enrich document data.
<pre><code class="language-python">
import json
import requests


def process_batch(documents: list[dict]) -> list[dict]:
    """
    Sends the body of each document to the SocialMaple prediction API and
    merges the response (sentiment and entities) back into the documents.
    """
    # Collect the text body of every document in the batch.
    content_list = [doc["content"]["body"] for doc in documents]

    url = "https://socialmapple-prediction-344857442423.us-central1.run.app"
    headers = {"Content-Type": "application/json"}

    # The API accepts a JSON list of strings and returns one prediction per item.
    response = requests.post(url, headers=headers, data=json.dumps(content_list))
    response.raise_for_status()
    predict_response = response.json()

    # Merge each prediction into its corresponding document.
    for doc, prediction in zip(documents, predict_response):
        doc["enrichment"] = {
            "sentiment": prediction["sentiment"],
            "entity": prediction["entity"],
        }
        doc["data_source"] = "social_mapple"

    return documents
</code></pre>

Full Python configuration for the custom function used to enrich JSON records via the SocialMaple prediction API.
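Before deploying, it can help to sanity-check the merge logic locally. The snippet below is a quick, hypothetical test that assumes process_batch from the function above is defined in the same session; the HTTP call is mocked, so no network access or live prediction endpoint is needed, and the field values are purely illustrative.

<pre><code class="language-python">
# Quick local check of the merge logic. The HTTP call is mocked, so no
# network access or live prediction endpoint is required.
from unittest import mock

sample_docs = [{"content": {"body": "Loving this new non-alcoholic beer!"}}]

fake_response = mock.Mock()
fake_response.json.return_value = [
    {"sentiment": "positive", "entity": ["non-alcoholic beer"]}  # illustrative values
]

with mock.patch("requests.post", return_value=fake_response):
    enriched = process_batch(sample_docs)

print(enriched[0]["enrichment"])
# {'sentiment': 'positive', 'entity': ['non-alcoholic beer']}
</code></pre>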
Step 3: Store Enriched Data Using Datastreamer Searchable Storage

The final step automatically ingests and stores your enriched JSON records in Datastreamer’s internal searchable database.

No additional configuration is required — data becomes query-ready as soon as it enters this step.

Deploy Your Pipeline

Now that your pipeline has been created, configured, and saved, it’s time to deploy it. All you need to do is hit the Deploy button.

More details are available on the Pipeline Deploy documentation page.

Getting Insights From Your Data

Finally, it’s time to query the data and extract insights using the Datastreamer Search API. The API supports an extensible list of aggregations. For this example, we’ll run a term aggregation on the sentiment label, filtering the content by the entity “non-alcoholic”.


Datastreamer Search API example showing a JSON query and its aggregated sentiment results filtered by the entity non-alcoholic.
API response from Datastreamer showing enriched post data with sentiment and entity values, alongside aggregated sentiment counts.
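As a rough sketch of what such a request could look like from Python, see below. The endpoint, authentication header, and query body are placeholders rather than the actual Search API syntax; refer to the screenshots above and the Search API documentation for the real request format.

<pre><code class="language-python">
# Hypothetical sketch of calling the Search API from Python. The endpoint,
# auth header, and query structure are placeholders; see the Search API
# documentation for the actual request format.
import requests

SEARCH_URL = "https://<your-datastreamer-endpoint>/search"  # placeholder
API_KEY = "<your-api-key>"                                  # placeholder

query = {
    "query": 'enrichment.entity:"non-alcoholic"',           # placeholder filter syntax
    "aggregations": {
        "sentiment_breakdown": {"terms": {"field": "enrichment.sentiment"}}
    },
}

response = requests.post(
    SEARCH_URL,
    headers={"apikey": API_KEY, "Content-Type": "application/json"},
    json=query,
)
print(response.json())
</code></pre>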

Send Data to Storage, Warehouse, or Custom Destinations

Datastreamer pipelines support multiple egress destinations to fit your data architecture. You can send enriched data to analytics platforms like Snowflake, Databricks, or BigQuery, or export it to cloud storage services such as GCS, Amazon S3, and Azure.

For custom workflows, Datastreamer also enables integration with internal systems using Webhooks and Firehose delivery.
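As one illustration, an internal webhook receiver could be as simple as the sketch below. The route and payload handling are assumptions; the actual delivery format depends on how the Webhook egress is configured.

<pre><code class="language-python">
# Minimal sketch of an internal webhook receiver for enriched documents.
# The route and payload shape are assumptions for illustration only.
from flask import Flask, request

app = Flask(__name__)

@app.post("/datastreamer/webhook")
def receive_documents():
    documents = request.get_json()
    for doc in documents:
        sentiment = doc.get("enrichment", {}).get("sentiment")
        print(f"Received post with sentiment: {sentiment}")
    return {"status": "ok"}

if __name__ == "__main__":
    app.run(port=8080)
</code></pre>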

Scale AI Workflows Without Infrastructure Overhead

This article showed how to use Datastreamer to simplify the full data pipeline, including data ingestion from Google Cloud Storage, custom function integration for AI predictions, and searchable storage for querying and analysis.

With just a few configuration steps, teams can enrich large volumes of data, extract sentiment and entity insights, and make that data available for downstream use—without the need to build or maintain complex infrastructure.

By offloading backend processes, AI and data science teams can stay focused on model development and innovation, supported by a scalable and efficient data pipeline foundation.

Ready to Build Smarter AI Pipelines?

Whether you’re working with real-time social data or training high-performance models, Datastreamer gives you the tools to connect, enrich, and activate your data without the infrastructure burden.

We look forward to connecting with you.

Let us know if you're an existing customer or a new user, so we can help you get started!