summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/kv/store.go
blob: d25b3fb040ca871510873f2e1ba39bcb857d370c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package kv

import (
	"io"
	"sync"

	"codeberg.org/gruf/go-mutexes"
	"codeberg.org/gruf/go-store/storage"
	"codeberg.org/gruf/go-store/util"
)

// KVStore is a very simple, yet performant key-value store. It sits on top of a
// storage.Storage backend and guards it with two levels of locking: a store-wide
// RWMutex (read-locked by Get, GetStream, Has, Iterator and Read; write-locked by
// Put, PutStream, Delete and Update) and a per-key mutex map.
type KVStore struct {
	mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
	mutex    sync.RWMutex     // mutex is the total store mutex
	storage  storage.Storage  // storage is the underlying storage
}

// OpenFile opens disk storage at the given path with the supplied config (via
// storage.OpenFile) and returns a KVStore wrapping it. An error from opening the
// storage, or from OpenStorage, is returned unchanged.
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
	// Open the disk-backed storage first
	disk, err := storage.OpenFile(path, cfg)
	if err != nil {
		return nil, err
	}

	// Hand the opened storage over to the common constructor
	return OpenStorage(disk)
}

// OpenBlock opens block storage at the given path with the supplied config (via
// storage.OpenBlock) and returns a KVStore wrapping it. An error from opening the
// storage, or from OpenStorage, is returned unchanged.
func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
	// Open the block-backed storage first
	block, err := storage.OpenBlock(path, cfg)
	if err != nil {
		return nil, err
	}

	// Hand the opened storage over to the common constructor
	return OpenStorage(block)
}

// OpenStorage wraps an already-opened storage.Storage in a KVStore. The storage's
// Clean method is called first; if it fails, that error is returned and no store
// is created.
func OpenStorage(storage storage.Storage) (*KVStore, error) {
	// Run the initial clean before exposing the storage
	if err := storage.Clean(); err != nil {
		return nil, err
	}

	// Build the store; the store-wide RWMutex is ready to use as its zero value
	st := &KVStore{
		mutexMap: mutexes.NewMap(mutexes.NewRW),
		storage:  storage,
	}
	return st, nil
}

// Get returns the bytes stored under the supplied key. The store read lock is
// held for the duration of the call.
func (st *KVStore) Get(key string) ([]byte, error) {
	// Shared store access while reading
	st.mutex.RLock()
	defer st.mutex.RUnlock()

	// Delegate to the variant that only takes the per-key lock
	value, err := st.get(key)
	return value, err
}

// get reads the bytes for key from the underlying storage while holding that
// key's read lock. It does not touch the store-wide mutex; callers such as Get
// take that lock before calling in.
func (st *KVStore) get(key string) ([]byte, error) {
	// Take the key's read lock now; the returned release func runs on exit
	defer st.mutexMap.RLock(key)()

	// Fetch the stored bytes
	return st.storage.ReadBytes(key)
}

// GetStream returns a ReadCloser over the bytes stored under the supplied key.
// The store read lock is held only while the stream is being opened, and is
// released when this method returns.
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
	// Shared store access while opening the stream
	st.mutex.RLock()
	defer st.mutex.RUnlock()

	// Delegate to the variant that only takes the per-key lock
	stream, err := st.getStream(key)
	return stream, err
}

// getStream opens a read stream for key from the underlying storage. The key's
// read lock is taken before opening. On failure it is released immediately; on
// success the release func is handed to util.ReadCloserWithCallback together with
// the stream (presumably so it is invoked when the returned ReadCloser is closed;
// confirm against the util package).
func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
	// Take the key's read lock; it is not deferred as it must outlive this call
	release := st.mutexMap.RLock(key)

	// Try to open the stream from storage
	stream, err := st.storage.ReadStream(key)
	if err != nil {
		// Nothing to hand out, so drop the key lock right away
		release()
		return nil, err
	}

	// Tie the key lock's release to the wrapped stream
	wrapped := util.ReadCloserWithCallback(stream, release)
	return wrapped, nil
}

// Put stores the supplied bytes under the given key. The store write lock is
// held for the duration of the call.
func (st *KVStore) Put(key string, value []byte) error {
	// Exclusive store access while writing
	st.mutex.Lock()
	defer st.mutex.Unlock()

	// Delegate to the variant that only takes the per-key lock
	return st.put(key, value)
}

// put writes value under key in the underlying storage while holding that key's
// write lock. It does not touch the store-wide mutex; callers such as Put take
// that lock before calling in.
func (st *KVStore) put(key string, value []byte) error {
	// Take the key's write lock now; the returned release func runs on exit
	defer st.mutexMap.Lock(key)()

	// Persist the bytes
	return st.storage.WriteBytes(key, value)
}

