
Large queries and intermittent internet connectivity cause truncated dataset to be returned instead of error #93

Open

vlastahajek opened this issue Jul 31, 2024 · 0 comments

Labels: bug (Something isn't working)

vlastahajek commented Jul 31, 2024

This is a copy of the issue originally reported in the Go v2 client.

Specifications

  • Client Version: 0.5.0
  • InfluxDB Version: 3.0 Serverless
  • Apache Arrow Go Version: 13.0.0

Code sample to reproduce problem

  1. Run the code below, substituting in bucket credentials for a bucket containing a large dataset, and use a query that will take a few seconds to run.
  2. After a few seconds, before the query completes, switch off internet connectivity.
package main

import (
	"context"
	"encoding/csv"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	influx "github.com/InfluxCommunity/influxdb3-go/influxdb3"
	"github.com/apache/arrow/go/v13/arrow"
)

func main() {
	currentTime := time.Now()

	database := "databaseID"
	token := "databaseReadToken"
	query := `SELECT * FROM "queryTest" WHERE time >= 1708819200000ms and time <= 1709023793000ms ORDER BY time DESC`
	resolution := "ms" // resolution of the time column written to the CSV

	url := "databaseURL"

	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influx.New(influx.ClientConfig{
		Host:     url,
		Token:    token,
		Database: database,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Close client at the end and escalate error if present
	defer func(client *influx.Client) {
		err := client.Close()
		if err != nil {
			log.Fatal(err)
		}
	}(client)

	queryType := influx.InfluxQL

	options := influx.QueryOptions{QueryType: queryType}

	iterator, err := client.QueryWithOptions(context.Background(), &options, query)
	if err != nil {
		log.Fatal(err)
	}

	// Write data to file section
	var headerWritten = false
	dateTimeString := currentTime.Format("2006_01_02-15_04_05") // Example format: YYYY-MM-DD_HH-MM-SS

	csvFilePath := "query_" + dateTimeString + ".csv"
	file, err := os.Create(csvFilePath)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	writer := csv.NewWriter(file)
	defer writer.Flush()
	var header []string

	for iterator.Next() {
		// Iterate over query data row by row
		mapData := iterator.Value()

		if !headerWritten {
			for key := range mapData {
				header = append(header, key)
			}
			if err := writer.Write(header); err != nil {
				fmt.Println(csvFilePath)
				log.Fatal(err)
			}
			headerWritten = true
		}

		// Create an empty slice of empty strings with the same length as the header
		var row []string
		for i := 0; i < len(header); i++ {
			row = append(row, "")
		}

		// Fill in the row slice with values based on the header
		for colIndex, col := range header {
			// Retrieve the value from mapData if it exists
			if value, ok := mapData[col]; ok {
				if col == "time" {
					switch resolution {
					case "ms":
						value = value.(arrow.Timestamp) / 1e6
					case "us":
						value = value.(arrow.Timestamp) / 1e3
					}
				}
				if value == nil {
					value = ""
				}
				row[colIndex] = fmt.Sprintf("%v", value)
			}
		}

		err := writer.Write(row)
		if err != nil {
			fmt.Println(csvFilePath)
			log.Fatal("Failed to write to CSV file, please re-run query")
		}

	}

	writer.Flush() // Ensure data is flushed before the end in normal operation
	fmt.Println(csvFilePath)

} 
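A side note on the snippet above: the `resolution` switch coarsens the nanosecond `arrow.Timestamp` by integer division. A minimal stand-alone sketch of the same arithmetic with plain integers (the date used here is the query's lower bound, 2024-02-25T00:00:00Z):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// arrow.Timestamp values from InfluxDB arrive in nanoseconds; the CSV
	// writer above divides by 1e6 (ms) or 1e3 (us). Same arithmetic here:
	ns := time.Date(2024, 2, 25, 0, 0, 0, 0, time.UTC).UnixNano()
	ms := ns / 1_000_000
	us := ns / 1_000
	fmt.Println(ms) // 1708819200000, the query's lower bound in ms
	fmt.Println(us) // 1708819200000000
}
```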

Query used to replicate the bug, followed by a screenshot of the final field of the resulting CSV:

SELECT * FROM "queryTest" WHERE time >= 1708819200000ms and time <= 1709023793000ms ORDER BY time DESC

[screenshot: final field of the truncated CSV]

Expected behavior

The script should produce a timeout error, or an equivalent error, to inform the user that the connection was interrupted and the query wasn't completed. The behaviour should be similar to that of the Python InfluxDB 3 client library, which returns the error stack:

…
File "pyarrow\_flight.pyx", line 1081, in pyarrow._flight.FlightStreamReader.read_all
File "pyarrow\_flight.pyx", line 55, in pyarrow._flight.check_flight_status
pyarrow._flight.FlightInternalError: Flight returned internal error, with message: unexpected EOF.
…

Actual behavior

The script completes without any errors, and the CSV file is written to the script folder. Checking the last timestamp in the CSV shows that it does not match the last requested timestamp from the query.

Note that this also occurs when querying a very large dataset (~500 MB): the function hangs for a while before eventually returning, and in testing only a partial dataset was returned about 30% of the time.
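Until the client surfaces the error itself, one hypothetical workaround is to check whether the written CSV actually reached the query's time bounds. The helper below is an illustrative sketch, not a client API: it assumes the result is ordered DESC, that the time column is first, and that a data point exists exactly at the query's lower bound, as in the query above.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// complete reports whether the final row of a DESC-ordered result has
// reached the query's lower time bound, i.e. the stream was not cut short.
func complete(csvData string, lowerBoundMs int64) (bool, error) {
	rows, err := csv.NewReader(strings.NewReader(csvData)).ReadAll()
	if err != nil || len(rows) < 2 {
		return false, fmt.Errorf("no data rows: %v", err)
	}
	last, err := strconv.ParseInt(rows[len(rows)-1][0], 10, 64)
	if err != nil {
		return false, err
	}
	return last <= lowerBoundMs, nil
}

func main() {
	const lower = 1708819200000 // lower bound from the WHERE clause
	full := "time,value\n1709023793000,1\n1708819200000,2\n"
	cut := "time,value\n1709023793000,1\n1708900000000,2\n"
	ok1, _ := complete(full, lower)
	ok2, _ := complete(cut, lower)
	fmt.Println(ok1, ok2) // true false
}
```

This only detects truncation after the fact; it cannot distinguish a dropped connection from a dataset that genuinely has no points near the lower bound, which is why a propagated stream error remains the correct fix.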

