Gather Data and Write to a CSV

Use Case Example - Gather All Data from a Dataset and Write to a CSV

This use case shows how to use transactions to gather data from a specific dataset and write that data to a CSV file.

  • Optional: Limit by row creation start and end dates
  • Optional: Keep all dataset transactions or only the most recent transaction for each unique CVID (row)

This use case lets you extract data from existing datasets, transaction by transaction, according to your criteria. This is useful if you want to:

  • Use the API to automate pulling the most up-to-date data from a dataset and use it in other non-Claravine systems such as analytics, data lakes, or reporting tools.
  • See a complete history of all transactions, by row, for a dataset.

Claravine API endpoints used in this use case:

  • GET /v1/datasets (optional, to look up the dataset UUID)
  • GET /v1/datasets/{datasetUuid}/transactions
  • GET /v1/datasets/{datasetUuid}/transactions/{transactionUuid}/data

Workflow Overview

  1. Use the UUID of the dataset you are looking to pull data from.
  2. Optional: If you need the UUID, use GET /v1/datasets to gather the Dataset UUID (a short lookup sketch follows this list).
  3. List all transactions for that UUID using GET https://api.claravine.com/v1/datasets/{datasetUuid}/transactions
  4. Optional: set start and end dates to limit to a specific transaction timeframe
  5. Cycle through all the transactions, getting the data for each transaction, using GET https://api.claravine.com/v1/datasets/{datasetUuid}/transactions/{transactionUuid}/data
  6. Augment the output by adding the Claravine ID (CVID), transaction UUID, transaction date, and row (CVID) creation date. This allows you to see the transaction history for each row.
  7. Decision: Gather all transactions for this dataset UUID, or only the most recent transaction for each unique row (CVID). The script prompts the user to respond with A for all transactions or M for the most recent transaction per row.
    1. Note: Gathering only the most recent data for each row is similar to the dataset view in The Data Standards Cloud UI. Gathering all transactions allows you to see the full history of edits for every unique row.
  8. Write output to a CSV file
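For step 2, the lookup sketch below may help you find the dataset UUID. This is a minimal sketch, not part of the example script: it assumes the GET https://api.claravine.com/v1/datasets response body contains a "datasets" list whose entries have "name" and "uuid" fields (adjust these names to match the actual response in your account), and it reuses the same x-claravine-key / x-claravine-secret headers as the main script. YOUR_API_KEY and YOUR_API_SECRET are placeholders.

import requests

headers = {
	'accept': 'application/json',
	'x-claravine-key': 'YOUR_API_KEY',       # Claravine Public API Key (placeholder)
	'x-claravine-secret': 'YOUR_API_SECRET'  # Claravine Public API Secret (placeholder)
}

# Assumption: the response body holds a "datasets" list of {"name": ..., "uuid": ...} objects.
resp = requests.get("https://api.claravine.com/v1/datasets", headers=headers)
resp.raise_for_status()
for ds in resp.json().get("datasets", []):
	print(ds.get("name"), ds.get("uuid"))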

The output of the script will be as follows (displayed as columns in a .csv):

Claravine ID | Field A | Field B | Field C | transaction_date | transaction_uuid | row_creation_date
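For illustration, a hypothetical output row might look like the following (field names and values are placeholders; your dataset's actual columns will differ):

Claravine ID,Field A,Field B,Field C,transaction_date,transaction_uuid,row_creation_date
CV12345,Spring Sale,Email,US,2024-05-01 12:00:00,0f0e9d8c-1111-2222-3333-444455556666,2024-01-15 09:30:00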

Example Python Script

Follow these steps (script available below) and run them at a frequency based on business needs (e.g. daily, weekly, monthly):
Note: Before attempting to run the script, make sure you have the requests and pandas libraries installed (e.g. pip install requests pandas).

To run the script, edit the first code lines and add values for:

  • API_KEY (Claravine Public API Key)
  • API_SECRET (Claravine Public API Secret)
  • DATASET_UUID (Dataset unique identifier)
  • START_DATE = "2020-01-01T00:00:00Z" (In ISO 8601 format, specifically the UTC date-time format)
  • END_DATE = "2026-12-31T23:59:59Z" (In ISO 8601 format, specifically the UTC date-time format)
  • Then save the following code into a file (e.g. "dataset_transactions.py") and execute it in one of two ways:
    • Option 1: Run python dataset_transactions.py and respond “A” for ALL. The script will gather all transactions, producing a history of every row in the dataset.
    • Option 2: Run python dataset_transactions.py and respond “M” for MOST RECENT. The script will gather only the most recent transaction for each row, producing a set of data similar to the dataset view in The Data Standards Cloud UI.
  • The script will generate a .csv file with all the dataset fields as well as the additional reference fields of Claravine ID, transaction_uuid, transaction_date, and row_creation_date.
import os
import json
import requests
import pandas as pd
from datetime import datetime
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


'''TODO: Edit the following lines to use your values'''
API_KEY = '' # Claravine API Key
API_SECRET = '' # Claravine API Secret
DATASET_UUID = '' # Dataset unique identifier (e.g. '1d32124b-a751-4726-a82c-28899c1ec76f')
START_DATE = "2020-01-01T00:00:00Z"
END_DATE = "2026-12-31T23:59:59Z"
'''End of lines to be edited'''


def create_session():
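	"""Create a requests Session that retries transient HTTP errors (429/5xx) with backoff."""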
	session = requests.Session()
	retry_strategy = Retry(
		total=5,
		status_forcelist=[429, 500, 502, 503, 504],
		backoff_factor=0.5,
		raise_on_status=False
	)
	adapter = HTTPAdapter(max_retries=retry_strategy)
	session.mount("https://", adapter)
	session.mount("http://", adapter)
	return session


def fetch_transactions(dataset_uuid, headers, start_date, end_date):
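	"""Page through GET /v1/datasets/{datasetUuid}/transactions and return all transactions in the date range."""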
	print("🔁 Fetching all transactions with pagination...")

	session = create_session()
	base_url = f"https://api.claravine.com/v1/datasets/{dataset_uuid}/transactions"
	params = {
		"start-date": start_date,
		"end-date": end_date,
		"page": 1
	}


	all_transactions = []

	while True:
		response = session.get(base_url, headers=headers, params=params)
		if response.status_code != 200:
			print(f"❌ Error fetching transactions: {response.status_code} - {response.text}")
			break

		data = response.json()
		transactions = data.get("transactions", [])
		all_transactions.extend(transactions)

		pagination = data.get("pagination", {})
		current_page = pagination.get("page", 1)
		total_pages = pagination.get("totalPages", 1)

		print(f"✅ Retrieved page {current_page} of {total_pages} ({len(transactions)} transactions)")

		if current_page >= total_pages:
			break

		params["page"] += 1

	print(f"📦 Total transactions collected: {len(all_transactions)}")
	return all_transactions


def fetch_transaction_data(dataset_uuid, transactions, headers):
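	"""Fetch the row data for each transaction and return (rows, columns), with transaction_date and transaction_uuid appended to every row."""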
	session = create_session()
	all_data = []
	columns = []

	for i, txn in enumerate(transactions, 1):
		txn_uuid = txn.get("uuid")
		txn_created = txn.get("date", "N/A")

		print(f"📅 Fetching data for transaction {i}/{len(transactions)}: {txn_uuid}")
		url = f"https://api.claravine.com/v1/datasets/{dataset_uuid}/transactions/{txn_uuid}/data"
		resp = session.get(url, headers=headers)

		if resp.status_code != 200:
			print(f"❌ Failed to fetch data for transaction {txn_uuid}: {resp.status_code}")
			continue

		try:
			data_json = resp.json().get("data", {})
			if not columns:
				columns = data_json.get("fields", []) + ["transaction_date", "transaction_uuid"]

			rows = data_json.get("rows", [])
			for row in rows:
				row.extend([txn_created, txn_uuid])
			all_data.extend(rows)

		except json.JSONDecodeError:
			print(f"⚠️ Invalid JSON for transaction {txn_uuid}")

	return all_data, columns


def process_and_save_data(data, columns, dataset_uuid, keep_all=False):
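	"""Build a DataFrame, derive row_creation_date from the earliest transaction per Claravine ID, optionally keep only the most recent transaction per ID, and write a timestamped CSV."""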
	if not data or not columns:
		print("⚠️ No data or columns found to write to CSV.")
		return

	df = pd.DataFrame(data, columns=columns)
	df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce")

	# The earliest transaction date seen for each Claravine ID is used as the row creation date.
	created_dates = df.groupby("Claravine ID")["transaction_date"].min().reset_index()
	created_dates.rename(columns={"transaction_date": "row_creation_date"}, inplace=True)

	if keep_all:
		df_final = pd.merge(df, created_dates, on="Claravine ID", how="left")
		print(f"📌 Keeping all transactions ({len(df_final)} rows)")
	else:
		df_latest = df.sort_values("transaction_date").drop_duplicates(subset="Claravine ID", keep="last")
		df_final = pd.merge(df_latest, created_dates, on="Claravine ID", how="left")
		print(f"📌 Keeping most recent transaction per Claravine ID ({len(df_final)} rows)")

	timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
	filename = f"transaction_data_{dataset_uuid}_{timestamp}.csv"
	df_final.to_csv(filename, index=False)

	print(f"✅ Saved data to {os.path.abspath(filename)}")
	print(f"📟 Unique Claravine IDs: {df_final['Claravine ID'].nunique()}")


if __name__ == "__main__":
	if (API_KEY == '' or API_SECRET == '' or DATASET_UUID == ''):
		error_msg = "[ERROR] Please edit the first lines of the script and provide values for: 'API_KEY', 'API_SECRET' and 'DATASET_UUID'."
		raise RuntimeError(error_msg)

	headers = {
		'accept': 'application/json',
		'x-claravine-key': API_KEY,
		'x-claravine-secret': API_SECRET
	}

	choice = input("Do you want to keep [A]ll transactions or only the [M]ost recent per 'Claravine ID'? (A/M): ").strip().lower()
	keep_all = choice == 'a'

	transactions = fetch_transactions(DATASET_UUID, headers, START_DATE, END_DATE)
	data, columns = fetch_transaction_data(DATASET_UUID, transactions, headers)
	process_and_save_data(data, columns, DATASET_UUID, keep_all=keep_all)