// --------------------------------------------------------------------------------------------------------------------
//
// Copyright (c) by respective owners including Yahoo!, Microsoft, and
// individual contributors. All rights reserved. Released under a BSD
// license as described in the file LICENSE.
//
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Caching;
namespace VW.Serializer
{
/// <summary>
/// Reference resolver for JSON.NET $id, $ref elements.
/// </summary>
/// <remarks>
/// Marshalling actions registered through <c>AddReference</c> are kept in a <see cref="MemoryCache"/>.
/// References requested through <c>Resolve</c> before the matching id was registered are parked in a
/// second <see cref="MemoryCache"/> and completed as soon as the id shows up. Parked requests whose
/// cache entry gets removed before that (expiration, eviction, <see cref="Dispose"/>) have their
/// serializer disposed.
/// Access to both caches and to the open-request counter from <c>AddReference</c>, <c>Resolve</c>,
/// <see cref="Statistics"/> and the removal callback is serialized through an internal lock;
/// <see cref="Dispose"/> itself does not take that lock.
/// </remarks>
public sealed class VowpalWabbitJsonReferenceResolver : IDisposable
{
    /// <summary>
    /// Monitoring statistics.
    /// </summary>
    public sealed class Stats
    {
        /// <summary>
        /// The number of items currently cached.
        /// </summary>
        public long ItemCount { get; internal set; }

        /// <summary>
        /// The number of outstanding requests to resolve a reference.
        /// </summary>
        public long NumberOfOpenRequests { get; internal set; }
    }

    // Invoked with the serializer once Serializer.Resolve(...) reports true in AddReference.
    private readonly Action<VowpalWabbitJsonSerializer> exampleComplete;

    // Guards both caches and numberOfOpenRequests.
    private readonly object lockObject;

    // id -> IVowpalWabbitMarshalAction
    private MemoryCache cache;

    // id -> List<IncompleteReferenceRequest> waiting for that id
    private MemoryCache cacheRequests;

    private readonly Func<string, CacheItemPolicy> cacheItemPolicyFactory;

    private readonly Func<string, CacheItemPolicy> cacheRequestItemPolicyFactory;

    // Number of parked requests that were neither resolved nor disposed yet.
    private int numberOfOpenRequests;

    /// <summary>
    /// Initializes a new instance.
    /// </summary>
    /// <param name="exampleComplete">A callback triggered when all outstanding references for a given example are resolved.</param>
    /// <param name="cacheName">Optional name. Defaults to "VowpalWabbitJsonExampleCache"; the request cache uses the same name suffixed with "Requests".</param>
    /// <param name="cacheItemPolicyFactory">Optional cache policy for cached items. Defaults to 1 hour sliding expiration.</param>
    /// <param name="cacheRequestItemPolicyFactory">
    /// Optional cache policy for resolution requests. Defaults to 1 hour sliding expiration.
    /// The <see cref="CacheItemPolicy.RemovedCallback"/> of the returned policy is replaced by a wrapper that
    /// also runs the resolver's own clean-up, so the factory should hand out a fresh policy per call.
    /// </param>
    public VowpalWabbitJsonReferenceResolver(
        Action<VowpalWabbitJsonSerializer> exampleComplete,
        string cacheName = null,
        Func<string, CacheItemPolicy> cacheItemPolicyFactory = null,
        Func<string, CacheItemPolicy> cacheRequestItemPolicyFactory = null)
    {
        this.lockObject = new object();
        this.exampleComplete = exampleComplete;

        if (cacheName == null)
            cacheName = "VowpalWabbitJsonExampleCache";

        this.cacheItemPolicyFactory = cacheItemPolicyFactory == null ?
            _ => new CacheItemPolicy { SlidingExpiration = TimeSpan.FromHours(1) } :
            cacheItemPolicyFactory;

        this.cacheRequestItemPolicyFactory = cacheRequestItemPolicyFactory == null ?
            _ => new CacheItemPolicy { SlidingExpiration = TimeSpan.FromHours(1) } :
            cacheRequestItemPolicyFactory;

        this.cache = new MemoryCache(cacheName);
        this.cacheRequests = new MemoryCache(cacheName + "Requests");
    }

    /// <summary>
    /// Monitoring statistics.
    /// </summary>
    /// <remarks>
    /// Returns a new snapshot on every access. After <see cref="Dispose"/> the item count is reported as 0.
    /// </remarks>
    public Stats Statistics
    {
        get
        {
            lock (this.lockObject)
            {
                // Dispose() nulls the field; read it once so a disposed resolver reports 0 instead of throwing.
                var items = this.cache;

                return new Stats
                {
                    ItemCount = items == null ? 0 : items.GetCount(),
                    NumberOfOpenRequests = this.numberOfOpenRequests
                };
            }
        }
    }

