Generate high-quality synthetic single-file datasets within Databricks using DataFramer.
Open in Google Colab to run this tutorial interactively.
In this notebook we demonstrate how the DataFramer Python SDK (pip package: pydataframer) can be used to generate large volumes of high-quality synthetic datasets, where each individual sample can be arbitrarily large.
Service Principal Permissions (one-time admin setup)
This notebook must be run using a service principal that has access to the required Unity Catalog objects.
A Databricks admin should create a service principal and grant it:
USE CATALOG on the catalog
USE SCHEMA, CREATE TABLE, SELECT, and MODIFY on the schema
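The grants above can be scripted rather than clicked through. The sketch below builds the corresponding Unity Catalog GRANT statements in Python; the catalog name, schema name, and service principal application ID are placeholders, and in a notebook an admin would execute each statement with `spark.sql(stmt)`.

```python
CATALOG = "main"                 # placeholder catalog name
SCHEMA = "synthetic_data"        # placeholder schema name
SP_APP_ID = "00000000-0000-0000-0000-000000000000"  # placeholder service principal ID

def grant_statements(catalog: str, schema: str, principal: str) -> list[str]:
    """Build the GRANT statements listed above for a service principal."""
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{principal}`",
    ] + [
        f"GRANT {priv} ON SCHEMA {catalog}.{schema} TO `{principal}`"
        for priv in ("USE SCHEMA", "CREATE TABLE", "SELECT", "MODIFY")
    ]

for stmt in grant_statements(CATALOG, SCHEMA, SP_APP_ID):
    print(stmt)  # an admin would run each via spark.sql(stmt)
```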
In addition, any user who accesses or manages the tables used in this notebook through the Databricks UI should be granted the same set of permissions listed above.
DataFramer Databricks Credentials (one-time admin setup)
A DataFramer company admin must configure the Databricks service principal credentials in the DataFramer web application. Navigate to Profile > Keys > Databricks Credentials and enter:
Client ID — the service principal application (client) ID
Client Secret — the service principal secret
API Base URL — the Databricks Model Serving endpoint URL
(e.g. https://adb-xxx.azuredatabricks.net/serving-endpoints)
Once configured, these credentials are used automatically whenever any team member selects a databricks/ model for specs, runs, evaluations, or chat. No credentials need to be passed in API calls.
A DataFramer API key is required for this demo. It can be retrieved by navigating to Profile > Keys > Copy API Key in the web application.
Note that you can use the fully hosted DataFramer solution or an on-prem deployment (reach out to [email protected] for more details).
Databricks Secrets Setup
This notebook expects the following secrets to be stored in a Databricks secret scope.
End-users running this notebook need at least READ permission on the secret scope.
In this example, we use a scope named dataframer.
DATAFRAMER_API_KEY — DataFramer API key (retrieved above)
DATABRICKS_HTTP_PATH — SQL warehouse HTTP path (for data access)
DATABRICKS_CLIENT_ID — Service principal client ID (for data access)
DATABRICKS_CLIENT_SECRET — Service principal secret (for data access)
DATABRICKS_SERVER_HOSTNAME — Databricks workspace hostname (for data access)
The Databricks secrets stored in the secret scope above are used by the DatabricksConnector for SQL access (fetching sample data and loading generated data into tables). Model Serving authentication is handled automatically by DataFramer using the credentials configured in the web application.
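Inside Databricks, each of these values is read with `dbutils.secrets.get`. The helper below is a small sketch of that pattern with an environment-variable fallback (an assumption added here so the helper can also be exercised outside a Databricks runtime, where `dbutils` does not exist).

```python
import os

def get_secret(scope: str, key: str, dbutils=None) -> str:
    """Fetch a secret from a Databricks secret scope.

    Falls back to an environment variable of the same name when dbutils
    is unavailable (e.g. when testing this helper locally).
    """
    if dbutils is not None:
        return dbutils.secrets.get(scope=scope, key=key)
    return os.environ[key]

# In the notebook: api_key = get_secret("dataframer", "DATAFRAMER_API_KEY", dbutils)
# Local stand-in for demonstration only:
os.environ["DATAFRAMER_API_KEY"] = "dummy-key-for-local-testing"
print(get_secret("dataframer", "DATAFRAMER_API_KEY"))  # -> dummy-key-for-local-testing
```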
Initialize the DataFramer client and the DatabricksConnector
In this step, we initialize the DataFramer client using an API key stored securely in Databricks Secrets under the dataframer scope. We also initialize the DatabricksConnector with the dataframer scope so it can read data from, and persist data into, Unity Catalog.
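As a rough sketch of that initialization, the helper below collects the SQL-access settings the DatabricksConnector needs, keyed by the secret names from the previous step. The client class name and the exact DatabricksConnector constructor signature in the trailing comments are assumptions; consult the pydataframer documentation for the real API.

```python
def build_connector_config(scope: str) -> dict:
    """Collect the SQL-access settings for the DatabricksConnector.

    Each value would come from dbutils.secrets.get(scope, key) inside
    Databricks; placeholders are used here for illustration.
    """
    keys = [
        "DATABRICKS_SERVER_HOSTNAME",
        "DATABRICKS_HTTP_PATH",
        "DATABRICKS_CLIENT_ID",
        "DATABRICKS_CLIENT_SECRET",
    ]
    return {key: f"<{scope}/{key}>" for key in keys}

config = build_connector_config("dataframer")
print(sorted(config))
# In the notebook (names assumed -- check the pydataframer docs):
#   client = Dataframer(api_key=dbutils.secrets.get("dataframer", "DATAFRAMER_API_KEY"))
#   connector = DatabricksConnector(**config)
```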
For this demo, we use sample data available in the Databricks catalog, specifically the samples.bakehouse.media_customer_reviews table. To keep the example lightweight, we select only the top 25 rows, export them as a CSV file, and upload the file to DataFramer.
To use DataFramer, only a small sample of data is needed. To derive these samples from a table, we recommend creating a CSV file that contains the relevant rows from the table and supplying it as a seed dataset to DataFramer.
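The sampling step can be sketched as below. A tiny stand-in DataFrame (with invented column names) replaces the result of querying samples.bakehouse.media_customer_reviews through the connector; the point is capping the seed at 25 rows and serializing it to CSV.

```python
import pandas as pd

# Stand-in for the rows fetched from samples.bakehouse.media_customer_reviews.
# Column names here are illustrative, not the table's real schema.
reviews = pd.DataFrame({
    "franchiseID": [3000001, 3000002, 3000003],
    "review": ["Great croissants!", "Service was slow.", "Lovely atmosphere."],
})

sample_reviews_df = reviews.head(25)            # cap the seed at 25 rows
csv_text = sample_reviews_df.to_csv(index=False)
print(csv_text.splitlines()[0])                 # -> franchiseID,review
```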
```python
# Convert the sampled Pandas DataFrame to an in-memory CSV file
csv_file = BytesIO(
    sample_reviews_df.to_csv(index=False).encode("utf-8")
)
csv_file.name = "media_customer_reviews_top_25.csv"

# Upload the CSV to DataFramer
try:
    dataset = client.dataframer.seed_datasets.create_with_files(
        name="media_customer_reviews_sample_jan27_26",
        description="Top 25 rows from Databricks samples.bakehouse.media_customer_reviews",
        dataset_type="SINGLE_FILE",
        files=[csv_file]
    )
    print(f"Upload complete ✅\nDataset ID: {dataset.id}")
except Exception as e:
    if "already exists" in str(e):
        print("Dataset already exists — using existing dataset ✅")
        dataset = next(
            d for d in client.dataframer.seed_datasets.list()
            if d.name == "media_customer_reviews_sample_jan27_26"
        )
        print(f"Dataset ID: {dataset.id}")
    else:
        raise

dataset_id = dataset.id
```
Step 3: Generate Specification via the analysis API
A specification (or “spec”) is a detailed description that captures the structure, patterns, and requirements of your data. Think of a spec as a blueprint for your data generation task. DataFramer automatically generates specifications by analyzing your seed data. This cell ensures a specification exists for the dataset by reusing an existing one or generating a new one. When using a databricks/ model, DataFramer automatically retrieves your company’s stored Databricks credentials (configured by your admin in Profile > Keys).
```python
spec_name = f"spec_for_dataset_{dataset_id}"

# --- Check for existing spec ---
specs = client.dataframer.specs.list()
existing_spec = next(
    (s for s in specs if s.name == spec_name),
    None
)
spec_id = None

if existing_spec:
    spec_id = existing_spec.id
    print("Spec already exists — reusing existing spec ✅")
else:
    spec = client.dataframer.specs.create(
        dataset_id=dataset_id,
        name=spec_name,
        spec_generation_model_name="databricks/databricks-claude-sonnet-4-5",
        extrapolate_values=True,
        generate_distributions=True,
    )
    spec_id = spec.id
    print("Started specification generation:")
    print(f"Spec ID: {spec_id}")

    def spec_not_ready(result):
        return result.status not in ("SUCCEEDED", "FAILED")

    @retry(wait=wait_fixed(5), retry=retry_if_result(spec_not_ready), stop=stop_never)
    def poll_spec_status(client, spec_id):
        return client.dataframer.specs.retrieve(spec_id=spec_id)

    print("Polling for spec status (this may take a minute)...")
    spec_status = poll_spec_status(client, spec_id)

    if spec_status.status == "FAILED":
        raise RuntimeError(spec_status.error or "Unknown error")

    print("\nSpec generated successfully! ✅")

print(f"spec_id: {spec_id}")
```
This cell retrieves the latest version of the generated specification and inspects key properties inferred from the dataset, such as data property variations.
```python
# Get the spec (latest content_yaml is returned directly)
spec = client.dataframer.specs.retrieve(spec_id=spec_id)

# Parse the configuration YAML
config = yaml.safe_load(spec.content_yaml)
spec_data = config.get("spec", config)

print("\nData property variations:")
for prop in spec_data.get("data_property_variations", []):
    print(f"  • {prop['property_name']}: {len(prop['property_values'])} values")
```
This cell demonstrates how to programmatically update a given specification. To keep this demo simple, the update is applied only when the specification is newly created (i.e., when the latest version is 1); if the specification has already been updated, this step is skipped. Here we add a new data property called Review Detail Level with values 'Very brief', 'Brief', 'Moderate', and 'Detailed' and target distribution weights [15, 30, 35, 20].
```python
# Get the spec with version history
spec = client.dataframer.specs.retrieve(spec_id=spec_id, include_versions=True)

# To keep this demo simple, only update if this is a newly created spec (1 version)
if spec.versions and len(spec.versions) > 1:
    print("ℹ️ Specification has already been updated — skipping this step")
else:
    # Parse the current config
    current_config = yaml.safe_load(spec.content_yaml)
    spec_data = current_config.get('spec', current_config)

    # Ensure data_property_variations exists
    spec_data.setdefault('data_property_variations', [])

    # Add new data property variation
    new_property = {
        'property_name': 'Review Detail Level',
        'property_values': ['Very brief', 'Brief', 'Moderate', 'Detailed'],
        'base_distributions': {
            'Very brief': 15,
            'Brief': 30,
            'Moderate': 35,
            'Detailed': 20
        },
        'conditional_distributions': {}
    }
    spec_data['data_property_variations'].append(new_property)
    print(f"✓ Added new property: {new_property['property_name']}")

    if 'requirements' in spec_data:
        spec_data['requirements'] += (
            "\n\nGenerated reviews should vary naturally in length and level of detail, "
            "while maintaining an informal customer review tone."
        )
        print("✓ Updated requirements for review context")

    # Convert back to YAML
    new_content_yaml = yaml.dump(
        current_config,
        default_flow_style=False,
        sort_keys=False
    )

    # Update the spec (creates a new version automatically)
    updated_spec = client.dataframer.specs.update(
        spec_id=spec_id,
        content_yaml=new_content_yaml
    )
    print("✓ Spec updated successfully")
```
Once the spec is generated and finalized after any manual modifications, we will use this spec to generate synthetic data. Databricks Model Serving credentials are retrieved automatically from your company’s stored configuration.
```python
# --- Start generation run ---
run = client.dataframer.runs.create(
    spec_id=spec_id,
    generation_model="databricks/databricks-claude-sonnet-4-5",
    number_of_samples=3,
    ## Advanced configuration for outline generation
    outline_model="databricks/databricks-claude-sonnet-4-5",
    # revision_types=["coherence_flow", "consistency", "distinguishability", "conformance"],
    # filtering_types=["structural", "conformance"],
    # max_revision_cycles=2,
    # revision_model="databricks/databricks-claude-sonnet-4-5",
)
run_id = run.id
print("Started generation run")
print(f"Run ID: {run_id}")

def run_not_finished(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(10), retry=retry_if_result(run_not_finished), stop=stop_never)
def poll_run_status(client, run_id):
    return client.dataframer.runs.retrieve(run_id=run_id)

print("Polling for run status (this may take a couple of minutes)...")
run_status = poll_run_status(client, run_id)

if run_status.status == "FAILED":
    raise RuntimeError("Generation failed")

print("\nGeneration completed successfully!")
print(f"Run ID: {run_id}")
print(f"Samples completed: {run_status.samples_completed}")
```
While DataFramer evaluates each sample as it is generated, it also supports post-generation evaluation of the full dataset. This cell shows how to evaluate the generated dataset via the evaluations API; read the documentation for more details.
```python
# --- Start evaluation ---
print(f"Creating evaluation for run: {run_id}")
evaluation = client.dataframer.evaluations.create(
    run_id=run_id,
    evaluation_model="databricks/databricks-claude-sonnet-4-5"
)
evaluation_id = evaluation.id
print("\nEvaluation created")
print(f"Evaluation ID: {evaluation_id}")
print(f"Created at : {evaluation.created_at}")

def eval_not_finished(result):
    return result.status not in ("SUCCEEDED", "FAILED")

@retry(wait=wait_fixed(5), retry=retry_if_result(eval_not_finished), stop=stop_never)
def poll_eval_status(client, evaluation_id):
    return client.dataframer.evaluations.retrieve(evaluation_id=evaluation_id)

print("Polling for evaluation status...")
eval_status = poll_eval_status(client, evaluation_id)

if eval_status.status == "FAILED":
    print("\nEvaluation failed. ❌")
    if eval_status.error_message:
        print(f"  Error: {eval_status.error_message}")
else:
    print("\nEvaluation completed successfully! ✅")
```
This API allows you to download all the generated files, together with their metadata, as a compressed ZIP file.
```python
print("📥 Downloading generated files with metadata as ZIP...")

def download_not_ready(response):
    return not hasattr(response, 'download_url') or response.download_url is None

@retry(
    wait=wait_fixed(2),
    retry=retry_if_result(download_not_ready),
    stop=stop_never,
    before_sleep=lambda rs: print("  ZIP generation in progress, waiting..."),
)
def poll_download(client, run_id):
    return client.dataframer.runs.files.download_all(run_id=run_id)

response = poll_download(client, run_id)

# Download the ZIP from the presigned URL
zip_content = requests.get(response.download_url).content
downloaded_zip = BytesIO(zip_content)
print(f"\n✅ Downloaded ZIP file ({response.size_bytes} bytes)")
```
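Once downloaded, the archive can be inspected with the standard library. In the sketch below, an in-memory ZIP with invented file names stands in for `downloaded_zip`, since the real bytes come from the presigned URL in the previous cell.

```python
import io
import zipfile

# Stand-in for the downloaded archive; file names are illustrative only.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("sample_001.csv", "review\nGreat croissants!\n")
    zf.writestr("sample_001.metadata.json", "{}")
downloaded_zip = io.BytesIO(buf.getvalue())

# List the archive contents and read one generated file
with zipfile.ZipFile(downloaded_zip) as zf:
    names = sorted(zf.namelist())
    print(names)
    first = zf.read("sample_001.csv").decode("utf-8")
print(first.splitlines()[0])  # -> review
```

From here, each extracted CSV can be loaded with pandas and written back to Unity Catalog through the DatabricksConnector.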