// PutStream stores the data read from the supplied Reader under the given key.
// The store write lock is held for the duration of the call.
func (st *KVStore) PutStream(key string, r io.Reader) error {
	// Exclusive store access while writing
	st.mutex.Lock()
	defer st.mutex.Unlock()

	// Delegate to the variant that only takes the per-key lock
	return st.putStream(key, r)
}

// putStream writes the contents of r under key in the underlying storage while
// holding that key's write lock. It does not touch the store-wide mutex; callers
// such as PutStream take that lock before calling in.
func (st *KVStore) putStream(key string, r io.Reader) error {
	// Take the key's write lock now; the returned release func runs on exit
	defer st.mutexMap.Lock(key)()

	// Stream the data into storage
	return st.storage.WriteStream(key, r)
}

// Has reports whether the supplied key exists in the store. The store read lock
// is held for the duration of the call.
func (st *KVStore) Has(key string) (bool, error) {
	// Shared store access while checking
	st.mutex.RLock()
	defer st.mutex.RUnlock()

	// Delegate to the variant that only takes the per-key lock
	found, err := st.has(key)
	return found, err
}

// has asks the underlying storage (via Stat) whether key exists, while holding
// that key's read lock. It does not touch the store-wide mutex; callers such as
// Has take that lock before calling in.
func (st *KVStore) has(key string) (bool, error) {
	// Take the key's read lock now; the returned release func runs on exit
	defer st.mutexMap.RLock(key)()

	// Query storage for the key
	return st.storage.Stat(key)
}

// Delete removes the entry stored under the supplied key. The store write lock
// is held for the duration of the call.
func (st *KVStore) Delete(key string) error {
	// Exclusive store access while removing
	st.mutex.Lock()
	defer st.mutex.Unlock()

	// Delegate to the variant that only takes the per-key lock
	return st.delete(key)
}

// delete removes key from the underlying storage while holding that key's write
// lock. It does not touch the store-wide mutex; callers such as Delete take that
// lock before calling in.
func (st *KVStore) delete(key string) error {
	// Take the key's write lock now; the returned release func runs on exit
	defer st.mutexMap.Lock(key)()

	// Drop the entry from storage
	return st.storage.Remove(key)
}

// Iterator walks the keys in storage and returns a KVIterator over every entry
// whose key is accepted by matchFn; a nil matchFn accepts all keys.
//
// The store read lock is taken before walking. If the walk fails the lock is
// released and the error returned. On success the lock is NOT released here:
// its RUnlock is stored as the iterator's onClose hook, so the store stays
// read-locked until that hook is invoked.
func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
	// A nil matcher accepts every key
	if matchFn == nil {
		matchFn = func(string) bool { return true }
	}

	// Take the store read lock; ownership passes to the iterator on success
	st.mutex.RLock()

	// Collect each entry whose key the matcher accepts
	matched := make([]storage.StorageEntry, 0)
	opts := storage.WalkKeysOptions{
		WalkFn: func(entry storage.StorageEntry) {
			if matchFn(entry.Key()) {
				matched = append(matched, entry)
			}
		},
	}

	// Walk the storage keys, unlocking ourselves if the walk fails
	if err := st.storage.WalkKeys(&opts); err != nil {
		st.mutex.RUnlock()
		return nil, err
	}

	// Build the iterator around the collected entries
	iter := &KVIterator{
		store:   st,
		entries: matched,
		index:   -1,
		key:     "",
		onClose: st.mutex.RUnlock,
	}
	return iter, nil
}

// Read runs the supplied function with a read-only view (StateRO) of the store.
// The store is read-locked before the function is called and unlocked after it
// returns; the state is closed just before the lock is released.
func (st *KVStore) Read(do func(*StateRO)) {
	// Shared store access for the whole callback
	st.mutex.RLock()
	defer st.mutex.RUnlock()

	// Fresh read-only state bound to this store, closed once the callback is done
	ro := &StateRO{store: st}
	defer ro.close()

	// Run the caller's function against the state
	do(ro)
}

// Update runs the supplied function with a read-write view (StateRW) of the
// store. The store is write-locked before the function is called and unlocked
// after it returns; the state is closed just before the lock is released.
func (st *KVStore) Update(do func(*StateRW)) {
	// Exclusive store access for the whole callback
	st.mutex.Lock()
	defer st.mutex.Unlock()

	// Fresh read-write state bound to this store, closed once the callback is done
	rw := &StateRW{store: st}
	defer rw.close()

	// Run the caller's function against the state
	do(rw)
}