Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataHub Connector #350

Open
wants to merge 13 commits into
base: v2
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ protected void init() {}
}
final ReadFeaturesProxyWrapper rdRequest = RequestHelper.readFeaturesByIdsRequest(spaceId, featureIds)
.withReadRequestType(ReadRequestType.GET_BY_IDS)
.withQueryParameters(Map.of(FEATURE_IDS, featureIds));
.addQueryParameter(FEATURE_IDS, featureIds);

// Forward request to NH Space Storage reader instance
try (Result result = executeReadRequestFromSpaceStorage(rdRequest)) {
Expand All @@ -179,7 +179,7 @@ protected void init() {}

final ReadFeatures rdRequest = RequestHelper.readFeaturesByIdRequest(spaceId, featureId)
.withReadRequestType(ReadRequestType.GET_BY_ID)
.withQueryParameters(Map.of(FEATURE_ID, featureId));
.addQueryParameter(FEATURE_ID, featureId);

// Forward request to NH Space Storage reader instance
try (Result result = executeReadRequestFromSpaceStorage(rdRequest)) {
Expand Down Expand Up @@ -443,7 +443,7 @@ protected void init() {}
// Forward Read request to NHSpaceStorage instance
final ReadFeatures rdRequest = RequestHelper.readFeaturesByIdRequest(refSpaceId, refFeatureId)
.withReadRequestType(ReadRequestType.GET_BY_ID)
.withQueryParameters(Map.of(FEATURE_ID, refFeatureId));
.addQueryParameter(FEATURE_ID, refFeatureId);
try (final Result result = executeReadRequestFromSpaceStorage(rdRequest)) {
if (result instanceof SuccessResult) {
feature = ResultHelper.readFeatureFromResult(result, XyzFeature.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private XyzResponse attemptFeaturesPatching(
// Extract the version of features in storage
final ReadFeatures rdRequest = RequestHelper.readFeaturesByIdsRequest(spaceId, featureIds)
.withReadRequestType(ReadFeaturesProxyWrapper.ReadRequestType.GET_BY_IDS)
.withQueryParameters(Map.of(FEATURE_IDS, featureIds));
.addQueryParameter(FEATURE_IDS, featureIds);
try (Result result = executeReadRequestFromSpaceStorage(rdRequest)) {
if (result == null) {
return returnError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ public <T> T getQueryParameter(String key) throws ClassCastException {
return (T) queryParameters.get(key);
}

/**
 * Replaces the whole query-parameter map of this request.
 *
 * <p>NOTE(review): the given map is stored by reference, not copied. If a caller passes an
 * immutable map (e.g. {@code Map.of(...)}), a subsequent {@code addQueryParameter} call will
 * fail with {@code UnsupportedOperationException} — confirm call sites, or consider copying
 * into a mutable map here.
 *
 * @param parameters will replace existing query parameters
 * @return this wrapper, for fluent chaining
 */
public ReadFeaturesProxyWrapper withQueryParameters(Map<String, Object> parameters) {
this.queryParameters = parameters;
return this;
}

/**
 * Adds (or overwrites) a single query parameter in the currently held map.
 *
 * <p>NOTE(review): mutates the existing map in place — this throws
 * {@code UnsupportedOperationException} when an immutable map was previously installed via
 * {@code withQueryParameters}, and {@code NullPointerException} when no map is set; verify
 * how {@code queryParameters} is initialized.
 *
 * @param key the query parameter name
 * @param parameter the query parameter value
 * @return this wrapper, for fluent chaining
 */
public ReadFeaturesProxyWrapper addQueryParameter(String key, Object parameter) {
queryParameters.put(key, parameter);
return this;
}

@Override
public ReadFeaturesProxyWrapper shallowClone() {
ReadFeaturesProxyWrapper clone = new ReadFeaturesProxyWrapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.here.naksha.lib.core.models.naksha.Storage;
import com.here.naksha.lib.core.storage.IReadSession;
import com.here.naksha.lib.core.storage.IStorage;
import com.here.naksha.lib.core.storage.IWriteSession;
import com.here.naksha.lib.core.util.json.JsonSerializable;
import com.here.naksha.storage.http.cache.RequestSenderCache;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -57,6 +58,11 @@ public HttpStorage(@NotNull Storage storage) {
return new HttpStorageReadSession(context, useMaster, requestSender, properties.getProtocol());
}

// Creates a write session backed by this storage's HTTP sender.
// NOTE(review): `useMaster` is ignored — presumably there is no master/replica split for a
// remote HTTP storage; confirm. `properties.getProtocol()` appears to select the HTTP
// interface flavour (connector vs. adapter) consumed by the write session — verify.
@Override
public @NotNull IWriteSession newWriteSession(@Nullable NakshaContext context, boolean useMaster) {
return new HttpStorageWriteSession(context, requestSender, properties.getProtocol());
}

@Override
public void initStorage() {
log.debug("HttpStorage.initStorage called");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/
package com.here.naksha.storage.http;

import static com.here.naksha.storage.http.HttpInterface.dataHubConnector;
import static com.here.naksha.storage.http.HttpInterface.ffwAdapter;

import com.here.naksha.lib.core.NakshaContext;
import com.here.naksha.lib.core.exceptions.StorageLockException;
import com.here.naksha.lib.core.models.XyzError;
import com.here.naksha.lib.core.models.storage.*;
import com.here.naksha.lib.core.storage.IStorageLock;
import com.here.naksha.lib.core.storage.IWriteSession;
import com.here.naksha.storage.http.connector.ConnectorInterfaceWriteExecute;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpStorageWriteSession implements IWriteSession {
adamczyk-HERE marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger log = LoggerFactory.getLogger(HttpStorageWriteSession.class);

private final NakshaContext context;
private final RequestSender requestSender;
private final HttpInterface httpInterface;

public HttpStorageWriteSession(NakshaContext context, RequestSender requestSender, HttpInterface httpInterface) {

this.context = context;
this.requestSender = requestSender;
this.httpInterface = httpInterface;
}

@Override
public @NotNull Result execute(@NotNull WriteRequest<?, ?, ?> writeRequest) {
try {
return switch (httpInterface) {
case ffwAdapter -> throw new UnsupportedOperationException("Writing not supported for ffw adapter");
adamczyk-HERE marked this conversation as resolved.
Show resolved Hide resolved
case dataHubConnector -> ConnectorInterfaceWriteExecute.execute(
context, (WriteXyzFeatures) writeRequest, requestSender);
};
} catch (ConnectorInterfaceWriteExecute.ConflictException e) {
return new ErrorResult(XyzError.CONFLICT, e.getMessage(), e);
} catch (Exception e) {
log.warn("We got exception while executing Write request.", e);
return new ErrorResult(XyzError.EXCEPTION, e.getMessage(), e);
}
}

@Override
public @NotNull IStorageLock lockFeature(
@NotNull String collectionId, @NotNull String featureId, long timeout, @NotNull TimeUnit timeUnit)
throws StorageLockException {
return null;
}

@Override
public @NotNull IStorageLock lockStorage(@NotNull String lockId, long timeout, @NotNull TimeUnit timeUnit)
throws StorageLockException {
return null;
}

@Override
public void commit(boolean autoCloseCursors) {}

@Override
public void rollback(boolean autoCloseCursors) {}

@Override
public void close(boolean autoCloseCursors) {}

@Override
public boolean isMasterConnect() {
return false;
}

@Override
public @NotNull NakshaContext getNakshaContext() {
return null;
}

@Override
public int getFetchSize() {
return 0;
}

@Override
public void setFetchSize(int size) {}

@Override
public long getStatementTimeout(@NotNull TimeUnit timeUnit) {
return 0;
}

@Override
public void setStatementTimeout(long timeout, @NotNull TimeUnit timeUnit) {}

@Override
public long getLockTimeout(@NotNull TimeUnit timeUnit) {
return 0;
}

@Override
public void setLockTimeout(long timeout, @NotNull TimeUnit timeUnit) {}

@Override
public @NotNull Result execute(@NotNull ReadRequest<?> readRequest) {
return null;
}

@Override
public @NotNull Result process(@NotNull Notification<?> notification) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.here.naksha.lib.core.models.Typed;
import com.here.naksha.lib.core.models.XyzError;
import com.here.naksha.lib.core.models.geojson.implementation.XyzFeature;
import com.here.naksha.lib.core.models.geojson.implementation.XyzFeatureCollection;
import com.here.naksha.lib.core.models.payload.responses.ErrorResponse;
import com.here.naksha.lib.core.models.storage.*;
import com.here.naksha.lib.core.util.json.JsonSerializable;
Expand All @@ -44,34 +45,94 @@
*/
public class PrepareResult {

public static Result prepareResult(List<XyzFeature> featureList) {
return createHttpResultFromFeatureList(featureList);
}

public static <T extends Typed> Result prepareResult(
public static <T extends Typed> Result prepareReadResult(
HttpResponse<byte[]> httpResponse,
Class<T> httpResponseType,
Class<T> httpResponseBodyType,
Function<T, List<XyzFeature>> typedResponseToFeatureList) {

Function<String, HttpSuccessResult<XyzFeature, XyzFeatureCodec>> jsonBodyToHttpSuccessResult = jsonBody -> {
T typedBody = JsonSerializable.deserialize(jsonBody, httpResponseBodyType);
List<XyzFeature> featuresList = typedResponseToFeatureList.apply(typedBody);
return prepareReadResult(featuresList);
};

return prepareResult(httpResponse, jsonBodyToHttpSuccessResult);
}

/**
 * Wraps the given features into a successful read result: each feature becomes one codec row
 * marked {@link EExecutedOp#READ}, served through a heap-backed forward cursor.
 *
 * @param features the features to expose through the result cursor
 * @return a success result whose cursor iterates the given features
 */
public static HttpSuccessResult<XyzFeature, XyzFeatureCodec> prepareReadResult(
    final @NotNull List<XyzFeature> features) {
  final XyzFeatureCodecFactory factory = XyzFeatureCodecFactory.get();
  final List<XyzFeatureCodec> rows = new ArrayList<>(features.size());
  features.forEach(feature -> {
    final XyzFeatureCodec row = factory.newInstance();
    row.setOp(EExecutedOp.READ);
    row.setFeature(feature);
    row.setId(feature.getId());
    rows.add(row);
  });
  return new HttpSuccessResult<>(new HeapCacheCursor<>(factory, rows, null));
}

/**
 * Turns an HTTP write response into a {@link Result}: on success the body is deserialized as
 * an {@link XyzFeatureCollection} and converted to a cursor-backed success result; error
 * handling is shared with the read path via the private prepareResult overload.
 *
 * @param httpResponse the raw HTTP response of a write call
 * @return a success result with per-feature operations, or an error result
 */
public static Result prepareWriteResult(HttpResponse<byte[]> httpResponse) {
  return prepareResult(
      httpResponse,
      jsonBody -> prepareWriteResult(JsonSerializable.deserialize(jsonBody, XyzFeatureCollection.class)));
}

/**
 * Shared response handling for read and write results: maps non-2xx status codes to
 * {@link ErrorResult}, otherwise decodes the body and delegates to the given converter.
 *
 * <p>(Cleaned up: removed stale pre-rename duplicate lines left over from the diff —
 * {@code body}/{@code jsonBody} and the old typed-deserialize branch.)
 *
 * @param httpResponse the raw HTTP response
 * @param jsonBodyToHttpSuccessResult converts the decoded JSON body into a success result
 * @return the converted success result, or an error result
 */
private static Result prepareResult(
    HttpResponse<byte[]> httpResponse,
    Function<String, HttpSuccessResult<XyzFeature, XyzFeatureCodec>> jsonBodyToHttpSuccessResult) {
  int httpStatusCode = httpResponse.statusCode();
  if (!isSuccessStatus(httpStatusCode)) {
    XyzError error = mapHttpStatusCodeToError(httpStatusCode);
    return new ErrorResult(error, "Response http status code: " + httpResponse.statusCode());
  }

  String jsonBody = getDecodedBody(httpResponse);
  try {
    return jsonBodyToHttpSuccessResult.apply(jsonBody);
  }
  // Some storages may return success status code but contain ErrorResponse.
  // UncheckedIOException is thrown then because ErrorResponse cannot be cast to expected httpResponseBodyType.
  catch (UncheckedIOException e) {
    ErrorResponse errorResponse = JsonSerializable.deserialize(jsonBody, ErrorResponse.class);
    return new ErrorResult(errorResponse.getError(), "Error response : " + errorResponse.getErrorMessage());
  }
}

/**
 * Builds a success result from a deserialized write response collection: each feature becomes
 * one codec row whose executed op is derived from the collection's inserted/updated/deleted
 * id lists (see {@code getOp}).
 *
 * @param xyzCollection the deserialized write-response feature collection
 * @return a success result whose cursor iterates the written features
 */
private static HttpSuccessResult<XyzFeature, XyzFeatureCodec> prepareWriteResult(
    XyzFeatureCollection xyzCollection) {
  final XyzFeatureCodecFactory factory = XyzFeatureCodecFactory.get();
  final List<XyzFeatureCodec> rows = new ArrayList<>();
  for (final XyzFeature feature : xyzCollection.getFeatures()) {
    final XyzFeatureCodec row = factory.newInstance();
    row.setFeature(feature);
    row.setOp(getOp(feature, xyzCollection));
    row.setId(feature.getId());
    rows.add(row);
  }
  return new HttpSuccessResult<>(new HeapCacheCursor<>(factory, rows, null));
}

/**
 * Derives the executed operation for one feature from the response collection's id lists;
 * falls back to READ when the feature id appears in none of them.
 */
private static EExecutedOp getOp(XyzFeature feature, XyzFeatureCollection xyzCollection) {
  final String id = feature.getId();
  final var inserted = xyzCollection.getInserted();
  final var updated = xyzCollection.getUpdated();
  final var deleted = xyzCollection.getDeleted();
  if (inserted != null && inserted.contains(id)) {
    return EExecutedOp.CREATED;
  }
  if (updated != null && updated.contains(id)) {
    return EExecutedOp.UPDATED;
  }
  if (deleted != null && deleted.contains(id)) {
    return EExecutedOp.DELETED;
  }
  return EExecutedOp.READ;
}

/**
* Decodes byte[] body into String basing on "content-encoding"
* or UTF-8 if no encoding provided.
Expand All @@ -98,23 +159,6 @@ private static String gzipDecode(byte[] encoded) {
}
}

static HttpSuccessResult<XyzFeature, XyzFeatureCodec> createHttpResultFromFeatureList(
final @NotNull List<XyzFeature> features) {
// Create ForwardCursor with input features
final List<XyzFeatureCodec> codecs = new ArrayList<>();
final XyzFeatureCodecFactory codecFactory = XyzFeatureCodecFactory.get();
for (final XyzFeature feature : features) {
final XyzFeatureCodec codec = codecFactory.newInstance();
codec.setOp(EExecutedOp.READ);
codec.setFeature(feature);
codec.setId(feature.getId());
codecs.add(codec);
}

final HeapCacheCursor<XyzFeature, XyzFeatureCodec> cursor = new HeapCacheCursor<>(codecFactory, codecs, null);
return new HttpSuccessResult<>(cursor);
}

/** Returns true when the HTTP status code lies in the 2xx success range (200-299). */
private static boolean isSuccessStatus(final int httpStatus) {
  return httpStatus / 100 == 2;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public HttpResponse<byte[]> sendRequest(@NotNull String endpoint, @Nullable Map<
return sendRequest(endpoint, true, addHeaders, null, null);
}

/**
 * Sends a POST with the given body to this sender's base endpoint (empty path), keeping the
 * default headers. Delegates to {@code sendRequest}.
 *
 * @param body the request body (serialized event JSON, per current callers)
 * @return the raw HTTP response with a byte[] body
 */
public HttpResponse<byte[]> post(String body) {
return sendRequest("", true, null, "POST", body);
}

public HttpResponse<byte[]> sendRequest(
@NotNull String endpoint,
boolean keepDefHeaders,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public static Result execute(NakshaContext context, ReadFeaturesProxyWrapper req
event.setStreamId(streamId);

String jsonEvent = JsonSerializable.serialize(event);
HttpResponse<byte[]> httpResponse = post(sender, jsonEvent);
HttpResponse<byte[]> httpResponse = sender.post(jsonEvent);

return PrepareResult.prepareResult(httpResponse, XyzFeatureCollection.class, XyzFeatureCollection::getFeatures);
return PrepareResult.prepareReadResult(
httpResponse, XyzFeatureCollection.class, XyzFeatureCollection::getFeatures);
}

private static Event createIterateEvent(ReadFeaturesProxyWrapper request) {
Expand Down Expand Up @@ -110,10 +111,6 @@ static void setPropertyOp(ReadFeatures request, QueryEvent getFeaturesByBBoxEven
}
}

private static HttpResponse<byte[]> post(RequestSender sender, String body) {
return sender.sendRequest("", true, null, "POST", body);
}

private static Event createFeaturesByTileEvent(ReadFeaturesProxyWrapper readRequest) {
String tileType = readRequest.getQueryParameter(TILE_TYPE);
if (TILE_TYPE_QUADKEY.equals(tileType)) {
Expand Down
Loading
Loading