This comprehensive guide shows you how to connect to popular workplace apps like Google Drive, Slack, and Notion using Composio, extract their data, and store it in Cortex as your vector database for AI-powered search and retrieval.

Overview

By combining Composio’s app integration capabilities with Cortex’s vector storage, you can:
  • Unify data sources: connect to 100+ apps including Google Workspace, Slack, Notion, GitHub, and more
  • Process content intelligently: extract and handle different content types (documents, messages, pages)
  • Search semantically: query across all your data sources using natural language
  • Sync in real time: keep your knowledge base updated with the latest information

Prerequisites

Before you begin, ensure you have:
  • Python 3.8+ installed
  • A Composio account and API key (get it here)
  • A Cortex account and API key (contact us)
  • Access to the apps you want to integrate (Google Drive, Slack, Notion, etc.)

Step 1: Install Required SDKs

Install both Composio and the necessary HTTP client for Cortex:
pip install composio-core requests

Step 2: Set Up Authentication

Composio Authentication

First, set up your Composio API key:
import os
from composio import Composio

# Set your API key
os.environ["COMPOSIO_API_KEY"] = "your_composio_api_key"

# Initialize Composio client
composio = Composio()

Cortex Configuration

Configure your Cortex API settings:
import requests

CORTEX_API_KEY = "your_cortex_api_key"
CORTEX_BASE_URL = "https://api.usecortex.ai"
TENANT_ID = "your_tenant_id"

headers = {
    "Authorization": f"Bearer {CORTEX_API_KEY}",
    "Content-Type": "application/json"
}

Step 3: Connect to Your Apps

Authorize App Connections

Connect to your desired applications using Composio’s authorization flow:
def connect_to_apps(user_id: str):
    """Connect to Google Drive, Slack, and Notion"""
    
    apps_to_connect = [
        {"name": "googledrive", "display": "Google Drive"},
        {"name": "slack", "display": "Slack"},
        {"name": "notion", "display": "Notion"}
    ]
    
    connected_apps = {}
    
    for app in apps_to_connect:
        try:
            # Create connection request
            connection_request = composio.client.get_connection_request(
                user_uuid=user_id,
                app_name=app["name"]
            )
            
            print(f"Authorize {app['display']} by visiting: {connection_request.redirect_url}")
            
            # Wait for user to complete authorization
            connection_request.wait_until_active(timeout=300)  # 5 minutes timeout
            
            connected_apps[app["name"]] = connection_request
            print(f"{app['display']} connected successfully!")
            
        except Exception as e:
            print(f"Failed to connect to {app['display']}: {str(e)}")
    
    return connected_apps

# Connect apps for a user
user_id = "user@yourcompany.com"
connections = connect_to_apps(user_id)

Step 4: Extract Data from Apps

Get Available Tools

Retrieve the tools for your connected apps:
def get_available_tools(user_id: str, app_names: list):
    """Get tools for connected applications"""
    tools = {}
    
    for app_name in app_names:
        try:
            app_tools = composio.tools.get(
                user_id=user_id,
                apps=[app_name]
            )
            tools[app_name] = app_tools
            print(f"Retrieved {len(app_tools)} tools for {app_name}")
        except Exception as e:
            print(f"Error getting tools for {app_name}: {str(e)}")
    
    return tools

# Get tools for connected apps
app_names = ["googledrive", "slack", "notion"]
available_tools = get_available_tools(user_id, app_names)

Extract Data from Each App

Google Drive Integration

def extract_google_drive_data(user_id: str):
    """Extract files and documents from Google Drive"""
    try:
        # List files in Google Drive
        result = composio.tools.execute_action(
            action="googledrive_list_files",
            user_id=user_id,
            params={
                "q": "mimeType='application/pdf' or mimeType contains 'document' or mimeType contains 'text'",
                "maxResults": 50
            }
        )
        
        extracted_data = []
        
        for file_item in result.get("files", []):
            # Get file content
            content_result = composio.tools.execute_action(
                action="googledrive_get_file_content",
                user_id=user_id,
                params={"fileId": file_item["id"]}
            )
            
            extracted_data.append({
                "id": file_item["id"],
                "title": file_item["name"],
                "type": "googledrive",
                "content": content_result.get("content", ""),
                "url": file_item.get("webViewLink", ""),
                "timestamp": file_item.get("modifiedTime", ""),
                "metadata": {
                    "mimeType": file_item.get("mimeType"),
                    "size": file_item.get("size"),
                    "owner": file_item.get("owners", [{}])[0].get("displayName", "")
                }
            })
        
        return extracted_data
        
    except Exception as e:
        print(f"Error extracting Google Drive data: {str(e)}")
        return []

google_drive_data = extract_google_drive_data(user_id)
print(f"Extracted {len(google_drive_data)} files from Google Drive")

Slack Integration

def extract_slack_data(user_id: str):
    """Extract messages and threads from Slack"""
    try:
        # Get Slack channels
        channels_result = composio.tools.execute_action(
            action="slack_list_channels",
            user_id=user_id,
            params={"types": "public_channel,private_channel"}
        )
        
        extracted_data = []
        
        for channel in channels_result.get("channels", []):
            # Get recent messages from each channel
            messages_result = composio.tools.execute_action(
                action="slack_get_channel_history",
                user_id=user_id,
                params={
                    "channel": channel["id"],
                    "count": 100,
                    "inclusive": True
                }
            )
            
            for message in messages_result.get("messages", []):
                if message.get("text"):  # Only process messages with text content
                    extracted_data.append({
                        "id": f"slack_{channel['id']}_{message['ts']}",
                        "title": f"Message in #{channel['name']}",
                        "type": "slack",
                        "content": message["text"],
                        "url": f"https://yourworkspace.slack.com/archives/{channel['id']}/p{message['ts'].replace('.', '')}",
                        "timestamp": message["ts"],
                        "metadata": {
                            "channel": channel["name"],
                            "user": message.get("user", ""),
                            "thread_ts": message.get("thread_ts")
                        }
                    })
        
        return extracted_data
        
    except Exception as e:
        print(f"Error extracting Slack data: {str(e)}")
        return []

slack_data = extract_slack_data(user_id)
print(f"Extracted {len(slack_data)} messages from Slack")

Notion Integration

def extract_notion_data(user_id: str):
    """Extract pages and databases from Notion"""
    try:
        # Search for pages in Notion
        search_result = composio.tools.execute_action(
            action="notion_search_pages",
            user_id=user_id,
            params={
                "filter": {"property": "object", "value": "page"},
                "page_size": 50
            }
        )
        
        extracted_data = []
        
        for page in search_result.get("results", []):
            # Get page content
            content_result = composio.tools.execute_action(
                action="notion_get_page_content",
                user_id=user_id,
                params={"page_id": page["id"]}
            )
            
            # Extract title from properties
            title = "Untitled"
            if page.get("properties", {}).get("title"):
                title_prop = page["properties"]["title"]
                if title_prop.get("title") and len(title_prop["title"]) > 0:
                    title = title_prop["title"][0].get("plain_text", "Untitled")
            
            extracted_data.append({
                "id": page["id"],
                "title": title,
                "type": "notion",
                "content": content_result.get("content", ""),
                "url": page.get("url", ""),
                "timestamp": page.get("last_edited_time", ""),
                "metadata": {
                    "created_time": page.get("created_time"),
                    "created_by": page.get("created_by", {}).get("id"),
                    "parent": page.get("parent", {})
                }
            })
        
        return extracted_data
        
    except Exception as e:
        print(f"Error extracting Notion data: {str(e)}")
        return []

notion_data = extract_notion_data(user_id)
print(f"Extracted {len(notion_data)} pages from Notion")

Step 5: Store Data in Cortex

Batch Upload to Cortex

Use Cortex’s app sources upload endpoint to efficiently store all your extracted data:
import time

def upload_to_cortex(data_sources: list, source_type: str):
    """Upload extracted data to Cortex using the app sources endpoint"""
    
    # Prepare data for Cortex app sources format
    cortex_data = []
    
    for item in data_sources:
        cortex_item = {
            "id": item["id"],
            "title": item["title"],
            "type": source_type,
            "description": f"Content from {item['type']}",
            "url": item.get("url", ""),
            "timestamp": item.get("timestamp", ""),
            "content": {
                "text": item["content"],
                "html_base64": "",
                "csv_base64": "",
                "markdown": "",
                "files": [],
                "layout": []
            },
            "cortex_metadata": {
                "source_app": item["type"],
                "extracted_at": item.get("timestamp", "")
            },
            "meta": item.get("metadata", {}),
            "attachments": []
        }
        cortex_data.append(cortex_item)
    
    # Upload in batches of 20 (recommended by Cortex)
    batch_size = 20
    uploaded_count = 0
    
    for i in range(0, len(cortex_data), batch_size):
        batch = cortex_data[i:i + batch_size]
        
        try:
            response = requests.post(
                f"{CORTEX_BASE_URL}/upload/upload_app_sources",
                params={"tenant_id": TENANT_ID},
                headers=headers,
                json=batch
            )
            
            if response.status_code == 200:
                uploaded_count += len(batch)
                print(f"Uploaded batch {i//batch_size + 1}: {len(batch)} items")
            else:
                print(f"Failed to upload batch {i//batch_size + 1}: {response.text}")
        
        except Exception as e:
            print(f"Error uploading batch: {str(e)}")
        
        # Wait 1 second between batches as recommended
        time.sleep(1)
    
    return uploaded_count

# Upload all extracted data

total_uploaded = 0
total_uploaded += upload_to_cortex(google_drive_data, "google_drive")
total_uploaded += upload_to_cortex(slack_data, "slack")
total_uploaded += upload_to_cortex(notion_data, "notion")

print(f"Successfully uploaded {total_uploaded} items to Cortex!")

Verify Processing

Check if your uploaded data is ready for querying:
def verify_processing():
    """Verify that uploaded data is processed and ready"""
    try:
        response = requests.get(
            f"{CORTEX_BASE_URL}/list/sources",
            params={"tenant_id": TENANT_ID},
            headers=headers
        )
        
        if response.status_code == 200:
            sources = response.json()
            print(f"Found {len(sources)} sources in your knowledge base:")
            
            for source in sources[:10]:  # Show first 10
                print(f"  - {source.get('title', 'Untitled')} ({source.get('type', 'unknown')})")
            
            return True
        else:
            print(f"Error checking sources: {response.text}")
            return False
            
    except Exception as e:
        print(f"Error verifying processing: {str(e)}")
        return False

# Verify processing
if verify_processing():
    print("Data is ready for querying!")

Step 6: Query Your Integrated Data

Now you can query across all your connected apps using Cortex’s Q&A endpoint:
def query_integrated_data(question: str, session_id: str = "integration_session"):
    """Query across all integrated app data"""
    
    query_payload = {
        "question": question,
        "session_id": session_id,
        "tenant_id": TENANT_ID,
        "ai_generation": True,
        "highlight_chunks": True,
        "search_alpha": 0.7,  # Balance between semantic and keyword search
        "recency_bias": 0.3   # Slightly favor recent content
    }
    
    try:
        response = requests.post(
            f"{CORTEX_BASE_URL}/search/qna",
            headers=headers,
            json=query_payload
        )
        
        if response.status_code == 200:
            result = response.json()
            
            print(f"Answer: {result.get('answer', 'No answer generated')}")
            print(f"\nSources:")
            
            for i, source in enumerate(result.get('sources', [])[:3], 1):
                print(f"  {i}. {source.get('title', 'Untitled')} (via {source.get('type', 'unknown')})")
                if source.get('url'):
                    print(f"     {source['url']}")
            
            return result
        else:
            print(f"Query failed: {response.text}")
            return None
            
    except Exception as e:
        print(f"Error querying data: {str(e)}")
        return None

# Example queries
example_queries = [
    "What recent project updates were discussed in Slack?",
    "Find documentation about our API from Google Drive",
    "Show me meeting notes from Notion about the Q4 planning",
    "What are the main topics discussed across all our platforms?"
]

for query in example_queries:
    print(f"\nQuery: {query}")
    query_integrated_data(query)
    print("-" * 80)

Advanced Configuration

Filtering by Source

You can query specific sources using metadata filters:
def query_specific_source(question: str, source_type: str):
    """Query only specific app sources"""
    
    query_payload = {
        "question": question,
        "session_id": f"{source_type}_session",
        "tenant_id": TENANT_ID,
        "ai_generation": True,
        "tenant_metadata": {"source_app": source_type}  # Filter by source
    }
    
    response = requests.post(
        f"{CORTEX_BASE_URL}/search/qna",
        headers=headers,
        json=query_payload
    )
    
    return response.json() if response.status_code == 200 else None

# Query only Slack data
slack_result = query_specific_source("What were the latest team updates?", "slack")

Setting Up Automated Sync

To keep your data fresh, you can set up a periodic sync. The schedule package used below is a separate install (pip install schedule):
import schedule
import time

def sync_all_apps():
    """Sync data from all connected apps"""
    print("Starting automated sync...")
    
    # Re-extract data
    new_google_drive_data = extract_google_drive_data(user_id)
    new_slack_data = extract_slack_data(user_id)
    new_notion_data = extract_notion_data(user_id)
    
    # Upload to Cortex
    total_synced = 0
    total_synced += upload_to_cortex(new_google_drive_data, "google_drive")
    total_synced += upload_to_cortex(new_slack_data, "slack")
    total_synced += upload_to_cortex(new_notion_data, "notion")
    
    print(f"Sync completed: {total_synced} items updated")

# Schedule sync every 6 hours
schedule.every(6).hours.do(sync_all_apps)

# Run scheduler (in production, use a proper job scheduler)
# while True:
#     schedule.run_pending()
#     time.sleep(60)

Best Practices

Performance Optimization

  1. Batch Processing: Always upload data in batches of 20 or fewer
  2. Rate Limiting: Wait 1 second between batch uploads
  3. Incremental Sync: Only sync new/changed content when possible (see the sketch after this list)
  4. Content Filtering: Pre-filter irrelevant content before uploading
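
For incremental sync, one lightweight approach is to record when your last successful sync ran and skip anything older. This is a sketch only: the SYNC_STATE_FILE and the helper functions are hypothetical, and it assumes item timestamps are ISO 8601 strings (as Google Drive's modifiedTime is), so adapt the comparison and storage to your own pipeline:
import json
from datetime import datetime, timezone

SYNC_STATE_FILE = "last_sync.json"  # hypothetical local state file, for illustration only

def load_last_sync_time() -> str:
    """Return the timestamp recorded by the previous sync, or "" on first run."""
    try:
        with open(SYNC_STATE_FILE) as f:
            return json.load(f).get("last_sync", "")
    except FileNotFoundError:
        return ""

def save_last_sync_time(timestamp: str):
    """Persist the timestamp of the sync that just completed."""
    with open(SYNC_STATE_FILE, "w") as f:
        json.dump({"last_sync": timestamp}, f)

def incremental_drive_sync():
    """Upload only Google Drive items modified since the previous sync."""
    last_sync = load_last_sync_time()
    items = extract_google_drive_data(user_id)

    # ISO 8601 timestamps compare chronologically as plain strings
    fresh_items = [
        item for item in items
        if not last_sync or item.get("timestamp", "") > last_sync
    ]

    uploaded = upload_to_cortex(fresh_items, "google_drive")
    save_last_sync_time(datetime.now(timezone.utc).isoformat())
    print(f"Incremental sync uploaded {uploaded} new/changed items")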

Security Considerations

  1. API Key Management: Store API keys in environment variables (see the sketch after this list)
  2. Access Scopes: Request minimal necessary permissions from apps
  3. Data Retention: Implement proper data cleanup policies
  4. Encryption: Ensure data is encrypted in transit and at rest
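
For the first point, here is a minimal sketch of reading credentials from the environment instead of hard-coding them; the variable names are examples, so match them to whatever your deployment defines:
import os

# Read secrets from the environment rather than committing them to source control.
# These variable names are examples; align them with your own deployment.
COMPOSIO_API_KEY = os.environ["COMPOSIO_API_KEY"]    # raises KeyError if missing
CORTEX_API_KEY = os.environ["CORTEX_API_KEY"]
TENANT_ID = os.environ.get("CORTEX_TENANT_ID", "")   # optional, with a default

headers = {
    "Authorization": f"Bearer {CORTEX_API_KEY}",
    "Content-Type": "application/json"
}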

Monitoring and Maintenance

def get_sync_status():
    """Monitor the health of your integrations"""
    
    # Check Cortex storage
    sources_response = requests.get(
        f"{CORTEX_BASE_URL}/list/sources",
        params={"tenant_id": TENANT_ID},
        headers=headers
    )
    
    if sources_response.status_code == 200:
        sources = sources_response.json()
        
        # Group by source type
        source_counts = {}
        for source in sources:
            source_type = source.get('type', 'unknown')
            source_counts[source_type] = source_counts.get(source_type, 0) + 1
        
        print("Current Knowledge Base Status:")
        for source_type, count in source_counts.items():
            print(f"  - {source_type}: {count} items")
        
        return source_counts
    
    return {}

# Monitor your integration
sync_status = get_sync_status()

Troubleshooting

Common Issues

  1. Authorization Failures: Ensure app permissions are correctly granted
  2. Rate Limiting: Implement exponential backoff for API calls
  3. Content Processing: Handle different content types appropriately
  4. Memory Usage: Process large datasets in smaller chunks (see the sketch after this list)
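
For the memory point, one option is to flush uploads as you go instead of accumulating every extracted item in a single list. This is a sketch reusing upload_to_cortex from Step 5; item_iterator can be any iterable or generator of extracted items:
def upload_in_chunks(item_iterator, source_type: str, chunk_size: int = 20):
    """Flush items to Cortex every chunk_size items so the full dataset
    never has to sit in memory at once."""
    chunk = []
    uploaded = 0

    for item in item_iterator:
        chunk.append(item)
        if len(chunk) >= chunk_size:
            uploaded += upload_to_cortex(chunk, source_type)
            chunk = []

    if chunk:  # flush the remainder
        uploaded += upload_to_cortex(chunk, source_type)

    return uploaded

# Example: stream an existing extraction result through the chunked uploader
uploaded = upload_in_chunks(iter(extract_slack_data(user_id)), "slack")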

Error Handling

def robust_extraction(extraction_func, app_name: str, max_retries: int = 3):
    """Wrapper for robust data extraction with retries"""
    
    for attempt in range(max_retries):
        try:
            return extraction_func(user_id)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {app_name}: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                print(f"Max retries reached for {app_name}")
                return []

# Use robust extraction
google_drive_data = robust_extraction(extract_google_drive_data, "Google Drive")

Conclusion

You now have a complete system for ingesting data from popular workplace apps using Composio and storing it in Cortex. This integration enables you to:
  • Search across all platforms with natural language queries
  • Get AI-powered answers that cite sources from multiple apps
  • Maintain up-to-date knowledge with automated syncing
  • Scale to additional apps using the same pattern (see the sketch below)
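
Extending to a new app follows the same extract-then-upload pattern from Steps 4 and 5. Here is a minimal sketch that wires the existing functions into a registry, so adding a source only requires one new extraction function and one new entry:
# Map each source type to its extraction function. Adding a new app means
# writing one extraction function (as in Step 4) and registering it here.
EXTRACTORS = {
    "google_drive": extract_google_drive_data,
    "slack": extract_slack_data,
    "notion": extract_notion_data,
}

def ingest_all_sources(uid: str) -> int:
    """Run every registered extractor and upload its output to Cortex."""
    total = 0
    for source_type, extract in EXTRACTORS.items():
        total += upload_to_cortex(extract(uid), source_type)
    return total

total_items = ingest_all_sources(user_id)
print(f"Ingested {total_items} items across {len(EXTRACTORS)} sources")
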
The combination of Composio’s extensive app ecosystem and Cortex’s vector search capabilities provides a robust foundation for building AI applications that can access and understand your organization’s distributed knowledge. For additional apps and advanced configurations, refer to the Composio documentation and the Cortex API reference.