Build Smarter AI Workflows with Datastreamer Pipelines

By Marvan Boff
May 2025 | 20 min. read
In the world of data-driven decision making, turning raw data into actionable insights is essential, but the journey from ingestion to analysis is rarely simple. For teams focused on machine learning and innovation, building an AI data pipeline that supports this journey can be a costly distraction from their core mission.
Rather than reinventing the wheel, we’ll demonstrate how Datastreamer, a data pipeline platform, can offload those infrastructure concerns and provide the framework needed to reach your goal.
👉 New to Datastreamer? Check out our Getting Started Guide.
The Problem: Infrastructure Slows AI Teams Down
To demonstrate the challenge, we’ll use a fictional company called SocialMaple, which develops machine learning models to predict emerging trends.
The SocialMaple team is made up of data scientists focused on model development and accuracy. Their customers provide large datasets, often stored in cloud platforms like Amazon S3, Google Cloud Storage, or similar external sources.
Before any predictions can be made, this raw data must be ingested, transformed, enriched, and analyzed.
Managing this end-to-end data pipeline requires a significant infrastructure investment. As a result, SocialMaple is forced to divert time and resources away from its primary goal (delivering reliable, real-time trend forecasts) into maintaining a complex backend system.
The Solution: Build a Complete AI Pipeline Without the Overhead
To solve SocialMaple’s infrastructure challenge, we use the Datastreamer platform to build a pipeline that ingests, transforms, enriches, and analyzes data in just a few steps.
This setup allows SocialMaple to focus on delivering insights instead of managing backend systems.
Code Example and Demo Pipeline
Here’s how to build a fully functioning pipeline using Datastreamer.
Create Your Pipeline
Start by visiting the Datastreamer Portal and creating a new pipeline.
Your pipeline will include the following components:
- Google Cloud Storage Ingress
Ingests JSON files from a specific GCS bucket and forwards them in batches to the next stage in the pipeline.
- Custom Function
Uses SocialMaple’s prediction API to process each record. The predictions (sentiment and named entities) are merged into the original JSON data.
- Datastreamer Searchable Storage
Stores enriched data in an indexed, searchable format for fast querying and aggregation.
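
Conceptually, data flows through these three components in order. The sketch below simulates that flow locally for illustration only; it is not how Datastreamer’s managed pipeline runs, the bucket and folder names are hypothetical, and process_batch is the custom function defined in Step 2.

import json
from google.cloud import storage  # pip install google-cloud-storage

client = storage.Client()
bucket = client.bucket("socialmaple-posts")  # hypothetical bucket name

# 1. Ingress: scan a folder for new JSON files
for blob in bucket.list_blobs(prefix="incoming/"):
    documents = json.loads(blob.download_as_text())
    # 2. Custom Function: enrich each record (see Step 2 below)
    enriched = process_batch(documents)
    # 3. Searchable Storage: in the real pipeline, Datastreamer
    # indexes the enriched records automatically at this point
    print(f"Enriched {len(enriched)} records from {blob.name}")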

Step-by-Step Pipeline Configuration
Once your pipeline is created, it’s time to configure the individual components to connect your data and apply transformations.
Step 1: Configure Google Cloud Storage Ingress



Figure: posts.json file, used as input for the Datastreamer pipeline.

In this example, we’ll use the folder scan option to continuously check for new files in the designated cloud storage folder.
📘 For more details on supported ingestion types, refer to our cloud storage documentation.
Note: jobs can be added before or after pipeline deployment.
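
For reference, each record in the ingested JSON is expected to carry the post text under content.body, the field the custom function in Step 2 reads. A minimal hypothetical example of a posts.json record:

[
  {
    "content": {
      "body": "Just tried the new non-alcoholic craft beer and it's surprisingly good!"
    }
  }
]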

Step 2: Enrich Posts with Custom Python Function
This step runs each batch of posts through SocialMaple’s prediction API, enriching every record with:
- Named Entity Recognition (NER) results
- Sentiment labels for classification
📘 For implementation details, see the documentation on Datastreamer Sentiment Operations and Named Entity Recognition.
To understand how to implement custom logic such as this within your AI data pipeline, check the guide on Custom Functions. The full function used in this example is shown below.

import json
import requests

PREDICTION_URL = "https://socialmapple-prediction-344857442423.us-central1.run.app"

def process_batch(documents: list) -> list[dict]:
    """
    Sends a request to the SocialMaple prediction API and merges
    the response into each document.
    """
    # Collect the post bodies to send to the prediction API.
    content_list = [doc["content"]["body"] for doc in documents]

    headers = {"Content-Type": "application/json"}
    response = requests.post(PREDICTION_URL, headers=headers, data=json.dumps(content_list))
    response.raise_for_status()  # fail fast on API errors
    predict_response = response.json()

    # Merge each prediction back into its originating document.
    for i, doc in enumerate(documents):
        doc["enrichment"] = {
            "sentiment": predict_response[i]["sentiment"],
            "entity": predict_response[i]["entity"],
        }
        doc["data_source"] = "social_mapple"
    return documents
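
After enrichment, each record carries the merged predictions. Continuing the hypothetical input from Step 1 (the sentiment and entity values shown are illustrative), an enriched record would look like:

[
  {
    "content": {
      "body": "Just tried the new non-alcoholic craft beer and it's surprisingly good!"
    },
    "enrichment": {
      "sentiment": "positive",
      "entity": "non-alcoholic"
    },
    "data_source": "social_mapple"
  }
]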
Step 3: Store Enriched Data Using Datastreamer Searchable Storage
The final step automatically ingests and stores your enriched JSON records in Datastreamer’s internal searchable database.
No additional configuration is required — data becomes query-ready as soon as it enters this step.
Deploy Your Pipeline
Now that your pipeline has been created, configured, and saved, it’s time to deploy. All you need to do is hit the Deploy button.
More details are available on the Pipeline Deploy documentation page.
Getting Insights From Your Data
Finally, it’s time to query the data and extract insights using the Datastreamer Search API. The API supports an extensible list of aggregations. For this example, we’ll run a terms aggregation on the sentiment label, filtering the content by the entity non-alcoholic.
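As a rough sketch of such a request (the endpoint, authentication, and payload shape below are assumptions for illustration; consult the Search API documentation for the exact contract):

import requests

# Hypothetical endpoint and payload; see the Search API docs for the real contract.
query = {
    "filters": {"enrichment.entity": "non-alcoholic"},
    "aggregations": {
        "sentiment_breakdown": {"terms": {"field": "enrichment.sentiment"}}
    },
}
response = requests.post(
    "https://api.datastreamer.example/search",  # placeholder URL
    headers={"Authorization": "Bearer <API_KEY>"},
    json=query,
)
print(response.json())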

Figure: terms aggregation results for content matching the entity non-alcoholic.

Send Data to Storage, Warehouse, or Custom Destinations
Datastreamer pipelines support multiple egress destinations to fit your data architecture. You can send enriched data to analytics platforms like Snowflake, Databricks, or BigQuery, or export it to cloud storage services such as GCS, Amazon S3, and Azure.
For custom workflows, Datastreamer also enables integration with internal systems using Webhooks and Firehose delivery.
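For instance, a Webhook destination could POST enriched documents to an internal service. Below is a minimal sketch of such a receiver, assuming the webhook delivers batches of enriched JSON records (the route and delivery format are assumptions; Flask is used purely for illustration):

from flask import Flask, request

app = Flask(__name__)

@app.route("/datastreamer-webhook", methods=["POST"])
def receive_enriched_docs():
    # Assumes each delivery is a JSON array of enriched documents.
    docs = request.get_json()
    for doc in docs:
        print(doc["enrichment"]["sentiment"], doc["data_source"])
    return {"status": "ok"}, 200

if __name__ == "__main__":
    app.run(port=8080)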
Scale AI Workflows Without Infrastructure Overhead
This article showed how to use Datastreamer to simplify the full data pipeline, including data ingestion from Google Cloud Storage, custom function integration for AI predictions, and searchable storage for querying and analysis.
With just a few configuration steps, teams can enrich large volumes of data, extract sentiment and entity insights, and make that data available for downstream use—without the need to build or maintain complex infrastructure.
By offloading backend processes, AI and data science teams can stay focused on model development and innovation, supported by a scalable and efficient data pipeline foundation.
Ready to Build Smarter AI Pipelines?
Whether you’re working with real-time social data or training high-performance models, Datastreamer gives you the tools to connect, enrich, and activate your data without the infrastructure burden.