// --------------------------------------------------------------------------------------------------------------------
//
//   Copyright (c) by respective owners including Yahoo!, Microsoft, and
//   individual contributors. All rights reserved.  Released under a BSD
//   license as described in the file LICENSE.
//
// --------------------------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using VW.Labels;
using VW.Serializer;

namespace VW
{
    /// <summary>
    /// An async VW wrapper supporting data ingest using the declarative serializer infrastructure.
    /// Used with <see cref="VowpalWabbitThreadedLearning"/>.
    /// </summary>
    /// <typeparam name="TExample">The user type to be serialized.</typeparam>
    public class VowpalWabbitAsync<TExample> : IDisposable
    {
        /// <summary>
        /// The owning manager.
        /// </summary>
        private readonly VowpalWabbitThreadedLearning manager;

        /// <summary>
        /// The serializers are not thread-safe. Thus we need to allocate one for each VW instance.
        /// </summary>
        private IVowpalWabbitSerializer<TExample>[] serializers;

        /// <summary>
        /// Creates one serializer per VW instance owned by <paramref name="manager"/>.
        /// </summary>
        /// <param name="manager">The owning manager; must not be null.</param>
        internal VowpalWabbitAsync(VowpalWabbitThreadedLearning manager)
        {
            Contract.Requires(manager != null);
            Contract.Ensures(this.serializers != null);

            this.manager = manager;

            // create a serializer for each instance - maintaining separate example caches
            var serializer = VowpalWabbitSerializerFactory.CreateSerializer<TExample>(manager.Settings);

            this.serializers = this.manager.VowpalWabbits
                .Select(vw => serializer.Create(vw))
                .ToArray();
        }

        /// <summary>
        /// Learns from the given example.
        /// </summary>
        /// <param name="example">The example to learn.</param>
        /// <param name="label">The label for this <paramref name="example"/>. Optional; defaults to null.</param>
        /// <remarks>
        /// The method only enqueues the example for learning and returns immediately.
        /// You must not re-use the example.
        /// </remarks>
        public void Learn(TExample example, ILabel label = null)
        {
            Contract.Requires(example != null);
            // No contract on label: the signature declares null as its default value,
            // so requiring non-null here would reject the parameterless-label call form.

            manager.Post(vw =>
            {
                // vw.Settings.Node selects the serializer that belongs to the executing VW instance.
                using (var ex = this.serializers[vw.Settings.Node].Serialize(example, label))
                {
                    ex.Learn();
                }
            });
        }

        /// <summary>
        /// Predicts for the given example.
        /// </summary>
        /// <param name="example">The example to predict for.</param>
        /// <remarks>
        /// The method only enqueues the example for prediction and returns immediately.
        /// You must not re-use the example.
        /// </remarks>
        public void Predict(TExample example)
        {
            Contract.Requires(example != null);

            manager.Post(vw =>
            {
                using (var ex = this.serializers[vw.Settings.Node].Serialize(example))
                {
                    ex.Predict();
                }
            });
        }

        /// <summary>
        /// Learns from the given example.
        /// </summary>
        /// <typeparam name="TPrediction">The type of the prediction produced by <paramref name="predictionFactory"/>.</typeparam>
        /// <param name="example">The example to learn.</param>
        /// <param name="label">The label for this <paramref name="example"/>.</param>
        /// <param name="predictionFactory">The prediction factory to be used.</param>
        /// <returns>The prediction for the given <paramref name="example"/>.</returns>
        /// <remarks>
        /// The method only enqueues the example for learning and returns immediately.
        /// Await the returned task to receive the prediction result.
        /// </remarks>
        public Task<TPrediction> Learn<TPrediction>(TExample example, ILabel label, IVowpalWabbitPredictionFactory<TPrediction> predictionFactory)
        {
            Contract.Requires(example != null);
            Contract.Requires(label != null);
            Contract.Requires(predictionFactory != null);

            return manager.Post(vw =>
            {
                using (var ex = this.serializers[vw.Settings.Node].Serialize(example, label))
                {
                    return ex.Learn(predictionFactory);
                }
            });
        }

