API Quickstart

Making your first request with the Athena API is straightforward with this script.
Athena's News API operates asynchronously: when you submit a query, a job is created and assigned a unique id (the query id). You then poll the get-results endpoint with that id until the job is done.
1. Submit your query and receive a query id.
2. Every few seconds, poll the get-results endpoint to see if the query is finished, passing in the query id from the previous step.
3. Once the search is complete, your results are returned.
Using Async Query Endpoint
First, define your ATHENA_API_KEY. The payload dictionary specifies the query parameters, including a search term ("Tesla dealership protests"), your api_key, and other configuration details to filter the results (for the sake of simplicity we are choosing to omit extra parameters — see the API Reference for other options).
The initial request is sent via a POST call to Athena's query-async API endpoint. The response contains your query_id, which you then use to retrieve the results of your search from the /api/v2/get-results endpoint.
import time
import requests
# --- Credentials ---
# Placeholder: replace with the API key issued for your account.
ATHENA_API_KEY = "YOUR_API_KEY"

# --- Search parameters (sent in the initial query payload) ---
QUERY = "Tesla dealership protests"
KEY_PHRASES = "('tesla takedown')"
# Timestamps bounding the search window.
start_date = "2025-03-01T15:13:52.466Z"
end_date = "2025-03-20T15:13:52.466Z"

# --- Endpoints and request headers ---
_API_ROOT = "https://app.runathena.com/api/v2"
QUERY_URL = _API_ROOT + "/query-async"
RESULTS_URL = _API_ROOT + "/get-results"
HEADERS = {"Content-Type": "application/json"}

# --- Paging and polling ---
# Page size assumed when working out how many result pages to request.
ARTICLES_PER_PAGE = 25
# Seconds to wait between polls of the results endpoint.
POLL_INTERVAL = 1
def send_initial_query(query: str, api_key: str, timeout: float = 30.0) -> str:
    """
    Sends the initial query to the API and returns the query_id.

    Args:
        query: Search text sent as the "query" field of the payload.
        api_key: API key sent as the "api_key" field of the payload.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, requests waits indefinitely on an unresponsive server.

    Returns:
        The "query_id" value from the JSON response, or None when the
        response does not contain that key (callers should check for this).

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx HTTP status.
    """
    payload = {
        "query": query,
        "key_phrases": KEY_PHRASES,
        "api_key": api_key,
        "toggle_state": "All Articles",
        "start_date": start_date,
        "end_date": end_date,
    }
    response = requests.post(
        QUERY_URL, headers=HEADERS, json=payload, timeout=timeout
    )
    response.raise_for_status()  # raise an error for bad responses
    data = response.json()
    return data.get("query_id")
def poll_for_results(
    query_id: str,
    api_key: str,
    interval: int = POLL_INTERVAL,
    timeout: float = 30.0,
) -> dict:
    """
    Polls the API until the query status is not 'PENDING'.
    Returns the final result data.

    Args:
        query_id: Id returned when the query was submitted.
        api_key: API key sent as the "api_key" field of the payload.
        interval: Seconds to sleep between consecutive polls.
        timeout: Seconds to wait for each individual HTTP request before
            giving up. This bounds a single request, not the total time
            spent polling; the loop itself runs for as long as the API
            keeps reporting 'PENDING'.

    Returns:
        The decoded JSON body of the first response whose "state" is
        anything other than 'PENDING'.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx HTTP status on any poll.
    """
    payload = {"query_id": query_id, "api_key": api_key}
    while True:
        response = requests.post(
            RESULTS_URL, headers=HEADERS, json=payload, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
        # Any state other than 'PENDING' ends the wait; the caller decides
        # whether that state counts as success.
        if data.get("state") != "PENDING":
            return data
        time.sleep(interval)
def fetch_all_articles(
    query_id: str,
    total_results: int,
    api_key: str,
    timeout: float = 30.0,
) -> list:
    """
    Fetches and aggregates all articles by paginating through the results.

    Args:
        query_id: Id of a finished query.
        total_results: Total number of articles reported for the query;
            together with ARTICLES_PER_PAGE it determines how many pages
            are requested.
        api_key: API key sent as the "api_key" field of the payload.
        timeout: Seconds to wait for each page request before giving up.
            Without a timeout, requests waits indefinitely on an
            unresponsive server.

    Returns:
        A single list containing the "articles" entries of every page,
        in page order. Pages without an "articles" key contribute nothing.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx HTTP status on any page.
    """
    all_articles = []
    page = 1
    payload = {
        "query_id": query_id,
        "api_key": api_key,
        "toggle_state": "Encoded Articles",
    }
    # Continue fetching while there are articles remaining (pages are 1-based).
    while (page - 1) * ARTICLES_PER_PAGE < total_results:
        payload["page"] = page
        response = requests.post(
            RESULTS_URL, headers=HEADERS, json=payload, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
        all_articles.extend(data.get("articles", []))
        page += 1
    return all_articles
def main():
    """
    Runs the quickstart flow: submit the query, wait for it to finish,
    then download every page of results and report how many were fetched.
    Problems are reported by printing a message rather than raising.
    """
    try:
        # Step 1: submit the search and keep the id assigned to it.
        job_id = send_initial_query(QUERY, ATHENA_API_KEY)
        if not job_id:
            print("Failed to retrieve query ID.")
            return

        # Step 2: wait until the query is no longer pending.
        outcome = poll_for_results(job_id, ATHENA_API_KEY)

        # Pagination only makes sense for a query that finished successfully.
        if outcome.get('state') != 'SUCCESS':
            print("Query did not complete successfully:", outcome)
            return

        expected_count = outcome.get('totalArticles', 0)
        if expected_count == 0:
            print("No results found.")
        else:
            # Step 3: walk through every result page and collect the articles.
            collected = fetch_all_articles(job_id, expected_count, ATHENA_API_KEY)
            print("Total articles fetched:", len(collected))
    except requests.RequestException as err:
        print("An error occurred while communicating with the API:", err)


if __name__ == '__main__':
    main()
Last updated