Pipeline Companion – an AWS Strands Agent for Data Pipeline Monitoring


Problem Statement

Modern data pipelines are built on AWS services such as Glue, Lambda, and Step Functions. Testing these pipelines, especially under schema changes, scale, and integration demands, is a major bottleneck.

Manual testing is slow, prone to oversight, and unable to adapt to dynamic, distributed pipeline environments.

Data pipeline failures in production environments cost organizations time and money and cause schedule slips.

More importantly, debugging and troubleshooting pipeline failures is stressful and hectic work for engineers.

Survey Insights on the Scale of the Problem:


Core Challenges and Issues

Target User


Solution and Technical Components

Pipeline Companion is an AI-powered autonomous testing framework that redesigns how organizations test and validate AWS data pipelines. Integrating Model Context Protocol with agents, it automatically understands pipeline logic, generates comprehensive test scenarios, executes tests in isolated environments, and provides diagnostics.
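
To make "generates comprehensive test scenarios" concrete, here is a minimal sketch of how schema-change scenarios could be derived from a table schema. It is not taken from the Pipeline Companion repository: the function name schema_change_scenarios, the scenario names, and the dictionary layout are all assumptions chosen for illustration.

```python
from typing import Dict, List

Schema = Dict[str, str]  # column name -> type name


def schema_change_scenarios(schema: Schema) -> List[Dict[str, object]]:
    """Build simple schema-drift scenarios (hypothetical helper)."""
    scenarios: List[Dict[str, object]] = []
    for column, col_type in schema.items():
        # Scenario 1: the column disappears from the input.
        dropped = {k: v for k, v in schema.items() if k != column}
        scenarios.append({"name": f"drop_{column}", "schema": dropped})
        # Scenario 2: the column arrives with a different type.
        retyped = dict(schema)
        retyped[column] = "string" if col_type != "string" else "int"
        scenarios.append({"name": f"retype_{column}", "schema": retyped})
    # Scenario 3: an extra column the pipeline has never seen.
    added = dict(schema)
    added["unexpected_column"] = "string"
    scenarios.append({"name": "add_unexpected_column", "schema": added})
    return scenarios
```

Each scenario could then be handed to a test run in an isolated environment; an agent would likely generate richer cases than this fixed set.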

Features of Pipeline Companion


Architecture

Flow of a typical event:


Agent Development

GitHub: https://github.com/juggarnautss/Pipeline_Agent

Here’s a code skeleton and explanation:

main.py: Entry point. Initializes the orchestrator and starts CLI/web/demo modes.

# ...existing code...
from agents.orchestrator import AgentOrchestrator

def main():
    companion = AgentOrchestrator()
    # CLI or web entry point
    # Example: companion.chat("Analyze my Glue job called data-processor")
# ...existing code...

orchestrator.py: Central router. Decides which specialized agent(s) handle each user request.

# ...existing code...
class AgentOrchestrator:
    def __init__(self):
        self.analyzer = AnalyzerAgent()
        self.tester = TestAgent()
        self.diagnostician = DiagnosticAgent()
        self.reporter = ReportingAgent()
        self.test_executor = TestExecutionAgent()
    
    def process_user_request(self, user_prompt: str) -> str:
        # Route request to appropriate agent(s)
        pass

    def chat(self, message: str) -> str:
        return self.process_user_request(message)
# ...existing code...
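
The skeleton leaves process_user_request empty. One simple way to fill it in is keyword matching, sketched below. The keyword table, select_agents, and route are hypothetical names, not code from the repository, and the real orchestrator may well let the model decide the routing instead.

```python
from typing import Callable, Dict, List

# Hypothetical keyword table: agent name -> phrases that suggest that agent.
ROUTING_KEYWORDS: Dict[str, List[str]] = {
    "analyzer": ["analyze", "health", "inspect"],
    "tester": ["generate test", "test case", "scenario"],
    "test_executor": ["run test", "execute"],
    "diagnostician": ["error", "fail", "diagnose", "debug"],
    "reporter": ["report", "summary"],
}


def select_agents(user_prompt: str) -> List[str]:
    """Return the names of every agent whose keywords appear in the prompt."""
    text = user_prompt.lower()
    matched = [
        name
        for name, keywords in ROUTING_KEYWORDS.items()
        if any(keyword in text for keyword in keywords)
    ]
    # Fall back to the analyzer when nothing matches.
    return matched or ["analyzer"]


def route(user_prompt: str, agents: Dict[str, Callable[[str], str]]) -> str:
    """Call each selected agent in order and join their answers."""
    answers = [agents[name](user_prompt) for name in select_agents(user_prompt)]
    return "\n\n".join(answers)
```

Because select_agents can return more than one name, the same function covers both single-agent and multi-agent requests.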

base_agent.py: Abstract base for all agents. Handles model setup, MCP integration, and tool management.

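# ...existing code...
from abc import ABC, abstractmethod

from strands import Agent
from strands.models import BedrockModel

class BaseAgent(ABC):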
    def __init__(self):
        self.model = BedrockModel(...)
        self.mcp_manager = get_unified_mcp_manager()
        self.mcp_status = self.mcp_manager.get_server_status()
    
    @abstractmethod
    def get_tools(self):
        pass
    
    @abstractmethod
    def get_system_prompt(self) -> str:
        pass
    
    def process(self, message: str) -> str:
        agent = Agent(
            model=self.model,
            tools=self.get_agent_tools(),
            system_prompt=self.get_enhanced_system_prompt()
        )
        result = agent(message)
        return result.message if hasattr(result, 'message') else str(result)
# ...existing code...
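
The process method calls get_agent_tools() and get_enhanced_system_prompt(), which the skeleton does not show. The sketch below is one plausible reading, assuming they merge the agent's own tools with MCP tools and append MCP server status to the prompt. It is dependency-free (no Strands or Bedrock calls), and SketchBaseAgent and EchoAgent are hypothetical names used only for illustration.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List


class SketchBaseAgent(ABC):
    """Dependency-free stand-in for BaseAgent (no Strands or Bedrock calls)."""

    def __init__(self, mcp_tools: List[Callable], mcp_status: Dict[str, bool]):
        self.mcp_tools = mcp_tools    # tools exposed by MCP servers
        self.mcp_status = mcp_status  # server name -> is it connected?

    @abstractmethod
    def get_tools(self) -> List[Callable]:
        """Agent-specific tools."""

    @abstractmethod
    def get_system_prompt(self) -> str:
        """Agent-specific prompt."""

    def get_agent_tools(self) -> List[Callable]:
        # Assumption: the agent's own tools plus whatever MCP provides.
        return list(self.get_tools()) + list(self.mcp_tools)

    def get_enhanced_system_prompt(self) -> str:
        # Assumption: append the names of connected MCP servers to the prompt.
        connected = sorted(name for name, up in self.mcp_status.items() if up)
        suffix = ", ".join(connected) if connected else "none"
        return f"{self.get_system_prompt()}\nConnected MCP servers: {suffix}"


class EchoAgent(SketchBaseAgent):
    """Tiny concrete agent used only to exercise the base class."""

    def get_tools(self) -> List[Callable]:
        return [len]

    def get_system_prompt(self) -> str:
        return "You analyze AWS data pipelines."
```

With this split, a subclass only declares what is specific to its domain, and the base class decides how that is combined with shared MCP capabilities.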

analyzer_agent.py: Example specialized agent. Implements analysis tools and prompts.

# ...existing code...
from typing import Any, Dict

from strands import tool

class AnalyzerAgent(BaseAgent):
    def get_tools(self):
        # Return analysis tools and MCP tools
        pass

    def get_system_prompt(self) -> str:
        # Return prompt for analysis tasks
        pass

    @tool
    def _analyze_pipeline_health(self, health_request: Dict[str, Any]) -> str:
        # Health check logic
        pass
# ...existing code...
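
The health check body is left as a stub. As a rough sketch, the logic could reduce a list of Glue job runs to a verdict. The input below mirrors the JobRuns entries returned by the Glue get_job_runs API (JobRunState, ErrorMessage), but the function name summarize_job_runs, the thresholds, and the status labels are assumptions, not the repository's implementation.

```python
from typing import Any, Dict, List

# Glue run states treated as failures in this sketch.
FAILURE_STATES = {"FAILED", "TIMEOUT", "ERROR"}


def summarize_job_runs(job_runs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize Glue-style job runs into a simple health verdict."""
    total = len(job_runs)
    failed = [run for run in job_runs if run.get("JobRunState") in FAILURE_STATES]
    succeeded = sum(1 for run in job_runs if run.get("JobRunState") == "SUCCEEDED")
    success_rate = succeeded / total if total else 0.0

    # Hypothetical thresholds; tune them to the pipeline's own tolerance.
    if total == 0:
        status = "UNKNOWN"
    elif success_rate >= 0.9:
        status = "HEALTHY"
    elif success_rate >= 0.5:
        status = "DEGRADED"
    else:
        status = "UNHEALTHY"

    return {
        "status": status,
        "total_runs": total,
        "success_rate": success_rate,
        "errors": [run.get("ErrorMessage", "no error message") for run in failed],
    }
```

Keeping this logic in a plain function makes it easy to unit test without AWS access; the tool method would fetch the runs and pass them in.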

unified_mcp_manager.py: Manages connections to AWS MCP servers (Glue, Lambda, CloudWatch) for deep pipeline insights.

# ...existing code...
from typing import Any, Dict

class UnifiedMCPManager:
    def __init__(self):
        self.cloudwatch = CloudWatchMCPServer()
        self.glue = GlueMCPServer()
        self.lambda_server = LambdaMCPServer()
        self._initialize_servers()
    
    def get_server_status(self) -> Dict[str, Any]:
        # Return status of MCP servers
        pass
# ...existing code...
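
A status report is most useful when one unreachable server does not stop the others from starting. The sketch below illustrates that idea with plain callables standing in for the real MCP server classes. ServerHandle, SketchMCPManager, and the shape of the returned dictionary are hypothetical, not the repository's actual structures.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ServerHandle:
    """Hypothetical record of one MCP server connection."""
    name: str
    connected: bool = False
    error: Optional[str] = None


class SketchMCPManager:
    """Stand-in for UnifiedMCPManager that takes connect callables."""

    def __init__(self, connectors: Dict[str, Callable[[], None]]):
        self.servers: Dict[str, ServerHandle] = {}
        for name, connect in connectors.items():
            handle = ServerHandle(name)
            try:
                connect()  # a real connector would start the MCP client here
                handle.connected = True
            except Exception as exc:  # keep going so one outage is not fatal
                handle.error = str(exc)
            self.servers[name] = handle

    def get_server_status(self) -> Dict[str, Any]:
        """Report per-server state plus a count of usable servers."""
        return {
            "servers": {
                name: {"connected": h.connected, "error": h.error}
                for name, h in self.servers.items()
            },
            "available": sum(h.connected for h in self.servers.values()),
            "total": len(self.servers),
        }
```

An agent can read this status at startup and fall back to a reduced tool set when a server is missing.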

Each agent (Analyzer, Tester, Diagnostic, Reporting, Test Execution) inherits from BaseAgent and provides domain-specific tools and prompts. The orchestrator coordinates single or multi-agent workflows based on user intent.


User Interface

Command Line Interface:

GUI:


Deployment

Local deployment:
Use the uv package manager to install all packages from the requirements file.
Create a Python virtual environment.
Export the AWS configuration.
Run main.py using python3.

Amazon EC2:
Use the TypeScript CDK to deploy the agent to EC2.
Once deployed, you can test your agent using the public IP address and port.

Amazon EKS:
As a prerequisite, containerize the Pipeline Companion agent using Docker.
Then deploy Pipeline Companion to EKS using Helm charts.
Use an ALB to route traffic to EKS.
Access Pipeline Companion using the ALB DNS name.

Lambda and Fargate:
Follow the AWS Strands documentation.
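
For the local deployment path, a small preflight check can confirm that the AWS configuration was exported before the agent starts. This is a coarse sketch: it looks only at commonly used environment variables, and missing_aws_settings is a hypothetical helper. Credentials supplied another way, such as a default profile in the shared credentials file or an instance role, would not be detected.

```python
import os
from typing import List, Mapping, Optional


def missing_aws_settings(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """List AWS settings that appear to be missing from the environment."""
    env = os.environ if env is None else env
    problems: List[str] = []

    # A region is needed to reach Bedrock, Glue, Lambda, and CloudWatch.
    if not (env.get("AWS_REGION") or env.get("AWS_DEFAULT_REGION")):
        problems.append("region: set AWS_REGION or AWS_DEFAULT_REGION")

    # Accept either a named profile or an explicit access key pair.
    has_profile = bool(env.get("AWS_PROFILE"))
    has_keys = bool(env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"))
    if not (has_profile or has_keys):
        problems.append(
            "credentials: set AWS_PROFILE or AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY"
        )
    return problems
```

Calling missing_aws_settings() at the top of main() and printing any entries gives a clearer message than a failed AWS call later in the session.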


Conclusion

Pipeline Companion demonstrates the power of agentic AI in automating and enhancing AWS data pipeline testing. By leveraging specialized agents and deep AWS integration, it delivers rapid, reliable analysis, intelligent test generation, and actionable diagnostics.

This approach not only accelerates development cycles but also improves pipeline resilience and data quality. As agentic frameworks evolve, solutions like Pipeline Companion will become essential tools for modern data engineering teams seeking scalable, autonomous pipeline validation.


References

Pipeline Companion

AWS Strands SDK

Streamlit

Author Profile

Sirin Shaikh
AI | Amplifying Impact
Talks about AI | GenAI | Machine Learning | Cloud | Kubernetes
