API Quickstart

Example search for Tesla stock outlook

Making your first request with the Athena API is straightforward with this script.

There are two examples provided below. The first, and recommended method, is using our /api/v2/query-async endpoint. The second method uses our /api/v2/query endpoint, which is geared towards small searches.

Using Async Query Endpoint

First define your ATHENA_API_KEY. The payload dictionary specifies the query parameters, including a search term ("donald trump"), your api_key, other configuration details to filter the results (for the sake of simplicity we are choosing to omit extra parameters — see API Reference for other options).

The initial request is sent via a POST call to Athena’s query-async API endpoint. The response contains your query_id, which is how you will retrieve results from your search with the /api/v2/get-results endpoint.

import time
import requests

# Constants
ATHENA_API_KEY = "YOUR_API_KEY"
QUERY = "donald trump"
start_date = "2025-03-04T15:13:52.466Z"
end_date = "2025-03-05T15:13:52.466Z"
QUERY_URL = "https://app.runathena.com/api/v2/query-async"
RESULTS_URL = "https://app.runathena.com/api/v2/get-results"
HEADERS = {"Content-Type": "application/json"}
ARTICLES_PER_PAGE = 25
POLL_INTERVAL = 1  # seconds to wait between polls

def send_initial_query(query: str, api_key: str) -> str:
    """
    Sends the initial query to the API and returns the query_id.
    """
    payload = {"query": query, "api_key": api_key,'toggle_state':'Encoded Articles',"start_date":start_date,"end_date":end_date}
    response = requests.post(QUERY_URL, headers=HEADERS, json=payload)
    response.raise_for_status()  # raise an error for bad responses
    data = response.json()
    return data.get('query_id')

def poll_for_results(query_id: str, api_key: str, interval: int = POLL_INTERVAL) -> dict:
    """
    Polls the API until the query status is not 'PENDING'.
    Returns the final result data.
    """
    payload = {"query_id": query_id, "api_key": api_key}
    response = requests.post(RESULTS_URL, headers=HEADERS, json=payload)
    response.raise_for_status()
    data = response.json()
    
    # Poll until the status changes from 'PENDING'
    while data.get('state') == 'PENDING':
        time.sleep(interval)
        response = requests.post(RESULTS_URL, headers=HEADERS, json=payload)
        response.raise_for_status()
        data = response.json()
    
    return data

def fetch_all_articles(query_id: str, total_results: int, api_key: str) -> list:
    """
    Fetches and aggregates all articles by paginating through the results.
    """
    all_articles = []
    page = 1
    payload = {"query_id": query_id, "api_key": api_key,'toggle_state':'Encoded Articles'}
    
    # Continue fetching while there are articles remaining.
    while (page - 1) * ARTICLES_PER_PAGE < total_results:
        payload['page'] = page
        response = requests.post(RESULTS_URL, headers=HEADERS, json=payload)
        response.raise_for_status()
        data = response.json()
        
        articles = data.get('articles', [])
        all_articles.extend(articles)
        page += 1
    
    return all_articles

def main():
    try:
        # Step 1: Send the initial query and retrieve the query_id.
        query_id = send_initial_query(QUERY, ATHENA_API_KEY)
        if not query_id:
            print("Failed to retrieve query ID.")
            return
        
        # Step 2: Poll the API until the query is processed.
        result_data = poll_for_results(query_id, ATHENA_API_KEY)
        
        #print(result_data)
        #print(result_data.keys())
        # Check if the query was successful before pagination.
        if result_data.get('state') != 'SUCCESS':
            print("Query did not complete successfully:", result_data)
            return
        
        total_results = result_data.get('totalArticles', 0)
        if total_results == 0:
            print("No results found.")
            return
        
        # Step 3: Paginate through the results and collect all articles.
        all_articles = fetch_all_articles(query_id, total_results, ATHENA_API_KEY)
        print("Total articles fetched:", len(all_articles))
    
    except requests.RequestException as e:
        print("An error occurred while communicating with the API:", e)

if __name__ == '__main__':
    main()
```

Using Synchronous Endpoint

This example makes use of the /api/v2/auery endpoint, which is best geared for smaller searches. It's simpler to use, but is not meant to handle queries generating large results. See API Reference for more details.

It begins by defining the necessary ATHENA_API_KEY. The payload dictionary specifies the query parameters, including a search term ("donald trump"), your api key, and other configuration details used to filter the results (for the sake of simplicity we are choosing to omit extra parameters).

The initial request is sent via a POST call to Athena’s API endpoint, with the response stored in data. This response includes metadata such as totalResults, allowing for efficient pagination. The loop then iterates over pages, adjusting the payload to request the next set of articles and appending each batch to all_articles. By the end, all_articles contains all retrieved articles matching the query, providing an easy way to gather a complete dataset in only a few lines of code.

This example makes use of the /api/v2/auery endpoint, which is best geared for smaller searches. Our /api/v2/query-async endpoint is suited for larger queries - see API Reference for more details.

import requests
import json

ATHENA_API_KEY = "your_api_key"
url = "https://app.runathena.com/api/v2/query"
headers = {
    "Content-Type": "application/json"
}

# Define the initial payload with the query and other required fields
payload = {
    "query": "donald trump",
    "api_key": ATHENA_API_KEY
}

# Send the initial request
response = requests.post(url, headers=headers, json=payload)
data = response.json()

# Initialize variables for pagination
page = 1
all_articles = []
articles_per_request = 25 

# Fetch and collect all articles while there are more pages
while page * articles_per_request < data.get('totalResults', 0):
    payload['page'] = page
    response = requests.post(url, headers=headers, json=payload)
    data = response.json()
    all_articles.extend(data.get('articles', []))
    page += 1

print(all_articles)

Last updated