
# Fetch Module

Path: `src/fetch/`

Handles HTTP communication with the Spark REST API via the Databricks driver proxy.

## Files

| File | Purpose |
|---|---|
| `client.rs` | `SparkHttpClient` and `FetchError` |
| `types.rs` | Spark API response types (serde) |
| `spark.rs` | Endpoint methods on `SparkHttpClient` |
| `databricks.rs` | `DatabricksClient` for the Databricks REST API, DBFS, Spark UI, and History Server |
| `orchestrator.rs` | `poll_once`, `assemble_data_payload`, `compute_health_summary` |
| `poller.rs` | Background polling loop and historical fallback chain |
| `eventlog/` | Event log parsing: DBFS download, gzip decompression, `SparkEvent` serde |

## client.rs — SparkHttpClient

### SparkHttpClient

```rust
pub struct SparkHttpClient {
    client: reqwest::Client,
    base_url: String,
    token: String,
}
```

| Method | Signature | Description |
|---|---|---|
| `new` | `(base_url, token) -> Self` | Creates a new client |
| `base_url` | `&self -> &str` | Returns the base URL |
| `get` | `&self, path: &str -> Result<T, FetchError>` | Generic GET request with Bearer auth and JSON deserialization |

### FetchError

Error enum with user-friendly messages for common HTTP errors:

| Variant | Status | Message |
|---|---|---|
| `Unauthorized` | 401 | Token expired or invalid |
| `Forbidden` | 403 | Insufficient permissions |
| `NotFound` | 404 | Spark UI not available / app may have ended |
| `ServiceUnavailable` | 503 | Cluster not reachable |
| `HttpError` | other | Generic HTTP error with status and body |
| `Deserialize` | — | JSON deserialization failure |
| `Request` | — | Network-level request failure |
| `NoApplications` | — | No Spark applications found |
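The status-to-variant mapping can be sketched as follows. This is a simplified stand-in, not the real definition: the variant names and messages come from the table above, but the actual enum carries additional context (status and body for `HttpError`, wrapped source errors for `Deserialize` and `Request`).

```rust
// Simplified sketch of FetchError's HTTP-status mapping; variant set reduced
// to the status-driven cases for illustration.
#[derive(Debug, PartialEq)]
enum FetchError {
    Unauthorized,
    Forbidden,
    NotFound,
    ServiceUnavailable,
    HttpError(u16),
}

impl FetchError {
    /// Map an HTTP status code to the matching variant.
    fn from_status(status: u16) -> FetchError {
        match status {
            401 => FetchError::Unauthorized,
            403 => FetchError::Forbidden,
            404 => FetchError::NotFound,
            503 => FetchError::ServiceUnavailable,
            other => FetchError::HttpError(other),
        }
    }

    /// User-friendly message, as listed in the table above.
    fn message(&self) -> &'static str {
        match self {
            FetchError::Unauthorized => "Token expired or invalid",
            FetchError::Forbidden => "Insufficient permissions",
            FetchError::NotFound => "Spark UI not available / app may have ended",
            FetchError::ServiceUnavailable => "Cluster not reachable",
            FetchError::HttpError(_) => "HTTP error",
        }
    }
}

fn main() {
    assert_eq!(FetchError::from_status(401), FetchError::Unauthorized);
    assert_eq!(FetchError::from_status(503).message(), "Cluster not reachable");
}
```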

## types.rs — API Types

### Application & Job Types

| Type | Key Fields |
|---|---|
| `SparkApplication` | id, name |
| `SparkJob` | job_id, name, status, submission_time, completion_time, stage_ids, task counts |
| `JobStatus` | Succeeded, Running, Failed, Unknown |

### Stage Types

| Type | Key Fields |
|---|---|
| `SparkStage` | stage_id, attempt_id, status, num_tasks, executor_run_time, I/O bytes, spill bytes |
| `StageStatus` | Active, Complete, Pending, Failed, Skipped |

### SQL Types

| Type | Key Fields |
|---|---|
| `SparkSqlExecution` | id, status, description, plan_description, duration, job ID lists |

### Task Types

| Type | Key Fields |
|---|---|
| `SparkTask` | task_id, stage_id, executor_id, host, status, duration, I/O bytes, spill bytes, peak_execution_memory |
| `RawSparkTask` | Raw API format with nested task_metrics (flattened into `SparkTask` via a custom deserializer) |

### Executor Types

| Type | Key Fields |
|---|---|
| `SparkExecutor` | id, total_cores, max_memory, is_active |
| `ClusterResources` | total_executor_memory, total_executor_cores, num_executors |

## spark.rs — Endpoint Methods

Methods on `SparkHttpClient`:

| Method | Path | Returns |
|---|---|---|
| `discover_app_id` | `/applications` | `String` (first application ID) |
| `get_jobs` | `/applications/{id}/jobs` | `Vec<SparkJob>` |
| `get_stages` | `/applications/{id}/stages` | `Vec<SparkStage>` |
| `get_sql_executions` | `/applications/{id}/sql` | `Vec<SparkSqlExecution>` |
| `get_task_list` | `/applications/{id}/stages/{sid}/{attempt}/taskList` | `Vec<SparkTask>` |
| `get_executors` | `/applications/{id}/executors` | `Vec<SparkExecutor>` |
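Each method interpolates its IDs into the path and hands it to the generic `get`. A minimal sketch of the path construction, with helper names invented for illustration (the real code builds these inline):

```rust
// Hypothetical path builders for two of the endpoints above; only the path
// templates are taken from the table, the helpers themselves are assumed.
fn jobs_path(app_id: &str) -> String {
    format!("/applications/{}/jobs", app_id)
}

fn task_list_path(app_id: &str, stage_id: i64, attempt: i64) -> String {
    format!("/applications/{}/stages/{}/{}/taskList", app_id, stage_id, attempt)
}

fn main() {
    assert_eq!(jobs_path("app-42"), "/applications/app-42/jobs");
    assert_eq!(
        task_list_path("app-42", 7, 0),
        "/applications/app-42/stages/7/0/taskList"
    );
}
```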

## databricks.rs — DatabricksClient

Thin client for the Databricks REST API `/api/2.0/*` endpoints and workspace-level requests.

### DatabricksClient

```rust
pub struct DatabricksClient {
    client: reqwest::Client,
    base_url: String,        // e.g. https://{host}/api/2.0
    workspace_root: String,  // e.g. https://{host}
    token: String,
    sparkui_cookie: Option<String>,
}
```

### SparkuiProbeResult

Tri-state result from probing a Spark UI endpoint:

| Variant | Fields | Meaning |
|---|---|---|
| `Ready` | base_url, app_id | Spark UI is serving JSON data |
| `Loading` | base_url | Spark UI is authenticated but still downloading/parsing event logs |
| `NotFound` | — | No accessible Spark UI endpoint found |
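A minimal sketch of the tri-state enum and how a caller might branch on it. The variant shapes follow the table above; the `next_action` helper is illustrative, not part of the source:

```rust
// Tri-state probe result, per the table above; fields simplified to Strings.
#[derive(Debug)]
enum SparkuiProbeResult {
    Ready { base_url: String, app_id: String },
    Loading { base_url: String },
    NotFound,
}

/// Illustrative dispatch: what the fallback chain does for each outcome.
fn next_action(result: &SparkuiProbeResult) -> &'static str {
    match result {
        SparkuiProbeResult::Ready { .. } => "fetch data from the Spark UI",
        SparkuiProbeResult::Loading { .. } => "retry with backoff",
        SparkuiProbeResult::NotFound => "try the next fallback strategy",
    }
}

fn main() {
    let probe = SparkuiProbeResult::Loading { base_url: "https://host/sparkui".into() };
    assert_eq!(next_action(&probe), "retry with backoff");
}
```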

### Key Methods

| Method | Description |
|---|---|
| `get_cluster_info` | Fetch cluster state, log config, and spark_context_id |
| `try_sparkui_endpoint` | Probe the Historical Spark UI REST API (tries Bearer + cookie auth, workspace + dataplane domains) |
| `probe_sparkui_url` | Re-probe a single URL for retry after a `Loading` state |
| `fetch_sparkui_data` | Fetch all data (jobs, stages, SQL, tasks) from a ready Spark UI endpoint |
| `discover_history_server` | Probe known Spark History Server proxy URL patterns |
| `fetch_history_data` | Fetch all data from the History Server |
| `dbfs_list` / `dbfs_read_full` | DBFS file operations |
| `find_default_event_logs` | Scan well-known DBFS paths for event log files |

### Warm-up Detection

The `is_loading_page()` helper detects HTML loading pages returned during Spark UI warm-up by checking for patterns like `<title>Loading</title>`, `loading spark ui`, `please wait`, or generic HTML responses.
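A hedged sketch of such a detector: the pattern list comes from the description above, while the case-insensitive matching and the "generic HTML" heuristic are assumptions about the implementation.

```rust
// Assumed implementation of a warm-up detector in the spirit of
// is_loading_page(): case-insensitive substring checks over the body.
fn is_loading_page(body: &str) -> bool {
    let lower = body.to_lowercase();
    lower.contains("<title>loading</title>")
        || lower.contains("loading spark ui")
        || lower.contains("please wait")
        // Generic HTML response where JSON was expected.
        || lower.trim_start().starts_with("<!doctype html")
}

fn main() {
    assert!(is_loading_page("<html><title>Loading</title></html>"));
    assert!(!is_loading_page("{\"jobs\": []}"));
}
```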

## eventlog/ — Event Log Parsing

| File | Purpose |
|---|---|
| `events.rs` | `SparkEvent` serde types for Spark event log JSON lines |
| `parser.rs` | `EventLogParser` — converts raw events into jobs, stages, SQL, tasks |
| `loader.rs` | DBFS download + gzip decompression pipeline |

The `load_event_log()` function orchestrates: discover log path → download from DBFS → decompress gzip → parse JSON lines → return structured data.
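To illustrate the final stage of that pipeline: each line of a Spark event log is a JSON object whose `"Event"` field names its type. The real parser deserializes lines into `SparkEvent` with serde; this stdlib-only sketch just tallies event types with naive string matching:

```rust
use std::collections::HashMap;

/// Naively extract the "Event" field from one JSON line. Assumes the field is
/// present, string-valued, and written without whitespace after the colon,
/// which holds for the sample below but not for arbitrary JSON.
fn event_type(line: &str) -> Option<&str> {
    let start = line.find("\"Event\":\"")? + "\"Event\":\"".len();
    let end = line[start..].find('"')? + start;
    Some(&line[start..end])
}

/// Count occurrences of each event type across the JSON lines of a log.
fn count_events(log: &str) -> HashMap<&str, usize> {
    let mut counts = HashMap::new();
    for line in log.lines().filter(|l| !l.trim().is_empty()) {
        if let Some(ev) = event_type(line) {
            *counts.entry(ev).or_insert(0) += 1;
        }
    }
    counts
}

fn main() {
    let log = "{\"Event\":\"SparkListenerJobStart\",\"Job ID\":0}\n\
               {\"Event\":\"SparkListenerJobEnd\",\"Job ID\":0}";
    let counts = count_events(log);
    assert_eq!(counts["SparkListenerJobStart"], 1);
    assert_eq!(counts["SparkListenerJobEnd"], 1);
}
```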

## orchestrator.rs — Data Polling & Assembly

### poll_once

Fetches all data and runs analysis in a single poll cycle:

1. Fetch jobs, stages, SQL executions, and executors concurrently (4-way `tokio::join!`)
2. Aggregate active executors into `ClusterResources` (total memory, cores, executor count)
3. Build cross-reference maps (job↔SQL, stage↔job)
4. Build ranked jobs (sorted by duration, running first)
5. Create a `SuspectContext` from the cross-reference maps
6. Run 10 stage-level detectors via a function pointer table (`detect_slow_stages`, `detect_spill`, `detect_cpu_efficiency`, `detect_record_explosion`, `detect_task_failures`, `detect_memory_pressure`, `detect_partition_count`, `detect_broadcast_join`, `detect_python_udf`, `detect_cache_opportunity`)
7. Fetch task lists for the top ~15 stages (selected by multiple heuristics)
8. Run skew detection on the fetched tasks (duration, data-size, record-count, executor hotspot)
9. Aggregate and sort suspects (severity first, then `estimated_savings_ms` descending)
10. Build `stage_sql_hints` — maps stage_id to top SQL plan operations
11. Compute `critical_stages` — the longest wall-clock stage per job (the critical path)
12. Compute `HealthSummary` (job and I/O counts, critical/warning counts, top issues)
13. Return `DataPayload`
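The ordering in step 9 can be sketched with a compound sort key. The `Suspect` shape here is a simplified stand-in; only the sort criteria (severity first, then savings descending) come from the description above:

```rust
// Severity ordered by derive: Info < Warning < Critical.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Severity { Info, Warning, Critical }

#[derive(Debug)]
struct Suspect {
    severity: Severity,
    estimated_savings_ms: u64,
}

/// Sort suspects: highest severity first; within a severity, largest
/// estimated savings first.
fn sort_suspects(suspects: &mut [Suspect]) {
    suspects.sort_by(|a, b| {
        b.severity
            .cmp(&a.severity)
            .then(b.estimated_savings_ms.cmp(&a.estimated_savings_ms))
    });
}

fn main() {
    let mut s = vec![
        Suspect { severity: Severity::Warning, estimated_savings_ms: 900 },
        Suspect { severity: Severity::Critical, estimated_savings_ms: 100 },
        Suspect { severity: Severity::Critical, estimated_savings_ms: 500 },
    ];
    sort_suspects(&mut s);
    // Critical suspects come first, ordered by savings; the Warning trails.
    assert_eq!(s[0].estimated_savings_ms, 500);
    assert_eq!(s[1].estimated_savings_ms, 100);
    assert_eq!(s[2].severity, Severity::Warning);
}
```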

### compute_health_summary

```rust
fn compute_health_summary(
    jobs: &[RankedJob],
    stages: &[SparkStage],
    suspects: &[Suspect],
) -> HealthSummary
```

Aggregates job counts, total I/O bytes, and suspect severity counts into a `HealthSummary` for the summary bar widget.
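An illustrative aggregation in the same spirit, with simplified stand-in types: the field names on `HealthSummary` and the `StageIo` input are assumptions for the sketch, not the real definitions.

```rust
// Simplified inputs: per-stage I/O and a list of suspect severity labels.
struct StageIo { input_bytes: u64, output_bytes: u64 }

#[derive(Default, Debug, PartialEq)]
struct HealthSummary {
    total_io_bytes: u64,
    critical_count: usize,
    warning_count: usize,
}

/// Sum stage I/O and tally suspects by severity into one summary value.
fn summarize(stages: &[StageIo], severities: &[&str]) -> HealthSummary {
    let mut s = HealthSummary::default();
    for st in stages {
        s.total_io_bytes += st.input_bytes + st.output_bytes;
    }
    for sev in severities {
        match *sev {
            "critical" => s.critical_count += 1,
            "warning" => s.warning_count += 1,
            _ => {}
        }
    }
    s
}

fn main() {
    let stages = [StageIo { input_bytes: 10, output_bytes: 5 }];
    let sum = summarize(&stages, &["critical", "warning", "warning"]);
    assert_eq!(sum.total_io_bytes, 15);
    assert_eq!(sum.critical_count, 1);
    assert_eq!(sum.warning_count, 2);
}
```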

## poller.rs — Background Poller

### run_poller

```rust
pub async fn run_poller(
    client: Arc<SparkHttpClient>,
    databricks: Arc<DatabricksClient>,
    config: Arc<Config>,
    tx: mpsc::UnboundedSender<Action>,
    poll_interval: Duration,
)
```

1. Discovers the application ID (or detects that the cluster is unreachable → historical fallback)
2. Enters a live poll loop: `poll_once` (from orchestrator.rs) → send result via channel → sleep
3. On connection loss during polling, falls back to historical data via `try_load_historical`
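The live loop in step 2 can be sketched synchronously. The real poller is async (tokio, unbounded channel) and loops until shutdown; this stdlib stand-in runs a fixed number of cycles and fakes `poll_once` with a string payload:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Stand-in for the orchestrator's poll_once, which fetches and analyzes data.
fn poll_once(cycle: u32) -> String {
    format!("payload #{cycle}")
}

/// Poll, send the result over the channel, sleep, repeat; stop when the
/// receiver is gone (the real loop instead switches to historical fallback
/// on connection loss).
fn run_poller(tx: mpsc::Sender<String>, poll_interval: Duration, cycles: u32) {
    for cycle in 0..cycles {
        if tx.send(poll_once(cycle)).is_err() {
            break; // receiver dropped: the TUI has shut down
        }
        thread::sleep(poll_interval);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || run_poller(tx, Duration::from_millis(1), 3));
    // rx.iter() ends once the poller thread drops its sender.
    let payloads: Vec<String> = rx.iter().collect();
    handle.join().unwrap();
    assert_eq!(payloads.len(), 3);
    assert_eq!(payloads[0], "payload #0");
}
```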

### Historical Fallback Chain

When the cluster is unreachable, `try_load_historical` attempts these strategies in order:

| Strategy | Function | Condition |
|---|---|---|
| 0 — Spark UI | `try_sparkui` | Requires spark_context_id; retries with backoff if the UI is loading |
| 1 — History Server | `try_history_server` | Probes known proxy URL patterns |
| 2 — DBFS event logs | `try_dbfs_event_logs` | Uses cluster_log_conf or `--event-log-path` |
| 3 — Default paths | `find_default_event_logs` | Scans well-known DBFS directories |

The Spark UI strategy includes a warm-up retry: if the endpoint returns an HTML loading page, it retries with the backoff delays defined in `SPARKUI_RETRY_DELAYS` (3, 5, 10, 15, 20 seconds — ~53s total).
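The delay values above sum to the quoted ~53 seconds. A sketch of the schedule, with the constant's exact type and the loop structure assumed:

```rust
// Backoff schedule from the description above; the array type is assumed.
const SPARKUI_RETRY_DELAYS: [u64; 5] = [3, 5, 10, 15, 20]; // seconds

/// Total wall-clock time spent sleeping if every retry hits a loading page.
fn total_backoff_secs() -> u64 {
    SPARKUI_RETRY_DELAYS.iter().sum()
}

fn main() {
    assert_eq!(total_backoff_secs(), 53);
}
```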

### DataPayload

```rust
pub struct DataPayload {
    pub app_id: String,
    pub jobs: Vec<RankedJob>,
    pub stages: Vec<SparkStage>,
    pub sql_executions: Vec<SparkSqlExecution>,
    pub suspects: Vec<Suspect>,
    pub stage_tasks: Arc<HashMap<i64, Vec<SparkTask>>>,
    pub summary: HealthSummary,
    pub cluster_resources: ClusterResources,
    pub stage_sql_hints: Arc<HashMap<i64, String>>,
    pub critical_stages: Arc<HashSet<i64>>,
    pub last_updated: String,
    pub data_source: DataSourceMode,
    pub data_source_detail: Option<String>,
}
```

Note: `DataPayload` is defined in `src/tui/mod.rs` and contains all the data needed to render the TUI.

Key fields:

- `stage_sql_hints` — maps stage_id → String with the top SQL plan operations (e.g., "HashAggregate → Exchange → Scan parquet"), pre-computed for display in stage detail headers
- `critical_stages` — the set of stage IDs on the critical path (the longest wall-clock stage per job), used for "CP" annotations in the job detail view
- `data_source` — `DataSourceMode::Live` or `DataSourceMode::Historical`, displayed in the status line
- `data_source_detail` — optional label like "Spark UI", "history server", or "event logs", shown alongside the HISTORICAL badge
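The per-job "longest wall-clock stage" computation behind `critical_stages` can be sketched as follows. Types are simplified stand-ins; the real code works over `SparkStage` and the stage↔job cross-reference map:

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stage record: its ID, owning job, and wall-clock duration.
struct StageInfo { stage_id: i64, job_id: i64, duration_ms: u64 }

/// For each job, keep the stage with the longest duration; the result is the
/// set of critical-path stage IDs.
fn critical_stages(stages: &[StageInfo]) -> HashSet<i64> {
    // job_id -> (stage_id, longest duration seen so far)
    let mut longest: HashMap<i64, (i64, u64)> = HashMap::new();
    for s in stages {
        let entry = longest.entry(s.job_id).or_insert((s.stage_id, s.duration_ms));
        if s.duration_ms > entry.1 {
            *entry = (s.stage_id, s.duration_ms);
        }
    }
    longest.values().map(|(stage_id, _)| *stage_id).collect()
}

fn main() {
    let stages = [
        StageInfo { stage_id: 1, job_id: 0, duration_ms: 100 },
        StageInfo { stage_id: 2, job_id: 0, duration_ms: 400 },
        StageInfo { stage_id: 3, job_id: 1, duration_ms: 50 },
    ];
    let cp = critical_stages(&stages);
    // Stage 2 wins within job 0; stage 3 is job 1's only (and longest) stage.
    assert!(cp.contains(&2) && cp.contains(&3) && !cp.contains(&1));
}
```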