当前位置: 首页 > news >正文

做网站3年/北京网站优化多少钱

做网站3年,北京网站优化多少钱,网站根目录权限,政府网站 建设目标文章目录前言实现核心依赖配置文件实现代码前言 尽管我们经过谨慎的评估,仍然不能够保证一次计算出来来的线程池参数是合适的,那么我们是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间…

文章目录

    • 前言
    • 实现
      • 核心依赖
      • 配置文件
      • 实现代码

前言

尽管我们经过谨慎的评估,仍然不能够保证一次计算出来来的线程池参数是合适的,那么我们是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢?基于这个思考,我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下
在这里插入图片描述

实现

分布式配置中心这里使用的是Apollo

核心依赖

		<dependency><groupId>com.ctrip.framework.apollo</groupId><artifactId>apollo-client</artifactId><version>1.1.0</version></dependency><!-- apollo动态更新SpringBean需要的依赖--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-context</artifactId></dependency>

配置文件

thread.pool.corePoolSize = 8
thread.pool.maximumPoolSize = 10

实现代码

@Component(ThreadPoolManager.PACKAGE_BEAN_NAME)
@ConfigurationProperties("thread.pool")
@RefreshScope
@Data
public class ThreadPoolManager implements DisposableBean {private int corePoolSize;private int maximumPoolSize;private int queueCapactiy;private static final Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);public static final String PACKAGE_BEAN_NAME = "ThreadPoolManager";/*** 定义线程池*/private ThreadPoolExecutor threadPool;@PostConstructpublic void init() {this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 1800, TimeUnit.SECONDS,new ArrayBlockingQueue<>(queueCapactiy),new ThreadFactoryBuilder().setDaemon(false).build(),new ThreadPoolExecutor.CallerRunsPolicy());}/*** @param name* @param runnable*/public void execute(final String name, final Runnable runnable) {execute(name, runnable, null);}/*** @param name* @param runnable* @param exceptionHandler 异常处理*/public void execute(final String name, final Runnable runnable, final Thread.UncaughtExceptionHandler exceptionHandler) {threadPool.execute(() -> {final Thread currentThread = Thread.currentThread();setName(currentThread, name);if (Objects.isNull(exceptionHandler)) {//默认只打印异常日志currentThread.setUncaughtExceptionHandler((thread, exception) -> {logger.error("线程执行异常-{}", currentThread.getName(), exception);});} else {currentThread.setUncaughtExceptionHandler(exceptionHandler);}runnable.run();});}/*** 销毁线程池* @throws Exception*/@Overridepublic void destroy() throws Exception {MoreExecutors.shutdownAndAwaitTermination(threadPool, 20, TimeUnit.SECONDS);}/*** @param callable* @return submit方式提交任务,则异常不能被异常处理器捕获,如果一个由submit提交的任务由于抛出了异常而结束,那么这个异常将被Future.get封装在ExecutionException中重新抛出*/public <T> Future<T> submit(String name, final Callable<T> callable) {return threadPool.submit(() -> {setName(Thread.currentThread(), name);return callable.call();});}private void setName(Thread thread, String name) {thread.setName(name);}@ApolloConfigChangeListener("thread-pool")private void configChangeListter(ConfigChangeEvent changeEvent) {ConfigChange corePoolSizeChange = changeEvent.getChange("thread.pool.corePoolSize");reflushThreadPoll(corePoolSizeChange, ThreadPoolExecutor::setCorePoolSize, threadPool);ConfigChange maximumPoolSizeChange = changeEvent.getChange("thread.pool.maximumPoolSize");reflushThreadPoll(maximumPoolSizeChange, ThreadPoolExecutor::setMaximumPoolSize, threadPool);logger.info("线程池已更新");}private void reflushThreadPoll(ConfigChange configChange, BiConsumer<ThreadPoolExecutor, Integer> supplier, ThreadPoolExecutor threadPool) {if (Objects.nonNull(configChange)) {String newValue = configChange.getNewValue();supplier.accept(threadPool, Integer.parseInt(newValue));}}}

对于动态更新的Queue很遗憾的是JDK原生中并没有相关实现

常用的ArrayBlockingQueue的存储方式是使用一个Object数组,而且是被声明为final,也就是说初始化之后就无法再改变它的大小
在这里插入图片描述
LinkedBlockingQueue也是如此
在这里插入图片描述
但是我们可以参考rabbitmq-client的实现 VariableLinkedBlockingQueue,这是一个LinkedBlockingQueue类的克隆,增加了一个setCapacity(int)方法,允许在使用的过程中更改容量
github地址

public class VariableLinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;/** A variant of the "two lock queue" algorithm.  The putLock gates* entry to put (and offer), and has an associated condition for* waiting puts.  Similarly for the takeLock.  The "count" field* that they both rely on is maintained as an atomic to avoid* needing to get both locks in most cases. Also, to minimize need* for puts to get takeLock and vice-versa, cascading notifies are* used. When a put notices that it has enabled at least one take,* it signals taker. That taker in turn signals others if more* items have been entered since the signal. And symmetrically for* takes signalling puts. Operations such as remove(Object) and* iterators acquire both locks.*//*** Linked list node class*/static class Node<E> {/** The item, volatile to ensure barrier separating write and read */volatile E item;Node<E> next;Node(E x) { item = x; }}/** The capacity bound, or Integer.MAX_VALUE if none */private int capacity;/** Current number of elements */private final AtomicInteger count = new AtomicInteger(0);/** Head of linked list */private transient Node<E> head;/** Tail of linked list */private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();/*** Signal a waiting take. Called only from put/offer (which do not* otherwise ordinarily lock takeLock.)*/private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}/*** Signal a waiting put. Called only from take/poll.*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}/*** Create a node and link it at end of queue* @param x the item*/private void insert(E x) {last = last.next = new Node<E>(x);}/*** Remove a node from head of queue,* @return the node*/private E extract() {Node<E> first = head.next;head = first;E x = first.item;first.item = null;return x;}/*** Lock to prevent both puts and takes.*/private void fullyLock() {putLock.lock();takeLock.lock();}/*** Unlock to allow both puts and takes.*/private void fullyUnlock() {takeLock.unlock();putLock.unlock();}/*** Creates a <tt>LinkedBlockingQueue</tt> with a capacity of* {@link Integer#MAX_VALUE}.*/public VariableLinkedBlockingQueue() {this(Integer.MAX_VALUE);}/*** Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.** @param capacity the capacity of this queue.* @throws IllegalArgumentException if <tt>capacity</tt> is not greater*         than zero.*/public VariableLinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}/*** Creates a <tt>LinkedBlockingQueue</tt> with a capacity of* {@link Integer#MAX_VALUE}, initially containing the elements of the* given collection,* added in traversal order of the collection's iterator.* @param c the collection of elements to initially contain* @throws NullPointerException if <tt>c</tt> or any element within it* is <tt>null</tt>*/public VariableLinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);for (Iterator<? extends E> it = c.iterator(); it.hasNext();)add(it.next());}// this doc comment is overridden to remove the reference to collections// greater in size than Integer.MAX_VALUE/*** Returns the number of elements in this queue.** @return  the number of elements in this queue.*/@Overridepublic int size() {return count.get();}/*** Set a new capacity for the queue. Increasing the capacity can* cause any waiting {@link #put(Object)} invocations to succeed if the new* capacity is larger than the queue.* @param capacity the new capacity for the queue*/public void setCapacity(int capacity) {final int oldCapacity = this.capacity;this.capacity = capacity;final int size = count.get();if (capacity > size && size >= oldCapacity) {signalNotFull();}}// this doc comment is a modified copy of the inherited doc comment,// without the reference to unlimited queues./*** Returns the number of elements that this queue can ideally (in* the absence of memory or resource constraints) accept without* blocking. This is always equal to the initial capacity of this queue* less the current <tt>size</tt> of this queue.* <p>Note that you <em>cannot</em> always tell if* an attempt to <tt>add</tt> an element will succeed by* inspecting <tt>remainingCapacity</tt> because it may be the* case that a waiting consumer is ready to <tt>take</tt> an* element out of an otherwise full queue.*/@Overridepublic int remainingCapacity() {return capacity - count.get();}/*** Adds the specified element to the tail of this queue, waiting if* necessary for space to become available.* @param o the element to add* @throws InterruptedException if interrupted while waiting.* @throws NullPointerException if the specified element is <tt>null</tt>.*/@Overridepublic void put(E o) throws InterruptedException {if (o == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset// local var holding count  negative to indicate failure unless set.int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from* capacity. Similarly for all other uses of count in* other wait guards.*/try {while (count.get() >= capacity)notFull.await();} catch (InterruptedException ie) {notFull.signal(); // propagate to a non-interrupted threadthrow ie;}insert(o);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}/*** Inserts the specified element at the tail of this queue, waiting if* necessary up to the specified wait time for space to become available.* @param o the element to add* @param timeout how long to wait before giving up, in units of* <tt>unit</tt>* @param unit a <tt>TimeUnit</tt> determining how to interpret the* <tt>timeout</tt> parameter* @return <tt>true</tt> if successful, or <tt>false</tt> if* the specified waiting time elapses before space is available.* @throws InterruptedException if interrupted while waiting.* @throws NullPointerException if the specified element is <tt>null</tt>.*/@Overridepublic boolean offer(E o, long timeout, TimeUnit unit)throws InterruptedException {if (o == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {for (;;) {if (count.get() < capacity) {insert(o);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();break;}if (nanos <= 0)return false;try {nanos = notFull.awaitNanos(nanos);} catch (InterruptedException ie) {notFull.signal(); // propagate to a non-interrupted threadthrow ie;}}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}/*** Inserts the specified element at the tail of this queue if possible,* returning immediately if this queue is full.** @param o the element to add.* @return <tt>true</tt> if it was possible to add the element to*         this queue, else <tt>false</tt>* @throws NullPointerException if the specified element is <tt>null</tt>*/@Overridepublic boolean offer(E o) {if (o == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() >= capacity)return false;int c = -1;final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {insert(o);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}@Overridepublic E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {try {while (count.get() == 0)notEmpty.await();} catch (InterruptedException ie) {notEmpty.signal(); // propagate to a non-interrupted threadthrow ie;}x = extract();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c >= capacity)signalNotFull();return x;}@Overridepublic E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {for (;;) {if (count.get() > 0) {x = extract();c = count.getAndDecrement();if (c > 1)notEmpty.signal();break;}if (nanos <= 0)return null;try {nanos = notEmpty.awaitNanos(nanos);} catch (InterruptedException ie) {notEmpty.signal(); // propagate to a non-interrupted threadthrow ie;}}} finally {takeLock.unlock();}if (c >= capacity)signalNotFull();return x;}@Overridepublic E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = extract();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c >= capacity)signalNotFull();return x;}@Overridepublic E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {Node<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}@Overridepublic boolean remove(Object o) {if (o == null) return false;boolean removed = false;fullyLock();try {Node<E> trail = head;Node<E> p = head.next;while (p != null) {if (o.equals(p.item)) {removed = true;break;}trail = p;p = p.next;}if (removed) {p.item = null;trail.next = p.next;if (count.getAndDecrement() >= capacity)notFull.signalAll();}} finally {fullyUnlock();}return removed;}@Overridepublic Object[] toArray() {fullyLock();try {int size = count.get();Object[] a = new Object[size];int k = 0;for (Node<E> p = head.next; p != null; p = p.next)a[k++] = p.item;return a;} finally {fullyUnlock();}}@Override@SuppressWarnings("unchecked")public <T> T[] toArray(T[] a) {fullyLock();try {int size = count.get();if (a.length < size)a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);int k = 0;for (Node<?> p = head.next; p != null; p = p.next)a[k++] = (T)p.item;return a;} finally {fullyUnlock();}}@Overridepublic String toString() {fullyLock();try {return super.toString();} finally {fullyUnlock();}}@Overridepublic void clear() {fullyLock();try {head.next = null;if (count.getAndSet(0) >= capacity)notFull.signalAll();} finally {fullyUnlock();}}@Overridepublic int drainTo(Collection<? super E> c) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();Node<E> first;fullyLock();try {first = head.next;head.next = null;if (count.getAndSet(0) >= capacity)notFull.signalAll();} finally {fullyUnlock();}// Transfer the elements outside of locksint n = 0;for (Node<E> p = first; p != null; p = p.next) {c.add(p.item);p.item = null;++n;}return n;}@Overridepublic int drainTo(Collection<? super E> c, int maxElements) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();if (maxElements <= 0)return 0;fullyLock();try {int n = 0;Node<E> p = head.next;while (p != null && n < maxElements) {c.add(p.item);p.item = null;p = p.next;++n;}if (n != 0) {head.next = p;if (count.getAndAdd(-n) >= capacity)notFull.signalAll();}return n;} finally {fullyUnlock();}}/*** Returns an iterator over the elements in this queue in proper sequence.* The returned <tt>Iterator</tt> is a "weakly consistent" iterator that* will never throw {@link java.util.ConcurrentModificationException},* and guarantees to traverse elements as they existed upon* construction of the iterator, and may (but is not guaranteed to)* reflect any modifications subsequent to construction.** @return an iterator over the elements in this queue in proper sequence.*/@Overridepublic Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {/** Basic weak-consistent iterator.  At all times hold the next* item to hand out so that if hasNext() reports true, we will* still have it to return even if lost race with a take etc.*/private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {final ReentrantLock putLock = VariableLinkedBlockingQueue.this.putLock;final ReentrantLock takeLock = VariableLinkedBlockingQueue.this.takeLock;putLock.lock();takeLock.lock();try {current = head.next;if (current != null)currentElement = current.item;} finally {takeLock.unlock();putLock.unlock();}}@Overridepublic boolean hasNext() {return current != null;}@Overridepublic E next() {final ReentrantLock putLock = VariableLinkedBlockingQueue.this.putLock;final ReentrantLock takeLock = VariableLinkedBlockingQueue.this.takeLock;putLock.lock();takeLock.lock();try {if (current == null)throw new NoSuchElementException();E x = currentElement;lastRet = current;current = current.next;if (current != null)currentElement = current.item;return x;} finally {takeLock.unlock();putLock.unlock();}}@Overridepublic void remove() {if (lastRet == null)throw new IllegalStateException();final ReentrantLock putLock = VariableLinkedBlockingQueue.this.putLock;final ReentrantLock takeLock = VariableLinkedBlockingQueue.this.takeLock;putLock.lock();takeLock.lock();try {Node<E> node = lastRet;lastRet = null;Node<E> trail = head;Node<E> p = head.next;while (p != null && p != node) {trail = p;p = p.next;}if (p == node) {p.item = null;trail.next = p.next;int c = count.getAndDecrement();if (c >= capacity)notFull.signalAll();}} finally {takeLock.unlock();putLock.unlock();}}}/*** Save the state to a stream (that is, serialize it).** @serialData The capacity is emitted (int), followed by all of* its elements (each an <tt>Object</tt>) in the proper order,* followed by a null* @param s the stream*/private void writeObject(java.io.ObjectOutputStream s)throws java.io.IOException {fullyLock();try {// Write out any hidden stuff, plus capacitys.defaultWriteObject();// Write out all elements in the proper order.for (Node<E> p = head.next; p != null; p = p.next)s.writeObject(p.item);// Use trailing null as sentinels.writeObject(null);} finally {fullyUnlock();}}/*** Reconstitute this queue instance from a stream (that is,* deserialize it).* @param s the stream*/private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {// Read in capacity, and any hidden stuffs.defaultReadObject();count.set(0);last = head = new Node<E>(null);// Read in all elements and place in queuefor (;;) {@SuppressWarnings("unchecked")E item = (E)s.readObject();if (item == null)break;add(item);}}
}
http://www.lbrq.cn/news/1529389.html