        /// <summary>
        /// Predicts for the given example.
        /// </summary>
        /// <typeparam name="TPrediction">The type of the prediction produced by <paramref name="predictionFactory"/>.</typeparam>
        /// <param name="example">The example to predict for.</param>
        /// <param name="predictionFactory">The prediction factory to be used.</param>
        /// <returns>The prediction for the given <paramref name="example"/>.</returns>
        /// <remarks>
        /// The method only enqueues the example for prediction and returns immediately.
        /// Await the returned task to receive the prediction result.
        /// </remarks>
        public Task<TPrediction> Predict<TPrediction>(TExample example, IVowpalWabbitPredictionFactory<TPrediction> predictionFactory)
        {
            Contract.Requires(example != null);
            Contract.Requires(predictionFactory != null);

            return manager.Post(vw =>
            {
                using (var ex = this.serializers[vw.Settings.Node].Serialize(example))
                {
                    return ex.Predict(predictionFactory);
                }
            });
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes every per-instance serializer and clears the array so a second call is a no-op.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (this.serializers != null)
                {
                    foreach (var serializer in this.serializers)
                    {
                        // free cached examples
                        serializer.Dispose();
                    }

                    this.serializers = null;
                }
            }
        }
    }

    /// <summary>
    /// An async VW wrapper for multiline ingest.
    /// </summary>
    /// <typeparam name="TExample">The user type of the shared feature.</typeparam>
    /// <typeparam name="TActionDependentFeature">The user type for each action dependent feature.</typeparam>
    public class VowpalWabbitAsync<TExample, TActionDependentFeature> : IDisposable
    {
        /// <summary>
        /// The owning manager.
        /// </summary>
        private readonly VowpalWabbitThreadedLearning manager;

        /// <summary>
        /// The serializers are not thread-safe. Thus we need to allocate one for each VW instance.
        /// </summary>
        private VowpalWabbitSingleExampleSerializer<TExample>[] serializers;

        /// <summary>
        /// The serializers are not thread-safe. Thus we need to allocate one for each VW instance.
        /// </summary>
        private VowpalWabbitSingleExampleSerializer<TActionDependentFeature>[] actionDependentFeatureSerializers;

        /// <summary>
        /// Validates the manager configuration and creates, per VW instance, one serializer for the
        /// shared example type and one for the action dependent feature type.
        /// </summary>
        /// <param name="manager">The owning manager.</param>
        /// <exception cref="ArgumentNullException">
        /// The manager, its settings or its parallel options are null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">MaxDegreeOfParallelism is not positive.</exception>
        /// <exception cref="ArgumentException">
        /// Either type parameter does not resolve to a single-example serializer.
        /// </exception>
        internal VowpalWabbitAsync(VowpalWabbitThreadedLearning manager)
        {
            if (manager == null)
                throw new ArgumentNullException("manager");

            if (manager.Settings == null)
                throw new ArgumentNullException("manager.Settings");

            if (manager.Settings.ParallelOptions == null)
                throw new ArgumentNullException("manager.Settings.ParallelOptions");

            // The single-string constructor treats its argument as the parameter name,
            // so the two-argument overload is needed to carry the text as the message.
            if (manager.Settings.ParallelOptions.MaxDegreeOfParallelism <= 0)
                throw new ArgumentOutOfRangeException("manager", "MaxDegreeOfParallelism must be greater than zero.");

            Contract.Ensures(this.serializers != null);
            Contract.Ensures(this.actionDependentFeatureSerializers != null);
            Contract.EndContractBlock();

            this.manager = manager;

            // create a serializer for each instance - maintaining separate example caches
            var serializer = VowpalWabbitSerializerFactory.CreateSerializer<TExample>(manager.Settings) as VowpalWabbitSingleExampleSerializerCompiler<TExample>;
            if (serializer == null)
                throw new ArgumentException(string.Format(
                    "{0} maps to a multiline example. Use VowpalWabbitAsync<{0}> instead.",
                    typeof(TExample)));

            var adfSerializer = VowpalWabbitSerializerFactory.CreateSerializer<TActionDependentFeature>(manager.Settings) as VowpalWabbitSingleExampleSerializerCompiler<TActionDependentFeature>;
            if (adfSerializer == null)
                throw new ArgumentException(string.Format(
                    "{0} maps to a multiline example. Use VowpalWabbitAsync<{0}> instead.",
                    typeof(TActionDependentFeature)));

            this.serializers = this.manager.VowpalWabbits
                .Select(vw => serializer.Create(vw))
                .ToArray();

            this.actionDependentFeatureSerializers = this.manager.VowpalWabbits
                .Select(vw => adfSerializer.Create(vw))
                .ToArray();
        }

        /// <summary>
        /// Learns from the given multiline example.
        /// </summary>
        /// <param name="example">The shared example.</param>
        /// <param name="actionDependentFeatures">The action dependent features.</param>
        /// <param name="index">The index of the example to learn within <paramref name="actionDependentFeatures"/>.</param>
        /// <param name="label">The label for the example to learn.</param>
        public void Learn(TExample example, IReadOnlyCollection<TActionDependentFeature> actionDependentFeatures, int index, ILabel label)
        {
            Contract.Requires(example != null);
            Contract.Requires(actionDependentFeatures != null);
            Contract.Requires(index >= 0);
            Contract.Requires(label != null);

            manager.Post(vw => VowpalWabbitMultiLine.Learn(
                vw,
                this.serializers[vw.Settings.Node],
                this.actionDependentFeatureSerializers[vw.Settings.Node],
                example,
                actionDependentFeatures,
                index,
                label));
        }

        /// <summary>
        /// Learn from the given example and return the current prediction for it.
        /// </summary>
        /// <param name="example">The shared example.</param>
        /// <param name="actionDependentFeatures">The action dependent features.</param>
        /// <param name="index">The index of the example to learn within <paramref name="actionDependentFeatures"/>.</param>
        /// <param name="label">The label for the example to learn.</param>
        /// <returns>The ranked prediction for the given examples.</returns>
        public Task<ActionDependentFeature<TActionDependentFeature>[]> LearnAndPredict(TExample example, IReadOnlyCollection<TActionDependentFeature> actionDependentFeatures, int index, ILabel label)
        {
            Contract.Requires(example != null);
            Contract.Requires(actionDependentFeatures != null);
            Contract.Requires(index >= 0);
            Contract.Requires(label != null);

            return manager.Post(vw => VowpalWabbitMultiLine.LearnAndPredict(
                vw,
                this.serializers[vw.Settings.Node],
                this.actionDependentFeatureSerializers[vw.Settings.Node],
                example,
                actionDependentFeatures,
                index,
                label));
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes both serializer arrays and clears them so a second call is a no-op.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (this.serializers != null)
                {
                    foreach (var serializer in this.serializers)
                    {
                        // free cached examples
                        serializer.Dispose();
                    }

                    this.serializers = null;
                }

                if (this.actionDependentFeatureSerializers != null)
                {
                    foreach (var serializer in this.actionDependentFeatureSerializers)
                    {
                        // free cached examples
                        serializer.Dispose();
                    }

                    this.actionDependentFeatureSerializers = null;
                }
            }
        }
    }
}