1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
|
package kv
import (
"io"
"sync"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
"codeberg.org/gruf/go-store/util"
)
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
mutex sync.RWMutex // mutex is the total store mutex
storage storage.Storage // storage is the underlying storage
}
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
// Attempt to open disk storage
storage, err := storage.OpenFile(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
// Attempt to open block storage
storage, err := storage.OpenBlock(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Perform initial storage clean
err := storage.Clean()
if err != nil {
return nil, err
}
// Return new KVStore
return &KVStore{
mutexMap: mutexes.NewMap(mutexes.NewRW),
mutex: sync.RWMutex{},
storage: storage,
}, nil
}
// Get fetches the bytes for supplied key in the store
func (st *KVStore) Get(key string) ([]byte, error) {
// Acquire store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Pass to unprotected fn
return st.get(key)
}
func (st *KVStore) get(key string) ([]byte, error) {
// Acquire read lock for key
runlock := st.mutexMap.RLock(key)
defer runlock()
// Read file bytes
return st.storage.ReadBytes(key)
}
// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
// Acquire store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Pass to unprotected fn
return st.getStream(key)
}
func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
// Acquire read lock for key
runlock := st.mutexMap.RLock(key)
// Attempt to open stream for read
rd, err := st.storage.ReadStream(key)
if err != nil {
runlock()
return nil, err
}
// Wrap readcloser in our own callback closer
return util.ReadCloserWithCallback(rd, runlock), nil
}
// Put places the bytes at the supplied key location in the store
func (st *KVStore) Put(key string, value []byte) error {
// Acquire store write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Pass to unprotected fn
return st.put(key, value)
}
func (st *KVStore) put(key string, value []byte) error {
// Acquire write lock for key
unlock := st.mutexMap.Lock(key)
defer unlock()
// Write file bytes
return st.storage.WriteBytes(key, value)
}
// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
func (st *KVStore) PutStream(key string, r io.Reader) error {
// Acquire store write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Pass to unprotected fn
return st.putStream(key, r)
}
func (st *KVStore) putStream(key string, r io.Reader) error {
// Acquire write lock for key
unlock := st.mutexMap.Lock(key)
defer unlock()
// Write file stream
return st.storage.WriteStream(key, r)
}
// Has checks whether the supplied key exists in the store
func (st *KVStore) Has(key string) (bool, error) {
// Acquire store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Pass to unprotected fn
return st.has(key)
}
func (st *KVStore) has(key string) (bool, error) {
// Acquire read lock for key
runlock := st.mutexMap.RLock(key)
defer runlock()
// Stat file on disk
return st.storage.Stat(key)
}
// Delete removes the supplied key-value pair from the store
func (st *KVStore) Delete(key string) error {
// Acquire store write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Pass to unprotected fn
return st.delete(key)
}
func (st *KVStore) delete(key string) error {
// Acquire write lock for key
unlock := st.mutexMap.Lock(key)
defer unlock()
// Remove file from disk
return st.storage.Remove(key)
}
// Iterator returns an Iterator for key-value pairs in the store, using supplied match function
func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
// If no function, match all
if matchFn == nil {
matchFn = func(string) bool { return true }
}
// Get store read lock
st.mutex.RLock()
// Setup the walk keys function
entries := []storage.StorageEntry{}
walkFn := func(entry storage.StorageEntry) {
// Ignore unmatched entries
if !matchFn(entry.Key()) {
return
}
// Add to entries
entries = append(entries, entry)
}
// Walk keys in the storage
err := st.storage.WalkKeys(&storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
st.mutex.RUnlock()
return nil, err
}
// Return new iterator
return &KVIterator{
store: st,
entries: entries,
index: -1,
key: "",
onClose: st.mutex.RUnlock,
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until
// the supplied function returns
func (st *KVStore) Read(do func(*StateRO)) {
// Get store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Create new store state (defer close)
state := &StateRO{store: st}
defer state.close()
// Pass state
do(state)
}
// Update provides a read-write window to the store, holding it in a read-write-locked state
// until the supplied functions returns
func (st *KVStore) Update(do func(*StateRW)) {
// Get store lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Create new store state (defer close)
state := &StateRW{store: st}
defer state.close()
// Pass state
do(state)
}
|