Feat[pool]: Concurrent safe buffer pool experiment (breaking changes maybe?)
This commit is contained in:
parent
85943b8762
commit
92c0b3d0fd
373
pool/bytes.go
373
pool/bytes.go
|
@ -22,6 +22,14 @@ func NewBufferFactory() BufferFactory {
|
|||
}
|
||||
}
|
||||
|
||||
func (cf BufferFactory) withMutex(buf *Buffer) *Buffer {
|
||||
if buf.mu != nil {
|
||||
return buf
|
||||
}
|
||||
buf.mu = &sync.Mutex{}
|
||||
return buf
|
||||
}
|
||||
|
||||
// NewSizedBufferFactory creates a new BufferFactory that creates new buffers of the given size on demand.
|
||||
func NewSizedBufferFactory(size int) BufferFactory {
|
||||
return BufferFactory{
|
||||
|
@ -35,9 +43,35 @@ func NewSizedBufferFactory(size int) BufferFactory {
|
|||
func (cf BufferFactory) Put(buf *Buffer) error {
|
||||
var err = ErrBufferReturned
|
||||
buf.o.Do(func() {
|
||||
_ = buf.Reset()
|
||||
if cf.put(buf) {
|
||||
err = nil
|
||||
}
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (cf BufferFactory) put(buf *Buffer) (ok bool) {
|
||||
_ = buf.Reset()
|
||||
if buf.mu != nil {
|
||||
buf.mu = nil
|
||||
}
|
||||
cf.pool.Put(buf.Buffer)
|
||||
buf.Buffer = nil
|
||||
ok = true
|
||||
return
|
||||
}
|
||||
|
||||
func (cf BufferFactory) SafePut(buf *Buffer) error {
|
||||
var err = ErrBufferReturned
|
||||
if !buf.hasmu() {
|
||||
return ErrBufferHasNoMutex
|
||||
}
|
||||
buf.mu.Lock()
|
||||
buf.o.Do(func() {
|
||||
buf.Buffer.Reset()
|
||||
cf.pool.Put(buf.Buffer)
|
||||
buf.Buffer = nil
|
||||
buf.mu = nil
|
||||
err = nil
|
||||
})
|
||||
return err
|
||||
|
@ -63,15 +97,26 @@ type Buffer struct {
|
|||
*bytes.Buffer
|
||||
o *sync.Once
|
||||
co *sync.Once
|
||||
mu *sync.Mutex
|
||||
p *BufferFactory
|
||||
}
|
||||
|
||||
func (c *Buffer) hasmu() bool {
|
||||
return c.mu != nil
|
||||
}
|
||||
|
||||
// WithParent sets the parent of the buffer. This is useful for chaining factories, and for facilitating
|
||||
// in-line buffer return with functions like Buffer.Close(). Be mindful, however, that this adds a bit of overhead.
|
||||
func (c Buffer) WithParent(p *BufferFactory) *Buffer {
|
||||
func (c *Buffer) WithParent(p *BufferFactory) *Buffer {
|
||||
c.p = p
|
||||
c.co = &sync.Once{}
|
||||
return &c
|
||||
return c
|
||||
}
|
||||
|
||||
// WithMutex implements WithParent and also sets a mutex on the buffer to be used with the Safe* methods.
|
||||
func (c *Buffer) WithMutex(p *BufferFactory) *Buffer {
|
||||
cNew := c.WithParent(p)
|
||||
return p.withMutex(cNew)
|
||||
}
|
||||
|
||||
// Bytes returns a slice of length b.Len() holding the unread portion of the buffer.
|
||||
|
@ -81,15 +126,35 @@ func (c Buffer) WithParent(p *BufferFactory) *Buffer {
|
|||
// so immediate changes to the slice will affect the result of future reads.
|
||||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
func (c Buffer) Bytes() []byte {
|
||||
func (c *Buffer) Bytes() []byte {
|
||||
if c.Buffer == nil {
|
||||
return nil
|
||||
}
|
||||
return c.Buffer.Bytes()
|
||||
b := c.Buffer.Bytes()
|
||||
return b
|
||||
}
|
||||
|
||||
// SafeBytes returns a slice of length b.Len() holding the unread portion of the buffer.
|
||||
// This method is the same as the Bytes() method in this package, but it holds a write lock on the buffer.
|
||||
func (c *Buffer) SafeBytes() []byte {
|
||||
if !c.hasmu() {
|
||||
panic(ErrBufferHasNoMutex)
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
b := c.Buffer.Bytes()
|
||||
c.mu.Unlock()
|
||||
return b
|
||||
|
||||
}
|
||||
|
||||
// MustBytes is the same as Bytes but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustBytes() []byte {
|
||||
func (c *Buffer) MustBytes() []byte {
|
||||
if c.Buffer == nil {
|
||||
panic(ErrBufferReturned)
|
||||
}
|
||||
|
@ -102,15 +167,34 @@ func (c Buffer) MustBytes() []byte {
|
|||
// To build strings more efficiently, see the strings.Builder type.
|
||||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
func (c Buffer) String() string {
|
||||
func (c *Buffer) String() string {
|
||||
if c.Buffer == nil {
|
||||
return ""
|
||||
}
|
||||
return c.Buffer.String()
|
||||
}
|
||||
|
||||
// SafeString returns the contents of the unread portion of the buffer.
|
||||
//
|
||||
// This function is the same as the String() method in this package, but it holds a write lock on the buffer.
|
||||
func (c *Buffer) SafeString() string {
|
||||
if !c.hasmu() {
|
||||
panic(ErrBufferHasNoMutex)
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
s := c.Buffer.String()
|
||||
c.mu.Unlock()
|
||||
return s
|
||||
}
|
||||
|
||||
// MustString is the same as String but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustString() string {
|
||||
func (c *Buffer) MustString() string {
|
||||
if c.Buffer == nil {
|
||||
panic(ErrBufferReturned)
|
||||
}
|
||||
|
@ -123,7 +207,7 @@ func (c Buffer) MustString() string {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Reset() error {
|
||||
func (c *Buffer) Reset() error {
|
||||
if c.Buffer == nil {
|
||||
return ErrBufferReturned
|
||||
}
|
||||
|
@ -131,8 +215,25 @@ func (c Buffer) Reset() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SafeReset is the same as Reset but it holds a write lock on the buffer.
|
||||
func (c *Buffer) SafeReset() error {
|
||||
if !c.hasmu() {
|
||||
return ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return ErrBufferReturned
|
||||
}
|
||||
c.Buffer.Reset()
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// MustReset is the same as Reset but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustReset() {
|
||||
func (c *Buffer) MustReset() {
|
||||
if err := c.Reset(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -144,15 +245,32 @@ func (c Buffer) MustReset() {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns 0 if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Len() int {
|
||||
func (c *Buffer) Len() int {
|
||||
if c.Buffer == nil {
|
||||
return 0
|
||||
}
|
||||
return c.Buffer.Len()
|
||||
}
|
||||
|
||||
// SafeLen is the same as Len but it holds a write lock on the buffer.
|
||||
func (c *Buffer) SafeLen() int {
|
||||
if !c.hasmu() {
|
||||
panic(ErrBufferHasNoMutex)
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
l := c.Buffer.Len()
|
||||
c.mu.Unlock()
|
||||
return l
|
||||
}
|
||||
|
||||
// MustLen is the same as Len but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustLen() int {
|
||||
func (c *Buffer) MustLen() int {
|
||||
if c.Buffer == nil {
|
||||
panic(ErrBufferReturned)
|
||||
}
|
||||
|
@ -165,15 +283,32 @@ func (c Buffer) MustLen() int {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Write(p []byte) (int, error) {
|
||||
func (c *Buffer) Write(p []byte) (int, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
return c.Buffer.Write(p)
|
||||
}
|
||||
|
||||
// SafeWrite is the same as Write but it holds a write lock on the buffer.
|
||||
func (c *Buffer) SafeWrite(p []byte) (int, error) {
|
||||
if !c.hasmu() {
|
||||
return 0, ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
n, err := c.Buffer.Write(p)
|
||||
c.mu.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// MustWrite is the same as Write but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustWrite(p []byte) {
|
||||
func (c *Buffer) MustWrite(p []byte) {
|
||||
if _, err := c.Write(p); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -186,7 +321,7 @@ func (c Buffer) MustWrite(p []byte) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) WriteRune(r rune) (int, error) {
|
||||
func (c *Buffer) WriteRune(r rune) (int, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
|
@ -194,7 +329,7 @@ func (c Buffer) WriteRune(r rune) (int, error) {
|
|||
}
|
||||
|
||||
// MustWriteRune is the same as WriteRune but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustWriteRune(r rune) {
|
||||
func (c *Buffer) MustWriteRune(r rune) {
|
||||
if _, err := c.WriteRune(r); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -207,7 +342,7 @@ func (c Buffer) MustWriteRune(r rune) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) WriteByte(cyte byte) error {
|
||||
func (c *Buffer) WriteByte(cyte byte) error {
|
||||
if c.Buffer == nil {
|
||||
return ErrBufferReturned
|
||||
}
|
||||
|
@ -215,7 +350,7 @@ func (c Buffer) WriteByte(cyte byte) error {
|
|||
}
|
||||
|
||||
// MustWriteByte is the same as WriteByte but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustWriteByte(cyte byte) {
|
||||
func (c *Buffer) MustWriteByte(cyte byte) {
|
||||
if err := c.WriteByte(cyte); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -227,7 +362,7 @@ func (c Buffer) MustWriteByte(cyte byte) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) WriteString(str string) (int, error) {
|
||||
func (c *Buffer) WriteString(str string) (int, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
|
@ -242,7 +377,7 @@ func (c Buffer) WriteString(str string) (int, error) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Grow(n int) error {
|
||||
func (c *Buffer) Grow(n int) error {
|
||||
if c.Buffer == nil {
|
||||
return ErrBufferReturned
|
||||
}
|
||||
|
@ -255,7 +390,7 @@ func (c Buffer) Grow(n int) error {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// If the buffer has already been returned to the pool, Cap will return 0.
|
||||
func (c Buffer) Cap() int {
|
||||
func (c *Buffer) Cap() int {
|
||||
if c.Buffer == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -268,7 +403,7 @@ func (c Buffer) Cap() int {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Truncate(n int) error {
|
||||
func (c *Buffer) Truncate(n int) error {
|
||||
if c.Buffer == nil {
|
||||
return ErrBufferReturned
|
||||
}
|
||||
|
@ -276,8 +411,23 @@ func (c Buffer) Truncate(n int) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Buffer) SafeTruncate(n int) {
|
||||
if !c.hasmu() {
|
||||
panic(ErrBufferHasNoMutex)
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
panic(ErrBufferReturned)
|
||||
}
|
||||
c.Buffer.Truncate(n)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// MustTruncate is the same as Truncate but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustTruncate(n int) {
|
||||
func (c *Buffer) MustTruncate(n int) {
|
||||
if err := c.Truncate(n); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -290,15 +440,31 @@ func (c Buffer) MustTruncate(n int) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) ReadFrom(r io.Reader) (int64, error) {
|
||||
func (c *Buffer) ReadFrom(r io.Reader) (int64, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
return c.Buffer.ReadFrom(r)
|
||||
}
|
||||
|
||||
func (c *Buffer) SafeReadFrom(r io.Reader) (int64, error) {
|
||||
if !c.hasmu() {
|
||||
return 0, ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
n, err := c.Buffer.ReadFrom(r)
|
||||
c.mu.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// MustReadFrom is the same as ReadFrom but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustReadFrom(r io.Reader) {
|
||||
func (c *Buffer) MustReadFrom(r io.Reader) {
|
||||
if _, err := c.ReadFrom(r); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -311,15 +477,32 @@ func (c Buffer) MustReadFrom(r io.Reader) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) WriteTo(w io.Writer) (int64, error) {
|
||||
func (c *Buffer) WriteTo(w io.Writer) (int64, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
return c.Buffer.WriteTo(w)
|
||||
}
|
||||
|
||||
// SafeWriteTo is the same as WriteTo but holds a write lock on the buffer while writing.
|
||||
func (c *Buffer) SafeWriteTo(w io.Writer) (int64, error) {
|
||||
if !c.hasmu() {
|
||||
return 0, ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
n, err := c.Buffer.WriteTo(w)
|
||||
c.mu.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// MustWriteTo is the same as WriteTo but panics if the buffer has already been returned to the pool.
|
||||
func (c Buffer) MustWriteTo(w io.Writer) {
|
||||
func (c *Buffer) MustWriteTo(w io.Writer) {
|
||||
if _, err := c.WriteTo(w); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -332,25 +515,59 @@ func (c Buffer) MustWriteTo(w io.Writer) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Read(p []byte) (int, error) {
|
||||
func (c *Buffer) Read(p []byte) (int, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
return c.Buffer.Read(p)
|
||||
}
|
||||
|
||||
// SafeRead is the same as Read but holds a write lock on the buffer while reading.
|
||||
func (c *Buffer) SafeRead(p []byte) (int, error) {
|
||||
if !c.hasmu() {
|
||||
return 0, ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
n, err := c.Buffer.Read(p)
|
||||
c.mu.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// ReadByte reads and returns the next byte from the buffer.
|
||||
// If no byte is available, it returns error io.EOF.
|
||||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) ReadByte() (byte, error) {
|
||||
func (c *Buffer) ReadByte() (byte, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
return c.Buffer.ReadByte()
|
||||
}
|
||||
|
||||
// SafeReadByte is the same as ReadByte but holds a write lock on the buffer while reading.
|
||||
func (c *Buffer) SafeReadByte() (byte, error) {
|
||||
if !c.hasmu() {
|
||||
return 0, ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return 0, ErrBufferReturned
|
||||
}
|
||||
b, err := c.Buffer.ReadByte()
|
||||
c.mu.Unlock()
|
||||
return b, err
|
||||
}
|
||||
|
||||
// ReadRune reads and returns the next UTF-8-encoded
|
||||
// Unicode code point from the buffer.
|
||||
// If no bytes are available, the error returned is io.EOF.
|
||||
|
@ -359,7 +576,7 @@ func (c Buffer) ReadByte() (byte, error) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) ReadRune() (rune, int, error) {
|
||||
func (c *Buffer) ReadRune() (rune, int, error) {
|
||||
if c.Buffer == nil {
|
||||
return 0, 0, ErrBufferReturned
|
||||
}
|
||||
|
@ -373,13 +590,30 @@ func (c Buffer) ReadRune() (rune, int, error) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) UnreadByte() error {
|
||||
func (c *Buffer) UnreadByte() error {
|
||||
if c.Buffer == nil {
|
||||
return ErrBufferReturned
|
||||
}
|
||||
return c.Buffer.UnreadByte()
|
||||
}
|
||||
|
||||
// SafeUnreadByte is the same as UnreadByte but holds a write lock on the buffer while un-reading.
|
||||
func (c *Buffer) SafeUnreadByte() error {
|
||||
if !c.hasmu() {
|
||||
return ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return ErrBufferReturned
|
||||
}
|
||||
err := c.Buffer.UnreadByte()
|
||||
c.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// UnreadRune unreads the last rune returned by ReadRune.
|
||||
// If the most recent read or write operation on the buffer was
|
||||
// not a successful ReadRune, UnreadRune returns an error. (In this regard
|
||||
|
@ -388,7 +622,7 @@ func (c Buffer) UnreadByte() error {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) UnreadRune() error {
|
||||
func (c *Buffer) UnreadRune() error {
|
||||
if c.Buffer == nil {
|
||||
return ErrBufferReturned
|
||||
}
|
||||
|
@ -404,7 +638,7 @@ func (c Buffer) UnreadRune() error {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns an error if the buffer has already been returned to the pool.
|
||||
func (c Buffer) ReadBytes(delim byte) ([]byte, error) {
|
||||
func (c *Buffer) ReadBytes(delim byte) ([]byte, error) {
|
||||
if c.Buffer == nil {
|
||||
return nil, ErrBufferReturned
|
||||
}
|
||||
|
@ -418,38 +652,87 @@ func (c Buffer) ReadBytes(delim byte) ([]byte, error) {
|
|||
//
|
||||
// *This is from the bytes.Buffer docs.*
|
||||
// This wrapper returns nil if the buffer has already been returned to the pool.
|
||||
func (c Buffer) Next(n int) []byte {
|
||||
func (c *Buffer) Next(n int) []byte {
|
||||
if c.Buffer == nil {
|
||||
return nil
|
||||
}
|
||||
return c.Buffer.Next(n)
|
||||
}
|
||||
|
||||
// IsClosed returns true if the buffer has been returned to the pool.
|
||||
func (c Buffer) IsClosed() bool {
|
||||
var closed = true
|
||||
if c.co == nil {
|
||||
c.co = &sync.Once{}
|
||||
// SafeNext is the same as Next but holds a write lock on the buffer while reading.
|
||||
func (c *Buffer) SafeNext(n int) []byte {
|
||||
if !c.hasmu() {
|
||||
panic(ErrBufferHasNoMutex)
|
||||
}
|
||||
c.co.Do(func() {
|
||||
closed = false
|
||||
})
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
b := c.Buffer.Next(n)
|
||||
c.mu.Unlock()
|
||||
return b
|
||||
}
|
||||
|
||||
// IsClosed returns true if the buffer has been returned to the pool.
|
||||
func (c *Buffer) IsClosed() bool {
|
||||
if c.p == nil {
|
||||
return false
|
||||
}
|
||||
return c.Buffer == nil || c.co == nil
|
||||
}
|
||||
|
||||
// SafeIsClosed is the same as IsClosed but holds a write lock on the buffer while reading.
|
||||
func (c *Buffer) SafeIsClosed() bool {
|
||||
if !c.hasmu() {
|
||||
panic(ErrBufferHasNoMutex)
|
||||
}
|
||||
if !c.mu.TryLock() {
|
||||
return false
|
||||
}
|
||||
closed := c.IsClosed()
|
||||
c.mu.Unlock()
|
||||
return closed
|
||||
}
|
||||
|
||||
// Close implements io.Closer. It returns the buffer to the pool. This
|
||||
func (c Buffer) Close() error {
|
||||
func (c *Buffer) Close() error {
|
||||
if c.Buffer == nil {
|
||||
return errors.New("buffer already returned to pool")
|
||||
}
|
||||
if c.p == nil {
|
||||
if c.p == nil || c.co == nil {
|
||||
return errors.New(
|
||||
"buffer does not know it's parent pool and therefore cannot return itself, use Buffer.WithParent",
|
||||
)
|
||||
}
|
||||
var err = ErrBufferReturned
|
||||
c.co.Do(func() {
|
||||
err = c.p.Put(&c)
|
||||
err = c.p.Put(c)
|
||||
c.Buffer = nil
|
||||
})
|
||||
c.co = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// SafeClose is the same as Close but holds a write lock on the buffer while closing.
|
||||
func (c *Buffer) SafeClose() error {
|
||||
if !c.hasmu() {
|
||||
return ErrBufferHasNoMutex
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.Buffer == nil || c.p == nil || c.co == nil {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
return errors.New("buffer already returned to pool")
|
||||
}
|
||||
var err = ErrBufferReturned
|
||||
c.co.Do(func() {
|
||||
if c.p.put(c) {
|
||||
err = nil
|
||||
}
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
)
|
||||
|
||||
func TestNewBufferFactory(t *testing.T) {
|
||||
t.Parallel()
|
||||
bf := NewBufferFactory()
|
||||
if bf.pool == nil {
|
||||
t.Fatalf("The pool is nil")
|
||||
|
@ -15,6 +16,7 @@ func TestNewBufferFactory(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBufferFactory(t *testing.T) {
|
||||
t.Parallel()
|
||||
bf := NewBufferFactory()
|
||||
t.Run("BufferPut", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
@ -572,7 +574,7 @@ func TestBufferFactory(t *testing.T) {
|
|||
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is not nil")
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -581,4 +583,289 @@ func TestBufferFactory(t *testing.T) {
|
|||
t.Fatal("The error is nil after closing a nil buffer")
|
||||
}
|
||||
})
|
||||
t.Run("BufferLockingSafety", func(t *testing.T) {
|
||||
// implement the Buffer.Safe* methods and test them
|
||||
t.Parallel()
|
||||
buf := bf.Get().WithMutex(&bf)
|
||||
if _, err := buf.SafeWrite([]byte("hello")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("world")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("!")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if buf.SafeString() != "helloworld!" {
|
||||
t.Fatalf("The string is not 'helloworld!': %v", buf.String())
|
||||
}
|
||||
if b := buf.SafeNext(1); b == nil {
|
||||
t.Fatal("The byte is nil despite having written to the buffer")
|
||||
}
|
||||
if _, err := buf.SafeRead(make([]byte, 1)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := buf.SafeUnreadByte(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("yeet")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
go func() {
|
||||
if _, err := buf.SafeReadByte(); err != nil {
|
||||
t.Errorf("Error reading from buffer: %v", err)
|
||||
}
|
||||
}()
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
if _, err := buf.SafeWrite([]byte("yeet")); err != nil {
|
||||
t.Errorf("Error writing to buffer: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
// must be ran with race detector enabled or this check does nothing
|
||||
_ = buf.SafeLen()
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
if clsd := buf.SafeIsClosed(); clsd {
|
||||
t.Errorf("The buffer is closed somehow?")
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := buf.SafeRead([]byte("yeet")); err != nil {
|
||||
t.Errorf("Error reading from buffer: %v", err)
|
||||
}
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
_ = buf.SafeBytes()
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
_ = buf.SafeString()
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
if _, err := buf.SafeWriteTo(io.Discard); err != nil {
|
||||
t.Errorf("Error writing to buffer: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
br := bytes.NewReader([]byte("yeet"))
|
||||
if _, err := buf.SafeReadFrom(br); err != nil {
|
||||
t.Errorf("Error reading from buffer: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := buf.SafeReadFrom(strings.NewReader("yeet")); err != nil {
|
||||
t.Errorf("Error reading from buffer: %v", err)
|
||||
}
|
||||
}
|
||||
if err := buf.SafeReset(); err != nil {
|
||||
t.Errorf("Error reading from buffer: %v", err)
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("hello")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := buf.SafeClose(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := buf.SafeClose(); err == nil {
|
||||
t.Fatal("The error is nil despite closing the buffer twice")
|
||||
}
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The error is nil despite closing the buffer twice")
|
||||
}
|
||||
}()
|
||||
_ = buf.SafeBytes()
|
||||
}()
|
||||
buf2 := bf.Get().WithMutex(&bf)
|
||||
_, err := buf2.SafeWrite([]byte("hello"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
b, err := buf2.SafeReadByte()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if b != 'h' {
|
||||
t.Fatalf("The byte is not 'h': %v", b)
|
||||
}
|
||||
bts := buf2.SafeNext(buf2.Len())
|
||||
if bts == nil {
|
||||
t.Fatal("The bytes are nil despite having written to the buffer")
|
||||
}
|
||||
if !bytes.Equal(bts, []byte("ello")) {
|
||||
t.Fatalf("The bytes are not 'ello': %v", bts)
|
||||
}
|
||||
buf2.SafeTruncate(0)
|
||||
if err = bf.SafePut(buf2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = bf.SafePut(buf2); err == nil {
|
||||
t.Fatal("The error is nil despite putting the buffer twice")
|
||||
}
|
||||
})
|
||||
t.Run("BufferLockingSafetyMustFail", func(t *testing.T) {
|
||||
// implement the Buffer.Safe* methods and test them
|
||||
t.Parallel()
|
||||
buf := bf.Get()
|
||||
if _, err := buf.SafeWrite([]byte("hello")); err == nil {
|
||||
t.Fatal("the error is nil despite nil mutex")
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("world")); err == nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("!")); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
if buf.SafeString() != "helloworld!" {
|
||||
t.Fatalf("The string is not 'helloworld!': %v", buf.String())
|
||||
}
|
||||
}()
|
||||
if _, err := buf.SafeRead(make([]byte, 1)); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
if err := buf.SafeUnreadByte(); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
if _, err := buf.SafeWrite([]byte("yeet")); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
if _, err := buf.SafeReadByte(); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
_ = buf.SafeLen()
|
||||
}()
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
if clsd := buf.SafeIsClosed(); clsd {
|
||||
t.Errorf("The buffer is closed somehow?")
|
||||
}
|
||||
}()
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
_ = buf.SafeBytes()
|
||||
}()
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
_ = buf.SafeString()
|
||||
}()
|
||||
if _, err := buf.SafeWriteTo(io.Discard); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
br := bytes.NewReader([]byte("yeet"))
|
||||
if _, err := buf.SafeReadFrom(br); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
if _, err := buf.SafeReadFrom(strings.NewReader("yeet")); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
if err := buf.SafeReset(); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
buf.SafeTruncate(1)
|
||||
}()
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
_ = buf.SafeNext(1)
|
||||
}()
|
||||
if err := buf.SafeClose(); err == nil {
|
||||
t.Fatal("The error is nil despite nil mutex")
|
||||
}
|
||||
buf2 := bf.Get().WithMutex(&bf)
|
||||
buf2.Buffer = nil
|
||||
if err := buf2.SafeReset(); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if n := buf2.SafeLen(); n != 0 {
|
||||
t.Fatalf("The length is not 0: %v", n)
|
||||
}
|
||||
if _, err := buf2.SafeWriteTo(io.Discard); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if _, err := buf2.SafeReadFrom(br); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if _, err := buf2.SafeWrite([]byte("yeet")); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if _, err := buf2.SafeRead(make([]byte, 1)); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if err := buf2.SafeUnreadByte(); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if _, err := buf2.SafeReadByte(); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
if err := buf2.SafeClose(); err == nil {
|
||||
t.Fatal("The error is nil despite nil buffer")
|
||||
}
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("The panic is nil")
|
||||
}
|
||||
}()
|
||||
buf2.SafeTruncate(1)
|
||||
}()
|
||||
if bts := buf2.SafeNext(1); bts != nil {
|
||||
t.Fatalf("The bytes are not nil: %v", bts)
|
||||
}
|
||||
buf2 = buf2.WithMutex(&bf)
|
||||
if bts := buf2.SafeBytes(); bts != nil {
|
||||
t.Fatalf("The bytes are not nil: %v", bts)
|
||||
}
|
||||
if str := buf2.SafeString(); str != "" {
|
||||
t.Fatalf("The string is not empty: %v", str)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -2,4 +2,7 @@ package pool
|
|||
|
||||
import "errors"
|
||||
|
||||
var ErrBufferReturned = errors.New("buffer already returned")
|
||||
var (
|
||||
ErrBufferReturned = errors.New("buffer already returned")
|
||||
ErrBufferHasNoMutex = errors.New("buffer has no mutex, use WithMutex method to acquire a mutex for the buffer")
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue