This guide demonstrates how to use the Dataframer Python SDK within Databricks to generate high-quality synthetic data from your existing datasets.

Prerequisites

  1. Service Principal Permissions (one-time admin setup)
    This notebook must be run using a service principal that has access to the required Unity Catalog objects. A Databricks admin should create a service principal and grant it:
    • USE CATALOG on the catalog
    • USE SCHEMA, CREATE TABLE, SELECT, and MODIFY on the schema
  2. Dataframer API Key
    A Dataframer API key is required for this demo. This can be retrieved by navigating to Account -> Keys -> Copy API Key on the web application. Note that you can use the fully hosted Dataframer solution or an on-prem deployed version (reach out to [email protected] for more details).
  3. Databricks Secrets Setup
    This notebook expects the following secrets to be stored in a Databricks secret scope. In this example, we use a scope named dataframer; a quick sanity check for these secrets follows this list.
    • DATAFRAMER_API_KEY
    • DATABRICKS_HTTP_PATH
    • DATABRICKS_CLIENT_ID
    • DATABRICKS_CLIENT_SECRET
    • DATABRICKS_SERVER_HOSTNAME
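
Before running the rest of the notebook, you can verify that all required secrets exist in the scope. A minimal sanity check using dbutils, assuming the dataframer scope name from this example:
# Sanity check: confirm the required secrets exist in the "dataframer" scope.
required_keys = [
    "DATAFRAMER_API_KEY",
    "DATABRICKS_HTTP_PATH",
    "DATABRICKS_CLIENT_ID",
    "DATABRICKS_CLIENT_SECRET",
    "DATABRICKS_SERVER_HOSTNAME",
]
existing = {secret.key for secret in dbutils.secrets.list("dataframer")}
missing = [key for key in required_keys if key not in existing]
assert not missing, f"Missing secrets in scope 'dataframer': {missing}"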

Step 1: Install and Set Up the SDK

Install the Dataframer SDK and the Databricks connector package for Dataframer.
%%capture
%pip install --upgrade pydataframer pydataframer-databricks pyyaml tenacity

Initialize the Dataframer client and the DatabricksConnector

In this step, we initialize the Dataframer client using an API key stored securely in Databricks Secrets under the dataframer scope. We also initialize the DatabricksConnector with the dataframer scope so it can read data from and write data to Unity Catalog.
import os
from datetime import datetime
from pathlib import Path

import dataframer
from dataframer import Dataframer
from pydataframer_databricks import DatabricksConnector
from tenacity import retry, retry_if_result, stop_never, wait_fixed

# Initialize the pydataframer-databricks connector
databricks_connector = DatabricksConnector(dbutils, scope="dataframer")

# Initialize the Dataframer client
client = Dataframer(api_key=dbutils.secrets.get("dataframer", "DATAFRAMER_API_KEY"))

print("✓ Dataframer client initialized successfully")
print(f"Using base URL: {client.base_url}")
print(f"Dataframer SDK version: {dataframer.__version__}")

Fetch sample data

For this demo, we use sample data available in the Databricks catalog, specifically the samples.bakehouse.media_customer_reviews table. To keep the example lightweight, we select only the top 25 rows, export them as a CSV file, and upload the file to Dataframer.
sample_reviews_df = databricks_connector.fetch_sample_data(
    num_items_to_select=25,
    table_name="samples.bakehouse.media_customer_reviews",
)
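
Optionally, preview the sampled rows before uploading (fetch_sample_data returns a pandas DataFrame here, as the CSV conversion in the next step implies):
# Preview the sampled rows; Databricks' display() accepts pandas DataFrames.
display(sample_reviews_df)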

Step 2: Upload data

Prepare CSV and upload to Dataframer

To use Dataframer, only a small sample of data is needed. To derive these samples from a table, we recommend creating a CSV file that contains the relevant rows and supplying it as a Seed dataset to Dataframer.
from io import BytesIO

# Convert the sampled Pandas DataFrame to an in-memory CSV file
csv_file = BytesIO(
    sample_reviews_df.to_csv(index=False).encode("utf-8")
)
csv_file.name = "media_customer_reviews_top_25.csv"

# Upload the CSV to Dataframer
dataset = client.dataframer.seed_datasets.create_with_files(
    name=f"media_customer_reviews_sample_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    description="Top 25 rows from Databricks samples.bakehouse.media_customer_reviews",
    dataset_type="SINGLE_FILE",
    files=[csv_file]
)
print(f"Upload complete ✅\nDataset ID: {dataset.id}")

dataset_id = dataset.id

Retrieve Dataset Details

This call shows how to retrieve a specific dataset given its ID.
# Get detailed information about the dataset
dataset_info = client.dataframer.seed_datasets.retrieve(dataset_id=dataset_id)

print("📋 Dataset Information:")
print("=" * 80)
print(f"ID: {dataset_info.id}")
print(f"Name: {dataset_info.name}")
print(f"Type: {dataset_info.dataset_type}")
print(f"Description: {dataset_info.description}")
print(f"Created: {dataset_info.created_at}")
print()
print(f"📁 Contents:")
print(f"  Files: {dataset_info.file_count}")
print(f"  Folders: {dataset_info.folder_count}")
print()
print(f"🔧 Compatibility:")
compat = dataset_info.short_sample_compatibility
print(f"  Short samples:  {'✅' if compat.is_short_samples_compatible else '❌'}")
print(f"  Long samples:   {'✅' if compat.is_long_samples_compatible else '❌'}")
if compat.reason:
    print(f"  Reason: {compat.reason}")
print("=" * 80)

Step 3: Generate Specification via the analysis API

A specification (or “spec”) is a detailed description that captures the structure, patterns, and requirements of your data. Think of a spec as a blueprint for your data generation task. Dataframer automatically generates specifications by analyzing your seed data. This cell ensures a specification exists for the dataset by reusing an existing one or generating a new one.
spec = client.dataframer.specs.create(
    dataset_id=dataset_id,
    name=f"spec_for_dataset_{dataset_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    spec_generation_model_name="anthropic/claude-sonnet-4-5",
    extrapolate_values=True,
    generate_distributions=True,
)
spec_id = spec.id
print(f"Started specification generation:")
print(f"Spec ID: {spec_id}")

def spec_not_ready(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(5), retry=retry_if_result(spec_not_ready), stop=stop_never)
def poll_spec_status(client, spec_id):
    return client.dataframer.specs.retrieve(spec_id=spec_id)

print("Polling for spec status...")
spec_status = poll_spec_status(client, spec_id)

if spec_status.status == "FAILED":
    raise RuntimeError(spec_status.error or "Unknown error")

print(f"\nSpec generated successfully! ✅")
print(f"spec_id: {spec_id}")

Review Generated Specification

This cell retrieves the latest version of the generated specification and inspects key properties inferred from the dataset, such as data property variations.
import yaml

# Get the spec (latest content_yaml is returned directly)
spec = client.dataframer.specs.retrieve(spec_id=spec_id)

# Parse the configuration YAML
config = yaml.safe_load(spec.content_yaml)
spec_data = config.get("spec", config)

print("\nData property variations:")
for prop in spec_data.get("data_property_variations", []):
    print(f"  • {prop['property_name']}: {len(prop['property_values'])} values")

Step 4: Update Specification (Optional)

This cell demonstrates how to programmatically update a given specification. To keep this demo simple, the update is applied only when the specification is newly created (i.e., it has a single version); if the specification has already been updated, this step is skipped. In this step, we add a new data property called Review Detail Level with the values 'Very brief', 'Brief', 'Moderate', and 'Detailed' and the expected distributions [15, 30, 35, 20].
import yaml

# Get the spec with version history
spec = client.dataframer.specs.retrieve(spec_id=spec_id, include_versions=True)

# To keep this demo simple, only update if this is a newly created spec (1 version)
if spec.versions and len(spec.versions) > 1:
    print("ℹ️ Specification has already been updated — skipping this step")
else:
    # Parse the current config
    current_config = yaml.safe_load(spec.content_yaml)
    spec_data = current_config.get('spec', current_config)

    # Ensure data_property_variations exists
    spec_data.setdefault('data_property_variations', [])

    # Add new data property variation
    new_property = {
        'property_name': 'Review Detail Level',
        'property_values': ['Very brief', 'Brief', 'Moderate', 'Detailed'],
        'base_distributions': {
            'Very brief': 15,
            'Brief': 30,
            'Moderate': 35,
            'Detailed': 20
        },
        'conditional_distributions': {}
    }

    spec_data['data_property_variations'].append(new_property)
    print(f"✓ Added new property: {new_property['property_name']}")

    if 'requirements' in spec_data:
        spec_data['requirements'] += (
            "\n\nGenerated reviews should vary naturally in length and level of detail, "
            "while maintaining an informal customer review tone."
        )
        print("✓ Updated requirements for review context")

    # Convert back to YAML
    new_content_yaml = yaml.dump(
        current_config,
        default_flow_style=False,
        sort_keys=False
    )

    # Update the spec (creates a new version automatically)
    updated_spec = client.dataframer.specs.update(
        spec_id=spec_id,
        content_yaml=new_content_yaml
    )

    print("✓ Spec updated successfully")

Step 5: Generate New Samples

Once the spec is generated and finalized after any manual modifications, we use it to generate synthetic data. Refer to the Dataframer documentation for more details on sample generation.
# --- Start generation run ---
run = client.dataframer.runs.create(
    spec_id=spec_id,
    generation_model="anthropic/claude-sonnet-4-5",
    number_of_samples=3,

    # Advanced configuration for outline generation (use the -thinking suffix for extended thinking)
    outline_model="anthropic/claude-sonnet-4-5-thinking",

    # enable_revisions=True,
    # max_revision_cycles=2,
    # revision_model="anthropic/claude-sonnet-4-5-thinking",
)

run_id = run.id

print("Started generation run")
print(f"Run ID: {run_id}")

def run_not_finished(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(10), retry=retry_if_result(run_not_finished), stop=stop_never)
def poll_run_status(client, run_id):
    return client.dataframer.runs.retrieve(run_id=run_id)

print("Polling for run status...")
run_status = poll_run_status(client, run_id)

if run_status.status == "FAILED":
    raise RuntimeError("Generation failed")

print(f"\nGeneration completed successfully!")
print(f"Run ID: {run_id}")
print(f"Samples completed: {run_status.samples_completed}")

Step 6: Evaluate Generated Samples

While Dataframer evaluates each sample as it is generated, it also supports post-generation evaluation. This call evaluates the complete generated dataset; read the documentation for more details.
# --- Start evaluation ---
print(f"Creating evaluation for run: {run_id}")

evaluation = client.dataframer.evaluations.create(
    run_id=run_id,
    evaluation_model="anthropic/claude-sonnet-4-5"
)

evaluation_id = evaluation.id

print("\nEvaluation created")
print(f"Evaluation ID: {evaluation_id}")
print(f"Created at   : {evaluation.created_at}")

def eval_not_finished(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(5), retry=retry_if_result(eval_not_finished), stop=stop_never)
def poll_eval_status(client, evaluation_id):
    return client.dataframer.evaluations.retrieve(evaluation_id=evaluation_id)

print("Polling for evaluation status...")
eval_status = poll_eval_status(client, evaluation_id)

if eval_status.status == "FAILED":
    print("\nEvaluation failed. ❌")
    if eval_status.error_message:
        print(f"  Error: {eval_status.error_message}")
else:
    print("\nEvaluation completed successfully! ✅")

Step 7: Download Generated Files

List Generated Files

This call lists all files produced by the generation run.
# Get generated files for the run
run_details = client.dataframer.runs.retrieve(run_id=run_id)

print("📁 Generated Files:")
print("=" * 80)
print(f"Run ID: {run_id}")
if run_details.generated_files:
    print(f"Total files: {len(run_details.generated_files)}")
    print("=" * 80)

    for i, file in enumerate(run_details.generated_files, 1):
        print(f"\n📄 File {i}:")
        print(f"  Path: {file.path}")
        print(f"  ID: {file.id}")
        print(f"  Size: {file.size_bytes} bytes")
        print(f"  Type: {file.file_type}")
else:
    print("No generated files found")
    print("=" * 80)

Download All Files as ZIP

This call downloads all the generated files, along with their metadata, as a single compressed ZIP archive.
import requests

print("📥 Downloading generated files with metadata as ZIP...")

def download_not_ready(response):
    return not hasattr(response, 'download_url') or response.download_url is None

@retry(wait=wait_fixed(2), retry=retry_if_result(download_not_ready), stop=stop_never,
       before_sleep=lambda rs: print("  ZIP generation in progress, waiting..."))
def poll_download(client, run_id):
    return client.dataframer.runs.generated_files.download_all(run_id=run_id)

response = poll_download(client, run_id)

# Download the ZIP from the presigned URL
zip_content = requests.get(response.download_url).content

# Wrap in BytesIO for the next cell
from io import BytesIO
downloaded_zip = BytesIO(zip_content)

print(f"\n✅ Downloaded ZIP file ({response.size_bytes} bytes)")

Load generated data into a Delta table

This cell writes the generated data into a Delta table <catalog>.<schema>.<table_name>. If the table already exists, its contents are overwritten.
from pydataframer_databricks import DatasetType, FileType

databricks_connector.load_generated_data(
    table_name="workspace.default.generated_samples",
    downloaded_zip=downloaded_zip,
    dataset_type=DatasetType(dataset_info.dataset_type),
    file_type=FileType.CSV,
)
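
Optionally, read the table back with Spark to verify the load (standard Databricks APIs; adjust the table name if you changed it above):
# Verify the load by reading the Delta table back and previewing a few rows.
generated_df = spark.table("workspace.default.generated_samples")
display(generated_df.limit(10))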

Cleanup

Delete the spec and the seed dataset created in this notebook.
# Delete the spec (force=True also deletes all runs and generated files)
client.dataframer.specs.delete(spec_id=spec_id, force=True)
print(f"Deleted spec {spec_id} and all associated runs")

# Delete the dataset
client.dataframer.seed_datasets.delete(dataset_id=dataset_id)
print(f"Deleted dataset {dataset_id}")