Skip to content

Commit

Permalink
Merge branch 'feature-495/publish' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Nov 11, 2024
2 parents 82a7c83 + ae5001a commit 5469139
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 12 deletions.
8 changes: 7 additions & 1 deletion cmd/substreams/gui.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func init() {
guiCmd.Flags().String("network", "", "Specify the network to use for params and initialBlocks, overriding the 'network' field in the substreams package")
guiCmd.Flags().Bool("insecure", false, "Skip certificate validation on GRPC connection")
guiCmd.Flags().Bool("plaintext", false, "Establish GRPC connection in plaintext")
guiCmd.Flags().String("spkg-registry", "https://spkg.io", "Substreams package registry")
guiCmd.Flags().StringSliceP("header", "H", nil, "Additional headers to be sent in the substreams request")
guiCmd.Flags().StringP("start-block", "s", "", "Start block to stream from. If empty, will be replaced by initialBlock of the first module you are streaming. If negative, will be resolved by the server relative to the chain head")
guiCmd.Flags().StringP("cursor", "c", "", "Cursor to stream from. Leave blank for no cursor")
Expand All @@ -47,6 +48,9 @@ var guiCmd = &cobra.Command{
Stream module output from a given package on a remote endpoint. The manifest is optional as it will try to find a file named
'substreams.yaml' in current working directory if nothing entered. You may enter a directory that contains a 'substreams.yaml'
file in place of '<manifest_file>, or a link to a remote .spkg file, using urls gs://, http(s)://, ipfs://, etc.'.
You can also use substreams gui [email protected] to specify a specific version of the package. This will fetch it from
https://spkg.io/...
`),
RunE: runGui,
Args: cobra.RangeArgs(0, 2),
Expand Down Expand Up @@ -79,7 +83,9 @@ func runGui(cmd *cobra.Command, args []string) (err error) {
paramsStringMap[moduleName] = struct{}{}
}

readerOptions := []manifest.Option{}
readerOptions := []manifest.Option{
manifest.WithRegistryURL(sflags.MustGetString(cmd, "spkg-registry")),
}

if len(requestParams) != 0 {
params, err := manifest.ParseParams(requestParams)
Expand Down
12 changes: 6 additions & 6 deletions cmd/substreams/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ var initCmd = &cobra.Command{
var huhTheme *huh.Theme

func init() {
defaultEndpoint := "https://codegen.substreams.dev"
if newValue := os.Getenv("SUBSTREAMS_INIT_CODEGEN_ENDPOINT"); newValue != "" {
defaultEndpoint = newValue
}
initCmd.Flags().String("codegen-endpoint", defaultEndpoint, "Endpoint used to discover code generators")
initCmd.Flags().String("state-file", "./generator.json", "File to load/save the state of the code generator")
initCmd.Flags().Bool("force-download-cwd", false, "Force download at current dir")
rootCmd.AddCommand(initCmd)
Expand Down Expand Up @@ -111,7 +106,12 @@ func runSubstreamsInitE(cmd *cobra.Command, args []string) error {
connect.WithGRPC(),
}

initConvoURL := sflags.MustGetString(cmd, "codegen-endpoint")
codegenEndpoint := "https://codegen.substreams.dev"
if newValue := os.Getenv("SUBSTREAMS_CODEGEN_ENDPOINT"); newValue != "" {
codegenEndpoint = newValue
}

initConvoURL := codegenEndpoint
stateFile, stateFileFlagProvided := sflags.MustGetStringProvided(cmd, "state-file")
if !strings.HasSuffix(stateFile, ".json") {
return fmt.Errorf("state file must have a .json extension")
Expand Down
16 changes: 16 additions & 0 deletions cmd/substreams/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import (
"github.com/spf13/cobra"
)

func init() {
rootCmd.AddCommand(publishCmd)
}

var publishCmd = &cobra.Command{
Use: "publish [github_release_url | https_spkg_path | local_spkg_path | local_substreams_path]",
Short: "Publish a package to the Substreams.dev registry. Alias for `substreams registry publish`",
Args: cobra.MaximumNArgs(1),
RunE: runRegistryPublish,
}
81 changes: 81 additions & 0 deletions cmd/substreams/registry-login.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"bufio"
"errors"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
)

var registryLoginCmd = &cobra.Command{
Use: "login",
Short: "Login to the Substreams registry",
SilenceUsage: true,
RunE: runRegistryLoginE,
}

var registryTokenFilename = filepath.Join(os.Getenv("HOME"), ".config", "substreams", "registry-token")

func init() {
registryLoginCmd.Flags().String("registry", "https://substreams.dev", "Substreams registry URL")

registryCmd.AddCommand(registryLoginCmd)
}

func runRegistryLoginE(cmd *cobra.Command, args []string) error {
registryURL, err := cmd.Flags().GetString("registry")
if err != nil {
return fmt.Errorf("could not get registry URL: %w", err)
}

loginRegistryPage := fmt.Sprintf("%s/me", registryURL)

fmt.Printf("Paste the token found on %s below\n", loginRegistryPage)

scanner := bufio.NewScanner(os.Stdin)
var token string
for scanner.Scan() {
token = scanner.Text()
break
}

fmt.Println("")

isFileExists := checkFileExists(registryTokenFilename)
if isFileExists {
fmt.Println("Token already saved to registry-token")
fmt.Printf("Do you want to overwrite it? [y/N] ")
scanner.Scan()
if scanner.Text() == "y" {
err = writeRegistryToken(token)
if err != nil {
return fmt.Errorf("could not write token to registry: %w", err)
}
} else {
return nil
}

} else {
err = writeRegistryToken(token)
if err != nil {
return fmt.Errorf("could not write token to registry: %w", err)
}

}

fmt.Printf("Publish packages with SUBSTREAMS_REGISTRY_TOKEN=%s\n", token)
fmt.Printf("Token %s saved to registry-token\n", token)
return nil
}

func writeRegistryToken(token string) error {
return os.WriteFile(registryTokenFilename, []byte(token), 0644)
}

func checkFileExists(filePath string) bool {
_, err := os.Stat(filePath)
return !errors.Is(err, os.ErrNotExist)
}
193 changes: 193 additions & 0 deletions cmd/substreams/registry-publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package main

import (
"bytes"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"

"github.com/charmbracelet/huh"
"github.com/charmbracelet/lipgloss"
"github.com/spf13/cobra"
"github.com/streamingfast/substreams/manifest"
"github.com/tidwall/gjson"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

func init() {
registryCmd.AddCommand(registryPublish)
}

var registryPublish = &cobra.Command{
Use: "publish [github_release_url | https_spkg_path | local_spkg_path | local_substreams_path]",
Short: "Publish a package to the Substreams.dev registry",
Args: cobra.MaximumNArgs(1),
RunE: runRegistryPublish,
}

// FLOW:
// - The user get an API_KEY (registry token) on substreams.dev
// - Set API_KEY :
// - If the user doesn't have the API_KEY SET FOR REGISTRY, let's redirect him to `substreams.dev` and grab a registry token
// - If it has one already, use it
// - SET UP Publish request :
// - If the user does the command on a manifest, pack it first
// - If the user does provide an spkg, use it as is
// - If the user does provide a github release url, download the spkg and pack it

func runRegistryPublish(cmd *cobra.Command, args []string) error {
apiEndpoint := "https://substreams.dev"
if newValue := os.Getenv("SUBSTREAMS_REGISTRY_ENDPOINT"); newValue != "" {
apiEndpoint = newValue
}

var apiKey string
registryTokenBytes, err := os.ReadFile(registryTokenFilename)
if err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("failed to read registry token: %w", err)
}
}

substreamsRegistryToken := os.Getenv("SUBSTREAMS_REGISTRY_TOKEN")
apiKey = string(registryTokenBytes)
if apiKey == "" {
if substreamsRegistryToken != "" {
apiKey = substreamsRegistryToken
} else {
linkStyle := lipgloss.NewStyle().Foreground(lipgloss.Color("12"))
//Let's redirect the user to substreams.dev to get a registry token and let them paste in the terminal
fmt.Println("`SUBSTREAMS_REGISTRY_TOKEN` env variable is missing...")
fmt.Println("You can get a token using the following link: ")
fmt.Println()
fmt.Println(" " + linkStyle.Render(fmt.Sprintf("%s/me", apiEndpoint)))
fmt.Println("")

var token string
form := huh.NewForm(
huh.NewGroup(
huh.NewInput().
EchoMode(huh.EchoModePassword).
Title("After retrieving your registry token, paste it here:").
Inline(true).
Value(&token).
Validate(func(s string) error {
if s == "" {
return errors.New("token cannot be empty")
}
return nil
}),
),
)

if err := form.Run(); err != nil {
return fmt.Errorf("error running form: %w", err)
}

// Set the API_KEY using the input token
apiKey = token
}
}

zlog.Debug("loaded api key", zap.String("api_key", apiKey))

var manifestPath string
switch len(args) {
case 0:
manifestPath, err = resolveManifestFile("")
if err != nil {
return fmt.Errorf("resolving manifest: %w", err)
}
case 1:
manifestPath = args[0]
}

spkgRegistry := "https://spkg.io"
if newValue := os.Getenv("SUBSTREAMS_DOWNLOAD_ENDPOINT"); newValue != "" {
apiEndpoint = newValue
}

readerOptions := []manifest.Option{
manifest.WithRegistryURL(spkgRegistry),
}

manifestReader, err := manifest.NewReader(manifestPath, readerOptions...)
if err != nil {
return fmt.Errorf("manifest reader: %w", err)
}

pkgBundle, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

spkg := pkgBundle.Package

var requestBody bytes.Buffer
writer := multipart.NewWriter(&requestBody)

// Create form file to get it read from the `substreams.dev` server

formFile, err := writer.CreateFormFile("file", "substreams_package")
if err != nil {
return fmt.Errorf("failed to create form file: %w", err)
}

data, err := proto.Marshal(spkg)
if err != nil {
return fmt.Errorf("marshalling substreams package: %w", err)
}

_, err = formFile.Write(data)
if err != nil {
return fmt.Errorf("failed to write file content: %w", err)
}

err = writer.Close()
if err != nil {
return fmt.Errorf("failed to close writer: %w", err)
}

publishPackageEndpoint := fmt.Sprintf("%s/sf.substreams.dev.Api/PublishPackage", apiEndpoint)

zlog.Debug("publishing package", zap.String("registry_url", publishPackageEndpoint))

req, err := http.NewRequest("POST", publishPackageEndpoint, &requestBody)
if err != nil {
return err
}
req.Header.Set("Content-Type", writer.FormDataContentType())
req.Header.Set("X-Api-Key", apiKey)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read body")
}

if resp.StatusCode != http.StatusOK {
linkStyle := lipgloss.NewStyle().Foreground(lipgloss.Color("9"))
fmt.Println("")
fmt.Println(linkStyle.Render("Failed to publish package") + "\n")
fmt.Println("Reason:" + string(b))
return nil
}

spkgUrlPath := gjson.Get(string(b), "spkgLink").String()

fmt.Println("Package published successfully")
if spkgUrlPath != "" {
fmt.Printf("Start streaming your package with: `substreams gui %s`\n", spkgUrlPath)
}

return nil
}
19 changes: 19 additions & 0 deletions cmd/substreams/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"github.com/spf13/cobra"
"github.com/streamingfast/cli"
)

var registryCmd = &cobra.Command{
Use: "registry",
Short: "Manage substreams registry",
Long: cli.Dedent(`
Login, publish and list packages from the Substreams registry
`),
SilenceUsage: true,
}

func init() {
rootCmd.AddCommand(registryCmd)
}
5 changes: 5 additions & 0 deletions cmd/substreams/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func init() {
runCmd.Flags().Bool("final-blocks-only", false, "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD")
runCmd.Flags().Bool("insecure", false, "Skip certificate validation on GRPC connection")
runCmd.Flags().Bool("plaintext", false, "Establish GRPC connection in plaintext")
runCmd.Flags().String("spkg-registry", "https://spkg.io", "Substreams package registry")
runCmd.Flags().StringP("output", "o", "", "Output mode, one of: [ui, json, jsonl, clock] Defaults to 'ui' when in a TTY is present, and 'json' otherwise")
runCmd.Flags().StringSlice("debug-modules-initial-snapshot", nil, "List of 'store' modules from which to print the initial data snapshot (Unavailable in Production Mode)")
runCmd.Flags().StringSlice("debug-modules-output", nil, "List of modules from which to print outputs, deltas and logs (Unavailable in Production Mode)")
Expand All @@ -49,6 +50,9 @@ var runCmd = &cobra.Command{
Stream module outputs from a given package on a remote endpoint. The manifest is optional as it will try to find a file named
'substreams.yaml' in current working directory if nothing entered. You may enter a directory that contains a 'substreams.yaml'
'substreams.yaml' file in place of '<manifest_file>', or a link to a remote .spkg file, using urls gs://, http(s)://, ipfs://, etc.'.
You can also use substreams run [email protected] to specify a specific version of the package. This will fetch it from
https://spkg.io/...
`),
RunE: runRun,
Args: cobra.RangeArgs(1, 2),
Expand Down Expand Up @@ -84,6 +88,7 @@ func runRun(cmd *cobra.Command, args []string) error {
manifest.WithOverrideOutputModule(outputModule),
manifest.WithOverrideNetwork(network),
manifest.WithParams(params),
manifest.WithRegistryURL(sflags.MustGetString(cmd, "spkg-registry")),
}
if sflags.MustGetBool(cmd, "skip-package-validation") {
readerOptions = append(readerOptions, manifest.SkipPackageValidationReader())
Expand Down
Loading

0 comments on commit 5469139

Please sign in to comment.