Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute_ctl: Streamline and Pipeline startup SQL #9717

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
48 changes: 28 additions & 20 deletions compute_tools/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
use compute_api::{
responses::CatalogObjects,
spec::{Database, Role},
};
use compute_api::responses::CatalogObjects;
use futures::Stream;
use postgres::{Client, NoTls};
use postgres::NoTls;
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
task,
spawn,
};
use tokio_postgres::connect;
use tokio_stream::{self as stream, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;

use crate::{
compute::ComputeNode,
pg_helpers::{get_existing_dbs, get_existing_roles},
};
use crate::compute::ComputeNode;
use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async};

pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let connstr = compute.connstr.clone();
task::spawn_blocking(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
let roles: Vec<Role>;
{
let mut xact = client.transaction()?;
roles = get_existing_roles(&mut xact)?;

let (mut client, connection): (tokio_postgres::Client, _) =
connect(connstr.as_str(), NoTls).await?;

spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
tristan957 marked this conversation as resolved.
Show resolved Hide resolved
let databases: Vec<Database> = get_existing_dbs(&mut client)?.values().cloned().collect();
});

let roles = {
let xact = client.transaction().await?;
let roles = get_existing_roles_async(&xact).await?;
xact.commit().await?;

roles
};

let databases = get_existing_dbs_async(&client)
.await?
.into_values()
.collect();

Ok(CatalogObjects { roles, databases })
})
.await?
Ok(CatalogObjects { roles, databases })
}

#[derive(Debug, thiserror::Error)]
Expand Down
28 changes: 0 additions & 28 deletions compute_tools/src/checker.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,9 @@
use anyhow::{anyhow, Ok, Result};
use postgres::Client;
use tokio_postgres::NoTls;
use tracing::{error, instrument, warn};

use crate::compute::ComputeNode;

/// Create a special service table for availability checks
/// only if it does not exist already.
pub fn create_availability_check_data(client: &mut Client) -> Result<()> {
let query = "
DO $$
BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename = 'health_check'
)
THEN
CREATE TABLE health_check (
id serial primary key,
updated_at timestamptz default now()
);
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = now();
END IF;
END
$$;";
client.execute(query, &[])?;

Ok(())
}

/// Update timestamp in a row in a special service table to check
/// that we can actually write some data in this particular timeline.
#[instrument(skip_all)]
Expand Down
Loading
Loading