    /// <summary>
    /// Registers the marshalling action for <paramref name="id"/> and completes every request
    /// that was parked for this id by <c>Resolve</c>.
    /// </summary>
    /// <param name="id">The reference id.</param>
    /// <param name="marshalAction">The marshalling action to cache under <paramref name="id"/>.</param>
    internal void AddReference(string id, IVowpalWabbitMarshalAction marshalAction)
    {
        List<IncompleteReferenceRequest> requests = null;

        lock (this.lockObject)
        {
            // ignore duplicate keys - still update the sliding timer
            if (this.cache.Contains(id))
                return;

            this.cache.Add(
                new CacheItem(id, marshalAction),
                this.cacheItemPolicyFactory(id));

            requests = (List<IncompleteReferenceRequest>)this.cacheRequests.Get(id);

            if (requests != null)
            {
                // Flag the requests before removing the entry: the removal callback must not
                // dispose serializers that are about to be resolved below.
                foreach (var req in requests)
                    req.DontDispose = true;

                this.cacheRequests.Remove(id);
                this.numberOfOpenRequests -= requests.Count;
            }
        }

        // since this can be called from another thread we need to dispatch to the serializer and let it decide
        // when to resolve the marshalling request
        if (requests != null)
        {
            foreach (var req in requests)
                if (req.Serializer.Resolve(() => req.Marshal(marshalAction)))
                    this.exampleComplete(req.Serializer);
        }
    }

    /// <summary>
    /// Resolves the reference <paramref name="id"/> for <paramref name="serializer"/>.
    /// If the id is already cached, <paramref name="resolveAction"/> is invoked immediately on the calling thread.
    /// Otherwise the request is parked, the serializer's unresolved counter is increased and
    /// <paramref name="resolveAction"/> is invoked later through <c>AddReference</c>.
    /// </summary>
    /// <param name="serializer">The serializer requesting the reference.</param>
    /// <param name="id">The reference id.</param>
    /// <param name="resolveAction">Receives the cached marshalling action once it is available.</param>
    internal void Resolve(VowpalWabbitJsonSerializer serializer, string id, Action<IVowpalWabbitMarshalAction> resolveAction)
    {
        IVowpalWabbitMarshalAction marshal;

        lock (this.lockObject)
        {
            marshal = (IVowpalWabbitMarshalAction)this.cache.Get(id);

            if (marshal == null)
            {
                // not found, register for delayed completion
                var requests = (List<IncompleteReferenceRequest>)this.cacheRequests.Get(id);

                if (requests == null)
                {
                    var policy = this.cacheRequestItemPolicyFactory(id);

                    // dispatch to original handler too
                    var removeHandler = policy.RemovedCallback;
                    if (removeHandler == null)
                        policy.RemovedCallback = this.CacheEntryRemovedCallback;
                    else
                        policy.RemovedCallback = args => { removeHandler(args); this.CacheEntryRemovedCallback(args); };

                    requests = new List<IncompleteReferenceRequest>();
                    this.cacheRequests.Add(
                        new CacheItem(id, requests),
                        policy);
                }

                requests.Add(
                    new IncompleteReferenceRequest
                    {
                        Serializer = serializer,
                        Marshal = resolveAction
                    });

                this.numberOfOpenRequests++;
                serializer.IncreaseUnresolved();

                return;
            }
        }

        // avoid extensive locking
        resolveAction(marshal);
    }

    /// <summary>
    /// Runs when an entry of the request cache is removed. Disposes the serializer of every request
    /// that was not handed back by <c>AddReference</c> and takes it out of the open-request count.
    /// </summary>
    /// <param name="arguments">The removal notification; its cache item value is the list of parked requests.</param>
    private void CacheEntryRemovedCallback(CacheEntryRemovedArguments arguments)
    {
        lock (this.lockObject)
        {
            var requests = (List<IncompleteReferenceRequest>)arguments.CacheItem.Value;

            // dispose outstanding requests
            foreach (var request in requests)
            {
                // Handed over to AddReference, which resolves it and already adjusted the counter.
                if (request.DontDispose)
                    continue;

                // The request will never be resolved: it is no longer open.
                this.numberOfOpenRequests--;
                request.Serializer.Dispose();
            }
        }
    }

    /// <summary>
    /// A reference request that could not be satisfied yet.
    /// </summary>
    private sealed class IncompleteReferenceRequest
    {
        internal IncompleteReferenceRequest()
        {
            this.DontDispose = false;
        }

        // The serializer waiting for the reference.
        internal VowpalWabbitJsonSerializer Serializer { get; set; }

        // Invoked with the marshalling action once the reference becomes available.
        internal Action<IVowpalWabbitMarshalAction> Marshal { get; set; }

        // if we return to the handler, the handler has to dispose
        internal bool DontDispose { get; set; }
    }

    /// <summary>
    /// Disposes held resources. Serializers of requests that are still parked are disposed as well.
    /// </summary>
    public void Dispose()
    {
        if (this.cacheRequests != null)
        {
            // trigger dispose: removing an entry fires its removal callback.
            // Snapshot the keys first, the cache must not be modified while enumerating it.
            foreach (var key in this.cacheRequests.Select(kv => kv.Key).ToList())
                this.cacheRequests.Remove(key);

            this.cacheRequests.Dispose();
            this.cacheRequests = null;
        }

        if (this.cache != null)
        {
            this.cache.Dispose();
            this.cache = null;
        }
    }
}
}