In this notebook we demonstrate how the Dataframer Python SDK (PyPI package: pydataframer) can be used to generate large volumes of high-quality synthetic data, where each individual sample can be arbitrarily large.

Prerequisites

  1. Service Principal Permissions (one-time admin setup)
    This notebook must be run using a service principal that has access to the required Unity Catalog objects.
    A Databricks admin should create a service principal and grant it:
    • USE CATALOG on the catalog
    • USE SCHEMA, CREATE TABLE, SELECT, and MODIFY on the schema
  2. Any user who accesses or manages the tables used in this notebook through the Databricks UI should additionally be granted the same set of permissions specified in step 1.
  3. Dataframer Databricks Credentials (one-time admin setup)
    A Dataframer company admin must configure the Databricks service principal credentials in the Dataframer web application. Navigate to Profile > Keys > Databricks Credentials and enter:
    • Client ID — the service principal application (client) ID
    • Client Secret — the service principal secret
    • API Base URL — the Databricks Model Serving endpoint URL
      (e.g. https://adb-xxx.azuredatabricks.net/serving-endpoints)
    Once configured, these credentials are used automatically whenever any team member selects a databricks/ model for specs, runs, evaluations, or chat. No credentials need to be passed in API calls.
  4. A Dataframer API key is required for this demo. This can be retrieved by navigating to Profile > Keys > Copy API Key on the web application.
    Note that you can use the fully hosted Dataframer solution or an on-premises deployment (reach out to [email protected] for more details).
  5. Databricks Secrets Setup
    This notebook expects the following secrets to be stored in a Databricks secret scope.
    End-users running this notebook need at least READ permission on the secret scope.
    In this example, we use a scope named dataframer.
    • DATAFRAMER_API_KEY — Dataframer API key (from step 4)
    • DATABRICKS_HTTP_PATH — SQL warehouse HTTP path (for data access)
    • DATABRICKS_CLIENT_ID — Service principal client ID (for data access)
    • DATABRICKS_CLIENT_SECRET — Service principal secret (for data access)
    • DATABRICKS_SERVER_HOSTNAME — Databricks workspace hostname (for data access)
The Databricks secrets stored in the secret scope above are used by the DatabricksConnector for SQL access (fetching sample data and loading generated data into tables). Model Serving authentication is handled automatically by Dataframer using the credentials configured in step 3. A sketch of creating the scope and storing these secrets programmatically is shown below.
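
The following is an optional, minimal sketch of how an admin could create the scope and store these secrets programmatically. It assumes the databricks-sdk package (not otherwise used in this notebook) and admin authentication; the placeholder values and the data-team group name are illustrative only.

# Optional one-time admin sketch (assumes databricks-sdk and admin credentials)
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.workspace import AclPermission

w = WorkspaceClient()  # picks up authentication from the environment/notebook context

# Create the scope (raises an error if a scope named "dataframer" already exists)
w.secrets.create_scope(scope="dataframer")

# Store the secrets expected by this notebook (placeholder values shown)
for key, value in {
    "DATAFRAMER_API_KEY": "<dataframer-api-key>",
    "DATABRICKS_HTTP_PATH": "<sql-warehouse-http-path>",
    "DATABRICKS_CLIENT_ID": "<service-principal-client-id>",
    "DATABRICKS_CLIENT_SECRET": "<service-principal-secret>",
    "DATABRICKS_SERVER_HOSTNAME": "<workspace-hostname>",
}.items():
    w.secrets.put_secret(scope="dataframer", key=key, string_value=value)

# Grant end-users READ on the scope ("data-team" is an illustrative group name)
w.secrets.put_acl(scope="dataframer", principal="data-team", permission=AclPermission.READ)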

Step 1: Install and Setup SDK

Install the Dataframer SDK and the Databricks connector package for Dataframer.
%%capture
%pip install --upgrade pydataframer pydataframer-databricks pyyaml tenacity

Initialize the Dataframer client and the DatabricksConnector

In this step, we initialize the Dataframer client using an API key stored securely in Databricks Secrets under the dataframer scope. We also initialize the DatabricksConnector with the same scope so it can read sample data from, and persist generated data into, Unity Catalog.
from io import BytesIO

import dataframer
import requests
import yaml
from dataframer import Dataframer
from pydataframer_databricks import DatabricksConnector
from tenacity import retry, retry_if_result, stop_never, wait_fixed

# Initialize the pydataframer-databricks connector
databricks_connector = DatabricksConnector(dbutils, scope="dataframer")

# Initialize the Dataframer client
client = Dataframer(api_key=dbutils.secrets.get("dataframer", "DATAFRAMER_API_KEY"))

print("✓ Dataframer client initialized successfully")
print(f"Using base URL: {client.base_url}")
print(f"Dataframer SDK version: {dataframer.__version__}")

Fetch sample data

For this demo, we use sample data available in the Databricks catalog, specifically the samples.bakehouse.media_customer_reviews table. To keep the example lightweight, we select only the top 25 rows, export them as a CSV file, and upload the file to Dataframer.
sample_reviews_df = databricks_connector.fetch_sample_data(
    num_items_to_select=25,
    table_name="samples.bakehouse.media_customer_reviews",
)

Step 2: Upload data

Prepare CSV and upload to Dataframer

To use Dataframer, only a small sample of data is needed. To derive these samples from a table, we recommend creating a CSV file that contains the relevant rows and supplying it as a seed dataset to Dataframer.
# Convert the sampled Pandas DataFrame to an in-memory CSV file
csv_file = BytesIO(
    sample_reviews_df.to_csv(index=False).encode("utf-8")
)
csv_file.name = "media_customer_reviews_top_25.csv"

# Upload the CSV to Dataframer
try:
    dataset = client.dataframer.seed_datasets.create_with_files(
        name="media_customer_reviews_sample_jan27_26",
        description="Top 25 rows from Databricks samples.bakehouse.media_customer_reviews",
        dataset_type="SINGLE_FILE",
        files=[csv_file]
    )
    print(f"Upload complete ✅\nDataset ID: {dataset.id}")

except Exception as e:
    if "already exists" in str(e):
        print("Dataset already exists — using existing dataset ✅")
        dataset = next(
            d for d in client.dataframer.seed_datasets.list()
            if d.name == "media_customer_reviews_sample_jan27_26"
        )
        print(f"Dataset ID: {dataset.id}")
    else:
        raise

dataset_id = dataset.id

Retrieve Dataset Details

This cell demonstrates how to retrieve a specific dataset given its dataset ID.
# Get detailed information about the dataset
dataset_info = client.dataframer.seed_datasets.retrieve(dataset_id=dataset_id)

print("📋 Dataset Information:")
print("=" * 80)
print(f"ID: {dataset_info.id}")
print(f"Name: {dataset_info.name}")
print(f"Type: {dataset_info.dataset_type}")
print(f"Description: {dataset_info.description}")
print(f"Created: {dataset_info.created_at}")
print()
print(f"📁 Contents:")
print(f"  Files: {dataset_info.file_count}")
print(f"  Folders: {dataset_info.folder_count}")
print("=" * 80)

Step 3: Generate Specification via the analysis API

A specification (or “spec”) is a detailed description that captures the structure, patterns, and requirements of your data. Think of a spec as a blueprint for your data generation task. Dataframer automatically generates specifications by analyzing your seed data. This cell ensures a specification exists for the dataset by reusing an existing one or generating a new one. When using a databricks/ model, Dataframer automatically retrieves your company’s stored Databricks credentials (configured by your admin in Profile > Keys).
spec_name = f"spec_for_dataset_{dataset_id}"

# --- Check for existing spec ---
specs = client.dataframer.specs.list()

existing_spec = next(
    (s for s in specs if s.name == spec_name),
    None
)

spec_id = None

if existing_spec:
    spec_id = existing_spec.id
    print("Spec already exists — reusing existing spec ✅")
else:
    spec = client.dataframer.specs.create(
        dataset_id=dataset_id,
        name=spec_name,
        spec_generation_model_name="databricks/databricks-claude-sonnet-4-5",
        extrapolate_values=True,
        generate_distributions=True,
    )
    spec_id = spec.id
    print(f"Started specification generation:")
    print(f"Spec ID: {spec_id}")

    def spec_not_ready(result):
        return result.status not in ("SUCCEEDED", "FAILED")

    @retry(wait=wait_fixed(5), retry=retry_if_result(spec_not_ready), stop=stop_never)
    def poll_spec_status(client, spec_id):
        return client.dataframer.specs.retrieve(spec_id=spec_id)

    print("Polling for spec status (this may take a minute)...")
    spec_status = poll_spec_status(client, spec_id)

    if spec_status.status == "FAILED":
        raise RuntimeError(spec_status.error or "Unknown error")

    print(f"\nSpec generated successfully! ✅")

print(f"spec_id: {spec_id}")

Review Generated Specification

This cell retrieves the latest version of the generated specification and inspects key properties inferred from the dataset, such as data property variations.
# Get the spec (latest content_yaml is returned directly)
spec = client.dataframer.specs.retrieve(spec_id=spec_id)

# Parse the configuration YAML
config = yaml.safe_load(spec.content_yaml)
spec_data = config.get("spec", config)

print("\nData property variations:")
for prop in spec_data.get("data_property_variations", []):
    print(f"  • {prop['property_name']}: {len(prop['property_values'])} values")

Step 4: Update Specification (Optional)

This cell demonstrates how to programmatically update a given specification. To keep this demo simple, the update is applied only when the specification is newly created (i.e., when the latest version is 1). If the specification has already been updated, this step is skipped. In this step, we add a new data property called Review Detail Level with values 'Very brief', 'Brief', 'Moderate', 'Detailed' and expected distributions [15, 30, 35, 20].
# Get the spec with version history
spec = client.dataframer.specs.retrieve(spec_id=spec_id, include_versions=True)

# To keep this demo simple, only update if this is a newly created spec (1 version)
if spec.versions and len(spec.versions) > 1:
    print("ℹ️ Specification has already been updated — skipping this step")
else:
    # Parse the current config
    current_config = yaml.safe_load(spec.content_yaml)
    spec_data = current_config.get('spec', current_config)

    # Ensure data_property_variations exists
    spec_data.setdefault('data_property_variations', [])

    # Add new data property variation
    new_property = {
        'property_name': 'Review Detail Level',
        'property_values': ['Very brief', 'Brief', 'Moderate', 'Detailed'],
        'base_distributions': {
            'Very brief': 15,
            'Brief': 30,
            'Moderate': 35,
            'Detailed': 20
        },
        'conditional_distributions': {}
    }

    spec_data['data_property_variations'].append(new_property)
    print(f"✓ Added new property: {new_property['property_name']}")

    if 'requirements' in spec_data:
        spec_data['requirements'] += (
            "\n\nGenerated reviews should vary naturally in length and level of detail, "
            "while maintaining an informal customer review tone."
        )
        print("✓ Updated requirements for review context")

    # Convert back to YAML
    new_content_yaml = yaml.dump(
        current_config,
        default_flow_style=False,
        sort_keys=False
    )

    # Update the spec (creates a new version automatically)
    updated_spec = client.dataframer.specs.update(
        spec_id=spec_id,
        content_yaml=new_content_yaml
    )

    print("✓ Spec updated successfully")

Step 5: Generate New Samples

Once the spec has been generated and finalized (including any manual modifications), we use it to generate synthetic data. Databricks Model Serving credentials are retrieved automatically from your company’s stored configuration.
# --- Start generation run ---
run = client.dataframer.runs.create(
    spec_id=spec_id,
    generation_model="databricks/databricks-claude-sonnet-4-5",
    number_of_samples=3,

    ## Advanced configuration for outline generation
    outline_model="databricks/databricks-claude-sonnet-4-5",

    # enable_revisions=True,
    # max_revision_cycles=2,
    # revision_model="databricks/databricks-claude-sonnet-4-5",
)

run_id = run.id

print("Started generation run")
print(f"Run ID: {run_id}")

def run_not_finished(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(10), retry=retry_if_result(run_not_finished), stop=stop_never)
def poll_run_status(client, run_id):
    return client.dataframer.runs.retrieve(run_id=run_id)

print("Polling for run status (this may take a couple of minutes)...")
run_status = poll_run_status(client, run_id)

if run_status.status == "FAILED":
    raise RuntimeError("Generation failed")

print(f"\nGeneration completed successfully!")
print(f"Run ID: {run_id}")
print(f"Samples completed: {run_status.samples_completed}")

Step 6: Evaluate Generated Samples

While Dataframer evaluates each sample as it is generated, it also supports post-generation evaluation of the full dataset. This cell shows how to evaluate the generated dataset; read the documentation for more details.
# --- Start evaluation ---
print(f"Creating evaluation for run: {run_id}")

evaluation = client.dataframer.evaluations.create(
    run_id=run_id,
    evaluation_model="databricks/databricks-claude-sonnet-4-5"
)

evaluation_id = evaluation.id

print("\nEvaluation created")
print(f"Evaluation ID: {evaluation_id}")
print(f"Created at   : {evaluation.created_at}")

def eval_not_finished(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(5), retry=retry_if_result(eval_not_finished), stop=stop_never)
def poll_eval_status(client, evaluation_id):
    return client.dataframer.evaluations.retrieve(evaluation_id=evaluation_id)

print("Polling for evaluation status...")
eval_status = poll_eval_status(client, evaluation_id)

if eval_status.status == "FAILED":
    print("\nEvaluation failed. ❌")
    if eval_status.error_message:
        print(f"  Error: {eval_status.error_message}")
else:
    print("\nEvaluation completed successfully! ✅")

Step 7: Download Generated Files

List Generated Files

This cell lists all the files present in the generated dataset.
# Get generated files for the run
run_details = client.dataframer.runs.retrieve(run_id=run_id)

print("📁 Generated Files:")
print("=" * 80)
print(f"Run ID: {run_id}")
if run_details.generated_files:
    print(f"Total files: {len(run_details.generated_files)}")
    print("=" * 80)

    for i, file in enumerate(run_details.generated_files, 1):
        print(f"\n📄 File {i}:")
        print(f"  Path: {file.path}")
        print(f"  ID: {file.id}")
        print(f"  Size: {file.size_bytes} bytes")
        print(f"  Type: {file.file_type}")
else:
    print("No generated files found")
    print("=" * 80)

Download All Files as ZIP

This cell downloads all the generated files, along with their metadata, as a compressed ZIP file.
print("📥 Downloading generated files with metadata as ZIP...")

def download_not_ready(response):
    return not hasattr(response, 'download_url') or response.download_url is None

@retry(wait=wait_fixed(2), retry=retry_if_result(download_not_ready), stop=stop_never,
       before_sleep=lambda rs: print("  ZIP generation in progress, waiting..."))
def poll_download(client, run_id):
    return client.dataframer.runs.files.download_all(run_id=run_id)

response = poll_download(client, run_id)

# Download the ZIP from the presigned URL
zip_response = requests.get(response.download_url)
zip_response.raise_for_status()
downloaded_zip = BytesIO(zip_response.content)

print(f"\n✅ Downloaded ZIP file ({response.size_bytes} bytes)")

Load generated data into a Delta table

This cell writes the generated data into a Delta table <catalog>.<schema>.<table_name>. If the table already exists, its contents are overwritten.
from pydataframer_databricks import DatasetType, FileType

databricks_connector.load_to_table(
    table_name="workspace.default.generated_samples",
    downloaded_zip=downloaded_zip,
    dataset_type=DatasetType(dataset_info.dataset_type),
    file_type=FileType.CSV,
)
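
As an optional sanity check (assuming the notebook user has SELECT on workspace.default.generated_samples), you can read the table back with Spark:

# Optional sanity check: read back the generated samples from the Delta table
generated_df = spark.table("workspace.default.generated_samples")
print(f"Rows loaded: {generated_df.count()}")
display(generated_df.limit(5))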