相关文章:

  • 西部数码空间的网站访问统计/中国万网域名注册官网
  • 新万网/伟哥seo博客
  • 网站技术维护/企业网络营销策划书范文
  • 可以做链接的网站/宁波seo推广服务
  • 做日签的网站/百度优化教程
  • 衡水提供网站制作公司报价/网络营销策划与推广
  • 北京网站建设北京/网站优化的方法与技巧
  • wordpress首页文章内容/在线seo推广软件
  • 青岛做网站大公司有哪些/免费发布产品信息的网站
  • 邳州城乡住房和城乡建设网站/网络整合营销4i原则是指
  • 黄埔网站建设/苏州seo关键词优化排名
  • 免费建设网站入驻/公众号seo排名优化
  • 工业产品设计就业/常用的seo查询工具
  • 英国网站域名/目前疫情最新情况
  • 网站专题怎么做/seo关键词排名软件
  • 吴苏南网站建设/宁波网站seo诊断工具
  • 网络公司手机网站模板/企业seo培训
  • 九宫格导航网站/网络营销的主要传播渠道
  • vue做网站首页/济南seo
  • 公司网站设计与管理/百度快照优化
  • 网站建设公司+长春/最有效的推广学校的方式
  • 做学校网站的内容/南宁百度seo排名
  • 企业网站首页设计原则/网站搜索引擎优化方法
  • php网站留言板模板下载/曲靖百度推广
  • 可以做结构式的网站/搜索引擎优化seo是什么
  • wordpress竖直导航栏/seoul是什么意思
  • 深圳电子商务网站 开发/新余seo
  • 玉溪网站建设网站建设/重庆好的seo平台
  • 常德房产网/权威seo技术
  • 南京公司网站建立/黄山seo排名优化技术
  • 细说数仓中不同类型的维度
  • 新手向:使用STM32通过RS485通信接口控制步进电机
  • 【C++】--指针与引用深入解析和对比
  • Python量化交易:结合爬虫与TA-Lib技术指标分析
  • 如何计算 PCM 音频与 YUV/RGB 原始视频文件大小?
  • flink+clinkhouse安装部署