SNOW-983635 Allow ZSTD compression algorithm (#654)
sfc-gh-lthiede authored Feb 1, 2024
1 parent 2659e4d commit 84420bf
Showing 8 changed files with 73 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -17,6 +17,7 @@ The Snowflake Ingest Service SDK depends on the following libraries:

* snowflake-jdbc (3.13.30 to 3.13.33)
* slf4j-api
+* com.github.luben:zstd-jni (1.5.0-1)

These dependencies will be fetched automatically by build systems like Maven or Gradle. If you don't build your project
using a build system, please make sure these dependencies are on the classpath.
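
For projects that manage the classpath by hand, one quick sanity check is a reflective lookup of the zstd-jni entry point. The snippet below is illustrative only, not part of this commit; com.github.luben.zstd.Zstd is the library's main class.

public class ZstdClasspathCheck {
  public static void main(String[] args) {
    try {
      // Entry-point class shipped in com.github.luben:zstd-jni.
      Class.forName("com.github.luben.zstd.Zstd");
      System.out.println("zstd-jni found on the classpath");
    } catch (ClassNotFoundException e) {
      System.out.println("zstd-jni missing; add com.github.luben:zstd-jni:1.5.0-1");
    }
  }
}
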
13 changes: 7 additions & 6 deletions pom.xml
@@ -356,7 +356,6 @@

<!-- All of our needed dependencies -->
<dependencies>
-
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -389,7 +388,6 @@
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -473,6 +471,12 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+<dependency>
+<groupId>com.github.luben</groupId>
+<artifactId>zstd-jni</artifactId>
+<version>1.5.0-1</version>
+<scope>runtime</scope>
+</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -954,6 +958,7 @@
<excludes>
<exclude>net.snowflake:snowflake-jdbc</exclude>
<exclude>org.slf4j:slf4j-api</exclude>
+<exclude>com.github.luben:zstd-jni</exclude>
</excludes>
</artifactSet>
<relocations>
@@ -1034,10 +1039,6 @@
<pattern>com.ctc</pattern>
<shadedPattern>${shadeBase}.com.ctc</shadedPattern>
</relocation>
-<relocation>
-<pattern>com.github.luben</pattern>
-<shadedPattern>${shadeBase}.com.github.luben</shadedPattern>
-</relocation>
<relocation>
<pattern>com.thoughtworks</pattern>
<shadedPattern>${shadeBase}.com.thoughtworks</shadedPattern>
6 changes: 6 additions & 0 deletions public_pom.xml
@@ -48,5 +48,11 @@
<version>1.7.36</version>
<scope>compile</scope>
</dependency>
+<dependency>
+<groupId>com.github.luben</groupId>
+<artifactId>zstd-jni</artifactId>
+<version>1.5.0-1</version>
+<scope>runtime</scope>
+</dependency>
</dependencies>
</project>
3 changes: 2 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -119,7 +119,8 @@ public static BdecVersion fromInt(int val) {
* CompressionCodecName, but we want to control and allow only specific values of that.
*/
public enum BdecParquetCompression {
-GZIP;
+GZIP,
+ZSTD;

public CompressionCodecName getCompressionCodec() {
return CompressionCodecName.fromConf(this.name());
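
For orientation (not part of the diff): getCompressionCodec delegates to Parquet's CompressionCodecName.fromConf, which upper-cases the supplied name before matching, so both enum constants resolve directly to Parquet codecs. A minimal standalone sketch:

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CodecMappingDemo {
  public static void main(String[] args) {
    for (String name : new String[] {"GZIP", "ZSTD"}) {
      // fromConf("ZSTD") resolves to CompressionCodecName.ZSTD.
      CompressionCodecName codec = CompressionCodecName.fromConf(name);
      System.out.println(name + " -> " + codec);
    }
  }
}
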
@@ -308,6 +308,17 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
Constants.BdecParquetCompression.GZIP,
parameterProvider.getBdecParquetCompressionAlgorithm());
});
+List<String> zstdValues = Arrays.asList("ZSTD", "zstd", "Zstd", "zStd");
+zstdValues.forEach(
+v -> {
+Properties prop = new Properties();
+Map<String, Object> parameterMap = getStartingParameterMap();
+parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v);
+ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+Assert.assertEquals(
+Constants.BdecParquetCompression.ZSTD,
+parameterProvider.getBdecParquetCompressionAlgorithm());
+});
}

@Test
@@ -322,7 +333,7 @@ public void testInvalidCompressionAlgorithm() {
} catch (IllegalArgumentException e) {
Assert.assertEquals(
"Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = 'invalid_comp', allowed values are"
-    + " [GZIP]",
+    + " [GZIP, ZSTD]",
e.getMessage());
}
}
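
These tests pin down two behaviors: parsing is case-insensitive, and unknown values fail with a message listing the allowed algorithms. A standalone sketch of that kind of lookup follows; the enum and the helper name parse are illustrative, not the SDK's actual implementation.

import java.util.Arrays;
import java.util.Locale;

enum Compression {
  GZIP,
  ZSTD;

  // Case-insensitive lookup: "zstd", "Zstd", and "zStd" all resolve to ZSTD.
  static Compression parse(String value) {
    try {
      return valueOf(value.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          String.format(
              "Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = '%s', allowed values are %s",
              value, Arrays.asList(values())));
    }
  }
}
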
@@ -2,6 +2,7 @@

import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
import static net.snowflake.ingest.utils.Constants.ROLE;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;

import java.sql.Connection;
import java.sql.ResultSet;
@@ -17,8 +18,13 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;

/** Ingest large amount of rows. */
+@RunWith(Parameterized.class)
public class StreamingIngestBigFilesIT {
private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
@@ -29,6 +35,14 @@ public class StreamingIngestBigFilesIT {
private Connection jdbcConnection;
private String testDb;

+@Parameters(name = "{index}: {0}")
+public static Object[] compressionAlgorithms() {
+return new Object[] {"GZIP", "ZSTD"};
+}
+
+@Parameter
+public String compressionAlgorithm;

@Before
public void beforeAll() throws Exception {
testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
@@ -51,6 +65,7 @@ public void beforeAll() throws Exception {
if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
prop.setProperty(ROLE, "ACCOUNTADMIN");
}
+prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
client =
(SnowflakeStreamingIngestClientInternal<?>)
SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
@@ -3,6 +3,7 @@
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.COMPRESS_BLOB_TWICE;
import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

@@ -45,10 +46,15 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/** Example streaming ingest sdk integration test */
+@RunWith(Parameterized.class)
public class StreamingIngestIT {
private static final String TEST_TABLE = "STREAMING_INGEST_TEST_TABLE";
private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
@@ -64,6 +70,14 @@ public class StreamingIngestIT {
private Connection jdbcConnection;
private String testDb;

+@Parameters(name = "{index}: {0}")
+public static Object[] compressionAlgorithms() {
+return new Object[] {"GZIP", "ZSTD"};
+}
+
+@Parameter
+public String compressionAlgorithm;

@Before
public void beforeAll() throws Exception {
testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
@@ -90,7 +104,7 @@ public void beforeAll() throws Exception {

// Test without role param
prop = TestUtils.getProperties(Constants.BdecVersion.THREE, true);
-
+prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
client =
(SnowflakeStreamingIngestClientInternal<?>)
SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
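
For a caller-side view (illustrative, not from this commit): the same parameter the integration tests set can be passed when building a regular client. The connection values and MY_* object names below are placeholders, and the try-with-resources usage assumes the client's AutoCloseable interface.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.ParameterProvider;

public class ZstdIngestExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("url", "https://<account>.snowflakecomputing.com:443"); // placeholder
    props.setProperty("user", "<user>"); // placeholder
    props.setProperty("private_key", "<pkcs8-private-key>"); // placeholder
    // Opt in to ZSTD-compressed BDEC Parquet files, as enabled by this commit.
    props.setProperty(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "ZSTD");

    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {
      SnowflakeStreamingIngestChannel channel =
          client.openChannel(
              OpenChannelRequest.builder("MY_CHANNEL")
                  .setDBName("MY_DB")
                  .setSchemaName("MY_SCHEMA")
                  .setTableName("MY_TABLE")
                  .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
                  .build());
      Map<String, Object> row = new HashMap<>();
      row.put("C1", 42); // assumes MY_TABLE has a numeric column C1
      channel.insertRow(row, "offset-0");
      channel.close().get();
    }
  }
}
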
@@ -22,11 +22,17 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.Constants;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import net.snowflake.ingest.utils.SFException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;

+@RunWith(Parameterized.class)
public abstract class AbstractDataTypeTest {
private static final String SOURCE_COLUMN_NAME = "source";
private static final String VALUE_COLUMN_NAME = "value";
@@ -56,6 +62,14 @@ public abstract class AbstractDataTypeTest {
private SnowflakeStreamingIngestClient client;
private static final ObjectMapper objectMapper = new ObjectMapper();

+@Parameters(name = "{index}: {0}")
+public static Object[] compressionAlgorithms() {
+return new Object[] {"GZIP", "ZSTD"};
+}
+
+@Parameter
+public String compressionAlgorithm;

@Before
public void before() throws Exception {
databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
@@ -70,6 +84,7 @@ public void before() throws Exception {
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
+props.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
client = SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(props).build();
}

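
The three test classes above share the same JUnit 4 parameterization pattern. A self-contained sketch (not from this commit) of how it behaves: the runner instantiates the class once per value returned by the @Parameters method, so every @Test runs under both GZIP and ZSTD, reported as "[0: GZIP]" and "[1: ZSTD]".

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class CompressionParameterizationDemo {
  @Parameters(name = "{index}: {0}")
  public static Object[] compressionAlgorithms() {
    return new Object[] {"GZIP", "ZSTD"};
  }

  @Parameter public String compressionAlgorithm;

  @Test
  public void runsOncePerAlgorithm() {
    // Executed twice by the Parameterized runner, once per algorithm.
    System.out.println("running with " + compressionAlgorithm);
  }
}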
