spogulis no https://git.mills.io/prologic/msgbus.git
Fix some other minor bugs found
This commit is contained in:
vecāks
284567a3b6
revīzija
c9428080e6
|
@ -55,12 +55,12 @@ func init() {
|
|||
RootCmd.AddCommand(subCmd)
|
||||
|
||||
subCmd.Flags().IntP(
|
||||
"index", "i", -1,
|
||||
"position in the topic's sequence to start subscribing from (-1 indicates end)",
|
||||
"index", "i", 0,
|
||||
"position in the topic's sequence to start subscribing from (0 indicates head)",
|
||||
)
|
||||
|
||||
viper.BindPFlag("index", subCmd.Flags().Lookup("index"))
|
||||
viper.SetDefault("index", -1)
|
||||
viper.SetDefault("index", 0)
|
||||
}
|
||||
|
||||
func handler(command string, args []string) msgbus.HandlerFunc {
|
||||
|
|
|
@ -588,6 +588,12 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
|
|||
return ch
|
||||
}
|
||||
|
||||
if o.Index == 0 {
|
||||
// Index == 0 indicates to start from head
|
||||
// so just return the channel
|
||||
return ch
|
||||
}
|
||||
|
||||
// IF client requests to start from index >= 0 (-1 from the head)
|
||||
// AND the topic's sequence number hasn't been reset (< Index) THEN
|
||||
if o.Index > 0 && o.Index <= t.Sequence {
|
||||
|
|
|
@ -147,7 +147,6 @@ func TestMessageBusSubscribe(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMessageBusSubscribeFullBuffer(t *testing.T) {
|
||||
//assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
|
||||
|
@ -165,13 +164,20 @@ func TestMessageBusSubscribeFullBuffer(t *testing.T) {
|
|||
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 2
|
||||
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
|
||||
|
||||
msgs := mb.Subscribe("id1", "hello")
|
||||
go func() {
|
||||
<-msgs
|
||||
}()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
err = mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
|
||||
require.NoError(err)
|
||||
msgs := mb.Subscribe("id1", "hello")
|
||||
go func(ctx context.Context) {
|
||||
select {
|
||||
case <-msgs:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
//err = mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
|
||||
//require.NoError(err)
|
||||
}
|
||||
|
||||
func TestMessageBusSubscribeWithIndex(t *testing.T) {
|
||||
|
@ -202,9 +208,9 @@ func TestMessageBusSubscribeWithIndex(t *testing.T) {
|
|||
|
||||
msgs = mb.Subscribe("id1", "foo", WithIndex(1))
|
||||
|
||||
assert.Equal([]byte("foo"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("bar"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("baz"), (<-msgs).Payload)
|
||||
assert.Equal("foo", string((<-msgs).Payload))
|
||||
assert.Equal("bar", string((<-msgs).Payload))
|
||||
assert.Equal("baz", string((<-msgs).Payload))
|
||||
}
|
||||
|
||||
func TestMessageBusWAL(t *testing.T) {
|
||||
|
|
Notiek ielāde…
Atsaukties uz šo jaunā problēmā