Add a WithNoSync() option
This commit is contained in:
джерело
485524b5e9
коміт
adc8e19aff
|
@ -227,7 +227,7 @@ func NewMessageBus(opts ...Option) (*MessageBus, error) {
|
|||
|
||||
var metrics *Metrics
|
||||
|
||||
if options.WithMetrics {
|
||||
if options.Metrics {
|
||||
metrics = NewMetrics("msgbus")
|
||||
|
||||
ctime := time.Now()
|
||||
|
|
|
@ -492,19 +492,37 @@ func TestMsgBusMetrics(t *testing.T) {
|
|||
func BenchmarkMessageBusPut(b *testing.B) {
|
||||
require := require.New(b)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
b.Run("Sync", func(b *testing.B) {
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
mb, err := NewMessageBus(WithLogPath(testdir))
|
||||
require.NoError(err)
|
||||
mb, err := NewMessageBus(WithLogPath(testdir))
|
||||
require.NoError(err)
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
}
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("NoSync", func(b *testing.B) {
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
mb, err := NewMessageBus(WithLogPath(testdir), WithNoSync(true))
|
||||
require.NoError(err)
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkMessageBusGet(b *testing.B) {
|
||||
|
@ -548,20 +566,40 @@ func BenchmarkMessageBusGetEmpty(b *testing.B) {
|
|||
func BenchmarkMessageBusPutGet(b *testing.B) {
|
||||
require := require.New(b)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
b.Run("Sync", func(b *testing.B) {
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
mb, err := NewMessageBus(WithLogPath(testdir))
|
||||
require.NoError(err)
|
||||
mb, err := NewMessageBus(WithLogPath(testdir))
|
||||
require.NoError(err)
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
mb.Get(topic)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("NoSync", func(b *testing.B) {
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
mb, err := NewMessageBus(WithLogPath(testdir), WithNoSync(true))
|
||||
require.NoError(err)
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
mb.Get(topic)
|
||||
}
|
||||
})
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
mb.Get(topic)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
|
|
37
options.go
37
options.go
|
@ -1,5 +1,7 @@
|
|||
package msgbus
|
||||
|
||||
import "github.com/tidwall/wal"
|
||||
|
||||
const (
|
||||
// DefaultBind is the default bind address
|
||||
DefaultBind = ":8000"
|
||||
|
@ -16,25 +18,37 @@ const (
|
|||
// DefaultBufferLength is the default buffer length for subscriber chans
|
||||
DefaultBufferLength = 256
|
||||
|
||||
// DefaultWithMetrics is the default for whether to enable metrics
|
||||
DefaultWithMetrics = false
|
||||
// DefaultMetrics is the default for whether to enable metrics
|
||||
DefaultMetrics = false
|
||||
|
||||
// DefaultNoSync is the default for whether to disable faync after writing
|
||||
// messages to the write-ahead-log (wal) files. The default is `false` which
|
||||
// is safer and will prevent corruption in event of crahses or power failure,
|
||||
// but is slower.
|
||||
DefaultNoSync = false
|
||||
)
|
||||
|
||||
var DefaultOptions = &Options{
|
||||
LogPath: DefaultLogPath,
|
||||
LogPath: DefaultLogPath,
|
||||
|
||||
BufferLength: DefaultBufferLength,
|
||||
MaxQueueSize: DefaultMaxQueueSize,
|
||||
MaxPayloadSize: DefaultMaxPayloadSize,
|
||||
WithMetrics: DefaultWithMetrics,
|
||||
|
||||
Metrics: DefaultMetrics,
|
||||
NoSync: DefaultNoSync,
|
||||
}
|
||||
|
||||
// Options ...
|
||||
type Options struct {
|
||||
LogPath string
|
||||
LogPath string
|
||||
|
||||
BufferLength int
|
||||
MaxQueueSize int
|
||||
MaxPayloadSize int
|
||||
WithMetrics bool
|
||||
|
||||
Metrics bool
|
||||
NoSync bool
|
||||
}
|
||||
|
||||
type Option func(opts *Options) error
|
||||
|
@ -67,9 +81,16 @@ func WithMaxPayloadSize(maxPayloadSize int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithMetrics(withMetrics bool) Option {
|
||||
func WithMetrics(metrics bool) Option {
|
||||
return func(opts *Options) error {
|
||||
opts.WithMetrics = withMetrics
|
||||
opts.Metrics = metrics
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithNoSync(noSync bool) Option {
|
||||
return func(opts *Options) error {
|
||||
wal.DefaultOptions.NoSync = noSync
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
Завантаження…
Посилання в новій задачі