-
Notifications
You must be signed in to change notification settings - Fork 2
/
record.go
155 lines (129 loc) · 2.87 KB
/
record.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package turbine
import (
"encoding/json"
"strings"
"time"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
type Records struct {
Stream string
records []Record
}
func NewRecords(rr []Record) Records {
return Records{records: rr}
}
func GetRecords(r Records) []Record {
return r.records
}
type Record struct {
Key string
Payload Payload
Timestamp time.Time
}
// JSONSchema returns true if the record is formatted with JSON Schema, false otherwise
func (r Record) JSONSchema() bool {
p, err := r.Payload.Map()
if err != nil {
return false
}
if _, ok := p["schema"]; ok {
if _, ok := p["payload"]; ok {
return true
}
}
return false
}
// OpenCDC returns true if the record is formatted with OpenCDC schema, false otherwise
func (r Record) OpenCDC() bool {
p, err := r.Payload.Map()
if err != nil {
return false
}
if _, ok := p["schema"]; ok {
if payload, ok := p["payload"]; ok {
if _, ok := payload.(map[string]interface{})["after"]; ok {
return true
}
}
}
return false
}
type Payload []byte
func (p Payload) Map() (map[string]interface{}, error) {
var m map[string]interface{}
err := json.Unmarshal(p, &m)
return m, err
}
func (p Payload) Get(path string) interface{} {
nestedPath := strings.Join([]string{"payload", path}, ".")
return gjson.Get(string(p), nestedPath).Value()
}
// TODO: Add GetType(path string) to tell you what the data type is.
// TODO: Should we passthrough the gjson helper methods?
type schemaField struct {
Field string `json:"field"`
Optional bool `json:"optional"`
Type string `json:"type"`
}
func (p *Payload) Set(path string, value interface{}) error {
nestedPath := strings.Join([]string{"payload", path}, ".")
fieldExists := gjson.Get(string(*p), nestedPath).Exists()
// update payload
val, err := sjson.Set(string(*p), nestedPath, value)
if err != nil {
return err
}
*p = []byte(val)
// Add schema field if field is new
if !fieldExists {
fieldType := mapGoToKCDataTypes(val)
field := schemaField{
Field: path,
Optional: true,
Type: fieldType,
}
schemaNestedPath := strings.Join([]string{"schema", "fields.-1"}, ".")
sval, err := sjson.Set(string(*p), schemaNestedPath, field)
if err != nil {
return err
}
*p = []byte(sval)
}
return nil
}
func (p *Payload) Delete(path string) error {
val, err := sjson.Delete(string(*p), path)
if err != nil {
return err
}
*p = []byte(val)
return nil
}
type RecordWithError struct {
Error error
Record
}
// map Go types to Apache Kafka Connect data types
func mapGoToKCDataTypes(v interface{}) string {
switch v.(type) {
case string:
return "string"
case int8:
return "int8"
case int16:
return "int16"
case int, int32:
return "int32"
case int64:
return "int64"
case float32:
return "float32"
case float64:
return "float64"
case bool:
return "boolean"
default:
return "unsupported"
}
}