guava二级缓存原理深入理解
时间:5年前 阅读:5075
1、CacheBuilder 是如何创建的?
@GwtCompatible(emulated = true)
public final class CacheBuilder<K, V> {
private static final int DEFAULT_INITIAL_CAPACITY = 16;
private static final int DEFAULT_CONCURRENCY_LEVEL = 4;
private static final int DEFAULT_EXPIRATION_NANOS = 0;
private static final int DEFAULT_REFRESH_NANOS = 0;
static final Supplier<? extends StatsCounter> NULL_STATS_COUNTER = Suppliers.ofInstance(
new StatsCounter() {
@Override
public void recordHits(int count) {}
@Override
public void recordMisses(int count) {}
@Override
public void recordLoadSuccess(long loadTime) {}
@Override
public void recordLoadException(long loadTime) {}
@Override
public void recordEviction() {}
@Override
public CacheStats snapshot() {
return EMPTY_STATS;
}
});
static final CacheStats EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0);
static final Supplier<StatsCounter> CACHE_STATS_COUNTER =
new Supplier<StatsCounter>() {
@Override
public StatsCounter get() {
return new SimpleStatsCounter();
}
};
enum NullListener implements RemovalListener<Object, Object> {
INSTANCE;
@Override
public void onRemoval(RemovalNotification<Object, Object> notification) {}
}
enum OneWeigher implements Weigher<Object, Object> {
INSTANCE;
@Override
public int weigh(Object key, Object value) {
return 1;
}
}
static final Ticker NULL_TICKER = new Ticker() {
@Override
public long read() {
return 0;
}
};
private static final Logger logger = Logger.getLogger(CacheBuilder.class.getName());
static final int UNSET_INT = -1;
boolean strictParsing = true;
int initialCapacity = UNSET_INT;
int concurrencyLevel = UNSET_INT;
long maximumSize = UNSET_INT;
long maximumWeight = UNSET_INT;
Weigher<? super K, ? super V> weigher;
Strength keyStrength;
Strength valueStrength;
long expireAfterWriteNanos = UNSET_INT;
long expireAfterAccessNanos = UNSET_INT;
long refreshNanos = UNSET_INT;
Equivalence<Object> keyEquivalence;
Equivalence<Object> valueEquivalence;
RemovalListener<? super K, ? super V> removalListener;
Ticker ticker;
Supplier<? extends StatsCounter> statsCounterSupplier = NULL_STATS_COUNTER;
// TODO(fry): make constructor private and update tests to use newBuilder
CacheBuilder() {}
/**
* Constructs a new {@code CacheBuilder} instance with default settings, including strong keys,
* strong values, and no automatic eviction of any kind.
*/
public static CacheBuilder<Object, Object> newBuilder() {
return new CacheBuilder<Object, Object>();
}
/**
* Sets the minimum total size for the internal hash tables. For example, if the initial capacity
* is {@code 60}, and the concurrency level is {@code 8}, then eight segments are created, each
* having a hash table of size eight. Providing a large enough estimate at construction time
* avoids the need for expensive resizing operations later, but setting this value unnecessarily
* high wastes memory.
*
* @throws IllegalArgumentException if {@code initialCapacity} is negative
* @throws IllegalStateException if an initial capacity was already set
*/
public CacheBuilder<K, V> initialCapacity(int initialCapacity) {
checkState(this.initialCapacity == UNSET_INT, "initial capacity was already set to %s",
this.initialCapacity);
checkArgument(initialCapacity >= 0);
this.initialCapacity = initialCapacity;
return this;
}
int getInitialCapacity() {
return (initialCapacity == UNSET_INT) ? DEFAULT_INITIAL_CAPACITY : initialCapacity;
}
/**
* Guides the allowed concurrency among update operations. Used as a hint for internal sizing. The
* table is internally partitioned to try to permit the indicated number of concurrent updates
* without contention. Because assignment of entries to these partitions is not necessarily
* uniform, the actual concurrency observed may vary. Ideally, you should choose a value to
* accommodate as many threads as will ever concurrently modify the table. Using a significantly
* higher value than you need can waste space and time, and a significantly lower value can lead
* to thread contention. But overestimates and underestimates within an order of magnitude do not
* usually have much noticeable impact. A value of one permits only one thread to modify the cache
* at a time, but since read operations and cache loading computations can proceed concurrently,
* this still yields higher concurrency than full synchronization.
*
* <p> Defaults to 4. <b>Note:</b>The default may change in the future. If you care about this
* value, you should always choose it explicitly.
*
* <p>The current implementation uses the concurrency level to create a fixed number of hashtable
* segments, each governed by its own write lock. The segment lock is taken once for each explicit
* write, and twice for each cache loading computation (once prior to loading the new value,
* and once after loading completes). Much internal cache management is performed at the segment
* granularity. For example, access queues and write queues are kept per segment when they are
* required by the selected eviction algorithm. As such, when writing unit tests it is not
* uncommon to specify {@code concurrencyLevel(1)} in order to achieve more deterministic eviction
* behavior.
*
* <p>Note that future implementations may abandon segment locking in favor of more advanced
* concurrency controls.
*
* @throws IllegalArgumentException if {@code concurrencyLevel} is nonpositive
* @throws IllegalStateException if a concurrency level was already set
*/
public CacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) {
checkState(this.concurrencyLevel == UNSET_INT, "concurrency level was already set to %s",
this.concurrencyLevel);
checkArgument(concurrencyLevel > 0);
this.concurrencyLevel = concurrencyLevel;
return this;
}
int getConcurrencyLevel() {
return (concurrencyLevel == UNSET_INT) ? DEFAULT_CONCURRENCY_LEVEL : concurrencyLevel;
}
/**
* Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict
* an entry before this limit is exceeded</b>. As the cache size grows close to the maximum, the
* cache evicts entries that are less likely to be used again. For example, the cache may evict an
* entry because it hasn't been used recently or very often.
*
* <p>When {@code size} is zero, elements will be evicted immediately after being loaded into the
* cache. This can be useful in testing, or to disable caching temporarily without a code change.
*
* <p>This feature cannot be used in conjunction with {@link #maximumWeight}.
*
* @param size the maximum size of the cache
* @throws IllegalArgumentException if {@code size} is negative
* @throws IllegalStateException if a maximum size or weight was already set
*/
public CacheBuilder<K, V> maximumSize(long size) {
checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s",
this.maximumSize);
checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s",
this.maximumWeight);
checkState(this.weigher == null, "maximum size can not be combined with weigher");
checkArgument(size >= 0, "maximum size must not be negative");
this.maximumSize = size;
return this;
}
/**
* Specifies the maximum weight of entries the cache may contain. Weight is determined using the
* {@link Weigher} specified with {@link #weigher}, and use of this method requires a
* corresponding call to {@link #weigher} prior to calling {@link #build}.
*
* <p>Note that the cache <b>may evict an entry before this limit is exceeded</b>. As the cache
* size grows close to the maximum, the cache evicts entries that are less likely to be used
* again. For example, the cache may evict an entry because it hasn't been used recently or very
* often.
*
* <p>When {@code weight} is zero, elements will be evicted immediately after being loaded into
* cache. This can be useful in testing, or to disable caching temporarily without a code
* change.
*
* <p>Note that weight is only used to determine whether the cache is over capacity; it has no
* effect on selecting which entry should be evicted next.
*
* <p>This feature cannot be used in conjunction with {@link #maximumSize}.
*
* @param weight the maximum total weight of entries the cache may contain
* @throws IllegalArgumentException if {@code weight} is negative
* @throws IllegalStateException if a maximum weight or size was already set
* @since 11.0
*/
@GwtIncompatible("To be supported")
public CacheBuilder<K, V> maximumWeight(long weight) {
checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s",
this.maximumWeight);
checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s",
this.maximumSize);
this.maximumWeight = weight;
checkArgument(weight >= 0, "maximum weight must not be negative");
return this;
}
/**
* Specifies the weigher to use in determining the weight of entries. Entry weight is taken
* into consideration by {@link #maximumWeight(long)} when determining which entries to evict, and
* use of this method requires a corresponding call to {@link #maximumWeight(long)} prior to
* calling {@link #build}. Weights are measured and recorded when entries are inserted into the
* cache, and are thus effectively static during the lifetime of a cache entry.
*
* <p>When the weight of an entry is zero it will not be considered for size-based eviction
* (though it still may be evicted by other means).
*
* <p><b>Important note:</b> Instead of returning <em>this</em> as a {@code CacheBuilder}
* instance, this method returns {@code CacheBuilder<K1, V1>}. From this point on, either the
* original reference or the returned reference may be used to complete configuration and build
* the cache, but only the "generic" one is type-safe. That is, it will properly prevent you from
* building caches whose key or value types are incompatible with the types accepted by the
* weigher already provided; the {@code CacheBuilder} type cannot do this. For best results,
* simply use the standard method-chaining idiom, as illustrated in the documentation at top,
* configuring a {@code CacheBuilder} and building your {@link Cache} all in a single statement.
*
* <p><b>Warning:</b> if you ignore the above advice, and use this {@code CacheBuilder} to build
* a cache whose key or value type is incompatible with the weigher, you will likely experience
* a {@link ClassCastException} at some <i>undefined</i> point in the future.
*
* @param weigher the weigher to use in calculating the weight of cache entries
* @throws IllegalArgumentException if {@code size} is negative
* @throws IllegalStateException if a maximum size was already set
* @since 11.0
*/
@GwtIncompatible("To be supported")
public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> weigher(
Weigher<? super K1, ? super V1> weigher) {
checkState(this.weigher == null);
if (strictParsing) {
checkState(this.maximumSize == UNSET_INT, "weigher can not be combined with maximum size",
this.maximumSize);
}
// safely limiting the kinds of caches this can produce
@SuppressWarnings("unchecked")
CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;
me.weigher = checkNotNull(weigher);
return me;
}
// Make a safe contravariant cast now so we don't have to do it over and over.
@SuppressWarnings("unchecked")
<K1 extends K, V1 extends V> Weigher<K1, V1> getWeigher() {
return (Weigher<K1, V1>) MoreObjects.firstNonNull(weigher, OneWeigher.INSTANCE);
}
/**
* Specifies that each entry should be automatically removed from the cache once a fixed duration
* has elapsed after the entry's creation, or the most recent replacement of its value.
*
* <p>When {@code duration} is zero, this method hands off to
* {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specificed maximum
* size or weight. This can be useful in testing, or to disable caching temporarily without a code
* change.
*
* <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or
* write operations. Expired entries are cleaned up as part of the routine maintenance described
* in the class javadoc.
*
* @param duration the length of time after an entry is created that it should be automatically
* removed
* @param unit the unit that {@code duration} is expressed in
* @throws IllegalArgumentException if {@code duration} is negative
* @throws IllegalStateException if the time to live or time to idle was already set
*/
public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
checkState(expireAfterWriteNanos == UNSET_INT, "expireAfterWrite was already set to %s ns",
expireAfterWriteNanos);
checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
this.expireAfterWriteNanos = unit.toNanos(duration);
return this;
}
long getExpireAfterWriteNanos() {
return (expireAfterWriteNanos == UNSET_INT) ? DEFAULT_EXPIRATION_NANOS : expireAfterWriteNanos;
}
/**
* Specifies that each entry should be automatically removed from the cache once a fixed duration
* has elapsed after the entry's creation, the most recent replacement of its value, or its last
* access. Access time is reset by all cache read and write operations (including
* {@code Cache.asMap().get(Object)} and {@code Cache.asMap().put(K, V)}), but not by operations
* on the collection-views of {@link Cache#asMap}.
*
* <p>When {@code duration} is zero, this method hands off to
* {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specificed maximum
* size or weight. This can be useful in testing, or to disable caching temporarily without a code
* change.
*
* <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or
* write operations. Expired entries are cleaned up as part of the routine maintenance described
* in the class javadoc.
*
* @param duration the length of time after an entry is last accessed that it should be
* automatically removed
* @param unit the unit that {@code duration} is expressed in
* @throws IllegalArgumentException if {@code duration} is negative
* @throws IllegalStateException if the time to idle or time to live was already set
*/
public CacheBuilder<K, V> expireAfterAccess(long duration, TimeUnit unit) {
checkState(expireAfterAccessNanos == UNSET_INT, "expireAfterAccess was already set to %s ns",
expireAfterAccessNanos);
checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
this.expireAfterAccessNanos = unit.toNanos(duration);
return this;
}
long getExpireAfterAccessNanos() {
return (expireAfterAccessNanos == UNSET_INT)
? DEFAULT_EXPIRATION_NANOS : expireAfterAccessNanos;
}
/**
* Specifies that active entries are eligible for automatic refresh once a fixed duration has
* elapsed after the entry's creation, or the most recent replacement of its value. The semantics
* of refreshes are specified in {@link LoadingCache#refresh}, and are performed by calling
* {@link CacheLoader#reload}.
*
* <p>As the default implementation of {@link CacheLoader#reload} is synchronous, it is
* recommended that users of this method override {@link CacheLoader#reload} with an asynchronous
* implementation; otherwise refreshes will be performed during unrelated cache read and write
* operations.
*
* <p>Currently automatic refreshes are performed when the first stale request for an entry
* occurs. The request triggering refresh will make a blocking call to {@link CacheLoader#reload}
* and immediately return the new value if the returned future is complete, and the old value
* otherwise.
*
* <p><b>Note:</b> <i>all exceptions thrown during refresh will be logged and then swallowed</i>.
*
* @param duration the length of time after an entry is created that it should be considered
* stale, and thus eligible for refresh
* @param unit the unit that {@code duration} is expressed in
* @throws IllegalArgumentException if {@code duration} is negative
* @throws IllegalStateException if the refresh interval was already set
* @since 11.0
*/
@Beta
@GwtIncompatible("To be supported (synchronously).")
public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
checkNotNull(unit);
checkState(refreshNanos == UNSET_INT, "refresh was already set to %s ns", refreshNanos);
checkArgument(duration > 0, "duration must be positive: %s %s", duration, unit);
this.refreshNanos = unit.toNanos(duration);
return this;
}
long getRefreshNanos() {
return (refreshNanos == UNSET_INT) ? DEFAULT_REFRESH_NANOS : refreshNanos;
}
/**
* Specifies a nanosecond-precision time source for use in determining when entries should be
* expired. By default, {@link System#nanoTime} is used.
*
* <p>The primary intent of this method is to facilitate testing of caches which have been
* configured with {@link #expireAfterWrite} or {@link #expireAfterAccess}.
*
* @throws IllegalStateException if a ticker was already set
*/
public CacheBuilder<K, V> ticker(Ticker ticker) {
checkState(this.ticker == null);
this.ticker = checkNotNull(ticker);
return this;
}
Ticker getTicker(boolean recordsTime) {
if (ticker != null) {
return ticker;
}
return recordsTime ? Ticker.systemTicker() : NULL_TICKER;
}
/**
* Specifies a listener instance that caches should notify each time an entry is removed for any
* {@linkplain RemovalCause reason}. Each cache created by this builder will invoke this listener
* as part of the routine maintenance described in the class documentation above.
*
* <p><b>Warning:</b> after invoking this method, do not continue to use <i>this</i> cache
* builder reference; instead use the reference this method <i>returns</i>. At runtime, these
* point to the same instance, but only the returned reference has the correct generic type
* information so as to ensure type safety. For best results, use the standard method-chaining
* idiom illustrated in the class documentation above, configuring a builder and building your
* cache in a single statement. Failure to heed this advice can result in a {@link
* ClassCastException} being thrown by a cache operation at some <i>undefined</i> point in the
* future.
*
* <p><b>Warning:</b> any exception thrown by {@code listener} will <i>not</i> be propagated to
* the {@code Cache} user, only logged via a {@link Logger}.
*
* @return the cache builder reference that should be used instead of {@code this} for any
* remaining configuration and cache building
* @throws IllegalStateException if a removal listener was already set
*/
@CheckReturnValue
public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> removalListener(
RemovalListener<? super K1, ? super V1> listener) {
checkState(this.removalListener == null);
// safely limiting the kinds of caches this can produce
@SuppressWarnings("unchecked")
CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;
me.removalListener = checkNotNull(listener);
return me;
}
// Make a safe contravariant cast now so we don't have to do it over and over.
@SuppressWarnings("unchecked")
<K1 extends K, V1 extends V> RemovalListener<K1, V1> getRemovalListener() {
return (RemovalListener<K1, V1>)
MoreObjects.firstNonNull(removalListener, NullListener.INSTANCE);
}
/**
* Enable the accumulation of {@link CacheStats} during the operation of the cache. Without this
* {@link Cache#stats} will return zero for all statistics. Note that recording stats requires
* bookkeeping to be performed with each operation, and thus imposes a performance penalty on
* cache operation.
*
* @since 12.0 (previously, stats collection was automatic)
*/
public CacheBuilder<K, V> recordStats() {
statsCounterSupplier = CACHE_STATS_COUNTER;
return this;
}
boolean isRecordingStats() {
return statsCounterSupplier == CACHE_STATS_COUNTER;
}
Supplier<? extends StatsCounter> getStatsCounterSupplier() {
return statsCounterSupplier;
}
/**
* Builds a cache, which either returns an already-loaded value for a given key or atomically
* computes or retrieves it using the supplied {@code CacheLoader}. If another thread is currently
* loading the value for this key, simply waits for that thread to finish and returns its
* loaded value. Note that multiple threads can concurrently load values for distinct keys.
*
* <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be
* invoked again to create multiple independent caches.
*
* @param loader the cache loader used to obtain new values
* @return a cache having the requested features
*/
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
CacheLoader<? super K1, V1> loader) {
checkWeightWithWeigher();
return new LocalCache.LocalLoadingCache<K1, V1>(this, loader);
}
/**
* Builds a cache which does not automatically load values when keys are requested.
*
* <p>Consider {@link #build(CacheLoader)} instead, if it is feasible to implement a
* {@code CacheLoader}.
*
* <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be
* invoked again to create multiple independent caches.
*
* @return a cache having the requested features
* @since 11.0
*/
public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
checkWeightWithWeigher();
checkNonLoadingCache();
return new LocalCache.LocalManualCache<K1, V1>(this);
}
}
如上,使用建造者模式创建 LoadingCache<K, V> 缓存; 设置好 最大值,过期时间等参数;
2、如何获取一个guava缓存?
其实就是一个get方法而已! stringDbCacheContainer.get(key);
// com.google.common.cache.LocalCache
// LoadingCache methods
@Override
public V get(K key) throws ExecutionException {
// 两种数据来源,一是直接获取,二是调用 load() 方法加载数据
return localCache.getOrLoad(key);
}
// com.google.common.cache.LocalCache
V getOrLoad(K key) throws ExecutionException {
return get(key, defaultLoader);
}
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
// 还记得 ConcurrentHashMap 吗? 先定位segment, 再定位 entry
return segmentFor(hash).get(key, hash, loader);
}
Segment<K, V> segmentFor(int hash) {
// TODO(fry): Lazily create segments?
return segments[(hash >>> segmentShift) & segmentMask];
}
// 核心取数逻辑在此get 中
// loading
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
// 如果存在值,则依据 ticker 进行判断是否过期,从而直接返回值,具体过期逻辑稍后再说
long now = map.ticker.read();
V value = getLiveValue(e, now);
if (value != null) {
recordRead(e, now);
statsCounter.recordHits(1);
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// 初次加载或过期之后,进入加载逻辑,重要
// at this point e is either null or expired;
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}
// static class Segment<K, V> extends ReentrantLock
// 整个 Segment 继承了 ReentrantLock, 所以 LocalCache 的锁是依赖于 ReentrantLock 实现的
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
// 在更新值前,先把过期数据清除
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// 处理 hash 碰撞时的链表查询
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) {
enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
// 如果是第一次加载,则先创建 Entry, 进入 load() 逻辑
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
// 同步加载数据源值, 从 loader 中处理
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
// 记录未命中计数,默认为空
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
// 如果有线程正在更新缓存,则等待结果即可,具体实现就是调用 Future.get()
return waitForLoadingValue(e, key, valueReference);
}
}
// 加载原始值
// at most one of loadSync.loadAsync may be called for any given LoadingValueReference
V loadSync(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) throws ExecutionException {
// loadingValueReference中保存了回调引用,加载原始值
ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
// 存储数据入缓存,以便下次使用
return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}
// 从 loader 中加载数据,
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
stopwatch.start();
V previousValue = oldValue.get();
try {
// 如果原来没有值,则直接加载后返回
if (previousValue == null) {
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
// 否则一般为无过期时间的数据进行 reload, 如果 reload() 的结果为空,则直接返回
// 须重写 reload() 实现
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// To avoid a race, make sure the refreshed value is set into loadingValueReference
// *before* returning newValue from the cache query.
return Futures.transform(newValue, new Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
});
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return setException(t) ? futureValue : fullyFailedFuture(t);
}
}
// com.google.common.util.concurrent.Uninterruptibles
/**
* Waits uninterruptibly for {@code newValue} to be loaded, and then records loading stats.
*/
V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
ListenableFuture<V> newValue) throws ExecutionException {
V value = null;
try {
// 同步等待加载结果,注意,此处返回值不允许为null, 否则将报异常,这可能是为了规避缓存攻击漏洞吧
value = getUninterruptibly(newValue);
if (value == null) {
throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
}
// 加载成功记录,此处扩展点,默认为空
statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
// 最后将值存入缓存容器中,返回(论hash的重要性)
storeLoadedValue(key, hash, loadingValueReference, value);
return value;
} finally {
if (value == null) {
statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
removeLoadingValue(key, hash, loadingValueReference);
}
}
}
/**
* Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
* To get uninterruptibility and remove checked exceptions, see
* {@link Futures#getUnchecked}.
*
* <p>If instead, you wish to treat {@link InterruptedException} uniformly
* with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
* or {@link Futures#makeChecked}.
*
* @throws ExecutionException if the computation threw an exception
* @throws CancellationException if the computation was cancelled
*/
public static <V> V getUninterruptibly(Future<V> future)
throws ExecutionException {
boolean interrupted = false;
try {
while (true) {
try {
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
如上,就是获取一个缓存的过程。总结下来就是:
1. 先使用hash定位到 segment中,然后尝试直接到 map中获取结果;
2. 如果没有找到或者已过期,则调用客户端的load()方法加载原始数据;
3. 将结果存入 segment.map 中,本地缓存生效;
4. 记录命中情况,读取计数;
3、如何处理过期?
其实刚刚我们在看get()方法时,就看到了一些端倪。
要确认两点: 1. 是否有创建异步清理线程进行过期数据清理? 2. 清理过程中,原始数据如何自处?
其实guava的清理时机是在加载数据之前进行的!
// com.google.common.cache.LocalCache
// static class Segment<K, V> extends ReentrantLock
// 整个 Segment 继承了 ReentrantLock, 所以 LocalCache 的锁是依赖于 ReentrantLock 实现的
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
// 在更新值前,先把过期数据清除
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// 处理 hash 碰撞时的链表查询
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) {
enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
// 如果是第一次加载,则先创建 Entry, 进入 load() 逻辑
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
// 同步加载数据源值, 从 loader 中处理
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
// 记录未命中计数,默认为空
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
// 我们来细看下 preWriteCleanup(now); 是如何清理过期数据的
/**
* Performs routine cleanup prior to executing a write. This should be called every time a
* write thread acquires the segment lock, immediately after acquiring the lock.
*
* <p>Post-condition: expireEntries has been run.
*/
@GuardedBy("this")
void preWriteCleanup(long now) {
runLockedCleanup(now);
}
void runLockedCleanup(long now) {
// 再次确保清理数据时,锁是存在的
if (tryLock()) {
try {
// 当存在特殊类型数据时,可以先进行清理
drainReferenceQueues();
// 清理过期数据,按时间清理
expireEntries(now); // calls drainRecencyQueue
// 读计数清零
readCount.set(0);
} finally {
unlock();
}
}
}
/**
* Drain the key and value reference queues, cleaning up internal entries containing garbage
* collected keys or values.
*/
@GuardedBy("this")
void drainReferenceQueues() {
if (map.usesKeyReferences()) {
drainKeyReferenceQueue();
}
if (map.usesValueReferences()) {
drainValueReferenceQueue();
}
}
@GuardedBy("this")
void expireEntries(long now) {
// 更新最近的访问队列
drainRecencyQueue();
ReferenceEntry<K, V> e;
// 从头部开始取元素,如果过期就进行清理
// 写队列超时: 清理, 访问队列超时: 清理
while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
}
@Override
public ReferenceEntry<K, V> peek() {
ReferenceEntry<K, V> next = head.getNextInAccessQueue();
return (next == head) ? null : next;
}
// 清理指定类型的元素,如 过期元素
@GuardedBy("this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
// 调用 removeValueFromChain, 清理具体元素
ReferenceEntry<K, V> newFirst = removeValueFromChain(
first, e, e.getKey(), hash, e.getValueReference(), cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
}
@GuardedBy("this")
@Nullable
ReferenceEntry<K, V> removeValueFromChain(ReferenceEntry<K, V> first,
ReferenceEntry<K, V> entry, @Nullable K key, int hash, ValueReference<K, V> valueReference,
RemovalCause cause) {
enqueueNotification(key, hash, valueReference, cause);
// 清理两队列
writeQueue.remove(entry);
accessQueue.remove(entry);
if (valueReference.isLoading()) {
valueReference.notifyNewValue(null);
return first;
} else {
return removeEntryFromChain(first, entry);
}
}
@GuardedBy("this")
@Nullable
ReferenceEntry<K, V> removeEntryFromChain(ReferenceEntry<K, V> first,
ReferenceEntry<K, V> entry) {
int newCount = count;
// 普通情况,则直接返回 next 元素链即可
// 针对有first != entry 的情况,则依次将 first 移动到队尾,然后跳到下一个元素返回
ReferenceEntry<K, V> newFirst = entry.getNext();
for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) {
// 将first链表倒转到 newFirst 尾部
ReferenceEntry<K, V> next = copyEntry(e, newFirst);
if (next != null) {
newFirst = next;
} else {
removeCollectedEntry(e);
newCount--;
}
}
this.count = newCount;
return newFirst;
}
到此,我们就完整的看到了一个 key 的过期处理流程了。总结就是:
1. 在读取的时候,触发清理操作;
2. 使用 ReentrantLock 来进行线程安全的更新;
3. 读取计数器清零,元素数量减少;
3. 怎样主动放入一个缓存?
这个和普通的map的put方法一样,简单看下即可!
// com.google.common.cache.LocalCache$LocalManualCache
@Override
public void put(K key, V value) {
localCache.put(key, value);
}
// com.google.common.cache.LocalCache
@Override
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
// com.google.common.cache.LocalCache$Segment
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count + 1;
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
++modCount;
if (valueReference.isActive()) {
enqueueNotification(key, hash, valueReference, RemovalCause.COLLECTED);
setValue(e, key, value, now);
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
evictEntries();
return null;
} else if (onlyIfAbsent) {
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);
recordLockedRead(e, now);
return entryValue;
} else {
// clobber existing entry, count remains unchanged
++modCount;
enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries();
return entryValue;
}
}
}
// Create a new entry.
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
evictEntries();
return null;
} finally {
unlock();
postWriteCleanup();
}
}
就这样,基于guava的二级缓存功能就搞定了。
本站声明:网站内容来源于网络,如有侵权,请联系我们https://www.qiquanji.com,我们将及时处理。
微信扫码关注
更新实时通知
网友评论