/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package electron import ( "fmt" "github.com/apache/qpid-proton/go/pkg/amqp" "github.com/apache/qpid-proton/go/pkg/proton" "time" ) // Settings associated with a link type LinkSettings interface { // Source address that messages are coming from. Source() string // Target address that messages are going to. Target() string // Name is a unique name for the link among links between the same // containers in the same direction. By default generated automatically. LinkName() string // IsSender is true if this is the sending end of the link. IsSender() bool // IsReceiver is true if this is the receiving end of the link. IsReceiver() bool // SndSettle defines when the sending end of the link settles message delivery. SndSettle() SndSettleMode // RcvSettle defines when the sending end of the link settles message delivery. RcvSettle() RcvSettleMode // Session containing the Link Session() Session // Filter for the link Filter() map[amqp.Symbol]interface{} // Advanced settings for the source SourceSettings() TerminusSettings // Advanced settings for the target TargetSettings() TerminusSettings } // LinkOption can be passed when creating a sender or receiver link to set optional configuration. type LinkOption func(*linkSettings) // Source returns a LinkOption that sets address that messages are coming from. func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } } // Target returns a LinkOption that sets address that messages are going to. func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } } // LinkName returns a LinkOption that sets the link name. func LinkName(s string) LinkOption { return func(l *linkSettings) { l.linkName = s } } // SndSettle returns a LinkOption that sets the send settle mode func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } } // RcvSettle returns a LinkOption that sets the send settle mode func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } } // Capacity returns a LinkOption that sets the link capacity func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } } // Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender. func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } } // DurableSubscription returns a LinkOption that configures a Receiver as a named durable // subscription. The name overrides (and is overridden by) LinkName() so you should normally // only use one of these options. func DurableSubscription(name string) LinkOption { return func(l *linkSettings) { l.linkName = name l.sourceSettings.Durability = proton.Deliveries l.sourceSettings.Expiry = proton.ExpireNever } } // AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages // are sent but no acknowledgment is received, messages can be lost if there is // a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst func AtMostOnce() LinkOption { return func(l *linkSettings) { SndSettle(SndSettled)(l) RcvSettle(RcvFirst)(l) } } // AtLeastOnce returns a LinkOption that requests acknowledgment for every // message, acknowledgment indicates the message was definitely received. In the // event of a failure, unacknowledged messages can be re-sent but there is a // chance that the message will be received twice in this case. Sets // SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst func AtLeastOnce() LinkOption { return func(l *linkSettings) { SndSettle(SndUnsettled)(l) RcvSettle(RcvFirst)(l) } } // Filter returns a LinkOption that sets a filter. func Filter(m map[amqp.Symbol]interface{}) LinkOption { return func(l *linkSettings) { l.filter = m } } // SourceSettings returns a LinkOption that sets all the SourceSettings. // Note: it will override the source address set by a Source() option func SourceSettings(ts TerminusSettings) LinkOption { return func(l *linkSettings) { l.sourceSettings = ts } } // TargetSettings returns a LinkOption that sets all the TargetSettings. // Note: it will override the target address set by a Target() option func TargetSettings(ts TerminusSettings) LinkOption { return func(l *linkSettings) { l.targetSettings = ts } } // SndSettleMode defines when the sending end of the link settles message delivery. type SndSettleMode proton.SndSettleMode const ( // Messages are sent unsettled SndUnsettled = SndSettleMode(proton.SndUnsettled) // Messages are sent already settled SndSettled = SndSettleMode(proton.SndSettled) // Sender can send either unsettled or settled messages. SndMixed = SndSettleMode(proton.SndMixed) ) // RcvSettleMode defines when the receiving end of the link settles message delivery. type RcvSettleMode proton.RcvSettleMode const ( // Receiver settles first. RcvFirst = RcvSettleMode(proton.RcvFirst) // Receiver waits for sender to settle before settling. RcvSecond = RcvSettleMode(proton.RcvSecond) ) type linkSettings struct { source string sourceSettings TerminusSettings target string targetSettings TerminusSettings linkName string isSender bool sndSettle SndSettleMode rcvSettle RcvSettleMode capacity int prefetch bool filter map[amqp.Symbol]interface{} session *session pLink proton.Link } // Advanced AMQP settings for the source or target of a link. // Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription() // and do not need to be set/examined directly. type TerminusSettings struct { Durability proton.Durability Expiry proton.ExpiryPolicy Timeout time.Duration Dynamic bool } func makeTerminusSettings(t proton.Terminus) TerminusSettings { return TerminusSettings{ Durability: t.Durability(), Expiry: t.ExpiryPolicy(), Timeout: t.Timeout(), Dynamic: t.IsDynamic(), } } type link struct { endpoint linkSettings } func (l *linkSettings) Source() string { return l.source } func (l *linkSettings) Target() string { return l.target } func (l *linkSettings) LinkName() string { return l.linkName } func (l *linkSettings) IsSender() bool { return l.isSender } func (l *linkSettings) IsReceiver() bool { return !l.isSender } func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle } func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle } func (l *linkSettings) Filter() map[amqp.Symbol]interface{} { return l.filter } func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings } func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings } func (l *link) Session() Session { return l.session } func (l *link) Connection() Connection { return l.session.Connection() } func (l *link) engine() *proton.Engine { return l.session.connection.engine } func (l *link) handler() *handler { return l.session.connection.handler } // Open a link and return the linkSettings. func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) { l := linkSettings{ isSender: isSender, capacity: 1, prefetch: false, session: sn, } for _, set := range setting { set(&l) } if l.linkName == "" { l.linkName = l.session.connection.container.nextLinkName() } if l.IsSender() { l.pLink = l.session.pSession.Sender(l.linkName) } else { l.pLink = l.session.pSession.Receiver(l.linkName) } if l.pLink.IsNil() { return l, fmt.Errorf("cannot create link %s", l.pLink) } l.pLink.Source().SetAddress(l.source) if len(l.filter) > 0 { if err := l.pLink.Source().Filter().Marshal(l.filter); err != nil { panic(err) // Shouldn't happen } } l.pLink.Source().SetDurability(l.sourceSettings.Durability) l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry) l.pLink.Source().SetTimeout(l.sourceSettings.Timeout) l.pLink.Source().SetDynamic(l.sourceSettings.Dynamic) l.pLink.Target().SetAddress(l.target) l.pLink.Target().SetDurability(l.targetSettings.Durability) l.pLink.Target().SetExpiryPolicy(l.targetSettings.Expiry) l.pLink.Target().SetTimeout(l.targetSettings.Timeout) l.pLink.Target().SetDynamic(l.targetSettings.Dynamic) l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) l.pLink.Open() return l, nil } func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings { l := linkSettings{ isSender: pLink.IsSender(), source: pLink.RemoteSource().Address(), sourceSettings: makeTerminusSettings(pLink.RemoteSource()), target: pLink.RemoteTarget().Address(), targetSettings: makeTerminusSettings(pLink.RemoteTarget()), linkName: pLink.Name(), sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()), rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()), capacity: 1, prefetch: false, pLink: pLink, session: sn, } filter := l.pLink.RemoteSource().Filter() if !filter.Empty() { filter.Unmarshal(&l.filter) // TODO aconway 2017-06-08: ignoring errors } return l } // Not part of Link interface but use by Sender and Receiver. func (l *link) Credit() (credit int, err error) { err = l.engine().InjectWait(func() error { if l.Error() != nil { return l.Error() } credit = l.pLink.Credit() return nil }) return } // Not part of Link interface but use by Sender and Receiver. func (l *link) Capacity() int { return l.capacity } func (l *link) Close(err error) { _ = l.engine().Inject(func() { if l.Error() == nil { localClose(l.pLink, err) } }) }