// --------------------------------------------------------------------------------------------------------------------
//
// Copyright (c) by respective owners including Yahoo!, Microsoft, and
// individual contributors. All rights reserved. Released under a BSD
// license as described in the file LICENSE.
//
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using VW.Labels;
using VW.Serializer;
namespace VW
{
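/// <summary>
/// An async wrapper around VW supporting data ingest using the declarative serializer infrastructure; used with <see cref="VowpalWabbitThreadedLearning"/>.
/// </summary>
/// <typeparam name="TExample">The user type to be serialized.</typeparam>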
public class VowpalWabbitAsync : IDisposable
{
/// <summary>
/// The owning manager. Every learn/predict request of this wrapper is posted to it.
/// Assigned once in the constructor and never replaced afterwards.
/// </summary>
private readonly VowpalWabbitThreadedLearning manager;
/// <summary>
/// The serializers are not thread-safe. Thus we need to allocate one for each VW instance.
/// The array is built from the manager's VW instances in the constructor, looked up via
/// <c>vw.Settings.Node</c> when work is executed, and set to <c>null</c> on dispose.
/// </summary>
private IVowpalWabbitSerializer[] serializers;
/// <summary>
/// Initializes a new wrapper bound to the given manager and creates one serializer
/// for each VW instance the manager exposes.
/// </summary>
/// <param name="manager">The owning manager; must not be null.</param>
internal VowpalWabbitAsync(VowpalWabbitThreadedLearning manager)
{
    Contract.Requires(manager != null);
    Contract.Ensures(this.serializers != null);

    this.manager = manager;

    // One serializer per VW instance, so that every instance maintains its own example cache.
    var factory = VowpalWabbitSerializerFactory.CreateSerializer(manager.Settings);
    this.serializers =
        (from instance in manager.VowpalWabbits
         select factory.Create(instance))
        .ToArray();
}
/// <summary>
/// Learns from the given example.
/// </summary>
/// <param name="example">The example to learn. Must not be null.</param>
/// <param name="label">
/// The label for this <paramref name="example"/>. Optional: when omitted, <c>null</c> is
/// passed on to the serializer together with the example.
/// </param>
/// <remarks>
/// The method only enqueues the example for learning and returns immediately.
/// You must not re-use the example.
/// </remarks>
public void Learn(TExample example, ILabel label = null)
{
    Contract.Requires(example != null);
    // Deliberately no contract on label: the parameter defaults to null, so requiring
    // a non-null label would turn every call that omits it into a contract violation.

    manager.Post(vw =>
    {
        // Use the serializer belonging to the node of the VW instance executing this work item.
        using (var ex = this.serializers[vw.Settings.Node].Serialize(example, label))
        {
            ex.Learn();
        }
    });
}
/// <summary>
/// Predicts for the given example.
/// </summary>
/// <param name="example">The example to predict for. Must not be null.</param>
/// <remarks>
/// The method only enqueues the example for prediction and returns immediately.
/// You must not re-use the example.
/// </remarks>
public void Predict(TExample example)
{
    Contract.Requires(example != null);

    this.manager.Post(instance =>
    {
        // Look up the serializer registered for the node of the executing VW instance.
        var nodeSerializer = this.serializers[instance.Settings.Node];

        using (var serialized = nodeSerializer.Serialize(example))
        {
            serialized.Predict();
        }
    });
}
/// <summary>
/// Learns from the given example.
/// </summary>
/// <param name="example">The example to learn. Must not be null.</param>
/// <param name="label">The label for this <paramref name="example"/>. Must not be null.</param>
/// <param name="predictionFactory">The prediction factory to be used. Must not be null.</param>
/// <returns>
/// The task handed back by the manager for the enqueued work; the work item itself
/// returns the prediction for the given <paramref name="example"/>.
/// </returns>
/// <remarks>
/// The method only enqueues the example for learning and returns immediately.
/// Await the returned task to receive the prediction result.
/// </remarks>
public Task Learn(TExample example, ILabel label, IVowpalWabbitPredictionFactory predictionFactory)
{
    Contract.Requires(example != null);
    Contract.Requires(label != null);
    Contract.Requires(predictionFactory != null);

    return this.manager.Post(instance =>
    {
        // Look up the serializer registered for the node of the executing VW instance.
        var nodeSerializer = this.serializers[instance.Settings.Node];

        using (var serialized = nodeSerializer.Serialize(example, label))
        {
            return serialized.Learn(predictionFactory);
        }
    });
}
/// <summary>
/// Predicts for the given example.
/// </summary>
/// <param name="example">The example to predict for. Must not be null.</param>
/// <param name="predictionFactory">The prediction factory to be used. Must not be null.</param>
/// <returns>
/// The task handed back by the manager for the enqueued work; the work item itself
/// returns the prediction for the given <paramref name="example"/>.
/// </returns>
/// <remarks>
/// The method only enqueues the example for prediction and returns immediately.
/// Await the returned task to receive the prediction result.
/// </remarks>
public Task Predict(TExample example, IVowpalWabbitPredictionFactory predictionFactory)
{
    Contract.Requires(example != null);
    Contract.Requires(predictionFactory != null);

    return this.manager.Post(instance =>
    {
        // Look up the serializer registered for the node of the executing VW instance.
        var nodeSerializer = this.serializers[instance.Settings.Node];

        using (var serialized = nodeSerializer.Serialize(example))
        {
            return serialized.Predict(predictionFactory);
        }
    });
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <remarks>
/// Delegates to <c>Dispose(true)</c> and then asks the garbage collector to skip finalization of this object.
/// </remarks>
public void Dispose()
{
    Dispose(true);

    GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes every serializer held by this wrapper and clears the array.
/// </summary>
/// <param name="disposing">
/// When <c>false</c> nothing is done; when <c>true</c> the serializers are disposed,
/// unless they have already been released (the array is <c>null</c>).
/// </param>
private void Dispose(bool disposing)
{
    if (!disposing || this.serializers == null)
    {
        return;
    }

    // free cached examples
    foreach (var item in this.serializers)
    {
        item.Dispose();
    }

    this.serializers = null;
}
}
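/// <summary>
/// An async VW wrapper for multiline ingest.
/// </summary>
/// <typeparam name="TExample">The user type of the shared feature.</typeparam>
/// <typeparam name="TActionDependentFeature">The user type for each action dependent feature.</typeparam>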
public class VowpalWabbitAsync : IDisposable
{
/// <summary>
/// The owning manager. Learn requests of this wrapper are posted to it.
/// </summary>
private readonly VowpalWabbitThreadedLearning manager;
/// <summary>
/// Serializers for the shared example, looked up via <c>vw.Settings.Node</c>.
/// The serializers are not thread-safe. Thus we need to allocate one for each VW instance.
/// </summary>
private VowpalWabbitSingleExampleSerializer[] serializers;
/// <summary>
/// Serializers for the action dependent features, looked up via <c>vw.Settings.Node</c>.
/// The serializers are not thread-safe. Thus we need to allocate one for each VW instance.
/// </summary>
private VowpalWabbitSingleExampleSerializer[] actionDependentFeatureSerializers;
/// <summary>
/// Initializes a new multiline wrapper bound to the given manager and creates, for each
/// VW instance of the manager, one serializer for the shared example and one for the
/// action dependent features.
/// </summary>
/// <param name="manager">The owning manager.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="manager"/>, its settings, or the settings' parallel options are null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// The configured MaxDegreeOfParallelism is zero or negative.
/// </exception>
/// <exception cref="ArgumentException">
/// A serializer created by the factory is not a <c>VowpalWabbitSingleExampleSerializerCompiler</c>.
/// </exception>
internal VowpalWabbitAsync(VowpalWabbitThreadedLearning manager)
{
    if (manager == null)
        throw new ArgumentNullException("manager");
    if (manager.Settings == null)
        throw new ArgumentNullException("manager.Settings");
    if (manager.Settings.ParallelOptions == null)
        throw new ArgumentNullException("manager.Settings.ParallelOptions");
    // Use the (paramName, message) overload: the single-string overload interprets its
    // argument as the parameter name, which would bury the explanation in the wrong slot.
    if (manager.Settings.ParallelOptions.MaxDegreeOfParallelism <= 0)
        throw new ArgumentOutOfRangeException(
            "manager.Settings.ParallelOptions.MaxDegreeOfParallelism",
            "MaxDegreeOfParallelism must be greater than zero.");
    Contract.Ensures(this.serializers != null);
    Contract.Ensures(this.actionDependentFeatureSerializers != null);
    Contract.EndContractBlock();

    this.manager = manager;

    // create a serializer for each instance - maintaining separate example caches
    // NOTE(review): the two factory calls below read identically here; presumably they are
    // meant to differ by type argument (TExample vs. TActionDependentFeature) - confirm.
    var serializer = VowpalWabbitSerializerFactory.CreateSerializer(manager.Settings) as VowpalWabbitSingleExampleSerializerCompiler;
    if (serializer == null)
        throw new ArgumentException(string.Format(
            "{0} maps to a multiline example. Use VowpalWabbitAsync<{0}> instead.",
            typeof(TExample)));

    var adfSerializer = VowpalWabbitSerializerFactory.CreateSerializer(manager.Settings) as VowpalWabbitSingleExampleSerializerCompiler;
    if (adfSerializer == null)
        throw new ArgumentException(string.Format(
            "{0} maps to a multiline example. Use VowpalWabbitAsync<{0}> instead.",
            typeof(TActionDependentFeature)));

    this.serializers = this.manager.VowpalWabbits
        .Select(vw => serializer.Create(vw))
        .ToArray();

    this.actionDependentFeatureSerializers = this.manager.VowpalWabbits
        .Select(vw => adfSerializer.Create(vw))
        .ToArray();
}
/// <summary>
/// Enqueues learning from the given shared example and its action dependent features.
/// </summary>
/// <param name="example">The shared example. Must not be null.</param>
/// <param name="actionDependentFeatures">The action dependent features. Must not be null.</param>
/// <param name="index">The index of the example to learn within <paramref name="actionDependentFeatures"/>. Must not be negative.</param>
/// <param name="label">The label for the example to learn. Must not be null.</param>
/// <remarks>
/// The work is posted to the owning manager; this method itself returns no prediction.
/// </remarks>
public void Learn(TExample example, IReadOnlyCollection actionDependentFeatures, int index, ILabel label)
{
    Contract.Requires(example != null);
    Contract.Requires(actionDependentFeatures != null);
    Contract.Requires(index >= 0);
    Contract.Requires(label != null);

    // Both serializers are picked by the node of the VW instance executing the work item.
    this.manager.Post(instance =>
        VowpalWabbitMultiLine.Learn(
            instance,
            this.serializers[instance.Settings.Node],
            this.actionDependentFeatureSerializers[instance.Settings.Node],
            example,
            actionDependentFeatures,
            index,
            label));
}
/// <summary>
/// Learn from the given example and return the current prediction for it.
/// </summary>
/// <param name="example">The shared example. Must not be null.</param>
/// <param name="actionDependentFeatures">The action dependent features. Must not be null.</param>
/// <param name="index">The index of the example to learn within <paramref name="actionDependentFeatures"/>. Must not be negative.</param>
/// <param name="label">The label for the example to learn. Must not be null.</param>
/// <returns>The ranked prediction for the given examples.</returns>
/// <remarks>
/// The work is posted to the owning manager and the value returned by that Post call is handed back to the caller.
/// </remarks>
// NOTE(review): the return type below reads "Task[]>" with an unmatched '>', which looks
// truncated - confirm the intended return type before relying on this signature.
public Task[]> LearnAndPredict(TExample example, IReadOnlyCollection actionDependentFeatures, int index, ILabel label)
{
Contract.Requires(example != null);
Contract.Requires(actionDependentFeatures != null);
Contract.Requires(index >= 0);
Contract.Requires(label != null);
// Both serializers are picked by the node of the VW instance executing the work item.
return manager.Post(vw => VowpalWabbitMultiLine.LearnAndPredict(
vw,
this.serializers[vw.Settings.Node],
this.actionDependentFeatureSerializers[vw.Settings.Node],
example,
actionDependentFeatures,
index,
label));
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <remarks>
/// Delegates to <c>Dispose(true)</c> and then asks the garbage collector to skip finalization of this object.
/// </remarks>
public void Dispose()
{
    Dispose(true);

    GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes the shared-example serializers first and the action dependent feature
/// serializers second, clearing each array after its elements have been disposed.
/// </summary>
/// <param name="disposing">
/// When <c>false</c> nothing is done; when <c>true</c> each array that is still
/// non-null is disposed and set to <c>null</c>.
/// </param>
private void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    if (this.serializers != null)
    {
        // free cached examples
        foreach (var shared in this.serializers)
        {
            shared.Dispose();
        }

        this.serializers = null;
    }

    if (this.actionDependentFeatureSerializers != null)
    {
        // free cached examples
        foreach (var adf in this.actionDependentFeatureSerializers)
        {
            adf.Dispose();
        }

        this.actionDependentFeatureSerializers = null;
    }
}
}
}