Skip to content

Commit

Permalink
Merge pull request #4 from Famcache/feature/1.2.2-retry
Browse files Browse the repository at this point in the history
Feature/1.2.2 retry
  • Loading branch information
shahen94 authored May 31, 2024
2 parents e0dc85a + 5a9b10c commit be77b43
Show file tree
Hide file tree
Showing 38 changed files with 645 additions and 292 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.2
1.2.3
2 changes: 1 addition & 1 deletion domain/cache/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ package cache
type SetOptions struct {
Key string
Value string
TTL *int64
TTL *uint64
}
8 changes: 8 additions & 0 deletions domain/command/messaging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package command

type MessagingCommand interface {
ID() string
Type() CommandType
Topic() string
Data() string
}
15 changes: 15 additions & 0 deletions domain/command/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package command

import "net"

type StoreCommand interface {
ID() string
Type() CommandType
Key() string
Value() *string
TTL() *uint64

ReplyError(conn net.Conn, response string)
ReplySuccess(conn net.Conn)
ReplyOK(conn net.Conn, response string)
}
File renamed without changes.
12 changes: 10 additions & 2 deletions domain/pubsub/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package pubsub

import "famcache/domain/command"

type Message interface {
GetTopic() string
GetData() string
ID() string
Topic() string
Data() string
Recipient() string
CreatedAt() int64
RetryCount() uint
IncrementRetryCount()
ToMessagingCommand() command.MessagingCommand
}
4 changes: 3 additions & 1 deletion domain/pubsub/queue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package pubsub

type Queue interface {
Retry(message Message)
Enqueue(message Message)
Remove(messageId string)
Batch() []Message
}
4 changes: 4 additions & 0 deletions pkg/server/actor/root.go → pkg/actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"famcache/domain/pubsub"
conn "famcache/pkg/connection"
queue "famcache/pkg/pubsub"
"time"
)

type Actor struct {
logger *logger.Logger
cache *cache.Cache
messagingQueue pubsub.Queue
peers connection.PeersManager
queueTicker time.Ticker
}

