Skip to content

Commit

Permalink
remove create from store commands and use apply only
Browse files Browse the repository at this point in the history
resolves #10
  • Loading branch information
jlarfors committed Nov 15, 2024
1 parent 1758d15 commit 3037d63
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 204 deletions.
3 changes: 1 addition & 2 deletions examples/greetings/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ func (h *PortalHandler) post(rw http.ResponseWriter, req *http.Request) {
client := hz.NewClient(
h.Conn,
hz.WithClientSessionFromRequest(req),
hz.WithClientDefaultManager(),
)
greetClient := hz.ObjectClient[Greeting]{Client: client}
if err := greetClient.Create(req.Context(), greeting); err != nil {
if _, err := greetClient.Apply(req.Context(), greeting, hz.WithApplyCreateOnly(true)); err != nil {
_ = rendr.greetingsControllerForm(reqName, err).
Render(req.Context(), rw)
return
Expand Down
1 change: 0 additions & 1 deletion pkg/extensions/core/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestSecrets(t *testing.T) {
client := hz.NewClient(
ts.Conn,
hz.WithClientInternal(true),
hz.WithClientDefaultManager(),
)
_, err = client.Apply(ctx, hz.WithApplyObject(secret))
tu.AssertNoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (d *DefaultHandler) PostNamespaces(
Namespace: hz.RootNamespace,
},
}
err := nsClient.Create(r.Context(), ns)
_, err := nsClient.Apply(r.Context(), ns, hz.WithApplyCreateOnly(true))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (o *ObjectsHandler) create(w http.ResponseWriter, r *http.Request) {
)
return
}
if err := client.Create(r.Context(), hz.WithCreateObject(obj)); err != nil {
if _, err := client.Apply(r.Context(), hz.WithApplyObject(obj)); err != nil {
httpError(w, err)
return
}
Expand Down
122 changes: 16 additions & 106 deletions pkg/hz/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
const (
HeaderStatus = "Hz-Status"
HeaderAuthorization = "Hz-Authorization"
HeaderApplyCreateOnly = "Hz-Apply-Create-Only"
HeaderApplyFieldManager = "Hz-Apply-Field-Manager"
HeaderApplyForceConflicts = "Hz-Apply-Force-Conflicts"
)
Expand Down Expand Up @@ -102,13 +103,6 @@ type ObjectClient[T Objecter] struct {
Client Client
}

func (oc ObjectClient[T]) Create(
ctx context.Context,
object T,
) error {
return oc.Client.Create(ctx, WithCreateObject(object))
}

func (oc ObjectClient[T]) Apply(
ctx context.Context,
object T,
Expand Down Expand Up @@ -252,12 +246,6 @@ func WithClientSessionFromRequest(req *http.Request) ClientOption {
}
}

func WithClientDefaultManager() ClientOption {
return func(co *clientOpts) {
co.manager = "hzctl"
}
}

func WithClientManager(manager string) ClientOption {
return func(co *clientOpts) {
co.manager = manager
Expand All @@ -271,7 +259,9 @@ type clientOpts struct {
}

func NewClient(conn *nats.Conn, opts ...ClientOption) Client {
co := clientOpts{}
co := clientOpts{
manager: "hzctl",
}
for _, opt := range opts {
opt(&co)
}
Expand Down Expand Up @@ -433,9 +423,10 @@ func (c Client) Validate(
type ApplyOption func(*applyOptions)

type applyOptions struct {
object Objecter
data []byte
force bool
object Objecter
data []byte
force bool
createOnly bool
}

func WithApplyObject(object Objecter) ApplyOption {
Expand All @@ -456,6 +447,13 @@ func WithApplyForce(force bool) ApplyOption {
}
}

// WithApplyCreateOnly will apply the object only if it does not exist.
func WithApplyCreateOnly(createOnly bool) ApplyOption {
return func(ao *applyOptions) {
ao.createOnly = createOnly
}
}

type ApplyOpResult string

const (
Expand Down Expand Up @@ -523,6 +521,7 @@ func (c Client) Apply(
key.ObjectName(),
),
)
msg.Header.Set(HeaderApplyCreateOnly, strconv.FormatBool(ao.createOnly))
msg.Header.Set(HeaderApplyFieldManager, c.Manager)
msg.Header.Set(HeaderApplyForceConflicts, strconv.FormatBool(ao.force))
msg.Header.Set(HeaderAuthorization, c.Session)
Expand Down Expand Up @@ -563,95 +562,6 @@ func (c Client) Apply(
}
}

type CreateOption func(*createOptions)

type createOptions struct {
object Objecter
data []byte
}

func WithCreateObject(object Objecter) CreateOption {
return func(ao *createOptions) {
ao.object = object
}
}

func WithCreateData(data []byte) CreateOption {
return func(ao *createOptions) {
ao.data = data
}
}

func (c *Client) Create(
ctx context.Context,
opts ...CreateOption,
) error {
if err := c.checkSession(); err != nil {
return err
}
co := createOptions{}
for _, opt := range opts {
opt(&co)
}

var (
key ObjectKeyer
data []byte
)
if co.object != nil {
var err error
data, err = c.marshalObjectWithTypeFields(co.object)
if err != nil {
return fmt.Errorf("marshalling object: %w", err)
}
key = co.object
}
if co.data != nil {
var obj MetaOnlyObject
if err := json.Unmarshal(co.data, &obj); err != nil {
return fmt.Errorf("unmarshalling data: %w", err)
}
key = obj
data = co.data
}
if key == nil {
return fmt.Errorf("create: %w", ErrClientObjectOrDataRequired)
}
if data == nil {
return fmt.Errorf("create: %w", ErrClientObjectOrDataRequired)
}

if err := validateKeyStrict(key); err != nil {
return fmt.Errorf("invalid key: %w", err)
}

msg := nats.NewMsg(
c.SubjectPrefix() + fmt.Sprintf(
SubjectStoreCreate,
key.ObjectGroup(),
key.ObjectVersion(),
key.ObjectKind(),
key.ObjectNamespace(),
key.ObjectName(),
),
)
msg.Data = data
msg.Header.Set(HeaderAuthorization, c.Session)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
reply, err := c.Conn.RequestMsgWithContext(
ctx,
msg,
)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
return ErrStoreNotResponding
}
return fmt.Errorf("making request to store: %w", err)
}
return ErrorFromNATS(reply)
}

type GetOption func(*getOptions)

func WithGetKey(key ObjectKeyer) GetOption {
Expand Down
13 changes: 7 additions & 6 deletions pkg/hz/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestReconciler(t *testing.T) {
Name: "dummy",
},
}
err = dummyClient.Create(ctx, do)
_, err = dummyClient.Apply(ctx, do, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)

childClient := hz.ObjectClient[ChildObject]{Client: client}
Expand All @@ -91,7 +91,7 @@ func TestReconciler(t *testing.T) {
},
}

err = childClient.Create(ctx, co)
_, err = childClient.Apply(ctx, co, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)

time.Sleep(time.Second * 1)
Expand Down Expand Up @@ -137,11 +137,13 @@ func TestReconcilerPanic(t *testing.T) {
},
}

err = dummyClient.Create(ctx, do)
_, err = dummyClient.Apply(ctx, do, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)
// If we publish messages too quickly the reconciler will only get the last.
// Add a little sleep to make sure both messages get handled.
time.Sleep(time.Second)
// Make a change otherwise apply is no-op.
do.Labels = map[string]string{"foo": "bar"}
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)

Expand Down Expand Up @@ -261,7 +263,6 @@ func TestReconcilerWaitForFinish(t *testing.T) {
client := hz.NewClient(
ti.Conn,
hz.WithClientInternal(true),
hz.WithClientDefaultManager(),
)
dummyClient := hz.ObjectClient[DummyObject]{Client: client}

Expand Down Expand Up @@ -392,9 +393,9 @@ func TestReconcilerConcurrent(t *testing.T) {
},
}
go func() {
err = dummyClient.Create(ctx, do)
_, err = dummyClient.Apply(ctx, do, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)
err = childClient.Create(ctx, co)
_, err = childClient.Apply(ctx, co, hz.WithApplyCreateOnly(true))
tu.AssertNoError(t, err)
for i := 0; i < 50; i++ {
_, err = dummyClient.Apply(ctx, do)
Expand Down
1 change: 0 additions & 1 deletion pkg/hz/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (e *PortalHandler) Start(ctx context.Context) error {
client := NewClient(
e.conn,
WithClientInternal(true),
WithClientDefaultManager(),
)
extClient := ObjectClient[Portal]{Client: client}
// TODO: field manager.
Expand Down
33 changes: 11 additions & 22 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -353,29 +354,17 @@ func (s *Server) checkRootNamespaceObject(
nsClient := hz.ObjectClient[core.Namespace]{
Client: hz.NewClient(s.Conn, hz.WithClientInternal(true)),
}
if _, err := nsClient.Get(
ctx,
hz.WithGetKey(hz.ObjectKey{
applyOp, err := nsClient.Apply(ctx, core.Namespace{
ObjectMeta: hz.ObjectMeta{
Name: hz.RootNamespace,
Namespace: hz.RootNamespace,
}),
); err != nil {
if !errors.Is(err, hz.ErrNotFound) {
return fmt.Errorf("get root namespace: %w", err)
}
fmt.Println("Checking root namespace object: not found, creating...")
// If the root namespace is not found, we need to create it.
if err := nsClient.Create(ctx, core.Namespace{
ObjectMeta: hz.ObjectMeta{
Name: hz.RootNamespace,
Namespace: hz.RootNamespace,
},
Spec: &core.NamespaceSpec{},
Status: &core.NamespaceStatus{},
}); err != nil {
return fmt.Errorf("create root namespace: %w", err)
}
}

},
Spec: &core.NamespaceSpec{},
Status: &core.NamespaceStatus{},
})
if err != nil {
return fmt.Errorf("apply root namespace: %w", err)
}
slog.Info("applied root namespace", "op", applyOp)
return nil
}
16 changes: 14 additions & 2 deletions pkg/store/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ type ApplyRequest struct {
// Manager is the name of the field manager for this request.
Manager string
// Force will force the apply to happen even if there are conflicts.
Force bool
Key hz.ObjectKeyer
Force bool
Key hz.ObjectKeyer
IsCreate bool
}

// Apply performs the apply operation on the given request.
Expand Down Expand Up @@ -90,6 +91,17 @@ func (s *Store) Apply(ctx context.Context, req ApplyRequest) (int, error) {

// If the object already exists we need to perform a merge of the objects
// and managed fields.
// Check if this is a create request. If it is and the object already
// exists, return a conflict error.
if req.IsCreate {
return -1, &hz.Error{
Status: http.StatusConflict,
Message: fmt.Sprintf(
"object already exists: %q",
req.Key,
),
}
}
// Decode the existing object's managed fields.
var generic hz.GenericObject
if err := json.Unmarshal(rawObj, &generic); err != nil {
Expand Down
Loading

0 comments on commit 3037d63

Please sign in to comment.