Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove create from store commands and use apply only #19

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/greetings/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ func (h *PortalHandler) post(rw http.ResponseWriter, req *http.Request) {
client := hz.NewClient(
h.Conn,
hz.WithClientSessionFromRequest(req),
hz.WithClientDefaultManager(),
)
greetClient := hz.ObjectClient[Greeting]{Client: client}
if err := greetClient.Create(req.Context(), greeting); err != nil {
if _, err := greetClient.Apply(req.Context(), greeting, hz.WithApplyCreateOnly(true)); err != nil {
_ = rendr.greetingsControllerForm(reqName, err).
Render(req.Context(), rw)
return
Expand Down
1 change: 0 additions & 1 deletion pkg/extensions/core/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestSecrets(t *testing.T) {
client := hz.NewClient(
ts.Conn,
hz.WithClientInternal(true),
hz.WithClientDefaultManager(),
)
_, err = client.Apply(ctx, hz.WithApplyObject(secret))
tu.AssertNoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (d *DefaultHandler) PostNamespaces(
Namespace: hz.RootNamespace,
},
}
err := nsClient.Create(r.Context(), ns)
_, err := nsClient.Apply(r.Context(), ns, hz.WithApplyCreateOnly(true))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (o *ObjectsHandler) create(w http.ResponseWriter, r *http.Request) {
)
return
}
if err := client.Create(r.Context(), hz.WithCreateObject(obj)); err != nil {
if _, err := client.Apply(r.Context(), hz.WithApplyObject(obj)); err != nil {
httpError(w, err)
return
}
Expand Down
122 changes: 16 additions & 106 deletions pkg/hz/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
const (
HeaderStatus = "Hz-Status"
HeaderAuthorization = "Hz-Authorization"
HeaderApplyCreateOnly = "Hz-Apply-Create-Only"
HeaderApplyFieldManager = "Hz-Apply-Field-Manager"
HeaderApplyForceConflicts = "Hz-Apply-Force-Conflicts"
)
Expand Down Expand Up @@ -102,13 +103,6 @@ type ObjectClient[T Objecter] struct {
Client Client
}

func (oc ObjectClient[T]) Create(
ctx context.Context,
object T,
) error {
return oc.Client.Create(ctx, WithCreateObject(object))
}

func (oc ObjectClient[T]) Apply(
ctx context.Context,
object T,
Expand Down Expand Up @@ -252,12 +246,6 @@ func WithClientSessionFromRequest(req *http.Request) ClientOption {
}
}

func WithClientDefaultManager() ClientOption {
return func(co *clientOpts) {
co.manager = "hzctl"
}
}

func WithClientManager(manager string) ClientOption {
return func(co *clientOpts) {
co.manager = manager
Expand All @@ -271,7 +259,9 @@ type clientOpts struct {
}

func NewClient(conn *nats.Conn, opts ...ClientOption) Client {
co := clientOpts{}
co := clientOpts{
manager: "hzctl",
}
for _, opt := range opts {
opt(&co)
}
Expand Down Expand Up @@ -433,9 +423,10 @@ func (c Client) Validate(
type ApplyOption func(*applyOptions)

type applyOptions struct {
object Objecter
data []byte
force bool
object Objecter
data []byte
force bool
createOnly bool
}

func WithApplyObject(object Objecter) ApplyOption {
Expand All @@ -456,6 +447,13 @@ func WithApplyForce(force bool) ApplyOption {
}
}

// WithApplyCreateOnly will apply the object only if it does not exist.
func WithApplyCreateOnly(createOnly bool) ApplyOption {
return func(ao *applyOptions) {
ao.createOnly = createOnly
}
}

type ApplyOpResult string

const (
Expand Down Expand Up @@ -523,6 +521,7 @@ func (c Client) Apply(
key.ObjectName(),
),
)
msg.Header.Set(HeaderApplyCreateOnly, strconv.FormatBool(ao.createOnly))
msg.Header.Set(HeaderApplyFieldManager, c.Manager)
msg.Header.Set(HeaderApplyForceConflicts, strconv.FormatBool(ao.force))
msg.Header.Set(HeaderAuthorization, c.Session)
Expand Down Expand Up @@ -563,95 +562,6 @@ func (c Client) Apply(
}
}

type CreateOption func(*createOptions)

type createOptions struct {
object Objecter
data []byte
}

func WithCreateObject(object Objecter) CreateOption {
return func(ao *createOptions) {
ao.object = object
}
}

func WithCreateData(data []byte) CreateOption {
return func(ao *createOptions) {
ao.data = data
}
}

func (c *Client) Create(
ctx context.Context,
opts ...CreateOption,
) error {
if err := c.checkSession(); err != nil {
return err
}
co := createOptions{}
for _, opt := range opts {
opt(&co)
}

var (
key ObjectKeyer
data []byte
)
if co.object != nil {
var err error
data, err = c.marshalObjectWithTypeFields(co.object)
if err != nil {
return fmt.Errorf("marshalling object: %w", err)
}
key = co.object
}
if co.data != nil {
var obj MetaOnlyObject
if err := json.Unmarshal(co.data, &obj); err != nil {
return fmt.Errorf("unmarshalling data: %w", err)
}
key = obj
data = co.data
}
if key == nil {
return fmt.Errorf("create: %w", ErrClientObjectOrDataRequired)
}
if data == nil {
return fmt.Errorf("create: %w", ErrClientObjectOrDataRequired)
}

if err := validateKeyStrict(key); err != nil {
return fmt.Errorf("invalid key: %w", err)
}

msg := nats.NewMsg(
c.SubjectPrefix() + fmt.Sprintf(
SubjectStoreCreate,
key.ObjectGroup(),
key.ObjectVersion(),
key.ObjectKind(),
key.ObjectNamespace(),
key.ObjectName(),
),
)
msg.Data = data
msg.Header.Set(HeaderAuthorization, c.Session)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
reply, err := c.Conn.RequestMsgWithContext(
ctx,
msg,
)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
return ErrStoreNotResponding
}
return fmt.Errorf("making request to store: %w", err)
}
return ErrorFromNATS(reply)
}

type GetOption func(*getOptions)

func WithGetKey(key ObjectKeyer) GetOption {
Expand Down
13 changes: 7 additions & 6 deletions pkg/hz/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestReconciler(t *testing.T) {
Name: "dummy",
},
}
err = dummyClient.Create(ctx, do)
_, err = dummyClient.Apply(ctx, do, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)

childClient := hz.ObjectClient[ChildObject]{Client: client}
Expand All @@ -91,7 +91,7 @@ func TestReconciler(t *testing.T) {
},
}

err = childClient.Create(ctx, co)
_, err = childClient.Apply(ctx, co, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)

time.Sleep(time.Second * 1)
Expand Down Expand Up @@ -137,11 +137,13 @@ func TestReconcilerPanic(t *testing.T) {
},
}

err = dummyClient.Create(ctx, do)
_, err = dummyClient.Apply(ctx, do, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)
// If we publish messages too quickly the reconciler will only get the last.
// Add a little sleep to make sure both messages get handled.
time.Sleep(time.Second)
// Make a change otherwise apply is no-op.
do.Labels = map[string]string{"foo": "bar"}
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)

Expand Down Expand Up @@ -261,7 +263,6 @@ func TestReconcilerWaitForFinish(t *testing.T) {
client := hz.NewClient(
ti.Conn,
hz.WithClientInternal(true),
hz.WithClientDefaultManager(),
)
dummyClient := hz.ObjectClient[DummyObject]{Client: client}

Expand Down Expand Up @@ -392,9 +393,9 @@ func TestReconcilerConcurrent(t *testing.T) {
},
}
go func() {
err = dummyClient.Create(ctx, do)
_, err = dummyClient.Apply(ctx, do, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)
err = childClient.Create(ctx, co)
_, err = childClient.Apply(ctx, co, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)
for i := 0; i < 50; i++ {
_, err = dummyClient.Apply(ctx, do)
Expand Down
1 change: 0 additions & 1 deletion pkg/hz/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (e *PortalHandler) Start(ctx context.Context) error {
client := NewClient(
e.conn,
WithClientInternal(true),
WithClientDefaultManager(),
)
extClient := ObjectClient[Portal]{Client: client}
// TODO: field manager.
Expand Down
33 changes: 11 additions & 22 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -353,29 +354,17 @@ func (s *Server) checkRootNamespaceObject(
nsClient := hz.ObjectClient[core.Namespace]{
Client: hz.NewClient(s.Conn, hz.WithClientInternal(true)),
}
if _, err := nsClient.Get(
ctx,
hz.WithGetKey(hz.ObjectKey{
applyOp, err := nsClient.Apply(ctx, core.Namespace{
ObjectMeta: hz.ObjectMeta{
Name: hz.RootNamespace,
Namespace: hz.RootNamespace,
}),
); err != nil {
if !errors.Is(err, hz.ErrNotFound) {
return fmt.Errorf("get root namespace: %w", err)
}
fmt.Println("Checking root namespace object: not found, creating...")
// If the root namespace is not found, we need to create it.
if err := nsClient.Create(ctx, core.Namespace{
ObjectMeta: hz.ObjectMeta{
Name: hz.RootNamespace,
Namespace: hz.RootNamespace,
},
Spec: &core.NamespaceSpec{},
Status: &core.NamespaceStatus{},
}); err != nil {
return fmt.Errorf("create root namespace: %w", err)
}
}

},
Spec: &core.NamespaceSpec{},
Status: &core.NamespaceStatus{},
})
if err != nil {
return fmt.Errorf("apply root namespace: %w", err)
}
slog.Info("applied root namespace", "op", applyOp)
return nil
}
16 changes: 14 additions & 2 deletions pkg/store/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ type ApplyRequest struct {
// Manager is the name of the field manager for this request.
Manager string
// Force will force the apply to happen even if there are conflicts.
Force bool
Key hz.ObjectKeyer
Force bool
Key hz.ObjectKeyer
IsCreate bool
}

// Apply performs the apply operation on the given request.
Expand Down Expand Up @@ -90,6 +91,17 @@ func (s *Store) Apply(ctx context.Context, req ApplyRequest) (int, error) {

// If the object already exists we need to perform a merge of the objects
// and managed fields.
// Check if this is a create request. If it is and the object already
// exists, return a conflict error.
if req.IsCreate {
return -1, &hz.Error{
Status: http.StatusConflict,
Message: fmt.Sprintf(
"object already exists: %q",
req.Key,
),
}
}
// Decode the existing object's managed fields.
var generic hz.GenericObject
if err := json.Unmarshal(rawObj, &generic); err != nil {
Expand Down
Loading
Loading