func (a *Actor) Peers() *connection.PeersManager {
Expand All @@ -23,11 +25,13 @@ func (a *Actor) Peers() *connection.PeersManager {
func NewActor(logger *logger.Logger, cache *cache.Cache) Actor {
messagingQueue := queue.NewPubsubQueue()
peers := conn.NewPeersManager()
queueTicker := *time.NewTicker(1 * time.Second)

return Actor{
logger,
cache,
messagingQueue,
peers,
queueTicker,
}
}
24 changes: 24 additions & 0 deletions pkg/actor/actor_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package actor

func (actor *Actor) ListenFailedMessages() {
for {
<-actor.queueTicker.C

for _, message := range actor.messagingQueue.Batch() {
peer := actor.peers.GetById(message.Recipient())

if peer == nil {
actor.messagingQueue.Remove(message.ID())
continue
}

err := actor.Publish(peer, message.ToMessagingCommand())

if err == nil {
actor.messagingQueue.Remove(message.ID())
} else {
message.IncrementRetryCount()
}
}
}
}
8 changes: 4 additions & 4 deletions pkg/server/actor/delete.go → pkg/actor/delete.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package actor

import (
"famcache/pkg/server/command"
"famcache/domain/command"
"net"
)

func (actor *Actor) Delete(conn net.Conn, query *command.StoreCommand) {
func (actor *Actor) Delete(conn net.Conn, query command.StoreCommand) {
sCache := *actor.cache
logger := *actor.logger

err := sCache.Delete(query.Key)
err := sCache.Delete(query.Key())

if err != nil {
logger.Error("Error deleting key")
Expand All @@ -19,7 +19,7 @@ func (actor *Actor) Delete(conn net.Conn, query *command.StoreCommand) {
return
}

logger.Info("DELETE", query.Key)
logger.Info("DELETE", query.Key())

query.ReplySuccess(conn)
}
8 changes: 4 additions & 4 deletions pkg/server/actor/get.go → pkg/actor/get.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package actor

import (
"famcache/pkg/server/command"
"famcache/domain/command"
"net"
)

func (actor *Actor) Get(conn net.Conn, query *command.StoreCommand) {
func (actor *Actor) Get(conn net.Conn, query command.StoreCommand) {
sCache := *actor.cache
logger := *actor.logger

value, err := sCache.Get(query.Key)
value, err := sCache.Get(query.Key())

if err != nil {
logger.Error("Error getting key")

query.ReplyError(conn, err.Error())
}

logger.Info("GET", query.Key, value)
logger.Info("GET", query.Key(), value)

query.ReplyOK(conn, value)
}
14 changes: 9 additions & 5 deletions pkg/server/actor/publish.go → pkg/actor/publish.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
"famcache/pkg/pubsub"
"famcache/pkg/server/command"
)

func (actor *Actor) Publish(peer connection.Peer, message *command.MessagingCommand) error {
func (actor *Actor) Publish(peer connection.Peer, message command.MessagingCommand) error {
logger := *actor.logger

logger.Info("Peer " + peer.ID() + " published topic " + message.Topic)
logger.Info("Peer " + peer.ID() + " published topic " + message.Topic())

for _, p := range *actor.peers.All() {
if p.ID() == peer.ID() {
continue
}

if !p.Subscriptions().IsSubscribed(message.Topic()) {
continue
}

// Try to publish the message to the peer immediately
// If the peer is not available, keep the message in the queue
// and retry later
err := p.Publish(message.Topic, message.Data)
err := p.Publish(message.Topic(), message.Data())

if err != nil {
// TODO: Retry logic
actor.messagingQueue.Retry(pubsub.NewPubsubMessage(peer.ID(), message.Topic, message.Data))
actor.messagingQueue.Enqueue(pubsub.NewPubsubMessage(peer.ID(), message.Topic(), message.Data()))
}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/server/actor/set.go → pkg/actor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package actor

import (
"famcache/domain/cache"
"famcache/pkg/server/command"
"famcache/domain/command"
"net"
)

func (actor *Actor) Set(conn net.Conn, query *command.StoreCommand) {
func (actor *Actor) Set(conn net.Conn, query command.StoreCommand) {
sCache := *actor.cache
logger := *actor.logger

err := sCache.Set(cache.SetOptions{
Key: query.Key,
Value: *query.Value,
TTL: query.TTL,
Key: query.Key(),
Value: *query.Value(),
TTL: query.TTL(),
})

if err != nil {
Expand All @@ -24,7 +24,7 @@ func (actor *Actor) Set(conn net.Conn, query *command.StoreCommand) {
return
}

logger.Info("SET", query.Key, *query.Value)
logger.Info("SET", query.Key(), *query.Value())

query.ReplySuccess(conn)
}
8 changes: 4 additions & 4 deletions pkg/server/actor/subscribe.go → pkg/actor/subscribe.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
"famcache/pkg/server/command"
)

func (actor *Actor) Subscribe(peer connection.Peer, query *command.MessagingCommand) {
func (actor *Actor) Subscribe(peer connection.Peer, query command.MessagingCommand) {
logger := *actor.logger

logger.Info("Peer" + peer.ID() + " subscribed to the topic: " + query.Topic)
logger.Info("Peer" + peer.ID() + " subscribed to the topic: " + query.Topic())

sub := peer.Subscriptions()

sub.Add(query.Topic)
sub.Add(query.Topic())
}
8 changes: 4 additions & 4 deletions pkg/server/actor/unsubscribe.go → pkg/actor/unsubscribe.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
"famcache/pkg/server/command"
)

func (actor *Actor) Unsubscribe(peer connection.Peer, query *command.MessagingCommand) {
func (actor *Actor) Unsubscribe(peer connection.Peer, query command.MessagingCommand) {
logger := *actor.logger

logger.Info("Peer" + peer.ID() + " unsubscribed from topic " + query.Topic)
logger.Info("Peer" + peer.ID() + " unsubscribed from topic " + query.Topic())

sub := peer.Subscriptions()

sub.Remove(query.Topic)
sub.Remove(query.Topic())
}
4 changes: 2 additions & 2 deletions pkg/cache/stored_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (

type StoredValue struct {
Value string
TTL *int64
TTL *uint64
CreatedAt int64
}

func NewStoredValue(value string, ttl *int64) (*StoredValue, error) {
func NewStoredValue(value string, ttl *uint64) (*StoredValue, error) {
return &StoredValue{
Value: value,
TTL: ttl,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (c *Cache) CleanupExpired() {
elapsed := time.Now().Unix() - storedValue.CreatedAt

// Check if the stored value has expired
if elapsed >= *storedValue.TTL/1000 {
if elapsed >= int64(*storedValue.TTL)/1000 {
delete(c.storage, key)
}
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/command/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package command

import (
"famcache/domain"
"famcache/domain/command"
"strings"
)

var strToCommandType = map[string]command.CommandType{
"GET": command.CommandGet,
"SET": command.CommandSet,
"DELETE": command.CommandDelete,
"PUBLISH": command.CommandPublish,
"SUBSCRIBE": command.CommandSubscribe,
"UNSUBSCRIBE": command.CommandUnsubscribe,
}

type AbstractCommand struct {
cType command.CommandType
query string
}

func (c *AbstractCommand) ToStoreCommand() command.StoreCommand {
return NewStoreCommand(c.cType, c.query)
}

func (c *AbstractCommand) ToPubsubCommand() command.MessagingCommand {
return NewPubsubCommand(c.cType, c.query)
}

func (c *AbstractCommand) IsStoreCommand() bool {
return c.cType == command.CommandSet || c.cType == command.CommandGet || c.cType == command.CommandDelete
}

func (c *AbstractCommand) IsMessagingCommand() bool {
return c.cType == command.CommandPublish || c.cType == command.CommandSubscribe || c.cType == command.CommandUnsubscribe
}

func determineCommandType(query string) (command.CommandType, bool) {
parts := strings.Fields(strings.TrimSpace(query))

if len(parts) < 2 {
return command.CommandGet, false
}

t, ok := strToCommandType[strings.TrimSpace(strings.ToUpper((parts[1])))]
return t, ok
}

func NewCommand(query string) (*AbstractCommand, error) {
commandType, ok := determineCommandType(query)

if !ok {
return nil, domain.ErrUnableToProcessRequest
}

return &AbstractCommand{
cType: commandType,
query: query,
}, nil
}
Loading

0 comments on commit be77b43

Please sign in to comment.