diff --git a/VERSION b/VERSION index d2d61a7..e2cac26 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.2.2 \ No newline at end of file +1.2.3 \ No newline at end of file diff --git a/domain/cache/options.go b/domain/cache/options.go index fd60450..ebbbef2 100644 --- a/domain/cache/options.go +++ b/domain/cache/options.go @@ -4,5 +4,5 @@ package cache type SetOptions struct { Key string Value string - TTL *int64 + TTL *uint64 } diff --git a/domain/command/messaging.go b/domain/command/messaging.go new file mode 100644 index 0000000..006632b --- /dev/null +++ b/domain/command/messaging.go @@ -0,0 +1,8 @@ +package command + +type MessagingCommand interface { + ID() string + Type() CommandType + Topic() string + Data() string +} diff --git a/domain/command/store.go b/domain/command/store.go new file mode 100644 index 0000000..71205e7 --- /dev/null +++ b/domain/command/store.go @@ -0,0 +1,15 @@ +package command + +import "net" + +type StoreCommand interface { + ID() string + Type() CommandType + Key() string + Value() *string + TTL() *uint64 + + ReplyError(conn net.Conn, response string) + ReplySuccess(conn net.Conn) + ReplyOK(conn net.Conn, response string) +} diff --git a/pkg/server/command/types.go b/domain/command/types.go similarity index 100% rename from pkg/server/command/types.go rename to domain/command/types.go diff --git a/domain/pubsub/message.go b/domain/pubsub/message.go index 0d30bb3..33a87bb 100644 --- a/domain/pubsub/message.go +++ b/domain/pubsub/message.go @@ -1,6 +1,14 @@ package pubsub +import "famcache/domain/command" + type Message interface { - GetTopic() string - GetData() string + ID() string + Topic() string + Data() string + Recipient() string + CreatedAt() int64 + RetryCount() uint + IncrementRetryCount() + ToMessagingCommand() command.MessagingCommand } diff --git a/domain/pubsub/queue.go b/domain/pubsub/queue.go index 4ad12c8..e6c0b7b 100644 --- a/domain/pubsub/queue.go +++ b/domain/pubsub/queue.go @@ -1,5 +1,7 @@ package pubsub type Queue interface { - Retry(message Message) + Enqueue(message Message) + Remove(messageId string) + Batch() []Message } diff --git a/pkg/server/actor/root.go b/pkg/actor/actor.go similarity index 86% rename from pkg/server/actor/root.go rename to pkg/actor/actor.go index f9fe909..6808cf3 100644 --- a/pkg/server/actor/root.go +++ b/pkg/actor/actor.go @@ -7,6 +7,7 @@ import ( "famcache/domain/pubsub" conn "famcache/pkg/connection" queue "famcache/pkg/pubsub" + "time" ) type Actor struct { @@ -14,6 +15,7 @@ type Actor struct { cache *cache.Cache messagingQueue pubsub.Queue peers connection.PeersManager + queueTicker time.Ticker } func (a *Actor) Peers() *connection.PeersManager { @@ -23,11 +25,13 @@ func (a *Actor) Peers() *connection.PeersManager { func NewActor(logger *logger.Logger, cache *cache.Cache) Actor { messagingQueue := queue.NewPubsubQueue() peers := conn.NewPeersManager() + queueTicker := *time.NewTicker(1 * time.Second) return Actor{ logger, cache, messagingQueue, peers, + queueTicker, } } diff --git a/pkg/actor/actor_impl.go b/pkg/actor/actor_impl.go new file mode 100644 index 0000000..e110a05 --- /dev/null +++ b/pkg/actor/actor_impl.go @@ -0,0 +1,24 @@ +package actor + +func (actor *Actor) ListenFailedMessages() { + for { + <-actor.queueTicker.C + + for _, message := range actor.messagingQueue.Batch() { + peer := actor.peers.GetById(message.Recipient()) + + if peer == nil { + actor.messagingQueue.Remove(message.ID()) + continue + } + + err := actor.Publish(peer, message.ToMessagingCommand()) + + if err == nil { + actor.messagingQueue.Remove(message.ID()) + } else { + message.IncrementRetryCount() + } + } + } +} diff --git a/pkg/server/actor/delete.go b/pkg/actor/delete.go similarity index 56% rename from pkg/server/actor/delete.go rename to pkg/actor/delete.go index 7d48780..ef6bd2a 100644 --- a/pkg/server/actor/delete.go +++ b/pkg/actor/delete.go @@ -1,15 +1,15 @@ package actor import ( - "famcache/pkg/server/command" + "famcache/domain/command" "net" ) -func (actor *Actor) Delete(conn net.Conn, query *command.StoreCommand) { +func (actor *Actor) Delete(conn net.Conn, query command.StoreCommand) { sCache := *actor.cache logger := *actor.logger - err := sCache.Delete(query.Key) + err := sCache.Delete(query.Key()) if err != nil { logger.Error("Error deleting key") @@ -19,7 +19,7 @@ func (actor *Actor) Delete(conn net.Conn, query *command.StoreCommand) { return } - logger.Info("DELETE", query.Key) + logger.Info("DELETE", query.Key()) query.ReplySuccess(conn) } diff --git a/pkg/server/actor/get.go b/pkg/actor/get.go similarity index 54% rename from pkg/server/actor/get.go rename to pkg/actor/get.go index d0e6be9..382f742 100644 --- a/pkg/server/actor/get.go +++ b/pkg/actor/get.go @@ -1,15 +1,15 @@ package actor import ( - "famcache/pkg/server/command" + "famcache/domain/command" "net" ) -func (actor *Actor) Get(conn net.Conn, query *command.StoreCommand) { +func (actor *Actor) Get(conn net.Conn, query command.StoreCommand) { sCache := *actor.cache logger := *actor.logger - value, err := sCache.Get(query.Key) + value, err := sCache.Get(query.Key()) if err != nil { logger.Error("Error getting key") @@ -17,7 +17,7 @@ func (actor *Actor) Get(conn net.Conn, query *command.StoreCommand) { query.ReplyError(conn, err.Error()) } - logger.Info("GET", query.Key, value) + logger.Info("GET", query.Key(), value) query.ReplyOK(conn, value) } diff --git a/pkg/server/actor/publish.go b/pkg/actor/publish.go similarity index 56% rename from pkg/server/actor/publish.go rename to pkg/actor/publish.go index f093504..20cca80 100644 --- a/pkg/server/actor/publish.go +++ b/pkg/actor/publish.go @@ -1,29 +1,33 @@ package actor import ( + "famcache/domain/command" "famcache/domain/connection" "famcache/pkg/pubsub" - "famcache/pkg/server/command" ) -func (actor *Actor) Publish(peer connection.Peer, message *command.MessagingCommand) error { +func (actor *Actor) Publish(peer connection.Peer, message command.MessagingCommand) error { logger := *actor.logger - logger.Info("Peer " + peer.ID() + " published topic " + message.Topic) + logger.Info("Peer " + peer.ID() + " published topic " + message.Topic()) for _, p := range *actor.peers.All() { if p.ID() == peer.ID() { continue } + if !p.Subscriptions().IsSubscribed(message.Topic()) { + continue + } + // Try to publish the message to the peer immediately // If the peer is not available, keep the message in the queue // and retry later - err := p.Publish(message.Topic, message.Data) + err := p.Publish(message.Topic(), message.Data()) if err != nil { // TODO: Retry logic - actor.messagingQueue.Retry(pubsub.NewPubsubMessage(peer.ID(), message.Topic, message.Data)) + actor.messagingQueue.Enqueue(pubsub.NewPubsubMessage(peer.ID(), message.Topic(), message.Data())) } } diff --git a/pkg/server/actor/set.go b/pkg/actor/set.go similarity index 57% rename from pkg/server/actor/set.go rename to pkg/actor/set.go index 26e0f9c..b1f4902 100644 --- a/pkg/server/actor/set.go +++ b/pkg/actor/set.go @@ -2,18 +2,18 @@ package actor import ( "famcache/domain/cache" - "famcache/pkg/server/command" + "famcache/domain/command" "net" ) -func (actor *Actor) Set(conn net.Conn, query *command.StoreCommand) { +func (actor *Actor) Set(conn net.Conn, query command.StoreCommand) { sCache := *actor.cache logger := *actor.logger err := sCache.Set(cache.SetOptions{ - Key: query.Key, - Value: *query.Value, - TTL: query.TTL, + Key: query.Key(), + Value: *query.Value(), + TTL: query.TTL(), }) if err != nil { @@ -24,7 +24,7 @@ func (actor *Actor) Set(conn net.Conn, query *command.StoreCommand) { return } - logger.Info("SET", query.Key, *query.Value) + logger.Info("SET", query.Key(), *query.Value()) query.ReplySuccess(conn) } diff --git a/pkg/server/actor/subscribe.go b/pkg/actor/subscribe.go similarity index 53% rename from pkg/server/actor/subscribe.go rename to pkg/actor/subscribe.go index b6e761b..796b88d 100644 --- a/pkg/server/actor/subscribe.go +++ b/pkg/actor/subscribe.go @@ -1,16 +1,16 @@ package actor import ( + "famcache/domain/command" "famcache/domain/connection" - "famcache/pkg/server/command" ) -func (actor *Actor) Subscribe(peer connection.Peer, query *command.MessagingCommand) { +func (actor *Actor) Subscribe(peer connection.Peer, query command.MessagingCommand) { logger := *actor.logger - logger.Info("Peer" + peer.ID() + " subscribed to the topic: " + query.Topic) + logger.Info("Peer" + peer.ID() + " subscribed to the topic: " + query.Topic()) sub := peer.Subscriptions() - sub.Add(query.Topic) + sub.Add(query.Topic()) } diff --git a/pkg/server/actor/unsubscribe.go b/pkg/actor/unsubscribe.go similarity index 53% rename from pkg/server/actor/unsubscribe.go rename to pkg/actor/unsubscribe.go index 269357e..c8b3612 100644 --- a/pkg/server/actor/unsubscribe.go +++ b/pkg/actor/unsubscribe.go @@ -1,16 +1,16 @@ package actor import ( + "famcache/domain/command" "famcache/domain/connection" - "famcache/pkg/server/command" ) -func (actor *Actor) Unsubscribe(peer connection.Peer, query *command.MessagingCommand) { +func (actor *Actor) Unsubscribe(peer connection.Peer, query command.MessagingCommand) { logger := *actor.logger - logger.Info("Peer" + peer.ID() + " unsubscribed from topic " + query.Topic) + logger.Info("Peer" + peer.ID() + " unsubscribed from topic " + query.Topic()) sub := peer.Subscriptions() - sub.Remove(query.Topic) + sub.Remove(query.Topic()) } diff --git a/pkg/cache/stored_value.go b/pkg/cache/stored_value.go index 7d9548c..6412569 100644 --- a/pkg/cache/stored_value.go +++ b/pkg/cache/stored_value.go @@ -6,11 +6,11 @@ import ( type StoredValue struct { Value string - TTL *int64 + TTL *uint64 CreatedAt int64 } -func NewStoredValue(value string, ttl *int64) (*StoredValue, error) { +func NewStoredValue(value string, ttl *uint64) (*StoredValue, error) { return &StoredValue{ Value: value, TTL: ttl, diff --git a/pkg/cache/ticker.go b/pkg/cache/ticker.go index 6d09131..f5474bb 100644 --- a/pkg/cache/ticker.go +++ b/pkg/cache/ticker.go @@ -17,7 +17,7 @@ func (c *Cache) CleanupExpired() { elapsed := time.Now().Unix() - storedValue.CreatedAt // Check if the stored value has expired - if elapsed >= *storedValue.TTL/1000 { + if elapsed >= int64(*storedValue.TTL)/1000 { delete(c.storage, key) } } diff --git a/pkg/command/command.go b/pkg/command/command.go new file mode 100644 index 0000000..251d64e --- /dev/null +++ b/pkg/command/command.go @@ -0,0 +1,61 @@ +package command + +import ( + "famcache/domain" + "famcache/domain/command" + "strings" +) + +var strToCommandType = map[string]command.CommandType{ + "GET": command.CommandGet, + "SET": command.CommandSet, + "DELETE": command.CommandDelete, + "PUBLISH": command.CommandPublish, + "SUBSCRIBE": command.CommandSubscribe, + "UNSUBSCRIBE": command.CommandUnsubscribe, +} + +type AbstractCommand struct { + cType command.CommandType + query string +} + +func (c *AbstractCommand) ToStoreCommand() command.StoreCommand { + return NewStoreCommand(c.cType, c.query) +} + +func (c *AbstractCommand) ToPubsubCommand() command.MessagingCommand { + return NewPubsubCommand(c.cType, c.query) +} + +func (c *AbstractCommand) IsStoreCommand() bool { + return c.cType == command.CommandSet || c.cType == command.CommandGet || c.cType == command.CommandDelete +} + +func (c *AbstractCommand) IsMessagingCommand() bool { + return c.cType == command.CommandPublish || c.cType == command.CommandSubscribe || c.cType == command.CommandUnsubscribe +} + +func determineCommandType(query string) (command.CommandType, bool) { + parts := strings.Fields(strings.TrimSpace(query)) + + if len(parts) < 2 { + return command.CommandGet, false + } + + t, ok := strToCommandType[strings.TrimSpace(strings.ToUpper((parts[1])))] + return t, ok +} + +func NewCommand(query string) (*AbstractCommand, error) { + commandType, ok := determineCommandType(query) + + if !ok { + return nil, domain.ErrUnableToProcessRequest + } + + return &AbstractCommand{ + cType: commandType, + query: query, + }, nil +} diff --git a/pkg/server/command/command_test.go b/pkg/command/command_test.go similarity index 70% rename from pkg/server/command/command_test.go rename to pkg/command/command_test.go index 1df2b80..c6e7ae0 100644 --- a/pkg/server/command/command_test.go +++ b/pkg/command/command_test.go @@ -1,6 +1,9 @@ package command -import "testing" +import ( + "famcache/domain/command" + "testing" +) func TestNewCommand_GET(t *testing.T) { com, ok := NewCommand("id GET key") @@ -16,19 +19,19 @@ func TestNewCommand_GET(t *testing.T) { t.Error("Expected a store command") } - if com.cType != CommandGet { + if com.cType != command.CommandGet { t.Error("Expected a GET command") } - if com.ToStoreCommand().Key != "key" { + if com.ToStoreCommand().Key() != "key" { t.Error("Expected key to be 'key'") } - if com.ToStoreCommand().Value != nil { + if com.ToStoreCommand().Value() != nil { t.Error("Expected value to be nil") } - if com.ToStoreCommand().ID != "id" { + if com.ToStoreCommand().ID() != "id" { t.Error("Expected id to be 'id'") } } @@ -47,23 +50,23 @@ func TestNewCommand_SET(t *testing.T) { t.Error("Expected a store command") } - if com.cType != CommandSet { + if com.cType != command.CommandSet { t.Error("Expected a SET command") } - if com.ToStoreCommand().Key != "key" { + if com.ToStoreCommand().Key() != "key" { t.Error("Expected key to be 'key'") } - if *com.ToStoreCommand().Value != "value" { + if *com.ToStoreCommand().Value() != "value" { t.Error("Expected value to be 'value'") } - if *com.ToStoreCommand().TTL != 0 { + if *com.ToStoreCommand().TTL() != 0 { t.Error("Expected ttl to be 0") } - if com.ToStoreCommand().ID != "id" { + if com.ToStoreCommand().ID() != "id" { t.Error("Expected id to be 'id'") } } @@ -82,19 +85,19 @@ func TestNewCommand_DELETE(t *testing.T) { t.Error("Expected a store command") } - if com.cType != CommandDelete { + if com.cType != command.CommandDelete { t.Error("Expected a DELETE command") } - if com.ToStoreCommand().Key != "key" { + if com.ToStoreCommand().Key() != "key" { t.Error("Expected key to be 'key'") } - if com.ToStoreCommand().Value != nil { + if com.ToStoreCommand().Value() != nil { t.Error("Expected value to be nil") } - if com.ToStoreCommand().ID != "id" { + if com.ToStoreCommand().ID() != "id" { t.Error("Expected id to be 'id'") } } @@ -109,19 +112,19 @@ func TestNewCommand_Publish(t *testing.T) { t.Error("Expected a command") } - if com.cType != CommandPublish { + if com.cType != command.CommandPublish { t.Error("Expected a PUBLISH command") } - if com.ToPubsubCommand().Topic != "channel" { + if com.ToPubsubCommand().Topic() != "channel" { t.Error("Expected channel to be 'channel'") } - if com.ToPubsubCommand().Data != "message" { + if com.ToPubsubCommand().Data() != "message" { t.Error("Expected message to be 'message'") } - if com.ToPubsubCommand().ID != "id" { + if com.ToPubsubCommand().ID() != "id" { t.Error("Expected id to be 'id'") } } @@ -136,19 +139,19 @@ func TestNewCommand_Subscribe(t *testing.T) { t.Error("Expected a command") } - if com.cType != CommandSubscribe { + if com.cType != command.CommandSubscribe { t.Error("Expected a SUBSCRIBE command") } - if com.ToPubsubCommand().Topic != "channel" { + if com.ToPubsubCommand().Topic() != "channel" { t.Error("Expected channel to be 'channel'") } - if com.ToPubsubCommand().Data != "" { + if com.ToPubsubCommand().Data() != "" { t.Error("Expected message to be empty") } - if com.ToPubsubCommand().ID != "id" { + if com.ToPubsubCommand().ID() != "id" { t.Error("Expected id to be 'id'") } } @@ -163,19 +166,19 @@ func TestNewCommand_Unsubscribe(t *testing.T) { t.Error("Expected a command") } - if com.cType != CommandUnsubscribe { + if com.cType != command.CommandUnsubscribe { t.Error("Expected a UNSUBSCRIBE command") } - if com.ToPubsubCommand().Topic != "channel" { + if com.ToPubsubCommand().Topic() != "channel" { t.Error("Expected channel to be 'channel'") } - if com.ToPubsubCommand().Data != "" { + if com.ToPubsubCommand().Data() != "" { t.Error("Expected message to be empty") } - if com.ToPubsubCommand().ID != "id" { + if com.ToPubsubCommand().ID() != "id" { t.Error("Expected id to be 'id'") } } diff --git a/pkg/command/generator.go b/pkg/command/generator.go new file mode 100644 index 0000000..960d8cd --- /dev/null +++ b/pkg/command/generator.go @@ -0,0 +1,30 @@ +package command + +import "fmt" + +func SetCommand(id, key, value string, ttl *uint64) string { + if ttl != nil { + return fmt.Sprintf("%s SET %s %s %d", id, key, value, *ttl) + } + return fmt.Sprintf("%s SET %s %s", id, key, value) +} + +func GetCommand(id, key string) string { + return fmt.Sprintf("%s GET %s", id, key) +} + +func DeleteCommand(id, key string) string { + return fmt.Sprintf("%s DELETE %s", id, key) +} + +func PublishCommand(id, topic, data string) string { + return fmt.Sprintf("%s PUBLISH %s %s", id, topic, data) +} + +func SubscribeCommand(id, topic string) string { + return fmt.Sprintf("%s SUBSCRIBE %s", id, topic) +} + +func UnsubscribeCommand(id, topic string) string { + return fmt.Sprintf("%s UNSUBSCRIBE %s", id, topic) +} diff --git a/pkg/command/generator_test.go b/pkg/command/generator_test.go new file mode 100644 index 0000000..f98bd60 --- /dev/null +++ b/pkg/command/generator_test.go @@ -0,0 +1,60 @@ +package command + +import "testing" + +func TestSetCommand(t *testing.T) { + str := SetCommand("1", "key", "value", nil) + + if str != "1 SET key value" { + t.Errorf("Expected command to be 1 SET key value, got %s", str) + } +} + +func TestSetCommand_WithTTL(t *testing.T) { + ttl := uint64(3000) + str := SetCommand("1", "key", "value", &ttl) + + if str != "1 SET key value 3000" { + t.Errorf("Expected command to be 1 SET key value 3000, got %s", str) + } +} + +func TestGetCommand(t *testing.T) { + str := GetCommand("1", "key") + + if str != "1 GET key" { + t.Errorf("Expected command to be 1 GET key, got %s", str) + } +} + +func TestDeleteCommand(t *testing.T) { + str := DeleteCommand("1", "key") + + if str != "1 DELETE key" { + t.Errorf("Expected command to be 1 DELETE key, got %s", str) + } +} + +func TestPublishCommand(t *testing.T) { + str := PublishCommand("1", "topic", "data") + + if str != "1 PUBLISH topic data" { + t.Errorf("Expected command to be 1 PUBLISH topic data, got %s", str) + } +} + +func TestSubscribeCommand(t *testing.T) { + str := SubscribeCommand("1", "topic") + + if str != "1 SUBSCRIBE topic" { + t.Errorf("Expected command to be 1 SUBSCRIBE topic, got %s", str) + } +} + +func TestUnsubscribeCommand(t *testing.T) { + str := UnsubscribeCommand("1", "topic") + + if str != "1 UNSUBSCRIBE topic" { + t.Errorf("Expected command to be 1 UNSUBSCRIBE topic, got %s", str) + } +} diff --git a/pkg/command/messaging_command.go b/pkg/command/messaging_command.go new file mode 100644 index 0000000..8210ea6 --- /dev/null +++ b/pkg/command/messaging_command.go @@ -0,0 +1,41 @@ +package command + +import ( + "famcache/domain/command" + "strings" +) + +type MessagingCommand struct { + id string + cType command.CommandType + topic string + data string +} + +func NewPubsubCommand(commandType command.CommandType, query string) command.MessagingCommand { + parts := strings.Fields(strings.TrimSpace(query)) + + if len(parts) < 3 { + return nil + } + + queryId := parts[0] + + topic := parts[2] + var data string + + if commandType == command.CommandPublish { + if len(parts) < 4 { + data = "" + } else { + data = parts[3] + } + } + + return &MessagingCommand{ + id: queryId, + topic: topic, + cType: commandType, + data: data, + } +} diff --git a/pkg/command/messaging_command_impl.go b/pkg/command/messaging_command_impl.go new file mode 100644 index 0000000..acf21d6 --- /dev/null +++ b/pkg/command/messaging_command_impl.go @@ -0,0 +1,19 @@ +package command + +import "famcache/domain/command" + +func (m *MessagingCommand) ID() string { + return m.id +} + +func (m *MessagingCommand) Type() command.CommandType { + return m.cType +} + +func (m *MessagingCommand) Topic() string { + return m.topic +} + +func (m *MessagingCommand) Data() string { + return m.data +} diff --git a/pkg/server/command/store_command.go b/pkg/command/store_command.go similarity index 51% rename from pkg/server/command/store_command.go rename to pkg/command/store_command.go index 1c00176..de2cde1 100644 --- a/pkg/server/command/store_command.go +++ b/pkg/command/store_command.go @@ -1,24 +1,25 @@ package command import ( + "famcache/domain/command" "strconv" "strings" ) type StoreCommand struct { // Common fields - Type CommandType - ID string + cType command.CommandType + id string // Get and Delete fields - Key string + key string // Set fields - Value *string - TTL *int64 + value *string + ttl *uint64 } -func NewStoreCommand(commandType CommandType, query string) *StoreCommand { +func NewStoreCommand(commandType command.CommandType, query string) command.StoreCommand { parts := strings.Fields(strings.TrimSpace(query)) if len(parts) < 3 { @@ -28,14 +29,14 @@ func NewStoreCommand(commandType CommandType, query string) *StoreCommand { queryId := parts[0] key := parts[2] - var ttl *int64 = nil + var ttl *uint64 = nil var value *string = nil - if commandType == CommandSet { + if commandType == command.CommandSet { value = &parts[3] if len(parts) == 5 { - convTtl, err := strconv.ParseInt(parts[4], 10, 64) + convTtl, err := strconv.ParseUint(parts[4], 10, 64) if err != nil { return nil } @@ -44,10 +45,10 @@ func NewStoreCommand(commandType CommandType, query string) *StoreCommand { } return &StoreCommand{ - Type: commandType, - ID: queryId, - Key: key, - Value: value, - TTL: ttl, + cType: commandType, + id: queryId, + key: key, + value: value, + ttl: ttl, } } diff --git a/pkg/command/store_command_impl.go b/pkg/command/store_command_impl.go new file mode 100644 index 0000000..51449c3 --- /dev/null +++ b/pkg/command/store_command_impl.go @@ -0,0 +1,21 @@ +package command + +func (sc *StoreCommand) ID() string { + return sc.id +} + +func (sc *StoreCommand) Type() string { + return sc.cType +} + +func (sc *StoreCommand) Key() string { + return sc.key +} + +func (sc *StoreCommand) Value() *string { + return sc.value +} + +func (sc *StoreCommand) TTL() *uint64 { + return sc.ttl +} diff --git a/pkg/server/command/store_command_reply.go b/pkg/command/store_command_reply.go similarity index 70% rename from pkg/server/command/store_command_reply.go rename to pkg/command/store_command_reply.go index 6117e5f..8afdfb4 100644 --- a/pkg/server/command/store_command_reply.go +++ b/pkg/command/store_command_reply.go @@ -3,19 +3,19 @@ package command import "net" func (q *StoreCommand) ReplyOK(conn net.Conn, response string) { - message := q.ID + " OK " + response + "\n" + message := q.ID() + " OK " + response + "\n" conn.Write([]byte(message)) } func (q *StoreCommand) ReplySuccess(conn net.Conn) { - message := q.ID + " OK" + "\n" + message := q.ID() + " OK" + "\n" conn.Write([]byte(message)) } func (q *StoreCommand) ReplyError(conn net.Conn, response string) { - message := q.ID + " ERROR: " + response + "\n" + message := q.ID() + " ERROR: " + response + "\n" conn.Write([]byte(message)) } diff --git a/pkg/command/store_command_test.go b/pkg/command/store_command_test.go new file mode 100644 index 0000000..d8103c9 --- /dev/null +++ b/pkg/command/store_command_test.go @@ -0,0 +1,101 @@ +package command + +import ( + "famcache/domain/command" + "testing" +) + +func TestNewCommand_Set(t *testing.T) { + query := "1 SET key value 3000" + q := NewStoreCommand(command.CommandSet, query) + + if q == nil { + t.Error("Query is nil") + } + + if q.ID() != "1" { + t.Errorf("Expected ID to be 1, got %s", q.ID()) + } + + if q.Type() != command.CommandSet { + t.Errorf("Expected type to be SET, got %s", q.Type()) + } + + if q.Key() != "key" { + t.Errorf("Expected key to be key, got %s", q.Key()) + } + + if q.Value() == nil { + t.Error("Value is nil") + } + + if *q.Value() != "value" { + t.Errorf("Expected value to be value, got %s", *q.Value()) + } + + if q.TTL() == nil { + t.Error("TTL is nil") + } + + if *q.TTL() != 3000 { + t.Errorf("Expected TTL to be 5, got %d", *q.TTL()) + } +} + +func TestNewCommand_Get(t *testing.T) { + query := "1 GET key" + q := NewStoreCommand(command.CommandGet, query) + + if q == nil { + t.Error("Query is nil") + } + + if q.ID() != "1" { + t.Errorf("Expected ID to be 1, got %s", q.ID()) + } + + if q.Type() != command.CommandGet { + t.Errorf("Expected type to be GET, got %s", q.Type()) + } + + if q.Key() != "key" { + t.Errorf("Expected key to be key, got %s", q.Key()) + } + + if q.Value() != nil { + t.Error("Value is not nil") + } + + if q.TTL() != nil { + t.Error("TTL is not nil") + } +} + +func TestNewCommand_Delete(t *testing.T) { + query := "1 DELETE key" + q := NewStoreCommand(command.CommandDelete, query) + + if q == nil { + t.Error("Query is nil") + } + + if q.ID() != "1" { + t.Errorf("Expected ID to be 1, got %s", q.ID()) + } + + if q.Type() != command.CommandDelete { + t.Errorf("Expected type to be DELETE, got %s", q.Type()) + } + + if q.Key() != "key" { + t.Errorf("Expected key to be key, got %s", q.Key()) + } + + if q.Value() != nil { + t.Error("Value is not nil") + } + + if q.TTL() != nil { + t.Error("TTL is not nil") + } +} diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go index 9b92780..c7f9c6d 100644 --- a/pkg/pubsub/message.go +++ b/pkg/pubsub/message.go @@ -1,21 +1,43 @@ package pubsub -import "famcache/domain/pubsub" +import ( + "famcache/domain/pubsub" + "time" + + "github.com/google/uuid" +) type PubsubMessage struct { - to string - topic string - data string + // UUID + id string + + // Peer ID + to string + + // Message topic + topic string + + // Message data + data string + + // Number of times the message has been retried retried uint + + // Message creation time + createdAt int64 } func NewPubsubMessage(to, topic, data string) pubsub.Message { var retried uint = 0 + createdAt := time.Now().Unix() + id := uuid.New().String() return &PubsubMessage{ + id, to, topic, data, retried, + createdAt, } } diff --git a/pkg/pubsub/message_impl.go b/pkg/pubsub/message_impl.go index 0001c6e..9c357aa 100644 --- a/pkg/pubsub/message_impl.go +++ b/pkg/pubsub/message_impl.go @@ -1,9 +1,41 @@ package pubsub -func (m *PubsubMessage) GetTopic() string { +import ( + "famcache/domain/command" + cmd "famcache/pkg/command" +) + +func (m *PubsubMessage) ID() string { + return m.id +} + +func (m *PubsubMessage) Topic() string { return m.topic } -func (m *PubsubMessage) GetData() string { +func (m *PubsubMessage) Data() string { return m.data } + +func (m *PubsubMessage) Recipient() string { + return m.to +} + +func (m *PubsubMessage) CreatedAt() int64 { + return m.createdAt +} + +func (m *PubsubMessage) RetryCount() uint { + return m.retried +} + +func (m *PubsubMessage) IncrementRetryCount() { + m.retried++ +} + +func (m *PubsubMessage) ToMessagingCommand() command.MessagingCommand { + query := cmd.PublishCommand("internal", m.Topic(), m.Data()) + + println("Query", query) + return cmd.NewPubsubCommand(command.CommandPublish, query) +} diff --git a/pkg/pubsub/message_test.go b/pkg/pubsub/message_test.go new file mode 100644 index 0000000..9b0079b --- /dev/null +++ b/pkg/pubsub/message_test.go @@ -0,0 +1,61 @@ +package pubsub + +import ( + "famcache/domain/command" + "testing" +) + +func TestNewPubsubMessage(t *testing.T) { + topic := "topic" + data := "data" + to := "to" + retried := uint(0) + + m := NewPubsubMessage(to, topic, data) + + if m.Topic() != topic { + t.Errorf("Expected topic to be %s, got %s", topic, m.Topic()) + } + + if m.Data() != data { + t.Errorf("Expected data to be %s, got %s", data, m.Data()) + } + + if m.Recipient() != to { + t.Errorf("Expected recipient to be %s, got %s", to, m.Recipient()) + } + + if m.RetryCount() != retried { + t.Errorf("Expected retry count to be %d, got %d", retried, m.RetryCount()) + } +} + +func TestPubsubMessage_IncrementRetryCount(t *testing.T) { + m := NewPubsubMessage("to", "topic", "data") + m.IncrementRetryCount() + + if m.RetryCount() != 1 { + t.Errorf("Expected retry count to be 1, got %d", m.RetryCount()) + } +} + +func TestPubsubMessage_ToMessagingCommand(t *testing.T) { + m := NewPubsubMessage("to", "topic", "data") + cmd := m.ToMessagingCommand() + + if cmd == nil { + t.Error("Command is nil") + } + + if cmd.Type() != command.CommandPublish { + t.Errorf("Expected type to be PUBLISH, got %s", cmd.Type()) + } + + if cmd.Data() != "data" { + t.Errorf("Expected data to be data, got %s", cmd.Data()) + } + + if cmd.Topic() != "topic" { + t.Errorf("Expected topic to be topic, got %s", cmd.Topic()) + } +} diff --git a/pkg/pubsub/queue_impl.go b/pkg/pubsub/queue_impl.go index bb8ffd2..237245d 100644 --- a/pkg/pubsub/queue_impl.go +++ b/pkg/pubsub/queue_impl.go @@ -1,7 +1,41 @@ package pubsub -import "famcache/domain/pubsub" +import ( + "famcache/domain/pubsub" + "time" +) -func (q *PubsubQueue) Retry(message pubsub.Message) { +func (q *PubsubQueue) Enqueue(message pubsub.Message) { q.messages = append(q.messages, message) } + +func (q *PubsubQueue) Batch() []pubsub.Message { + // Filter out messages that needs to be retried + // Message should be retried if + // - {currentTime} - message.createdAt >= message.retried * 5 seconds + + var messages []pubsub.Message = []pubsub.Message{} + + for _, message := range q.messages { + if message.RetryCount() == 0 { + messages = append(messages, message) + } + + elapsedTime := time.Now().Unix() - message.CreatedAt() + + if elapsedTime >= int64(message.RetryCount()*5) { + messages = append(messages, message) + } + } + + return messages +} + +func (q *PubsubQueue) Remove(messageId string) { + for i, message := range q.messages { + if message.ID() == messageId { + q.messages = append(q.messages[:i], q.messages[i+1:]...) + break + } + } +} diff --git a/pkg/server/command/command.go b/pkg/server/command/command.go deleted file mode 100644 index 3ef2701..0000000 --- a/pkg/server/command/command.go +++ /dev/null @@ -1,60 +0,0 @@ -package command - -import ( - "famcache/domain" - "strings" -) - -var strToCommandType = map[string]CommandType{ - "GET": CommandGet, - "SET": CommandSet, - "DELETE": CommandDelete, - "PUBLISH": CommandPublish, - "SUBSCRIBE": CommandSubscribe, - "UNSUBSCRIBE": CommandUnsubscribe, -} - -type AbstractCommand struct { - cType CommandType - query string -} - -func (c *AbstractCommand) ToStoreCommand() *StoreCommand { - return NewStoreCommand(c.cType, c.query) -} - -func (c *AbstractCommand) ToPubsubCommand() *MessagingCommand { - return NewPubsubCommand(c.cType, c.query) -} - -func (c *AbstractCommand) IsStoreCommand() bool { - return c.cType == CommandSet || c.cType == CommandGet || c.cType == CommandDelete -} - -func (c *AbstractCommand) IsMessagingCommand() bool { - return c.cType == CommandPublish || c.cType == CommandSubscribe || c.cType == CommandUnsubscribe -} - -func determineCommandType(query string) (CommandType, bool) { - parts := strings.Fields(strings.TrimSpace(query)) - - if len(parts) < 2 { - return CommandGet, false - } - - t, ok := strToCommandType[strings.TrimSpace(strings.ToUpper((parts[1])))] - return t, ok -} - -func NewCommand(query string) (*AbstractCommand, error) { - commandType, ok := determineCommandType(query) - - if !ok { - return nil, domain.ErrUnableToProcessRequest - } - - return &AbstractCommand{ - cType: commandType, - query: query, - }, nil -} diff --git a/pkg/server/command/pub_sub_command.go b/pkg/server/command/pub_sub_command.go deleted file mode 100644 index 0cbc394..0000000 --- a/pkg/server/command/pub_sub_command.go +++ /dev/null @@ -1,38 +0,0 @@ -package command - -import "strings" - -type MessagingCommand struct { - ID string - Type CommandType - Topic string - Data string -} - -func NewPubsubCommand(commandType CommandType, query string) *MessagingCommand { - parts := strings.Fields(strings.TrimSpace(query)) - - if len(parts) < 3 { - return nil - } - - queryId := parts[0] - - topic := parts[2] - var data string - - if commandType == CommandPublish { - if len(parts) < 4 { - data = "" - } else { - data = parts[3] - } - } - - return &MessagingCommand{ - ID: queryId, - Topic: topic, - Type: commandType, - Data: data, - } -} diff --git a/pkg/server/command/store_command_test.go b/pkg/server/command/store_command_test.go deleted file mode 100644 index 2700202..0000000 --- a/pkg/server/command/store_command_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package command - -import ( - "testing" -) - -func TestNewCommand_Set(t *testing.T) { - query := "1 SET key value 3000" - q := NewStoreCommand(CommandSet, query) - - if q == nil { - t.Error("Query is nil") - } - - if q.ID != "1" { - t.Errorf("Expected ID to be 1, got %s", q.ID) - } - - if q.Type != CommandSet { - t.Errorf("Expected type to be SET, got %s", q.Type) - } - - if q.Key != "key" { - t.Errorf("Expected key to be key, got %s", q.Key) - } - - if q.Value == nil { - t.Error("Value is nil") - } - - if *q.Value != "value" { - t.Errorf("Expected value to be value, got %s", *q.Value) - } - - if q.TTL == nil { - t.Error("TTL is nil") - } - - if *q.TTL != 3000 { - t.Errorf("Expected TTL to be 5, got %d", *q.TTL) - } -} - -func TestNewCommand_Get(t *testing.T) { - query := "1 GET key" - q := NewStoreCommand(CommandGet, query) - - if q == nil { - t.Error("Query is nil") - } - - if q.ID != "1" { - t.Errorf("Expected ID to be 1, got %s", q.ID) - } - - if q.Type != CommandGet { - t.Errorf("Expected type to be GET, got %s", q.Type) - } - - if q.Key != "key" { - t.Errorf("Expected key to be key, got %s", q.Key) - } - - if q.Value != nil { - t.Error("Value is not nil") - } - - if q.TTL != nil { - t.Error("TTL is not nil") - } -} - -func TestNewCommand_Delete(t *testing.T) { - query := "1 DELETE key" - q := NewStoreCommand(CommandDelete, query) - - if q == nil { - t.Error("Query is nil") - } - - if q.ID != "1" { - t.Errorf("Expected ID to be 1, got %s", q.ID) - } - - if q.Type != CommandDelete { - t.Errorf("Expected type to be DELETE, got %s", q.Type) - } - - if q.Key != "key" { - t.Errorf("Expected key to be key, got %s", q.Key) - } - - if q.Value != nil { - t.Error("Value is not nil") - } - - if q.TTL != nil { - t.Error("TTL is not nil") - } -} diff --git a/pkg/server/handler.go b/pkg/server/handler.go index 9d44588..71d01f2 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -3,7 +3,7 @@ package server import ( "bufio" "famcache/domain/connection" - "famcache/pkg/server/command" + "famcache/pkg/command" "io" ) diff --git a/pkg/server/messaging_command_handler.go b/pkg/server/messaging_command_handler.go index 0aabbba..094025d 100644 --- a/pkg/server/messaging_command_handler.go +++ b/pkg/server/messaging_command_handler.go @@ -1,12 +1,12 @@ package server import ( + "famcache/domain/command" "famcache/domain/connection" - "famcache/pkg/server/command" ) -func (s *Server) handleMessagingCommand(peer connection.Peer, com *command.MessagingCommand) { - switch com.Type { +func (s *Server) handleMessagingCommand(peer connection.Peer, com command.MessagingCommand) { + switch com.Type() { case command.CommandPublish: s.actor.Publish(peer, com) case command.CommandSubscribe: diff --git a/pkg/server/server.go b/pkg/server/server.go index 6f85b2f..2e13802 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -4,7 +4,7 @@ import ( "famcache/domain/cache" "famcache/domain/logger" "famcache/domain/server" - "famcache/pkg/server/actor" + "famcache/pkg/actor" "net" ) diff --git a/pkg/server/store_command_handler.go b/pkg/server/store_command_handler.go index 587e86e..8f61fa1 100644 --- a/pkg/server/store_command_handler.go +++ b/pkg/server/store_command_handler.go @@ -1,12 +1,12 @@ package server import ( + "famcache/domain/command" "famcache/domain/connection" - "famcache/pkg/server/command" ) -func (s *Server) handleStoreCommand(peer connection.Peer, com *command.StoreCommand) { - switch com.Type { +func (s *Server) handleStoreCommand(peer connection.Peer, com command.StoreCommand) { + switch com.Type() { case command.CommandGet: s.actor.Get(peer.Conn(), com) case command.CommandSet: